How I Used an Asynchronous Task Queue to Streamline My Work

Hiroshi Sano
January 25, 2024

Transcript

  1. Who are you? / Self Introduction

    佐野浩士 (Hiroshi Sano) @hrs_sano645
    Fuji City, Shizuoka Prefecture
    株式会社佐野設計事務所, Representative Director
    PyCon mini Shizuoka Staff / Shizuoka.py / Unagi.py / Python駿河
    CivicTech, Startup Weekend Organizer
    Hobby: Camp, DIY, IoT
  3. compose.yml

        version: '3'
        services:
          redis:
            image: redis
          worker:
            build: .
            depends_on:
              - redis
            environment:
              RQ_REDIS_URL: redis://redis
            command: rq worker
            volumes:
              - .:/app
            working_dir: /app
          app:
            build: .
            depends_on:
              - redis
              - worker
            environment:
              RQ_REDIS_URL: redis://redis
            command: python app.py
            volumes:
              - .:/app
            working_dir: /app
  4. Trying out file operations: tasks.py

        from pathlib import Path
        import random
        import string
        import sys


        def create_random_string(length):
            """Generate a random string of the given length."""
            letters = string.ascii_letters + string.digits
            return "".join(random.choice(letters) for i in range(length))


        def create_files(num_files, file_size, directory="test_files"):
            """Generate the given number of files, each of the given size."""
            Path(directory).mkdir(parents=True, exist_ok=True)
            for i in range(num_files):
                savefile = Path(f"{directory}/file_{i}.txt")
                with savefile.open("w") as f:
                    f.write(create_random_string(file_size))
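The two functions in tasks.py can be tried without a queue at all. The following is a minimal sketch, assuming a temporary directory as the target; `summarize` is a hypothetical helper added only for this illustration and is not part of the original slides.

```python
import random
import string
import tempfile
from pathlib import Path


def create_random_string(length):
    """Generate a random string of ASCII letters and digits."""
    letters = string.ascii_letters + string.digits
    return "".join(random.choice(letters) for _ in range(length))


def create_files(num_files, file_size, directory="test_files"):
    """Write num_files text files of file_size characters each."""
    Path(directory).mkdir(parents=True, exist_ok=True)
    for i in range(num_files):
        savefile = Path(directory) / f"file_{i}.txt"
        with savefile.open("w") as f:
            f.write(create_random_string(file_size))


def summarize(directory):
    """Hypothetical helper: return (file count, set of file sizes in bytes)."""
    files = sorted(Path(directory).glob("file_*.txt"))
    return len(files), {p.stat().st_size for p in files}


if __name__ == "__main__":
    # Small numbers keep the check fast; the slides use much larger files.
    with tempfile.TemporaryDirectory() as tmp:
        target = Path(tmp) / "test_files_0"
        create_files(3, 1024, target)
        print(summarize(target))
```

Because the content is ASCII only, each character should occupy one byte on disk, so the reported size matches `file_size`.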
  5. app.py

        import os

        import redis
        from rq import Queue

        from tasks import create_files

        NUM_FILES = 100
        FILE_SIZE = 1048576
        NUM_TASKS = 3

        q = Queue(connection=redis.from_url(os.environ.get("RQ_REDIS_URL")))

        # Enqueue the tasks for execution
        tasks = [
            q.enqueue(create_files, args=(NUM_FILES, FILE_SIZE, f"test_files_{i}"))
            for i in range(NUM_TASKS)
        ]
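The enqueue-and-consume pattern used by app.py can be illustrated with the standard library alone. This is only an in-process analogy, not how RQ works internally: RQ stores jobs in Redis and runs them in separate worker processes, whereas this sketch uses `queue.Queue` and threads. The names `run_workers` and `fake_task` are hypothetical, and `fake_task` sleeps instead of writing files.

```python
import queue
import threading
import time


def run_workers(jobs, num_workers):
    """Run every (func, args) pair in jobs on num_workers threads.

    Returns a dict mapping each worker name to the results it produced.
    """
    q = queue.Queue()
    for job in jobs:
        q.put(job)

    handled = {f"worker-{n}": [] for n in range(1, num_workers + 1)}

    def worker(name):
        while True:
            try:
                func, args = q.get_nowait()
            except queue.Empty:
                return  # queue drained: this worker stops
            handled[name].append(func(*args))

    threads = [threading.Thread(target=worker, args=(name,)) for name in handled]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return handled


def fake_task(label, seconds):
    """Hypothetical stand-in for create_files: sleeps, then returns its label."""
    time.sleep(seconds)
    return label


if __name__ == "__main__":
    jobs = [(fake_task, (f"test_files_{i}", 0.2)) for i in range(3)]
    for n in (1, 3):
        start = time.perf_counter()
        handled = run_workers(jobs, n)
        print(f"{n} worker(s): {handled} in {time.perf_counter() - start:.2f}s")
```

With one worker the three jobs are taken from the queue one after another; with three workers they can overlap, so the wall-clock time should be closer to that of a single job.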
  6. Run

        # Single worker
        $ docker compose up

        # Multiple workers: start three workers
        $ docker compose up --scale worker=3

        ## The logs are shown separately in a file
  7. Worker log: single worker

    Tasks are handed to the one worker in order and executed one at a time.

        file_task-worker-1 | 03:20:35 Worker rq:worker:81027d039a944dc3b8a230519243f68e started with PID 1, version 1.15.1
        file_task-worker-1 | 03:20:35 Subscribing to channel rq:pubsub:81027d039a944dc3b8a230519243f68e
        file_task-worker-1 | 03:20:35 *** Listening on default...
        file_task-worker-1 | 03:20:35 default: tasks.create_files(50, 1048576, 'test_files_0') (f18592d2-16ba-4c82-98ef-11da85c44493)
        file_task-app-1 exited with code 0
        file_task-worker-1 | 03:20:52 default: Job OK (f18592d2-16ba-4c82-98ef-11da85c44493)
        file_task-worker-1 | 03:20:52 Result is kept for 500 seconds
        file_task-worker-1 | 03:20:52 default: tasks.create_files(50, 1048576, 'test_files_1') (9f4a596f-73af-4973-9007-af03da8f5057)
        file_task-worker-1 | 03:21:09 default: Job OK (9f4a596f-73af-4973-9007-af03da8f5057)
        file_task-worker-1 | 03:21:09 Result is kept for 500 seconds
        file_task-worker-1 | 03:21:09 default: tasks.create_files(50, 1048576, 'test_files_2') (bf5c15ad-3222-45be-ab8a-3f214a57700d)
        file_task-worker-1 | 03:21:27 default: Job OK (bf5c15ad-3222-45be-ab8a-3f214a57700d)
        file_task-worker-1 | 03:21:27 Result is kept for 500 seconds
  8. Worker log: multiple workers

    The three workers each pick up a task and run in parallel.

        file_task-worker-3 | 03:19:26 Worker rq:worker:a3fae5de17c34f658f597ce4d5543dbc started with PID 1, version 1.15.1
        file_task-worker-3 | 03:19:26 Subscribing to channel rq:pubsub:a3fae5de17c34f658f597ce4d5543dbc
        file_task-worker-3 | 03:19:26 *** Listening on default...
        file_task-worker-3 | 03:19:26 Cleaning registries for queue: default
        file_task-worker-3 | StartedJobRegistry cleanup: Moving job to FailedJobRegistry (due to AbandonedJobError)
        file_task-worker-3 | StartedJobRegistry cleanup: Moving job to FailedJobRegistry (due to AbandonedJobError)
        file_task-worker-3 | StartedJobRegistry cleanup: Moving job to FailedJobRegistry (due to AbandonedJobError)
        file_task-worker-2 | 03:19:26 Worker rq:worker:3f33ea404d2040e99961f0a2d5d46b1f started with PID 1, version 1.15.1
        file_task-worker-2 | 03:19:26 Subscribing to channel rq:pubsub:3f33ea404d2040e99961f0a2d5d46b1f
        file_task-worker-2 | 03:19:26 *** Listening on default...
        file_task-worker-1 | 03:19:26 Worker rq:worker:32c4d2c46a944ba3b05dcb295cc522c2 started with PID 1, version 1.15.1
        file_task-worker-1 | 03:19:26 Subscribing to channel rq:pubsub:32c4d2c46a944ba3b05dcb295cc522c2
        file_task-worker-1 | 03:19:26 *** Listening on default...
        file_task-worker-3 | 03:19:27 default: tasks.create_files(50, 1048576, 'test_files_0') (a2767c5e-2caa-47ed-b8f2-204881f671ac)
        file_task-worker-2 | 03:19:27 default: tasks.create_files(50, 1048576, 'test_files_1') (288a07b9-80eb-4736-9e1c-56887781babe)
        file_task-worker-1 | 03:19:27 default: tasks.create_files(50, 1048576, 'test_files_2') (49f43d8d-b0c2-4edb-a4b3-711852102a9f)
        file_task-app-1 exited with code 0
        file_task-worker-2 | 03:19:44 default: Job OK (288a07b9-80eb-4736-9e1c-56887781babe)
        file_task-worker-2 | 03:19:44 Result is kept for 500 seconds
        file_task-worker-1 | 03:19:44 default: Job OK (49f43d8d-b0c2-4edb-a4b3-711852102a9f)
        file_task-worker-1 | 03:19:44 Result is kept for 500 seconds
        file_task-worker-3 | 03:19:44 default: Job OK (a2767c5e-2caa-47ed-b8f2-204881f671ac)
        file_task-worker-3 | 03:19:44 Result is kept for 500 seconds
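The timestamps in the two worker logs allow a rough comparison of total run time. The sketch below takes, for each run, the time of the first `tasks.create_files(...)` line and the time of the last `Job OK` line as they appear in the logs; `elapsed_seconds` is a hypothetical helper, and the result is a single measurement rather than a benchmark.

```python
from datetime import datetime


def elapsed_seconds(start, end):
    """Seconds between two HH:MM:SS timestamps on the same day."""
    fmt = "%H:%M:%S"
    delta = datetime.strptime(end, fmt) - datetime.strptime(start, fmt)
    return int(delta.total_seconds())


# Single worker: first job picked up at 03:20:35, last Job OK at 03:21:27
single = elapsed_seconds("03:20:35", "03:21:27")

# Three workers: jobs picked up at 03:19:27, last Job OK at 03:19:44
multi = elapsed_seconds("03:19:27", "03:19:44")

print(single, multi, round(single / multi, 2))  # prints: 52 17 3.06
```

In this run the three jobs took about 52 seconds on one worker and about 17 seconds on three workers, roughly a threefold reduction for three tasks.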
  9. References

    Message queue - Wikipedia
    python-rq
    [Speeding up with Python] What on earth are I/O-bound, parallel processing, and multiprocessing? #Python - Qiita
    Reference for using Docker: Distributed task queues in Python (RQ edition) #Python - Qiita
    An introduction to sub-interpreters, newly supported in Python 3.12 | gihyo.jp
    Sample code: https://github.com/hrsano645/exam-python-rq-by-docker