Friday, March 27, 2015

Not so clever Spark


More playing with new shiny toys...

Apache Spark promises to make handling big data faster. It also interfaces with (among other things) Cassandra. This is very useful as Cassandra has limited commands for processing data.

I tried reading in my data from Cassandra on the Spark CLI, so:

val rdd = sc.cassandraTable("keyspace", "table")

Now, I want to sum up all the values for a particular subset of my rows in table. Being little more than a key/value store, I can't do this in Cassandra so let's try it in Spark:

rdd.filter(_.getString("my_where_field") == "X").map(_.getDouble("my_value")).fold(0)((x, y) => x + y)

Not so fast. Really. It takes minutes. The solution is to be more precise earlier:

val rdd = sc.cassandraTable("keyspace", "table").where("my_where_field" = 'X')
rdd.aggregate(0.0D)((acc, row) => acc + row.getDouble("my_value"), (d1, d2) => d1 + d2)

(The second function ((d1, d2) => d1 + d2) defines how the results from all the jobs are processed.)

This is much faster! About 3s over 10 000 rows out of 100 million.

Spark is very nice but don't expect queries to run within a time the average web user's is familiar with. It spins up JVMs on all the boxes in the cluster and executes jobs by default in a serial manner. However, if you want to reduce number crunching from, say, an hour down to minutes, it might be the tool for you.

But do be aware that it is still somewhat immature. I downloaded the latest DataStax bundle that has version 1.1.0 (it's on 1.3 at the moment) and got a ClassCastException deep down in Scala code using Spark SQL (which to be fair is not considered production ready).

val results = csc.cassandraSql("select sum(my_value) from table where my_where_field = 'X'
.
.
results.collec().foreach(println)
15/03/27 19:09:27 WARN TaskSetManager: Lost task 3.0 in stage 4.0 (TID 18, 10.20.166.198): java.lang.ClassCastException: java.math.BigDecimal cannot be cast to scala.math.BigDecimal
    scala.math.Numeric$BigDecimalIsFractional$.plus(Numeric.scala:182)
    org.apache.spark.sql.catalyst.expressions.Add$$anonfun$eval$2.apply(arithmetic.scala:58)
    org.apache.spark.sql.catalyst.expressions.Add$$anonfun$eval$2.apply(arithmetic.scala:58)
    org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:114)
    org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:58)



No comments:

Post a Comment