# import necessary libraries
from conllu import parse
import numpy as np
import nltk
from itertools import chain
import operator
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelBinarizer
from matplotlib.font_manager import FontProperties
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.metrics import classification_report, confusion_matrix
plt.rcParams.update({'figure.max_open_warning': 0})
from sklearn import metrics
from nltk.tag import hmm
import pycrfsuite
import warnings
from sklearn.exceptions import UndefinedMetricWarning
warnings.filterwarnings("ignore", category=UndefinedMetricWarning)
def read_conllu(file):
    # read the conllu file
    with open(file, "r", encoding='UTF-8') as text_file:
        lines = text_file.read().split('\n')
    # drop metadata comments and empty lines, keeping only the '# text'
    # comments and the token lines
    lines = [line for line in lines
             if len(line) > 0
             and not line.startswith('# sent_id')
             and not line.startswith('# newdoc id')]
    # return the useful lines of text
    return lines
def const_pos_dict(lines):
dict_pos = {}
# counter, to check if a new sentence begins
counter_sharp = 0
word_tags = []
for line in lines[:]:
if line.startswith('# text'):
counter_sharp += 1
if counter_sharp % 2 == 0:
dict_pos[sentence] = word_tags
word_tags= []
counter_sharp -= 1
sentence = line[9:]
continue
try:
word_tags.append((parse(line)[0][0]["form"], parse(line)[0][0]["xpostag"]))
except IndexError:
if line[2:4] == "is" and line[5:7] == "be":
word_tags.append((line[2:4], "VBZ")) #VBZ
elif line[2:7]== "DEBRA":
word_tags.append((line[2:7], "NNP")) #PROPN
elif line[2:13]== "PERLINGIERE":
word_tags.append((line[2:13], "NNP")) #PROPN
elif line[3:5] == "is" and line[6:8] == "be":
word_tags.append((line[3:5], "VBZ"))
elif line[8:12] == "than":
word_tags.append((line[8:12], "IN"))
elif line[2:6] == "26th":
word_tags.append((line[2:6], "NN"))
elif line[2:6] == "29th":
word_tags.append((line[2:6], "NN"))
elif line[2:3] == ",":
word_tags.append((line[2:3], ","))
elif line[2:3] == ":":
word_tags.append((line[2:3], ":"))
elif line[2:6] == "John":
word_tags.append((line[2:6], "GW"))
elif line[2:17] == "Salinardo@ENRON":
word_tags.append((line[2:17], "ADD"))
elif line[2:4] == "it":
word_tags.append((line[2:4], "PRP"))
elif line[2:5] == "add":
word_tags.append((line[2:5], "VB"))
elif line[2:5] == "for":
word_tags.append((line[2:5], "IN"))
elif line[2:5] == "you":
word_tags.append((line[2:5], "PRP"))
elif line[2:3] == "-":
word_tags.append((line[2:3], ","))
elif line[3:4] == ".":
word_tags.append((line[3:4], "."))
elif line[3:9] == "openly":
word_tags.append((line[3:9], "RB"))
elif line[3:7] == "left":
word_tags.append((line[3:7], "VBN"))
elif line[3:7] == "many":
word_tags.append((line[3:7], "JJ"))
elif line[2:11] == "effective":
word_tags.append((line[2:11], "JJ"))
elif line[2:5] == "Sep":
word_tags.append((line[2:5], "NNP"))
elif line[3:7] == "26th":
word_tags.append((line[3:7], "NN"))
elif line[2:13] == "spreadsheet":
word_tags.append((line[2:13], "NN"))
elif line[3:7] == "Best":
word_tags.append((line[3:7], "JJS"))
elif line[3:4] == ")":
word_tags.append((line[3:4], "-RRB-"))
elif line[3:7] == "29th":
word_tags.append((line[3:7], "NN"))
elif line[3:4] == '"':
word_tags.append((line[3:4], "''"))
        elif line[3:9] == 'values':
            word_tags.append((line[3:9], "NNS"))
elif line[3:4] == ",":
word_tags.append((line[3:4], ","))
else:
print("ERROR", line)
    # store the last sentence, which the loop above otherwise never saves
    if word_tags:
        dict_pos[sentence] = word_tags
    # return a dictionary having sentences as keys and, as values, lists of
    # (token, POS tag) tuples
    return dict_pos
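# For reference, a minimal, hypothetical CoNLL-U snippet (not taken from the
# treebank): each token occupies one tab-separated line with ten columns
# (ID, FORM, LEMMA, UPOS, XPOS, FEATS, HEAD, DEPREL, DEPS, MISC), and the
# code above reads the FORM and XPOS columns. The "xpostag" key matches the
# conllu version used in this notebook; newer releases expose it as "xpos".
sample = """# text = Enron is big .
1\tEnron\tEnron\tPROPN\tNNP\t_\t3\tnsubj\t_\t_
2\tis\tbe\tAUX\tVBZ\t_\t3\tcop\t_\t_
3\tbig\tbig\tADJ\tJJ\t_\t0\troot\t_\t_
4\t.\t.\tPUNCT\t.\t_\t3\tpunct\t_\t_
"""
sample_token = parse(sample)[0][0]                    # first token of the first sentence
print(sample_token["form"], sample_token["xpostag"])  # e.g. Enron NNP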
# read the train and test data.
# The data come from the English treebank of the Universal Dependencies project
# (http://universaldependencies.org/). Only the words, sentences, and POS tags
# of the treebank (not the dependencies or other annotations) are considered.
dict_train = const_pos_dict(read_conllu("en-ud-train.conllu"))
dict_test = const_pos_dict(read_conllu("en-ud-test.conllu"))
train_data = list(dict_train.keys())
train_data_pos = list(dict_train.values())
test_data = list(dict_test.keys())
test_data_pos = list(dict_test.values())
# create lists with train and test tokens
test_tokens = []
for true in test_data_pos:
test_tokens.append([x[0] for x in true])
train_tokens = []
for true in train_data_pos:
train_tokens.append([x[0] for x in true])
def learning_curves(results, metric, method):
fontP = FontProperties()
fontP.set_size('small')
fig = plt.figure()
fig.suptitle('Learning Curves - ' + method, fontsize=17)
ax = fig.add_subplot(111)
ax.axis([300, 13733, 0.25, 1.1])
if metric == "accuracy":
line_up, = ax.plot(results['train_size'], results['on_train_acc'],
'o-', label='Accuracy on Train')
line_down, = ax.plot(results['train_size'], results['on_test_acc'],
'o-', label='Accuracy on Test')
plt.ylabel('Accuracy', fontsize=13)
plt.legend([line_up, line_down], ['Accuracy on Train', 'Accuracy on Test'],
prop=fontP)
elif metric=="f1-score":
line_up, = ax.plot(results['train_size'], results['on_train_f1'],
'o-', label='F1-score on Train')
line_down, = ax.plot(results['train_size'], results['on_test_f1'],
'o-', label='F1-score on Test')
plt.ylabel('F1-score', fontsize=13)
plt.legend([line_up, line_down], ['F1-score on Train', 'F1-score on Test'],
prop=fontP)
plt.xlabel('Number of training instances', fontsize=13)
plt.grid(True)
def compute_f1_tag(results):
train_f1 = {}
f1_list = []
for i in range(len(results[0]['on_train_f1_class'][0])):
for j in range(len(results[0]['on_train_f1_class'])):
f1_list.append(results[0]['on_train_f1_class'][j][i])
train_f1[list(results[1].keys())[i]] = f1_list
f1_list = []
test_f1 = {}
f1_list = []
for i in range(len(results[0]['on_test_f1_class'][0])):
for j in range(len(results[0]['on_test_f1_class'])):
f1_list.append(results[0]['on_test_f1_class'][j][i])
test_f1[list(results[1].keys())[i]] = f1_list
f1_list = []
return train_f1, test_f1
def plot_f1_curve_tag(results, train_f1, test_f1):
fontP = FontProperties()
fontP.set_size('small')
keys_dist = list(test_f1.keys())
counter = 0
while counter < len(keys_dist):
fig, (ax1, ax2) = plt.subplots(figsize=(12, 4), ncols=2)
key1 = keys_dist[counter]
ax1.axis([300, 13733, -0.1, 1.1])
line_up, = ax1.plot(results[0]['train_size'], train_f1[key1],
'o-', label='F1-score on Train')
line_down, = ax1.plot(results[0]['train_size'], test_f1[key1],
                              'o-', label='F1-score on Test')
ax1.set_title('Learning Curves - POS tag ' + key1, fontsize=17)
ax1.set_ylabel('F1-score', fontsize=13)
ax1.legend([line_up, line_down], ['F1-score on Train', 'F1-score on Test'],
prop=fontP)
ax1.set_xlabel('Number of training instances', fontsize=13)
ax1.grid(True)
if (counter + 1) != len(keys_dist):
key1 = keys_dist[counter + 1]
ax2.axis([300, 13733, -0.1, 1.1])
line_up, = ax2.plot(results[0]['train_size'], train_f1[key1],
'o-', label='F1-score on Train')
line_down, = ax2.plot(results[0]['train_size'], test_f1[key1],
                              'o-', label='F1-score on Test')
ax2.set_title('Learning Curves - POS tag ' + key1, fontsize=17)
ax2.set_ylabel('F1-score', fontsize=13)
ax2.legend([line_up, line_down], ['F1-score on Train', 'F1-score on Test'],
prop=fontP)
ax2.set_xlabel('Number of training instances', fontsize=13)
ax2.grid(True)
counter += 2
The Hidden Markov Model (HMM) is a powerful statistical tool for modeling generative sequences, i.e. sequences that can be characterized by an underlying hidden process generating an observable sequence. HMMs have found application in many areas of signal processing, most notably speech processing, and have also been applied successfully to low-level NLP tasks such as part-of-speech tagging, phrase chunking, and extracting target information from documents.
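# A minimal sketch of the nltk HMM tagger API used below, on a tiny
# hypothetical corpus (not the treebank data): the trainer estimates
# transition and emission probabilities from (token, tag) sequences, and the
# resulting tagger decodes the most likely tag sequence with Viterbi.
toy_train = [
    [("the", "DT"), ("dog", "NN"), ("barks", "VBZ")],
    [("the", "DT"), ("cat", "NN"), ("sleeps", "VBZ")],
]
toy_tagger = hmm.HiddenMarkovModelTrainer().train_supervised(toy_train)
print(toy_tagger.tag(["the", "dog", "sleeps"]))
# e.g. [('the', 'DT'), ('dog', 'NN'), ('sleeps', 'VBZ')]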
# compute the overall accuracy and the f1 score. Additionally, compute the f1 score
# per POS tag.
def benchmark_hmm(clf, train_data_pos, test_data_pos, tokens, train=True, classes=False):
tagger = clf.train_supervised(train_data_pos)
predictions = []
for datum in tokens:
predictions.append(tagger.tag(datum))
y_true = []
if not train:
for true in test_data_pos:
y_true.append(tuple([x[1] for x in true]))
else:
for true in train_data_pos:
y_true.append(tuple([x[1] for x in true]))
y_pred = []
for pred in predictions:
        y_pred.append(tuple([x[1] for x in pred]))
lb = LabelBinarizer()
y_true_combined = lb.fit_transform(list(chain.from_iterable(y_true)))
y_pred_combined = lb.transform(list(chain.from_iterable(y_pred)))
    # keep the tags in the same (sorted) order as lb.classes_, so that the
    # per-class F1 scores line up with the class_indices dictionary
    tagset = sorted(lb.classes_)
    class_indices = {cls: idx for idx, cls in enumerate(lb.classes_)}
    labels = [class_indices[cls] for cls in tagset]
f1 = metrics.f1_score(y_true_combined, y_pred_combined, average='weighted')
f1_cl = metrics.f1_score(y_true_combined, y_pred_combined, labels=labels, average=None)
accuracy = metrics.accuracy_score(y_true_combined, y_pred_combined)
result = {'f1' : f1, 'accuracy' : accuracy, 'train size' : len(train_data_pos),
'test size' : len(test_data_pos), 'predictions': y_pred, "f1_class": f1_cl}
if not classes:
return result
else:
return result, class_indices
# make the necessary computations for the learning curves and save
# the results.
def calc_metrics_hmm(train_data_pos, test_data_pos, test_tokens, train_tokens):
    # dtype=object because the sentences have different lengths
    train_data_pos_s_s, train_tokens_s_s = np.asarray(train_data_pos, dtype=object), np.asarray(train_tokens, dtype=object)
    test_data_pos_s_s, test_tokens_s_s = np.asarray(test_data_pos, dtype=object), np.asarray(test_tokens, dtype=object)
results = {}
results['train_size'] = []
results['on_test_acc'] = []
results['on_train_acc'] = []
results['on_test_f1'] = []
results['on_train_f1'] = []
results['on_test_f1_class'] = []
results['on_train_f1_class'] = []
print(train_data_pos_s_s.shape[0])
for i in range(1, 11):
if(i==10):
train_x_part = train_data_pos_s_s
train_y_part = train_tokens_s_s
else:
to = int(i*(train_data_pos_s_s.shape[0]/10))
train_x_part = train_data_pos_s_s[0: to]
train_y_part = train_tokens_s_s[0: to]
clf = hmm.HiddenMarkovModelTrainer()
print(train_x_part.shape)
results['train_size'].append(train_x_part.shape[0])
if i == 10:
out = benchmark_hmm(clf, train_x_part, test_data_pos_s_s, test_tokens_s_s, False, True)
result = out[0]
classes = out[1]
else:
result = benchmark_hmm(clf, train_x_part, test_data_pos_s_s, test_tokens_s_s, False)
results['on_test_acc'].append(result['accuracy'])
results['on_test_f1'].append(result['f1'])
results['on_test_f1_class'].append(result['f1_class'])
result = benchmark_hmm(clf, train_x_part, test_data_pos_s_s, train_y_part)
results['on_train_acc'].append(result['accuracy'])
results['on_train_f1'].append(result['f1'])
results['on_train_f1_class'].append(result['f1_class'])
return results, classes
results_hmm = calc_metrics_hmm(train_data_pos, test_data_pos, test_tokens, train_tokens)
# plot the learning curves for accuracy and the f1-score
learning_curves(results_hmm[0], "accuracy", "Hidden Markov Model (HMM)")
learning_curves(results_hmm[0], "f1-score", "Hidden Markov Model (HMM)")
# plot the learning curves for the f1-score per POS tag
train_f1, test_f1 = compute_f1_tag(results_hmm)
plot_f1_curve_tag(results_hmm, train_f1, test_f1)
Conditional random fields (CRFs) are a probabilistic framework for labeling and segmenting structured data, such as sequences, trees and lattices. The underlying idea is that of defining a conditional probability distribution over label sequences given a particular observation sequence, rather than a joint distribution over both label and observation sequences.
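Concretely, a linear-chain CRF (the standard formulation, stated here for reference rather than taken from the code below) models the conditional probability of a tag sequence $y = (y_1, \dots, y_T)$ given an observation sequence $x$ as

$$p(y \mid x) = \frac{1}{Z(x)} \exp\left( \sum_{t=1}^{T} \sum_{k} \lambda_k f_k(y_{t-1}, y_t, x, t) \right),$$

where the $f_k$ are feature functions (such as the word-identity, suffix and shape features defined below), the $\lambda_k$ are weights learned during training, and $Z(x)$ normalizes over all possible tag sequences.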
# Define some features to be used by the conditional random fields.
# In our case we use the word identity, word suffix and word shape;
# additionally, some information from the nearby words is used. The use
# of such features is one of the main differences between the HMM and CRFs.
def word2features(sent, i):
word = sent[i][0]
postag = sent[i][1]
features = [
'word.lower=' + word.lower(),
'word[-3:]=' + word[-3:],
'word[-2:]=' + word[-2:],
'word.isupper=%s' % word.isupper(),
'word.istitle=%s' % word.istitle(),
'word.isdigit=%s' % word.isdigit()
]
if i > 0:
word1 = sent[i-1][0]
postag1 = sent[i-1][1]
features.extend([
'-1:word.lower=' + word1.lower(),
'-1:word.istitle=%s' % word1.istitle(),
'-1:word.isupper=%s' % word1.isupper()
])
else:
features.append('BOS')
if i < len(sent) - 1:
word1 = sent[i+1][0]
postag1 = sent[i+1][1]
features.extend([
'+1:word.lower=' + word1.lower(),
'+1:word.istitle=%s' % word1.istitle(),
'+1:word.isupper=%s' % word1.isupper()
])
else:
features.append('EOS')
return features
def sentence2features(sentence):
return [word2features(sentence, i) for i in range(len(sentence))]
def sentence2postags(sentence):
return [postag for token, postag in sentence]
def sentence2tokens(sentence):
return [token for token, postag in sentence]
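# A quick sanity check of the feature extraction on a tiny hypothetical tagged
# sentence (not from the treebank): each token becomes a list of "name=value"
# strings, with BOS/EOS markers at the sentence boundaries.
toy_sentence = [("Enron", "NNP"), ("is", "VBZ"), ("big", "JJ")]
print(sentence2features(toy_sentence)[0])
# e.g. ['word.lower=enron', 'word[-3:]=ron', 'word[-2:]=on',
#       'word.isupper=False', 'word.istitle=True', 'word.isdigit=False',
#       'BOS', '+1:word.lower=is', '+1:word.istitle=False',
#       '+1:word.isupper=False']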
# split the data into train and test set, after applying
# the necessary pre-processing functions on the initial
# data
X_train = [sentence2features(s) for s in train_data_pos]
y_train = [sentence2postags(s) for s in train_data_pos]
X_test = [sentence2features(s) for s in test_data_pos]
y_test = [sentence2postags(s) for s in test_data_pos]
# compute the overall accuracy and the f1 score. Additionally, compute the f1 score
# per POS tag.
def benchmark_crf(clf, X_train, y_train, X_test, y_test, classes=False):
tagger = pycrfsuite.Tagger()
tagger.open('crf_conllu_en.pycrfsuite')
y_pred = []
for datum in X_test:
y_pred.append(tagger.tag(datum))
y_true = y_test
lb = LabelBinarizer()
y_true_combined = lb.fit_transform(list(chain.from_iterable(y_true)))
y_pred_combined = lb.transform(list(chain.from_iterable(y_pred)))
    # keep the tags in the same (sorted) order as lb.classes_, so that the
    # per-class F1 scores line up with the class_indices dictionary
    tagset = sorted(lb.classes_)
    class_indices = {cls: idx for idx, cls in enumerate(lb.classes_)}
    labels = [class_indices[cls] for cls in tagset]
f1 = metrics.f1_score(y_true_combined, y_pred_combined, average='weighted')
f1_cl = metrics.f1_score(y_true_combined, y_pred_combined, labels=labels, average=None)
accuracy = metrics.accuracy_score(y_true_combined, y_pred_combined)
result = {'f1' : f1, 'accuracy' : accuracy, 'train size' : len(train_data_pos),
'test size' : len(test_data_pos), 'predictions': y_pred, "f1_class": f1_cl}
if not classes:
return result
else:
return result, class_indices
# make the necessary computations for the learning curves and save
# the results.
def calc_metrics_crf(X_train, y_train, X_test, y_test):
train_x_s_s, train_y_s_s = X_train, y_train
test_x_s_s, test_y_s_s = X_test, y_test
results = {}
results['train_size'] = []
results['on_test_acc'] = []
results['on_train_acc'] = []
results['on_test_f1'] = []
results['on_train_f1'] = []
results['on_test_f1_class'] = []
results['on_train_f1_class'] = []
print(len(train_x_s_s))
for i in range(1, 11):
if(i==10):
train_x_part = train_x_s_s
train_y_part = train_y_s_s
else:
to = int(i*(len(train_x_s_s)/10))
train_x_part = train_x_s_s[0: to]
train_y_part = train_y_s_s[0: to]
print(len(train_x_part))
trainer = pycrfsuite.Trainer(verbose=False)
for xseq, yseq in zip(train_x_part, train_y_part):
trainer.append(xseq, yseq)
trainer.set_params({
'c1': 1.0, # coefficient for L1 penalty
'c2': 1e-3, # coefficient for L2 penalty
'max_iterations': 50, # stop earlier
# include transitions that are possible, but not observed
'feature.possible_transitions': True
})
trainer.train('crf_conllu_en.pycrfsuite')
results['train_size'].append(len(train_x_part))
if i == 10:
out = benchmark_crf(trainer, train_x_part, train_y_part, X_test, y_test, True)
result = out[0]
classes = out[1]
else:
result = benchmark_crf(trainer, train_x_part, train_y_part, X_test, y_test)
results['on_test_acc'].append(result['accuracy'])
results['on_test_f1'].append(result['f1'])
results['on_test_f1_class'].append(result['f1_class'])
result = benchmark_crf(trainer, train_x_part, train_y_part, train_x_part, train_y_part)
results['on_train_acc'].append(result['accuracy'])
results['on_train_f1'].append(result['f1'])
results['on_train_f1_class'].append(result['f1_class'])
return results, classes
results_crf = calc_metrics_crf(X_train, y_train, X_test, y_test)
# plot the learning curves for accuracy and f1-score.
learning_curves(results_crf[0], "f1-score", "Conditional random fields (CRFs)")
learning_curves(results_crf[0], "accuracy", "Conditional random fields (CRFs)")
# plot the learning curves for f1-score per POS tag.
train_f1, test_f1 = compute_f1_tag(results_crf)
plot_f1_curve_tag(results_crf, train_f1, test_f1)
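# A minimal usage sketch: tag an unseen, hypothetical sentence with the last
# CRF model written to 'crf_conllu_en.pycrfsuite' above. The POS tags in the
# tuples are placeholders, since word2features ignores them at prediction time.
crf_tagger = pycrfsuite.Tagger()
crf_tagger.open('crf_conllu_en.pycrfsuite')
new_sentence = [("This", "X"), ("is", "X"), ("a", "X"), ("test", "X"), (".", "X")]
print(list(zip(sentence2tokens(new_sentence),
               crf_tagger.tag(sentence2features(new_sentence)))))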
# create a frequency dictionary having as keys (token, POS tag) tuples and
# their frequencies as values, and use it to build the baseline dictionary,
# which maps each token to the tag it most frequently received in training
def baseline_freq_dict(train_data_pos):
freq_dict = {}
for datum_pos in train_data_pos:
for datum_tuple in datum_pos:
if datum_tuple not in freq_dict:
freq_dict[datum_tuple] = 1
else:
freq_dict[datum_tuple] += 1
tokens = []
for i in freq_dict.keys():
tokens.append(i[0])
baseline_dict = {}
for token in tokens:
temp_dict = {}
for key in freq_dict.keys():
if key[0] == token:
temp_dict[key] = freq_dict[key]
maximum = max(temp_dict.items(), key=operator.itemgetter(1))[0]
baseline_dict[maximum[0]] =maximum[1]
freq_dict_pos = {}
for datum_pos in train_data_pos:
for datum_tuple in datum_pos:
if datum_tuple[1] not in freq_dict_pos:
freq_dict_pos[datum_tuple[1]] = 1
else:
freq_dict_pos[datum_tuple[1]] += 1
most_freq_tag = max(freq_dict_pos.items(), key=operator.itemgetter(1))[0]
baseline_dict["<UNK>"] = most_freq_tag
return baseline_dict
# baseline POS tagger, which always tags each word with the
# most frequent tag it had in the training set. In case of
# unseen words in the test set the most common POS tag of the
# training set is used.
def baseline(baseline_dict, data_pos):
y_pred = []
for datum_pos in data_pos:
pred = []
for token in (sentence2tokens(datum_pos)):
if token in baseline_dict.keys():
pred.append(baseline_dict[token])
else:
pred.append(baseline_dict["<UNK>"])
y_pred.append(pred)
return y_pred
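# A small sanity check of the baseline on a hypothetical toy training set
# (not the treebank data): "book" is tagged NN twice and VB once, so the
# baseline maps it to NN; the unseen token "movie" falls back to "<UNK>",
# i.e. the most frequent tag overall.
toy_train_pos = [
    [("the", "DT"), ("book", "NN")],
    [("a", "DT"), ("book", "NN")],
    [("book", "VB"), ("it", "PRP")],
]
toy_dict = baseline_freq_dict(toy_train_pos)
print(toy_dict)
# e.g. {'the': 'DT', 'book': 'NN', 'a': 'DT', 'it': 'PRP', '<UNK>': 'DT'}
print(baseline(toy_dict, [[("the", "DT"), ("movie", "NN")]]))
# e.g. [['DT', 'DT']]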
# compute the overall accuracy and the f1 score. Additionally, compute the f1 score
# per POS tag.
def benchmark_baseline(baseline_dict, train_data_pos, test_data_pos, tokens, train=True, classes=False):
y_true = []
if not train:
for true in test_data_pos:
y_true.append(tuple([x[1] for x in true]))
y_pred = baseline(baseline_dict, test_data_pos)
else:
for true in train_data_pos:
y_true.append(tuple([x[1] for x in true]))
y_pred = baseline(baseline_dict, train_data_pos)
lb = LabelBinarizer()
y_true_combined = lb.fit_transform(list(chain.from_iterable(y_true)))
y_pred_combined = lb.transform(list(chain.from_iterable(y_pred)))
    # keep the tags in the same (sorted) order as lb.classes_, so that the
    # per-class F1 scores line up with the class_indices dictionary
    tagset = sorted(lb.classes_)
    class_indices = {cls: idx for idx, cls in enumerate(lb.classes_)}
    labels = [class_indices[cls] for cls in tagset]
f1 = metrics.f1_score(y_true_combined, y_pred_combined, average='weighted')
f1_cl = metrics.f1_score(y_true_combined, y_pred_combined, labels=labels, average=None)
accuracy = metrics.accuracy_score(y_true_combined, y_pred_combined)
result = {'f1' : f1, 'accuracy' : accuracy, 'train size' : len(train_data_pos),
'test size' : len(test_data_pos), 'predictions': y_pred, "f1_class": f1_cl}
if not classes:
return result
else:
return result, class_indices
# make the necessary computations for the learning curves and save
# the results.
def calc_metrics_baseline(train_data_pos, test_data_pos, test_tokens, train_tokens):
    # dtype=object because the sentences have different lengths
    train_data_pos_s_s, train_tokens_s_s = np.asarray(train_data_pos, dtype=object), np.asarray(train_tokens, dtype=object)
    test_data_pos_s_s, test_tokens_s_s = np.asarray(test_data_pos, dtype=object), np.asarray(test_tokens, dtype=object)
results = {}
results['train_size'] = []
results['on_test_acc'] = []
results['on_train_acc'] = []
results['on_test_f1'] = []
results['on_train_f1'] = []
results['on_test_f1_class'] = []
results['on_train_f1_class'] = []
print(train_data_pos_s_s.shape[0])
for i in range(1, 11):
if(i==10):
train_x_part = train_data_pos_s_s
train_y_part = train_tokens_s_s
else:
to = int(i*(train_data_pos_s_s.shape[0]/10))
train_x_part = train_data_pos_s_s[0: to]
train_y_part = train_tokens_s_s[0: to]
baseline_dict = baseline_freq_dict(train_x_part)
print(train_x_part.shape)
results['train_size'].append(train_x_part.shape[0])
if i == 10:
out = benchmark_baseline(baseline_dict, train_x_part, test_data_pos_s_s, test_tokens_s_s, False, True)
result = out[0]
classes = out[1]
else:
result = benchmark_baseline(baseline_dict, train_x_part, test_data_pos_s_s, test_tokens_s_s, False)
results['on_test_acc'].append(result['accuracy'])
results['on_test_f1'].append(result['f1'])
results['on_test_f1_class'].append(result['f1_class'])
result = benchmark_baseline(baseline_dict, train_x_part, test_data_pos_s_s, train_y_part)
results['on_train_acc'].append(result['accuracy'])
results['on_train_f1'].append(result['f1'])
results['on_train_f1_class'].append(result['f1_class'])
return results, classes
results_baseline = calc_metrics_baseline(train_data_pos, test_data_pos, test_tokens, train_tokens)
# plot the learning curves for accuracy and f1-score.
# Obviously, comparing the baseline approach with HMM and CRFs we observe
# that HMM does not perform better. On the contrary, CRFs perform better
# perhaps because that approach takes into account also other features,
# as mentioned above.
learning_curves(results_baseline[0], "accuracy", "Baseline POS Tagger")
learning_curves(results_baseline[0], "f1-score", "Baseline POS Tagger")
# plot the learning curves for f1-score per POS tag.
train_f1, test_f1 = compute_f1_tag(results_baseline)
plot_f1_curve_tag(results_baseline, train_f1, test_f1)
Comparing the baseline approach with the HMM and the CRF, we observe that the HMM does not outperform the baseline. The CRF, on the other hand, performs considerably better because, as mentioned above, it takes neighbouring tokens into account and relies on features rather than on the tokens alone.
# hmm metrics
print("HMM accuracy:", results_hmm[0]['on_test_acc'][-1])
print("HMM F1-score:", results_hmm[0]['on_test_f1'][-1])
print()
# crf metrics
print("CRF accuracy:", results_crf[0]['on_test_acc'][-1])
print("CRF F1-score:", results_crf[0]['on_test_f1'][-1])
print()
# baseline metrics
print("Baseline accuracy:", results_baseline[0]['on_test_acc'][-1])
print("Baseline F1-score:", results_baseline[0]['on_test_f1'][-1])