
Kontextfrei: A new approach to testable Spark applications

Scalar 2017, 08.04.2017

Apache Spark has become the de facto standard for writing big data processing pipelines. While the business logic of Spark applications is often at least as complex as what we dealt with in the pre-big-data world, enabling developers to write comprehensive, fast unit test suites has not been a priority in the design of Spark. The main problem is that you cannot test your code without at least running a local SparkContext. Such tests are not really unit tests, and they are too slow for a test-driven development approach. In this talk, I will introduce the kontextfrei library, which aims to liberate you from the chains of the SparkContext. I will show how it helps restore the fast feedback loop we take for granted. In addition, I will explain how kontextfrei is implemented, discuss some of the design decisions, and look at alternative approaches and current limitations.

Daniel Westheide

April 08, 2017

Transcript

  1. Recap: Spark in a nutshell > framework for distributed processing of big and fast data > core abstraction: RDD[A]
  2. IO actions
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    val sparkContext = new SparkContext("local[1]", "test-app")
    val rawRepoStarredEvts = sparkContext.textFile("test-data/repo_starred.csv")
  3. Testing Spark apps > Does it succeed in a realistic environment? > Does it succeed with real-world data sets? > Is it fast enough with real-world data sets? > Is the business logic correct?
  4. Property-based testing > good fit for testing pure functions > specify properties that must hold true > tested with randomly generated input data
  5. Testing business logic > fast feedback loop > write test ~> fail ~> implement ~> succeed > sbt ~testQuick
  6. Testing RDDs > always requires a SparkContext > property-based testing: functions are tested with lots of different inputs
  7. Coping strategies > global SparkContext for all tests > unit testing only the functions you pass to RDD operators > being lazy about testing
  8. > kontextfrei – adjective | kon·text–frei: of, relating to, or being a grammar or language based on rules that describe a change in a string without reference to elements not in the string; also: being such a rule [1] > other meanings: the state of being liberated from the chains of the SparkContext
    [1] see https://www.merriam-webster.com/dictionary/context-free
  9. kontextfrei
    resolvers += "dwestheide" at "https://dl.bintray.com/dwestheide/maven",
    libraryDependencies ++= Seq(
      "com.danielwestheide" %% "kontextfrei-core-spark-2.1.0" % "0.5.0",
      "com.danielwestheide" %% "kontextfrei-scalatest-spark-2.1.0" % "0.5.0"
    )
    https://github.com/dwestheide/kontextfrei
    https://dwestheide.github.io/kontextfrei/
  10. kontextfrei > abstracts over RDD > business logic and test properties without RDD dependency > execute on RDD or local Scala collections
  11. Design goal > as close to Spark as possible > execution model > user API > extensible
  12. Business logic
    import com.danielwestheide.kontextfrei.DCollectionOps
    class JobLogic[DCollection[_]: DCollectionOps] {
      import com.danielwestheide.kontextfrei.syntax.Imports._
      def usersByPopularity(repoStarredEvts: DCollection[RepoStarred])
        : DCollection[(String, Long)] = {
        repoStarredEvts
          .map(evt => evt.owner -> 1L)
          .reduceByKey(_ + _)
          .sortBy(_._2, ascending = false)
      }
    }
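  13. class RDDOps(sparkContext: SparkContext) extends DCollectionOps[RDD] {
      // constructor parameter added so that `new RDDOps(sparkContext)` below type-checks;
      // only `map` is shown, the remaining operators are omitted
      override final def map[A: ClassTag, B: ClassTag](as: RDD[A])(
          f: A => B): RDD[B] = as.map(f)
    }
    trait RDDOpsSupport {
      implicit def rddCollectionOps(
          implicit sparkContext: SparkContext): DCollectionOps[RDD] =
        new RDDOps(sparkContext)
    }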
  14. Glue code
    import com.danielwestheide.kontextfrei.rdd.RDDOpsSupport
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    object Main extends App with RDDOpsSupport {
      implicit val sparkContext = new SparkContext("local[1]", "test-app")
      try {
        val logic = new JobLogic[RDD]
        val repoStarredEvts = logic
          .parseRepoStarredEvents(
            sparkContext.textFile("test-data/repo_starred.csv"))
        val usersByPopularity = logic.usersByPopularity(repoStarredEvts)
        logic
          .toCsv(usersByPopularity)
          .saveAsTextFile("target/users_by_popularity.csv")
      } finally {
        sparkContext.stop()
      }
    }
  15. Test support > base trait for tests (highly optional) > generic Gen[DCollection[A]] and Arbitrary[DCollection[A]] instances > generic Collecting instance
  16. App-specific base spec
    trait BaseSpec[DColl[_]]
        extends KontextfreiSpec[DColl]
        with DCollectionGen
        with CollectingInstances
        with Generators
        with PropSpecLike
        with GeneratorDrivenPropertyChecks
        with MustMatchers
  17. Test code
    import com.danielwestheide.kontextfrei.syntax.Imports._
    trait UsersByPopularityProperties[DColl[_]] extends BaseSpec[DColl] {
      def logic: JobLogic[DColl]
      property("Total counts correspond to number of events") {
        forAll { starredEvents: DColl[RepoStarred] =>
          val result = logic.usersByPopularity(starredEvents).collect().toList
          result.map(_._2).sum mustEqual starredEvents.count()
        }
      }
    }
  18. Verify that it works ;)
    import org.apache.spark.rdd.RDD
    class UsersByPopularityIntegrationSpec
        extends IntegrationSpec
        with UsersByPopularityProperties[RDD] {
      override val logic = new JobLogic[RDD]
    }
  19. Workflow > local development: > sbt ~testQuick > very fast feedback loop > CI server: > sbt test it:test > slower, catching more potential runtime errors
  20. Alternative design > Interpreter pattern > Describe computation, run it with a Spark or local executor > implemented by Apache Crunch: Hadoop pipeline, Spark, and in-memory pipelines
  21. Shortcomings > only supports RDD > sometimes in Spark, business logic cannot be cleanly isolated > not all RDD operators implemented (yet) > no support for broadcast variables or accumulators > API exposes advanced Scala features
  22. Summary > Spark doesn’t really support unit testing > kontextfrei restores the fast feedback loop > early stage, but successfully used in production > looking for contributors
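
The idea from slides 10 to 13 (business logic written against a type class instead of RDD, then executed on local Scala collections) can be sketched without Spark or kontextfrei on the classpath. The following is a minimal illustration only, not the library's actual API: `CollOps`, `JobLogicSketch`, and the `repo` field of `RepoStarred` are hypothetical names made up for this example, and only the operators needed by `usersByPopularity` are modelled.

```scala
import scala.language.higherKinds

// Hypothetical event type; the slides only show that RepoStarred has an `owner`.
final case class RepoStarred(owner: String, repo: String)

// Hypothetical, heavily reduced stand-in for a type class like DCollectionOps.
trait CollOps[F[_]] {
  def map[A, B](as: F[A])(f: A => B): F[B]
  def reduceByKey[K, V](as: F[(K, V)])(f: (V, V) => V): F[(K, V)]
  def sortBy[A, B: Ordering](as: F[A])(f: A => B, ascending: Boolean): F[A]
}

object CollOps {
  // Instance backed by plain Lists: no SparkContext is needed to run the logic.
  implicit val listOps: CollOps[List] = new CollOps[List] {
    def map[A, B](as: List[A])(f: A => B): List[B] = as.map(f)

    def reduceByKey[K, V](as: List[(K, V)])(f: (V, V) => V): List[(K, V)] =
      as.groupBy(_._1).toList.map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }

    def sortBy[A, B: Ordering](as: List[A])(f: A => B, ascending: Boolean): List[A] = {
      val ord = implicitly[Ordering[B]]
      as.sortBy(f)(if (ascending) ord else ord.reverse)
    }
  }
}

// Business logic that never mentions RDD or List, only the abstract F[_].
class JobLogicSketch[F[_]](implicit ops: CollOps[F]) {
  def usersByPopularity(evts: F[RepoStarred]): F[(String, Long)] = {
    val ones   = ops.map(evts)(evt => evt.owner -> 1L)
    val counts = ops.reduceByKey(ones)(_ + _)
    ops.sortBy(counts)(_._2, ascending = false)
  }
}
```

With the `List` instance in implicit scope, `new JobLogicSketch[List]` can be exercised in an ordinary unit test; a second instance wrapping `RDD` would, under the same assumptions, let the identical logic run on Spark, which is the split between fast local tests and slower integration tests described on slide 19.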