Spam Classification Using PySpark in Python

In this tutorial, we will build a spam classifier in Python using Apache Spark which can tell whether a given message is spam or not! I have used a classic spam-ham dataset from the UCI dataset repository for this tutorial.

Spam Classifier Using PySpark

So we will first create a spark session and import the data and then rename the columns for ease of use.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('nlp').getOrCreate()
data = spark.read.csv("smsspamcollection/SMSSpamCollection",inferSchema=True,sep='\t')
data = data.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')

Let’s just have a look at our data.

data.show()
+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if that’s th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows

Clean and Prepare the Data

Creating a new length feature:

from pyspark.sql.functions import length
data = data.withColumn('length',length(data['text']))
data.show()
+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if that’s th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows

If you observe carefully the difference is pretty clear and obvious. Let’s use the group-by  method use to confirm this trend:

data.groupby('class').mean().show()
+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+

An average difference of 139 and 71 is big enough to use it as a feature in modeling.

Feature Transformations

Now we are going to tokenize the words, remove stopwords, and implement TF-IDF step by step and then convert them into a single dense vector using the vector-assembler.

from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
ham_spam_to_num = StringIndexer(inputCol='class',outputCol='label')
clean_up = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')

Model and Pipeline

We are going to use the Naive-Bayes classifier as our model for this tutorial.

from pyspark.ml.classification import NaiveBayes
# Use defaults
nb = NaiveBayes()

Pipelining is important so that we don’t have to re-iterate the previous preprocessing steps for further experimentation.

from pyspark.ml import Pipeline
data_prep_pipe = Pipeline(stages=[ham_spam_to_num,tokenizer,stopremove,count_vec,idf,clean_up])
cleaner = data_prep_pipe.fit(data)
clean_data = cleaner.transform(data)

Training and Evaluation

Let’s just quickly check that all the preprocessing was done correctly.

clean_data = clean_data.select(['label','features'])
clean_data.show()
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,297,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
|  1.0|(13424,[10,60,139...|
|  0.0|(13424,[10,53,103...|
|  0.0|(13424,[125,184,4...|
|  1.0|(13424,[1,47,118,...|
|  1.0|(13424,[0,1,13,27...|
|  0.0|(13424,[18,43,120...|
|  1.0|(13424,[8,17,37,8...|
|  1.0|(13424,[13,30,47,...|
|  0.0|(13424,[39,96,217...|
|  0.0|(13424,[552,1697,...|
|  1.0|(13424,[30,109,11...|
|  0.0|(13424,[82,214,47...|
|  0.0|(13424,[0,2,49,13...|
|  0.0|(13424,[0,74,105,...|
|  1.0|(13424,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows

Now we are all set for training and evaluation with a 70:30 split.

(training,testing) = clean_data.randomSplit([0.7,0.3])
spam_predictor = nb.fit(training)
test_results = spam_predictor.transform(testing)
test_results.show()
+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,3,9,1...|[-572.06653080640...|[1.0,3.2853946379...|       0.0|
|  0.0|(13424,[0,1,5,15,...|[-1001.2595934260...|[1.0,2.7574544759...|       0.0|
|  0.0|(13424,[0,1,5,20,...|[-806.54241219940...|[1.0,1.8852085844...|       0.0|
|  0.0|(13424,[0,1,9,14,...|[-537.01474869015...|[1.0,2.8233277769...|       0.0|
|  0.0|(13424,[0,1,11,32...|[-869.75284680397...|[1.0,1.0200416791...|       0.0|
|  0.0|(13424,[0,1,14,31...|[-215.73138546316...|[1.0,5.8314497532...|       0.0|
|  0.0|(13424,[0,1,14,78...|[-686.70388741456...|[1.0,2.0779668967...|       0.0|
|  0.0|(13424,[0,1,23,63...|[-1310.9094107562...|[1.0,4.4866463813...|       0.0|
|  0.0|(13424,[0,1,24,31...|[-356.01275797052...|[1.0,3.5064139479...|       0.0|
|  0.0|(13424,[0,1,27,35...|[-1493.1508968151...|[0.99999997768200...|       0.0|
|  0.0|(13424,[0,1,30,12...|[-599.21107138763...|[1.0,2.9152869776...|       0.0|
|  0.0|(13424,[0,1,46,17...|[-1139.5052371653...|[9.30530122501920...|       1.0|
|  0.0|(13424,[0,1,146,1...|[-254.82374909461...|[0.20196018944218...|       1.0|
|  0.0|(13424,[0,2,3,6,9...|[-3301.7237938480...|[1.0,1.0067276963...|       0.0|
|  0.0|(13424,[0,2,4,7,2...|[-511.87873676486...|[1.0,1.3675876660...|       0.0|
|  0.0|(13424,[0,2,4,8,1...|[-1316.0759246967...|[1.0,1.3703321229...|       0.0|
|  0.0|(13424,[0,2,4,40,...|[-1582.4686915061...|[0.99999999442732...|       0.0|
|  0.0|(13424,[0,2,4,44,...|[-1909.3003347074...|[1.0,4.9564372811...|       0.0|
|  0.0|(13424,[0,2,7,11,...|[-851.30100707005...|[1.0,3.9992581112...|       0.0|
|  0.0|(13424,[0,2,7,43,...|[-590.79005982680...|[1.0,5.8169836805...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 20 rows

That was not very intuitive though! Let’s use the  MulticlassClassificationEvaluator function to extract more meaning out of the results.

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting spam was: {}".format(acc))
Accuracy of model at predicting spam was: 0.9210916371646145

92% accuracy is not very bad. Hopefully, this tutorial helped you to start working on some NLP stuff using Spark. This dataset is obviously not ideal for using Spark as it is a Big Data framework but it still serves for demonstration purposes. You can get the actual notebook for the code here.

Leave a Reply