Thursday, December 31, 2015

Latent Semantic Analysis with Spark


I'm reading the excellent Advanced Analytics with Spark as I need to learn more about natural language processing (NLP).

The method in the book is called Latent Semantic Analysis. Basically, this builds a vector for each document that captures the relevant terms and their significance. More specifically, all vectors have the same size and each value is, roughly speaking, the fraction of a given term in the document multiplied by a measure of how rare that term is across all documents (with a log thrown in to dampen outliers). A given index in each vector represents the same term over all documents.

The code for the book can be retrieved with:

git clone https://github.com/sryza/aas.git

The first step is lemmatization, which removes stop words and reduces related word forms to a common base form. For this, an NLP library from Stanford University (Stanford CoreNLP) is used. The code looks like this:

    val lemmatized = plainText.mapPartitions(iter => {
      val pipeline = createNLPPipeline()
      // ... map over iter, lemmatizing each document with the pipeline ...
    })

Interestingly, we "use mapPartitions so that we only initialize the NLP pipeline once per partition instead of once per document". [1]

(The variable lemmatized is an RDD of documents where the documents are just a Seq of Strings that have had the stop words removed and related words aggregated.)

By mapping over lemmatized, we make an RDD of Map[String, Int] values that represent the term counts per document; we call this docTermFreqs. We combine these maps within each partition of documents and merge them all together at the end. "When the records being aggregated and the result object have the same type (e.g., in sum), reduce is useful, but when the types differ, as they do here, aggregate is a more powerful alternative". [1]
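A minimal sketch of just the per-document counting step might look like this (assuming lemmatized is the RDD[Seq[String]] described above; the book's own code may differ in detail, and the aggregation across documents is omitted):

    // Count how many times each term appears in each document
    val docTermFreqs = lemmatized.map { terms =>
      terms.foldLeft(Map.empty[String, Int]) { (freqs, term) =>
        freqs + (term -> (freqs.getOrElse(term, 0) + 1))
      }
    }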

Unfortunately, merging all the per-document maps into one big map can lead to OutOfMemoryErrors. So, an alternative is to find the distribution of terms over the documents with this:

    val docFreqs = docTermFreqs.flatMap(_.keySet).map((_, 1)).reduceByKey(_ + _, 15)

(where 15 is our number of partitions). We also limit the number of terms:

    docFreqs.top(numTerms)(ordering)

where numTerms is arbitrary but defaults to 50 000, and ordering ranks the (term, count) pairs by count. From this, and the total number of documents (numDocs), we have enough information to calculate the Inverse Document Frequencies:

    docFreqs.map{ case (term, count) => (term, math.log(numDocs.toDouble / count))}.toMap

The use of log dampens the effect of outliers.
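To see the damping concretely (illustrative numbers only, not from the book):

    // With a million documents, a term appearing in just one of them would
    // otherwise get a raw ratio of 1,000,000; the log tames this.
    val rareTermIdf   = math.log(1000000.0 / 1)       // ≈ 13.8 (term in a single document)
    val commonTermIdf = math.log(1000000.0 / 500000)  // ≈ 0.69 (term in half the documents)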

In turn, with this we can map over docTermFreqs, creating a sparse vector of term scores for each document as we go.
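A sketch of that step (termIds, a map from each retained term to a column index, and idfs, the map of Inverse Document Frequencies built above, are my own names; the book's code may differ):

    import org.apache.spark.mllib.linalg.Vectors

    // termIds: Map[String, Int]    - column index for each retained term
    // idfs:    Map[String, Double] - inverse document frequency for each retained term
    val vecs = docTermFreqs.map { termFreqs =>
      val docTotalTerms = termFreqs.values.sum
      val termScores = termFreqs.toSeq.collect {
        case (term, freq) if termIds.contains(term) =>
          (termIds(term), idfs(term) * freq / docTotalTerms)
      }
      Vectors.sparse(termIds.size, termScores)
    }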

The Maths

So much for the Scala, now for the mathematics. LSA depends on a piece of linear algebra called Singular Value Decomposition (SVD). This is a very general technique that is used in many diverse maths problems. A good demonstration of it (using Python) is available here. A simple(ish), intuitive explanation of it is here.

One use of SVD is to reduce the dimensions of the problem to something more manageable. One consequence of such a reduction is a loss of accuracy but if this loss is small, the approximation might be acceptable.

A consequence of SVD is that our matrix is broken down into three matrices, each with its own properties:

X = U D V^T

U is an N × p orthogonal matrix (U^T U = I_p) whose columns u_j are called the left singular vectors;
V is a p × p orthogonal matrix (V^T V = I_p) with columns v_j called the right singular vectors;
D is a p × p diagonal matrix, with diagonal elements d_1 ≥ d_2 ≥ · · · ≥ d_p ≥ 0 known as the singular values
(taken from [2]). In English,

X is an (N × p) matrix that holds the observations. In general, N is the number of samples and p the number of features. For our purposes, N is the number of documents and p the number of terms. That is, the rows correspond to documents and the columns to terms. The values are:

(the Inverse Document Frequency of the term, calculated from docFreqs as seen earlier) × (frequency of this term in this document) / (total number of terms in this document).

V is a matrix "where each row corresponds to a term and each column corresponds to a concept. It defines a mapping between term space (the space where each point is an n-dimensional vector holding a weight for each term) and concept space (the space where each point is an n-dimensional vector holding a weight for each concept)."

U is a "matrix where each row corresponds to a document and each column corresponds to a concept. It defines a mapping between document space and concept space." [1]
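In Spark, the decomposition itself can be computed from the TF-IDF vectors built earlier using MLlib's RowMatrix. A sketch (vecs is assumed to be the RDD of sparse vectors from the earlier snippet, and k is the number of concepts to keep):

    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    val k = 100                                   // number of concepts (singular values) to keep
    val mat = new RowMatrix(vecs)                 // rows are documents, columns are terms
    val svd = mat.computeSVD(k, computeU = true)
    // svd.U: distributed documents-by-concepts matrix
    // svd.s: the k largest singular values (the diagonal of D)
    // svd.V: local terms-by-concepts matrix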

There's a lot more to the mathematics and I'll post more soon but this is a good starting place to get more familiar with it.

Spark

Now, this is just a refresher on Spark's handling of matrices. Handling data as rows in Spark is easy. But if you want to do anything interesting with a matrix (multiply it, take a transpose, etc.) this works less well: you also need to deal with columns, and the data in a column may be striped across many machines, unlike the data in a row.

At the moment (it's an experimental API), Spark has four implementations of DistributedMatrix, each with slightly different abilities. The BlockMatrix implementation is currently the only one that allows one distributed matrix to be multiplied by another. As its name suggests, it does this by splitting the matrix into blocks, which makes distributed operations more tractable.
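For example, a sketch with made-up entries (sc is the SparkContext) of multiplying two distributed matrices via BlockMatrix:

    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

    // Build two small distributed matrices from (row, column, value) entries
    val entriesA = sc.parallelize(Seq(MatrixEntry(0, 0, 1.0), MatrixEntry(1, 1, 2.0)))
    val entriesB = sc.parallelize(Seq(MatrixEntry(0, 1, 3.0), MatrixEntry(1, 0, 4.0)))

    val blockA = new CoordinateMatrix(entriesA).toBlockMatrix().cache()
    val blockB = new CoordinateMatrix(entriesB).toBlockMatrix().cache()

    val product = blockA.multiply(blockB)   // the result is another distributed BlockMatrix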

Other implementations can be multiplied by a Matrix object (dense or sparse), but that Matrix appears to be local to the node on which the code runs.
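A sketch of that local case, multiplying a distributed RowMatrix by a small local matrix built with Matrices.dense (again, made-up values):

    import org.apache.spark.mllib.linalg.{Matrices, Vectors}
    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    // A distributed matrix with two columns and a local 2 x 2 identity matrix
    val rows = sc.parallelize(Seq(Vectors.dense(1.0, 2.0), Vectors.dense(3.0, 4.0)))
    val distributed = new RowMatrix(rows)
    val local = Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0))   // column-major values

    // The local matrix is applied on the right; the result is another RowMatrix
    val product = distributed.multiply(local)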


[1] Advanced Analytics with Spark
[2] Elements of Statistical Learning (free download)
