Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Python on Google Cloud Functions で作るバッチ処理

Python on Google Cloud Functions で作るバッチ処理

Avatar for Suzu Ito

Suzu Ito

June 23, 2021
Tweet

More Decks by Suzu Ito

Other Decks in Programming

Transcript

  1. 2,ݱঢ়෼ੳͱ໰୊఺ 2,Ͳ͏࡞͔ͬͨ - ૉ๿ͳ࣮૷ σʔλϕʔε σʔλͷ நग़ͱՃ޻ ετϨʔδ def transfer_data(event,

    context): with new_db_conn() as db_conn,\ new_storage_conn() as storage_conn: current_data = None while True: data = read_data_from_db( db_conn, current_data, 1000, ) if len(data) <= 0: break write_data_to_storage(storage_conn, edit_data(data)) current_data = data[-1]
  2. 2,ݱঢ়෼ੳͱ໰୊఺ 2,Ͳ͏࡞͔ͬͨ - ૉ๿ͳ࣮૷ σʔλϕʔε σʔλͷ நग़ͱՃ޻ ετϨʔδ def transfer_data(event,

    context): with new_db_conn() as db_conn,\ new_storage_conn() as storage_conn: current_data = None while True: data = read_data_from_db( db_conn, current_data, 1000, ) if len(data) <= 0: break write_data_to_storage(storage_conn, edit_data(data)) current_data = data[-1] σʔλநग़ σʔλՃ޻ʢʴసૹʣ
  3. 2,ݱঢ়෼ੳͱ໰୊఺ 2,Ͳ͏࡞͔ͬͨ - ࠷௿ݶͷ࣮૷ σʔλϕʔε σʔλͷ நग़ͱՃ޻ ετϨʔδ ετϨʔδ ్தܦաΛ

    อଘ def transfer_data(event, context): with new_db_conn() as db_conn,\ new_storage_conn() as storage_conn1,\ new_storage_conn() as storage_conn2: current_data = load(storage_conn2) while True: datas = read_data_from_db( db_conn, current_data, 1000, ) if len(data) <= 0: break write_data_to_storage(storage_conn1, edit_data(data)) current_data = data[-1] save(storage_conn2, current_data)
  4. 2,ݱঢ়෼ੳͱ໰୊఺ 2,Ͳ͏࡞͔ͬͨ - ࠷௿ݶͷ࣮૷ σʔλϕʔε σʔλͷ நग़ͱՃ޻ ετϨʔδ ετϨʔδ ్தܦաΛ

    อଘ def transfer_data(event, context): with new_db_conn() as db_conn,\ new_storage_conn() as storage_conn1,\ new_storage_conn() as storage_conn2: current_data = load(storage_conn2) while True: datas = read_data_from_db( db_conn, current_data, 1000, ) if len(data) <= 0: break write_data_to_storage(storage_conn1, edit_data(data)) current_data = data[-1] save(storage_conn2, current_data) ॲཧͷ్தܦաΛ TUPSBHF͔Βऔಘ ॲཧͷ్தܦաΛ TUPSBHF΁อଘ
  5. 2,ݱঢ়෼ੳͱ໰୊఺ 2,Ͳ͏࡞͔ͬͨ - ฒྻॲཧʹΑΔ࣮૷ σʔλϕʔε σʔλͷ நग़ͱՃ޻ ετϨʔδ ετϨʔδ ్தܦաΛ

    อଘ def transfer_data(event, context): # σʔλͷநग़ jobs = {} max_jobs = 10 with new_db_conn() as db_conn,\ new_storage_conn() as storage_conn1,\ new_storage_conn() as storage_conn2: current_data = load(storage_conn2) while True: sweep_terminated_jobs(jobs) if len(jobs) > max_jobs: continue data = read_data_from_db( db_conn, current_data, 1000, ) if len(data) <= 0: break job = new_job(target=transfer_data_each, args=(data, )) job.start() jobs[job.name] = job current_data = data[-1] save(storage_conn2, current_data) def transfer_data_each(data): # σʔλͷՃ޻ with new_storage_conn() as storage_conn1: write_data_to_storage(storage_conn1, edit_data(data))
  6. 2,ݱঢ়෼ੳͱ໰୊఺ 2,Ͳ͏࡞͔ͬͨ - ฒྻॲཧʹΑΔ࣮૷ σʔλϕʔε σʔλͷ நग़ͱՃ޻ ετϨʔδ ετϨʔδ ్தܦաΛ

    อଘ def transfer_data(event, context): # σʔλͷநग़ jobs = {} max_jobs = 10 with new_db_conn() as db_conn,\ new_storage_conn() as storage_conn1,\ new_storage_conn() as storage_conn2: current_data = load(storage_conn2) while True: sweep_terminated_jobs(jobs) if len(jobs) > max_jobs: continue data = read_data_from_db( db_conn, current_data, 1000, ) if len(data) <= 0: break job = new_job(target=transfer_data_each, args=(data, )) job.start() jobs[job.name] = job current_data = data[-1] save(storage_conn2, current_data) def transfer_data_each(data): # σʔλͷՃ޻ with new_storage_conn() as storage_conn1: write_data_to_storage(storage_conn1, edit_data(data)) ऴྃͨ͠+PCΛ ϓʔϧ͔Βআ֎ +PCͷϓʔϧ +PC࡞੒ ͜͜Ͱ͍͏+PCͱ͸ ϓϩηεɺ΋͘͠͸εϨου
  7. 2,ݱঢ়෼ੳͱ໰୊఺ 2,Ͳ͏࡞͔ͬͨ - Fan-outʹΑΔ࣮૷ σʔλϕʔε σʔλͷநग़ ετϨʔδ ɾɾɾ σʔλͷՃ޻ def

    transfer_data(event, context): # σʔλͷநग़ with new_db_conn() as db_conn,\ new_storage_conn() as storage_conn1,\ new_storage_conn() as storage_conn2,\ new_queue_conn() as queue_conn: current_data = load(storage_conn2) while True: data = read_data_from_db( db_conn, current_data, 1000, ) if len(data) <= 0: break enqueue(queue_conn, data) current_data = data[-1] save(storage_conn2, current_data) def transfer_data_each(event, context): # σʔλͷՃ޻ data = get_data_from_event(event) with new_storage_conn() as storage_conn: write_datas_to_storage(storage_conn, edit_data([data]))
  8. 2,ݱঢ়෼ੳͱ໰୊఺ 2,Ͳ͏࡞͔ͬͨ - Fan-outʹΑΔ࣮૷ σʔλϕʔε σʔλͷநग़ ετϨʔδ ɾɾɾ σʔλͷՃ޻ def

    transfer_data(event, context): # σʔλͷநग़ with new_db_conn() as db_conn,\ new_storage_conn() as storage_conn1,\ new_storage_conn() as storage_conn2,\ new_queue_conn() as queue_conn: current_data = load(storage_conn2) while True: data = read_data_from_db( db_conn, current_data, 1000, ) if len(data) <= 0: break enqueue(queue_conn, data) current_data = data[-1] save(storage_conn2, current_data) def transfer_data_each(event, context): # σʔλͷՃ޻ data = get_data_from_event(event) with new_storage_conn() as storage_conn: write_datas_to_storage(storage_conn, edit_data([data])) ΩϡʔʹೖΕΔ͚ͩ ΩϡʔͷσʔλΛೖྗ ͱͯ͠ىಈ͢Δ ΋͏Ұͭͷ ($'JOTUBODF