The Inside Story of Modifying the Presto Cassandra Connector at Repro (Repro における Presto Cassandra Connector 改造秘話) / Presto Conference Tokyo 2020

Slides for the talk given at https://techplay.jp/event/795265

Takeshi Arabiki

November 20, 2020

Transcript

  1. The Inside Story of Modifying the Presto Cassandra Connector at Repro

    Presto Conference Tokyo 2020 (2020-11-20), Repro Inc., Takeshi Arabiki (@a_bicky)
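  2. Self-introduction

    • Twitter: @a_bicky
    • Blog: あらびき日記
    • Affiliation: Repro Inc. (since August 2017)
    • Member of an SRE-like (?) team, that is, a team that
      • develops features for data collection and processing and overhauls the architecture
      • looks after Presto, Cassandra, Kafka, Fluentd, and so on
      • looks after AWS and other infrastructure
      • etc.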
  3. Agenda

    • Where Presto is used at Repro
    • Speeding up the Presto Cassandra Connector
    • Deploying the modified Presto Cassandra Connector to EMR
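  4. An unusual use of Presto

    • SQL is generated mechanically from delivery target conditions (user segmentation)
      • It is hard to optimize any one specific SQL query
    • Target users are determined immediately before a push notification is delivered
      • If a long-running SQL query is issued, the scheduled delivery time is missed
    • Data volume and characteristics differ between the services that use Repro
      • Even for similar delivery conditions, SQL execution time varies widely by service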
  5. Architecture overview of push notification delivery

    [Diagram. Labels: event data processor, scheduled job, ① Insert data, ② SQL, ③ CQL, ④ Data, ⑤ user IDs etc., ⑥ Invoke push notification application]
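  6. Why Cassandra was adopted

    http://joker1007.hatenablog.com/entry/2018/06/29/201400

    • We want to guarantee write scalability
      • MySQL would require sharding, and writers cannot be added easily
    • We want to update data
      • Hive cannot update rows, so the SQL has to be written carefully to fetch the latest value of each record
    • We want data to be reflected in near real time
      • Hive is inefficient unless data is batched to some extent before being written to HDFS, S3, and so on
    • We want to fetch large amounts of data from Presto quickly
      • With MySQL, a single worker fetches the data, which is slow
      • This is not what Cassandra is designed for, but it is faster than MySQL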
  7. Distributing Cassandra load by bucketing

        CREATE TABLE main.event_counts (
          service_id bigint,
          event_id bigint,
          dt text,
          bucket int,        -- murmur3(user_id) % bucket num
          user_id bigint,
          occurrence bigint,
          -- partition key: (service_id, event_id, dt, bucket)
          PRIMARY KEY ((service_id, event_id, dt, bucket), user_id)
        );
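The bucket rule on this slide, `murmur3(user_id) % bucket num`, can be sketched in Python as follows. The deck does not say which MurmurHash3 variant or which byte encoding of `user_id` is used, so the 32-bit x86 variant over an 8-byte little-endian integer is an assumption, as are the names `murmur3_32` and `bucket_for` and the bucket count of 64.

```python
import struct


def murmur3_32(data: bytes, seed: int = 0) -> int:
    """MurmurHash3 (x86, 32-bit variant) in pure Python."""
    c1, c2 = 0xCC9E2D51, 0x1B873593
    mask = 0xFFFFFFFF
    h = seed & mask
    body_len = len(data) - len(data) % 4

    def mix(k: int) -> int:
        k = (k * c1) & mask
        k = ((k << 15) | (k >> 17)) & mask  # rotate left by 15 bits
        return (k * c2) & mask

    # Process the input in 4-byte little-endian blocks.
    for i in range(0, body_len, 4):
        h ^= mix(int.from_bytes(data[i:i + 4], "little"))
        h = ((h << 13) | (h >> 19)) & mask  # rotate left by 13 bits
        h = (h * 5 + 0xE6546B64) & mask

    # Mix in the remaining 1 to 3 bytes, if any.
    tail = data[body_len:]
    if tail:
        h ^= mix(int.from_bytes(tail, "little"))

    # Finalization (avalanche).
    h ^= len(data)
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & mask
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & mask
    h ^= h >> 16
    return h


def bucket_for(user_id: int, bucket_num: int) -> int:
    # Assumption: user_id is hashed as an 8-byte little-endian signed integer.
    return murmur3_32(struct.pack("<q", user_id)) % bucket_num


# Hypothetical bucket count; the rows of one (service_id, event_id, dt)
# are spread over this many Cassandra partitions.
for uid in (1, 2, 3):
    print(uid, bucket_for(uid, 64))
```

Because the bucket is part of the partition key, rows for a single event and day land in `bucket num` partitions instead of one, which is what spreads the write and read load.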
  8. Example: extracting the users who match a delivery condition

        SELECT user_id FROM (
          -- Performed "08_Item purchase screen" at least once within the last 3 days
          SELECT user_id FROM cassandra.main.event_counts
          WHERE service_id = 1 AND event_id = 2
            AND bucket IN (0, 1, ...)
            AND dt IN ('2020-11-18', '2020-11-19', '2020-11-20')
            AND occurrence >= 1
        ) AS condition_0
        -- AND NOT
        LEFT OUTER JOIN (
          -- Performed "Purchase" at least once within the last 3 days
          SELECT user_id FROM cassandra.main.event_counts
          WHERE service_id = 1 AND event_id = 3
            AND bucket IN (0, 1, ...)
            AND dt IN ('2020-11-18', '2020-11-19', '2020-11-20')
            AND occurrence >= 1
        ) AS condition_1
        ON condition_0.user_id = condition_1.user_id
        WHERE condition_1.user_id IS NULL;
  9. Slow planning: statistics for one SQL query

        {
          "elapsed_time": "80620",
          "execution_time": "6833",
          "distributed_planning_time": "9",
          "analysis_time": "73698"
        }

    Analysis alone takes 73 seconds!!
  10. Root cause investigation: checking the debug logs

    • Add com.facebook.presto=DEBUG to /path/to/log.properties and restart
      • For the prestosql distribution, use io.prestosql=DEBUG
      • If the JMX agent is enabled, the level can be changed without a restart
      • cf. "Amazon Elastic MapReduce (EMR) ではじめる Presto 入門", section "デバッグログの有効化" (enabling debug logs)

        2018-09-19T11:03:24.824Z ...QueryStateMachine Query .... is PLANNING
        2018-09-19T11:03:27.325Z ...CassandraPartitionManager... #partitions: 512
        2018-09-19T11:03:28.795Z ...CassandraPartitionManager... #partitions: 256
        2018-09-19T11:03:30.449Z ...CassandraPartitionManager... #partitions: 512
        2018-09-19T11:03:38.877Z ...CassandraPartitionManager... #partitions: 2048
        ...
        2018-09-19T11:04:38.419Z ...CassandraPartitionManager... #partitions: 256
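The debug log excerpt on this slide already shows where the time goes. As a minimal sketch, the timestamps can be compared mechanically; only the log lines quoted on the slide are used (the elided ones are skipped), and the helper names `parse_ts` and `partition_counts` are made up for illustration.

```python
from datetime import datetime

# Log lines exactly as quoted on the slide (elided lines are omitted).
LOG_LINES = [
    "2018-09-19T11:03:24.824Z ...QueryStateMachine Query .... is PLANNING",
    "2018-09-19T11:03:27.325Z ...CassandraPartitionManager... #partitions: 512",
    "2018-09-19T11:03:28.795Z ...CassandraPartitionManager... #partitions: 256",
    "2018-09-19T11:03:30.449Z ...CassandraPartitionManager... #partitions: 512",
    "2018-09-19T11:03:38.877Z ...CassandraPartitionManager... #partitions: 2048",
    "2018-09-19T11:04:38.419Z ...CassandraPartitionManager... #partitions: 256",
]


def parse_ts(line: str) -> datetime:
    # The first whitespace-separated token is an ISO-8601 UTC timestamp.
    return datetime.strptime(line.split()[0], "%Y-%m-%dT%H:%M:%S.%fZ")


def partition_counts(lines):
    # Extract the number after "#partitions:" from the lines that carry it.
    return [int(l.rsplit("#partitions:", 1)[1]) for l in lines if "#partitions:" in l]


planning_start = parse_ts(LOG_LINES[0])
last_partition_log = parse_ts(LOG_LINES[-1])
elapsed = (last_partition_log - planning_start).total_seconds()

print(f"PLANNING to last partition lookup: {elapsed:.3f} s")
print("partition counts:", partition_counts(LOG_LINES))
```

The span between the PLANNING state change and the last `CassandraPartitionManager` message is about 73.6 seconds, which lines up with the 73-second analysis time reported in the query statistics.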
  11. Source code reading (Presto 0.203)

    • CassandraPartitionManager#getCassandraPartitions
      • Calls NativeCassandraSession#getPartitions once for every combination of the partitions specified in the WHERE clause
      • cf. CassandraPartitionManager.java#L125-L127
    • NativeCassandraSession#getPartitions
      • Checks whether the specified partition exists by executing SELECT DISTINCT ...
      • cf. NativeCassandraSession.java#L418-L420
  12. Source code reading (Presto 0.203), continued

    Key point: one SELECT DISTINCT is issued for each combination of the partitions specified in the WHERE clause.
  13. Example of partition combinations

    Presto SQL:

        SELECT user_id FROM cassandra.main.event_counts
        WHERE service_id = 1 AND event_id = 2
          AND dt IN ('2020-11-18', '2020-11-19', '2020-11-20')
          AND bucket IN (0, 1, ..., 63)
          AND occurrence >= 1;

    CQL issued by NativeCassandraSession#getPartitions:

        SELECT DISTINCT service_id, event_id, dt, bucket FROM main.event_counts
        WHERE service_id = 1 AND event_id = 2 AND dt = '2020-11-18' AND bucket = 0;
        SELECT DISTINCT service_id, event_id, dt, bucket FROM main.event_counts
        WHERE service_id = 1 AND event_id = 2 AND dt = '2020-11-19' AND bucket = 0;
        ...

    1 x 1 x 3 x 64 = 192 queries
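The count of 192 queries is simply the Cartesian product of the partition key values in the WHERE clause. The following sketch reproduces it in Python; it illustrates the arithmetic only and is not the connector's actual Java code, and the order in which queries are generated here is arbitrary.

```python
from itertools import product

# Partition key values taken from the WHERE clause on the slide.
service_ids = [1]
event_ids = [2]
dts = ["2020-11-18", "2020-11-19", "2020-11-20"]
buckets = list(range(64))

# One existence check per combination of partition key values.
queries = [
    "SELECT DISTINCT service_id, event_id, dt, bucket FROM main.event_counts "
    f"WHERE service_id = {s} AND event_id = {e} AND dt = '{d}' AND bucket = {b};"
    for s, e, d, b in product(service_ids, event_ids, dts, buckets)
]

print(len(queries))  # 1 x 1 x 3 x 64 = 192
print(queries[0])
```

Each of these is a separate round trip to Cassandra during planning, so the cost grows multiplicatively with the number of days and buckets in the condition.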
  14. Presto planning (recap)

    [Figure 4-1. Presto architecture overview with coordinator and workers, from https://www.oreilly.com/library/view/presto-the-definitive/9781492044260/]

    • Planning requires partition information
    • Hive keeps partition information and other metadata in the Hive metastore
      • The Hive connector checks whether partitions exist by querying the Hive metastore
    • Cassandra keeps no information about whether a partition exists
      • The Cassandra connector checks for existence by actually issuing CQL
  15. Optimization using the IN operator (Presto 0.204)

    https://github.com/prestodb/presto/pull/10720

    CQL:

        SELECT DISTINCT service_id, event_id, dt, bucket FROM main.event_counts
        WHERE service_id = 1 AND event_id = 2
          AND dt IN ('2020-11-18', '2020-11-19', '2020-11-20')
          AND bucket IN (0, 1, ..., 63);
  16. Optimization using the IN operator (Presto 0.204), continued

    Result: 3x faster in our use cases, but not enough!!
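To make the difference from the per-combination approach concrete, here is a sketch that folds the same partition key values into a single CQL statement using IN. It only mirrors the shape of the query shown on the slide; `build_partition_query` and `cql_literal` are hypothetical helpers, not part of Presto, and no escaping is attempted.

```python
def cql_literal(value) -> str:
    # Quote strings and leave integers bare. Illustration only: no escaping.
    return f"'{value}'" if isinstance(value, str) else str(value)


def build_partition_query(table: str, filters: dict) -> str:
    """Build one SELECT DISTINCT over all partition key values."""
    columns = ", ".join(filters)
    clauses = []
    for column, values in filters.items():
        if len(values) == 1:
            clauses.append(f"{column} = {cql_literal(values[0])}")
        else:
            joined = ", ".join(cql_literal(v) for v in values)
            clauses.append(f"{column} IN ({joined})")
    where = " AND ".join(clauses)
    return f"SELECT DISTINCT {columns} FROM {table} WHERE {where};"


query = build_partition_query(
    "main.event_counts",
    {
        "service_id": [1],
        "event_id": [2],
        "dt": ["2020-11-18", "2020-11-19", "2020-11-20"],
        "bucket": list(range(64)),
    },
)
print(query)
```

One statement replaces 192 round trips, but Cassandra still has to touch every listed partition to answer it, which may be part of why the gain reported in the talk was limited to roughly 3x.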
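  17. Idea for speeding up planning

    • For a bucketed table, checking whether each partition exists is almost pointless
      • For events with many records, the partition for every bucket is very likely to exist
      • For events with few records, the query is fast enough without optimization
    • If every partition specified in the WHERE clause is assumed to exist, planning is instantaneous
      • We added an option that skips the partition existence check
      • We forked prestodb/presto and run the modified presto-cassandra.jar in production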
  18. Effect of the modified Presto Cassandra Connector

        skip-partition-check   Analysis time (ms)   CPU time (ms)
        false                  22,829               6,946
        true                   1,060                8,057

    * Metrics obtained from com.facebook.presto.spi.eventlistener.QueryStatistics
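As a quick worked calculation on the table above (numbers copied from the slide; the variable names are illustrative), the ratios come out as follows.

```python
# Metrics from the slide, in milliseconds.
analysis_ms = {"false": 22_829, "true": 1_060}
cpu_ms = {"false": 6_946, "true": 8_057}

# How many times faster analysis becomes when the check is skipped.
analysis_speedup = analysis_ms["false"] / analysis_ms["true"]
# How CPU time changes with the check skipped (a value above 1 means more CPU).
cpu_ratio = cpu_ms["true"] / cpu_ms["false"]

print(f"analysis time: {analysis_speedup:.1f}x faster")
print(f"CPU time: {cpu_ratio:.2f}x")
```

Analysis time drops by a factor of about 21.5, while CPU time is about 1.16 times higher in this measurement. The deck does not explain the CPU increase; one possible reading is that partitions that do not actually exist are no longer pruned before execution.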
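  19. Problems and solutions when customizing Presto on EMR

    • Problems
      • JVM settings cannot be changed
      • Resource groups settings cannot be changed
      • Plugins cannot be installed and event-listener cannot be configured
      • A custom jar cannot be used for a connector
    • Solutions
      • Make do with bootstrap actions
      • Use a custom AMI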
  20. Bootstrap actions

    • Scripts that run before Presto and other applications are installed
      • Installation does not start until the bootstrap actions finish
      • A script cannot be run after Presto has been installed

        # Execute the script in background and exit immediately
        # because Presto is not installed until bootstrap actions end
        # cf. https://forums.aws.amazon.com/thread.jspa?threadID=220183
        if [ "$(whoami)" != "root" ]; then
          sudo "$0" "$@" &
          exit 0
        fi

        # Commands to execute after installing Presto go here
  21. Installing the modified Presto Cassandra Connector

        # Wait until the Presto server responds
        until curl -s localhost:8889/v1/info; do
          sleep 1
        done

        presto_ver=$(curl -s localhost:8889/v1/info | jq -r '.nodeVersion.version')
        plugin_dir=/usr/lib/presto/plugin
        presto_cassandra="$plugin_dir/cassandra/presto-cassandra-$presto_ver.jar"
        mv "$presto_cassandra" "$presto_cassandra.orig"
        aws s3 cp "$s3_uri/plugin/" "$plugin_dir/" --recursive
        if [ ! -f "$presto_cassandra" ]; then
          echo "$presto_cassandra doesn't exist. Please upload it to S3." >&2
          exit 1
        fi

        # Don't specify this prop in EMR configurations to avoid UNUSED property error
        echo "cassandra.skip-partition-check=true" \
          >> /etc/presto/conf/catalog/cassandra.properties
  22. Detecting abnormal termination of the background process

        onexit() {
          exit_code=$?
          [ $exit_code -eq 0 ] && return
          payload="{...}"
          res=$(curl -sX POST --data-urlencode "payload=$payload" \
            "$SLACK_WEBHOOK_URL")
          if [ "$res" != "ok" ]; then
            echo "Failed to notify slack: res: ${res}, payload: ${payload}" >&2
          fi
        }
        trap 'onexit' EXIT
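  23. Summary

    • Repro uses Presto for tasks such as extracting the delivery targets of push notifications
      • It is not used for analytics or aggregation; it is a critical part of the Repro service itself
    • Cassandra is the main data source
      • Load is distributed by bucketing
    • Planning in the Cassandra connector takes time
      • Depending on the characteristics of the data, skipping the partition existence check pays off significantly
      • Enabling debug logs gives strong hints for identifying the bottleneck
    • Using a modified connector on EMR takes some extra work
      • Repro changes the configuration by launching a background process from bootstrap actions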
  24. Measurement with the VisualVM Sampler

        2020-11-20T01:04:12.310Z ... :: elapsed 7778ms :: planning 7571ms ...

    Every path ends up at CassandraPartitionManager#getCassandraPartitions, but there are a great many branches...