

File processing with Athena, built from Step Functions and Lambda Functions / serverless lightweight etl

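This session uses an event-management service as its example and works through an ETL that combines a CSV file of session information and a CSV file of ticket information into a single file. The goal is for you to learn about AWS Glue and Amazon Athena and to leave with a clear picture of how to build them into a serverless application.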

Approach:
・Learn about AWS Glue and Amazon Athena
・Think through a file-to-file ETL use case
・Create the resources with the AWS CDK
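The ETL described above boils down to an inner join of the two CSV-derived record sets on the email address. The following is a minimal in-memory sketch of that operation, assuming both files are already parsed into objects. The `Session`, `Ticket`, and `Entry` shapes are hypothetical, trimmed-down versions of the Glue tables defined later; in the talk itself the join runs in Athena, not in application code.

```typescript
// Hypothetical, simplified record shapes (the real tables have more columns)
interface Session {
  sessionId: string;
  email: string;
  title: string;
}

interface Ticket {
  ticketId: string;
  email: string;
}

interface Entry {
  sessionId: string;
  ticketId: string;
  email: string;
  title: string;
}

// Inner join on email: index tickets by email, then emit one row per matching pair.
function joinByEmail(sessions: Session[], tickets: Ticket[]): Entry[] {
  const ticketsByEmail = new Map<string, Ticket[]>();
  for (const t of tickets) {
    const list = ticketsByEmail.get(t.email) ?? [];
    list.push(t);
    ticketsByEmail.set(t.email, list);
  }
  const entries: Entry[] = [];
  for (const s of sessions) {
    // Sessions without a matching ticket are dropped, as in an SQL INNER JOIN
    for (const t of ticketsByEmail.get(s.email) ?? []) {
      entries.push({ sessionId: s.sessionId, ticketId: t.ticketId, email: s.email, title: s.title });
    }
  }
  return entries;
}

const entries = joinByEmail(
  [
    { sessionId: 's1', email: 'a@example.com', title: 'Serverless x Files' },
    { sessionId: 's2', email: 'b@example.com', title: 'No ticket yet' },
  ],
  [{ ticketId: 't1', email: 'a@example.com' }],
);
console.log(entries); // one entry: s1 joined with t1
```

Indexing one side in a `Map` keeps the join linear in the number of rows, which is the same idea a hash join in a query engine uses.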


Yusuke Wada

July 29, 2020


Transcript

  1. AWS Glue

    Its building blocks are the data catalog, serverless ETL, and orchestration.
    Component: resource type (from CloudFormation)
    ・Data catalog: AWS::Glue::Table
    ・Serverless ETL: AWS::Glue::Job
    ・Orchestration: AWS::Glue::Workflow
    Reference: Populating the Data Catalog using AWS CloudFormation templates - https://docs.aws.amazon.com/ja_jp/glue/latest/dg/populate-with-cloudformation-templates.html
  3. AWS::Glue::Table

    ・Data: the actual objects (in S3, etc.)
    ・Metadata: the schema definition and similar information
    Source: 20190806 AWS Black Belt Online Seminar AWS Glue https://www.slideshare.net/AmazonWebServicesJapan/20190806-aws-black-belt-online-seminar-aws-glue?ref=https://aws.amazon.com/jp/blogs/news/webinar-bb-aws-glue-2019/
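
    To make the split between data and metadata concrete: a Glue table is essentially a schema plus an S3 location, which is roughly what an Athena `CREATE EXTERNAL TABLE` statement declares. The sketch below renders such metadata as DDL. `TableMeta` and `toDdl` are hypothetical helpers, the names in the usage are placeholders, and `org.openx.data.jsonserde.JsonSerDe` is assumed here as one common JSON SerDe for Athena.

    ```typescript
    // Hypothetical model of the metadata a Glue table holds; the data itself stays in S3.
    interface Column {
      name: string;
      type: string;
    }

    interface TableMeta {
      database: string;
      table: string;
      columns: Column[];
      partitionKeys: Column[];
      location: string; // e.g. s3://bucket/prefix/
    }

    // Render the metadata as Athena (Hive-style) DDL for a JSON Lines table.
    function toDdl(meta: TableMeta): string {
      const cols = (cs: Column[]) => cs.map((c) => `\`${c.name}\` ${c.type}`).join(', ');
      const lines = [
        `CREATE EXTERNAL TABLE \`${meta.database}\`.\`${meta.table}\` (${cols(meta.columns)})`,
      ];
      if (meta.partitionKeys.length > 0) {
        lines.push(`PARTITIONED BY (${cols(meta.partitionKeys)})`);
      }
      lines.push(`ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'`);
      lines.push(`LOCATION '${meta.location}'`);
      return lines.join('\n');
    }

    const ddl = toDdl({
      database: 'example_db',
      table: 'ticket',
      columns: [
        { name: 'email', type: 'string' },
        { name: 'ticketId', type: 'string' },
      ],
      partitionKeys: [{ name: 'eventId', type: 'string' }],
      location: 's3://example-bucket/ticket/',
    });
    console.log(ddl);
    ```

    Nothing in the DDL touches the data: dropping such a table removes only the metadata, and the objects under `LOCATION` remain.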
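  4. AWS::Glue::Job: the script type is either Python Shell or Spark

    | | AWS::Glue::Job (Python Shell) | AWS::Glue::Job (Spark) |
    |---|---|---|
    | Memory | … GB or … GB | … GB |
    | Run time limit | Default: … hours | Default: … hours |
    | Concurrent runs | … per account | … per account |
    | Billing | Pay-per-use based on capacity and run time, rounded up to at least … minute(s) | Pay-per-use based on capacity and run time, rounded up to at least … minute(s) |
    | Startup time | Tens of seconds to a few minutes | Tens of seconds to a few minutes |
    | Language | Python | Python or Scala |

    Resource names are taken from CloudFormation. Source: 「AWS Glueを使った Serverless ETL の実装パターン」 - Speaker Deck https://speakerdeck.com/seiichi1101/aws-gluewoshi-tuta-serverless-etl-falseshi-zhuang-patan-ac5553a5-f8e9-45fb-b093-4bcf0b113a48?slide=41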
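  5. CSV export of participant tickets

    ・Form-response CSV with the columns タイムスタンプ (timestamp), お名前 (name), メールアドレス (email address), 所属組織 (organization), and 参加日 (attendance day). The sample rows are participants from クラスメソッド株式会社, with timestamps in GMT+9; the form prompt reads 参加日をお選びください。 ("Please select the day(s) you will attend.")
    ・Ticket CSV with the columns チケットID (ticket ID), ユーザーID (user ID), and one column per event day (…日目, "day …")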
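
    A converter Lambda Function has to turn CSV rows like these into records keyed by Glue column names. A minimal sketch, assuming a hypothetical header mapping (お名前 to `name`, メールアドレス to `email`, 所属組織 to `organizationName`, named after columns that appear in the session table defined later) and simple CSV with no quoted commas or embedded newlines; real input warrants a proper CSV parser.

    ```typescript
    // Hypothetical mapping from the CSV's Japanese headers to Glue column names
    const HEADER_MAP = new Map<string, string>([
      ['お名前', 'name'],
      ['メールアドレス', 'email'],
      ['所属組織', 'organizationName'],
    ]);

    // Naive CSV parsing: splits on commas, so quoted fields containing commas are NOT supported.
    function parseCsv(text: string): Record<string, string>[] {
      const [headerLine, ...rows] = text.trim().split(/\r?\n/);
      // Map each header to its Glue column name; unmapped headers become undefined
      const keys = headerLine.split(',').map((h) => HEADER_MAP.get(h.trim()));
      return rows.map((row) => {
        const cells = row.split(',');
        const record: Record<string, string> = {};
        keys.forEach((key, i) => {
          // Columns without a mapping (e.g. タイムスタンプ) are skipped
          if (key !== undefined) record[key] = (cells[i] ?? '').trim();
        });
        return record;
      });
    }

    const records = parseCsv(
      'タイムスタンプ,お名前,メールアドレス,所属組織\n2020/01/01 12:00:00,Taro,taro@example.com,Example Inc.',
    );
    console.log(records);
    ```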
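  6. Looking up session information from a ticket ID

    GET https://api.meetupper.com/session?eventId=…&ticketId=…

    ```json
    {
      "name": "和田 祐介",
      "title": "Serverless x Files: Joining files by combining Amazon Athena and Step Functions",
      "description": "Serverless applications increasingly need to handle files. On AWS, S3 is a strong choice for storing them, but there are many options for converting them into a format an application can use.\nThis session takes the use case of JOINing CSV files issued by separate services and presents an approach that uses Amazon Athena. It also shows how to define the whole series of steps as a job by bundling it into Step Functions."
    }
    ```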
  7. Why consider another route?

    At this scale, the Glue Job's startup time and billing become the bottleneck. (The slide repeats the Python Shell vs. Spark comparison table from slide 4.)
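
    The billing half of that bottleneck is mostly rounding: a short job billed against a per-run minimum pays for far more time than it uses. A worked sketch with illustrative parameters only. The per-second rounding with a 1-minute minimum and the 100 ms rounding below are assumptions chosen for the example, not figures recovered from the slide, and `billedMs` is a hypothetical helper that ignores the memory/capacity multiplier.

    ```typescript
    // Billed duration = actual duration rounded up to the billing granularity,
    // but never less than the per-run minimum. Units are milliseconds.
    function billedMs(actualMs: number, granularityMs: number, minimumMs: number): number {
      const rounded = Math.ceil(actualMs / granularityMs) * granularityMs;
      return Math.max(rounded, minimumMs);
    }

    // Illustrative parameters (assumptions, not quoted limits or prices)
    const glueLike = { granularityMs: 1000, minimumMs: 60000 }; // per-second, 1-minute minimum
    const lambdaLike = { granularityMs: 100, minimumMs: 100 }; // 100 ms rounding

    const jobMs = 3250; // a short join job: 3.25 s of actual work
    const glueBilled = billedMs(jobMs, glueLike.granularityMs, glueLike.minimumMs);
    const lambdaBilled = billedMs(jobMs, lambdaLike.granularityMs, lambdaLike.minimumMs);
    console.log({ glueBilled, lambdaBilled }); // { glueBilled: 60000, lambdaBilled: 3300 }
    ```

    For a 3.25-second job the minimum dominates (60 s billed versus 3.3 s), while for jobs longer than the minimum the two models converge; that is why the gap matters mainly for small, frequent files.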
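  9. Why consider another route?

    Another option: leave the queries against the Glue Tables to Athena and make a Lambda Function the execution environment.

    | | AWS::Glue::Job (Python Shell) | AWS::Glue::Job (Spark) | AWS Lambda |
    |---|---|---|---|
    | Memory | … GB or … GB | … GB | … MB |
    | Run time limit | Default: … hours | Default: … hours | Up to … minutes |
    | Concurrent runs | … per account | … per account | … per Region |
    | Billing | Rounded up to … minute(s) | Rounded up to … minute(s) | Rounded up to … milliseconds |
    | Startup time | Tens of seconds to a few minutes | Tens of seconds to a few minutes | A few milliseconds to a few seconds |
    | Language | Python | Python or Scala | Supported runtimes, or a custom runtime |

    Resource names are taken from CloudFormation. Source: 「AWS Glueを使った Serverless ETL の実装パターン」 - Speaker Deck https://speakerdeck.com/seiichi1101/aws-gluewoshi-tuta-serverless-etl-falseshi-zhuang-patan-ac5553a5-f8e9-45fb-b093-4bcf0b113a48?slide=41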
  10. AWS CDK (Infrastructure as Code)

    Generates CloudFormation templates from code written in TypeScript and other languages, and can deploy them.
    Reference: What is the AWS CDK? - AWS Cloud Development Kit (AWS CDK) https://docs.aws.amazon.com/cdk/latest/guide/home.html
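
    The idea behind infrastructure as code here is that ordinary program code emits a CloudFormation template. The toy sketch below is not the CDK API (`synthesize` and `BucketDecl` are hypothetical); it only mimics, for S3 buckets, what `cdk synth` does conceptually, emitting `AWS::S3::Bucket` resources with a `BucketName` property. The bucket names are placeholders.

    ```typescript
    // Toy "synth": turns a list of bucket declarations into a CloudFormation template.
    interface BucketDecl {
      logicalId: string;
      bucketName: string;
    }

    interface Template {
      Resources: Record<string, { Type: string; Properties: Record<string, unknown> }>;
    }

    function synthesize(buckets: BucketDecl[]): Template {
      const template: Template = { Resources: {} };
      for (const b of buckets) {
        template.Resources[b.logicalId] = {
          Type: 'AWS::S3::Bucket',
          Properties: { BucketName: b.bucketName },
        };
      }
      return template;
    }

    // Because the infrastructure is code, loops and functions can generate it.
    const template = synthesize(
      ['raw', 'json-lines'].map((kind) => ({
        logicalId: kind === 'raw' ? 'Raw' : 'JsonLines',
        bucketName: `example-${kind}-bucket`,
      })),
    );
    console.log(JSON.stringify(template, null, 2));
    ```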
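  11. CDK sample: S3 + Lambda Function

    ```typescript
    // Assumed imports (AWS CDK v1 module layout). GlobalProps, EuqueteBucketResource,
    // NODE_LAMBDA_SRC_DIR and nodeModulesLayer come from elsewhere in the project.
    import * as cdk from '@aws-cdk/core';
    import { Duration } from '@aws-cdk/core';
    import * as lambda from '@aws-cdk/aws-lambda';
    import * as s3 from '@aws-cdk/aws-s3';
    import { HttpMethods } from '@aws-cdk/aws-s3';

    export async function sessionConverter(
      stack: cdk.Construct,
      global: GlobalProps,
    ): Promise<EuqueteBucketResource> {
      // Raw CSV bucket; CORS allows browser PUT uploads
      const rawCsvBucket = new s3.Bucket(stack, 'Raw', {
        bucketName: global.getBucketName('raw', cdk.Stack.of(stack).account),
        cors: [
          {
            allowedHeaders: ['*'],
            allowedMethods: [HttpMethods.PUT],
            allowedOrigins: ['*'],
          },
        ],
      });
      // Output bucket for the converted JSON Lines
      const jsonLinesBucket = new s3.Bucket(stack, 'JsonLines', {
        bucketName: global.getBucketName('json-lines', cdk.Stack.of(stack).account),
      });
      // Converter function; bucket names are passed in via environment variables
      const convertSessionCsvToJsonLinesFn = new lambda.Function(
        stack,
        'ConvertSessionCsvToJsonLines',
        {
          functionName: global.getFunctionName('ConvertSessionCsvToJsonLines'),
          code: lambda.Code.fromAsset(NODE_LAMBDA_SRC_DIR),
          handler: 'lambda/handlers/sf/csv-converter/convert-session-csv-to-json-handler.handler',
          runtime: lambda.Runtime.NODEJS_12_X,
          layers: [nodeModulesLayer],
          timeout: Duration.minutes(15),
          memorySize: 256,
          environment: {
            REGION: global.pm.region,
            CSV_BUCKET_NAME: rawCsvBucket.bucketName,
            JSON_LINES_BUCKET_NAME: jsonLinesBucket.bucketName,
          },
        },
      );
      // Read-only access to the raw data, read/write access to the output
      rawCsvBucket.grantRead(convertSessionCsvToJsonLinesFn);
      jsonLinesBucket.grantReadWrite(convertSessionCsvToJsonLinesFn);
      // ...(the slide is cut off here; the return value is not shown)
    }
    ```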
  12. CDK sample: S3 + Lambda Function (same code as slide 11)

    ・`rawCsvBucket`: the S3 bucket that receives the raw CSV data
    ・Files are uploaded through presigned URLs, so the bucket has a CORS rule
  13. CDK sample: S3 + Lambda Function (same code as slide 11)

    ・`jsonLinesBucket`: the bucket that stores the JSON Lines files
  14. CDK sample: S3 + Lambda Function (same code as slide 11)

    ・`convertSessionCsvToJsonLinesFn`: the Lambda Function that loads files from the raw-data bucket, converts them, and writes the result into the JSON Lines bucket
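
    The intermediate format, JSON Lines, is one JSON object per line, the layout that the JSON-format Glue tables and S3 Select's `JSON: { Type: 'LINES' }` input setting expect. A minimal sketch of the serialization step such a converter performs; `toJsonLines` and `fromJsonLines` are hypothetical names, and the S3 upload is omitted.

    ```typescript
    // Serialize records as JSON Lines: one compact JSON object per line, newline-terminated.
    // JSON.stringify escapes embedded newlines (e.g. in a session description),
    // so each record is guaranteed to stay on a single line.
    function toJsonLines(records: object[]): string {
      return records.map((r) => JSON.stringify(r)).join('\n') + (records.length > 0 ? '\n' : '');
    }

    // Parse JSON Lines back, ignoring blank lines.
    function fromJsonLines<T>(text: string): T[] {
      return text
        .split('\n')
        .filter((line) => line.trim() !== '')
        .map((line) => JSON.parse(line) as T);
    }

    const body = toJsonLines([
      { email: 'a@example.com', ticketId: 't1' },
      { email: 'b@example.com', ticketId: 't2' },
    ]);
    console.log(body);
    ```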
  15. CDK sample: S3 + Glue Table

    ```typescript
    // Assumed imports (AWS CDK v1 module layout). GlobalProps, SlsGlueTableResourceInput
    // and EventGlueTableResource come from elsewhere in the project.
    import * as cdk from '@aws-cdk/core';
    import * as glue from '@aws-cdk/aws-glue';

    export async function eventGlueTableResource(
      stack: cdk.Construct,
      global: GlobalProps,
      s3Resource: SlsGlueTableResourceInput,
    ): Promise<EventGlueTableResource> {
      const slsVirtualDataBase = new glue.Database(stack, 'SlsVirtualDatabase', {
        databaseName: global.getGlueDatabaseName('slsVirtual_application'),
      });
      const sessionGlueTable = new glue.Table(stack, 'SessionTable', {
        database: slsVirtualDataBase,
        tableName: 'session',
        columns: [
          { name: 'sessionId', type: glue.Schema.STRING },
          { name: 'email', type: glue.Schema.STRING },
          { name: 'title', type: glue.Schema.STRING },
          { name: 'description', type: glue.Schema.STRING },
          { name: 'organizationName', type: glue.Schema.STRING },
          { name: 'name', type: glue.Schema.STRING },
        ],
        dataFormat: glue.DataFormat.JSON,
        // Data source: JSON Lines objects under session/ in this bucket
        bucket: s3Resource.jsonLinesBucket,
        s3Prefix: 'session/',
        // Partition columns, used later to filter by event and upload version
        partitionKeys: [
          { name: 'eventId', type: glue.Schema.STRING },
          { name: 'versionId', type: glue.Schema.STRING },
        ],
      });
      // ...(the function continues on the following slides)
    ```
  16. CDK sample: S3 + Glue Table (same code as slide 15)

    ・Create the Glue Database and the Glue Table
  17. CDK sample: S3 + Glue Table (same code as slide 15)

    ・Specify the S3 bucket that serves as the data source
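
    With `s3Prefix` plus `partitionKeys`, the table expects objects laid out in Hive-style `key=value` directories under the prefix, for example `session/eventId=…/versionId=…/`. A sketch of building such an object key, assuming partition values contain no `/` or `=`; `partitionKey` and the file name are hypothetical. New partitions still need to be registered (for example with `MSCK REPAIR TABLE` or `ALTER TABLE ADD PARTITION`) before Athena queries see them.

    ```typescript
    // Build an S3 object key in Hive-style partition layout: prefix/key1=v1/key2=v2/file
    // Assumes partition values contain no '/' or '=' characters.
    function partitionKey(
      s3Prefix: string,
      partitions: Array<[string, string]>,
      fileName: string,
    ): string {
      const prefix = s3Prefix.endsWith('/') ? s3Prefix : `${s3Prefix}/`;
      const dirs = partitions.map(([name, value]) => `${name}=${value}/`).join('');
      return `${prefix}${dirs}${fileName}`;
    }

    const key = partitionKey('session/', [['eventId', 'event-1'], ['versionId', 'v2']], 'data.json');
    console.log(key); // session/eventId=event-1/versionId=v2/data.json
    ```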
  18. CDK sample: S3 + Glue Table

    ```typescript
      const ticketGlueTable = new glue.Table(stack, 'TicketTable', {
        database: slsVirtualDataBase,
        tableName: 'ticket',
        columns: [
          { name: 'email', type: glue.Schema.STRING },
          { name: 'ticketId', type: glue.Schema.STRING },
        ],
        dataFormat: glue.DataFormat.JSON,
        bucket: s3Resource.jsonLinesBucket,
        s3Prefix: 'ticket/',
        partitionKeys: [
          { name: 'eventId', type: glue.Schema.STRING },
          { name: 'versionId', type: glue.Schema.STRING },
        ],
      });
    ```

    ・Create the Glue Table for tickets in the same way
  19. CDK sample: S3 + Glue Table

    ```typescript
      const entryTable = new glue.Table(stack, 'EntryTable', {
        database: slsVirtualDataBase,
        tableName: 'entry',
        columns: [
          { name: 'sessionId', type: glue.Schema.STRING },
          { name: 'name', type: glue.Schema.STRING },
          { name: 'title', type: glue.Schema.STRING },
          { name: 'description', type: glue.Schema.STRING },
          { name: 'ticketId', type: glue.Schema.STRING },
        ],
        // JSON text stored with the Hive JSON SerDe
        dataFormat: new glue.DataFormat({
          inputFormat: glue.InputFormat.TEXT,
          outputFormat: glue.OutputFormat.HIVE_IGNORE_KEY_TEXT,
          serializationLibrary: glue.SerializationLibrary.HIVE_JSON,
        }),
        bucket: s3Resource.entryBucket,
        s3Prefix: 'entry/',
        partitionKeys: [
          { name: 'eventId', type: glue.Schema.STRING },
        ],
      // ...(the slide is cut off here)
    ```

    ・Finally, also prepare a Glue Table to receive the data JOINed by Athena
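  20. The Lambda Function that runs the JOIN looks roughly like this

    ```typescript
    // Assumed setup (not shown on the slide): aws-sdk v2 clients. The table/bucket-name
    // constants and the JoinRequest / JoinResponse / LambdaContext types come from the project.
    import { Athena } from 'aws-sdk';
    import { DocumentClient } from 'aws-sdk/clients/dynamodb';

    const dynamo = new DocumentClient();
    const athena = new Athena();

    export async function handler(
      event: JoinRequest,
      context?: LambdaContext,
    ): Promise<JoinResponse> {
      // Look up the versionIds of the session and ticket JSON Lines for this event
      const param: DocumentClient.GetItemInput = {
        TableName: EventDynamodbTableName,
        Key: { id: event.eventId },
      };
      const res = await dynamo.get(param).promise();
      const sessionVersionId = res.Item!.sessionJsonLinesInfo.versionId;
      const ticketVersionId = res.Item!.ticketJsonLinesInfo.versionId;
      try {
        const param = {
          ResultConfiguration: {
            OutputLocation: `s3://${AthenaOutputBucketName}/`,
          },
          QueryExecutionContext: {
            Database: SlsVirtualGlueDataBaseName,
          },
          // INSERT INTO matches columns by position: this SELECT list must line up
          // with the entry table's columns, partition columns last
          QueryString: [
            `insert into ${EntryGlueTableName}`,
            'select session.sessionId,',
            'ticket.ticketId,',
            'session.name,',
            'session.organizationName,',
            'session.title,',
            'session.description,',
            'ticket.email', // lower case for partition key
            `from ${SessionGlueTableName} session inner join ${TicketGlueTableName} ticket on session.email = ticket.email`,
            `where session.versionId = '${sessionVersionId}' and ticket.versionId = '${ticketVersionId}'`,
          ].join(' '),
        };
        console.log('insert into param', param);
        // Starts the query and returns immediately; completion is checked separately
        const execution = await athena.startQueryExecution(param).promise();
        console.log(execution);
        return { executionId: execution.QueryExecutionId! };
      } catch (e) {
        console.log(e);
        throw e;
      }
    }
    ```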
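
    Because the query string is assembled by concatenation, the SELECT list can easily drift from the target table, and Athena's `INSERT INTO ... SELECT` matches columns by position, not by name (the entry table on slide 19, for instance, takes five regular columns followed by the `eventId` partition column). A sketch of a hypothetical builder that fails fast on a column-count mismatch; the table and column names in the usage are illustrative.

    ```typescript
    // Build an Athena INSERT INTO ... SELECT statement. Only the column COUNT is
    // checked against the target; keeping the order right is the caller's job.
    function buildInsertSelect(opts: {
      target: string;
      targetColumns: string[]; // regular columns followed by partition columns
      selectExprs: string[];
      fromClause: string;
      whereClause?: string;
    }): string {
      if (opts.selectExprs.length !== opts.targetColumns.length) {
        throw new Error(
          `SELECT has ${opts.selectExprs.length} columns but ${opts.target} expects ${opts.targetColumns.length}`,
        );
      }
      const parts = [
        `insert into ${opts.target}`,
        `select ${opts.selectExprs.join(', ')}`,
        `from ${opts.fromClause}`,
      ];
      if (opts.whereClause) parts.push(`where ${opts.whereClause}`);
      return parts.join(' ');
    }

    const sql = buildInsertSelect({
      target: 'entry',
      targetColumns: ['sessionId', 'name', 'title', 'description', 'ticketId', 'eventId'],
      selectExprs: [
        'session.sessionId', 'session.name', 'session.title',
        'session.description', 'ticket.ticketId', 'session.eventId',
      ],
      fromClause: 'session inner join ticket on session.email = ticket.email',
      whereClause: "session.versionId = 'v1' and ticket.versionId = 'v1'",
    });
    console.log(sql);
    ```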
  21. Lambda Function code (same code as slide 20)

    ・JOIN on the email address
    ・Insert the joined data into the Entry table
  22. Lambda Function code (same code as slide 20)

    ・Run the Athena query
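
    `startQueryExecution` only starts the query and returns an execution ID, so the Step Functions state machine has to poll until the query finishes. A sketch of the decision a Choice state (or a small Lambda Function) would make from the state reported by Athena's `GetQueryExecution` (`QUEUED`, `RUNNING`, `SUCCEEDED`, `FAILED`, `CANCELLED`); `nextStep` and its return values are hypothetical.

    ```typescript
    // Query states reported by Athena's GetQueryExecution API
    type QueryState = 'QUEUED' | 'RUNNING' | 'SUCCEEDED' | 'FAILED' | 'CANCELLED';

    // What the state machine should do next: wait and poll again, move on, or fail.
    type NextStep = 'WAIT' | 'DONE' | 'FAIL';

    function nextStep(state: QueryState): NextStep {
      switch (state) {
        case 'QUEUED':
        case 'RUNNING':
          return 'WAIT'; // e.g. a Wait state, then call GetQueryExecution again
        case 'SUCCEEDED':
          return 'DONE';
        case 'FAILED':
        case 'CANCELLED':
          return 'FAIL';
      }
    }

    // Simulated polling over a sequence of observed states
    const observed: QueryState[] = ['QUEUED', 'RUNNING', 'RUNNING', 'SUCCEEDED'];
    const steps = observed.map(nextStep);
    console.log(steps); // [ 'WAIT', 'WAIT', 'WAIT', 'DONE' ]
    ```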
  23. S3 Select Lambda Function code

    ```typescript
    // Assumed imports (not shown on the slide): aws-sdk v2, Node's stream/readline, RxJS.
    // EntryRequest comes from the project.
    import { S3 } from 'aws-sdk';
    import * as readline from 'readline';
    import { Subject } from 'rxjs';
    import * as stream from 'stream';

    const s3 = new S3();

    async function createS3SelectStream<T>(
      request: EntryRequest,
      expression: string,
    ): Promise<Subject<T>> {
      const source$ = new Subject<T>();
      const content = await s3
        .selectObjectContent({
          Bucket: request.bucket,
          Key: request.key,
          InputSerialization: {
            JSON: { Type: 'LINES' },
            CompressionType: 'GZIP',
          },
          OutputSerialization: { JSON: { RecordDelimiter: '\n' } },
          Expression: expression,
          ExpressionType: 'SQL',
        })
        .promise();
      const contentStream: any = content.Payload!;
      // Pass through only Records payloads; log Stats and End events
      const filterRecordsOnly = new stream.Transform({
        objectMode: true,
        transform(chunk: any, encoding: string, done: stream.TransformCallback): void {
          if (chunk.Records) {
            this.push(chunk.Records.Payload);
          } else if (chunk.Stats) {
            console.log(`Processed ${chunk.Stats.Details.BytesProcessed} bytes`);
          } else if (chunk.End) {
            console.log('SelectObjectContent completed');
          }
          done();
        },
      });
      // Re-split the payloads into lines: one JSON record per line
      const readLine = readline.createInterface(contentStream.pipe(filterRecordsOnly));
      readLine
        .on('line', (line: any) => {
          source$.next(JSON.parse(line) as T);
        })
        .on('close', () => {
          console.log('end');
          source$.complete();
        });
      // ...(the rest of the function is cut off on the slide)
    }
    ```
  24. S3 Select Lambda Function code (same code as slide 23)

    ・The expression: `select * from s3object obj where obj.ticketid = …`
  25. S3 Select Lambda Function code (same code as slide 23)

    ・S3 Select returns its results as its own Node.js stream, so the code has to work hard to pull the records out of it
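
    The Transform plus `readline` combination exists because S3 Select answers with an event stream of `Records`, `Stats`, and `End` events, and a single `Records` payload may hold a partial record, so lines have to be reassembled across events. A synchronous sketch of the same logic over an in-memory event list; `SelectEvent` is a simplified, hypothetical stand-in for the SDK's event objects, with string payloads where the SDK delivers bytes.

    ```typescript
    // Simplified stand-in for the events in a SelectObjectContent payload stream
    // (string payloads here; the real SDK delivers bytes).
    type SelectEvent =
      | { Records: { Payload: string } }
      | { Stats: { Details: { BytesProcessed: number } } }
      | { End: Record<string, never> };

    // Keep only Records payloads and split them into complete JSON lines,
    // carrying any partial line over to the next Records event.
    function collectRecords<T>(events: SelectEvent[]): T[] {
      const results: T[] = [];
      let pending = '';
      for (const ev of events) {
        if ('Records' in ev) {
          pending += ev.Records.Payload;
          const lines = pending.split('\n');
          pending = lines.pop() ?? ''; // the last piece may be incomplete
          for (const line of lines) {
            if (line.trim() !== '') results.push(JSON.parse(line) as T);
          }
        }
        // Stats and End events carry no records and are ignored here
      }
      if (pending.trim() !== '') results.push(JSON.parse(pending) as T);
      return results;
    }

    const rows = collectRecords<{ ticketid: string }>([
      { Records: { Payload: '{"ticketid":"a1"}\n{"tick' } },
      { Records: { Payload: 'etid":"b2"}\n' } },
      { Stats: { Details: { BytesProcessed: 42 } } },
      { End: {} },
    ]);
    console.log(rows); // [ { ticketid: 'a1' }, { ticketid: 'b2' } ]
    ```

    The second record arrives split across two events; buffering the trailing fragment is what `readline` does for the stream-based version.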