
Limiting the Number of Concurrent Tasks with Semaphores




Hank Ehly

May 26, 2022


Transcript

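  1. Problem
     • Sometimes you want to control how many parallel tasks run at the same time
       ◦ Scraping
       ◦ APIs with rate limits
       ◦ Keeping CPU/memory load down
     • ThreadPoolExecutor / ProcessPoolExecutor
     • They offer no option for limiting how many of a given kind of task run at once (max_workers only sizes the whole pool)
       ◦ "I want to run at most N tasks at a time"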
  2. What workarounds are there?
     1. time.sleep

         while True:
             run_job()
             time.sleep(3)

     2. A global counter variable

         current_jobs = 0
         max_jobs = 3
         while True:
             if current_jobs < max_jobs:
                 current_jobs += 1  # take a slot before starting the job
                 run_job()
                 current_jobs -= 1  # give the slot back when the job ends

     3. A lock object
        ◦ redis
        ◦ multiprocessing.Lock
        ◦ An empty file

         while True:
             if acquire_lock():
                 run_job()
                 release_lock()
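The global counter in workaround 2 is only safe across threads if the check and the update happen as one atomic step. The following is a minimal sketch of that idea, not code from the slides: `try_start_job` and `finish_job` are hypothetical helper names, and a `threading.Lock` is assumed as the guard.

```python
import threading

max_jobs = 3
current_jobs = 0
counter_lock = threading.Lock()  # guards every read/write of current_jobs


def try_start_job():
    """Reserve a slot; return False if max_jobs slots are already taken."""
    global current_jobs
    with counter_lock:
        if current_jobs >= max_jobs:
            return False
        current_jobs += 1
        return True


def finish_job():
    """Give a slot back once a job is done."""
    global current_jobs
    with counter_lock:
        current_jobs -= 1


# Four attempts against three slots: the fourth is refused.
results = [try_start_job() for _ in range(4)]

# After one job finishes, a slot is available again.
finish_job()
after_release = try_start_job()
```

This is effectively a hand-rolled semaphore, which is why the standard library's semaphore classes are a natural replacement for it.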
  3. Semaphore and BoundedSemaphore
     - Semaphore
     - BoundedSemaphore

         sem = threading.Semaphore(1)
         print(sem._value)       # 1 (_value is private; read here only for illustration)
         sem.acquire()           # returns True
         print(sem._value)       # 0
         sem.acquire(timeout=5)  # returns False after 5 seconds
         sem.release()
         print(sem._value)       # 1
         sem.release()
         print(sem._value)       # 2

     The semaphore keeps track of the number of acquires and releases for you.
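To see the acquire/release counting actually limit concurrency, here is a small sketch that uses only the public API. The `worker` function, the limit of 2, and the `peak` bookkeeping are assumptions added for illustration; `time.sleep` stands in for real work.

```python
import threading
import time

sem = threading.Semaphore(2)    # at most 2 workers inside the guarded block
state_lock = threading.Lock()   # guards the bookkeeping counters below
running = 0
peak = 0


def worker():
    global running, peak
    with sem:  # acquire() on entry, release() on exit
        with state_lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # stand-in for real work
        with state_lock:
            running -= 1


threads = [threading.Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(peak)  # never more than 2, even though 6 threads were started
```

Using the semaphore as a context manager pairs every acquire with exactly one release, including when the guarded code raises.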
  4. Semaphore and BoundedSemaphore
     - Semaphore
     - BoundedSemaphore
       - also known as 有限セマフォ ("bounded semaphore")

         sem = threading.BoundedSemaphore(1)
         print(sem._value)       # 1
         sem.acquire()           # returns True
         print(sem._value)       # 0
         sem.acquire(timeout=5)  # returns False after 5 seconds
         sem.release()
         print(sem._value)       # 1
         sem.release()
         Traceback (most recent call last):
           File "<stdin>", line 1, in <module>
           File "/ (omitted) /threading.py", line 504, in release
             raise ValueError("Semaphore released too many times")
         ValueError: Semaphore released too many times

     In addition to a lower bound, it also has an upper bound. Use it when you want to guard against releasing too many times.
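The practical difference shows up when a release is called by mistake. The sketch below contrasts the two classes side by side; the variable names are illustrative only.

```python
import threading

plain = threading.Semaphore(1)
bounded = threading.BoundedSemaphore(1)

# A stray release on a plain Semaphore raises nothing: the counter grows to 2,
# so two non-blocking acquires succeed although the limit was meant to be 1.
plain.release()
extra_slots = [plain.acquire(blocking=False) for _ in range(3)]

# The same mistake on a BoundedSemaphore is reported immediately.
try:
    bounded.release()  # released without a matching acquire
    over_release_detected = False
except ValueError:
    over_release_detected = True

print(extra_slots)            # [True, True, False]
print(over_release_detected)  # True
```

With the plain semaphore the bug silently raises the concurrency limit; with the bounded one it surfaces as a `ValueError` at the faulty call.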
  5. [Example] Scraping

         def scrape_url(url):
             res = requests.get(url)
             s3_bucket.put(res.content, key="...")

         urls = ["http://netkeiba.com", ...]
         sem = BoundedSemaphore(3)
         with ThreadPoolExecutor() as pool:
             while urls:
                 if sem.acquire():  # blocks until one of the 3 slots is free
                     try:
                         url = urls.pop()
                         task = pool.submit(scrape_url, url)
                         # release the slot when the task finishes
                         task.add_done_callback(lambda _: sem.release())
                     except Exception:
                         sem.release()
                 else:
                     # reached only if acquire() is given a timeout or blocking=False
                     logging.debug("Waiting for other tasks..")

     Code example: qiita.com/hankehly
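The scraping pattern above can be tried without network or S3 access. The following is a hedged sketch under stated assumptions: `fake_scrape` is a hypothetical stand-in for `scrape_url` that sleeps instead of doing I/O, the `example.com` URLs are placeholders, and the `peak` counter exists only to show that no more than three tasks overlap.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT = 3
sem = threading.BoundedSemaphore(MAX_CONCURRENT)
state_lock = threading.Lock()  # guards the bookkeeping below
running = 0
peak = 0
done = []


def fake_scrape(url):
    """Hypothetical stand-in for scrape_url: sleeps instead of doing I/O."""
    global running, peak
    with state_lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.02)
    with state_lock:
        running -= 1
        done.append(url)


urls = [f"http://example.com/{i}" for i in range(10)]  # placeholder URLs

# The pool has 8 workers, but the semaphore keeps at most 3 tasks in flight.
with ThreadPoolExecutor(max_workers=8) as pool:
    while urls:
        sem.acquire()  # blocks until one of the 3 slots is free
        url = urls.pop()
        try:
            task = pool.submit(fake_scrape, url)
        except Exception:
            sem.release()  # submission failed, so give the slot back
            raise
        # Runs when the task finishes, whether it succeeded or raised.
        task.add_done_callback(lambda _: sem.release())

print(peak, len(done))
```

Because the slot is taken before `submit` and returned only in the done callback, the main loop also stops queueing new work while three tasks are still running, which keeps the pool's internal queue short.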