
Web Content Analytics at Scale: How Parse.ly uses Elasticsearch for Multi-Terabyte Time Series

Using Elasticsearch, Parse.ly wrote a time series backend for its real-time content analytics product. This talk covers how it was done, including document mappings, rollup approaches, time-based indices, hot/warm/cold tiers, index aliases/versioning, and other techniques used to run a multi-terabyte Elasticsearch cluster that serves time series queries at scale.

Andrew Montalenti

February 18, 2016

Transcript

  1. 2015: "Can we push ES to take 10K+ writes/sec and store 10TB+ of customer data?"
  2. _source: don't do it! Especially if your schema has high-cardinality multi-value fields. (A mapping sketch follows the transcript.)
  3. "logstash-style" raw records! are nice, but...! ! to operate with

    good query latency, you need rollups, and these are tricky.
  4. { "url": "http://arstechnica.com/12345", "ts": "2015-01-02T00:00:000Z",! "visitors": ["3f3f", "3f3g", ...millions],! !

    "metrics": { "$all/page_views": 6200000, "desktop/page_views": 4200000,! "mobile/page_views": 2000000,! "$all/engaged_secs": 27500000,! "new/engaged_secs": 250000000,! "returning/engaged_secs": 25000000, }, ! "metas": { "title": "Obama gives speech",! "authors": ["Mike Journo"],! "section": "Politics",! "pub_date": "2015-01-02T08:00:000Z", } } partition and time bucket high-cardinality metric numeric metrics metadata 1day rollup! (1 per day)
  5. { "url": "http://arstechnica.com/12345", "ts": "2015-01-02T08:05:000Z",! "visitors": ["3f3f", "3f3g", ...hundreds],! !

    "metrics": { "$all/page_views": 62, "desktop/page_views": 42,! "mobile/page_views": 20,! "$all/engaged_secs": 275,! "new/engaged_secs": 250,! "returning/engaged_secs": 25, }, ! "metas": { "title": "Obama gives speech",! "authors": ["Mike Journo"],! "section": "Politics",! "pub_date": "2015-01-02T08:00:000Z", } } partition and time bucket high-cardinality metric numeric metrics metadata 5min rollup! (288 per day)
  6. { "url": "http://arstechnica.com/12345", "ts": "2015-01-02T08:05:123Z",! "visitors": ["3f3f3"],! ! "metrics": {

    "$all/page_views": 1, "desktop/page_views": 1,! "mobile/page_views": 0,! "$all/engaged_secs": 0,! "new/engaged_secs": 0,! "returning/engaged_secs": 0, }, ! "metas": { "title": "Obama gives speech",! "authors": ["Mike Journo"],! "section": "Politics",! "pub_date": "2015-01-02T08:00:000Z", } } partition and time bucket high-cardinality metric numeric metrics metadata raw event! (millions per day)
  7. Mage building blocks:
     • Parse.ly "Batch Layer" Topologies with Spark and Amazon S3
     • Parse.ly "Speed Layer" Topologies with Storm & Kafka
     • Parse.ly Dashboards and APIs with Elasticsearch & Cassandra
     • Parse.ly Raw Data Pipeline with Amazon Kinesis & S3 Access
  8. Mid-2015: "This sort of works, but it seems we need more hardware... and what's up with response times?"
  9. "You need to give big ! customers their own indices."

    - Otis "You need to use node-shard! allocation for hot/cold tiers." - Radu
  10. Node-Shard Allocation (an allocation sketch follows the transcript):
      • v1_shared-1day-2015.01       => cold (mem, rust)
      • v1_shared-5min-2015.02.01    => warm (mem, ssd)
      • v1_shared-5min-2015.03.15    => hot (mem, cpu)
      • v1_shared-raw-2015.03.15T12  => raw (cpu)
  11. • Cluster: 40 nodes, 500+ indices, 7,000+ shards
      • Tiers: 4 client, 3 master, 9 raw, 9 hot, 12 warm, 3 cold
      • Instances: 1TB+ of RAM, 500+ CPU cores
      • Disks: 12+ TB of data, >50% on SSDs, the rest on rust (spinning disks)
      • Writes: 10K+ writes per second
      • Reads: 100s of aggregations per second
  12. In the worst case, a bad query takes longer than its timeout, hogs the cluster, and hits an OOM bug.
  13. Links:
      • Lucene: The Good Parts
      • Mage: The Magical Time Series Backend
      • Pythonic Analytics with Elasticsearch
      • Visit us: http://parse.ly
      • Join us: http://parse.ly/jobs
  14. [Diagram: 3 servers x 2 cores, each core running a Python process (state + code), fed by pykafka.producer]

      import json
      from urllib.parse import urlparse  # Python 2: from urlparse import urlparse

      consumer = ...  # balanced Kafka consumer
      while True:
          msg = consumer.consume()
          msg = json.loads(msg)      # decode the JSON payload
          urlparse(msg["url"])       # parse the URL field
  15. [Diagram: the same 3-server layout, but the per-core Python processes are managed by Storm and exchange tuples over the multi-lang JSON protocol, fed by pykafka.producer]

      class UrlParser(Topology):
          url_spout = UrlSpout.spec(p=1)
          url_bolt = UrlBolt.spec(p=4, input=url_spout)
  16. [Diagram: the same 3-server layout, driven by pyspark.SparkContext; Python functions are shipped to the workers with cloudpickle, and data moves over py4j and binary pipes]

      sc = SparkContext()
      file_rdd = sc.textFile(files)
      file_rdd.map(urlparse).take(1)
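
The mapping sketch referenced in slide 2: a minimal illustration of disabling _source on a 1.x/2.x-era cluster, using the Python elasticsearch client. The index name and field layout mirror the rollup documents above; the exact mappings Parse.ly uses are not shown in the deck.

    from elasticsearch import Elasticsearch

    es = Elasticsearch()

    # Hypothetical 5-minute rollup index with _source disabled, so the very large
    # "visitors" arrays are not stored a second time on disk (ES 1.x/2.x syntax).
    es.indices.create(index="v1_shared-5min-2015.03.15", body={
        "mappings": {
            "rollup": {
                "_source": {"enabled": False},
                "properties": {
                    "url":      {"type": "string", "index": "not_analyzed"},
                    "ts":       {"type": "date"},
                    "visitors": {"type": "string", "index": "not_analyzed"},
                    "metrics":  {"type": "object"},
                    "metas":    {"type": "object"}
                }
            }
        }
    })

With _source disabled, documents cannot be re-read or reindexed from the index itself, so the raw data pipeline (Kinesis and S3 in slide 7) has to remain the source of truth for rebuilds.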
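
The query sketch referenced in slide 3 shows why rollups buy read latency: a dashboard-style read sums pre-aggregated counters from at most 288 five-minute documents instead of scanning millions of raw events. The index name, field names, and 1.x/2.x query DSL follow the examples above; this is an illustration, not the deck's actual query.

    from elasticsearch import Elasticsearch

    es = Elasticsearch()

    # Page views for one URL over one day, bucketed every 5 minutes, read from
    # the pre-aggregated 5-minute rollup documents rather than raw events.
    resp = es.search(index="v1_shared-5min-2015.03.15", request_timeout=10, body={
        "size": 0,
        "query": {
            "filtered": {
                "filter": {
                    "bool": {"must": [
                        {"term": {"url": "http://arstechnica.com/12345"}},
                        {"range": {"ts": {"gte": "2015-03-15", "lt": "2015-03-16"}}}
                    ]}
                }
            }
        },
        "aggs": {
            "per_5min": {
                "date_histogram": {"field": "ts", "interval": "5m"},
                "aggs": {"page_views": {"sum": {"field": "metrics.$all/page_views"}}}
            }
        }
    })

    for bucket in resp["aggregations"]["per_5min"]["buckets"]:
        print(bucket["key_as_string"], bucket["page_views"]["value"])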
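
The allocation sketch referenced in slide 10: Elasticsearch's shard-allocation filtering is the standard way to build hot/warm/cold tiers. Nodes are tagged with an attribute (box_type below is a common convention, not necessarily Parse.ly's exact setup), and each index is pinned to a tier, then re-pinned as it ages so its shards migrate without downtime.

    from elasticsearch import Elasticsearch

    es = Elasticsearch()

    # Data nodes are started with a tier tag in elasticsearch.yml, for example:
    #   node.box_type: hot    (new indices; fast CPU, lots of RAM)
    #   node.box_type: warm   (recent indices; RAM + SSD)
    #   node.box_type: cold   (old 1-day rollups; RAM + spinning disk)

    # Today's 5-minute index lives on the hot tier...
    es.indices.put_settings(index="v1_shared-5min-2015.03.15",
                            body={"index.routing.allocation.require.box_type": "hot"})

    # ...older indices are re-pinned and Elasticsearch migrates their shards.
    es.indices.put_settings(index="v1_shared-5min-2015.02.01",
                            body={"index.routing.allocation.require.box_type": "warm"})
    es.indices.put_settings(index="v1_shared-1day-2015.01",
                            body={"index.routing.allocation.require.box_type": "cold"})

    # The v1_ prefix leaves room for aliasing/versioning: readers query an alias,
    # so a rebuilt v2_ index can be swapped in atomically later.
    es.indices.update_aliases(body={"actions": [
        {"add": {"index": "v1_shared-5min-2015.03.15", "alias": "shared-5min-2015.03.15"}}
    ]})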