Upgrade to Pro — share decks privately, control downloads, hide ads and more …

こんな感じでデータパイプライン作ってます
2019年春

 こんな感じでデータパイプライン作ってます
2019年春

Avatar for Yuku TAKAHASHI

Yuku TAKAHASHI

April 16, 2019
Tweet

More Decks by Yuku TAKAHASHI

Other Decks in Technology

Transcript

  1. CLIENT SOLUTION w ϨίϝϯυγεςϜ w ෺ྲྀ࠷దԽ w ݕࡧΤϯδϯ w ޿ࠂ഑৴γεςϜ

    w ͳͲ w ಺෦ʹ຾ΔେྔͷσʔλΛ׆༻ ͍ͨ͠ w ΞΠσΞΛ࣮ݱͰ͖Δਓࡐ͕͍ ͳ͍
  2. CLIENT
 DATA DATA LAKE DATA WAREHOUSE DATA
 MART DATA SCIENTISTS

    ϨίϝϯυύΠϓϥΠϯWʢ$MPVE$PNQPTFSʣ RECOMMENDER SYSTEM
  3. #JH2VFSZͷ̏֊૚ w %"5"-",& w ΫϥΠΞϯτ͔Βఏڙ͞ΕΔੜσʔλΛ஝ੵ͢Δ w %"5"8"3&)064& w அยԽͨ͠σʔλΛ෮ݩɾඇਖ਼نԽɺ໋໊نଇʹҰ؏ੑΛ΋ͨͤΔɺ/6--Λഉআͯ͠ར༻͠ ΍͘͢͢ΔɺͳͲ

    w ΫϥΠΞϯτͷσʔλ෼ੳνʔϜʹఏڙ͢Δ͜ͱ΋ w %"5"."35 w $MPVE%BUBMBCͳͲͷ#*πʔϧ͔Βࢀর͢Δ w ఏڙ͍ͯ͠ΔϨίϝϯυγεςϜͷޮՌଌఆͱ͔
  4. ͚ͬ͜͏(,&LTྗΛٻΊΒΕΔ w $MPVE$PNQPTFS͸(,&ͷ্ʹσϓϩΠ͞ΕΔϑϧϚωʔδυ"JSqPXαʔϏεɻ
 ࠔͬͨ࣌ʹ͸(,&ܦ༝Ͱ"JSqPXʹ઀ଓͯ͠σόοάͨ͠Γ͢Δඞཁ͕͋ΔͷͰɺ
 (,&ͱLTʹ͍ͭͯͷجૅ஌ࣝ͘Β͍͸͍࣋ͬͯͳ͍ͱͭΒ͍ɻ w ͱΓ͋͑ͣLVCFDUMΛηοτΞοϓ͓ͯ͘͠ɻ GKE_CLUSTER="$(gcloud composer environments

    describe $COMPOSER_NAME \ --format='get(config.gkeCluster)')" GKE_LOCATION="$(gcloud composer environments describe $COMPOSER_NAME \ --format='get(config.nodeConfig.location)')" gcloud container clusters get-credentials $GKE_CLUSTER \ --zone $GKE_LOCATION
  5. $MPVE.FNPSZTUPSFʹ઀ଓͰ͖ͳ͍ w (,&͔Β.FNPSZTUPSFʢ3FEJTʣʹ઀ଓ͢Δʹ͸Ϋϥελ࡞੒࣌ʹ*1ΤΠϦΞεΛ༗ޮ ʹ͠ͳ͚Ε͹͍͚ͳ͍͕ɺ$MPVE$PNQPTFS͸ແޮʹͯ͠࡞ͬͯ͠·͏ɻ
 ແޮͷ৔߹ΫϥελʹJQUBCMFTͷϧʔϧΛ௥Ճ͢Δඞཁ͕͋Δɻ w $MPVE$PNQPTFS͕؅ཧ͍ͯ͠ΔLTΫϥελΛ͋·Γ৮Γͨ͘ͳ͔ͬͨͷͰɺ౿Έ୆ͱ ͳΔ($&ΠϯελϯεΛཱͯͯղܾͨ͠ɻ from redis

    import StrictRedis from sshtunnel import SSHTunnelForwarder with SSHTunnelForwarder((bastion_host, bastion_port), ssh_username="airflow", remote_bind_address=(redis_host, redis_port), local_bind_address=("127.0.0.1", local_port), allow_agent=False): client = StrictRedis(host="127.0.0.1", port=local_port) client.ping()
  6. UFNQMBUF@FYU w ࢦఆͨ͠஋ͰऴΘΔจࣈྻΛϑΝΠϧύεͱͯ͠ղऍ͠ɺ࣮ମΛࢦఆ͞ΕͨϑΝΠϧͷத ਎Λ+JOKBͰϨϯμϦϯάͨ݁͠ՌͰஔ͖׵͑ΔͱΜͰ΋ͳ͍ศརͳػೳɻ w υΩϡϝϯτ͸ແ͍͕͠Εͬͱ#BTF0QFSBUPSʹ࣮૷͞Ε͍ͯΔɻ with open("foo/bar.sql") as f:

    sql = f.read() PythonOperator( template_dict={"sql": sql} # ... ) class SQLTemplateOperator(PythonOperator): template_ext = (".sql",) SQLTemplateOperator( template_dict={"sql": "foo/bar.sql"}, # ... ) IUUQTTUBDLPWFSqPXDPNB