Thursday, September 10, 2015

Notes on the Future (part 1)


Scala's Futures are very different from Java's. Where Java's Future API encourages blocking, Scala's encourages asynchronous coding... apart from one gotcha that follows shortly.

Futures are Monads so they have all that Monad goodness and we can use them in for comprehensions. Unfortunately, this very idiom can introduce a bug. Take this code:

for {
  i <- Future(5)
  j <- Future(11)
} yield j

It creates two futures that return the values 5 and 11, possibly on different threads, and returns a Future that contains 11 just like a good Monad should.

"Here, the beauty of the for comprehension sugar can work against us. The for comprehension hides all of that ugly nesting from us, making it look like these are on the same level, but nesting is indeed happening here. What might look concurrent isn't." [1]
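The nesting the quote talks about becomes visible if we desugar the for comprehension by hand. This is only a sketch of roughly what the compiler produces, and the names Desugared, sugared and desugared are made up for illustration:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Desugared {
  // The for comprehension from above...
  def sugared(): Future[Int] =
    for {
      i <- Future(5)
      j <- Future(11)
    } yield j

  // ...written as the nested flatMap/map calls it stands for.
  // Future(11) sits inside the function passed to flatMap, so it is
  // not even created until the first Future has completed.
  def desugared(): Future[Int] =
    Future(5).flatMap { i =>
      Future(11).map { j => j }
    }
}
```

Both methods give a Future containing 11; the second form just makes the ordering explicit.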

Gotcha

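What happens here is that the first Future is executed and only then is the second one created and run, because the second is constructed inside the flatMap callback of the first. The solution is simple: create both Futures before the for comprehension. The code should look like this: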

val iFuture = Future(5)
val jFuture = Future(11)
for {
  i <- iFuture
  j <- jFuture
} yield j

For just returning a number, you probably won't notice anything amiss. But for something more ambitious, you may very well notice that the first cut of this code is sub-optimal.
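To make the difference measurable, here is a rough timing sketch. The slow helper, its 300 ms sleep and the timed helper are all assumptions of mine standing in for "something more ambitious"; they are not from [1]:

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Timing {
  // Hypothetical stand-in for real work: sleeps, then returns its argument.
  def slow(n: Int, millis: Long = 300L): Future[Int] =
    Future {
      blocking(Thread.sleep(millis)) // hint to the pool that this thread will block
      n
    }

  // Futures created inside the for comprehension: one after the other.
  def sequential(): Future[Int] =
    for {
      i <- slow(5)
      j <- slow(11)
    } yield i + j

  // Futures created up front: both are already running when combined.
  def concurrent(): Future[Int] = {
    val iFuture = slow(5)
    val jFuture = slow(11)
    for {
      i <- iFuture
      j <- jFuture
    } yield i + j
  }

  // Returns the result together with the elapsed wall-clock milliseconds.
  // Blocking with Await is for measurement only.
  def timed(f: => Future[Int]): (Int, Long) = {
    val start  = System.nanoTime()
    val result = Await.result(f, 5.seconds)
    (result, (System.nanoTime() - start) / 1000000L)
  }
}
```

Calling Timing.timed(Timing.sequential()) should take roughly twice as long as Timing.timed(Timing.concurrent()) on a multi-core machine, although the exact figures will depend on the thread pool and the hardware.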

"Only men's minds could have unmapped into abstraction..."

In Scala, we can get our dirty little hands on the contents of a Future by mapping like:

    import scala.concurrent.ExecutionContext

    implicit val xc = ExecutionContext.global // needed for the map below

    // jFuture is the Future[Int] created in the previous snippet
    jFuture map { value =>
      println("j = " + value)
    }

But there are other ways, too. Because Futures are monads, we can also flatMap them, so if we slightly modify the code in [1]:

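  import java.util.concurrent.TimeUnit
  import scala.concurrent.{Await, ExecutionContext, Future}
  import scala.concurrent.duration.Duration

  implicit val xc = ExecutionContext.global

  val timeToWait = Duration(5, TimeUnit.SECONDS)

  // pause() is assumed to be a small helper that sleeps briefly; its definition is not shown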

  val future1 = Future[Int] {
    (1 to 3).foldLeft(0) { (a, i) =>
      log("future1 "  + i)
      pause()
      a + i
    }
  }

  val future2 = Future[String] {
    ('A' to 'C').foldLeft("") { (a, c) =>
      log("future2 " + c)
      pause()
      a + c
    }
  }

  def spliceFutures(): Unit = {
    val awaitable = future1 flatMap { numsum: Int =>
      log("in future1.flatMap") // ForkJoinPool-1-worker-1: in future1.flatMap

      future2 map { str: String =>
        log("in future2.map") // ForkJoinPool-1-worker-5: in future2.map
        (numsum, str)
      }
    }

    val awaited = Await.result(awaitable, timeToWait)
    println(awaited) // (6,ABC)
  }

  def log(msg: String): Unit = {
    println(Thread.currentThread().getName() + ": " + msg)
  }

"The most powerful way to use these combinators is in combining the results from futures that are already executing. This means that we can still have a lot of independent futures running, and have another piece of code combine the eventual results from those futures into another "final" future that operates on those results."[1]

You'll notice from the log statements that the code in the flatMap and the code in the map ran on different threads (worker-1 and worker-5 in this run).

But wait, there's more. You can add callbacks with onSuccess, onFailure and onComplete, all of which have a return type of Unit (so you know that the functions passed to them must have side-effects), and there is no guarantee that they'll run on the same thread that ran the body of the Future.

  val onCompleteFn: Try[Boolean] => Unit = {
    case Success(x)   => println(" success: " + x)
    case Failure(err) => println(" failure: " + err + ", " + err.getCause)
  }
.
.
    aFuture onComplete onCompleteFn 

In fact, Future.onComplete is the daddy. The other combinator methods (map, flatMap, recover, transform etc) all call it. We'll come back to these other combinators in another post.
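To get a feel for how a combinator can sit on top of onComplete, here is a hand-rolled map. It is a minimal sketch for illustration and not the library's actual source; ViaOnComplete and myMap are hypothetical names:

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}

object ViaOnComplete {
  // Hypothetical re-implementation of map, for illustration only.
  def myMap[A, B](fa: Future[A])(f: A => B)(implicit ec: ExecutionContext): Future[B] = {
    val p = Promise[B]()
    fa.onComplete {
      case Success(a) => p.complete(Try(f(a))) // an exception thrown by f becomes a Failure
      case Failure(e) => p.failure(e)          // pass the original failure through untouched
    }
    p.future
  }
}
```

The Promise is the writable side of the new Future: the callback registered with onComplete fills it in once the original Future has finished.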

Problems in your Future

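If an Exception is thrown in your Future, then it will be captured in the failure case in onCompleteFn above. But if an Error is thrown, you'll get a "Boxed Error": the Failure holds a java.util.concurrent.ExecutionException whose cause is the original Error (which is why onCompleteFn also prints err.getCause).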
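A small sketch of the difference, using a plain java.lang.Error (JVM-fatal errors such as OutOfMemoryError may behave differently and are not covered here). Boxed and failureOf are names I've made up, and the sketch deliberately checks only the wrapper type and its cause because the wrapper's message text may differ between Scala versions:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Boxed {
  // Runs a Future whose body throws `t` and returns the Throwable
  // the Future actually failed with. Blocking is for demonstration only.
  def failureOf(t: Throwable): Throwable =
    Await.result(Future[Int] { throw t }.failed, 5.seconds)
}
```

Boxed.failureOf(new IllegalStateException("boom")) hands back the very same exception, whereas Boxed.failureOf(new Error("boom")) hands back an ExecutionException whose getCause is the Error.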

Another surprise comes from filter. For Lists, Trys and Options, things are quite straightforward:

    val myList = List(1,2,3)
    val filteredList = myList.filter(_ > 10)
    println(filteredList)   // List()

    val myOption = Option(1)
    val filteredOption = myOption.filter(_ > 10)
    println(filteredOption) // None

    val myTry = Try(1)
    val filteredTry = myTry.filter(_ > 10)
    println(filteredTry)    // Failure(java.util.NoSuchElementException: Predicate does not hold for 1)

That is, when a filter's predicate matches nothing, the result is the "negative" subclass of the monad (Nil, None or Failure). However, there is no such class for an "empty" Future.

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits._
    import scala.concurrent.duration._
    import scala.util.{Failure, Success}

    val myFuture       = Future { 1 }
    val filteredFuture = myFuture filter {_ > 10}
    filteredFuture onComplete {
      case Success(x) => println(s"success: $x")
      case Failure(x) => println(s"failure: $x")
    } // failure: java.util.NoSuchElementException: Future.filter predicate is not satisfied

    Await.result(filteredFuture, 1.second) // Exception in thread "main" java.util.NoSuchElementException: Future.filter predicate is not satisfied

That is, the Failure case is the path taken by onComplete. If we were so foolish as to block using Await, then the Exception is thrown in the calling thread.
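If a failed Future is not what you want when the predicate doesn't hold, one option is to turn that particular failure back into a value with recover. This is just a sketch; FilterRecover, filteredOrDefault and the idea of a default value are my own assumptions:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object FilterRecover {
  // Keeps n if it is greater than 10; otherwise falls back to `default`.
  def filteredOrDefault(n: Int, default: Int): Future[Int] =
    Future(n)
      .filter(_ > 10)
      // filter signals "no match" with a NoSuchElementException,
      // so that is the only failure we translate here
      .recover { case _: NoSuchElementException => default }
}
```

Any other exception still comes out as a Failure, since the partial function passed to recover only matches NoSuchElementException.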

[1] Akka Concurrency, Derek Wyatt.
