Commit 2b6f4795 authored by Benjamin

First commit

import keras
import numpy
import pandas as pd
import custom_models.feature_selection_extraction.ML_DL_feature_extraction_selection
import custom_models.feature_selection_extraction.algorithmic_feature_extraction_selection
from project_utilities import my_datasets
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras import layers
from keras.backend import clear_session
import tensorflow as tf
from keras import metrics
from pandas import DataFrame
class PresetSoftmaxClassifier:
    vectorized_dataset: DataFrame
    classes: list

    def __init__(self, vectorized_dataset, classes: list):
        self.vectorized_dataset = vectorized_dataset
        self.classes = classes
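
# A minimal, self-contained sketch of the classifier this stub is heading towards:
# a single hidden layer over pre-vectorized ticket features with a 5-way softmax
# output. The feature matrix and labels below are random placeholders, not the
# project's real data (assumption: 5 priority classes, dense float feature vectors).
def _softmax_classifier_sketch():
    import numpy as np
    from keras.models import Sequential
    from keras import layers

    rng = np.random.default_rng(0)
    placeholder_features = rng.random((100, 50)).astype("float32")
    placeholder_labels = keras.utils.to_categorical(rng.integers(0, 5, size=100), num_classes=5)

    sketch_model = Sequential([
        layers.Dense(10, input_dim=placeholder_features.shape[1], activation='relu'),
        layers.Dense(5, activation='softmax'),
    ])
    # categorical_crossentropy pairs with one-hot labels and a softmax output layer
    sketch_model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    sketch_model.fit(placeholder_features, placeholder_labels, epochs=2, batch_size=5, verbose=0)
    return sketch_model
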
if __name__ == '__main__':
    dataset = my_datasets.ITSupportDatasetBuilder() \
        .with_summaries_and_descriptions_combined() \
        .with_overall_priority_column() \
        .with_pre_processed_descriptions() \
        .build().corpus

    doc2vec_IT = custom_models.feature_selection_extraction.ML_DL_feature_extraction_selection.ITSupportDoc2VecImplementation(
        dataset=dataset,
        model_type=custom_models.feature_selection_extraction.ML_DL_feature_extraction_selection.Doc2VecModels.DBOW)
    # doc2vec_IT.pre_process_texts()
    doc2vec_IT.tag_documents()
    doc2vec_IT.create_model()
    doc2vec_IT.build_vocabulary()
    doc2vec_IT.train_model(dataset_shuffles=1, epochs=10)  # dataset_shuffles=10, epochs=30)
    print("Got here 0.5")
    doc2vec_IT.generate_vectors()

    # One-hot encode the priority labels. Priority holds 'P1'..'P5' strings, so they are
    # mapped to integers first (assumption: P5 -> 0 ... P1 -> 4, matching num_to_pnum below).
    num_to_pnum = ['P5', 'P4', 'P3', 'P2', 'P1']
    Z = tf.keras.utils.to_categorical([num_to_pnum.index(p) for p in dataset.Priority], num_classes=5)
    print(Z)

    # The Keras experiment below trains on TF-IDF features. `tfidf` is built here with the
    # project's ITSupportTFIDFImplementation; this is an assumed reconstruction that mirrors
    # the commented-out vectorizer/train_test_split code kept underneath.
    tfidf = custom_models.feature_selection_extraction.algorithmic_feature_extraction_selection.ITSupportTFIDFImplementation(dataset)
    tfidf.vectorize_descriptions()
    tfidf.training_descriptions, tfidf.testing_descriptions, tfidf.training_labels, tfidf.testing_labels = \
        train_test_split(tfidf.vectorized_descriptions, Z, test_size=0.3, random_state=1000)
    '''descriptions_train, descriptions_test, tfidf.training_labels, tfidf.testing_labels = train_test_split(
        dataset.Descriptions, Z, test_size=0.3,
        random_state=1000)
    vectorizer.fit(descriptions_train)
    tfidf.training_descriptions = vectorizer.transform(descriptions_train)
    tfidf.testing_descriptions = vectorizer.transform(descriptions_test)'''
    # tfidf.training_labels = tf.keras.utils.to_categorical(tfidf.training_labels, num_classes=5)
    print(doc2vec_IT.train_labels)
    # vectorizer.fit(tfidf.training_labels)

    input_dim = tfidf.training_descriptions.shape[1]

    model = Sequential()
    model.add(layers.Dense(10, input_dim=input_dim, activation='relu'))
    model.add(layers.Dense(5, activation='softmax'))
    model.compile(loss='categorical_crossentropy',  # one-hot labels + softmax output
                  optimizer='adam',
                  metrics=[metrics.Recall()])
    # model.summary()

    history = model.fit(tfidf.training_descriptions, tfidf.training_labels,
                        epochs=100,
                        verbose=False,
                        validation_data=(tfidf.testing_descriptions, tfidf.testing_labels),
                        batch_size=5)

    loss, recall = model.evaluate(tfidf.testing_descriptions, tfidf.testing_labels, verbose=False)
    print("Testing Recall: {:.4f}".format(recall))
    # matrix = metrics.confusion_matrix(tfidf.testing_labels.argmax(axis=1), y_prediction.argmax(axis=1))

    y_prediction = model.predict(tfidf.testing_descriptions)
    y_prediction = numpy.argmax(y_prediction, axis=1)
    tfidf.testing_labels = numpy.argmax(tfidf.testing_labels, axis=1)
    print(keras.metrics.categorical_accuracy(tfidf.testing_labels, y_prediction))
    # tf.keras.metrics.confusion_matrix(tfidf.testing_labels.argmax(axis=1), y_prediction.argmax(axis=1))
    # cm = ITSupportPriorityConfusionMatrixEvaluator(predictions=y_prediction, actual_values=tfidf.testing_labels, labels=['P1', 'P2', 'P3', 'P4', 'P5'])
    # clear_session()
    # keras.metrics.confusion_matrix(tfidf.testing_labels, y_prediction)

    '''from sklearn.linear_model import LogisticRegression
    classifier = LogisticRegression()
    classifier.fit(tfidf.training_descriptions, tfidf.training_labels)
    score = classifier.score(tfidf.testing_descriptions, tfidf.testing_labels)'''

    from scikitplot.metrics import plot_confusion_matrix
    import matplotlib.pyplot as plt

    fig, ax = plt.subplots(figsize=(5, 5))
    tfidf.testing_labels_lab = [num_to_pnum[x] for x in tfidf.testing_labels]
    y_pred_lab = [num_to_pnum[x] for x in y_prediction]
    # print(tfidf.testing_labels_lab, type(tfidf.testing_labels))
    # plot_confusion_matrix(tfidf.testing_labels_lab, y_pred_lab, ax=ax, labels=['P1', 'P2', 'P3', 'P4', 'P5'])
    # plt.show()

    from project_utilities.evaluators import ITSupportPriorityConfusionMatrixEvaluator

    cm = ITSupportPriorityConfusionMatrixEvaluator(
        predictions=y_pred_lab,
        actual_values=tfidf.testing_labels_lab,
        labels=['P1', 'P2', 'P3', 'P4', 'P5'])
    cm.plot_confusion_matrix(fullscreen_requested=True)
from enum import Enum
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier
class ModelType(Enum):
    MULTINOMIAL_LOGISTIC_REGRESSION = 1
    MULTINOMIAL_NAIVE_BAYES = 2
    LINEAR_SUPPORT_VECTOR_CLASSIFICATION = 3
    RANDOM_FOREST = 4
class ITMachineLearningClassifierImplementation:
    cores_allocated: int

    def __init__(self, vectors, labels, cores_allocated: int = 1) -> None:
        self.model = None
        self.cores_allocated = cores_allocated
        self.vectors = vectors
        self.labels = labels

    def use_preconfigured_model(self, preconfigured_model):
        self.model = preconfigured_model

    def train_model(self):
        self.model.fit(self.vectors, self.labels)

    def make_predictions(self, items):
        return self.model.predict(items)
class ITMultinomialLogisticRegression(ITMachineLearningClassifierImplementation):
    def __init__(self, vectors, labels, inverse_regularisation_strength: float, cores_allocated: int = 1):
        super().__init__(vectors=vectors, labels=labels, cores_allocated=cores_allocated)
        self.model = LogisticRegression(n_jobs=self.cores_allocated,
                                        C=inverse_regularisation_strength,
                                        multi_class='multinomial',
                                        solver='newton-cg',
                                        verbose=1)
class ITMultinomialNaiveBayes(ITMachineLearningClassifierImplementation):
    def __init__(self, vectors, labels):
        super().__init__(vectors, labels)
        self.model = MultinomialNB()


class ITSupportVectorClassifier(ITMachineLearningClassifierImplementation):
    def __init__(self, vectors, labels):
        super().__init__(vectors, labels)
        self.model = LinearSVC()
class ITRandomForestClassifier(ITMachineLearningClassifierImplementation):
    def __init__(self, vectors, labels, tree_quantity: int = 200, max_tree_depth: int = 10, randomness: int = 1):
        super().__init__(vectors, labels)
        # The classifier must be stored on self.model so train_model()/make_predictions() can use it
        self.model = RandomForestClassifier(n_estimators=tree_quantity, max_depth=max_tree_depth, random_state=randomness)
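
# Hedged usage sketch for the wrappers above, using a small synthetic feature matrix
# in place of the project's TF-IDF / Doc2Vec vectors; numbers are illustrative only.
def _classifier_wrapper_sketch():
    import numpy as np
    rng = np.random.default_rng(1)
    example_vectors = rng.random((60, 20))
    example_labels = rng.choice(['P1', 'P2', 'P3', 'P4', 'P5'], size=60)

    classifier = ITMultinomialLogisticRegression(
        vectors=example_vectors, labels=example_labels, inverse_regularisation_strength=1e5)
    classifier.train_model()
    return classifier.make_predictions(example_vectors[:5])
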
if __name__ == "__main__":
    # logreg = ITMultinomialLogisticRegression(6, 1e5)
    pass
import multiprocessing
import time
from enum import Enum
import gensim.models
import gensim.models.doc2vec
from gensim.test.test_doc2vec import ConcatenatedDoc2Vec
from pandas import DataFrame
from sklearn import utils
from sklearn.model_selection import train_test_split
from tqdm import tqdm
# from project_utilities import preprocessing_functionality, my_datasets
from project_utilities import my_datasets
import preprocessing_functionality
class Doc2VecModels(Enum):
    DBOW = 1
    DM = 2
    COMBINED = 3
class ITSupportDoc2VecImplementation:
    dataset: DataFrame
    tagged_training_documents: DataFrame
    tagged_testing_documents: DataFrame
    model_type: Doc2VecModels
    model: gensim.models.Doc2Vec
    train_descriptions: tuple
    test_descriptions: tuple
    train_labels: tuple
    test_labels: tuple

    def __init__(self, dataset, model_type):
        self.dataset = dataset
        self.model_type = model_type
        self.alpha_change = None
        tqdm.pandas(desc="progress-bar")

    def split_texts(self):
        training_data, testing_data = train_test_split(self.dataset, test_size=0.1, random_state=1000)
        return training_data, testing_data

    def tag_documents(self):
        training_documents, testing_documents = self.split_texts()
        self.tagged_training_documents = training_documents.apply(
            lambda docs: gensim.models.doc2vec.TaggedDocument(
                words=preprocessing_functionality.tokenize_text(docs.Description),
                tags=[docs.Priority]),
            axis=1)
        self.tagged_testing_documents = testing_documents.apply(
            lambda docs: gensim.models.doc2vec.TaggedDocument(
                words=preprocessing_functionality.tokenize_text(docs.Description),
                tags=[docs.Priority]),
            axis=1)

    def create_model(self):
        cores = multiprocessing.cpu_count()
        match self.model_type:
            case Doc2VecModels.DBOW:
                self._create_dbow_model(cores)
            case Doc2VecModels.DM:
                self._create_dm_model(cores)
            case Doc2VecModels.COMBINED:
                self._create_combined_model(cores)
            case _:
                raise TypeError("Must be a Doc2Vec model type (DBOW, DM, COMBINED)")

    def _create_dbow_model(self, cores):
        self.model = gensim.models.Doc2Vec(
            dm=0, vector_size=1000, negative=5, hs=0, min_count=2, sample=0, workers=cores)
        self.alpha_change = 0.0002

    def _create_dm_model(self, cores):
        self.model = gensim.models.Doc2Vec(
            dm=1, dm_mean=1, vector_size=300, window=10, negative=5,
            min_count=1, workers=cores, alpha=0.065, min_alpha=0.065)
        self.alpha_change = -0.002

    def _create_combined_model(self, cores):
        dbow_model = gensim.models.Doc2Vec(
            dm=0, vector_size=300, negative=5, hs=0, min_count=2, sample=0, workers=cores)
        dm_model = gensim.models.Doc2Vec(
            dm=1, dm_mean=1, vector_size=300, window=10, negative=5,
            min_count=1, workers=cores, alpha=0.065, min_alpha=0.065)
        self.model = ConcatenatedDoc2Vec([dbow_model, dm_model])

    def build_vocabulary(self):
        vocabulary = [x for x in tqdm(self.tagged_training_documents.values)]
        self.model.build_vocab(vocabulary)

    def train_model(self, dataset_shuffles: int = 1, epochs: int = 1):
        for training_round in range(dataset_shuffles):
            shuffled_training_data = utils.shuffle([x for x in tqdm(self.tagged_training_documents.values)])
            datapoint_quantity = len(self.tagged_training_documents)
            self.model.train(shuffled_training_data, total_examples=datapoint_quantity,
                             epochs=epochs)
            self.model.alpha += self.alpha_change
            self.model.min_alpha = self.model.alpha

    # @numba.jit(forceobj=True)
    def vectorize_tagged_documents(self, tagged_documents):
        sentences = tagged_documents.values
        targets, regressors = zip(*[(doc.tags[0], self.model.infer_vector(doc.words)) for doc in sentences])
        return targets, regressors

    def generate_vectors(self):
        self.train_labels, self.train_descriptions = self.vectorize_tagged_documents(self.tagged_training_documents)
        self.test_labels, self.test_descriptions = self.vectorize_tagged_documents(self.tagged_testing_documents)
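
# Hedged sketch: once train_model() has run, an unseen ticket description can be
# embedded with gensim's infer_vector and fed to any downstream classifier.
# Assumes a trained ITSupportDoc2VecImplementation instance.
def embed_new_ticket(doc2vec_implementation: ITSupportDoc2VecImplementation, ticket_text: str):
    tokens = preprocessing_functionality.tokenize_text(ticket_text)
    return doc2vec_implementation.model.infer_vector(tokens)
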
if __name__ == '__main__':
    dataset = my_datasets.ITSupportDatasetBuilder()\
        .with_overall_priority_column()\
        .with_summaries_and_descriptions_combined()\
        .with_pre_processed_descriptions()\
        .build()

    doc2vec_IT = ITSupportDoc2VecImplementation(dataset=dataset.corpus, model_type=Doc2VecModels.DM)
    # doc2vec_IT.pre_process_texts()
    doc2vec_IT.tag_documents()
    doc2vec_IT.create_model()

    t1 = time.perf_counter()
    doc2vec_IT.build_vocabulary()
    doc2vec_IT.train_model(dataset_shuffles=1, epochs=1)
    print("time: " + str(time.perf_counter() - t1))

    doc2vec_IT.generate_vectors()
    print(doc2vec_IT.tagged_training_documents[50])
    # print(doc2vec_IT.X_test)
import numpy
from tqdm import tqdm
from project_utilities import my_datasets, preprocessing_functionality
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
from project_utilities import evaluators
import pandas
import numba
from custom_models.classifiers import ML_classifiers
@numba.jit(forceobj=True)
def preprocess_corpus(corpus: pandas.DataFrame, *columns):
    for column in columns:
        corpus[column] = corpus[column].apply(preprocessing_functionality.clean_text)
    return corpus
class ITSupportTFIDFImplementation:
    vectorizer: TfidfVectorizer
    dataset: pandas.DataFrame
    vectorized_descriptions: list
    training_descriptions: numpy.ndarray
    testing_descriptions: numpy.ndarray
    training_labels: numpy.ndarray
    testing_labels: numpy.ndarray

    def __init__(self, dataset: pandas.DataFrame):
        tqdm.pandas(desc="progress-bar")
        self.vectorizer = TfidfVectorizer(max_features=10000)
        self.dataset = dataset

    def vectorize_descriptions(self):
        self.vectorized_descriptions = self.vectorizer.fit_transform(self.dataset['Description'].values).toarray()

    def split_dataset(self, percentage_testing: float):
        self.training_descriptions, self.testing_descriptions, self.training_labels, self.testing_labels = \
            train_test_split(self.vectorized_descriptions, self.dataset['Priority'].values,
                             test_size=percentage_testing, random_state=1000)
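
# Hedged sketch: re-using the fitted vectorizer on unseen ticket text. transform()
# (rather than fit_transform()) keeps the training vocabulary and IDF weights, so the
# new vectors line up with the features the classifiers were trained on.
def vectorize_new_descriptions(tfidf_implementation: ITSupportTFIDFImplementation, new_descriptions: list):
    return tfidf_implementation.vectorizer.transform(new_descriptions).toarray()
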
def Main():
    # Get Dataset
    dataset = my_datasets.ITSupportDatasetBuilder() \
        .with_summaries_and_descriptions_combined() \
        .with_overall_priority_column() \
        .with_pre_processed_descriptions() \
        .build().corpus

    tfidf = ITSupportTFIDFImplementation(dataset)
    tfidf.vectorize_descriptions()
    tfidf.split_dataset(0.1)

    logreg = ML_classifiers.ITMultinomialLogisticRegression(vectors=tfidf.training_descriptions,
                                                            labels=tfidf.training_labels,
                                                            cores_allocated=-1,
                                                            inverse_regularisation_strength=1e5)
    print('Training Model')
    logreg.train_model()
    label_predictions = logreg.make_predictions(tfidf.testing_descriptions)
    print('Made Predictions')  # classification_report(tfidf.testing_labels, label_predictions))

    labels = ['P5', 'P4', 'P3', 'P2', 'P1']
    cm = evaluators.ITSupportPriorityConfusionMatrixEvaluator(label_predictions, tfidf.testing_labels, labels)
    cm.plot_confusion_matrix(fullscreen_requested=True)
if __name__ == '__main__':
    Main()
from pandas import read_csv, DataFrame, concat
import numpy as np
def getDataset():
    ticket_data = getRawDataset()
    impacts = ticket_data['Impact'].tolist()
    urgencies = ticket_data['Urgency'].tolist()
    texts = ticket_data['Description'].tolist()

    dict_corpus = {'Descriptions': [], 'Impacts': [], 'Urgencies': []}
    for index in range(len(impacts)):
        if not (impacts[index] is np.nan
                or urgencies[index] is np.nan
                or texts[index] is np.nan):
            dict_corpus['Descriptions'].append(texts[index])
            dict_corpus['Impacts'].append(impacts[index])
            dict_corpus['Urgencies'].append(urgencies[index])

    data_frame_corpus = DataFrame(dict_corpus)
    return data_frame_corpus
def getRawDataset():
    ticket_data_low_prio = read_csv('project_utilities/Datasets/ITSupport_Tickets.csv')
    ticket_data_high_prio = read_csv('custom_models/ITSupport_Tickets_High_Prio.csv')
    ticket_data_whole = concat([ticket_data_low_prio, ticket_data_high_prio])
    return ticket_data_whole
def convertToPriorities(dataset: DataFrame | dict) -> DataFrame:
    prio_to_num = {'Low': 0, 'Medium': 1, 'High': 2}
    num_to_pnum = ['P5', 'P4', 'P3', 'P2', 'P1']
    pnums = []
    for priorities in zip(dataset['Impacts'], dataset['Urgencies']):
        numbered_priority = sum([prio_to_num[priorities[0]], prio_to_num[priorities[1]]])
        pnums.append(num_to_pnum[numbered_priority])
    dataset['Priorities'] = pnums
    return dataset
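
# Worked example of the mapping above: Impact 'High' (2) + Urgency 'Medium' (1) sums
# to 3 and num_to_pnum[3] is 'P2'; 'Low' + 'Low' gives 0 -> 'P5', 'High' + 'High' gives 4 -> 'P1'.
# The two-row frame below is illustrative only.
def exampleConversion():
    example = DataFrame({'Impacts': ['High', 'Low'], 'Urgencies': ['Medium', 'Low']})
    return convertToPriorities(example)  # adds a Priorities column of ['P2', 'P5']
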
if __name__ == '__main__':
    hi = getDataset()
    print(convertToPriorities(hi))
from nlp_functionality import *
from nltk import word_tokenize
import yake
# RAKE
#rake_nltk_var = Rake()
# YAKE
kw_extractor = yake.KeywordExtractor()
language = "en"
max_ngram_size = 2
deduplication_threshold = 0.3
numOfKeywords = 5
def myFunc(e):
    return e[1]
def extractKeywordsWithYAKE(text):
    kw_extractor = yake.KeywordExtractor(lan=language, n=max_ngram_size, dedupLim=deduplication_threshold,
                                         top=numOfKeywords, features=None)
    keywords = kw_extractor.extract_keywords(text.lower())
    keywords.sort(key=lambda a: a[1])  # YAKE: lower score = more relevant
    tokenized = [item for sublist in [word_tokenize(y) for y in [x[0] for x in keywords]] for item in sublist]
    deduplicated_tokens = []
    for token in tokenized:
        if token not in deduplicated_tokens:
            deduplicated_tokens.append(token)
    # print(deduplicated_tokens)
    # [print(x) for x in keywords]
    return deduplicated_tokens
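
# Hedged example call. YAKE returns (phrase, score) pairs where a lower score means a
# more relevant keyword, which is why the list is sorted ascending before tokenizing.
# Output varies with the extractor settings; the sample ticket text is made up.
def exampleYAKEUsage():
    sample = "Cannot reset my password for the student portal, it keeps saying my account is locked"
    return extractKeywordsWithYAKE(sample)
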
#print("\n\n\n------------------------- --------------------------\n\n\n")
#ppd = preProcessText(text)
#ppd2 = kw_extractor.extract_keywords(" ".join(ppd))
#[print(x) for x in ppd2]
"""
def extractKeywordsWithRAKE(text):
ppd = preProcessText(text)
rake_nltk_var.extract_keywords_from_text(" ".join(ppd))
ranked_keywords = rake_nltk_var.get_ranked_phrases()
tokenized = [word_tokenize(y) for y in [x[0] for x in ranked_keywords]]
[print(x[0]) for x in ranked_keywords]
print(tokenized)
print("\n\n+++++++++++++++++++++++++++++++\n\n")
def TF(text):
new_dict = {}
pp_text = preProcessText(text)
for word in pp_text:
try:
new_dict[word] += 1
except KeyError:
new_dict [word] = 1
new_dict1 = dict(sorted(new_dict.items(), key=lambda item: item[1], reverse=1))
new_dict2 = {}
tempwords = []
tempnums = []
for key in new_dict:
if len(tempwords) != numOfKeywords:
tempwords.append(key)
tempnums.append(new_dict1[key])
else:
if new_dict[key] > min(tempnums):
tempindex = tempnums.index(min(tempnums))
del tempnums[tempindex]
del tempwords[tempindex]
tempwords.append(key)
tempnums.append(new_dict1[key])
#print(tempnums)
for index, x in enumerate(tempwords):
new_dict2[x] = tempnums[index]
print(new_dict2)
return new_dict2
def TF_YAKE(keywords, TF_dict):
crossover = []
for key in TF_dict:
for word in keywords:
if key in word[0]:
crossover.append(word[0])
print(crossover)
"""
if __name__ == "__main__":
    text = "To whom it may concern, When going to log into my university email today, it said that I needed to approve a request on my authenticator app. So I downloaded the app and logged in but then it asked for my phone number to send me a verification code so I put in my number and waited. But I realised I had no service so no code was sent. So I turned my phone on and off again to regain service and eventually the code came through. Then when I went back onto the app there was no where to put the code and i was on what I assume is the home page, Ive added screenshots below. So I assumed I was somehow logged in. But then I went to log into my uni email and it said it had sent a request, I got no request in the app and it said on my laptop that my request was denied, even though I saw no request. So I cant access my uni email. I then spoke to a member of the it team at Microsoft and they basically said there was nothing they could do and the university it team would be the best people to help. Ill put the screen shots of what he said below. So now Im just wondering if you could please help me cause I cannot access my uni emails now"
    # extractKeywordsWithRAKE(text)
    keywords = extractKeywordsWithYAKE(text)
    # new_dict = TF(text)  # TF() only exists in the disabled block above, so this call is kept commented out
    print("\n\n\n---------------------------------------------------\n\n\n")

    text = """Issues with the desktop computers - unable to download windows update - my
colleague and I have had messages pop up on our computers
to say the windows update wasn't downloaded and I've attached a screenshot
of the message we get when we select more info.
Additionally, we have little storage on our devices that affect our daily
use of onedrive and teams.
Thank you very much for all your help with this"""
    extractKeywordsWithYAKE(text)
    # TF(text)  # see note above
    print("\n\n\n---------------------------------------------------\n\n\n")

    text = """"I'm having trouble logging in to my E vision as it's saying my password is
incorrect so I'm therefore having trouble re-enrolling.
If there's anything you guys can do to help, I would greatly appreciate it.
All the best, """
    extractKeywordsWithYAKE(text)
    print("\n\n\n---------------------------------------------------\n\n\n")

    extractKeywordsWithYAKE("Got issues with microsoft office, cannot save word document, need this doing for tomorrow")
    # TF(text)  # see note above
from project_utilities import my_datasets, evaluators
from custom_models.feature_selection_extraction import ML_DL_feature_extraction_selection, algorithmic_feature_extraction_selection
if __name__ == '__main__':
    algorithmic_feature_extraction_selection.Main()
# See PyCharm help at https://www.jetbrains.com/help/pycharm/
import numpy as np
from nltk import word_tokenize, sent_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer, PorterStemmer
import re
import string
stopwords = stopwords.words('english')
lemmatizer = WordNetLemmatizer()
stemmer = PorterStemmer()
Vector = list[float]
def preProcessText(raw_text: str) -> list[str]:
    lowercase_text = raw_text.lower()
    lowercase_text = lowercase_text.strip("\n")  # strip() returns a new string, so the result must be kept
    punctuation_removed_text = removePunctuation(lowercase_text)
    tokenized_text = word_tokenize(punctuation_removed_text)
    stopword_removed_text = removeStopWords(tokenized_text)
    lemmatized_text = lemmatizeTokenizedText(stopword_removed_text)
    return lemmatized_text
def removePunctuation(text: str) -> str:
    non_apostrophe_punctuation = "!\"#$%&()*+-./:;<=>?@[\\]^_`{|}~\n,"
    punctuation_removed_text = "".join([char for char in text if char not in non_apostrophe_punctuation])
    return punctuation_removed_text
def removeStopWords(text: list[str]) -> list[str]:
    stopword_removed_text = [word for word in text if word not in stopwords]
    return stopword_removed_text


def lemmatizeTokenizedText(text: list[str]) -> list[str]:
    # lemmatized_text = map(text, lemmatizer.lemmatize)
    lemmatized_text = [lemmatizer.lemmatize(word) for word in text]
    return lemmatized_text
def stemText(text):
    stemmed_text = [stemmer.stem(x) for x in text]
    return stemmed_text
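
# Hedged example of the pipeline above: lowercase -> strip punctuation -> tokenize ->
# remove stopwords -> lemmatize. The exact tokens returned depend on the installed
# NLTK stopword list and WordNet data; the sample sentence is made up.
def examplePreProcess():
    return preProcessText("The printers aren't connecting to the office network!")
    # roughly ['printer', 'connecting', 'office', 'network'], give or take stopword handling
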
def readTickets(ticketlist):
    return


def cleanTicket(ticket):
    # remove links
    return
def preprocess_text(text):
    new_text = text.lower()
    # new_text = re.sub('[^a-zA-Z]', ' ', new_text)
    new_text = re.sub(r'\s+', ' ', new_text)
    all_sentences = sent_tokenize(new_text)
    all_sentences = [re.sub('[^a-zA-Z-]', ' ', x) for x in all_sentences]
    all_words = [word_tokenize(sent) for sent in all_sentences]
    all_words = [[y for y in x if len(y) > 2] for x in all_words]
    # bigrams = []
    """for sentence in all_words:
        for i, x in enumerate(sentence):
            try:
                bigram = x + ' ' + sentence[i + 1]
                bigrams.append(bigram)
            except IndexError:
                pass
    print(bigrams)"""
    all_words = [removeStopWords(x) for x in all_words]
    all_words = [lemmatizeTokenizedText(x) for x in all_words]
    all_words = [' '.join(x) for x in all_words]
    # all_words = [item for sublist in all_words for item in sublist]
    return all_words
def preprocess_text_bigrams(text):
    new_text = text.lower()
    # new_text = re.sub('[^a-zA-Z]', ' ', new_text)
    new_text = re.sub(r'\s+', ' ', new_text)
    all_sentences = sent_tokenize(new_text)
    all_sentences = [re.sub('[^a-zA-Z-]', ' ', x) for x in all_sentences]
    all_words = [word_tokenize(sent) for sent in all_sentences]
    all_words = [[y for y in x if len(y) > 2] for x in all_words]

    bigrams = []
    for sentence in all_words:
        sentence_bigrams = []
        for i, x in enumerate(sentence):
            try:
                bigram = x + ' ' + sentence[i + 1]  # join the two tokens with a space to form the bigram
                sentence_bigrams.append(bigram)
            except IndexError:
                pass
        [bigrams.append(x) for x in sentence_bigrams]
        # bigrams.append(sentence_bigrams)
    # print(bigrams)
    """bigrams = [[y.split(' ') for y in x] for x in bigrams]
    bigrams = [[lemmatizeTokenizedText(word) for word in bigram] for bigram in bigrams]
    print(bigrams)"""
    # all_words = [[removeStopWords(y)] for x in all_words]
    # all_words = [lemmatizeTokenizedText(x) for x in all_words]
    return bigrams
if __name__ == "__main__":
    preprocess_text_bigrams("""I'm having trouble logging in to my E vision as it's saying my password is
incorrect so I'm therefore having trouble re-enrolling.
If there's anything you guys can do to help, I would greatly appreciate it.
All the best, """)
from sklearn.metrics import confusion_matrix
from seaborn import heatmap
from matplotlib.pyplot import show, subplots, get_current_fig_manager
from pandas import DataFrame
from numpy import sum as numpy_sum, ndarray, empty_like
class ITSupportPriorityConfusionMatrixEvaluator:
    """Class for storing and showing a confusion matrix.
    Adapted from https://www.kaggle.com/code/agungor2/various-confusion-matrix-plots/notebook"""

    dataset_confusion_matrix: ndarray
    dataset_confusion_matrix_sums: ndarray
    dataset_confusion_matrix_percentages: ndarray
    dataset_annotations: ndarray
    predictions: tuple | ndarray
    actual_values: tuple | ndarray
    labels: list

    def __init__(self, predictions: tuple | ndarray, actual_values: tuple | ndarray, labels: list):
        self.predictions = predictions
        self.actual_values = actual_values
        self.labels = labels
        print(self.labels)
        self.dataset_confusion_matrix = confusion_matrix(self.actual_values, self.predictions, labels=self.labels)
        self.dataset_annotations = empty_like(self.dataset_confusion_matrix).astype(str)
        self.confusion_matrix_sums = numpy_sum(self.dataset_confusion_matrix, axis=1, keepdims=True)
        self.confusion_matrix_percentages = self.dataset_confusion_matrix / self.confusion_matrix_sums.astype(float) * 100

    def plot_confusion_matrix(self, fullscreen_requested: bool = False):
        self.__update_dataset_annotations()
        dataset_confusion_matrix_data_frame = DataFrame(self.dataset_confusion_matrix,
                                                        index=self.labels,
                                                        columns=self.labels)
        dataset_confusion_matrix_data_frame.index.name = 'Actual'
        dataset_confusion_matrix_data_frame.columns.name = 'Predicted'
        label_quantity = len(self.labels)
        fig, ax = subplots(figsize=(label_quantity, label_quantity))

        # Adapted from https://stackoverflow.com/questions/42111075/seaborn-heatmap-color-scheme-based-on-row-values
        normalised_confusion_matrix = dataset_confusion_matrix_data_frame.div(
            dataset_confusion_matrix_data_frame.max(axis=1), axis=0)
        heatmap(normalised_confusion_matrix, cmap="YlGnBu", annot=self.dataset_annotations, fmt='', ax=ax)

        # Adapted from https://stackoverflow.com/questions/12439588/how-to-maximize-a-plt-show-window-using-python
        # (dinvlad)
        if fullscreen_requested:
            fig_manager = get_current_fig_manager()
            fig_manager.window.state('zoomed')
        show()

    def __update_dataset_annotations(self):
        n_rows, n_columns = self.dataset_confusion_matrix.shape
        [self.alter_annotation(row, column) for row in range(n_rows) for column in range(n_columns)]

    def alter_annotation(self, row: int, column: int):
        cell_predicted_count = self.dataset_confusion_matrix[row, column]
        cell_percentage_of_category = self.confusion_matrix_percentages[row, column]
        category_count = self.confusion_matrix_sums[row, 0]
        if row == column or cell_predicted_count != 0:
            self.dataset_annotations[row, column] = '%.1f%%\n%d/%d' % (
                cell_percentage_of_category, cell_predicted_count, category_count)
        else:
            self.dataset_annotations[row, column] = '%d%%\n%d/%d' % (0, 0, category_count)
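
# Hedged usage sketch with tiny hand-made prediction lists; real callers pass model
# predictions and held-out labels. Pops a matplotlib window when run directly.
if __name__ == '__main__':
    _example_evaluator = ITSupportPriorityConfusionMatrixEvaluator(
        predictions=['P1', 'P2', 'P2', 'P4', 'P5'],
        actual_values=['P1', 'P2', 'P3', 'P4', 'P5'],
        labels=['P1', 'P2', 'P3', 'P4', 'P5'])
    _example_evaluator.plot_confusion_matrix()
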
from pandas import read_csv, read_pickle, DataFrame, concat
from dataclasses import dataclass
import preprocessing_functionality
'''@dataclass
class ITSupportDataset:
    """Class for storing the IT Support Ticket Descriptions, Impacts, Urgencies, and Overall Priority"""
    corpus = DataFrame
    raw_dataset = DataFrame

    def __init__(self, combined_title_description_requested: bool = False):
        self.__get_raw_dataset()
        self.__get_dataset(combined_title_description_requested)
        self.__add_overall_priority_column()

    def __get_raw_dataset(self):
        self.raw_dataset = read_csv('C:\\Users\\Benjamin\\PycharmProjects\\DISSERTATION_ARTEFACT\\project_utilities'
                                    '\\Datasets\\ITSupport_Tickets.csv')
        # ticket_data_high_prio = read_csv('C:\\Users\\Benjamin\\PycharmProjects\\DISSERTATION_ARTEFACT\\project_utilities'
        #                                  '\\Datasets\\ITSupport_Tickets_High_Prio.csv')
        # self.raw_dataset = ticket_data_low_prio

    def __get_dataset(self, combined_title_description_requested: bool):
        impacts = self.raw_dataset['Impact'].tolist()
        urgencies = self.raw_dataset['Urgency'].tolist()
        texts = self.raw_dataset['Description'].tolist()
        if combined_title_description_requested:
            summaries = self.raw_dataset['Incident_Summary'].tolist()
            non_nulled_dataset = self.__remove_nulls_with_summaries(impacts, urgencies, texts, summaries)
        else:
            non_nulled_dataset = self.__remove_nulls(impacts, urgencies, texts)
        self.corpus = DataFrame(non_nulled_dataset)

    def __remove_nulls(self, impacts, urgencies, descriptions):
        dict_corpus = {'Descriptions': [], 'Impacts': [], 'Urgencies': []}
        for index in range(len(impacts)):
            if not (impacts[index] is np.nan
                    or urgencies[index] is np.nan
                    or descriptions[index] is np.nan):
                dict_corpus['Descriptions'].append(descriptions[index])
                dict_corpus['Impacts'].append(impacts[index])
                dict_corpus['Urgencies'].append(urgencies[index])
        return dict_corpus

    def __remove_nulls_with_summaries(self, impacts, urgencies, descriptions, summaries):
        dict_corpus = {'Descriptions': [], 'Impacts': [], 'Urgencies': []}
        for index in range(len(impacts)):
            if not (impacts[index] is np.nan
                    or urgencies[index] is np.nan
                    or descriptions[index] is np.nan):
                dict_corpus['Descriptions'].append(str(summaries[index]) + ' ' + str(descriptions[index]))
                dict_corpus['Impacts'].append(impacts[index])
                dict_corpus['Urgencies'].append(urgencies[index])
        return dict_corpus

    def __add_overall_priority_column(self):
        prio_to_num = {'Low': 0, 'Medium': 1, 'High': 2}
        num_to_pnum = ['P5', 'P4', 'P3', 'P2', 'P1']
        pnums = []
        for priorities in zip(self.corpus['Impacts'], self.corpus['Urgencies']):
            numbered_priority = sum([prio_to_num[priorities[0]], prio_to_num[priorities[1]]])
            pnums.append(num_to_pnum[numbered_priority])
        self.corpus['Priorities'] = pnums'''
@dataclass
class ITSupportDatasetWithBuilder:
    """Class for storing the IT Support Ticket Descriptions, Impacts, Urgencies, and Overall Priority.
    Contains an associated Builder class for flexible object creation."""
    corpus = DataFrame

    def __init__(self):
        self.__get_raw_dataset()
        self.__remove_nulls()

    def __get_raw_dataset(self):
        ticket_data_low_prio = read_csv('/\\project_utilities'
                                        '\\Datasets\\ITSupport_Tickets.csv')
        ticket_data_high_prio = read_csv('/\\project_utilities'
                                         '\\Datasets\\ITSupport_Tickets_High_Prio.csv')
        self.corpus = concat([ticket_data_low_prio, ticket_data_high_prio])

    def combine_summaries_with_descriptions(self):
        combined_columns = []
        for description, summary in zip(self.corpus['Description'].values, self.corpus['Incident_Summary'].values):
            combined_columns.append(str(summary) + ' ' + str(description))
        self.corpus['Description'] = combined_columns

    def __remove_nulls(self):
        self.corpus.replace('[None]', None, inplace=True)
        self.corpus.dropna(axis=0, subset=['Description', 'Impact', 'Urgency'], inplace=True, how='any')
        self.corpus.fillna('', axis=1, inplace=True)
    def add_overall_priority_column(self):
        prio_to_num = {'Low': 0, 'Medium': 1, 'High': 2}
        num_to_pnum = ['P5', 'P4', 'P3', 'P2', 'P1']
        pnums = []
        for impact, urgency, date in zip(
                self.corpus['Impact'].values, self.corpus['Urgency'].values, self.corpus['Added Date']):
            try:
                numbered_priority = sum([prio_to_num[impact], prio_to_num[urgency]])
                pnums.append(num_to_pnum[numbered_priority])
            except KeyError:
                # Rows with an unrecognised Impact/Urgency value are only logged here; skipping them
                # without appending leaves pnums shorter than the corpus, so the assignment below can fail.
                print(date)
        self.corpus['Priority'] = pnums

    def pre_process_texts(self):
        self.corpus['Description'] = self.corpus['Description'].apply(preprocessing_functionality.clean_text)
        self.corpus['Description'] = self.corpus['Description'].str.split()
        self.corpus['Description'] = self.corpus['Description'].apply(preprocessing_functionality.stem_text)
class ITSupportDatasetBuilder(object):
    def __init__(self):
        self._dataset = ITSupportDatasetWithBuilder()

    def with_summaries_and_descriptions_combined(self):
        self._dataset.combine_summaries_with_descriptions()
        return self

    def with_overall_priority_column(self):
        self._dataset.add_overall_priority_column()
        return self

    def with_pre_processed_descriptions(self):
        self._dataset.pre_process_texts()
        return self

    def build(self):
        return self._dataset
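
# Hedged usage sketch of the builder: the steps are chainable and optional, though
# combining summaries with descriptions should come before pre-processing so the
# summary text is cleaned and stemmed as well.
def _build_full_corpus_sketch():
    return (ITSupportDatasetBuilder()
            .with_summaries_and_descriptions_combined()
            .with_overall_priority_column()
            .with_pre_processed_descriptions()
            .build()
            .corpus)
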
'''
#Previous method, more efficient, way more lines though
impacts = self.raw_dataset['Impact'].tolist()
urgencies = self.raw_dataset['Urgency'].tolist()
descriptions = self.raw_dataset['Description'].tolist()
summaries = self.raw_dataset['Incident_Summary'].tolist()'''
'''dict_corpus = {'Descriptions': [], 'Impacts': [], 'Urgencies': [], 'Summaries': []}
start1, start2, end1, end2 = 0, 0, 0, 0
start1 = time.perf_counter_ns()
for description, impact, urgency, summary in zip(descriptions, impacts, urgencies, summaries):
    if not (impact is np.nan
            or urgency is np.nan
            or description is np.nan):
        dict_corpus['Descriptions'].append(description)
        dict_corpus['Impacts'].append(impact)
        dict_corpus['Urgencies'].append(urgency)
        dict_corpus['Summaries'].append(str(summary))
end1 = time.perf_counter_ns()'''
# start2 = time.perf_counter_ns()
# self.corpus = self.raw_dataset
# end2 = time.perf_counter_ns()
# timing1, timing2 = end1 - start1, end2 - start2
# print(f"Iterative: {timing1}, Pandas: {timing2}, difference = {abs(timing1-timing2)}")
# return dict_corpus
if __name__ == '__main__':
    # obj = ITSupportDataset(combined_title_description_requested=False)
    '''times = []
    while True:
        for x in range(100):
            h1 = time.perf_counter_ns()
            dataset = ITSupportDatasetBuilder().with_summaries_and_descriptions_combined().with_overall_priority_column().build()
            times.append(time.perf_counter_ns() - h1)
        # dataset = ITSupportDatasetBuilder().with_overall_priority_column().build()
        print(numpy.mean(times))'''
    # dataset = ITSupportDatasetBuilder().with_summaries_and_descriptions_combined().with_overall_priority_column().build()
    '''ticket_data_low_prio = read_csv('C:\\Users\\Benjamin\\PycharmProjects\\DISSERTATION_ARTEFACT\\project_utilities'
                                    '\\Datasets\\ITSupport_Tickets.csv')
    ticket_data_high_prio = read_csv('C:\\Users\\Benjamin\\PycharmProjects\\DISSERTATION_ARTEFACT\\project_utilities'
                                     '\\Datasets\\ITSupport_Tickets_High_Prio.csv')
    corpus = concat([ticket_data_low_prio, ticket_data_high_prio])
    corpus.to_pickle('corpus.pickle')'''

    dataset = ITSupportDatasetBuilder().with_summaries_and_descriptions_combined().with_overall_priority_column().build()
    print(dataset.corpus.shape)
import nltk
import pandas
from nltk.stem import WordNetLemmatizer, PorterStemmer
from bs4 import BeautifulSoup
import re
lemmatizer = nltk.stem.WordNetLemmatizer()
stemmer = nltk.PorterStemmer()
def clean_text(text):
    text = BeautifulSoup(text, "lxml").text
    text = re.sub(r'\|\|\|', r' ', text)
    text = re.sub(r'http\S+', r'<URL>', text)
    text = text.lower()
    # NOTE: this removes every literal 'x' character from the text (a masking step carried
    # over from a tutorial dataset); it also mangles ordinary words containing 'x'.
    text = text.replace('x', '')
    return text
def tokenize_text(text):
    untokenized_sentences = nltk.sent_tokenize(text)
    tokenized_sentences = [tokenize_sentence(sentence) for sentence in untokenized_sentences]
    tokens = [token for tokenized_sentence in tokenized_sentences for token in tokenized_sentence]
    return tokens


def tokenize_sentence(sentence):
    tokenized_sentence = nltk.word_tokenize(sentence)
    tokenized_sentence = [word for word in tokenized_sentence if len(word) > 2]
    return tokenized_sentence
def lemmatize_text(text: list):
    lemmatized = [lemmatizer.lemmatize(word) for word in text]
    return ' '.join(lemmatized)


def stem_text(text: list):
    stemmed = [stemmer.stem(word) for word in text]
    return ' '.join(stemmed)
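
# Hedged end-to-end example of this module's helpers: clean the raw ticket text,
# tokenize it, then stem. The sample text is made up; output depends on NLTK data.
def _example_clean_and_stem():
    cleaned = clean_text("Outlook keeps crashing, see http://intranet/ticket for screenshots")
    tokens = tokenize_text(cleaned)
    return stem_text(tokens)
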
if __name__ == '__main__':
    print(stemmer.stem('connecting'))