In this example, we will analyze movie discussions and build a simple synonym engine. Our engine is based on Word2Vec, a family of shallow, two-layer neural networks that are trained to reconstruct the linguistic contexts of words. In essence, Word2Vec attempts to capture the meaning of words and the semantic relationships between them.
We will be using the Spark machine learning package to implement our synonyms service. Spark machine learning comes in two flavours: MLlib, the original RDD-based API, and SparkML, the newer DataFrame-based API.
Since Spark 2.0, MLlib is in maintenance mode, meaning that no new features are implemented for it; for new projects it should therefore be avoided. Note, however, that some features of MLlib have yet to be ported to SparkML, and the documentation for MLlib is still better.
For the remainder of the tutorial, we will be using the SparkML variant.
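For reference, the two flavours live under different packages. The snippet below is only meant to illustrate the naming; in this tutorial we will import solely from the DataFrame-based package.

// RDD-based API ("MLlib"), in maintenance mode since Spark 2.0
import org.apache.spark.mllib.feature.{Word2Vec => MLlibWord2Vec}
// DataFrame-based API ("SparkML"), used throughout this tutorial
import org.apache.spark.ml.feature.{Word2Vec => SparkMLWord2Vec}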
The dataset we will be using comes from Kaggle; the full dataset is available at this location.
We load the data as an RDD. As the data contains HTML code, we need to strip it out. We also need to remove punctuation marks and lowercase all our words. This makes our input vocabulary much smaller, so Word2Vec has far fewer distinct words to learn vectors for.
val path = s"datasets/imdb.csv"
val data = sc.textFile(path)
// Remove HTML, string escapes and punctuation
.map(w => w.replaceAll("""<(?!\/?a(?=>|\s.*>))\/?.*>""", ""))
.map(w => w.replaceAll("""[\…\”\'\’\`\,\(\)\"\\]""", ""))
// Make lowercase
.map(w => w.toLowerCase)
// Word2Vec works at the sentence level
.flatMap(c => c.split("[.?!;:]")).map(_.trim)
Initializing Scala interpreter ...
Spark Web UI available at http://192.168.178.199:4040
SparkContext available as 'sc' (version = 3.0.1, master = local[*], app id = local-1633376834637)
SparkSession available as 'spark'
path: String = datasets/imdb.csv
data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at map at <console>:34
Let's check what our raw data looks like:
data.take(3).foreach(l => println(" R:" + l))
R:review
R:jennifer ehle was sparkling in pride and prejudice
R:jeremy northam was simply wonderful in the winslow boy
Since SparkML is based on DataFrames, we need to convert our source RDD to a suitable DataFrame. To do so, we first create a schema, consisting of a sequence of fields that contain Arrays of Strings :-)
Remember that Word2Vec treats text as a bag of words; a bag-of-words representation on a computer is simply an Array of Strings.
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// Convert trainData from RDD[String] to DataFrame[Array[String]]
val schema = StructType(Seq(StructField("text", ArrayType(StringType, true), true)))
var documentDF = spark.createDataFrame(data.map(r => org.apache.spark.sql.Row(r.split(" "))), schema)
documentDF.take(2).foreach(println)
[WrappedArray(review)]
[WrappedArray(jennifer, ehle, was, sparkling, in, pride, and, prejudice)]
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
schema: org.apache.spark.sql.types.StructType = StructType(StructField(text,ArrayType(StringType,true),true))
documentDF: org.apache.spark.sql.DataFrame = [text: array<string>]
In the DataFrame above, we have lots of words that repeat: think, for example, of articles ('a', 'the'), prepositions ('at', 'on', 'in') and so on. Those words do not add much information to our dataset. You can get an intuitive understanding of this by removing those words from everyday sentences: for example, "a cat is under the table" can be converted to "cat is under table", or even to "cat is table", and you still get the idea.
To increase the information density of our vectors, we can remove stopwords with the StopWordsRemover transformer. We do so in a non-destructive manner: we add a new column to our DataFrame in which the contents of our input text have been processed to remove stopwords.
import org.apache.spark.ml.feature.StopWordsRemover
// Remove stop words
val stopWordsRemover = new StopWordsRemover().setInputCol("text").setOutputCol("nostopwords")
documentDF = stopWordsRemover.transform(documentDF)
documentDF.take(2).foreach(println)
[WrappedArray(review),WrappedArray(review)]
[WrappedArray(jennifer, ehle, was, sparkling, in, pride, and, prejudice),WrappedArray(jennifer, ehle, sparkling, pride, prejudice)]
import org.apache.spark.ml.feature.StopWordsRemover
stopWordsRemover: org.apache.spark.ml.feature.StopWordsRemover = StopWordsRemover: uid=stopWords_2855c252242d, numStopWords=181, locale=en_GB, caseSensitive=false
documentDF: org.apache.spark.sql.DataFrame = [text: array<string>, nostopwords: array<string>]
We are now ready to train our model!
To exclude the long tail of words that do not appear frequently, we drop words with fewer than 10 appearances in our dataset.
import org.apache.spark.ml.feature.Word2Vec
// Learn a mapping from words to Vectors
val word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(200)
.setMinCount(10)
val model = word2Vec.fit(documentDF)
import org.apache.spark.ml.feature.Word2Vec
word2Vec: org.apache.spark.ml.feature.Word2Vec = w2v_b8b665ef2a0e
model: org.apache.spark.ml.feature.Word2VecModel = Word2VecModel: uid=w2v_b8b665ef2a0e, numWords=21593, vectorSize=200
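As a quick sanity check before querying, we can peek at the learned vocabulary: the fitted model exposes one vector per word through its getVectors method. (A minimal sketch; the exact words shown depend on the training run.)

// The learned vocabulary: one row per word, plus its 200-dimensional vector
model.getVectors.printSchema
model.getVectors.select("word").show(5, truncate = false)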
Out of the box, Word2Vec's findSynonyms only lets us look up related terms for a single word. Let's give it a try:
// Find synonyms for a single word
model.findSynonyms("pitt", 10).collect.foreach(println)
[finney,0.7722606062889099]
[perlman,0.7646912336349487]
[dourif,0.746439516544342]
[garrett,0.7226223945617676]
[ford,0.6852742433547974]
[palillo,0.6821386218070984]
[pyun,0.6814386248588562]
[bale,0.6788373589515686]
[jeremy,0.678361177444458]
[slater,0.6644403338432312]
What we see is that Word2Vec actually managed to uncover some related terms given a popular name in the dataset. What is more interesting, however, is to see whether we can extract meaningful terms with respect to an entire phrase. For this, we need to use Word2Vec's findSynonyms(s: Vector) function.
To do so, we first define a function toDF that converts an input string into a form suitable for querying the model: it simply tokenizes the input string and wraps it in a Spark DataFrame (hence the name).
def toDF(s: String) =
spark.createDataFrame(Seq(s.trim
.toLowerCase
.split(" ")
).map(Tuple1.apply))
.toDF("text")
toDF("James Bond").collect.foreach(println)
[WrappedArray(james, bond)]
toDF: (s: String)org.apache.spark.sql.DataFrame
We then call the transform method on the created DataFrame; this adds a column with a vector representation of our query, computed, using the vocabulary of our corpus, as the average of the query's word vectors.
val q = model.transform(toDF("James Bond"))
q.printSchema
root
 |-- text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- result: vector (nullable = true)
q: org.apache.spark.sql.DataFrame = [text: array<string>, result: vector]
To automate the steps above, we create a method that takes a query (as String) and prints the 10 most relevant terms in our model, excluding terms that are included in the query itself.
def query(s: String) = {
  // Vector representation of the whole query (average of its word vectors)
  val q = model.transform(toDF(s))
  val qTokens = s.toLowerCase.split(" ")
  // Nearest terms to the query vector, minus any word already in the query
  model.findSynonyms(q.first.getAs[Vector]("result"), 10)
    .filter(r => !qTokens.contains(r(0)))
    .collect
    .foreach(println)
}
query: (s: String)Unit
query("Movie")
[film,0.8398674130439758]
[flick,0.598155677318573]
[cartoon,0.5386905670166016]
[mini-series,0.535006582736969]
[moviesbut,0.5239917635917664]
[documentary,0.5068673491477966]
[show,0.5063693523406982]
[stinker,0.49897581338882446]
[picture,0.49263861775398254]
query("brad pitt is a great actor")
[dourif,0.6699931025505066]
[terence,0.6649662256240845]
[mabius,0.659092128276825]
[dustin,0.6457685232162476]
[stoltz,0.6436259746551514]
[silva,0.6400138139724731]
[raoul,0.6366493105888367]
[elisabeth,0.6356052160263062]
One of the nice side effects of being able to uncover latent meaning with tools like Word2Vec is the ability to solve analogy problems. In the original Word2Vec paper, the authors show that, when trained on a sufficiently large corpus (billions of words), Word2Vec models can uncover relationships such as:
v(king) - v(man) + v(woman) =~ v(queen)
or, otherwise put: man is to king as woman is to queen; the difference between the two vectors in each pair captures roughly the same relationship. This works simply by performing algebraic operations on the transformed vector representations of words.
To check whether our model can uncover such relationships as well, we first implement a few simple vector operations.
import org.apache.spark.ml.linalg.DenseVector
import math._
def vectorDiff(xs: Vector, ys: Vector) : Vector =
new DenseVector((xs.toArray zip ys.toArray).map { case (x,y) => x - y})
def vectorDistance(xs: Vector, ys: Vector) =
sqrt((xs.toArray zip ys.toArray).map { case (x,y) => pow(y - x, 2) }.sum)
import org.apache.spark.ml.linalg.DenseVector
import math._
vectorDiff: (xs: org.apache.spark.ml.linalg.Vector, ys: org.apache.spark.ml.linalg.Vector)org.apache.spark.ml.linalg.Vector
vectorDistance: (xs: org.apache.spark.ml.linalg.Vector, ys: org.apache.spark.ml.linalg.Vector)Double
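As a quick aside, these helpers (plus one extra, vectorAdd, defined on the spot for this sketch) are already enough to try the classic query directly: compute v(king) - v(man) + v(woman) and ask the model which words lie closest to the result. The neighbours returned will of course depend on our relatively small corpus.

// Hypothetical extra helper for this aside: element-wise vector addition
def vectorAdd(xs: Vector, ys: Vector): Vector =
  new DenseVector((xs.toArray zip ys.toArray).map { case (x, y) => x + y })

// Average word vector for a single term, reusing toDF and the trained model
def wordVector(w: String): Vector =
  model.transform(toDF(w)).first.getAs[Vector]("result")

// v(king) - v(man) + v(woman), then the 5 nearest words to that point
val target = vectorAdd(vectorDiff(wordVector("king"), wordVector("man")), wordVector("woman"))
model.findSynonyms(target, 5).collect.foreach(println)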
Next, we implement our analogy function: for the two pairs of terms we pass in, it computes a difference vector per pair and prints the Euclidean distance between the two differences.
def analogy(x: String, isToY: String, likeZ: String, isToA: String) {
  // Vector representations of the four query terms
  val q = model.transform(toDF(x))
  val w = model.transform(toDF(isToY))
  val m = model.transform(toDF(likeZ))
  val k = model.transform(toDF(isToA))
  // Difference vectors for the two pairs of terms
  val left = vectorDiff(q.first.getAs[Vector]("result"), w.first.getAs[Vector]("result"))
  val right = vectorDiff(k.first.getAs[Vector]("result"), m.first.getAs[Vector]("result"))
  // Euclidean distance between the two difference vectors
  println(vectorDistance(left, right))
}
analogy: (x: String, isToY: String, likeZ: String, isToA: String)Unit
analogy("king","man","queen","woman")
4.502840631713733
analogy("soldier","army","sailor","navy")
2.1463648185567026
analogy("Athens","Greece","Paris","France")
2.1083978795843925
analogy("brother","sister","grandson","grandaughter")
1.8668779419101222
// The dataset is from the mid-00s :-)
analogy("brad pitt","angelina jolie","Leonardo DiCaprio", "Gisele Bundchen")
1.644923423629974
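Finally, to turn this into an actual synonyms service we would not want to retrain the model on every startup. A minimal sketch of persisting and reloading the fitted model (the path below is just an example):

import org.apache.spark.ml.feature.Word2VecModel

// Save the fitted model to disk (example path)
model.write.overwrite().save("models/imdb-word2vec")

// Later, e.g. at service startup: load it back and serve synonym queries
val reloaded = Word2VecModel.load("models/imdb-word2vec")
reloaded.findSynonyms("pitt", 5).collect.foreach(println)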