Thursday, December 31, 2015

Latent Semantic Analysis with Spark

I'm reading the excellent Advanced Analytics with Spark as I need to learn more about natural language processing (NLP).

The method in the book is called Latent Semantic Analysis. Basically, this is building a vector for each document that captures the relevant words and their significance. More specifically, all vectors have the same size and the values are roughly speaking the fraction of a given term in the document multiplied by the number of occurrences of this term across all documents (with some logs thrown in to dampen outliers). A given index in each vector represents the same term over all documents.

The code for the book can be retrieved with:

git clone

The first step is a process of lemmatization which removes stop words and aggregates related words. For this, an NLP library from Stanford University is used. The code looks like this:

    val lemmatized = plainText.mapPartitions(iter => {
      val pipeline = createNLPPipeline()

Interestingly, we "use mapPartitions so that we only initialize the NLP pipeline once per partition instead of once per document". [1]

(The variable lemmatized is an RDD of documents where the documents are just a Seq of Strings that have had the stop words removed and related words aggregated.)

By mapping over lemmatized, we make an RDD of Map[String, Int] that represent the term count per document and we call this docTermFreqs. We combine all these for each partition of documents and merge them all together at the end. "When the records being aggregated and the result object have the same type (eg, in sum), reduce is useful, but when the types differ, as they do here, aggregate  is a more powerful alternative" [1]

Unfortunately, this approach can lead to OutOfMemoryErrors. So, an alternative is to find the distribution of terms over the documents with this:

    val docFreqs = docTermFreqs.flatMap(_.keySet).map((_, 1)).reduceByKey(_ + _, 15)

(where 15 is our number of partitions). We also limit the number of terms:

where numTerms is arbitrary but defaults to 50 000. From this, we have enough information to calculate the Inverse Document Frequencies:{ case (term, count) => (term, math.log(numDocs.toDouble / count))}.toMap

The use of log dampens the effect of outliers.

In turn, with this we can map over all docTermFreqs creating a sparse vector of all the terms and their scores as we go.

The Maths

So much for the Scala, now for the mathematics. LSA depends on a trick called Singular Value Decomposition. This is a very general technique that is used in many diverse maths problems. A good demonstration of it (using Python) is available here. A simple(ish), instuitive explanation of it is here.

One use of SVD is to reduce the dimensions of the problem to something more manageable. One consequence of such a reduction is a loss of accuracy but if this loss is small, the approximation might be acceptable.

A consequence of SVD is that our matrix is broken down into three matrices, each with its own properties:

X = U D VT 

U is an N × p orthogonal matrix (UT U = Ip ) whose columns uj are called the left singular vectors;
V is a p × p orthogonal matrix (VT V = Ip ) with columns vj called the right singular vectors
D is a p × p diagonal matrix, with diagonal elements d1 ≥ d2 ≥ · · · ≥ dp ≥ 0 known as the singular values 
(taken from [2]). In English,

X is an (N x p) matrix that holds the observations. In general, N is the number of samples and p the number of features. For our purposes, N is the number of documents and p the number of terms. That is, the rows correspond to documents and the columns to terms. The values are:

(the calculation for a term from docFreqs seen earlier) x (frequency of this term in this document) / (total number of terms in this document).

V is a matrix "where each row corresponds to a term and each column corresponds to a concept. It defines a mapping between term space (the space where each point is an n-dimensional vector holding a weight for each term) and concept space (the space where each point is an n-dimensional vector holding a weight for each concept)."

U is a "matrix where each row corresponds to a document and each column corresponds to a concept. It defines a mapping between document space and concept space." [1]

There's a lot more to the mathematics and I'll post more soon but this is a good starting place to get more familiar with it.


Now, this is just a refresher on Spark's handling of matrices. Data handled as rows in Spark is easy. But if you want to do anything interesting with a matrix (multiply, take a transpose etc) this works less well. You need to deal with columns and the data in a column might be striped across many machines unlike rows.

At the moment (it's an experimental API), Spark has 4 implementations of DistributedMatrix each with slightly different abilities. The BlockMatrix implementation is currently the only one that allows one distributed matrix to be multiplied by another. As its name suggests, it uses a mathematical trick to better handle operations by breaking it into a block matrix.

Other implementations can be multiplied by a Matrix object (dense or sparse) but this appears to be local to the node on which the code runs.

[1] Advanced Analytics with Spark
[2] Elements of Statistical Learning (free download)

Thursday, December 10, 2015

The Magnet Pattern a variant on the Type Class Pattern where Scala copies a Haskell idiom. A good article is here. The idea is that there is just one method that takes a variety of objects with the same super-trait depending on what parameters were used in calling that method. This replaces having many overloaded methods.

[Just to refresh one's memory: the Type Class pattern is a method of ad hoc polymorphism but it differs from the Pimp My Library pattern. A good comparison of the two can be found here. Essentially, the Pimp My Library pattern appears to add methods to classes that were not there when they were first written. The key thing about the Type Class pattern is that it asks for a witness to evidence that something is possible for a certain type. Think about Scala's TraversableOnce.sum method that asks for evidence that the type it contains has an associated Numeric. In Pimp My Library, you're creating the functionality per type. In the Type Class pattern your providing the functionality for a more general function to do its work.]

What is apparent immediately is that you really need to know about implicit rules. Here are some notes but a really good reference lies here.

Let's take a fairly pointless class:

class AContainer

Using the Type Class Pattern, we can add a method to it so:

  class PimpMe {
    def pimpedMethodOnClass: String = "pimpedMethodOnClass"
  implicit def pimpedWithADefCreatingClass(aContainter: AContainer): PimpMe = new PimpMe

Now, we can call this method as if it were on the class itself:

    println(aContainer.pimpedMethodOnClass) // "pimpedWithADef"

More efficiently (see below) we could write:

  implicit class PimpMeEfficiently(val aContainer: AContainer) extends AnyVal {
    def pimpedMethodOnEfficientClass: String = "pimpedMethodOnEfficientClass"
    println(aContainer.pimpedMethodOnEfficientClass) // "pimpedMethodOnEfficientClass"

The reason for AnyVal is "properly-defined user value classes provide a way to improve performance on user-defined types by avoiding object allocation at runtime, and by replacing virtual method invocations with static method invocations" (from the ScalaDocs).

The Magnet Pattern Simplified
A simplified magnet pattern implementation is a single method. Let's use a slightly more interesting magnet class whose toString method yields a different value. In real life, each subclass would have a more interesting implementation:

  class Magnet(string: String) {
    override def toString() = string

And let's have a very trivial implementation of the method taking the magnet:

  def magnetMethod(magnet: Magnet): Unit = println(magnet)

Now, we can call this method with what look like different arguments, for example:

  implicit def fromStringAndInt(tuple: (String, Int)) = new Magnet(s"Implementation X: (String, Int) = (${tuple._1}, ${tuple._2})")
    magnetMethod("aString", 1)   // "Implementation X: (String, Int) = (aString, 1)" (thanks to fromStringAndInt)
    magnetMethod(("aString", 1)) // ditto

Note calling the magnet method with a tuple has the same effect as calling it with individual arguments.

Now, we could have a different implementation but let's be simple and just pretend there is different functionality. But we really do have a different call site:

  implicit def fromDouble(double: Double) = new Magnet(s"Implementation Y: double = $double")
    magnetMethod(1.1d) // "Implementation Y: double = 1.1" (thanks to fromDouble)

We're calling the same method with different arguments (cardinality as well as types)!