Monday, July 10, 2017

Building Big Data Apps


After spending the last 18 months using Spark to write entity resolution software for over a terabyte of data, here are some miscellaneous notes on what I wish I'd known from the start. In no particular order:

1. Make sure your app can recover from a failure easily. Write to HDFS after each major stage (a sketch of this follows the list). This will also help with debugging when the answer that comes out of the sausage machine wasn't entirely what you were expecting. ("Simply split long-running jobs into batches and write intermediate results to disk. This way, you have a fresh environment for every batch and don’t have to worry about metadata build-up" from here). Also, the topology for one stage may not be appropriate for another (see here for an example where a smaller number of executors with more resources each - contrary to the general Spark advice - gives better performance).

2. Small inefficiencies can cause big problems. Use jstack liberally.

3. Don't use Spark as a key/value lookup. That's not what it's built for. Use another system. Don't try to hack it by using a broadcast variable as that simply doesn't scale.

4. Use realistic data, both in size and quality. Making fake data is surprisingly hard if you want the output to remotely correspond to the real world.

5. Have an automated test in a realistic environment (you don't want authentication problems to show up late, for example). Run the app daily in this environment to catch any performance changes.

6. "Don't start modeling before designing some measurable goals" [1]. Define acceptance criteria early on as to what you'd expect to come out and what you wouldn't. Estimate the false positive/negative rate. For example, at one point we expected 220k of company entities to resolve with Orbis data. Using a very simple query, we were seeing about 130k of our businesses resolve to something. Therefore, the true positive rate could not be higher than about 60% (and may have been less) therefore there was work to do here.

7. Pass small objects around, preferably just IDs. This is what the built-in Connected Components algorithm does, and it will improve performance.
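
Point 1 in code might look something like the sketch below: a small helper (the method name, paths and the choice of Parquet are assumptions for illustration, not my production code) that writes each stage's output to HDFS and, on a re-run, skips any stage whose output already exists.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

def runStage[T](path: String)(stage: => Dataset[T])
               (implicit spark: SparkSession, enc: Encoder[T]): Dataset[T] = {
  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  if (fs.exists(new Path(path))) {
    spark.read.parquet(path).as[T]             // recover the checkpoint from the previous run
  } else {
    val out = stage                            // run the stage itself
    out.write.mode("overwrite").parquet(path)  // checkpoint after this major stage
    out
  }
}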

The stages of my app

There were 6 stages to my application:

1. Build a matrix using TF-IDF to assign weights to words (see the sketch after this list).
2. Calculate the cosine similarities.
3. Execute bespoke business rules.
4. Find connected components.
5. Turn those IDs back into entities.
6. Consolidate the relationships between these resolved entities in a process called Edge Contraction.
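
For stage 1, a sketch using Spark ML might look like the following (the docs DataFrame and its column names are assumptions for illustration; the production code differs):

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

val tokens   = new Tokenizer().setInputCol("name").setOutputCol("words").transform(docs)
val tf       = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").transform(tokens)
val idfModel = new IDF().setInputCol("rawFeatures").setOutputCol("tfidf").fit(tf)
val weighted = idfModel.transform(tf)   // one TF-IDF weighted vector per document

Stage 2 then computes cosine similarities between the normalised TF-IDF vectors.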

There were some interesting modifications to this basic flow.

1. Feature hashing improved runtime performance, but the largest connected component grew from 600 to 26,000 vertices (BlackRock/Merrill Lynch, who appear to have created a lot of shell companies with similar names).

2. By ignoring all words that appear in over 1000 documents, we removed the need for stop-word lists. This was useful since the corpus was multilingual.
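
A sketch of that document-frequency cut-off, re-using the tokenised documents from the previous sketch (the SparkSession value spark and the column names are assumptions):

import org.apache.spark.sql.functions.explode
import spark.implicits._

val keptWords = tokens
  .select($"docId", explode($"words").as("word"))
  .distinct()                    // one row per (document, word) pair
  .groupBy("word")
  .count()                       // the document frequency of each word
  .filter($"count" <= 1000)      // drop words that appear in over 1000 documents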

A note on requirements gathering

One tip on finding which database suits you comes from the late Dr Jim Gray. "Gray's recipe for designing a database for a given discipline is that it must be able to answer the key 20 questions that the scientist wants to ask of it." [2] A real-world example can be found here. The idea is that 20 questions is roughly the minimum number of questions you need before a pattern emerges.

This teased out of the business that we needed more than just a graph database (like Neo4j), which they had for some reason fixated on at one point. We also needed the batch processing that GraphX gave us.

[1] Practical Data Science with R.
[2] The Fourth Paradigm: Data-Intensive Scientific Discovery, Tony Hey.

Parquet Flaw


This Spark/Parquet abuse bit us hard this week. "In a partitioned table, data are usually stored in different directories, with partitioning column values encoded in the path of each partition directory" (from here). This is great for compression, as the partitioning values do not even appear in the files; they are encoded in the directory structure of the Parquet output.

Unfortunately, if you save the Dataset to HDFS, it appears that a new HDFS file is created for each contiguous run of rows that belongs to the same Parquet directory. For example, if there were only two values for a key, 1 and 2, and the Dataset mapped elements to partitions like this:

1,1,2,1,2,2,2,1,1,1...

then this piece of the sequence would result in 5 different files having keys [1,1], [2], [1], [2,2,2] and [1,1,1].

So, instead of fewer but larger files, we had many smaller files. This is a scenario that HDFS does not handle very well, and it manifested its displeasure as a DDoS on the NameNode.

What's more, resolving the problem by deleting the files also caused the NameNode pain. Using the -skipTrash flag made things a little better.

The solution was to sort the Dataset before saving.
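
A minimal sketch of the fix (the dataset, the key column and the output path are illustrative): sort on the partitioning key before writing so that rows with the same key are contiguous and each Parquet directory gets a few large files rather than many small ones.

dataset
  .sort("key")              // rows with the same key become contiguous
  .write
  .partitionBy("key")       // one directory per key value
  .mode("overwrite")
  .parquet("/data/out")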

Tuesday, July 4, 2017

Functional Foibles


Functional languages are supposed to protect you from nasties like NullPointerException, but there are some gotchas.

      a[NoSuchElementException] should be thrownBy {
        List.empty[Int].head
      }

Curiously, Haskell does the same.

$ ghci
GHCi, version 7.10.3: http://www.haskell.org/ghc/  :? for help
Prelude> head [] :: [Int]
*** Exception: Prelude.head: empty list

But this little annoyance occurred in a Spark job (here illustrated with ScalaTest):

      an[UnsupportedOperationException] should be thrownBy {
        List.empty[Int].max
      }

A neat mitigation can be found here and looks like this:

      List.empty[Int].reduceOption(Math.max) shouldBe None

An interesting point was made here on how things should be:
I would suggest that max is not an option. If you have a type (empty[T]), then this fold (I mean max) should return the neutral element regarding max(x,y), that is, the minimal element of the (partial) order of T, if one exists; if it does not, then it makes sense to throw an exception. 
Same as with other monoidal operations, e.g. empty.sum is a zero, not an exception. 
This would be the right mathematical answer.
As a reminder, partial ordering exhibits anti-symmetry, transitivity and reflexivity, that is x ≤ x (contrast this with total ordering which exhibits anti-symmetry, transitivity and totality, that is x ≤ y or y ≤ x).

So, what's being said here is that the minimal element for, say, a Double is Double.MinValue. This is indeed transitive (obviously), reflexive (clearly MinValue ≤ MinValue) and anti-symmetric (if MinValue ≤ x and x ≤ MinValue then the only conclusion is x = MinValue).

Compare this to Double.NaN which is not reflexive and therefore a partial ordering does not exist.

So, to be mathematically correct, List.empty[Int].max should return Int.MinValue.

"Head is a different story; here we have a semigroup, not a monoid. There's no neutral element."

And so it is:

      List.empty[Int].sum shouldBe 0




Spark Dataframes and Datasets


Although RDDs are conceptually simple, all new optimizations are coming from DataFrames and Datasets. As I upgrade my software from RDDs, these are the notes I've made.

Parquet

Apache Parquet is a columnar storage format. It can sometimes lead to improved performance, particularly with large data sets. By using a schema and storing each column in its own file, Parquet can answer queries over 400 MB of data in just one second.

You can convert CSV to Parquet with something like:

spark
  .read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(csvInPath)
  .write.mode(saveMode)
  .parquet(parquetPath)

DataFrames

We can now read this in with something like:

val df = spark.read.parquet(parquetPath)
df.withColumn("cus_id_no", $"cus_id_no".cast("bigint"))
  .filter($"prim_acc_no".isNotNull)
  .na.fill(0) // fill null numeric
  .na.fill("")// fill null string

Here, we're also filtering and providing defaults.

Datasets

Datasets are type-safe. They can be derived from DataFrames with something like this:

headerDS = headerDF.withColumn("COLUMN", UDF).withColumn( .... ).as[MyCaseClass]

and manipulated with something like this:

val ds        = df.as[CaseClass]
val groupedBy = ds.groupByKey(_.cus_id_no).count().toDF("cus_id_no", "n") // back to a Dataset we can join
val joined    = ds.joinWith(groupedBy, ds("cus_id_no") === groupedBy("cus_id_no"), "left_outer")

DataFrames are just of type Dataset[Row] - see the type alias in the package object of org.apache.spark.sql:

type DataFrame = Dataset[Row]

Tungsten

"By avoiding the memory and GC overhead of regular Java objects, Tungsten is able to process larger data sets than the same hand-written aggregations" (from here).

Among Tungsten's clever features, it:

1. does not deserialize whole objects (very useful in joins, groupings and the like)

2. manages objects off-heap (see here).

Catalyst

"At the core of Spark SQL is the Catalyst optimizer, which leverages advanced programming language features (e.g. Scala’s pattern matching and quasiquotes) in a novel way to build an extensible query optimizer." from here.

The bad news

With these APIs, you have to accept a much less rich interface. For instance, groupByKey returns a KeyValueGroupedDataset, which has a limited set of functions: there is no filter, and mapping to any type that doesn't have an org.apache.spark.sql.Encoder associated with it leads to a compile-time error of "... Support for serializing other types will be added in future releases".

You can always convert them to RDDs but then you lose all optimization benefits.
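
For example, a sketch (the case class, the field name and the SparkSession value spark are assumptions): mapping each group to a case class compiles because spark.implicits._ supplies an Encoder for it, whereas mapping to an arbitrary class without an Encoder fails at compile time.

import spark.implicits._    // Encoders for case classes, tuples and primitives

case class KeyCount(key: String, n: Int)

val counts = ds.groupByKey(_.cus_id_no.toString)
               .mapGroups { case (k, rows) => KeyCount(k, rows.size) }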

The difference

This takes 1.2 hours on Orbis data:

headerDS.rdd.map(x => x.BVDIDnumber -> Seq(x)).reduceByKey(_++_).filter(_._2.size>1).take(10)

whereas this takes 5 mins:

headerDF.groupByKey(_.getString(0)).flatMapGroups { case (key, iter) =>
  val ys = iter.toSeq
  if (ys.size > 1) Seq(ys.map(_.mkString(", "))) else Seq.empty
}.take(10)

Aside: this idiom reflects a monadic principle.

The filter method is completely described in one simple law:
FIL1. m filter p ≡ m flatMap {x => if(p(x)) unit(x) else mzero}

(from here).
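
A quick check of FIL1 for List, where unit is List(_) and mzero is Nil:

val m = List(1, 2, 3, 4)
val p = (x: Int) => x % 2 == 0

assert(m.filter(p) == m.flatMap(x => if (p(x)) List(x) else Nil))   // both sides are List(2, 4)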

Joins

If we want to optimize a join, we might want to re-partition the DataFrame with something like this, so that rows sharing a join key are co-located in the same partition.

aDataFrame.repartition(FIELD_NAME).write.format("parquet").mode("overwrite").save(FILE_NAME)

Now, if we want to do a join, it looks a little like this:

aDataFrame.join(other, other("record_id") <=> $"record".getItem("record_id"))

where

other.printSchema
root
 |-- document_type: string (nullable = true)
 |-- record_id: string (nullable = true)
 |-- entity_id: string (nullable = true)
 |-- entitySize: integer (nullable = true)

and

aDataFrame.printSchema
root
 |-- record: struct (nullable = true)
 |    |-- document_type: string (nullable = true)
 |    |-- record_id: string (nullable = true)
.
.


Sunday, June 18, 2017

Entropy


In my physics degree, we were asked to calculate the entropy of a chess board. Students smarter than me snorted at this silly exercise. Yet entropy is just a statistical measure. You cannot measure it directly (there are no entropy-ometers), but it exists everywhere and can be used in odd places like machine learning (e.g. maximum entropy classifiers, which "from all the models that fit our training data, selects the one which has the largest entropy").

Alternatively, you might want to find the configuration with the smallest entropy. An example is here where the quality of a clustering algorithm (k-means in this case) is assessed by looking at the entropy of the detected clusters. "As an external criteria, entropy uses external information — class labels in this case. Indeed, entropy measures the purity of the clusters with respect to the given class labels. Thus, if every cluster consists of objects with only a single class label, the entropy is 0. However, as the class labels of objects in a cluster become more varied, the entropy value increases."

For instance, say you are trying to find the parameters for an equation such that it best fits the data. "At the very least, you need to provide ... a score for each candidate parameter it tries. This score assignment is commonly called a cost function. The higher the cost, the worse the model parameter will be... The cost function is derived from the principle of maximum entropy." [1]

What is Entropy

I found this description of heads (H) and tails (T) from tossing a coin enlightening:

"If the frequency [f] of H is 0.5 and f(T) is 0.5, the entropy E, in bits per toss, is

-0.5 log₂(0.5)

for heads, and a similar value for tails. The values add up (in this case) to 1.0. The intuitive meaning of 1.0 (the Shannon entropy) is that a single coin toss conveys 1.0 bit of information.

Contrast this with the situation that prevails when using a "weighted" or unfair penny that lands heads-up 70% of the time. We know intuitively that tossing such a coin will produce less information because we can predict the outcome (heads), to a degree. Something that's predictable is uninformative. Shannon's equation gives

-0.7 log₂(0.7) = 0.3602

for heads and

-0.3 log₂(0.3) = 0.5211

for tails, for an entropy of 0.8813 bits per toss. In this case we can say that a toss is 11.87% [1.0 - 0.8813] redundant."

Here's another example:

X = 'a' with probability 0.5
    'b' with probability 0.25
    'c' with probability 0.125
    'd' with probability 0.125

The entropy of this configuration is:

H(X) = -0.5 log₂(0.5) - 0.25 log₂(0.25) - 0.125 log₂(0.125) - 0.125 log₂(0.125) = 1.75 bits

What does this actually mean? Well, if we ask the most efficient sequence of binary questions ("Is X 'a'? If not, is it 'b'? ..."), then "the resulting expected number of binary questions required is 1.75" [2].
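
A small sketch that reproduces the numbers above (Shannon entropy in bits of a discrete distribution):

def entropy(ps: Seq[Double]): Double =
  ps.filter(_ > 0).map(p => -p * math.log(p) / math.log(2)).sum

entropy(Seq(0.5, 0.5))                     // 1.0 bit per fair-coin toss
entropy(Seq(0.7, 0.3))                     // ≈ 0.8813 bits for the weighted penny
entropy(Seq(0.5, 0.25, 0.125, 0.125))      // 1.75 bits for X above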

Derivation of entropy

Basically, we want entropy to be extensive, that is, one of the "parameters that scale with the system. In other words U(aS,aV,aN) = aU(S,V,N)".

So, if S_X is the entropy of system X, then the combined entropy of two systems, A and B, would be:

S_C = S_A + S_B

Second, we want it to be largest when all the states are equally probable. Let's call the function f; then the average value is:

S = ⟨f⟩ = Σ_i p_i f(p_i)               Equation 1

Now, given two sub-systems, A and B, the system C that they make up will have entropy:

S_C = Σ_i Σ_j p_i p_j [f(p_i) + f(p_j)]            Equation 2

that is, since Σ_i p_i = Σ_j p_j = 1, this is just S_A + S_B written as a sum over the joint states where A is in state i and B is in state j.

For equation 2 to conform to the form of equation 1, let's introduce the variable p_ij = p_i p_j. Then:

S_C = Σ_i Σ_j p_ij f(p_ij)

For this to be true, we need f(p_i p_j) = f(p_i) + f(p_j), so f = C ln p, since ln(ab) = ln(a) + ln(b).

This is the argument found here.

[1] Machine Learning with TensorFlow.
[2] Elements of Information Theory.

Sunday, June 11, 2017

Scala Equality


The code:

Set(1) == List(1)

will always return false but will also always compile. This is pathological.

"Equality in Scala is a mess ... because the language designers decided Java interoperability trumped doing the reasonable thing in this case." (from here).

There are ways of solving this problem, the simplest being to use the === that a number of libraries offer. Here are three different ways of doing it:

  def scalazTripleEquals(): Unit = {
    import scalaz._
    import Scalaz._
//    println(List(1) === List("1")) // doesn't compile :)
  }

  def scalaUtilsTripleEquals(): Unit = {
    import org.scalautils.TypeCheckedTripleEquals._
//    println(List(1) === List("1")) // doesn't compile :)
  }

  def scalacticTripleEquals(): Unit = {
    import org.scalactic._
    import TypeCheckedTripleEquals._
//    println(List(1) === (List("1"))) // doesn't compile :)
  }

But what about incorporating it into the Scala language itself?

Scala creator Martin Odersky's views can be found here. He is proposing "it is opt-in. To get safe checking, developers have to annotate with @equalityClass ... So this means we still keep universal equality as it is in Scala now - we don’t have a choice here anyway, because of backwards compatibility."

Warning: ScalaTest

There is a horrible gotcha using Scalactic and ScalaTest (which is odd since they are stable mates). The problem is that you want compilation to fail for something like this:

import org.scalatest.{FlatSpec, Matchers}

class MyTripleEqualsFlatSpec extends FlatSpec with Matchers {

  "triple equals" should "not compile" in {
    List(1)  should === (List("1"))
  }

}

Only it doesn't. It happily compiles! This is not what was expected at all given the code in scalacticTripleEquals() above. The solution can be found here. You must change the class signature to:

class MyTripleEqualsFlatSpec extends FlatSpec with Matchers with TypeCheckedTripleEquals {

for the compiler to detect this error.

Saturday, June 10, 2017

Either Mither


Either has changed. A superficial Google might suggest that Either represents just that - either A or B.

"You cannot, at least not directly, use an Either instance like a collection, the way you are familiar with from Option and Try. This is because Either is designed to be unbiased.

"Try is success-biased: it offers you map, flatMap and other methods that all work under the assumption that the Try is a Success, and if that’s not the case, they effectively don’t do anything, returning the Failure as-is." (from here)

And: "if you use Either for error reporting, then it is true that you want it to be biased to one side, but that is only one usecase of many, and hardcoding a single special usecase into a general interface smells of bad design" from here.

But then you see this in the Scala documentation: "Either is right-biased, which means that Right is assumed to be the default case to operate on. If it is Left, operations like map, flatMap, ... return the Left value unchanged".

This apparent contradiction arises because Scala 2.12 changed Either: it has become right-biased.

Let's demonstrate using ScalaTest. First, we define an Either and some functions to act on it:

    type LeftType   = List[Int]
    type RightType  = Int
    type EitherType = Either[LeftType, RightType]
    val left        = Left(List[Int]())
    val right       = Right(1)

    val rightFn: (RightType) => RightType = _ + 1
    val leftFn:  (LeftType)  => LeftType  = _ :+ 1

Then, the test looks like:

    "Either" should {
      "be agnostic if left or right has been explicitly stated on an Either that's a Left" in {
        val either: EitherType = left
        either.left.map(leftFn) shouldEqual Left(List(1))
        either.right.map(rightFn) shouldEqual left // unchanged
      }
      "be agnostic if left or right has been explicitly stated on an Either that's a Right" in {
        val either: EitherType = right
        either.right.map(rightFn) shouldEqual Right(2)
        either.left.map(leftFn) shouldEqual right // unchanged
      }
.
.

So far, so good. But this following code is new in 2.12 (it won't compile in earlier versions):

      "ignore the unbiased side" in {
        val either: EitherType = left
        either.map(rightFn) shouldEqual left
      }
      "map the biased side" in {
        val either: EitherType = right
        either.map(rightFn) shouldEqual Right(2)
      }

Either's new monadic functions cannot take anything other than a function of type (RightType) => ...

The mnemonic is that if you're using this for error reporting, then Right is the right answer.
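
A small illustration of the 2.12 behaviour (the parse function is made up for this sketch): map and flatMap operate on the Right, so a for-comprehension keeps going while things succeed and short-circuits on the first Left.

def parse(s: String): Either[String, Int] =
  try Right(s.toInt) catch { case _: NumberFormatException => Left(s"'$s' is not a number") }

val ok  = for { a <- parse("1"); b <- parse("2") } yield a + b   // Right(3)
val bad = for { a <- parse("1"); b <- parse("x") } yield a + b   // Left("'x' is not a number")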

Tuesday, May 30, 2017

Rocha Thatte Cycle Detection Algorithm


Flows of money and ownership in a network of businesses and individuals can indicate fraudulent behaviour. For instance, if there is a cycle in the network such that X owns Y who owns Z and Z audits X, you can quickly see that there is a conflict of interest. Such suspicions are of interest to us.

GraphX is very good at sniffing out these networks, but you don't get cycle detection out-of-the-box. So I rolled my own, which happened to be similar to an algorithm somebody else had already discovered: the Rocha-Thatte algorithm.

The algorithm

The Wikipedia page gives an excellent overview so I won't bore you with the details. Suffice it to say that each vertex passes all the new paths going through it to its neighbouring vertices at each superstep.

The code is quite simple since GraphX does all the heavy lifting. Let's introduce a few type aliases:

import org.apache.spark.graphx.{VertexId, EdgeTriplet}

  type VertexPrg[T]   = (VertexId, T, T) => T
  type EdgePrg[T, ED] = (EdgeTriplet[T, ED]) => Iterator[(VertexId, T)]

No matter which algorithm we create, it has to implement these functions.

Now, we'll introduce some domain-specific aliases:

  type Path[T]  = Seq[T]
  type Paths[T] = Set[Path[T]]

and finally, the GraphX merge function (which is just (U,U)=>U) for us would look like:

  type MergeFn[T] = (Paths[T], Paths[T]) => Paths[T]

then the implementation of Rocha Thatte looks like this. The 'program' that runs on the vertex can be created here:

def vertexPrg[T](merge: MergeFn[T]): VertexPrg[Paths[T]] = { case (myId, myAttr, message) =>
  merge(myAttr, message)
}


and the 'program' running on edges looks like this:

type AddStepFn[T, ED] = (EdgeTriplet[Paths[T], ED]) => Paths[T]
def edgePrg[T, ED](add: AddStepFn[T, ED]): EdgePrg[Paths[T], ED] = { case edge =>
  import edge._
  val allPaths = add(edge)
  // TODO check attributes here for whatever business reasons you like
  if (allPaths == dstAttr) Iterator.empty else Iterator((dstId, allPaths))
}

(note: I've simplified the implementation for illustrative purposes. This code performs no checks.)
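
For completeness, a sketch of how these pieces might be wired into GraphX's Pregel (the merge and add-step functions, the starting graph and the choice of active direction are assumptions; the real code also carries the business-rule checks mentioned above):

import org.apache.spark.graphx.{EdgeDirection, Graph}
import scala.reflect.ClassTag

def findCycles[T, ED: ClassTag](graph: Graph[Paths[T], ED],
                                merge: MergeFn[T],
                                add: AddStepFn[T, ED],
                                maxIterations: Int): Graph[Paths[T], ED] =
  graph.pregel(Set.empty[Path[T]], maxIterations, EdgeDirection.Out)(
    vertexPrg(merge),   // runs on each vertex: fold the incoming paths into its state
    edgePrg(add),       // runs on each edge triplet: extend the paths and send them on
    merge               // combines messages arriving at the same vertex
  )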

The problem

The shape of the graph is important. For instance, I ran this algorithm on a (disconnected) network with about 200 million edges, 400 million vertices and sub-graphs with a maximum diameter of 6. It ran in about 20 minutes on a cluster with 19 beefy boxes.

However, I ran it on a much smaller (connected) network of 26 thousand vertices, 175 thousand edges and a diameter of 10 with little success. I found that I could manage only 3 iterations before Spark executors started to die with (apparently) memory problems.

The problem was that this graph had regions that were highly interconnected (it actually represented all the business entities we had that were related to BlackRock Investment Management and Merrill Lynch, of which there are many). Let's say that a particular vertex has 100 immediate neighbours, each of which has 100 of its own, and each of those has another 100. This quickly explodes into many possible paths through the original vertex (about 1 million) after only 3 super-steps.

It's not too surprising that this is an issue for us. After all, Spark's ScalaDocs do say "ideally the size of [the merged messages] should not increase."

For such a dense graph, super nodes are almost inevitable. For our purposes, we could ignore them but YMMV depending on your business requirements.

Monday, May 29, 2017

Good Hash


Since the feature hashing code that comes with Spark is based on a 32-bit hash giving us lots of collisions, we went for a 64-bit implementation. But what makes a good hash algorithm?

Rather than roll my own, I used this code. But how do I make sure it's good? This Stack Exchange answer gave me some clues by using the chi-squared test. Here, with some fairly simple maths, we compare our expected answer to our actual answer to see whether our algorithm generates an unusually high number of collisions.

"The chi-squared test of goodness of fit is used to test the hypothesis that the distribution of a categorical variable within a population follows a specific pattern of proportions, while the alternative hypothesis is that the distribution of the variable follows some other pattern." [1]

The Chi-Square Test

"The values of column and rows totals are called marginals because they are on the margin of the table... The numbers within the table ... are called joint frequencies...

"If the two variables are not related, we would expect that the frequency of each cell would be the product of its marginals, divided by the sample size." [1]

This is because the probabilities of A and B are:

P(A ∩ B) = P(A) P(B) iff A ⊥ B

In other words, given a sample size of N, the probability of A is

P(A) = N_A / N

and

P(B) = N_B / N

so

P(A ∩ B) = N_A N_B / N²

so the expected frequency would be

N P(A ∩ B) = N_A N_B / N

exactly as [1] says.

The chi-square statistic then looks like this:

χ² = Σ_i,j (O_ij - E_ij)² / E_ij

where i and j range over all the rows and columns.
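
As a rough sketch of how you might apply this to a hash function (this is not the Stack Exchange answer's code): treat each hash bucket as a category and compare the observed bucket counts against the uniform expectation.

def chiSquared(observed: Seq[Long]): Double = {
  val n        = observed.sum.toDouble
  val expected = n / observed.size       // under a good hash, every bucket is equally likely
  observed.map(o => (o - expected) * (o - expected) / expected).sum
}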

The Results

Using this new hashing algorithm for our feature hashing resulted in no collisions while hashing all the words in the corpus, where previously we were getting about 70 million collisions with the 32-bit version.

Consequently, the number of records we then had to compare dropped by about 20 million (or about 2% of the total).

Unfortunately, although I am able to execute a chi-square test on the 32-bit algorithm, the 64-bit algorithm has such a large number of possible hash values that it's not practical to test it in this manner.

[1] Statistics in a Nutshell, Sarah Boslaugh


Friday, May 26, 2017

The Probability Monad (part 1)


This is an interesting idea: probability distributions can be modeled as monads. The canonical description lives here but it's very Haskell-heavy. So, in an attempt to grok the probability monad, you might like to look at a Scala implementation here.

[Aside. I tried looking at the Haskell code but had to run
cabal install --dependencies-only --enable-tests
see here for more information.]

The monad in this Scala library is Distribution[T] where T can be, say, a Double such as in a Gaussian distribution:

  object normal extends Distribution[Double] {
    override def get = rand.nextGaussian()
  }

It could be something more interesting; here, for instance, the Distribution monad is parameterized with a List[Int].

  /**
   * You roll a 6-sided die and keep a running sum. What is the probability the
   * sum reaches exactly 30?
   */
  def dieSum(rolls: Int): Distribution[List[Int]] = {
    always(List(0)).markov(rolls)(runningSum => for {
      d <- die
    } yield (d + runningSum.head) :: runningSum)
  }
  def runDieSum = dieSum(30).pr(_ contains 30)

Simulation, simulation, simulation

The method pr will create a simulation where we sample an arbitrary number of monads (default of 10 000). We then filter them for those that contain a score of exactly 30 and calculate the subsequent probability.

Filtering the monads means traversing the list of 10 000 and calling filter on each one to find those with a score of exactly 30. Each monad in the list is actually a recursive structure 30 deep (the number of rolls of the die; any more is pointless as the total would necessarily be greater than 30).

That's the high-level description. Let's drill down.

State monads again

This recursive structure is a state monad. The monads are created by the recursive calls to markov(). This method creates a new monad by calling flatMap on itself. The get method of this new, inner monad takes the value of its outer monad, passes it to the function that flatMap takes as an argument and in turn calls get on the result.

Having created this inner monad, markov() is called on it and we start the next level of recursion until we have done so 30 times. It is this chain of get calling get when the time comes that will build up the state.

Consequently, we have the outermost monad being a constant Distribution that holds List(0). This is what a call to the outermost get will return. However, get is not publicly accessible. We can only indirectly access it by calling the monad functions.

In short, we have what is a little like a doubly-linked list. The outermost monad contains the "seed", List(0), and a reference to the next monad. The inner monads contain a reference to the next monad (if there is one) and a reference to its outer monad's value via get.

Note that it is the innermost monad that is passed back to the call site calling dieSum, in effect turning the structure inside out.

Anyway, the next job is to filter the structure. This creates a new monad (referencing the erstwhile innermost monad) to do the job, but remember monads are lazy so nothing happens yet. It's only when we call a sample method on this monad that something starts to happen. At this point, get is called and we work our way up the get-chain until we reach the outermost monad that contains List(0). Then we "pop" each monad, executing the runningSum => function on the results of the monad before. This is where we roll the die and append the cumulative result to the List.

If the predicate of the filter monad is not met, then we keep trying the whole thing again until it is.

Finally, we count the results that meet our predicate, dividing by the total number of runs. Evidently, we've taken a frequentist approach to probabilities here.


Sunday, May 21, 2017

Lazy Scala


This might be elementary, but when a simple error trips me up more than once, I blog it.

You probably know that an Iterator can only be accessed once as it's stateful. "An Iterator can only be used once because it is a traversal pointer into a collection, and not a collection in itself" from here. (Iterable just means the implementing class can generate an Iterator).

The simple mistake I made was to check the size of the iterator in a temporary log statement before mapping over it. What was a little more interesting was that other collections behave the same way if you call them via their TraversableOnce interface.

To demonstrate, say we have a Set

    val aSet = Set(1, 2, 3)

and two functions that are identical other than the type of their argument:

  def mapSet[T](xs: Set[T]): TraversableOnce[String] = {
    val mapped = xs.map(_.toString)
    println(mapped.mkString("\n"))
    mapped
  }

  def mapTraversableOnce[T](xs: TraversableOnce[T]): TraversableOnce[String] = {
    val mapped = xs.map(_.toString)
    println(mapped.mkString("\n"))
    mapped
  }

then mapTraversableOnce will return an empty iterator while mapSet will return a Set of Strings. This will come as a surprise to anybody expecting Object Oriented behaviour.

Iterator also violates the functor laws. Take two otherwise identical methods:

  def isTraversableFunctor[T, U, V](xs: Traversable[T], f: T => U, g: U => V): Boolean = {
    val lhs = xs.map(f).map(g)
    val rhs = xs.map(g compose f)
    (lhs == rhs)
  }

  def isTraversableOnceFunctor[T, U, V](xs: TraversableOnce[T], f: T => U, g: U => V): Boolean = {
    val lhs = xs.map(f).map(g)
    val rhs = xs.map(g compose f)
    (lhs == rhs)
  }

and pass them aSet. The first will say it's a functor, the second says it's not.

This is somewhat trivial as the reason it fails is that the iterator has not been materialized. "TraversableOnce's map is not a functor map, but, then again, it never pretended to be one. It's design goals specifically precluded it from being a functor map" from Daniel C. Sobral.

Laziness

Iterator comes into its own when we want access to the underlying elements lazily. But there are other ways to do this like Stream "which implements all its transformer methods lazily."

Almost all collections are eager by default. Use the view method to make them lazy. Use force on these to make them eager again.

Finally, some methods are naturally lazy. For instance, exists terminates quickly if it finds what it's looking for.
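
A small illustration of view and force (the numbers are arbitrary):

val xs = 1 to 1000000

val eager        = xs.map(_ * 2).find(_ > 10)        // builds the whole intermediate collection first
val lazily       = xs.view.map(_ * 2).find(_ > 10)   // the view defers map, so only a few elements are transformed
val materialized = xs.view.map(_ * 2).force          // force makes the lazy view strict again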


Tuesday, May 16, 2017

Playing with Pregel


Implementing your own distributed Pregel algorithm in GraphX is surprisingly simple but there are a few things to know that will help you.

First, what is GraphX's Pregel implementation? Well, it takes three functions:

1. one that merges messages of type T, that is (T, T) => T.

2. one that runs on each vertex with an attribute type of VD and that takes that message and creates a new state. Its type is (VertexId, VD, T) => VD.

3. one that runs on each edge/vertex/vertex combination and produces messages to be sent to the vertices. Its type is (EdgeTriplet[VD, ED]) => Iterator[(VertexId, VD)] where ED is the edge's attribute type.

TL;DR: the vertex holds its state and the edges send it messages. If this starts sounding like the Actor Model to you, you're not alone:

"It's a bit similar to the actor mode if you think of each vertex as an actor, except that vertex state and messages between vertices are fault tolerant and durable, and communication proceeds in fixed rounds: at every iteration the framework delivers all messages sent in the previous iteration. Actors normally have no such timing guarantee." - Martin Kleppmann.

So, off I went and wrote my code where all the paths through a vertex are stored as state at that vertex.

And it ran like a dog. After five hours of processing, it died with out of memory exceptions.

Judicious jstack-ing of the JVMs in the cluster showed that threads were hanging around in Seq.containsSlice (we're using Scala 2.10.6). This Scala method was being used to find sub-sequences of VertexIds (which are just an alias for Longs) in the paths that had already been seen.

Desperate, I turned the Seq of Longs to Strings and then used String.contains and the whole thing ran in less than 25 minutes.
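
A sketch of the hack (the encoding is my own illustration): turn each path into a delimited String once, then use String.contains. The delimiters matter; without them Seq(1, 12) would appear to contain Seq(11, 2).

def encode(path: Seq[Long]): String = path.mkString(",", ",", ",")   // e.g. Seq(1, 12, 3) => ",1,12,3,"

val path     = Seq(1L, 12L, 3L)
val slice    = Seq(11L, 2L)
val hasSlice = encode(path).contains(encode(slice))                  // false, as it should be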

This is not the first time I've seen innocent looking code bring a cluster to its knees. Curious, I wrote some micro-benchmarking code comparing these two methods using JMH and got this:


     Benchmark                                   Mode  Cnt          Score          Error  Units
     ContainsSliceBenchmark.scalaContainsSlice  thrpt    5     594519.717 ±    33463.038  ops/s
     ContainsSliceBenchmark.stringContains      thrpt    5  307051356.098 ± 44642542.440  ops/s

That's nearly three orders of magnitude slower.

Although it's a hack, this approach gives great performance. And it shows that taking jstack snapshots of your cluster is the best way to understand why your big data application is so slow.

Friday, May 5, 2017

Spark OutOfMemoryErrors.


Spark can deal with data sets that are much larger than the available physical memory. So you never have problems with OutOfMemoryErrors, right? Well, no. It depends on what you're doing.

When asked how to avoid OOMEs, the first thing people generally say is increase the number of partitions so that there is never too much data in memory at one time. However, this doesn't always work and you're better off having a look at the root cause.

This very week was one of those times. I was getting FetchFailedException. "This error is almost guaranteed to be caused by memory issues on your executors" (from here). Increasing the number of partitions didn't help.

The problem was that I had to find similar entities and declare there was a relationship between them. Foolishly, I tried to do a Cartesian product of all entities that were similar. This is fine 99.99% of the time but scales very badly - O(n²) - for outlying data points where n is large.

The symptoms were that everything was running tickety-boo until the very end where the last few partitions were taking tens of minutes and then the fatal FetchFailedException was thrown.

These pairs were going to be passed to GraphX's Connected Components algorithm, so instead of creating a complete graph (which is what the Cartesian product of the entities would give you) you need a different graph topology. This is not as simple as it sounds, as you have to be a little clever in choosing an alternative. For instance, a graph that is just a line will take ages for GraphX to process if it is sufficiently large. For me, my driver ran out of memory after tens of thousands of Pregel supersteps.

A more efficient approach is a star topology that has branches of length 1. This graph is very quickly explored by Pregel/ConnectedComponents and has the same effect as a complete graph or a line since all vertices are part of the same connected component and we don't care at this point how they got there.
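
A sketch of building such a star (the edge attribute is a dummy value): given a group of mutually similar entity IDs, use the first as the hub, so a group of n entities yields n - 1 edges rather than n(n - 1)/2, while ConnectedComponents still places them all in one component.

import org.apache.spark.graphx.Edge

def starEdges(similar: Seq[Long]): Seq[Edge[Int]] = similar match {
  case hub +: spokes => spokes.map(spoke => Edge(hub, spoke, 1))   // hub -> each other member
  case _             => Seq.empty
}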

The result was the whole graph was processed in about 20 minutes.


Monday, April 24, 2017

Crash Course in Conditional Random Fields


In an attempt to grok, in a hurry, the Conditional Random Fields used in the Mallet library, I've made some miscellaneous notes.

The basics


You will have been taught at school that the joint probability of A and B is:

P(A ∩ B) = P(A) P(B)

iff A and B are independent (see here).

Depending on your interpretation of probability theory, it is axiomatic that the relationship between the joint probability and the conditional probability is:

P(A ∩ B) = P(A|B) P(B)

These will come in useful.

CRFs

"A conditional random field is simply a conditional distribution p(y|x) with an associated graphical structure." [1]

We consider the distribution over

V = X ∪ Y

where:

V are random variables
X observed inputs
Y outputs we'd like to predict

"The main idea is to represent a distribution over a large number of random variables by a product of local functions [ΨA] that each depend on only a small number of variables." [1]

The Local Function

The local function has the form:

Ψ_A(x_A, y_A) = exp { Σ_k θ_A,k f_A,k(x_A, y_A) }

where k is the k-th feature of a feature vector.

Graphical Models

"Traditionally, graphical models have been used to represent the joint probability distribution p(y, x) ... But modeling the joint distribution can lead to difficulties ... because it requires modeling the distribution p(x), which can include complex dependencies [that] can lead to intractable models. A solution to this problem is to directly model the conditional distribution p(y|x). " [1]

Undirected Graphical Model

If these variables are A ⊂ V, we define an undirected graphical model as the set of all distributions that can be written as:

p(x, y) = (1/Z) ∏_A Ψ_A(x_A, y_A)

for any choice of factors, F = {ΨA}, where:

Ψ_A is a function υ^n → ℝ+, where υ is the set of values v can take
Z is the partition function that normalizes the values such that

Z = Σ_x,y ∏_A Ψ_A(x_A, y_A)

Factor Graph

This undirected graphical model can be represented as a factor graph. "A factor graph is a bipartite graph G = (V, F, E)", that is, a graph with two disjoint sets of vertices. One set is the variables, v_s ∈ V; the other is the factors, Ψ_A ∈ F. All edges are between these two sets, and an edge only exists if the variable v_s is an argument of the factor Ψ_A.

Directed Model (aka Bayesian Network)

This is based on a directed graph G = (V, E) and is a family of distributions (aka "model") that factorize as:

p(x, y) = ∏_v∈V p(v | π(v))

where π(v) are the parents of v in the graph G.

Naive Bayesian Classifier

As a concrete example, let's look at Mallet's implementation of Naive Bayes.

Firstly, what make Naive Bayes naive? "Naive Bayes is a simple multiclass classification algorithm with the assumption of independence between every pair of features." (from here).

We ask ourselves: given the data, what is the probability that the class is C? Or, in math-speak, what is p(C|D)?

Now, the data, D, is made up of lots of little data points, { d_1, d_2, d_3, ... }. And given that little equation at the top of this post, if all these data points are independent then:

p(D|C) = p({ d_1, d_2, d_3, ... } | C) = p(d_1|C) p(d_2|C) p(d_3|C) ...

Mallet's Java code for this is surprisingly easy and there is a JUnit test that demonstrates it. In the test, there is a dictionary of all the words in a corpus. It's a small dictionary of the words { win, puck, team, speech, vote }.

We create two vectors that represent the weightings for these words if a document relates to politics or sport. Not surprisingly, {speech, vote, win} have a higher weighting for politics and {team, puck, win} have a higher weighting for sports but all words have a nominal value of 1 added (before normalization) in each vector. This is Laplace Smoothing and it ensures the maths doesn't blow up when we try to take the log of zero.

Note that these weightings for each category are by definition p(D|C), that is, the probability of the data given a classification.

This being Bayes, we must have a prior. We simply assume the probability of an incoming document to be about sports or politics as equally likely since there is no evidence to the contrary.

Now, a feature vector comes in with {speech, win} equally weighted. To which class should it go?

The code effectively:
  1. Calculates an inner product between the feature vector and (the logarithm of) each class's vector and then adds (the logarithm of) the bias. This is the multiplication in the Local Function section above.
  2. Deducts the maximum result from all results. This appears to be there to handle rounding errors and it drops out of the equation when we normalize (see below).
  3. Takes the exponential of each result.
  4. Normalizes by dividing by the sum of these exponentials. This is the partition function we saw above, Z.
The most probable class is the one we're looking for. A sketch of these steps is below.
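
Something like the following (a sketch in Scala, not Mallet's actual Java): logWeights(c)(j) holds ln p(d_j|C_c) for word j, logPrior(c) is ln p(C_c), and features is the incoming document vector.

def classify(features: Array[Double],
             logWeights: Array[Array[Double]],
             logPrior: Array[Double]): Int = {
  // 1. inner product with the log weights, plus the log prior (the bias)
  val scores = logWeights.indices.map { c =>
    logPrior(c) + features.zip(logWeights(c)).map { case (f, w) => f * w }.sum
  }
  val maxScore  = scores.max                               // 2. deduct the maximum for numerical stability
  val exped     = scores.map(s => math.exp(s - maxScore))  // 3. exponentiate
  val z         = exped.sum                                // 4. the partition function Z
  val posterior = exped.map(_ / z)
  posterior.indexOf(posterior.max)                         // the most probable class
}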

But why does the code do this? Bayes' theorem gives us:

p(C|D) = p(D|C) p(C) / p(D)
       = prior x p(D|C) / Z
       = prior x p(d_1|C) p(d_2|C) p(d_3|C) ... / Z

now, if we let:

λ_y   = ln(prior)
λ_y,j = ln(p(d_j|C_y))

then

p(C|D) = (e^λ_y ∏_{j=1..K} e^(λ_y,j d_j)) / Z = e^(λ_y + Σ_{j=1..K} λ_y,j d_j) / Z

which is what the Java code does and is the same as equation 1.6 in [1]. Note the Java effectively has an e^(-maxScore) factor in both the numerator and the denominator, which of course cancels out.

Finally, that exponent, λ_y + Σ_{j=1..K} λ_y,j d_j, can be more elegantly expressed as a matrix multiplication between our dictionary weights for the classifier, θ, and the vector to classify, f (where the bias has been subsumed into the 0-th entry of the vector). That then gives us the Local Function above, and so we have come full circle.

Aside: One last note on Naive Bayesian Classifiers. Like the TF-IDF statistic, they work far better than they should. "Statisticians are somewhat disturbed by use of the NBC (which they dub Idiot's Bayes) because the naive assumption of independence is almost always invalid in the real world. However, the method has been shown to perform surprisingly well in a wide variety of contexts. Research continues on why this is." (from here)

[1] An Introduction to Conditional Random Fields for Relational Learning.

Sunday, April 9, 2017

Generating a Gaussian distribution


Firstly, let's consider what a Gaussian is. It can be derived here. The definition Dr Wilson gives is: "Data are said to be normally distributed if the rate at which the frequencies fall off is proportional to the distance of the score from the mean, and to the frequencies themselves". Or, in maths speak:

df/dx = k (x - μ) f(x)

where k is a constant and μ is the mean. Integrating this formula and setting it to 1 (since it's a probability distribution) via integration by parts and integration by substitution, you'll eventually get:

f(x) = e^(-(x - μ)²/(2σ²)) / (σ √(2π))

which is the Gaussian, where σ is the standard deviation.

Now, the square of its integral still equals 1 (since obviously 1² = 1), and written out as a double integral it looks like this:

(1/2π) ∫_-∞^∞ ∫_-∞^∞ e^(-x²/2) e^(-y²/2) dx dy

where I've absorbed the constants into variables and used both x and y for each integration. The trick is now to use polar co-ordinates, so:

x     = r cos θ
y     = r sin θ
dx dy = r dr dθ

and that integral becomes:

(1/2π) ∫_0^2π ∫_0^∞ r e^(-r²/2) dr dθ

which, with another integration by substitution with ψ = r2/2, becomes:

(1/2π) ∫_0^2π ∫_0^∞ e^(-ψ) dψ dθ

and again with Υ = e^(-ψ):

(1/2π) ∫_0^2π ∫_0^1 dΥ dθ = (1/2π) x 2π x 1 = 1

Where Transformation Sampling comes in

"We don't care about this integral. What we do is we use our relationship between integration and sampling" [Werner Krauth at Coursera] where θ is a random number between 0 and  and Υ is a random number between 0 and 1.

So, now we work backwards. If ψ = r²/2 and Υ = e^(-ψ), then r = √(-2 ln Υ) where Υ is sampled over the interval [0,1]. The easy bit is θ, which is simply sampled in the interval [0, 2π].

Plugging these numbers back into the equations for polar co-ordinates, we can now give samples for x and y. Notice that you actually get 2 values (x and y) for the price of one. So, implementations (like java.lang.Random.nextGaussian) maintain state (the next Gaussian value if it's available) which is calculated for "free" - except that the method is synchronized...

Implementation

What we have above is a decent implementation for generating Gaussian values but it's inefficient. There is a nice math-hack to make it better.

Java Number Cruncher (p367) says:
We begin this algorithm by generating two random values v1 and v2 that are uniformly distributed between -1 and +1. We then compute the value s = v1² + v2². If s > 1, we reject the values of v1 and v2 and generate new ones... With two good values of v1 and v2, we then compute:
x1 = v1 √((-2 ln s) / s)
x2 = v2 √((-2 ln s) / s)
Where this comes from had me scuttling off to here, which describes the Box-Muller method. Basically, we want a number between 0 and 1 for Υ and any value in [0, 2π] for θ. Well, this is just finding uniformly distributed values within the unit circle. Since the process of scattering points uniformly over a box in no way favours nor discriminates between points within a circle within that box, we can be sure that just taking those inner points is also uniformly distributed.

What we want to do is get rid of those expensive sin and cos functions for x and y. We note that

cos θ = adjacent / hypotenuse = v1 / √s
sin θ = opposite / hypotenuse = v2 / √s

since √s is the length of our vector (as defined in the above algorithm from Java Number Cruncher), and s itself, being uniform on [0,1], can play the role of Υ. Plugging that into our polar co-ordinate equations above, we get the equations for x1 and x2 we see above. QED.
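
Putting it all together, a sketch of the polar method described above (my own illustration, not the Java Number Cruncher code): it returns a pair of independent standard Gaussians per rejection loop.

import scala.util.Random

def nextGaussianPair(rng: Random): (Double, Double) = {
  var v1, v2, s = 0.0
  do {
    v1 = 2 * rng.nextDouble() - 1              // uniform in (-1, 1)
    v2 = 2 * rng.nextDouble() - 1
    s  = v1 * v1 + v2 * v2
  } while (s >= 1 || s == 0)                   // reject points outside the unit circle
  val multiplier = math.sqrt(-2 * math.log(s) / s)
  (v1 * multiplier, v2 * multiplier)           // x1 and x2 from the algorithm above
}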