Spark Streaming Snippets



AGAWA Koji

March 14, 2016

Transcript

1. Spark Streaming apps built so far
• rtg -- real-time retargeting data generation
• pixelwriter -- writing mark data for Dynamic Creative
• feedsync -- product feed synchronization
• segment:elastic -- real-time segmentation
• (logblend -- joining heterogeneous event logs)
2. SparkBoot
https://gist.github.com/atty303/c83f3c8cb8a930951be0
• Implements main for Spark apps
• Provides the SparkContext / StreamingContext
• Configuration management
• Customize by shipping application.conf with spark-submit --files (see the sketch below)
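
A launch might then look roughly like this (a sketch only: master, deploy mode, main class, and jar name are placeholders; the --files application.conf convention is from the slide, and SparkBoot is assumed to pick up the shipped file as its configuration source):

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class TrainingBatchApp \
  --files application.conf \
  training-batch-app.jar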
3. Batch case

object TrainingBatchApp extends SparkBatchBoot {
  val appName = "TrainingBatchApp"

  override def mkApp(sc: SparkContext, args: Array[String]): SparkApp =
    new TrainingBatchApp(sc, appConfig)
}

class TrainingBatchApp(
    sc: SparkContext,
    appConfig: Config) extends SparkApp {

  def run(): Try[Int] = Try {
    0
  }
}
4. Streaming case

object PredictStreamingApp extends SparkStreamingBoot {
  val appName = "PredictStreamingApp"

  override val checkpointPath: String = "app.training.streaming.checkpoint-path"
  override val batchDurationPath: String = "app.training.streaming.batch-duration"

  override def mkApp(sc: SparkContext, ssc: StreamingContext, args: Array[String]): SparkApp =
    new PredictStreamingApp(ssc, batchDuration, appConfig)
}

class PredictStreamingApp(
    ssc: StreamingContext,
    batchDuration: Duration,
    appConfig: Config) extends SparkApp {

  val sparkContext = ssc.sparkContext

  def run(): Try[Int] = Try {
    0
  }
}
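
The two config paths overridden above suggest an application.conf shaped along these lines (a sketch; the values and the duration format are placeholders, and how SparkBoot resolves them is assumed):

app {
  training {
    streaming {
      checkpoint-path = "hdfs:///checkpoints/predict"   # placeholder
      batch-duration  = 10s                             # placeholder
    }
  }
}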
5.
def repeatedly(streamingContext: StreamingContext, interval: Duration)
              (f: (SparkContext, Time) => Unit): Unit = {
  // DStream that produces a trigger on every batch
  val s = streamingContext.queueStream(
      mutable.Queue.empty[RDD[Unit]],
      oneAtATime = true,
      defaultRDD = streamingContext.sparkContext.makeRDD(Seq(())))
    .repartition(1)

  s.window(s.slideDuration, interval)
    .foreachRDD { (rdd, time) =>
      f(rdd.context, time)
      rdd.foreach(_ => ())
    }
}
6.
/**
 * A Broadcast whose value can be updated (re-broadcast).
 *
 * https://gist.github.com/Reinvigorate/040a362ca8100347e1a6
 * @author Reinvigorate
 */
case class UpdatableBroadcast[T: ClassTag](
    @transient private val ssc: StreamingContext,
    @transient private val _v: T) {

  @transient private var v = ssc.sparkContext.broadcast(_v)

  def update(newValue: T, blocking: Boolean = false): Unit = {
    v.unpersist(blocking)
    v = ssc.sparkContext.broadcast(newValue)
  }

  def value: T = v.value

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(v)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    v = in.readObject().asInstanceOf[Broadcast[T]]
  }
}
7. Usage

def loadModel(): Model = ???

val ub: UpdatableBroadcast[Model] =
  UpdatableBroadcast(streamingContext, loadModel())

StreamingUtil.repeatedly(streamingContext, refreshInterval) { (_, _) =>
  ub.update(loadModel())
}

dstream.foreachRDD { rdd =>
  val model = ub.value
  // use model
}
8. Abstracting external connections
• When a Spark app accesses an external resource, each Executor has to maintain its own connection
• The Driver cannot ship the "connection itself" to the Executors
• So the "way to connect" is abstracted as a Connector trait
9.
trait Connector[A] extends java.io.Closeable with Serializable {
  def get: A
  def close(): Unit
  def using[B](f: A => B): B = f(get)
}
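
The intended usage pattern looks roughly like this (a sketch: connector, the Client type, and client.write are hypothetical; only the Connector API above comes from the slides). The serializable connector, not the live connection, is captured by the closure, and each executor materializes its own client through get/using:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // `connector: Connector[Client]` travels to the executor in serialized form;
    // the actual client is created (or reused) there when `get` is called
    connector.using { client =>
      records.foreach(r => client.write(r))   // hypothetical client call
    }
  }
}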
10.
case class PoolAerospikeConnector(name: Symbol, config: AerospikeConfig)
  extends Connector[AerospikeClient] {

  def get: AerospikeClient =
    PoolAerospikeConnector.defaultHolder.getOrCreate(name, mkClient)

  def close(): Unit =
    PoolAerospikeConnector.defaultHolder.remove(name)(
      AerospikeConnector.aerospikeClientClosable)

  private val mkClient: () => AerospikeClient = () =>
    new AerospikeClient(config.clientPolicy.underlying, config.asHosts: _*)
}

object PoolAerospikeConnector {
  private val defaultHolder = new DefaultResourceHolder[AerospikeClient]
}
11.
case class ScalikeJdbcConnector(name: Symbol, config: Config) extends Connector[Unit] {

  def get: Unit = {
    if (!ConnectionPool.isInitialized(name)) {
      // Load MySQL JDBC driver class
      Class.forName("com.mysql.jdbc.Driver")
      ConnectionPool.add(name,
        config.getString("url"),
        config.getString("user"),
        config.getString("password"))
    }
  }

  def close(): Unit = ConnectionPool.close(name)
}
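
A usage sketch (the pool name 'db, the Config values, the table, and the row fields are hypothetical; NamedDB and the sql interpolator are the regular scalikejdbc API): get only initializes the named pool, after which queries go through scalikejdbc as usual.

import scalikejdbc._

rdd.foreachPartition { rows =>
  val jdbc = ScalikeJdbcConnector('db, jdbcConfig)   // jdbcConfig: Config with url / user / password
  jdbc.using { _ =>                                  // ensures the pool named 'db is initialized
    NamedDB('db) localTx { implicit session =>
      rows.foreach { r =>
        sql"insert into events (id, name) values (${r.id}, ${r.name})".update.apply()
      }
    }
  }
}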
12.
case class KafkaProducerConnector[K: ClassTag, V: ClassTag](
    name: Symbol,
    config: java.util.Map[String, AnyRef])
  extends Connector[ScalaKafkaProducer[K, V]] {

  def get: ScalaKafkaProducer[K, V] =
    KafkaProducerConnector.defaultHolder.getOrCreate(name, mkResource)
      .asInstanceOf[ScalaKafkaProducer[K, V]]

  def close(): Unit =
    KafkaProducerConnector.defaultHolder.remove(name)(
      KafkaProducerConnector.kafkaProducerClosable)

  private val mkResource = () => {
    val keySer = mkDefaultSerializer[K](ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
    val valueSer = mkDefaultSerializer[V](ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
    new ScalaKafkaProducer[K, V](
      new KafkaProducer[K, V](config, keySer.orNull, valueSer.orNull))
  }

  private def mkDefaultSerializer[A: ClassTag](configKey: String): Option[Serializer[A]] = {
    if (!config.containsKey(configKey)) {
      implicitly[ClassTag[A]].runtimeClass match {
        case c if c == classOf[Array[Byte]] =>
          Some(new ByteArraySerializer().asInstanceOf[Serializer[A]])
        case c if c == classOf[String] =>
          Some(new StringSerializer().asInstanceOf[Serializer[A]])
        case _ => None
      }
    } else None
  }
}
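
A hedged usage sketch (the broker address, the topic, and the assumption that ScalaKafkaProducer exposes a send(topic, key, value) method are not on the slides; only the connector construction follows the class above):

import scala.collection.JavaConverters._
import org.apache.kafka.clients.producer.ProducerConfig

val producerConn = KafkaProducerConnector[String, String](
  'events,
  Map[String, AnyRef](ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "broker:9092").asJava)

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    producerConn.using { producer =>
      // assumed wrapper method; the real ScalaKafkaProducer API is not shown
      records.foreach { case (k, v) => producer.send("events", k, v) }
    }
  }
}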
13.
class CountProductSpec extends SpecWithJUnit with SparkStreamingSpec {

  val batchDuration: Duration = Duration(1000)

  "Count" >> {
    val (sourceQueue, resultQueue) = startQueueStream[Product, (Product, Long)] { inStream =>
      // the streaming logic under test
      CountProduct(inStream).run(sc)
    }

    // feed test data into the input queue
    sourceQueue += sc.parallelize(Seq(
      Product(1, "id"), Product(1, "id"), Product(2, "id")))

    // advance the (virtual) clock
    advance()

    // verify the emitted data
    resultQueue.dequeue must eventually(contain(exactly(
      Product(1, "id") -> 2L,
      Product(2, "id") -> 1L
    )))
  }
}