Upgrade to Pro — share decks privately, control downloads, hide ads and more …

複雑なStep FunctionsをAWS CDKでコード管理するときに気にしたところ

複雑なStep FunctionsをAWS CDKでコード管理するときに気にしたところ

クラスメソッドのCDK事情大公開スペシャル#2で『複雑なStep FunctionsをAWS CDKでコード管理するときに気にしたところ』というタイトルで登壇しました #cm_cdk_special
https://dev.classmethod.jp/articles/aws-cdk-tips-managing-complex-step-functions/

Yoshiyuki Nakano

January 17, 2025
Tweet

More Decks by Yoshiyuki Nakano

Other Decks in Technology

Transcript

  1. 自己紹介 部署 クラスメソッド株式会社 産業支援グループ リテールアプリ共創部 職務 ソフトウェアエンジニア(LINE ミニアプリの開発、保守/ 運用) 名前

    中野ヨシユキ (@engin_yo) 拠点 福岡オフィス 趣味 ランニング(福岡オフィスの有志でマラソン大会参加予定) 自作キーボード
  2. 1. 背景 大量のデータを処理する際に、処理時間を短縮するために並列処理の実装が必要があった 具体的には以下のような要件イメージ 100 個程度の CSV ファイルを生成し、それぞれのファイルに対して共通した処理を行う 処理結果を別の CSV

    ファイルとして保存する ステート間でデータを受け渡す際に、データサイズが 256KB を超える可能性があるため、S3 に保 存したファイルを経由してデータを受け渡す 一部の並列処理が失敗しても、全体の処理を継続して最終的な結果を判定する 4
  3. アーキテクチャイメージ Step Functions 並列処理 CSV ファイル生成 処理対象の配列を渡す 処理対象: $.items 処理対象:

    $.items 処理対象: $.items 処理対象: $.items 処理済CSV を保存 処理済CSV を保存 処理済CSV を保存 処理済CSV を保存 並列処理完了 エラーなし エラーあり S3 バケット 開始 データ生成Lambda Distributed Map 処理Lambda 1 処理Lambda 2 処理Lambda ... 処理Lambda 100 エラー確認 成功 失敗 5
  4. アーキテクチャ選定理由 『StepFunctions vs ECS Fargate 』で比較検討したが、中期的にも保守のしやすさからStep Functions を 選定 エラーハンドリングの保守がしやすい、理解しやすい

    AWS CDK とStep Functions のコード管理の相性がよく、JSON やYAML での保守の必要がない 現時点で、自チームのコンテナワークロードの採用が少ない 仮に追加改修やメンテナンスが発生した場合も、比較的に対応工数が少なく、Step Functions 自体 のインフラ側のメンテナンスを意識しすぎずにすむ 6
  5. 原因1️⃣: Step Functions の制約 1. 実行履歴 1 つのステートマシンで25,000 イベントまで上限 チャンクで同一処理したいような用途で、Map

    ステートを使った並列ループ処理すると上限に簡 単に超えてしまう 2. 処理能力 従来のMap State は40 並列までしか対応していない 3. データサイズ ペイロード256KB 制限 状態間のデータ受け渡しの制約があるため、チャンク処理で実装する場合にステート間の値の取 り回しで上限を超えてしまう これらの制約が大規模処理実装の障壁になった 8
  6. 3-1. 解決策: Distributed Map による大規模並列処理の運用 AWS CDK でも対応しているDistributed Map を利用

    1. 実行履歴の対応 実行履歴は各並列処理ごとのワークフローで25,000 件の上限になるため上限の懸念はクリア 2. 処理能力の対応 最大40 並列 -> 10,000 並列まで実行可能 3. 大容量データ処理 S3 などのリソースと統合できるため、ステート間の値の受け渡しでS3 内のファイルを利用できる 11
  7. 基本構造 1 const stateMachine = new sfn.StateMachine( 2 this, 3

    "ParallelProcessingStateMachine", 4 { 5 definition: generateDataTask.next(processMap).next(checkError), 6 logs: { 7 destination: logGroup, 8 level: sfn.LogLevel.ALL, 9 includeExecutionData: true, 10 }, 11 tracingEnabled: true, 12 timeout: cdk.Duration.hours(1), 13 } 14 ); 13
  8. 基本構造 5 definition: generateDataTask.next(processMap).next(checkError), 1 const stateMachine = new sfn.StateMachine(

    2 this, 3 "ParallelProcessingStateMachine", 4 { 6 logs: { 7 destination: logGroup, 8 level: sfn.LogLevel.ALL, 9 includeExecutionData: true, 10 }, 11 tracingEnabled: true, 12 timeout: cdk.Duration.hours(1), 13 } 14 ); 14
  9. ステート定義部分 1 // データ処理タスク 2 const processDataTask = new tasks.LambdaInvoke(this,

    "ProcessData", { 3 lambdaFunction: processDataFunction, 4 outputPath: "$.Payload", 5 }).addCatch(handleMapError, { 6 resultPath: "$.error", 7 }); 8 9 // Distriuted Map の定義 10 const processMap = new sfn.DistributedMap(this, "ProcessMap", { 11 maxConcurrency: 100, 12 itemsPath: "$.items", 13 itemSelector: { 14 "jobId.$": "$.jobId", 15 "index.$": "$$.Map.Item.Value.index", 16 "inputLocation.$": "$$.Map.Item.Value.location", 17 }, 18 resultPath: "$.mapResults", 19 }); 20 21 processMap.itemProcessor(processDataTask); 16
  10. ステート定義部分 21 processMap.itemProcessor(processDataTask); 1 // データ処理タスク 2 const processDataTask =

    new tasks.LambdaInvoke(this, "ProcessData", { 3 lambdaFunction: processDataFunction, 4 outputPath: "$.Payload", 5 }).addCatch(handleMapError, { 6 resultPath: "$.error", 7 }); 8 9 // Distriuted Map の定義 10 const processMap = new sfn.DistributedMap(this, "ProcessMap", { 11 maxConcurrency: 100, 12 itemsPath: "$.items", 13 itemSelector: { 14 "jobId.$": "$.jobId", 15 "index.$": "$$.Map.Item.Value.index", 16 "inputLocation.$": "$$.Map.Item.Value.location", 17 }, 18 resultPath: "$.mapResults", 19 }); 20 17
  11. ステート定義部分 13 itemSelector: { 14 "jobId.$": "$.jobId", 15 "index.$": "$$.Map.Item.Value.index",

    16 "inputLocation.$": "$$.Map.Item.Value.location", 17 }, 1 // データ処理タスク 2 const processDataTask = new tasks.LambdaInvoke(this, "ProcessData", { 3 lambdaFunction: processDataFunction, 4 outputPath: "$.Payload", 5 }).addCatch(handleMapError, { 6 resultPath: "$.error", 7 }); 8 9 // Distriuted Map の定義 10 const processMap = new sfn.DistributedMap(this, "ProcessMap", { 11 maxConcurrency: 100, 12 itemsPath: "$.items", 18 resultPath: "$.mapResults", 19 }); 20 21 processMap.itemProcessor(processDataTask); 18
  12. ペイロードサイズの上限 Step Functions のステート間でデータを受け渡す際、ペイロードサイズの制限(256KB )がある AWS の re:Post 上でも公式見解としてもデータ受け渡しのサイズが 256KB

    を超える場合は S3 にデータ を吐き出して取り回すことを推奨 対応方法 S3 を使用してデータを受け渡し ステート間では、S3 のバケット名とキーのみを受け渡し 処理結果も同様に S3 に保存 19
  13. エラーハンドリング 4 }).addCatch(handleMapError, { 5 resultPath: "$.error", 6 }); 1

    const processDataTask = new tasks.LambdaInvoke(this, "ProcessData", { 2 lambdaFunction: processDataFunction, 3 outputPath: "$.Payload", 7 8 -- 中略 -- 9 10 // 成功・失敗の判定ステート 11 const success = new sfn.Succeed(this, "Success"); 12 const failed = new sfn.Fail(this, "Failed", { 13 error: "MapStateError", 14 cause: "Error in map state execution", 15 }); 16 17 // エラーチェックの条件分岐 18 const checkError = new sfn.Choice(this, "CheckError") 19 .when(sfn.Condition.isPresent("$.error"), failed) 20 .otherwise(success); 21
  14. 並列で実行されるステートの同時実行数 並列で生成さえる同時実行のステートが100 を超えないように指定 Lambda が必要以上に増えすぎて、Lambda から叩かれる API 負荷を事前に見積もって同時実行数を決め ておく必要がある 1

    const processMap = new sfn.DistributedMap(this, "ProcessMap", { 2 maxConcurrency: 100, 3 itemsPath: "$.items", 4 itemSelector: { 5 "jobId.$": "$.jobId", 6 "index.$": "$$.Map.Item.Value.index", 7 "inputLocation.$": "$$.Map.Item.Value.location", 8 }, 9 resultPath: "$.mapResults", 10 }); 22
  15. Before: 複雑な条件分岐を1 つのステートにまとめている 1 const stateMachine = new sfn.StateMachine( 2

    this, 3 "ParallelProcessingStateMachine", 4 { 5 definition: new tasks.LambdaInvoke(this, "GenerateData", { 6 lambdaFunction: generateDataFunction, 7 outputPath: "$.Payload", 8 payload: sfn.TaskInput.fromObject({ 9 jobId: sfn.JsonPath.format( 10 "{}", 11 sfn.JsonPath.stringAt("$$.Execution.StartTime") 12 ), 13 }), 14 }) 15 .next( 16 new sfn.DistributedMap(this, "ProcessMap", { 17 // --- 以下、省略。定義が続く。 。 。 --- 18 } 19 ); 27
  16. After: 複雑な条件分岐を1 つの定義にまとめず切り出す 1 // ステートマシンの定義 2 const stateMachine =

    new sfn.StateMachine( 3 this, 4 "ParallelProcessingStateMachine", 5 { 6 definition: generateDataTask.next(processMap).next(checkError), 7 // -- 省略 -- 8 } 9 ); 28
  17. After: 複雑な条件分岐を1 つの定義にまとめず切り出す 1 const processMap = new sfn.DistributedMap(this, "ProcessMap",

    { 2 maxConcurrency: 100, 3 itemsPath: "$.items", 4 itemSelector: { 5 "jobId.$": "$.jobId", 6 "index.$": "$$.Map.Item.Value.index", 7 "inputLocation.$": "$$.Map.Item.Value.location", 8 }, 9 resultPath: "$.mapResults", 10 }); 11 12 processMap.itemProcessor(processDataTask); 13 14 // 成功・失敗の判定ステート 15 const success = new sfn.Succeed(this, "Success"); 16 const failed = new sfn.Fail(this, "Failed", { 17 error: "MapStateError", 18 cause: "Error in map state execution", 19 }); 20 21 // エラーチェックの条件分岐 22 const checkError = new sfn.Choice(this, "CheckError") 23 .when(sfn.Condition.isPresent("$.error"), failed) 24 .otherwise(success); 29
  18. Before: 分割で実行する処理の継続判定をステートマシン側で実行 SFn 側でループ処理の継続判定をする際にデータ加工をitemSelector やparamters などを活用すると CDK コードがファットになってしまったりデバッグが大変になったり 1 const

    updateChunkStatus = new sfn.Pass(this, `UpdateChunkStatus-${id}`, { 2 parameters: { 3 "offset.$": "States.MathAdd($.offset, $.limit)", 4 "items.$": "$.items", 5 "jobId.$": "$.jobId", 6 "fileName.$": "$.fileName", 7 "limit.$": "$.limit", 8 }, 9 }); 10 11 const processDataInvoke = new tasks.LambdaInvoke( 12 this, 13 `ProcessDataInvoke-${id}`, 14 { 15 lambdaFunction: processDataFunction, 16 outputPath: "$.Payload", 17 retryOnServiceExceptions: true, // サービス例外時のリトライを有効化 18 } 19 ).addCatch(handleMapError, { 20 // -- 省略 -- 21 }); 31
  19. After: 分割で実行する処理の継続判定をステートマシン側でやらない チャンク処理のように分割してLambda で処理を実行する場合に 継続判定もLambda に任せたほうが結果的にCDK コードをシンプルに保ちやすい 1 processMap.itemProcessor( 2

    processDataInvoke.next( 3 new cdk.aws_stepfunctions.Choice(construct, "checkRemainingChunks") 4 .when( 5 cdk.aws_stepfunctions.Condition.booleanEquals("$.hasNext", true), 6 processMap 7 ) 8 .otherwise( 9 new cdk.aws_stepfunctions.Succeed(construct, "AllChunksProcessed") 10 ) 11 ) 12 ); 32
  20. After: 分割で実行する処理の継続判定をステートマシン側でやらない Lambda 関数の処理で、戻り値に後続の処理を実施するか判定するフラグを含めて毎回終了させる 1 export const handler = async

    (event, _context) => { 2 try { 3 const { fileName, lastEvaluatedKey } = event; 4 5 // --- ファイルへの処理内容 -- 6 7 return { 8 lastEvaluatedKey: LastEvaluatedKey, 9 fileName: FileName, 10 // lastEvaluatedKey を使って続きの処理があるかどうかを判定する 11 ...(LastEvaluatedKey != null ? { hasNext: true } : { hasNext: false }), 12 }; 13 } catch (e) { 14 throw e; 15 } 16 }; 33
  21. 6. まとめ 1. Distributed Map の活用 大規模な並列処理(10,000 並列まで)が可能 実行履歴の制限を回避できる S3

    との統合で大容量データ処理が容易に 2. CDK コードの可読性向上のポイント 複雑な条件分岐は小さな単位に分割 ステート定義を変数化して見通しを良く 分割処理の継続判定はLambda に任せてシンプルに 34
  22. 参考資料 大規模な並列ワークロード向けに Step Functions で Map ステートを分散モードで使用する - AWS Step

    Functions AWS CDK API Reference Simplifying developer experience with variables and JSONata in AWS Step Functions | AWS Compute Blog AWS Step Functions のMap ステート内でエラーが起きても全体を停止しない方法 | DevelopersIO [AWS Step Functions]Map ステート内でエラーをキャッチしてステートマシンを失敗させる(AWS CDK) | DevelopersIO 35
  23. 37