זיהוי SMS ספאמי בעזרת בינה מלאכותית

מחבר:
בתאריך:

קיימות שתי גישות לעיבוד טקסט. הראשונה מתייחסת לטקסט כאל סדרה sequence שחיוני לשמור בה על סדר המילים. השנייה כאל "שק מילים" bag of words שאינה שומרת על הסדר. המודלים של עיבוד שפה טבעית NLP נחלקים בהתאם למודלים מבוססי סדר (RNN וטרנספורמרים) ולעומתם מודלים bag of words. במדריך זה נסביר איך להשתמש במודל bag of words לצורך אבחנה בין הודעות SMS לגיטימיות וספאמיות (ham or spam) באמצעות ספריית למידת מכונהKeras. יכולת חשובה במדינה שבה מציפים אותנו השכם והערב בהודעות טקסט שמטרתם לקדם: הלוואות, קנאביס או פוליטיקאים מזדמנים.

ham or spam SMS dataset from Kaggle

להורדת הקוד שנפתח במדריך

 

נייבא את הספריות הדרושות ללמידת מכונה:

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

בשביל לעבוד עם מערכת הקבצים:

import os

את למידת המכונה נעשה באמצעות TensorFlow וממשק Keras:

import tensorflow as tf
from tensorflow import keras

 

מסד הנתונים

את מסד הנתונים הורדתי כקובץ CSV מאתר התחרויות Kaggle https://www.kaggle.com/datasets/uciml/sms-spam-collection-dataset

# import dataset
df = pd.read_csv('spam.csv', encoding = "ISO-8859-1", usecols=["v1", "v2"])
df.columns = ["Category", "Message"]
df

ham or spam SMS dataset from Kaggle

df.shape
(5572, 2)
df.isnull().sum().sum()
0
df.Category.unique()
array(['ham', 'spam'], dtype=object)
df.groupby(['Category']).count()

Category

Message

ham

4825

spam

747

categories = df.Category.unique() # ['ham','spam']
NUM_CLASSES = len(categories)

מסד הנתונים כולל 5,547 הודעות SMS באנגלית המסווגות ל-2 קטגוריות: ham (לגיטימיות) או spam. הסט אינו מאוזן עם 13.5% בלבד מההודעות המסווגות ספאם.

 

הכנת הנתונים ללמידת מכונה

כדי לעבוד עם ה-pipeline של Keras נסדר את הודעות הטקסט בתיקיות ע"פ המבנה:

data/
  exp_0/
    test/
      ham/
      spam/
    train/
      ham/
      spam/
    val/
      ham/
      spam/

בתוך התיקייה data תהיה תיקיית הניסוי exp_0, ובתוכה 3 תיקיות ע"פ החלוקה המקובלת לקבוצת אימון מבחן ובקרה (test, train, val). בתוך כל אחת משלוש התיקיות יהיו תיקיות הקטגוריה: ham או spam. כל אחת מתיקיות הקטגוריה יחזיקו קבצי טקסט (סיומת txt) שבכל אחד מהקבצים תהיה כתובה הודעת SMS אחת.

נסדר את הקבצים בתיקיות:

BASE_DIR = './'
DATA_DIR = os.path.join(BASE_DIR, 'data/')
# make data directory
!mkdir -p ./data/
!mkdir -p ./data/ham_or_spam
SRC_DIR = os.path.join(DATA_DIR, 'ham_or_spam')
!mkdir -p ./data/ham_or_spam/ham
!mkdir -p ./data/ham_or_spam/spam
len_df = len(df)
for idx, row in df.iterrows():
   if idx > len_df:
       break
   else:
       new_path = os.path.join(DATA_DIR, 'ham_or_spam', row[0], str(idx)+'.txt')
       f = open(new_path, 'w')
       f.write(row[1])
       f.close()
       idx+=1
dir_list = os.listdir(SRC_DIR)
for name in sorted(dir_list):
   path = os.path.join(name)
   print(path)
ham
spam
# make data directory for the experiment
EXP_DIR = "exp_0"
!mkdir -p ./data/exp_0/
# make train, val, test directories
!mkdir -p ./data/exp_0/train ./data/exp_0/val ./data/exp_0/test
 
