software system. Enable root cause analysis and anomaly detection.
> 1,000 nodes worldwide
> 10 processes per node
> 20 metrics per process (OS, JVM, app-specific)
Measured every second = about 6.3 trillion observations p.a. Data retention: 5 yrs.
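For reference, the figure follows directly from the numbers above: 1,000 nodes × 10 processes × 20 metrics = 200,000 data points per second; 200,000 × 86,400 s/day × 365 days ≈ 6.3 × 10^12 observations per year.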
[Processing data flow diagram involving Chronix Server and Chronix Spark. Icon credits: Nimal Raj (database), Arthur Shlain (console), alvarobueno (tasklist).]
time series
▸ set … because it's more flexible and easier to parallelize if operations can take multiple time series as input and output.
▸ univariate … because multivariate would introduce too much complexity (and we have our set to bundle multiple time series).
▸ multi-dimensional … because the ability to slice & dice within the set of time series is very convenient for a lot of use cases.
▸ numeric … because it's the most common use case.
A single time series is identified by the combination of its non-temporal dimension values (e.g. unit "mem usage" + host "aws42" + process "tomcat"), as sketched below.
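To make this identity rule concrete, here is a minimal sketch in plain Java (an illustration only, not the actual Chronix API) of deriving such an identity key from the dimension map:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class TimeSeriesIdentitySketch {

    // Derives a stable identity key from the non-temporal dimension values.
    // A TreeMap keeps the dimensions sorted so the key does not depend on insertion order.
    static String identityKey(Map<String, String> dimensions) {
        return new TreeMap<>(dimensions).entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        Map<String, String> dims = new HashMap<>();
        dims.put("unit", "mem usage");
        dims.put("host", "aws42");
        dims.put("process", "tomcat");
        // prints: host=aws42,process=tomcat,unit=mem usage
        System.out.println(identityKey(dims));
    }
}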
a SolrQuery:

/**
 * @param query Solr query
 * @param zkHost Zookeeper host
 * @param collection the Solr collection of Chronix time series data
 * @param chronixStorage a ChronixSolrCloudStorage instance
 * @return ChronixRDD of time series
 */
public ChronixRDD query(
        final SolrQuery query,
        final String zkHost,
        final String collection,
        final ChronixSolrCloudStorage chronixStorage) {
// Create a ChronixSparkContext from a SparkContext / JavaSparkContext
ChronixSparkContext csc = new ChronixSparkContext(sc);

// Read data into a ChronixRDD
SolrQuery query = new SolrQuery(
        "metric:\"java.lang:type=Memory/HeapMemoryUsage/used\"");
ChronixRDD rdd = csc.query(query,
        "localhost:9983",              // ZooKeeper host
        "chronix",                     // Solr collection for Chronix
        new ChronixSolrCloudStorage());

// Calculate the overall min/max/mean of all time series in the RDD
double min = rdd.min();
double max = rdd.max();
double mean = rdd.mean();
and aggregations
‣ Efficient storage format
‣ Heavy lifting distributed processing
‣ Catalyst processing optimizer
‣ Post-processing on a smaller set of time series (e.g. complex analysis algorithms)
TIME SERIES (the same schema for the logical time series and each physical chunk):
‣ start: TimeStamp
‣ end: TimeStamp
‣ unit: String
‣ dimensions: Map<String, String>
‣ values: byte[]
▸ Chunking: 1 logical time series = n physical time series (chunks), all with the same identity, each containing a fixed amount of observations. 1 chunk = 1 Solr document.
▸ Binary encoding of all timestamp/value pairs: delta-encoded and bitwise compressed.
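The exact codec is described in the paper referenced below; as a pure illustration (a simplified assumption, not the actual Chronix implementation), delta-encoding the timestamps of an equidistant series could look like this:

import java.util.Arrays;

public class DeltaEncodingSketch {

    // Stores the first timestamp absolutely and every following one as the
    // difference to its predecessor. For equidistant series (e.g. one
    // observation per second) almost all deltas are identical and compress well.
    static long[] deltaEncode(long[] timestamps) {
        long[] deltas = new long[timestamps.length];
        deltas[0] = timestamps[0];
        for (int i = 1; i < timestamps.length; i++) {
            deltas[i] = timestamps[i] - timestamps[i - 1];
        }
        return deltas;
    }

    static long[] deltaDecode(long[] deltas) {
        long[] timestamps = new long[deltas.length];
        timestamps[0] = deltas[0];
        for (int i = 1; i < deltas.length; i++) {
            timestamps[i] = timestamps[i - 1] + deltas[i];
        }
        return timestamps;
    }

    public static void main(String[] args) {
        long[] ts = {1460000000000L, 1460000001000L, 1460000002000L, 1460000003010L};
        long[] encoded = deltaEncode(ts);          // [1460000000000, 1000, 1000, 1010]
        System.out.println(Arrays.equals(ts, deltaDecode(encoded))); // true
    }
}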
CODEC GZIP + 128 kBytes
Reference: Florian Lautenschlager, Michael Philippsen, Andreas Kumlehn, Josef Adersberger: "Chronix: Efficient Storage and Query of Operational Time Series", International Conference on Software Maintenance and Evolution 2016 (submitted).
Disclaimer: the benchmark was performed on a single node only.
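As a complementary illustration of the codec result above (again a simplified assumption, not the Chronix codec itself), GZIP-compressing one serialized ~128 kByte chunk with plain java.util.zip looks like this:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

public class ChunkCompressionSketch {

    // Compresses one serialized chunk (e.g. the delta-encoded timestamp/value
    // pairs) with GZIP before it is stored as a field of a Solr document.
    static byte[] gzip(byte[] chunk) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(chunk);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] chunk = new byte[128 * 1024];   // highly regular data compresses very well
        byte[] compressed = gzip(chunk);
        System.out.println(chunk.length + " bytes -> " + compressed.length + " bytes");
    }
}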
ChronixRDD queryChronixChunks(
        final SolrQuery query,
        final String zkHost,
        final String collection,
        final ChronixSolrCloudStorage<MetricTimeSeries> chronixStorage)
        throws SolrServerException, IOException {

    // first get a list of replicas to query for this collection (figure out all Solr shards)
    List<String> shards = chronixStorage.getShardList(zkHost, collection);

    // parallelize the requests to the shards: query each shard in parallel
    // and convert the SolrDocuments to MetricTimeSeries
    JavaRDD<MetricTimeSeries> docs = jsc.parallelize(shards, shards.size()).flatMap(
            (FlatMapFunction<String, MetricTimeSeries>) shardUrl -> chronixStorage.streamFromSingleNode(
                    new KassiopeiaSimpleConverter(), shardUrl, query)::iterator);

    return new ChronixRDD(docs);
}
Rule 1: Be as close to the data as possible! (CPU cache > memory > local disk > network)
Horizontal processing (distribution / parallelization)
Vertical processing (divide & conquer)
Rule 2: Reduce data volume as early as possible! (as long as you don't sacrifice parallelization)
Rule 3: Parallelize as much as possible! (max = #cores)
‣ Rule 1: Be as close to the data as possible!
  1. Solr caching
  2. Spark in-memory processing with activated RDD compression
  3. Binary protocol between Solr and Spark
‣ Rule 2: Reduce data volume as early as possible!
  ‣ Efficient storage format (Chronix Format)
  ‣ Predicate pushdown to Solr (query)
  ‣ Group-by & aggregation pushdown to Solr (faceting within a query); see the sketch after this list
‣ Rule 3: Parallelize as much as possible!
  ‣ Scale-out on data level with SolrCloud
  ‣ Scale-out on processing level with Spark
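To illustrate the pushdown idea from Rule 2, a SolrQuery can carry both the predicate and a facet so that filtering and grouping happen inside Solr before anything reaches Spark. This is an assumed example; the field names host and process are placeholders for illustration, not fixed Chronix identifiers:

import org.apache.solr.client.solrj.SolrQuery;

public class PushdownSketch {
    public static void main(String[] args) {
        // Predicate pushdown: only matching chunks are read from Solr at all.
        SolrQuery query = new SolrQuery(
                "metric:\"java.lang:type=Memory/HeapMemoryUsage/used\"");
        query.addFilterQuery("host:aws42");    // narrows the result set inside Solr

        // Group-by & aggregation pushdown: let Solr facet on a dimension
        // instead of shipping all documents to Spark and grouping there.
        query.setFacet(true);
        query.addFacetField("process");
        query.setRows(0);                      // facet counts only, no documents

        System.out.println(query);
    }
}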
Time Series Data
▸ Matlab (Econometrics toolbox)
▸ Python (Pandas)
▸ R (zoo, xts)
▸ SAS (ETS)
▸ …
Big Time Series Data
▸ influxDB
▸ Graphite
▸ OpenTSDB
▸ KairosDB
▸ Prometheus
▸ …
▸ Spark-TS provides no specific time series storage; it uses the Spark persistence mechanisms instead. This leads to less efficient storage usage and fewer possibilities for performance optimizations via predicate pushdown.
▸ In contrast to Spark-TS, Chronix does not align all time series values on one vector of timestamps. This leads to greater flexibility in time series aggregation.
▸ Chronix provides multi-dimensional time series, as this is very useful for data warehousing and APM.
▸ Chronix has support for Datasets, as this will be an important Spark API in the near future. But Chronix currently doesn't support an IndexedRowMatrix for SparkML.
▸ Chronix is purely written in Java. There is no explicit support for Python and Scala yet.
▸ Chronix does not support a ZonedTime, as this would make it way more complicated.
▸ RDD (Resilient Distributed Dataset): operated on via transformations and actions. Hides data partitioning & distributed computation. References a set of partitions ("output partitions"), materialized or not, and has dependencies on other RDDs ("input partitions"). RDD operations are evaluated as late as possible (when an action is called). Unless it is the root RDD, the partitions of an RDD only live in memory while they are computed, but they can be persisted by request; see the sketch after this list.
▸ Partitions: (Logical) chunks of data. The default unit and level of parallelism: inside a partition everything is a sequential operation on records. A partition has to fit into memory. Can have different representations (in-memory, on disk, off heap, …).
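A minimal, self-contained sketch (an assumed example, not taken from the talk) that shows partitions, lazy evaluation, and persistence on request using the plain Spark Java API:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import java.util.Arrays;

public class RddBasicsSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("rdd-basics").setMaster("local[4]");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            // 4 partitions = the unit of parallelism for this RDD
            JavaRDD<Integer> numbers = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4);

            // Transformation: evaluated lazily, nothing is computed yet
            JavaRDD<Integer> squares = numbers.map(x -> x * x);

            // Persist by request: keep the computed partitions in memory
            squares.persist(StorageLevel.MEMORY_ONLY());

            // Action: triggers the actual computation
            System.out.println("partitions: " + squares.getNumPartitions());
            System.out.println("sum: " + squares.reduce(Integer::sum));
        }
    }
}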
▸ Job: launched when an action is called on an RDD.
▸ Task: The atomic unit of work (a function). Bound to exactly one partition.
▸ Stage: Set of task pipelines which can be executed in parallel on one executor.
▸ Shuffling: needed when partitions have to be transferred between executors. Shuffle write = outbound partition transfer; shuffle read = inbound partition transfer (illustrated below).
▸ DAG Scheduler: Computes the DAG of stages from the RDD DAG. Determines the preferred location for each task.
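To make stages and shuffling tangible, here is an assumed example (not from the talk): the reduceByKey below forces a shuffle, and toDebugString() prints the lineage with the resulting stage boundary:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;

public class ShuffleSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("shuffle-sketch").setMaster("local[2]");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            JavaPairRDD<String, Integer> counts = jsc
                    .parallelize(Arrays.asList("tomcat", "jboss", "tomcat", "tomcat"), 2)
                    .mapToPair(process -> new Tuple2<>(process, 1)) // narrow: stays within a partition
                    .reduceByKey(Integer::sum);                     // wide: shuffles data between executors

            // The lineage shows two stages separated by the shuffle caused by reduceByKey.
            System.out.println(counts.toDebugString());
            System.out.println(counts.collectAsMap());              // action: launches the job
        }
    }
}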