// Source tap: a TextLine scheme declaring the single field "line", bound to inputPath.
sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, inputPath );

// Sink tap: a TextLine scheme declaring the fields "word" and "count", bound to
// outputPath and opened with SinkMode.REPLACE.
Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) );
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );

// the 'head' of the pipe assembly
// NOTE: this declaration must sit on its own line; when it trails the comment
// above on the same line it becomes part of the comment and 'assembly' is
// never declared.
Pipe assembly = new Pipe( "wordcount" );
out each word into a new Tuple with the field name "word" // regular expressions are optional in Cascading String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)"; Function function = new RegexGenerator( new Fields( "word" ), regex ); assembly = new Each( assembly, new Fields( "line" ), function ); // group the Tuple stream by the "word" value assembly = new GroupBy( assembly, new Fields( "word" ) );
occurrences of "word" and store result in // a field named "count" Aggregator count = new Count( new Fields( "count" ) ); assembly = new Every( assembly, count ); // initialize app properties, tell Hadoop which jar file to use Properties properties = new Properties(); FlowConnector.setApplicationJarClass( properties, Main.class ); Word Count 3/4
"from \"example\".\"sales_fact_1997\" as s\n" + "join \"example\".\"employee\" as e\n" + "on e.\"EMPID\" = s.\"CUST_ID\""; Tap empTap = new FileTap( new SQLTypedTextDelimited( ",", "\"" ), "src/main/resources/data/example/employee.tcsv", SinkMode.KEEP ); Tap salesTap = new FileTap( new SQLTypedTextDelimited( ",", "\"" ), "src/main/resources/data/example/sales_fact_1997.tcsv", SinkMode.KEEP ); Tap resultsTap = new FileTap( new SQLTypedTextDelimited( ",", "\"" ), "build/test/output/flow/results.tcsv", SinkMode.REPLACE );