TRAIN_DIR = os.path.join(DATA_DIR, EXP_DIR, "train")
VAL_DIR = os.path.join(DATA_DIR, EXP_DIR, "val")
TEST_DIR = os.path.join(DATA_DIR, EXP_DIR, "test")
import os, pathlib, shutil, random
for category in categories:
   if not os.path.exists(os.path.join(TRAIN_DIR, category)):
       os.makedirs(os.path.join(TRAIN_DIR, category))
   if not os.path.exists(os.path.join(VAL_DIR, category)):
       os.makedirs(os.path.join(VAL_DIR, category))
  
   files = os.listdir(os.path.join(SRC_DIR, category))
   random.Random(42).shuffle(files)
   num_val_samples = int(0.3 * len(files))
   val_files = files[-num_val_samples:]
   for fname in val_files:
       new_fname = fname
       shutil.copy2(os.path.join(SRC_DIR, category, fname),
                   os.path.join(VAL_DIR, category, new_fname))
   train_files = files[:-num_val_samples]
   for fname in train_files:
       new_fname = fname
       shutil.copy2(os.path.join(SRC_DIR, category, fname),
                   os.path.join(TRAIN_DIR, category, new_fname))
for category in categories:
   if not os.path.exists(os.path.join(TEST_DIR, category)):
       os.makedirs(os.path.join(TEST_DIR, category))
 
   files = os.listdir(os.path.join(VAL_DIR, category))
   random.Random(42).shuffle(files)
   num_val_samples = int(0.33 * len(files))
   val_files = files[:num_val_samples]
   for fname in val_files:
       new_fname = fname
       shutil.move(os.path.join(VAL_DIR, category, fname),
                   os.path.join(TEST_DIR, category, new_fname))

נוודא את מה שעשינו:

for category in categories:
   file_count = len(os.listdir(os.path.join(TRAIN_DIR, category)))
   print(f"TRAIN {category} has {file_count} files")
 
   file_count = len(os.listdir(os.path.join(VAL_DIR, category)))
   print(f"VAL {category} has {file_count} files")
 
   file_count = len(os.listdir(os.path.join(TEST_DIR, category)))
   print(f"TEST {category} has {file_count} files")
TRAIN ham has 3378 files
VAL ham has 970 files
TEST ham has 477 files
TRAIN spam has 523 files
VAL spam has 151 files
TEST spam has 73 files
  • סה"כ 6 תיקיות.

נייבא את התיקיות ל- Keras במבנה של אצוות batches תוך התחשבות בקטגוריות:

batch_size = 32
 
train_ds = keras.utils.text_dataset_from_directory(
   os.path.join(DATA_DIR, EXP_DIR, "train"),
   label_mode="categorical",
   batch_size=batch_size
)
 
val_ds = keras.utils.text_dataset_from_directory(
   os.path.join(DATA_DIR, EXP_DIR, "val"),
   label_mode="categorical",
   batch_size=batch_size
)
 
test_ds = keras.utils.text_dataset_from_directory(
   os.path.join(DATA_DIR, EXP_DIR, "test"),
   label_mode="categorical",
   shuffle=False,
   batch_size=batch_size
)
Found 3901 files belonging to 2 classes.
Found 1121 files belonging to 2 classes.
Found 550 files belonging to 2 classes.

התוצאה הינה generator. נציץ בו:

for inputs, targets in train_ds:
   print("inputs.shape:", inputs.shape)
   print("inputs.dtype:", inputs.dtype)
   print("targets.shape:", targets.shape)
   print("targets.dtype:", targets.dtype)
   print("inputs[0]:", inputs[0])
   print("targets[0]:", targets[0])
   break
inputs.shape: (32,)
inputs.dtype: dtype: 'string'
targets.shape: (32, 2)
targets.dtype: dtype: 'float32'
inputs[0]: tf.Tensor(b'Hi Petey!noixc3xa5xc3x95m ok just wanted 2 chat coz avent spoken 2 u 4 a long time-hope ur doin alrite.have good nit at js love ya am.x', shape=(), dtype=string)
targets[0]: tf.Tensor([1. 0.], shape=(2,), dtype=float32)

כיוון שמחשבים לא יודעים לעבוד עם טקסט צריך להפוך את התכנים לוקטורים המייצגים כל אחד טוקן אחר (המונח token מקביל פחות או יותר למילה). תהליך ההפיכה לוקטורים מכונה וקטוריזציה. נעזר בשכבת TextVectorization כדי לעשות וקטוריזציה לטקסט:

