"""Prefect ETL flow: download a CSV from S3, aggregate it, upload the result.

NOTE(review): reconstructed from a one-line paste. The original began with
``as pd import boot3`` — a truncated ``import pandas as pd`` plus a typo'd
``import boto3`` — and never imported ``prefect`` even though ``@task`` and
``@flow`` are used throughout. Both are fixed below.
"""

import boto3  # fixed typo: original read "boot3"
import pandas as pd
from prefect import flow, task  # was missing; required by the decorators below

# NOTE(review): ``s3_bucket`` is not defined anywhere in the visible source —
# presumably a module-level constant or config value defined elsewhere in the
# original file. Confirm before running.
bucket = boto3.resource("s3").Bucket(s3_bucket)


@task(name="extract task")
def extract(file_name: str) -> pd.DataFrame:
    """Download ``file_name`` from the S3 bucket to the CWD and load it as a DataFrame."""
    bucket.download_file(Key=file_name, Filename=file_name)
    df = pd.read_csv(file_name)
    return df


@task(name="transform task", retries=3, retry_delay_seconds=60)  # retry settings
def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Group rows by the ``type1`` column and take the per-group mean.

    NOTE(review): assumes the remaining columns are numeric — recent pandas
    raises on non-numeric columns in ``mean()``; confirm the input schema.
    """
    df_transformed = df.groupby(["type1"]).mean().reset_index()
    return df_transformed


@task(name="load task", timeout_seconds=30)  # timeout setting
def load(df: pd.DataFrame, file_name: str) -> None:
    """Write ``df`` to a local CSV and upload it to the same S3 bucket."""
    # The index column is written too — original behavior, kept as-is.
    df.to_csv(file_name)
    bucket.upload_file(Key=file_name, Filename=file_name)


@flow(name="ETL Flow")
def etl_flow(download_file_name: str, upload_file_name: str) -> None:
    """Run the extract -> transform -> load pipeline."""
    extracted_df = extract(file_name=download_file_name)
    transformed_df = transform(extracted_df)
    load(transformed_df, file_name=upload_file_name)


if __name__ == "__main__":
    etl_flow(download_file_name="extract.csv", upload_file_name="transformed.csv")

# Slide notes fused into the original paste (translated from Japanese):
# - Build workflows in Python
# - Create Flows and Tasks
# - Develop by trial and error