Upgrade to Pro — share decks privately, control downloads, hide ads and more …

データ基盤でのコンテナ活用事例 / jawsug-niigata21-data-platfor...

データ基盤でのコンテナ活用事例 / jawsug-niigata21-data-platform-with-container

2025/03/15 (土) JAWS-UG新潟#21 にて事例発表した資料

イベントページ:
https://jawsug-niigata.connpass.com/event/343520/

kasacchiful

March 15, 2025
Tweet

More Decks by kasacchiful

Other Decks in Programming

Transcript

  1. Oracle DBはオンプレミスや他アカウントのRDSにある お客様にてTransit GatewayやDirectConnectが整備済 Transit Gateway経由でアクセス可能 接続先は4箇所 (オンプレミス・RDS含め) Oracle DBのバージョンは全て19c

    ただし、RDS以外は sec_case_sensitive_logon がFalse 「ログイン時のパスワードの大文字・小文字を区別しない」古い設定 「python-oracledb」ライブラリのthinモード(Oracleクライアント不要)では 接続できず、thickモード(Oracleクライアント必要)での接続が必須 Oracle Instant Client導入するにしても容量が大きいため、今回はコンテナ化 (Lambdaコンテナ) で対応する Oracle DBへのクエリはS3に保管しておいて、実行時にダウンロード Oracle DB → S3 7 / 29
  2. Dockerfileを作りましょう。 FROM amazonlinux:2023 ARG BUILD_DIR=/build ARG PYTHON_VERSION=3.12.9 ARG FUNCTION_DIR=/function ##

    Install packages RUN yum update -y && yum install -y tar gzip make gcc openssl-devel bzip2-devel libffi-devel zlib-devel readline-devel xz-devel git libaio zip jq ## Install Oracle Instant Client COPY oracle-instantclient19.20-basic-19.20.0.0.0-1.x86_64.rpm / RUN yum install -y /oracle-instantclient19.20-basic-19.20.0.0.0-1.x86_64.rpm ## Install Python RUN mkdir -p ${BUILD_DIR}/python && cd ${BUILD_DIR} \ && curl https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-${PYTHON_VERSION}.tgz | tar xz \ && cd Python-${PYTHON_VERSION} && ./configure && make && make install && cd - && rm -rf Python-${PYTHON_VERSION} # Copy function code RUN mkdir -p ${FUNCTION_DIR} COPY lambda_function.py ${FUNCTION_DIR} COPY requirements.txt / # Install dependencies RUN python3 -m pip install --target ${FUNCTION_DIR} awslambdaric RUN python3 -m pip install --target ${FUNCTION_DIR} -r /requirements.txt WORKDIR ${FUNCTION_DIR} ENTRYPOINT [ "/usr/local/bin/python3", "-m", "awslambdaric" ] CMD [ "lambda_function.handler" ] Dockerfile 10 / 29
  3. Dockerfileと同じ階層に、 lambda_funciton.py と requirements.txt を配置 import os import boto3 import

    botocore import oracledb import awswrangler as wr import pandas as pd import shutil un = os.environ.get("DB_USERNAME") pw = os.environ.get("DB_PASSWORD") cs = os.environ.get("DB_CONNECTSTRING") query_bucket = os.environ.get("QUERY_S3_BUCKET") query_prefix = os.environ.get("QUERY_S3_PREFIX") target_bucket = os.environ.get("TARGET_BUCKET") target_prefix = os.environ.get("TARGET_PREFIX") ## カーソルから列名を取得する def get_columns(cursor): columns = [] for col in cursor.description: columns.append(col.name) return columns def handler(event, context): queries = event["queries"] s3 = boto3.client("s3") ret = [] ## db connection oracledb.init_oracle_client() with oracledb.connect(user=un, password=pw, dsn=cs) as connection: for query in queries: ## download query file obj_path = os.path.join(query_prefix, query["s3_path"]) tmp_path = os.path.join("/tmp", obj_path) tmp_dir = os.path.dirname(tmp_path) if os.path.exists(tmp_dir): shutil.rmtree(tmp_dir) os.makedirs(tmp_dir) s3.download_file(query_bucket, obj_path, tmp_path) ## load query string with open(tmp_path) as f: sql = f.read() ## db exec query with connection.cursor() as cursor: results = cursor.execute(sql) columns = get_columns(cursor) data = cursor.fetchall() ## upload to s3 df = pd.DataFrame(data, columns=columns, dtype="object") store_path = os.path.join( f"s3://{target_bucket}", target_prefix, f"{query['table']}.csv", ) wr.s3.to_csv( df=df, path=store_path, index=False, ) ret.append({ "sql": sql, "store_path": store_path, }) return ret Lambda関数 11 / 29
  4. あとは、ECRリポジトリにあるコマンドを順番に実行して、ビルド & プッシュ。 ## login aws ecr get-login-password --region ap-northeast-1

    | \ docker login --username AWS --password-stdin \ <AWS ACCOUNT ID>.dkr.ecr.ap-northeast-1.amazonaws.com ## build docker build --platform linux/amd64 -t <Dockerイメージ名>:latest . ## push先に元イメージを参照するtagを作成 docker tag <Dockerイメージ名>:latest \ <AWS ACCOUNT ID>.dkr.ecr.ap-northeast-1.amazonaws.com/<ECRリポジトリ名>:latest ## push docker push <AWS ACCOUNT ID>.dkr.ecr.ap-northeast-1.amazonaws.com/<ECRリポジトリ名>:latest Amazon ECRへのイメージプッシュ 12 / 29
  5. Lambda関数にコンテナイメージを関連付ければOK aws lambda create-function --function-name oracle-container-lambda-function \ --role OracleContainerLambdaFunctionRole \

    --package-type Image \ --code ImageUri="<AWS ACCOUNT ID>.dkr.ecr.ap-northeast-1.amazonaws.com/<ECRリポジトリ名>:latest" Lambda関数定義 13 / 29
  6. 通常のLambda関数の呼び出しを行うだけ ステートマシンからのpayloadも、Lambda関数の event 引数に格納されている Comment: >- ETL1 State Machine StartAt:

    ETL_process States: ETL_process: Type: Task Resource: ${LambdaFunctionARN} ResultPath: "$" End: true Step Functionsステートマシン上でのLambdaコンテナ 15 / 29
  7. Lambdaコンテナではなく、通常のzipアーカイブの関数を用意 今回はSnowflakeのSQL REST APIをコールして、SQLでデータを取り込む ## <省略> def handler(event, context): query

    = event["query"] query_s3_path = query["s3_path"] ## 認証トークン生成 jwt_token, account_id = generate_jwt_token() ## クエリ取得 sql = load_query_statement(query_s3_path) ## Snowflake SQL APIコール用設定 url = "" ## <省略> headers = {} ## <省略> query_params = {} ## <省略> ## Snowflake SQL APIコール response = requests.post(url, headers=headers, json=query_params,) ## <省略> return {} S3 → Snowflakeも似たような感じ 16 / 29
  8. ecs_startup.py という関数を用意 lambda_function.py に渡す event 引数に、ステートマシンから渡されるコマ ンド引数を割り当てる import os import

    sys import json import boto3 import botocore import traceback ## main if __name__ == '__main__': args = sys.argv event = {} queries_jsonstr = args[1] event['queries'] = json.loads(queries_jsonstr) ssm = boto3.client('ssm') ssm_path_prefix = '<SSMパラメータストアのキーのパス>' next_token = '' res = ssm.get_parameters_by_path( Path = f'{ssm_path_prefix}/', Recursive = True, WithDecryption = True, MaxResults = 10, ) params = res['Parameters'] ## Lambdaコンテナで使っていた環境変数を、SSMパラメータストアから割り当て os.environ['DB_USERNAME'] = '' ## <省略> ## call the main function. sfn = boto3.client("stepfunctions") try: from lambda_function import handler result = handler(event, {}) ## 任意のメッセージや結果を返したい場合は、send_task_success()をコール sfn.send_task_success( taskToken=os.environ.get("TASK_TOKEN"), output=json.dumps(result), ) except Exception as e: print(e) traceback.print_exc() ## 任意のエラーメッセージを返したい場合は、send_task_failure()をコール sfn.send_task_failure( taskToken=os.environ.get("TASK_TOKEN"), error='States.TaskFailed', cause=f'${e}', ) raise Exception エントリポイント用の関数を用意 19 / 29
  9. ENTRYPOINTは先ほど作成した ecs_startup.py に変更 awslambdaric は不要 CMDは、ステートマシンから渡す # Copy function code

    RUN mkdir -p ${FUNCTION_DIR} COPY lambda_function.py ${FUNCTION_DIR} COPY requirements.txt / COPY ecs_startup.py ${FUNCTION_DIR} # Install dependencies RUN python3 -m pip install --target ${FUNCTION_DIR} -r /requirements.txt WORKDIR ${FUNCTION_DIR} SHELL ["/bin/bash", "-c"] ENTRYPOINT [ "/usr/local/bin/python3", "./ecs_startup.py"] Dockerfileのエントリポイントを変更 20 / 29
  10. ECSサービスを定義する代わりに、ステートマシン上で定義する ステートマシンでコールされた分だけ、タスクが起動する Comment: >- ETL1 State Machine StartAt: ETL_query_serialize States:

    ETL_query_serialize: Type: Pass Parameters: query_serialize.$: States.JsonToString($.queries) Next: ETL_Command ETL_Command: Type: Pass Parameters: commands.$: States.Array($.query_serialize) Next: ETL_process ETL_process: Type: Task Resource: "arn:aws:states:::ecs:runTask.sync" ResultPath: null Parameters: LaunchType: FARGATE Cluster: ${EcsClusterArn} TaskDefinition: ${EcsTaskDefinitionArn} NetworkConfiguration: AwsvpcConfiguration: Subnets: - ${subnetId1} - ${subnetId2} SecurityGroups: - ${securityGroupId1} AssignPublicIp: DISABLED Overrides: ContainerOverrides: - Name: ${EcsTaskDefinitionContainerName} Command.$: $.commands Environment: - Name: TASK_TOKEN Value.$: $$.Task.Token End: true Step Functionsのステートマシン上でサービス定義 21 / 29
  11. コンテナのCMDは、ContainerOverridesで上書き可能 ステートマシンのpayloadから、必要な項目を配列で渡してあげると、各々の引数 としてエントリポイントに渡せる JSON項目は、 States.JsonToString() で文字列化すれば渡せる ETL_Command: Type: Pass Parameters:

    commands.$: States.Array($.query_serialize) Next: ETL_process Overrides: ContainerOverrides: - Name: ${EcsTaskDefinitionContainerName} Command.$: $.commands payloadは、必要な項目をCommandに渡す 22 / 29
  12. ステートマシンのIAMロールに以下の権限を追加しないと、ステートマシンからの ECSタスク起動時にエラーになるので注意 手動で設定する場合に忘れがち { "Version": "2012-10-17", "Statement": [ { "Effect":

    "Allow", "Action": [ "events:PutTargets", "events:PutRule", "events:DescribeRule" ], "Resource": [ "arn:aws:events:ap-northeast-1:<AWS_ACCOUNT_ID>:rule/StepFunctionsGetEventsForECSTaskRule" ] }, // <省略> ] } ステートマシンの実行権限に注意 24 / 29
  13. AWS SAMのテンプレートで定義して、ビルド & デプロイ sam build sam deploy \ --stack-name

    etl-module-sample \ --s3-bucket deploy-bucket \ --s3-prefix sam \ --parameter-overrides Env=dev \ --capabilities "CAPABILITY_NAMED_IAM" samconfig.toml に必要なデプロイ設定仕込んでおけば、以下のように指定してもOK sam deploy --config-env dev 実際の設定 & デプロイはAWS SAMで実施 25 / 29
  14. Lambda → 軽めのタスクを並列でぶん回したい場合はLambdaが良い 起動は早い タイムアウト・メモリ・ストレージ等の制約は大きい タイムアウト: 最大15分 メモリ: 最大10GB エフェメラルストレージ(

    tmp ): 512MB〜10GB ECS/Fargate → 重めのタスク起動に向いている 起動するまで時間はそれなりにかかる タイムアウト・メモリ・ストレージ等の制約は少ない タイムアウト: 最大14日 メモリ: 最大120GB エフェメラルストレージ: 20GB〜200GB LambdaコンテナとECS/Fargateコンテナの違い 27 / 29