MAX_NUM_WORDS = 20000
 
text_vectorization = layers.TextVectorization(
   ngrams=2,
   max_tokens=MAX_NUM_WORDS,
   output_mode="tf_idf"
)

תהליך וקטוריזציה כולל: 

  • סטנדרטיזציה של התכנים ההופכת את כל האותיות לקטנות, מסירה סימני פיסוק, ותווים לא תקניים.
  • הפיכה לטוקנים על ידי חיתוך הרצף ברווחים שבין המילים.
  • אינדוקס המקנה לכל טוקן מספר אינדקס ייחודי (עד למספר מקסימום של 20,000 טוקנים)

נוסף לכך, העברנו 2 פרמטרים ל-TextVectorization():

  • ngrams המורה לחלק לטוקנים שאורכם, במקרה שלנו, 2 (ולא 1) כדי לשמור על מידה של הקשר. לדוגמה, אם מחלקים לטוקנים באורך 1 אז הביטוי "united kingdom" הופך לשני טוקנים: "united" ו-"kingdom" שאינם קשורים ביניהם. אבל כאשר משתמשים ב ngrams=2 אז נוצר טוקן "united kingdom" ששומר על הביטוי.
  • tf_idf - היא גישה מנרמלת אשר מקנה יתר חשיבות לטוקנים המופיעים פעמים רבות בקובץ מסוים, ומעט לטוקנים כלליים המופיעים בתדירות גבוהה בכל המסמכים (דוגמת the או a).

נאמן את Keras על סט נתוני האימון בלבד באמצעות המתודה adapt:

# prepare a text only dataset
text_only_train_ds = train_ds.map(lambda x, y: x)
# use the adapt method to let the adapt method
# learn from the train dataset
text_vectorization.adapt(text_only_train_ds)

נבחן את הוקטורים המייצגים עכשיו את המידע בטקסט.

# what's in the vocabulary?
print(len(text_vectorization.get_vocabulary()))
print(text_vectorization.get_vocabulary()[:10])
20000
['[UNK]', 'to', 'i', 'you', 'a', 'the', 'u', 'and', 'is', 'in']
  • אורך ה-vocabulary הוא 20,000
  • הטוקנים מסודרים ב-vocabulary לפי סדר שכיחות יורד. לכן, בראש הרשימה נמצאים טוקנים כדוגמת, 'to', 'the', 'and', 'is'
  • הראשון ברשימה הוא טוקן '[UNK]' קיצור של unknown המייצג טוקנים שלא נמצאים במסד הנתונים. מכונה גם טוקן OOV - Out Of Vocabulary.
  • הטוקנים מופיעים באותיות קטנות בלבד. לכן אנחנו מוצאים במילון את הטוקן 'i' ולא את 'I'.

נעשה וקטוריזציה למשפט לדוגמה:

# vectorize a test sentence
output = text_vectorization([["i love the amiga"]])
output

התוצאה:

tf.Tensor: shape=(1, 20000), dtype=float32, numpy=array([[1., 0., 1., ..., 0., 0., 0.]], dtype=float32)
  • 20,000 פריטים בוקטור המקודד את המשפט הקצר אותו אנו בודקים. רוב הפריטים הם 0 לבד מכמה המייצגים את פריטי הוקטור המופיעים גם במשפט.

 

חפירה אל תוך הווקטור המקודד

נציץ ב-10 הפריטים הראשונים בוקטור המקודד:

output.numpy()[0, :10]
array([1., 0., 1., 0., 0., 1., 0., 0., 0., 0.], dtype=float32)
  • הפריטים המסומנים 1 מופיעים גם במשפט:

    • במקום הראשון המייצג את הטוקן 'UNK' מילה לא ידועה (במקרה זה amiga).
    • במקום השני המייצג את הטוקן 'to' שלא נמצא במשפט לדוגמה ועל כן מסומן ב-0
    • המקום השלישי מייצג את הטוקן 'i' אשר נמצא במשפט ועל כן מסומן ב-1. באותו אופן, המקום השישי מייצג את הטוקן 'the' אשר נמצא במשפט ועל כן מסומן ב-1.

