This Is How We Build Our Data Pipelines, Spring 2019
Yuku TAKAHASHI
April 16, 2019
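Transcript

This Is How We Build Our Data Pipelines, Spring 2019
@yuku_t, Data Pipeline Casual Talk

Yuku Takahashi
twitter.com/yuku_t, github.com/yuku, qiita.com/yuku_t
Software Engineer @ FLYWHEEL
At FLYWHEEL I develop a recommendation engine and the pipelines that feed it. In my current role since …

Today's topics
• What is a data pipeline, anyway?
• Impressions after … months of building pipelines
• Cloud Composer tips that are not in the documentation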

[Diagram labels: DATA, DATA PIPELINE, SOLUTION]

DATA
• Complex, fragmented data that has accumulated for various historical reasons
• Logs that may or may not exist
• Unstable storage

SOLUTION
• The thing where some amazing AI neatly solves all kinds of problems
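
THE DATA SCIENCE HIERARCHY OF NEEDS
Pyramid layers, from base to top: COLLECT, MOVE/STORE, EXPLORE/TRANSFORM, AGGREGATE/LABEL, LEARN/OPTIMIZE, AI/DEEP LEARNING
Annotations on the slide: "painful" and "fun"
Source: https://hackernoon.com/the-ai-hierarchy-of-needs-…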
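
SOLUTION
• Recommender systems
• Logistics optimization
• Search engines
• Ad delivery systems
• etc.

CLIENT
• Wants to put the large volume of data accumulating in-house to use
• Has no one who can turn ideas into reality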
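
FLYWHEEL ≠ data analytics SaaS
• We cover everything from collecting the data to building the solution.
• We deploy our software into the client's AWS account or GCP project.
• Some companies and industries cannot send data outside, even when they know the functionality they want exists there (mostly very large companies).
• How to build up software assets in a multi-cloud environment and connect them with pipelines is one of our major technical challenges going forward.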

Multi-cloud pipelines!!!

Multi-cloud pipelines!!! Breaking away from shell scripts
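
Pipelines built from shell scripts
• Everyone used shell scripts back in the day
• A collection of Python scripts run from a shell script that cron executes on a schedule
• Pain points
  • Cannot keep up as the pipeline grows more complex
  • No way to tell which task failed, or when
  • Failed tasks are re-run by hand over ssh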

Recommender pipeline, shell-script version
[Diagram labels: CLIENT DATA, JOINED DATA, RECOMMENDER SYSTEM]

Recommender pipeline, Cloud Composer version
[Diagram labels: CLIENT DATA, DATA LAKE, DATA WAREHOUSE, DATA MART, DATA SCIENTISTS, RECOMMENDER SYSTEM]
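
Basic policy
• "The three categories of data platform and evolutionary data modeling"
• DATA LAKE → DATA WAREHOUSE → DATA MART
• Data is accumulated in BigQuery, and we use SQL wherever possible. Only the parts that are hard to express in SQL are implemented with Cloud Dataproc (Spark).
• Workflows are managed with Cloud Composer (Airflow).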
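
The three layers in BigQuery
• DATA LAKE
  • Stores the raw data provided by the client
• DATA WAREHOUSE
  • Reassembles and denormalizes fragmented data, makes naming conventions consistent, eliminates NULLs so the data is easier to use, and so on
  • Sometimes also provided to the client's data analysis team
• DATA MART
  • Referenced from BI tools such as Cloud Datalab
  • Used for things like measuring the effect of the recommender system we provide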
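
After working on replacing the shell scripts
• However painful Airflow gets, remembering the old days keeps us going
• Glad that it comes with a web UI
• Because re-running is now easy, we have become conscious of task idempotency
• (Maybe we need to start consulting from the point where the data gets stored...?)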
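
The idempotency point on this slide can be illustrated with a minimal sketch. It is not taken from the talk: the in-memory `Table` type and the functions `load_partition` and `append_rows` are hypothetical stand-ins for a date-partitioned table and for tasks that write to it. The idea shown is that a task which overwrites its own partition can be re-run safely, while one that appends cannot.

```python
from typing import Dict, List

# Hypothetical in-memory stand-in for a date-partitioned table:
# partition key (execution date) -> rows in that partition.
Table = Dict[str, List[dict]]


def load_partition(table: Table, ds: str, rows: List[dict]) -> None:
    """Idempotent write: replace the whole partition for execution date `ds`.

    Running this twice with the same input leaves the table unchanged.
    """
    table[ds] = list(rows)


def append_rows(table: Table, ds: str, rows: List[dict]) -> None:
    """Non-idempotent counterpart: every re-run duplicates the rows."""
    table.setdefault(ds, []).extend(rows)


if __name__ == "__main__":
    rows = [{"user": "a", "item": 1}]

    safe: Table = {}
    load_partition(safe, "2019-04-16", rows)
    load_partition(safe, "2019-04-16", rows)  # simulated re-run
    print("overwrite:", len(safe["2019-04-16"]), "row(s)")

    unsafe: Table = {}
    append_rows(unsafe, "2019-04-16", rows)
    append_rows(unsafe, "2019-04-16", rows)  # simulated re-run
    print("append:", len(unsafe["2019-04-16"]), "row(s)")
```

Keying the write on the execution date is one common way to get this property; whether it fits depends on how the target table is partitioned.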

Cloud Composer know-how that is not in the documentation

It demands a fair amount of GKE skill
• Cloud Composer is a fully managed Airflow service deployed on top of GKE. When something goes wrong you have to connect to Airflow through GKE to debug it, so it is tough without at least basic knowledge of GKE and k8s.
• For a start, set up kubectl.

    GKE_CLUSTER="$(gcloud composer environments describe $COMPOSER_NAME \
      --format='get(config.gkeCluster)')"
    GKE_LOCATION="$(gcloud composer environments describe $COMPOSER_NAME \
      --format='get(config.nodeConfig.location)')"
    gcloud container clusters get-credentials $GKE_CLUSTER \
      --zone $GKE_LOCATION
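
It dies silently when memory runs short
• When a task fails without emitting any logs, the whole airflow-worker may have been killed by k8s for lack of memory.
• You can confirm the cause of death by selecting an Evicted pod under Kubernetes Engine > Workloads > airflow-worker.
• On an n…-standard-… node, even a PythonOperator that merely calls an API occasionally gets killed.
• Periodically garbage-collect with a command that sweeps away Evicted pods.

    kubectl get pods -l run=airflow-worker \
      | grep Evicted \
      | awk '{print $1}' \
      | xargs kubectl delete pod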
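
For anyone who prefers to run the clean-up from Python, the filtering step of the pipeline above (the `grep` and `awk` stages) could be sketched as follows. The function name `evicted_pods` and the `SAMPLE` text are made up for illustration. The sketch assumes the default `kubectl get pods` column layout (NAME, READY, STATUS, ...) and, unlike `grep`, matches only on the STATUS column.

```python
from typing import List


def evicted_pods(kubectl_output: str) -> List[str]:
    """Return the names of pods whose STATUS column is 'Evicted'.

    Assumes the default `kubectl get pods` columns:
    NAME READY STATUS RESTARTS AGE
    """
    names = []
    for line in kubectl_output.splitlines():
        columns = line.split()
        # Skip blank lines and the header row, if present.
        if not columns or columns[0] == "NAME":
            continue
        if len(columns) >= 3 and columns[2] == "Evicted":
            names.append(columns[0])
    return names


# Made-up sample output, for illustration only.
SAMPLE = """\
NAME                  READY   STATUS    RESTARTS   AGE
airflow-worker-aaaa   2/2     Running   0          3d
airflow-worker-bbbb   0/2     Evicted   0          1d
"""

if __name__ == "__main__":
    for name in evicted_pods(SAMPLE):
        print(name)
```

The returned names would then be passed to `kubectl delete pod`, as in the shell version.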
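
The Python packages are slightly out of date
• The airflow-worker in Cloud Composer comes with a number of Python packages installed, but the documentation does not list their versions. On top of that, they are slightly old.
• The quickest way to check is to start Python inside the pod.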
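
Once a Python interpreter is running inside the worker pod, the installed versions can be listed with a few lines of code. This is a sketch under one loud assumption: the interpreter is Python 3.8 or newer, so the standard-library `importlib.metadata` module is available. On older interpreters, `pkg_resources.working_set` from setuptools provides similar information. The function name `installed_versions` is hypothetical.

```python
from importlib import metadata  # assumption: Python 3.8+
from typing import Dict


def installed_versions() -> Dict[str, str]:
    """Map each installed distribution name to its version string."""
    versions = {}
    for dist in metadata.distributions():
        meta = dist.metadata
        if meta is None:
            continue  # distribution with missing metadata
        name = meta.get("Name")
        version = meta.get("Version")
        if name and version:
            versions[name] = version
    return versions


if __name__ == "__main__":
    found = installed_versions()
    # Print in requirements.txt style, sorted case-insensitively.
    for name in sorted(found, key=str.lower):
        print(f"{name}=={found[name]}")
```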
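
You can deploy with gsutil
• Airflow detects DAG definition files by polling local files. Cloud Composer achieves this by mounting a GCS bucket with GCS FUSE.
• Since all you need to do is place files in GCS, you do not have to use the gcloud SDK's storage dags import command.
• Easy automated deployment from CircleCI with gsutil rsync.

How to jump to the Airflow web page
• A bookmark works, but the URI changes whenever the Composer environment is recreated. Going through the Cloud Console every time is a hassle too.
• The gcloud SDK can fetch the URI, so just feed it to the open command.

    gcloud composer environments describe $COMPOSER_NAME \
      --format='get(config.airflowUri)'

Upgrading the Airflow version is painful
• Upgrading became available as a beta feature in …, but it does not work as documented.
• Previously you were expected to use a script called copy_environment.py from github.com/GoogleCloudPlatform/python-docs-samples.
• Reading copy_environment.py does tell you roughly what to copy where, but it is quite tough unless you have k8s skills as well as Airflow skills.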
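
Cannot connect to Cloud Memorystore
• To connect from GKE to Memorystore (Redis), IP aliasing must be enabled when the cluster is created, but Cloud Composer creates its cluster with it disabled. When it is disabled, you need to add iptables rules to the cluster.
• We did not want to touch the k8s cluster managed by Cloud Composer too much, so we solved it by standing up a GCE instance as a bastion.

    from redis import StrictRedis
    from sshtunnel import SSHTunnelForwarder

    # bastion_host, bastion_port, redis_host, redis_port and local_port
    # are defined elsewhere.
    # Open an SSH tunnel through the bastion, then talk to Redis via the
    # local end of the tunnel.
    with SSHTunnelForwarder((bastion_host, bastion_port),
                            ssh_username="airflow",
                            remote_bind_address=(redis_host, redis_port),
                            local_bind_address=("127.0.0.1", local_port),
                            allow_agent=False):
        client = StrictRedis(host="127.0.0.1", port=local_port)
        client.ping()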
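
template_ext
• An outrageously handy feature: a string that ends with one of the specified values is interpreted as a file path, and is replaced by the result of rendering that file's contents with Jinja.
• There is no documentation, but it is quietly implemented in BaseOperator.

    # Without template_ext: read the file yourself.
    with open("foo/bar.sql") as f:
        sql = f.read()
    PythonOperator(
        templates_dict={"sql": sql},
        # ...
    )

    # With template_ext: pass the path and let Airflow load and render it.
    class SQLTemplateOperator(PythonOperator):
        template_ext = (".sql",)

    SQLTemplateOperator(
        templates_dict={"sql": "foo/bar.sql"},
        # ...
    )

https://stackoverflow.com/a/…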
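
To make the mechanism concrete, here is a toy imitation of the behaviour described above. It is not Airflow's code. `TEMPLATE_EXT`, `render` and `resolve` are hypothetical names, and `render` is a deliberately tiny stand-in for Jinja that only substitutes `{{ name }}` placeholders. The sketch shows only the decision: a value ending in a listed extension is treated as a file path and its contents are rendered, while any other value is rendered as an inline template.

```python
import os
import re
import tempfile
from typing import Dict, Tuple

# Hypothetical equivalent of an operator's `template_ext` attribute.
TEMPLATE_EXT: Tuple[str, ...] = (".sql",)


def render(source: str, context: Dict[str, str]) -> str:
    """Toy stand-in for Jinja: substitutes only `{{ name }}` placeholders."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(context[match.group(1)]),
        source,
    )


def resolve(value: str, context: Dict[str, str], base_dir: str) -> str:
    """Render `value`, loading it from a file first if it looks like a path."""
    if value.endswith(TEMPLATE_EXT):
        with open(os.path.join(base_dir, value)) as f:
            value = f.read()
    return render(value, context)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as base_dir:
        with open(os.path.join(base_dir, "bar.sql"), "w") as f:
            f.write("SELECT * FROM logs WHERE dt = '{{ ds }}'")
        context = {"ds": "2019-04-16"}
        print(resolve("bar.sql", context, base_dir))            # file path
        print(resolve("SELECT '{{ ds }}'", context, base_dir))  # inline
```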

https://www.flywheel.jp/careers
We are hiring
It's fun!