String inputPathDefinitions = "data/babynamedefinitions.csv"; String inputPathCounts = "data/babynamecounts.csv"; String outputPath = "output/simplestpipe3"; Scheme sourceSchemeDefinitions = new TextDelimited( new Fields( "name", "definition" ), "," ); Scheme sourceSchemeCounts = new TextDelimited( new Fields( "name", "count" ), "," ); Tap sourceDefinitions = new Hfs( sourceSchemeDefinitions, inputPathDefinitions ); Tap sourceCounts = new Hfs( sourceSchemeCounts, inputPathCounts ); Scheme sinkScheme = new TextDelimited( new Fields( "dname", "count", "definition" ), " ^^^ " ); Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE ); Pipe definitionspipe = new Pipe( "definitionspipe" ); Pipe countpipe = new Pipe( "countpipe" ); //Join the tuple streams Fields commonfields = new Fields( "name" ); Fields newfields = new Fields("dname", "definition", "cname", "count"); Pipe joinpipe = new CoGroup( definitionspipe, commonfields, countpipe, commonfields, newfields, new InnerJoin() ); Properties properties = new Properties(); FlowConnector.setApplicationJarClass(properties, SimplestPipe3CoGroup.class); FlowConnector flowConnector = new FlowConnector( properties ); Map<String, Tap> sources = new HashMap<String, Tap>(); sources.put("definitionspipe", sourceDefinitions); sources.put("countpipe", sourceCounts); Flow flow = flowConnector.connect( sources, sink, joinpipe ); flow.complete(); } }