המילה 'love' שכיחה פחות ולכן לא נמצאת בין עשרת הפריטים הראשונים של מערך אוצר המילים. כדי לאתר אותה ניצור מילון הממפה מילים על מספר האינדקס שלהם ב-vocabulary:

# a dict mapping words to their indices
vocab = text_vectorization.get_vocabulary()
word_index = dict(zip(vocab, range(len(vocab))))

מה במילון?

# a dict mapping words to their indices
vocab = text_vectorization.get_vocabulary()
word_index = dict(zip(vocab, range(len(vocab))))

print(word_index)
{'[UNK]': 0,
 'to': 1,
 'i': 2,
 'you': 3,
 'a': 4,
 'the': 5,
...
'close': 992,
 'choose': 993,
 'carlos': 994,
 'card': 995,
 'can i': 996,
 'but its': 997,
 'but it': 998,
 'but im': 999,
 ...}
  • זהו מילון בו הטוקנים הם המפתחות והאינדקסים הם הערכים
  • שילוב של טוקנים המורכבים ממילה אחת או שתיים unigrams ו- bigrams.

כאשר הטוקן 'love' מופיע בעמדה מספר 66:

word_index['love'] # 66

כדי לראות את כל הטוקנים המקודדים נהפוך את כיוון המילון word_index כדי שהאינדקסים יהוו את המפתחות והטוקנים את הערכים:

inv_word_index = {v: k for k, v in word_index.items()}

נחלץ מתוך המשפט המקודד את הטוקנים להם הוא מקודד:

# vectorize a test sentence
output = text_vectorization([["i love the amiga"]])
output
tf.Tensor: shape=(1, 20000), dtype=float32, numpy=array([[1., 0., 1., ..., 0., 0., 0.]], dtype=float32)
np_outputs = output.numpy()[0]
for idx, b in enumerate(np_outputs):
    if b==1 and inv_word_index[idx]:
        print(inv_word_index[idx])

התוצאה:

[UNK]
i
the
love
i love
love the

 

המשך הכנת הנתונים ללמידת מכונה

ניישם את תהליך הוקטוריזציה על שלוש מסדי הנתונים train, val, test:

vectorized_train_ds = train_ds.map(
   lambda x, y: (text_vectorization(x), y))
 
vectorized_val_ds = val_ds.map(
   lambda x, y: (text_vectorization(x), y))
 
vectorized_test_ds = test_ds.map(
   lambda x, y: (text_vectorization(x), y))

מה קיבלנו?

for inputs, targets in vectorized_train_ds:
 print(inputs.shape)
 print(targets.shape)
 print(inputs[0])
 print(targets[0])
 break
(32, 20000)
(32, 2)
tf.Tensor([7.100872  1.4940877 1.4599936 ... 0.        0.        0.       ], shape=(20000,), dtype=float32)
tf.Tensor([1. 0.], shape=(2,), dtype=float32)
  • בכל אצווה 32 וקטורים באורך 20,000.

 

המודל ואימונו

המודל שלנו יהיה שטוח, ויכיל שכבה אחת מסוג Dense:

def get_model(max_tokens, hidden_dim):
   inputs = keras.Input(shape=(max_tokens,))
   x = layers.Dense(hidden_dim, activation="relu")(inputs)
   x = layers.Dropout(0.25)(x)
   # classification
   outputs = layers.Dense(NUM_CLASSES, activation="softmax")(x)
   model = keras.Model(inputs, outputs)
   model.compile(optimizer=tf.keras.optimizers.Adam(),
                 loss="categorical_crossentropy",
                 metrics=["accuracy"])
   return model

model = get_model(MAX_NUM_WORDS, 32)

מה במודל?

model.summary()
___________________________________________
 Layer (type)                Output Shape              Param #   
=========================================
 input_1 (InputLayer)        [(None, 20000)]      0         
                                                                 
 dense (Dense)               (None, 32)                640032    
                                                                 
 dropout (Dropout)           (None, 32)                0         
                                                                 
 dense_1 (Dense)             (None, 2)                 66        
                                                                 
=========================================
Total params: 640,098
Trainable params: 640,098
Non-trainable params: 0

