Spark Streaming Snippets
AGAWA Koji
March 14, 2016
Transcript
Spark Streaming Snippets @atty303
Spark Streaming apps built so far
• rtg -- generates data for real-time retargeting
• pixelwriter -- writes mark data for Dynamic Creative
• feedsync -- feed synchronization
• segment:elastic -- real-time segmentation
• (logblend -- joins different event logs)
Parts pulled out of these apps that seemed worth sharing
• SparkBoot
• Cron
• UpdatableBroadcast
• Connector
• SparkStreamingSpec
SparkBoot
https://gist.github.com/atty303/c83f3c8cb8a930951be0
• The main implementation for Spark apps
• Provides the SparkContext / StreamingContext
• Configuration management
• Customize by shipping application.conf with spark-submit's --files option
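For batch
import scala.util.Try
import org.apache.spark.SparkContext
// Config is assumed to be Typesafe Config, since the settings live in application.conf
import com.typesafe.config.Config

// SparkBatchBoot and SparkApp come from the SparkBoot gist
object TrainingBatchApp extends SparkBatchBoot {
  val appName = "TrainingBatchApp"

  override def mkApp(sc: SparkContext, args: Array[String]): SparkApp =
    new TrainingBatchApp(sc, appConfig)
}

class TrainingBatchApp(
    sc: SparkContext,
    appConfig: Config) extends SparkApp {

  // The exit code is returned inside a Try
  def run(): Try[Int] = Try {
    0
  }
}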
For streaming
object PredictStreamingApp extends SparkStreamingBoot {
  val appName = "PredictStreamingApp"

  // Config paths from which the checkpoint directory and batch duration are read
  override val checkpointPath: String = "app.training.streaming.checkpoint-path"
  override val batchDurationPath: String = "app.training.streaming.batch-duration"

  override def mkApp(sc: SparkContext, ssc: StreamingContext, args: Array[String]): SparkApp =
    new PredictStreamingApp(ssc, batchDuration, appConfig)
}

class PredictStreamingApp(
    ssc: StreamingContext,
    batchDuration: Duration,
    appConfig: Config) extends SparkApp {
  val sparkContext = ssc.sparkContext

  def run(): Try[Int] = Try {
    0
  }
}
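Doing cron-like things
• Some processing needs to run periodically at an interval longer than batch-duration
• For example, refreshing master data that is read from an external data store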
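import scala.collection.mutable
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, StreamingContext, Time}

def repeatedly(streamingContext: StreamingContext, interval: Duration)
              (f: (SparkContext, Time) => Unit): Unit = {
  // DStream that generates the trigger: the queue stays empty,
  // so every batch yields the single-element defaultRDD
  val s = streamingContext.queueStream(
      mutable.Queue.empty[RDD[Unit]],
      oneAtATime = true,
      defaultRDD = streamingContext.sparkContext.makeRDD(Seq(())))
    .repartition(1)

  // Window length = one batch, slide = interval, so f is called once per interval
  s.window(s.slideDuration, interval)
    .foreachRDD { (rdd, time) =>
      f(rdd.context, time)
      rdd.foreach(_ => ())
    }
}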
Usage
repeatedly(streamingContext, Durations.seconds(300)) { (sc, time) =>
  // @driver: processing to run every 5 minutes
}
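Because repeatedly passes interval to DStream.window as the slide duration, Spark requires it to be a multiple of the batch duration and throws when the windowed stream is created otherwise. A minimal sketch of that arithmetic, assuming durations in milliseconds; TickInterval and batchesPerTick are hypothetical names, not part of the slides or of Spark:

```scala
// Hypothetical helper, not from the slides: mirrors the constraint that
// DStream.window places on the interval handed to repeatedly.
object TickInterval {
  // Right(n): f is called once every n batches.
  // Left(msg): the interval cannot be used as a slide duration.
  def batchesPerTick(batchMillis: Long, intervalMillis: Long): Either[String, Long] =
    if (batchMillis <= 0 || intervalMillis <= 0)
      Left("durations must be positive")
    else if (intervalMillis % batchMillis != 0)
      Left(s"interval $intervalMillis ms is not a multiple of batch duration $batchMillis ms")
    else
      Right(intervalMillis / batchMillis)
}
```

With a 10-second batch and the 300-second interval from the usage example, f would be called once every 30 batches.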
Updatable Broadcast
• We want to update a Broadcast after streaming has started running
• It is just a wrapper that holds a Broadcast
import java.io.{ObjectInputStream, ObjectOutputStream}
import scala.reflect.ClassTag
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext

/**
 * A Broadcast whose value can be updated (re-broadcast).
 *
 * https://gist.github.com/Reinvigorate/040a362ca8100347e1a6
 * @author Reinvigorate
 */
case class UpdatableBroadcast[T: ClassTag](
    @transient private val ssc: StreamingContext,
    @transient private val _v: T) {

  @transient private var v = ssc.sparkContext.broadcast(_v)

  // Drop the old broadcast and publish the new value in its place
  def update(newValue: T, blocking: Boolean = false): Unit = {
    v.unpersist(blocking)
    v = ssc.sparkContext.broadcast(newValue)
  }

  def value: T = v.value

  // Serialize only the current Broadcast handle
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(v)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    v = in.readObject().asInstanceOf[Broadcast[T]]
  }
}
Usage
def loadModel(): Model = ???

val ub: UpdatableBroadcast[Model] =
  UpdatableBroadcast(streamingContext, loadModel())

StreamingUtil.repeatedly(streamingContext, refreshInterval) { (_, _) =>
  ub.update(loadModel())
}

dstream.foreachRDD { rdd =>
  val model = ub.value
  // use model
}
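The writeObject/readObject pair in UpdatableBroadcast uses the standard Java serialization hooks, so only the current Broadcast handle travels with the wrapper and the StreamingContext stays behind. A minimal sketch of that mechanism without Spark; Handle, Wrapper, and roundTrip are hypothetical stand-ins (Handle for Broadcast[T], the Thread argument for the non-serializable StreamingContext):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object SerializationSketch {
  // Hypothetical stand-in for Broadcast[T]: a small serializable handle.
  final case class Handle(id: Long)

  // Hypothetical stand-in for UpdatableBroadcast. `ctx` plays the role of
  // StreamingContext: it is not serializable and must never be shipped.
  class Wrapper(@transient private val ctx: Thread, id: Long) extends Serializable {
    @transient private var handle: Handle = Handle(id)

    def value: Handle = handle
    def ctxIsPresent: Boolean = ctx != null

    // Called by Java serialization instead of the default field-by-field logic:
    // write only the handle.
    private def writeObject(out: ObjectOutputStream): Unit =
      out.writeObject(handle)

    // Restore the handle on the receiving side; ctx stays null there.
    private def readObject(in: ObjectInputStream): Unit =
      handle = in.readObject().asInstanceOf[Handle]
  }

  // Serialize to bytes and back, as happens when a closure is shipped.
  def roundTrip[A <: AnyRef](a: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(a) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

After a round trip, the copy still exposes the handle but no longer holds the context, which is the state the wrapper would be in on an executor.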
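Abstracting external connections
• When accessing an external resource from Spark, each Executor has to maintain its own connection
• The Driver cannot send "the connection itself" to an Executor
• So "how to connect" is abstracted as the Connector trait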
trait Connector[A] extends java.io.Closeable with Serializable {
  def get: A
  def close(): Unit
  def using[B](f: A => B): B = f(get)
}
case class PoolAerospikeConnector(name: Symbol, config: AerospikeConfig)
  extends Connector[AerospikeClient] {

  def get: AerospikeClient =
    PoolAerospikeConnector.defaultHolder.getOrCreate(name, mkClient)

  def close(): Unit =
    PoolAerospikeConnector.defaultHolder.remove(name)(
      AerospikeConnector.aerospikeClientClosable)

  private val mkClient: () => AerospikeClient = () =>
    new AerospikeClient(config.clientPolicy.underlying, config.asHosts: _*)
}

object PoolAerospikeConnector {
  private val defaultHolder = new DefaultResourceHolder[AerospikeClient]
}
case class ScalikeJdbcConnector(name: Symbol, config: Config) extends Connector[Unit] {
  def get: Unit = {
    if (!ConnectionPool.isInitialized(name)) {
      // Load MySQL JDBC driver class
      Class.forName("com.mysql.jdbc.Driver")
      ConnectionPool.add(name,
        config.getString("url"),
        config.getString("user"),
        config.getString("password"))
    }
  }

  def close(): Unit = ConnectionPool.close(name)
}
case class KafkaProducerConnector[K: ClassTag, V: ClassTag](
    name: Symbol,
    config: java.util.Map[String, AnyRef])
  extends Connector[ScalaKafkaProducer[K, V]] {

  def get: ScalaKafkaProducer[K, V] =
    KafkaProducerConnector.defaultHolder.getOrCreate(name, mkResource)
      .asInstanceOf[ScalaKafkaProducer[K, V]]

  def close(): Unit =
    KafkaProducerConnector.defaultHolder.remove(name)(
      KafkaProducerConnector.kafkaProducerClosable)

  private val mkResource = () => {
    val keySer = mkDefaultSerializer[K](ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
    val valueSer = mkDefaultSerializer[V](ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
    new ScalaKafkaProducer[K, V](
      new KafkaProducer[K, V](config, keySer.orNull, valueSer.orNull))
  }

  // When the config does not name a serializer, pick one from the type parameter
  private def mkDefaultSerializer[A: ClassTag](configKey: String): Option[Serializer[A]] = {
    if (!config.containsKey(configKey)) {
      implicitly[ClassTag[A]].runtimeClass match {
        case c if c == classOf[Array[Byte]] =>
          Some(new ByteArraySerializer().asInstanceOf[Serializer[A]])
        case c if c == classOf[String] =>
          Some(new StringSerializer().asInstanceOf[Serializer[A]])
        case _ => None
      }
    } else None
  }
}
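The pooled connectors above call getOrCreate and remove on a DefaultResourceHolder that the slides do not show. A minimal sketch of what such a holder could look like, assuming one shared instance per JVM (that is, per Executor) keyed by name; SimpleResourceHolder is a hypothetical stand-in, and passing a plain close function to remove is an assumption about the second argument list:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the DefaultResourceHolder used above;
// the real class is not shown in the slides.
final class SimpleResourceHolder[A] {
  private val resources = new ConcurrentHashMap[Symbol, A]()

  // Create the resource on first use in this JVM, then keep returning the same one.
  def getOrCreate(name: Symbol, mk: () => A): A =
    resources.computeIfAbsent(name, new java.util.function.Function[Symbol, A] {
      override def apply(key: Symbol): A = mk()
    })

  // Forget the resource and close it, if it had been created.
  def remove(name: Symbol)(close: A => Unit): Unit =
    Option(resources.remove(name)).foreach(close)
}
```

Only the connector (a name plus configuration) is serialized to the Executors; the resource itself is created there on the first call to get and reused by later tasks in the same JVM.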
Testing Spark Streaming
https://gist.github.com/atty303/18e64e718f0cf3261c0e
class CountProductSpec extends SpecWithJUnit with SparkStreamingSpec {
  val batchDuration: Duration = Duration(1000)

  "Count" >> {
    val (sourceQueue, resultQueue) =
      startQueueStream[Product, (Product, Long)] { inStream =>
        // The streaming logic under test
        CountProduct(inStream).run(sc)
      }

    // Put test data into the input queue
    sourceQueue += sc.parallelize(Seq(
      Product(1, "id"), Product(1, "id"), Product(2, "id")))

    // Advance time
    advance()

    // Test the data that is emitted
    resultQueue.dequeue must eventually(contain(exactly(
      Product(1, "id") -> 2L,
      Product(2, "id") -> 1L
    )))
  }
}