
Quick, Functional ETL with ZIO

wakye5815
September 06, 2024

Transcript

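  2. Expressing the workflow with ZIO Streams

    The workflow definition uses three abstractions:

    - ZStream[Env, Err, Out]: serves as the data source of the stream workflow.
    - ZPipeline[Env, Err, In, Out]: in substance, a function ZStream[Env, Err, In] => ZStream[Env, Err, Out].
    - ZSink[Env, Err, In, Left, Consumed]: consumes the stream. It is passed to ZStream#run, which has the shape ZSink[Env, Err, In, Left, Consumed] => ZIO[Env, Err, Consumed]. Left is the type of leftover input the sink does not consume, and Consumed is the sink's result.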
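  3. Putting them together

        import zio.*
        import zio.stream.*
        import java.io.IOException
        import java.net.URL

        def extractionStream(apiEndpoint: URL): ZStream[Any, Throwable, ApiResponse]
        def transformationPipeline: ZPipeline[Any, Throwable, ApiResponse, (String, Array[Byte])]
        def loadingSink: ZSink[Any, IOException, (String, Array[Byte]), Nothing, Unit]

        object Main extends ZIOAppDefault:
          override def run =
            // One Extract >>> Transform >>> Load effect per endpoint:
            // stream >>> pipeline yields a stream, and stream >>> sink yields a ZIO
            val effects = apiEndpoints.map { apiEndpoint =>
              extractionStream(apiEndpoint) >>> transformationPipeline >>> loadingSink
            }
            // Run all endpoints in parallel and discard their Unit results
            ZIO.collectAllPar(effects).unit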
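  4. Implementing Extract

        import zio.*
        import zio.stream.*
        import java.net.URL

        def extractionStream(apiEndpoint: URL): ZStream[Any, Throwable, ApiResponse] =
          // Fetch one page per step and follow nextEndpoint until it is null
          ZStream.paginateZIO(apiEndpoint) { endpoint =>
            // Blocking clients run on ZIO's blocking pool; java.util.concurrent.Future results are adapted with fromFutureJava
            val call: Task[ApiResponse] =
              if isBlocking(endpoint) then ZIO.attemptBlocking(callBlockingApi(endpoint))
              else ZIO.fromFutureJava(callApi(endpoint))
            // Emit the response and pass the next endpoint (if any) to the next step
            call.map(response => (response, Option(response.nextEndpoint)))
          }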
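  5. Implementing Transform

        import zio.*
        import zio.stream.*
        import java.io.ByteArrayOutputStream
        import java.nio.charset.StandardCharsets
        import java.util.zip.GZIPOutputStream

        def transformationPipeline: ZPipeline[Any, Throwable, ApiResponse, (String, Array[Byte])] =
          // gzip each response; the scoped streams are closed in reverse order when the scope ends
          val gzipPipeline = ZPipeline.mapZIO { (response: ApiResponse) =>
            ZIO.scoped {
              for
                byteArrayOs <- ZIO.fromAutoCloseable(ZIO.attempt(new ByteArrayOutputStream()))
                gzipOs      <- ZIO.fromAutoCloseable(ZIO.attempt(new GZIPOutputStream(byteArrayOs)))
                _           <- ZIO.attemptBlockingIO(gzipOs.write(response.getRawResponse.getBytes(StandardCharsets.UTF_8)))
              yield byteArrayOs
            }.map(_.toByteArray) // read after the scope has closed gzipOs, so the gzip trailer is already written
          }
          // Name the files data_1.json.gz, data_2.json.gz, ... in stream order
          // (numbering restarts for every stream this pipeline is attached to)
          val fileNamePipeline = ZPipeline.mapAccum[Array[Byte], Int, (String, Array[Byte])](1)(
            (index, data) => (index + 1, (s"data_$index.json.gz", data))
          )
          gzipPipeline >>> fileNamePipeline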
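  6. Implementing Load

        import zio.*
        import zio.stream.*
        import com.google.cloud.storage.{BlobId, BlobInfo}
        import java.io.IOException

        def loadingSink: ZSink[Any, IOException, (String, Array[Byte]), Nothing, Unit] =
          // Upload each (path, bytes) pair as one object in the bucket
          ZSink.foreach { (pathWithData: (String, Array[Byte])) =>
            val (path, data) = pathWithData
            val blobId = BlobId.of(BUCKET_NAME, path)
            val blobInfo = BlobInfo.newBuilder(blobId).build()
            // attemptBlockingIO keeps only IOException in the error channel; other exceptions become defects
            ZIO.attemptBlockingIO(storage.create(blobInfo, data))
          }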
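
The three building blocks described above (ZStream, ZPipeline, ZSink) can be tried in isolation with a toy program. This is a minimal sketch that assumes ZIO 2.x and Scala 3, like the deck. The object name `BuildingBlocks` and the numbers are made up for illustration. It shows that a `ZPipeline` can be applied to a stream like a function. It also shows that `stream.run(sink)` and the `>>>` operator form used in the deck describe the same program.

```scala
import zio.*
import zio.stream.*

object BuildingBlocks:
  // Source: a finite stream of 1, 2, 3, 4, 5
  val source: ZStream[Any, Nothing, Int] = ZStream.fromIterable(1 to 5)

  // Transformation: doubles each element
  val doubling: ZPipeline[Any, Nothing, Int, Int] = ZPipeline.map((n: Int) => n * 2)

  // Consumer: folds all elements into their sum
  val summing: ZSink[Any, Nothing, Int, Nothing, Int] = ZSink.sum[Int]

  // A pipeline can be applied directly to a stream, i.e. used as ZStream => ZStream
  val doubled: ZStream[Any, Nothing, Int] = doubling(source)

  // run hands the stream to the sink and returns a ZIO carrying the sink's result
  val viaRun: UIO[Int] = doubled.run(summing)

  // The same program written with the >>> operators
  val viaOperators: UIO[Int] = source >>> doubling >>> summing
```

When run (for example from a `ZIOAppDefault`), both effects evaluate to 30, the sum of 2, 4, 6, 8 and 10.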
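
The wiring in `Main` above can be exercised end to end once the three stages exist. The sketch below is a self-contained version that assumes ZIO 2.x and Scala 3. It replaces the real stages with hypothetical in-memory stubs:

- `ApiResponse` is a plain case class.
- The endpoints are strings rather than URLs.
- The sink records file names in a `Ref` instead of uploading anything.

Only the composition mirrors the deck: `>>>` from stream through pipeline to sink, then `ZIO.collectAllPar`.

```scala
import zio.*
import zio.stream.*
import java.nio.charset.StandardCharsets

object WiringSketch:
  // Hypothetical stand-in for the deck's ApiResponse
  final case class ApiResponse(rawResponse: String)

  // Stub Extract: two fake responses per endpoint
  def extractionStream(endpoint: String): ZStream[Any, Throwable, ApiResponse] =
    ZStream(ApiResponse(s"$endpoint-1"), ApiResponse(s"$endpoint-2"))

  // Stub Transform: pair each response with a file name and its bytes
  def transformationPipeline: ZPipeline[Any, Throwable, ApiResponse, (String, Array[Byte])] =
    ZPipeline.map((r: ApiResponse) => (s"${r.rawResponse}.json", r.rawResponse.getBytes(StandardCharsets.UTF_8)))

  // Stub Load: remember the file name instead of uploading the bytes
  def loadingSink(written: Ref[List[String]]): ZSink[Any, Throwable, (String, Array[Byte]), Any, Unit] =
    ZSink.foreach((pathWithData: (String, Array[Byte])) => written.update(pathWithData._1 :: _))

  val apiEndpoints: List[String] = List("a", "b", "c")

  // Same shape as Main.run: one stream >>> pipeline >>> sink effect per endpoint, run in parallel
  val program: Task[List[String]] =
    for
      written <- Ref.make(List.empty[String])
      effects = apiEndpoints.map { apiEndpoint =>
                  extractionStream(apiEndpoint) >>> transformationPipeline >>> loadingSink(written)
                }
      _     <- ZIO.collectAllPar(effects).unit
      names <- written.get
    yield names.sorted // parallel completion order is not deterministic, so sort the names
```

As a design note, `collectAllPar` runs all the effects concurrently by default. When there are many endpoints, `ZIO.foreachPar(apiEndpoints)(...)` followed by `.withParallelism(n)` is one way to cap how many pipelines run at once.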
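
`ZStream.paginateZIO`, used in the Extract step above, works as a loop:

1. It calls the effect with the current state.
2. It emits the element the effect returns.
3. It continues with the returned next state, stopping once that state is `None`. The element returned alongside `None` is still emitted.

A minimal sketch, assuming ZIO 2.x and Scala 3. A hypothetical in-memory `pages` map stands in for the HTTP API, and string keys stand in for `URL`s:

```scala
import zio.*
import zio.stream.*

object PaginationSketch:
  // Hypothetical in-memory "API": each page holds items and an optional next page key
  final case class Page(items: List[String], next: Option[String])

  val pages: Map[String, Page] = Map(
    "/page1" -> Page(List("a", "b"), Some("/page2")),
    "/page2" -> Page(List("c"), Some("/page3")),
    "/page3" -> Page(List("d", "e"), None)
  )

  // Simulated remote call; fails for unknown endpoints
  def fetch(endpoint: String): Task[Page] =
    ZIO.fromOption(pages.get(endpoint)).orElseFail(new NoSuchElementException(s"no page at $endpoint"))

  // One fetch per step; the stream ends after the page whose next is None
  val pageStream: ZStream[Any, Throwable, Page] =
    ZStream.paginateZIO("/page1") { endpoint =>
      fetch(endpoint).map(page => (page, page.next))
    }

  // Flatten the pages into their items
  val allItems: Task[List[String]] =
    pageStream.mapConcat(_.items).runCollect.map(_.toList)
```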
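
The scoped gzip step in the Transform slide relies on an ordering detail:

- `toByteArray` is called only after `ZIO.scoped` has released its resources.
- Resources are released in reverse order of acquisition, so `GZIPOutputStream` is closed first, which writes the gzip trailer.
- Only then are the bytes taken from the `ByteArrayOutputStream`.

The sketch below checks this by decompressing the result with `GZIPInputStream`. It assumes ZIO 2.x, Scala 3 and JDK 9+ (for `readAllBytes`), and reuses the same pattern on plain JSON strings instead of `ApiResponse`. The names `gzip` and `gunzip` are hypothetical helpers.

```scala
import zio.*
import zio.stream.*
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object GzipSketch:
  // Same scoped pattern as the slide, applied to a String
  def gzip(text: String): Task[Array[Byte]] =
    ZIO.scoped {
      for
        bytesOut <- ZIO.fromAutoCloseable(ZIO.attempt(new ByteArrayOutputStream()))
        gzipOut  <- ZIO.fromAutoCloseable(ZIO.attempt(new GZIPOutputStream(bytesOut)))
        _        <- ZIO.attemptBlockingIO(gzipOut.write(text.getBytes(StandardCharsets.UTF_8)))
      yield bytesOut
    }.map(_.toByteArray) // runs after gzipOut was closed, so the output is a complete gzip member

  // Inverse operation, used only to verify the round trip
  def gunzip(bytes: Array[Byte]): String =
    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
    try new String(in.readAllBytes(), StandardCharsets.UTF_8) finally in.close()

  // gzip each element, then number the file names in stream order as in the slide
  val pipeline: ZPipeline[Any, Throwable, String, (String, Array[Byte])] =
    ZPipeline.mapZIO((text: String) => gzip(text)) >>>
      ZPipeline.mapAccum[Array[Byte], Int, (String, Array[Byte])](1)(
        (index, data) => (index + 1, (s"data_$index.json.gz", data))
      )

  val program: Task[List[(String, String)]] =
    ZStream("""{"id":1}""", """{"id":2}""")
      .via(pipeline)
      .mapZIO { case (name, data) => ZIO.attempt((name, gunzip(data))) }
      .runCollect
      .map(_.toList)
```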
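
The shape of the Load step (`ZSink.foreach` around `ZIO.attemptBlockingIO`) can also be tried without a Cloud Storage bucket. This sketch assumes ZIO 2.x and Scala 3. The hypothetical `localSink` writes each `(path, bytes)` pair to a temporary directory with `java.nio.file.Files.write`, in place of `storage.create`. It is a swapped-in target for illustration, not the deck's GCS code, and the temporary directory is not cleaned up.

```scala
import zio.*
import zio.stream.*
import java.io.IOException
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters.*

object LocalSinkSketch:
  // Hypothetical local stand-in for the Cloud Storage sink: one file per element
  def localSink(dir: Path): ZSink[Any, IOException, (String, Array[Byte]), Any, Unit] =
    ZSink.foreach { (pathWithData: (String, Array[Byte])) =>
      val (name, data) = pathWithData
      // Blocking file I/O runs on ZIO's blocking pool; IOExceptions stay in the error channel
      ZIO.attemptBlockingIO(Files.write(dir.resolve(name), data))
    }

  // Lists the file names in a directory, closing the underlying java.util.stream.Stream
  def listNames(dir: Path): IO[IOException, List[String]] =
    ZIO.attemptBlockingIO {
      val entries = Files.list(dir)
      try entries.iterator().asScala.map(_.getFileName.toString).toList.sorted
      finally entries.close()
    }

  val program: IO[IOException, List[String]] =
    for
      dir   <- ZIO.attemptBlockingIO(Files.createTempDirectory("etl-sketch"))
      input  = ZStream(
                 ("data_1.json.gz", "first".getBytes(StandardCharsets.UTF_8)),
                 ("data_2.json.gz", "second".getBytes(StandardCharsets.UTF_8))
               )
      _     <- input.run(localSink(dir))
      names <- listNames(dir)
    yield names
```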