This Is How We Build Our Data Pipelines
Spring 2019


Yuku TAKAHASHI

April 16, 2019


Transcript

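  1. CLIENT / SOLUTION
     CLIENT
     - Wants to make use of the large volume of data lying dormant in-house
     - Has no one who can turn ideas into reality
     SOLUTION
     - Recommender systems
     - Logistics optimization
     - Search engines
     - Ad delivery systems
     - etc.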
  2. Diagram labels: CLIENT DATA, DATA LAKE, DATA WAREHOUSE, DATA MART, DATA SCIENTISTS, RECOMMENDER SYSTEM
     Recommendation pipeline v (Cloud Composer)
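  3. The three layers in BigQuery
     - DATA LAKE
       - Accumulates the raw data provided by the client
     - DATA WAREHOUSE
       - Restores and denormalizes fragmented data, makes naming conventions consistent, removes NULLs to make the data easier to use, and so on
       - Is sometimes also provided to the client's data analysis team
     - DATA MART
       - Referenced from BI tools such as Cloud Datalab
       - Used for things like measuring the effect of the recommender system we provide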
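As a rough illustration of the kind of cleanup attributed to the DATA WAREHOUSE layer above, the sketch below makes column names follow one convention and replaces NULLs with defaults. The function names, the snake_case convention, and the default values are assumptions made for this example; in the deck this work happens inside BigQuery, not in Python.

```python
import re


def to_snake_case(name):
    """Convert names such as 'userId' or 'Item-Name' to snake_case."""
    # Insert an underscore at lower-to-upper boundaries, then unify separators.
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    name = re.sub(r"[\s\-]+", "_", name)
    return name.lower()


def clean_row(row, defaults):
    """Return a copy of row with consistent column names and no None values."""
    cleaned = {}
    for key, value in row.items():
        column = to_snake_case(key)
        if value is None:
            # Hypothetical policy: per-column default, else an empty string.
            value = defaults.get(column, "")
        cleaned[column] = value
    return cleaned


raw = {"userId": 42, "Item-Name": None, "purchase count": None}
print(clean_row(raw, {"purchase_count": 0}))
```

Applying the same naming and NULL policy to every table is what lets downstream consumers query the layer without per-table special cases.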
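  4. You need a fair amount of GKE/k8s skill
     - Cloud Composer is a fully managed Airflow service deployed on top of GKE. When something goes wrong you have to connect to Airflow via GKE to debug it, so it is painful without at least basic knowledge of GKE and k8s.
     - For a start, set up kubectl.

         # Look up the GKE cluster that backs the Composer environment.
         GKE_CLUSTER="$(gcloud composer environments describe $COMPOSER_NAME \
           --format='get(config.gkeCluster)')"
         # Look up the zone the cluster nodes run in.
         GKE_LOCATION="$(gcloud composer environments describe $COMPOSER_NAME \
           --format='get(config.nodeConfig.location)')"
         # Fetch credentials so that kubectl can talk to the cluster.
         gcloud container clusters get-credentials $GKE_CLUSTER \
           --zone $GKE_LOCATION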
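  5. Cannot connect to Cloud Memorystore
     - To connect from GKE to Memorystore (Redis), IP aliases must be enabled when the cluster is created, but Cloud Composer creates its cluster with them disabled. When they are disabled, iptables rules have to be added to the cluster.
     - We did not want to touch the k8s cluster managed by Cloud Composer much, so we solved it by setting up a GCE instance as a bastion host.

         from redis import StrictRedis
         from sshtunnel import SSHTunnelForwarder

         # Open an SSH tunnel through the bastion and forward a local port to Redis.
         with SSHTunnelForwarder((bastion_host, bastion_port),
                                 ssh_username="airflow",
                                 remote_bind_address=(redis_host, redis_port),
                                 local_bind_address=("127.0.0.1", local_port),
                                 allow_agent=False):
             # Talk to Redis through the local end of the tunnel.
             client = StrictRedis(host="127.0.0.1", port=local_port)
             client.ping()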
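  6. template_ext
     - An outrageously handy feature: a string that ends with one of the specified values is interpreted as a file path, and is replaced by the result of rendering that file's contents with Jinja.
     - It is undocumented, but quietly implemented in BaseOperator.

         from airflow.operators.python_operator import PythonOperator  # Airflow 1.10 import path

         # Without template_ext: read the file yourself and pass its contents.
         with open("foo/bar.sql") as f:
             sql = f.read()
         PythonOperator(
             templates_dict={"sql": sql}
             # ...
         )

         # With template_ext: pass the path and let the operator load and render it.
         class SQLTemplateOperator(PythonOperator):
             template_ext = (".sql",)

         SQLTemplateOperator(
             templates_dict={"sql": "foo/bar.sql"},
             # ...
         )

       https://stackoverflow.com/a/…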
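The template_ext mechanism from slide 6 can be sketched in plain Python as follows. This is a simplified illustration, not Airflow's actual code: `resolve_templated_value`, `TEMPLATE_EXT`, and `search_dir` are hypothetical names, and `string.Template` stands in for Jinja so that the example needs only the standard library.

```python
import os
import tempfile
from string import Template

TEMPLATE_EXT = (".sql",)  # plays the role of the operator's template_ext tuple


def resolve_templated_value(value, context, search_dir):
    """Load the file if value ends with a template extension, then render it."""
    if value.endswith(TEMPLATE_EXT):
        # The string is treated as a path and replaced by the file's contents.
        with open(os.path.join(search_dir, value)) as f:
            value = f.read()
    # Stand-in renderer: Airflow renders with Jinja, not string.Template.
    return Template(value).substitute(context)


with tempfile.TemporaryDirectory() as dag_dir:
    with open(os.path.join(dag_dir, "bar.sql"), "w") as f:
        f.write("SELECT * FROM events WHERE dt = '$ds'")
    rendered = resolve_templated_value("bar.sql", {"ds": "2019-04-16"}, dag_dir)
    print(rendered)
```

Strings without a matching extension are rendered directly, which is why the same field can hold either an inline query or a path to a query file.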