Ryan Weald
January 16, 2014

Monoids, Store, and Dependency Injection - Abstractions for Spark Streaming Jobs

Talk I gave at a Spark Meetup on 01/16/2014

Abstract:
One of the most difficult aspects of deploying Spark Streaming as part of your technology stack is maintaining all the code associated with stream processing jobs. In this talk I will discuss the tools and techniques that Sharethrough has found most useful for maintaining a large number of Spark Streaming jobs. We will look in detail at the way Monoids and Twitter's Algebird library can be used to create generic aggregations, as well as the way we can create generic interfaces for writing the results of streaming jobs to multiple data stores. Finally, we will look at the way dependency injection can be used to tie all the pieces together, enabling rapid development of new streaming jobs.

Transcript

  1. @rweald What We’re Going to Cover

    • What we do and why we chose Spark
    • Common patterns in Spark Streaming jobs
    • Monoids as an abstraction for aggregation
    • Abstraction for saving the results of jobs
    • Using dependency injection for improved testability and developer happiness

    Friday, January 17, 14
  2. Why Spark Streaming

    • Liked theoretical foundation of mini-batch
    • Scala codebase + functional API
    • Young project with opportunities to contribute
    • Batch model for iterative ML algorithms
  3. Real World Example

    Which publisher pages has an ad unit appeared on?
  4. Mapping Data

    inputData.map { rawRequest =>
      val params = QueryParams.parse(rawRequest)
      val pubPage = params.getOrElse("pub_page_location", "http://example.com")
      val creative = params.getOrElse("creative_key", "unknown")
      val uri = new java.net.URI(pubPage)
      // getPath already begins with "/", so no extra separator is needed
      val cleanPubPage = uri.getHost + uri.getPath
      (creative, cleanPubPage)
    }
  5. Basic Aggregation

    val sum: (Set[String], Set[String]) => Set[String] = _ ++ _

    creativePubPages.map {
      case (ckey, pubPage) => (ckey, Set(pubPage))
    }.reduceByKey(sum)
  6. WTF is a Monoid?

    trait Monoid[T] {
      def zero: T
      def plus(l: T, r: T): T
    }

    * Just need to make sure plus is associative:
      (1 + 5) + 2 == 1 + (5 + 2)
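The associativity requirement on `plus` can be spot-checked on sample values. The following is a minimal sketch that assumes the `Monoid` trait from the slide; `IntAddition`, `IntSubtraction`, `isAssociative`, and `hasIdentity` are hypothetical names introduced here for illustration and are not part of Algebird.

```scala
object MonoidLaws {
  // Same shape as the trait on the slide.
  trait Monoid[T] {
    def zero: T
    def plus(l: T, r: T): T
  }

  // Hypothetical instance: integers under addition.
  object IntAddition extends Monoid[Int] {
    def zero: Int = 0
    def plus(l: Int, r: Int): Int = l + r
  }

  // Counter-example: subtraction has the same signature but is not associative.
  object IntSubtraction extends Monoid[Int] {
    def zero: Int = 0
    def plus(l: Int, r: Int): Int = l - r
  }

  // Associativity: regrouping the operands must not change the result.
  def isAssociative[T](m: Monoid[T], a: T, b: T, c: T): Boolean =
    m.plus(m.plus(a, b), c) == m.plus(a, m.plus(b, c))

  // Identity: combining with zero on either side leaves the value unchanged.
  def hasIdentity[T](m: Monoid[T], a: T): Boolean =
    m.plus(m.zero, a) == a && m.plus(a, m.zero) == a

  def main(args: Array[String]): Unit = {
    println(isAssociative(IntAddition, 1, 5, 2))    // true: (1 + 5) + 2 == 1 + (5 + 2)
    println(isAssociative(IntSubtraction, 1, 5, 2)) // false: (1 - 5) - 2 != 1 - (5 - 2)
  }
}
```

Associativity is what lets a distributed reduce combine partial results in any grouping and still arrive at the same answer.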
  7. Monoid Example

    object SetMonoid extends Monoid[Set[String]] {
      def zero = Set.empty[String]
      def plus(l: Set[String], r: Set[String]) = l ++ r
    }

    SetMonoid.plus(Set("a"), Set("b")) // returns Set("a", "b")
    SetMonoid.plus(Set("a"), Set("a")) // returns Set("a")
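To show where `zero` earns its keep, here is a small sketch that folds a whole batch of values with a set monoid. It assumes the `Monoid` trait from the earlier slide; the generic `SetMonoid[A]` class and the `sumAll` helper are hypothetical names for this example only.

```scala
object SetMonoidDemo {
  trait Monoid[T] {
    def zero: T
    def plus(l: T, r: T): T
  }

  // Generic variant of the slide's SetMonoid; works for any element type.
  class SetMonoid[A] extends Monoid[Set[A]] {
    def zero: Set[A] = Set.empty[A]
    def plus(l: Set[A], r: Set[A]): Set[A] = l ++ r
  }

  // Hypothetical helper: combine a batch of values with a monoid.
  // Starting from `zero` means an empty batch still has a well-defined result.
  def sumAll[T](values: Seq[T])(m: Monoid[T]): T =
    values.foldLeft(m.zero)(m.plus)
}
```

Because the fold starts from `zero`, a mini-batch with no events produces the empty set rather than an error.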
  8. Algebird Based Aggregation

    import com.twitter.algebird._

    val bfMonoid = BloomFilter(500000, 0.01)

    creativePubPages.map {
      case (ckey, pubPage) => (ckey, bfMonoid.create(pubPage))
    }.reduceByKey(bfMonoid.plus(_, _))
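Why can a Bloom filter stand in for the exact set here? A toy version makes the monoid structure visible: the filter is a bit set, and `plus` is a bitwise union. This sketch is not Algebird's implementation; `ToyBloomFilter`, `BFMonoid`, `mightContain`, and the sizing parameters are assumptions made for illustration.

```scala
import scala.collection.immutable.BitSet
import scala.util.hashing.MurmurHash3

object ToyBloomFilter {
  // A toy Bloom filter: just the set of bit positions that are switched on.
  final case class BF(bits: BitSet)

  class BFMonoid(width: Int, numHashes: Int) {
    // The empty filter is the identity element.
    val zero: BF = BF(BitSet.empty)

    // One bit position per seeded hash, each in the range [0, width).
    private def positions(item: String): Seq[Int] =
      (0 until numHashes).map { seed =>
        Math.floorMod(MurmurHash3.stringHash(item, seed), width)
      }

    def create(item: String): BF = BF(BitSet(positions(item): _*))

    // Bitwise union: associative, with `zero` as its identity.
    def plus(l: BF, r: BF): BF = BF(l.bits | r.bits)

    // May report false positives, never false negatives.
    def mightContain(bf: BF, item: String): Boolean =
      positions(item).forall(bf.bits.contains)
  }
}
```

The trade-off is bounded memory per key in exchange for approximate membership answers, which is what makes the aggregation practical for large sets of pages.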
  9. Add set of users who have seen creative to same job
  10. Algebird Based Aggregation

    val aggregator = new Monoid[(BF, BF)] {
      def zero = (bfMonoid.zero, bfMonoid.zero)
      def plus(l: (BF, BF), r: (BF, BF)) = {
        (bfMonoid.plus(l._1, r._1), bfMonoid.plus(l._2, r._2))
      }
    }

    creativePubPages.map {
      case (ckey, pubPage, userId) =>
        // emit a key/value pair so reduceByKey can combine the (BF, BF) values
        (ckey, (bfMonoid.create(pubPage), bfMonoid.create(userId)))
    }.reduceByKey(aggregator.plus(_, _))
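The hand-written `(BF, BF)` aggregator follows a general pattern: two monoids combine component-wise into a monoid over pairs. A minimal sketch of that pattern follows, using exact sets as a stand-in for Bloom filters; `pairMonoid` and `stringSets` are hypothetical names, and nothing here is Algebird API.

```scala
object PairMonoidDemo {
  trait Monoid[T] { def zero: T; def plus(l: T, r: T): T }

  // Combine two monoids component-wise into a monoid over pairs.
  def pairMonoid[A, B](ma: Monoid[A], mb: Monoid[B]): Monoid[(A, B)] =
    new Monoid[(A, B)] {
      def zero: (A, B) = (ma.zero, mb.zero)
      def plus(l: (A, B), r: (A, B)): (A, B) =
        (ma.plus(l._1, r._1), mb.plus(l._2, r._2))
    }

  // Stand-in for the Bloom filter monoid: exact sets of strings.
  val stringSets: Monoid[Set[String]] = new Monoid[Set[String]] {
    def zero: Set[String] = Set.empty
    def plus(l: Set[String], r: Set[String]): Set[String] = l ++ r
  }

  // First component: publisher pages. Second component: user ids.
  val aggregator: Monoid[(Set[String], Set[String])] =
    pairMonoid(stringSets, stringSets)
}
```

Adding a third metric to the same job then means composing one more monoid rather than writing a new reduce function.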
  11. Storage API Requirements

    • Incremental updates (preferably associative)
    • Pluggable to support “big data” stores
    • Allow for testing jobs
  12. Storage API

    trait MergeableStore[K, V] {
      def get(key: K): V
      def put(kv: (K, V)): V
      /*
       * Should follow same associative property
       * as our Monoid from earlier
       */
      def merge(kv: (K, V)): V
    }
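One way to satisfy the `merge` contract is to back the store with a monoid. The sketch below is an in-memory implementation suitable for tests. It makes two assumptions the slide does not state: a missing key reads as the monoid's `zero`, and `put` and `merge` return the value stored after the call. `InMemoryStore` and `intAddition` are hypothetical names.

```scala
import scala.collection.mutable

object InMemoryStoreDemo {
  trait Monoid[T] { def zero: T; def plus(l: T, r: T): T }

  // Same shape as the trait on the slide.
  trait MergeableStore[K, V] {
    def get(key: K): V
    def put(kv: (K, V)): V
    def merge(kv: (K, V)): V
  }

  // Assumption: missing keys read as zero; put/merge return the stored value.
  class InMemoryStore[K, V](monoid: Monoid[V]) extends MergeableStore[K, V] {
    private val data = mutable.Map.empty[K, V]

    def get(key: K): V = data.getOrElse(key, monoid.zero)

    def put(kv: (K, V)): V = {
      data(kv._1) = kv._2
      kv._2
    }

    // Read the current value, combine it with the new one, write it back.
    def merge(kv: (K, V)): V =
      put((kv._1, monoid.plus(get(kv._1), kv._2)))
  }

  val intAddition: Monoid[Int] = new Monoid[Int] {
    def zero: Int = 0
    def plus(l: Int, r: Int): Int = l + r
  }
}
```

This version is not thread-safe and is meant only to illustrate the incremental-update requirement; a production store would need its own concurrency story.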
  13. Storing Spark Results

    def saveResults(result: DStream[(String, BF)],
                    store: HBaseStore[String, BF]) = {
      result.foreach { rdd =>
        rdd.foreach { element =>
          val (key, value) = element
          store.merge((key, value))
        }
      }
    }
  14. Generic Storage Method

    def saveResults(result: DStream[(String, BF)],
                    storageFactory: StorageFactory) = {
      val store = storageFactory.create
      result.foreach { rdd =>
        rdd.foreach { element =>
          val (key, value) = element
          store.merge((key, value))
        }
      }
    }
  15. DI the Store You Need!

    trait StorageFactory {
      def create: Store[String, BF]
    }

    class DevModule extends ScalaModule {
      def configure() {
        bind[StorageFactory].to[InMemoryStorageFactory]
      }
    }

    class ProdModule extends ScalaModule {
      def configure() {
        bind[StorageFactory].to[HBaseStorageFactory]
      }
    }
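The testability benefit does not depend on a particular DI framework. The sketch below wires the factory in by plain constructor injection instead of Guice modules, and uses nested `Seq`s as a stand-in for a DStream of RDDs so it runs without Spark. All names besides `StorageFactory` and `InMemoryStorageFactory` (`WiringDemo`, `SaveJob`, `InMemoryStore`, and the `get` method returning `Option`) are assumptions for illustration.

```scala
import scala.collection.mutable

object WiringDemo {
  trait Store[K, V] {
    def merge(kv: (K, V)): V
    def get(key: K): Option[V]
  }

  trait StorageFactory { def create: Store[String, Set[String]] }

  // In-memory store for tests and development; merges by set union.
  class InMemoryStore extends Store[String, Set[String]] {
    private val data = mutable.Map.empty[String, Set[String]]

    def get(key: String): Option[Set[String]] = data.get(key)

    def merge(kv: (String, Set[String])): Set[String] = {
      val merged = data.getOrElse(kv._1, Set.empty[String]) ++ kv._2
      data(kv._1) = merged
      merged
    }
  }

  // Hands out one shared store so a test can inspect what the job wrote.
  class InMemoryStorageFactory extends StorageFactory {
    val store = new InMemoryStore
    def create: Store[String, Set[String]] = store
  }

  // The job depends only on the factory trait; each inner Seq stands in for an RDD.
  class SaveJob(factory: StorageFactory) {
    def saveResults(batches: Seq[Seq[(String, Set[String])]]): Unit = {
      val store = factory.create
      batches.foreach(batch => batch.foreach(kv => store.merge(kv)))
    }
  }
}
```

A test constructs `SaveJob` with the in-memory factory and asserts on the store afterwards; production wiring would pass a factory backed by the real data store.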
  16. Potential API additions?

    class PairDStreamFunctions[K, V] {
      def aggregateByKey(aggregator: Monoid[V])
      def store(store: MergeableStore[K, V])
    }
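To get a feel for how the proposed methods might read at a call site, here is a sketch that adds them to a plain `Seq` of pairs through an implicit class. It is a local stand-in, not Spark's `PairDStreamFunctions`; the return types, `PairSeqFunctions`, and `setMonoid` are assumptions made for this example.

```scala
object PairSeqDemo {
  trait Monoid[T] { def zero: T; def plus(l: T, r: T): T }
  trait MergeableStore[K, V] { def merge(kv: (K, V)): V }

  // Enrichment over a plain Seq of pairs, mirroring the proposed methods.
  implicit class PairSeqFunctions[K, V](pairs: Seq[(K, V)]) {
    // Combine all values that share a key using the supplied monoid.
    def aggregateByKey(aggregator: Monoid[V]): Map[K, V] =
      pairs.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
        acc.updated(k, aggregator.plus(acc.getOrElse(k, aggregator.zero), v))
      }

    // Merge every pair into the target store.
    def store(target: MergeableStore[K, V]): Unit =
      pairs.foreach(kv => target.merge(kv))
  }

  // Set union as a monoid, for any element type.
  def setMonoid[A]: Monoid[Set[A]] = new Monoid[Set[A]] {
    def zero: Set[A] = Set.empty[A]
    def plus(l: Set[A], r: Set[A]): Set[A] = l ++ r
  }
}
```

With `import PairSeqDemo._` in scope, a call such as `pairs.aggregateByKey(setMonoid[String])` replaces the separate map and reduce steps from the earlier slides.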