
How Presto is used by CDP

yui-knk

June 04, 2019

Transcript

  1. Self-introduction
    • Yuichiro Kaneko
    • Arm Treasure Data
    • CDP team (writing Rails applications)
    • CRuby committer since 2015/12
    • GitHub (yui-knk)
  2. PlazmaDB
    • Actual data + metadata
    • By default, tables are partitioned by the time column
    • https://www.slideshare.net/treasure-data/td-techplazma
    • https://qiita.com/xerial/items/959d2b928353b595f31e
    • User Defined Partitioning (UDP) can also be specified
    • https://support.treasuredata.com/hc/en-us/articles/360009798714-User-Defined-Partitioning-for-Presto
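  3. Customer master • Attribute data • Web logs • Purchase history • App logs • customers table • behaviors tables
    Segments: Male • Purchased 2 or more of product A • Uses the app • Recently visited the site
    Profiles API • Activation
  4. Customer master • Attribute data • Web logs • Purchase history • App logs • customers table • behaviors tables
    Segments: Male • Purchased 2 or more of product A • Uses the app • Recently visited the site
    Profiles API • Activation
  5. Customer master • Attribute data • Web logs • Purchase history • App logs • customers table • behaviors tables
    Segments: Male • Purchased 2 or more of product A • Uses the app • Recently visited the site
    Profiles API • Activation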
  6. Generating a query from JSON

    {
      "conditions": [
        {
          "array_matching": null,
          "left_value": { "name": "country" },
          "operator": { "not": false, "right_value": "japan", "type": "Equal" },
          "type": "Value"
        },
        {
          "array_matching": null,
          "left_value": {
            "aggregation": { "grouping_columns": [], "type": "Count" },
            "filter": {
              "conditions": [
                { "array_matching": null, "column": "city",…

    select a."cdp_customer_id"
    from "cdp_audience_21821"."customers" a
    where a."country" = 'japan'
      and a."cdp_customer_id" in (
        select _r1."cdp_customer_id"
        from "cdp_audience_21821"."behavior_behavior_1" _r1
        where _r1."city" = 'Mexico City9'
      )
    -- set session distributed_join = 'true'
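The slide shows only a truncated rule and the SQL it produces, not the generator itself. As a rough sketch of the idea, the Python below renders the one fully visible condition (`"type": "Value"` with an `"Equal"` operator) into a WHERE fragment. The helper names (`quote_ident`, `quote_literal`, `render_value_condition`), the `a` alias default, and the mapping of `"not": true` to `<>` are assumptions made for illustration; the actual CDP implementation is not shown in the deck.

```python
def quote_ident(name: str) -> str:
    """Quote an SQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def quote_literal(value: str) -> str:
    """Quote an SQL string literal, doubling any embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def render_value_condition(cond: dict, alias: str = "a") -> str:
    """Render one {"type": "Value"} condition that uses an "Equal" operator.

    Hypothetical helper: only the fields visible on the slide are handled.
    """
    if cond["type"] != "Value" or cond["operator"]["type"] != "Equal":
        raise NotImplementedError("only Value/Equal is sketched here")
    column = quote_ident(cond["left_value"]["name"])
    literal = quote_literal(cond["operator"]["right_value"])
    # Assumption: "not": true negates the comparison.
    op = "<>" if cond["operator"]["not"] else "="
    return f"{alias}.{column} {op} {literal}"


# The first condition from the slide, written as a Python dict.
rule = {
    "array_matching": None,
    "left_value": {"name": "country"},
    "operator": {"not": False, "right_value": "japan", "type": "Equal"},
    "type": "Value",
}
where_clause = render_value_condition(rule)
print(where_clause)
```

Quoting identifiers and literals through dedicated helpers keeps user-supplied rule values from being spliced into the query unescaped.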
  7. Sample data
    • customers: 10,000,000 records
    • behavior_1: 49,006,085 records

    +-----------------+-------+---------+--------+
    | cdp_customer_id | email | country | time   |
    +-----------------+-------+---------+--------+
    | 479c0e0f-c2ed.. | a@... | japan   | 583200 |
    | 7ba0998c-774f.. | b@... | japan   | 583201 |
    | 65bd4227-000e.. | c@... | usa     | 583202 |
    +-----------------+-------+---------+--------+

    +-----------+-------+-----------------+--------+
    | item_name | count | cdp_customer_id | time   |
    +-----------+-------+-----------------+--------+
    | hci       | 3     | 479c0e0f-c2ed.. | 583200 |
    | eml       | 9     | 7ba0998c-774f.. | 583201 |
    | llu       | 10    | 65bd4227-000e.. | 583202 |
    +-----------+-------+-----------------+--------+
  8. Query for the sample segment

    select a."cdp_customer_id"
    from "kaneko_reprotalk_sample"."customers" a
    where (
      select count(*)
      from "kaneko_reprotalk_sample"."behavior_1" _r1
      where _r1."cdp_customer_id" = a."cdp_customer_id"
    ) >= 2
    -- set session distributed_join = 'true'
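The plans later in the deck show this correlated `count(*)` being executed as an aggregation over `behavior_1` combined with `customers` through a `LeftJoin`. The sketch below uses Python's built-in `sqlite3` module and a few invented toy rows to check that the correlated form and an explicit pre-aggregated left join select the same customers. SQLite is not Presto, and the table contents here are made up, so this only illustrates the equivalence of the two query shapes.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table customers (cdp_customer_id text, country text);
    create table behavior_1 (cdp_customer_id text, item_name text);
    insert into customers values ('c1', 'japan'), ('c2', 'japan'), ('c3', 'usa');
    insert into behavior_1 values
        ('c1', 'hci'), ('c1', 'eml'),   -- c1 has two behaviors
        ('c2', 'llu');                  -- c2 has one, c3 has none
""")

# Same shape as the segment query on the slide.
correlated = """
    select a.cdp_customer_id
    from customers a
    where (
        select count(*)
        from behavior_1 r1
        where r1.cdp_customer_id = a.cdp_customer_id
    ) >= 2
    order by a.cdp_customer_id
"""

# Aggregate first, then left join; customers without behaviors count as 0.
pre_aggregated = """
    select a.cdp_customer_id
    from customers a
    left join (
        select cdp_customer_id, count(*) as cnt
        from behavior_1
        group by cdp_customer_id
    ) r on r.cdp_customer_id = a.cdp_customer_id
    where coalesce(r.cnt, 0) >= 2
    order by a.cdp_customer_id
"""

rows_correlated = [row[0] for row in conn.execute(correlated)]
rows_pre_aggregated = [row[0] for row in conn.execute(pre_aggregated)]
print(rows_correlated, rows_pre_aggregated)
```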
  9. For details
    • “Prestoで実現するインタラクティブクエリ - dbtech showcase 2014 Tokyo” (Interactive queries with Presto)
    • https://www.slideshare.net/treasure-data/2014-1113prestodbtechshowcase
    • “Presto: SQL on Everything”
    • https://research.fb.com/publications/presto-sql-on-everything/
  10. Problem: memory

    select a."cdp_customer_id"
    from "kaneko_reprotalk_sample"."customers" a
    where (
      select count(*)
      from "kaneko_reprotalk_sample"."behavior_1" _r1
      where _r1."cdp_customer_id" = a."cdp_customer_id"
    ) >= 2
    -- set session distributed_join = 'true'

    https://support.treasuredata.com/hc/en-us/articles/360001450908-Presto-Performance-Tuning
  11. Problem: memory
    • Without distributed_join
      • Duration: 1 min
      • Result count: 9,962,475 records, 1 column
      • Peak memory: 7.53GB
    • With distributed_join
      • Duration: 1 min
      • Result count: 9,962,475 records, 1 column
      • Peak memory: 1.81GB
  12. distributed_join
    • Distributed joins require redistributing both tables using a hash of the join key. This can be slower (sometimes substantially) than broadcast joins, but allows much larger joins. Broadcast joins require that the tables on the right side of the join after filtering fit in memory on each node, whereas distributed joins only need to fit in distributed memory across all nodes. This can also be specified on a per-query basis using the distributed_join session property.
    https://github.com/prestosql/presto/commit/506f45764040322d244ee6487b12e313b9a9d00f
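To make the memory trade-off in that description concrete, here is a toy simulation of how many build-side rows each node has to hold under the two strategies. The function names, the four-node cluster, and the integer keys are invented for illustration; Presto's real exchange and hashing logic is considerably more involved.

```python
from collections import Counter


def broadcast_build_rows(build_keys, num_nodes):
    """Broadcast join: every node holds a full copy of the build side."""
    return [len(build_keys)] * num_nodes


def distributed_build_rows(build_keys, num_nodes):
    """Distributed join: each row goes to exactly one node, chosen by hash(key)."""
    per_node = Counter(hash(key) % num_nodes for key in build_keys)
    return [per_node.get(node, 0) for node in range(num_nodes)]


build_keys = list(range(1_000))  # toy join keys, not real data
num_nodes = 4                    # toy cluster size

broadcast = broadcast_build_rows(build_keys, num_nodes)
distributed = distributed_build_rows(build_keys, num_nodes)
print("broadcast  :", broadcast, "total", sum(broadcast))
print("distributed:", distributed, "total", sum(distributed))
```

Under broadcast the total number of rows held grows with the node count, while under hash redistribution it stays equal to the size of the build side, at the cost of shuffling both inputs.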
  13. Without distributed_join
    • Stage 0: 1 task
    • Stage 1: 11 tasks
    • Stage 2: 18 tasks
    • Stage 3: 11 tasks
    • Stage 4: 1 task
  14. Without distributed_join

    Stage 3: In 49,006,085 / Out 48,652,188
    Output partitioning: HASH
    - Aggregate(PARTIAL)[cdp_customer_id_0]
    - ScanProject[table behavior_1]

    Stage 2: In 48,652,188 / Out 9,997,841
    Output partitioning: BROADCAST []
    - Aggregate(FINAL)[cdp_customer_id_0]
    - RemoteSource[3]
  15. Without distributed_join
    • 9,997,841 * 11 = 109,976,251

    Stage 1: In 119,976,262 / Out 9,962,475
    Output partitioning: SINGLE []
    - FilterProject(>= BIGINT '2')
    - CrossJoin
    - LeftJoin
    - ScanProject[table customers]
    - LocalExchange
    - RemoteSource[2]: Output: 109,976,251
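The multiplication on this slide can be checked against the other numbers in the deck. The sketch below recomputes the broadcast volume from the Stage 2 output and the 11 Stage 1 tasks, then subtracts it and the 10,000,000 `customers` rows from the reported Stage 1 input. The variable names are illustrative. The remainder equals the task count; reading it as one extra row per task (for example from the other `CrossJoin` input) is a guess, not something the deck states.

```python
stage2_out = 9_997_841       # rows leaving Stage 2 (aggregated behavior_1)
stage1_tasks = 11            # Stage 1 task count from the stage overview
customers_rows = 10_000_000  # customers table size from the sample data
stage1_in = 119_976_262      # Stage 1 input reported by the plan

# BROADCAST sends the full Stage 2 output to every Stage 1 task.
broadcast_rows = stage2_out * stage1_tasks

# Whatever is left after accounting for the broadcast copy and the table scan.
residual = stage1_in - broadcast_rows - customers_rows

print(f"broadcast rows: {broadcast_rows:,}")
print(f"residual: {residual}")
```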
  16. With distributed_join
    • Stage 0: 1 task
    • Stage 1: 18 tasks
    • Stage 2: 11 tasks
    • Stage 3: 9 tasks
    • Stage 4: 1 task
  17. With distributed_join

    Stage 3: In 49,006,085 / Out 48,651,697
    Output partitioning: HASH
    - Aggregate(PARTIAL)[cdp_customer_id_0]
    - ScanProject[table behavior_1]

    Stage 2: In 10,000,000 / Out 10,000,000
    Output partitioning: HASH
    - ScanProject[table customers]
  18. With distributed_join

    Stage 1: In 58,651,715 / Out 9,962,475
    Output partitioning: SINGLE []
    - FilterProject(>= BIGINT '2')
    - CrossJoin
    - LeftJoin
    - RemoteSource[2]: Output: 10,000,000
    - Aggregate(FINAL)[cdp_customer_id_0]
    - RemoteSource[3]: Output: 48,651,697
  19. User Defined Partitioning
    • All tables in Treasure Data are partitioned based on the time column. … User-defined partitioning (UDP) provides hash partitioning for a table on one or more columns in addition to the time column.
    https://support.treasuredata.com/hc/en-us/articles/360009798714-User-Defined-Partitioning-for-Presto
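As a minimal sketch of what hash partitioning on a column means, the code below assigns rows to buckets by hashing `cdp_customer_id`, so that all rows for one customer land in the same bucket. The bucket count of 512 is taken from the `td-presto:[cdp_customer_id]/512` partitioning shown in the UDP plan later in the deck. The `bucket_for` helper, the CRC32 hash, and the toy rows are assumptions; the deck does not say which hash function Treasure Data uses.

```python
import zlib

NUM_BUCKETS = 512  # from "td-presto:[cdp_customer_id]/512" in the UDP plan


def bucket_for(customer_id: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Map a partitioning key to a bucket. CRC32 is a stand-in hash function."""
    return zlib.crc32(customer_id.encode("utf-8")) % num_buckets


# Toy rows (customer id, item name); not taken from the sample data.
rows = [
    ("customer-a", "hci"),
    ("customer-b", "eml"),
    ("customer-a", "llu"),
]

buckets = {}
for customer_id, item_name in rows:
    buckets.setdefault(bucket_for(customer_id), []).append((customer_id, item_name))

for bucket, members in sorted(buckets.items()):
    print(bucket, members)
```

Because equal keys always map to the same bucket, a per-customer aggregation can be completed within each bucket without first exchanging rows between workers.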
  20. User Defined Partitioning
    • With distributed_join / Without UDP
      • Duration: 1 min
      • Result count: 9,962,475 records, 1 column
      • Peak memory: 1.81GB
    • With distributed_join / With UDP
      • Duration: 4 mins
      • Result count: 9,962,475 records, 1 column
      • Peak memory: 982.00MB
  21. With UDP
    • Stage 0: 1 task
    • Stage 1: 18 tasks
    • Stage 2: 18 tasks
    • Stage 3: 1 task
  22. With UDP

    Stage 2: In 49,006,085 / Out 9,997,841
    Output partitioning: td-presto:[cdp_customer_id]/512 [cdp_customer_id_0]
    - Aggregate[cdp_customer_id_0]
    - ScanProject[table behavior_1]
  23. With UDP

    Stage 1: In 19,997,859 / Out 9,962,475
    Output partitioning: SINGLE []
    - FilterProject(>= BIGINT '2')
    - CrossJoin
    - LeftJoin
    - ScanProject[table customers_udp]
    - RemoteSource[2]: Output: 9,997,841
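The three plans differ mainly in how many behavior-side rows reach the join stage. The short calculation below puts the `RemoteSource` outputs from slides 15, 18 and 23 side by side, relative to the broadcast plan. The dictionary keys are illustrative labels. It compares row counts only; the peak-memory and duration figures reported on the earlier slides are separate measurements and are not derived here.

```python
# Behavior-side rows entering the join stage, as reported by the plans.
behavior_rows_into_join = {
    "without distributed_join": 109_976_251,     # RemoteSource[2], slide 15
    "with distributed_join": 48_651_697,         # RemoteSource[3], slide 18
    "with distributed_join + UDP": 9_997_841,    # RemoteSource[2], slide 23
}

baseline = behavior_rows_into_join["without distributed_join"]
relative = {
    name: rows / baseline for name, rows in behavior_rows_into_join.items()
}

for name, rows in behavior_rows_into_join.items():
    print(f"{name:30s} {rows:>12,d} rows  {relative[name]:.1%} of broadcast")
```

With UDP the aggregation finishes before the exchange, so the join stage receives exactly one aggregated row per customer that has behavior records, which is one eleventh of the broadcast volume.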
  24. select a."cdp_customer_id"
    from "cdp_audience_21597"."customers" a
    where (
      (
        select count(*)
        from "cdp_audience_21597"."behavior_weblogs" _r1
        where _r1."cdp_customer_id" = a."cdp_customer_id"
      ) > 2
    ) or (
      (
        select count(*)
        from "cdp_audience_21597"."behavior_weblogs" _r2
        where _r2."td_ip_city_latitude" = 0
          and _r2."cdp_customer_id" = a."cdp_customer_id"
      ) > 3
    )