assigned to their machine • starts / stops worker processes as necessary • each worker process runs sub-section of a topology • topology :- multiple worker processes spread across several machines
new SplitSentence(), 8) .shuffleGrouping("spout"); builder.setBolt("count", new WordCount(), 12) .fieldsGrouping("split", new Fields("word")); Config conf = new Config(); conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology());
nextTuple() { Utils.sleep(100); String[] sentences = new String[] { "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"}; String sentence = sentences[_rand.nextInt(sentences.length)]; _collector.emit(new Values(sentence)); } ... @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
of processing stages • each stage can be a filter, aggregation, function, or other similar process • sounds familiar? • individual steps are combined into spouts / bolts at runtime
= topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) .parallelismHint(6);