This Is How We Build Data Pipelines, Spring 2019
Yuku TAKAHASHI
April 16, 2019
Transcript
This Is How We Build Data Pipelines, Spring 2019
@yuku_t #DataPipelineCasualTalk
Yuku TAKAHASHI
twitter.com/yuku_t, github.com/yuku, qiita.com/yuku_t
Software Engineer @FLYWHEEL
At FLYWHEEL, I develop a recommendation engine and the pipelines that feed it. In my current role since …
Today's topics
• What is a data pipeline, anyway?
• Impressions from building pipelines over the past … months
• Cloud Composer tips that aren't in the documentation
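Diagram: DATA → DATA PIPELINE → SOLUTION
DATA
• Complex, fragmented data that took shape through all sorts of historical circumstances
• Logs that sort of exist, and sort of don't
• Unstable storage
SOLUTION
• Some amazing AI that nicely solves all kinds of problems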
THE DATA SCIENCE HIERARCHY OF NEEDS (pyramid, bottom to top)
• COLLECT
• MOVE/STORE
• EXPLORE/TRANSFORM
• AGGREGATE/LABEL
• LEARN/OPTIMIZE
• AI, DEEP LEARNING
Source: https://hackernoon.com/the-ai-hierarchy-of-needs-…
Annotation: "Painful"
Annotation: "Fun"
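Diagram labels: CLIENT, SOLUTION
SOLUTION examples
• Recommender systems
• Logistics optimization
• Search engines
• Ad delivery systems
• etc.
CLIENT needs
• Want to make use of the large amounts of data sitting inside the company
• Don't have people who can turn ideas into reality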
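FLYWHEEL ≠ data analytics SaaS
• We cover everything from collecting the data to building the solution.
• We deploy our software into the client's AWS account / GCP project.
• Some companies and industries cannot send data outside, even when they know the functionality they want exists out there (mostly big enterprises).
• How to build up software assets across multi-cloud environments and connect them with pipelines is one of our big technical challenges going forward.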
Multi-cloud pipelines!!!
Moving away from shell scripts
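Pipelines built on shell scripts
• Everyone used shell scripts once upon a time.
• A collection of Python scripts launched from shell scripts that cron runs on a schedule.
• Pain points
    • Can't keep up with a pipeline that keeps getting more complex.
    • Can't tell which task failed, or when.
    • Failed tasks get rerun by ssh-ing into the machine.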
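The pain points above (no record of which task failed or when, and manual reruns over ssh) are what a workflow engine such as Airflow takes over. As a rough illustration of the idea only, here is a hypothetical toy runner in plain Python 3.9+ (for `graphlib`). `MiniRunner` and `TaskRecord` are made-up names, and this is not how Airflow's scheduler is implemented.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Callable, Dict, Optional, Set


@dataclass
class TaskRecord:
    """What happened to one task: its state and when that was recorded."""
    state: str  # "success", "failed" or "upstream_failed"
    recorded_at: datetime
    error: Optional[str] = None


@dataclass
class MiniRunner:
    """Toy DAG runner (hypothetical; not Airflow's scheduler)."""
    tasks: Dict[str, Callable[[], None]]
    upstream: Dict[str, Set[str]]  # task name -> names it depends on
    log: Dict[str, TaskRecord] = field(default_factory=dict)

    def run(self, only_unfinished: bool = False) -> Dict[str, TaskRecord]:
        # Every task appears in the graph, even if it has no dependencies.
        graph = {name: set(self.upstream.get(name, ())) for name in self.tasks}
        for name in TopologicalSorter(graph).static_order():
            previous = self.log.get(name)
            if only_unfinished and previous is not None and previous.state == "success":
                continue  # finished in an earlier run; leave it alone
            deps_ok = all(
                dep in self.log and self.log[dep].state == "success"
                for dep in graph[name]
            )
            if not deps_ok:
                self.log[name] = TaskRecord("upstream_failed", datetime.now(timezone.utc))
                continue
            try:
                self.tasks[name]()
                self.log[name] = TaskRecord("success", datetime.now(timezone.utc))
            except Exception as exc:  # record the failure instead of aborting the run
                self.log[name] = TaskRecord("failed", datetime.now(timezone.utc), repr(exc))
        return self.log
```

A second call with `only_unfinished=True`, after fixing the cause, reruns only the failed task and everything downstream of it: the step that the shell-script setup handled by logging in over ssh.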
Recommendation pipeline (shell-script version)
Diagram labels: CLIENT DATA, JOINED DATA, RECOMMENDER SYSTEM
Recommendation pipeline (Cloud Composer version)
Diagram labels: CLIENT DATA, DATA LAKE, DATA WAREHOUSE, DATA MART, DATA SCIENTISTS, RECOMMENDER SYSTEM
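Basic approach
• Data-platform classification and evolutionary data modeling
• DATA LAKE → DATA WAREHOUSE → DATA MART
• Accumulate data in BigQuery and use SQL wherever possible. Only the parts that are hard to do in SQL run on Cloud Dataproc (Spark).
• Manage workflows with Cloud Composer (Airflow).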
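The three layers in BigQuery
• DATA LAKE
    • Accumulates the raw data provided by the client
• DATA WAREHOUSE
    • Restores and denormalizes fragmented data, makes naming conventions consistent, eliminates NULLs so the data is easier to use, and so on
    • Sometimes also provided to the client's data analysis team
• DATA MART
    • Queried from BI tools such as Cloud Datalab
    • For example, measuring the effectiveness of the recommender system we provide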
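The DATA WAREHOUSE step above includes making naming conventions consistent and eliminating NULLs. In this pipeline that work happens in BigQuery SQL; the Python sketch below only illustrates the same two transformations on in-memory rows, assuming snake_case as the target convention and a hypothetical per-column `defaults` map for replacing NULLs.

```python
import re
from typing import Any, Dict, Iterable, List

# Position between a lowercase letter/digit and an uppercase letter.
_CAMEL_BOUNDARY = re.compile(r"(?<=[a-z0-9])(?=[A-Z])")


def to_snake_case(name: str) -> str:
    """userID -> user_id, ItemName -> item_name; snake_case is left unchanged."""
    return _CAMEL_BOUNDARY.sub("_", name).lower()


def to_warehouse_rows(raw_rows: Iterable[Dict[str, Any]],
                      defaults: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Rename columns to snake_case and replace None with per-column defaults.

    Columns without a default keep None, so gaps stay visible instead of
    being silently filled.
    """
    cleaned = []
    for row in raw_rows:
        out = {}
        for key, value in row.items():
            column = to_snake_case(key)
            if value is None:
                value = defaults.get(column)
            out[column] = value
        cleaned.append(out)
    return cleaned
```

Denormalization (joining fragmented tables back together) is deliberately left out; in BigQuery that would be ordinary SQL joins.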
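After pushing ahead with replacing the shell scripts
• However painful Airflow gets, remembering the old days keeps us going.
• It's nice that a Web UI comes with it.
• Now that tasks are easy to rerun, we've started paying attention to task idempotency.
• (Maybe we need to consult on things starting from how the data is stored…?)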
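The idempotency point can be made concrete: a task is safe to rerun when running it twice leaves the same result as running it once. A minimal sketch, assuming a hypothetical in-memory table partitioned by date: appending rows is not idempotent, while replacing the whole partition for the run date is (comparable in spirit to a BigQuery load with the `WRITE_TRUNCATE` write disposition into a single date partition).

```python
from datetime import date
from typing import Dict, List

# Hypothetical in-memory "table": partition key (a date) -> rows.
Table = Dict[date, List[dict]]


def load_append(table: Table, ds: date, rows: List[dict]) -> None:
    """Not idempotent: every rerun adds the same rows again."""
    table.setdefault(ds, []).extend(rows)


def load_overwrite(table: Table, ds: date, rows: List[dict]) -> None:
    """Idempotent: the partition for `ds` is replaced, so reruns converge."""
    table[ds] = list(rows)
```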
Cloud Composer know-how that isn't in the documentation
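You need a fair amount of GKE skill
• Cloud Composer is a fully managed Airflow service deployed on top of GKE. When something goes wrong, you sometimes have to connect to Airflow through GKE to debug it, so it's rough going without at least a basic understanding of GKE and k8s.
• For a start, set up kubectl:

    # --location is the environment's region; it can be omitted if the
    # composer/location property is set in your gcloud config.
    GKE_CLUSTER="$(gcloud composer environments describe "$COMPOSER_NAME" \
        --location "$COMPOSER_LOCATION" \
        --format='get(config.gkeCluster)')"
    GKE_LOCATION="$(gcloud composer environments describe "$COMPOSER_NAME" \
        --location "$COMPOSER_LOCATION" \
        --format='get(config.nodeConfig.location)')"
    # Write credentials for the Composer-managed cluster into kubeconfig.
    gcloud container clusters get-credentials "$GKE_CLUSTER" \
        --zone "$GKE_LOCATION"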
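When memory runs short, it dies silently
• If a task fails without writing any logs, k8s may have killed the whole airflow-worker because it ran out of memory.
• You can confirm the cause of death by selecting an Evicted Pod under Kubernetes Engine > Workloads > airflow-worker.
• With n1-standard nodes, even a PythonOperator that merely calls an API occasionally gets killed.
• Periodically garbage-collect the Evicted Pods with a command that sweeps them all up:

    # Add -n <namespace> to both kubectl calls if the workers are not in
    # your current namespace. xargs -r skips the delete when nothing matches.
    kubectl get pods -l run=airflow-worker \
        | grep Evicted \
        | awk '{print $1}' \
        | xargs -r kubectl delete pod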
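The same cleanup can be sketched in Python, for example to run it from a scheduled job instead of by hand. This is an illustrative sketch, not the author's tooling: it assumes `kubectl` is already configured as in the GKE setup above and that pods are printed in kubectl's default table layout. The helper names are hypothetical. It checks the STATUS column exactly rather than grepping the whole line, and it skips the delete call when nothing is evicted.

```python
import subprocess
from typing import List


def evicted_pod_names(kubectl_output: str) -> List[str]:
    """Pick the NAME of every pod whose STATUS column reads "Evicted".

    Expects kubectl's default table layout: NAME READY STATUS RESTARTS AGE.
    """
    names = []
    for line in kubectl_output.strip().splitlines()[1:]:  # skip the header row
        columns = line.split()
        if len(columns) >= 3 and columns[2] == "Evicted":
            names.append(columns[0])
    return names


def gc_evicted_workers(label: str = "run=airflow-worker") -> List[str]:
    """List pods matching `label`, delete the evicted ones, return their names."""
    listing = subprocess.run(
        ["kubectl", "get", "pods", "-l", label],
        check=True, capture_output=True, text=True,
    ).stdout
    names = evicted_pod_names(listing)
    if names:  # never run `kubectl delete pod` with an empty argument list
        subprocess.run(["kubectl", "delete", "pod", *names], check=True)
    return names
```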
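Python packages are slightly out of date
• Cloud Composer's airflow-worker comes with several Python packages preinstalled, but the documentation doesn't list their versions. And they're slightly old.
• The quickest way to find out is to start Python inside a Pod and check.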
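One quick way to do that check, assuming you have opened a Python shell inside an airflow-worker Pod (for example with `kubectl exec -it <pod> -- python`), is to ask `importlib.metadata` for the installed versions. A minimal sketch for Python 3.8+; on older interpreters, `pkg_resources.get_distribution(name).version` from setuptools is the usual equivalent. The package names queried at the bottom are only examples.

```python
from importlib import metadata  # Python 3.8+
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# Example query; replace with the packages your DAGs depend on.
for name, version in installed_versions(["google-cloud-bigquery", "pandas"]).items():
    print(f"{name}: {version}")
```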
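You can deploy with gsutil
• Airflow detects DAG definition files by polling local files. Cloud Composer makes this work by mounting a GCS bucket with GCS FUSE.
• All you need is to replace the files in GCS, so there's no need to use the gcloud SDK's storage dags import command.
• Easy automatic deployment from CircleCI with gsutil rsync.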
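A CI job only needs to mirror the repository's DAG folder into the environment's bucket. The helper below is a hypothetical sketch that builds and runs that `gsutil rsync` call; `dag_gcs_prefix` is assumed to be the environment's DAG folder in GCS (reported as `config.dagGcsPrefix` by `gcloud composer environments describe`).

```python
import subprocess
from typing import List


def dags_rsync_command(local_dags_dir: str, dag_gcs_prefix: str,
                       delete: bool = True, dry_run: bool = False) -> List[str]:
    """Build a `gsutil rsync` call that mirrors a local DAG folder into GCS."""
    cmd = ["gsutil", "-m", "rsync", "-r"]  # -m: parallel transfers, -r: recurse
    if delete:
        cmd.append("-d")  # delete objects under the prefix that no longer exist locally
    if dry_run:
        cmd.append("-n")  # only report what would change
    cmd += [local_dags_dir, dag_gcs_prefix]
    return cmd


def deploy_dags(local_dags_dir: str, dag_gcs_prefix: str) -> None:
    """Run the sync, e.g. as the last step of a CI job."""
    subprocess.run(dags_rsync_command(local_dags_dir, dag_gcs_prefix), check=True)
```

Deleting with `-d` keeps removed DAG files from lingering, but it also removes anything else stored under that prefix, so a `dry_run=True` pass is a cheap safety check before the first real sync.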
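How to jump to the Airflow web page
• A bookmark works, but the URI changes whenever you recreate the Composer Environment. Going through the Cloud Console each time is a hassle, though.
• The gcloud SDK can fetch the URI, so just pass it to the open command:

    # open is the macOS command; on Linux, xdg-open does the same job.
    open "$(gcloud composer environments describe "$COMPOSER_NAME" \
        --location "$COMPOSER_LOCATION" \
        --format='get(config.airflowUri)')"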
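The same lookup can be scripted in Python with `webbrowser`, which works on any OS rather than only where `open` exists. A sketch assuming `gcloud` is installed and authenticated; the function names are hypothetical.

```python
import subprocess
import webbrowser
from typing import List


def airflow_uri_command(environment: str, location: str) -> List[str]:
    """gcloud call that prints the Airflow web UI URI of a Composer environment."""
    return [
        "gcloud", "composer", "environments", "describe", environment,
        "--location", location,
        "--format=get(config.airflowUri)",
    ]


def open_airflow_ui(environment: str, location: str) -> str:
    """Look up the current URI (it changes when the environment is recreated) and open it."""
    result = subprocess.run(
        airflow_uri_command(environment, location),
        check=True, capture_output=True, text=True,
    )
    uri = result.stdout.strip()
    webbrowser.open(uri)
    return uri
```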
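Upgrading Airflow is painful
• Upgrades became available as a beta feature on …, but they don't work as the documentation says.
• Previously, the way to do it was a script called copy_environment.py in github.com/GoogleCloudPlatform/python-docs-samples.
• Reading copy_environment.py tells you, more or less, what needs to be copied where, but it's quite tough unless you have k8s skills on top of Airflow knowledge.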
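Can't connect to Cloud Memorystore
• To connect from GKE to Memorystore (Redis), IP aliasing has to be enabled when the cluster is created, but Cloud Composer creates its cluster with it disabled. When it's disabled, you need to add iptables rules to the cluster.
• We didn't want to touch the k8s cluster managed by Cloud Composer too much, so we solved it by setting up a GCE instance as a bastion host and tunneling through it:

    from redis import StrictRedis
    from sshtunnel import SSHTunnelForwarder

    # Placeholders: bastion_host/bastion_port address the GCE bastion,
    # redis_host/redis_port the Memorystore endpoint, and local_port is
    # any free port on the worker.
    with SSHTunnelForwarder((bastion_host, bastion_port),
                            ssh_username="airflow",
                            remote_bind_address=(redis_host, redis_port),
                            local_bind_address=("127.0.0.1", local_port),
                            allow_agent=False):
        # Traffic to 127.0.0.1:local_port is forwarded through the
        # bastion to Memorystore.
        client = StrictRedis(host="127.0.0.1", port=local_port)
        client.ping()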
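template_ext
• An outrageously handy feature: any string ending in one of the specified extensions is treated as a file path and replaced with the contents of that file, rendered with Jinja.
• It isn't documented, but it's quietly implemented in BaseOperator.

    from airflow.operators.python_operator import PythonOperator

    # Without template_ext: read the SQL yourself and pass the text in.
    with open("foo/bar.sql") as f:
        sql = f.read()
    PythonOperator(
        templates_dict={"sql": sql},
        # ...
    )

    # With template_ext: a value ending in ".sql" is loaded from the
    # template search path (the DAG's folder, plus any template_searchpath)
    # and rendered with Jinja before the task runs.
    class SQLTemplateOperator(PythonOperator):
        template_ext = (".sql",)

    SQLTemplateOperator(
        templates_dict={"sql": "foo/bar.sql"},
        # ...
    )

https://stackoverflow.com/a/…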
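To make the `template_ext` rule concrete: when a templated value is a string ending in one of the listed extensions, it is read as a file and its contents are rendered, and dict values (as in `templates_dict`) are handled one by one. The sketch below mimics only that lookup rule; it is not Airflow's code, and to stay dependency-free it swaps Jinja for the standard library's `string.Template` (`$ds` placeholders instead of `{{ ds }}`). `resolve_templated` and `search_dir` are hypothetical names.

```python
import os
from string import Template
from typing import Any, Dict, Sequence


def resolve_templated(value: Any, template_ext: Sequence[str],
                      search_dir: str, context: Dict[str, Any]) -> Any:
    """Mimic the template_ext rule with string.Template standing in for Jinja.

    Strings ending in a listed extension are read from search_dir and then
    rendered; other strings are rendered as-is; dicts are handled per value.
    """
    if isinstance(value, dict):
        return {k: resolve_templated(v, template_ext, search_dir, context)
                for k, v in value.items()}
    if isinstance(value, str):
        if value.endswith(tuple(template_ext)):
            with open(os.path.join(search_dir, value), encoding="utf-8") as f:
                value = f.read()
        return Template(value).safe_substitute(context)
    return value  # non-string values pass through untouched
```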
https://www.flywheel.jp/careers
We are hiring. It's fun!