את המודל הרצתי במשך 5 סיבובים בלבד על CPU. משך האימון על המחשב האישי שלי היה פחות מ-30 שניות:

callbacks = [
   keras.callbacks.ModelCheckpoint("binary_2gram.keras", save_best_only=True)
]
 
history = model.fit(vectorized_train_ds.cache(),
         validation_data=vectorized_val_ds.cache(),
         epochs=5,
         callbacks=callbacks)
  • את המודל הטוב ביותר שמצא Keras בתהליך האימון אחסנתי בתוך קובץ binary_2gram.keras בעזרת callback
  • השתמשתי ב-cache של Keras כדי לחסוך את הצורך לטעון את המידע המאוחסן במסדי הנתונים בכל epoch.

 

הערכת התוצאות

נטען את המודל הטוב ביותר אותו מצאנו בתהליך האימון:

model = keras.models.load_model("binary_2gram.keras")

מה מידת הדיוק של המודל?

loss, acc = model.evaluate(vectorized_test_ds)
18/18 [==============================] - 0s 12ms/step - loss: 0.0771 - accuracy: 0.9855
  • כמעט 99% 

היכן היו עיקר השגיאות של המודל?

from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, f1_score

y_pred = model.predict(vectorized_test_ds)
y_pred = np.argmax(y_pred,axis=1)
y_actual = []
for bx in test_ds:
   mx = np.argmax(bx[1].numpy(), axis=1)
   y_actual.extend(mx)

print(confusion_matrix(y_actual, y_pred))
[[471   6]
 [  2  71]]
print(classification_report(y_actual, y_pred))
precision    recall  f1-score   support

           0       1.00      0.99      0.99       477
           1       0.92      0.97      0.95        73

    accuracy                           0.99       550
   macro avg       0.96      0.98      0.97       550
weighted avg       0.99      0.99      0.99       550

אומנם שיעור הדיוק נראה גבוה כמעט 99% אבל הבעיה היא שהמודל עלול לסווג הודעות לגיטימיות כאילו היו ספאם, וכך לשלוח הודעות שמחכים להם אל פח האשפה.

כששיניתי את הפונקציה האחראית לתהליך הוקטוריזציה לאפשרות המשתמשת ב-ngrams ללא נרמול TF-IDF :

text_vectorization = layers.TextVectorization(
   ngrams=2,
   max_tokens=MAX_NUM_WORDS,
   output_mode="multi_hot",
)

גברה רמת הדיוק ל-99.3% ויותר חשוב מכך מבין 4 הדוגמאות שהמודל שגה בסיווג שלהם לא הייתה ולו דוגמה אחת של הודעה לגיטימית שסווגה "ספאם":

[[477   0]
 [  4  69]]

שתי השיטות בהם השתמשנו במדריך נחשבות פשוטות ביותר כי אינם מתחשבות בסדר המילים bag of words והסתפקות במודל Dense ולמרות זאת מציגות אחוז דיוק גבוה מאוד (~99%) בתום אימון קצר בן פחות מדקה על מחשב ביתי מצויד ב-CPU. מבין שתי השיטות שבדקנו במדריך הפשוטה יותר שהסתמכה על bigrams בלבד ללא נרמול TF-IDF הגיעה לתוצאות טובות יותר הודות לשיפור רמת הדיוק ומניעת זיהוי שגוי מסוג False positive. המסקנה מזה היא שחשוב שיהיה ארסנל מספיק של פתרונות כשבאים לטפל בבעיה וגם שאין לבחול בפתרונות פשוטים כשמנסים לפתור בעיה מורכבת. למעשה, עדיף להתחיל מכלי כמה שיותר פשוט ורק אם הוא לא עובד לפנות לכלי חזק ממנו.

להורדת הקוד אותו פיתחנו במדריך

 

אולי גם זה יעניין אותך

פיתוח מודל לאנליזת סנטימנט באמצעות למידת מכונה ו-TensorFlow

שימוש ב-word embedding שאומן מראש במודל Keras לסיווג טקסטים

הטרנספורמרים משנים את עולם הבינה המלאכותית

לכל המדריכים בנושא של למידת מכונה

 

אהבתם? לא אהבתם? דרגו!

0 הצבעות, ממוצע 0 מתוך 5 כוכבים

 

 

הוסף תגובה חדשה

 

= 7 + 8