Storm-01(4)

6. Projection operation - projection
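The project method keeps only the fields it names and removes every other field from the stream. In the example below, GenderFunc first appends a "gender" field to the ("name","sentence") tuples, and project(new Fields("name")) then drops both "sentence" and "gender", so only "name" reaches the PrintFilter.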

package com.liming.projection;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.trident.Stream;
import storm.trident.TridentTopology;

public class TridentDemo {
	public static void main(String[] args) {
		//--Create the topology
		TridentTopology topology = new TridentTopology();

		//--append a "gender" field, then project away everything except "name"
		Stream s = topology.newStream("xx", new SentenceSpout())
				.each(new Fields("name"), new GenderFunc(), new Fields("gender"))
				.project(new Fields("name"));
		s.each(s.getOutputFields(), new PrintFilter());

		//--Submit to a local cluster
		Config conf = new Config();
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("MyTopology", conf, topology.build());

		//--Run for 10 seconds, then kill the topology and shut down the cluster
		Utils.sleep(1000 * 10);
		cluster.killTopology("MyTopology");
		cluster.shutdown();
	}
}
package com.liming.projection;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class SentenceSpout extends BaseRichSpout{

	private SpoutOutputCollector collector = null;
	
	private Values [] values = {
			new Values("xiaoming","i am so shuai"),
			new Values("xiaoming","do you like me"),
			new Values("xiaohua","i do not like you"),
			new Values("xiaohua","you look like fengjie"),
			new Values("xiaoming","are you sure you do not like me"),
			new Values("xiaohua","yes i am"),
			new Values("xiaoming","ok i am sure")
	};
	
	private int index = 0;
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void nextTuple() {
		collector.emit(values[index]);
		index = index+1 == values.length ? 0 : index+1;
		Utils.sleep(100);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		Fields fields = new Fields("name","sentence");
		declarer.declare(fields);
	}

}
package com.liming.projection;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class GenderFunc extends BaseFunction{

	@Override
	public void execute(TridentTuple tuple, TridentCollector collector) {
		String name = tuple.getStringByField("name");
		if("xiaoming".equals(name)){
			collector.emit(new Values("male"));
		}else if("xiaohua".equals(name)){
			collector.emit(new Values("female"));
		}else{
		}
	}
	
}
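Note that a Trident function that emits no tuples effectively filters its input tuple out of the stream, so tuples whose name is neither xiaoming nor xiaohua would be dropped here.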
package com.liming.projection;

import java.util.Iterator;
import java.util.Map;

import backtype.storm.tuple.Fields;
import storm.trident.operation.BaseFilter;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

public class PrintFilter extends BaseFilter{

	private TridentOperationContext context = null;
	@Override
	public void prepare(Map conf, TridentOperationContext context) {
		super.prepare(conf, context);
		this.context = context;
	}
	
	@Override
	public boolean isKeep(TridentTuple tuple) {
		StringBuffer buf = new StringBuffer();
		
		Fields fields = tuple.getFields();
		Iterator<String> it = fields.iterator();

		while(it.hasNext()){
			String key = it.next();
			Object value = tuple.getValueByField(key);
			buf.append("---"+key+":"+value+"---");
		}
		System.out.println(buf.toString());
		
		return true;
	}
	
}

The test output is as follows (only the "name" field is printed for each tuple, since "sentence" and "gender" were projected away):

2. Repartition operation - Repartitioning operations

A repartitioning operation changes how tuples are divided across tasks.
It can also change the number of partitions.
Repartitioning requires network transfer.

Concurrency settings when repartitioning:
parallelismHint: sets the degree of parallelism when repartitioning. The method searches upstream for the most recent repartitioning operation and applies the specified parallelism to every operation between the two. Any repartitioning operation whose parallelism is not set this way defaults to a parallelism of 1.
The repartitioning operations are the following methods (a usage sketch follows this list):
1. shuffle: randomly and evenly redistributes tuples across the target partitions.
2. broadcast: copies every tuple to all target partitions; useful in DRPC, for example to run stateQuery on every partition.
3. partitionBy: chooses each tuple's partition as (hash of the specified fields) mod (number of target partitions). This guarantees that tuples with the same values for the specified fields go to the same partition (though a partition may also hold tuples with different values).
4. global: sends all tuples to the same partition.
5. batchGlobal: ensures that tuples of the same batch go to the same partition.
6. partition: accepts a custom partitioning function (an implementation of backtype.storm.grouping.CustomStreamGrouping).
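A minimal usage sketch (not from the original; it assumes a TridentTopology named topology and the SentenceSpout listed below, and each call returns a new Stream):

Stream base = topology.newStream("spout", new SentenceSpout());

base.shuffle();                         //--random, even redistribution
base.broadcast();                       //--copy every tuple to all partitions
base.partitionBy(new Fields("name"));   //--hash of "name" mod target partition count
base.global();                          //--all tuples to a single partition
base.batchGlobal();                     //--all tuples of one batch to a single partition
//--partition(...) takes an implementation of backtype.storm.grouping.CustomStreamGrouping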

parallelismHint applies the given parallelism to every operation between it and the most recent upstream repartitioning operation; without it, the default is 1. In the example below, parallelismHint(2) follows the spout directly, so the spout runs with two instances:

package com.liming.Repartition;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.trident.Stream;
import storm.trident.TridentTopology;

public class TridentDemo {
	public static void main(String[] args) {
		//--Create the topology
		TridentTopology topology = new TridentTopology();

		//--run the spout with a parallelism of 2
		Stream s = topology.newStream("xx", new SentenceSpout())
				.parallelismHint(2);
		s.each(s.getOutputFields(), new PrintFilter());

		//--Submit to a local cluster
		Config conf = new Config();
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("MyTopology", conf, topology.build());

		//--Run for 10 seconds, then kill the topology and shut down the cluster
		Utils.sleep(1000 * 10);
		cluster.killTopology("MyTopology");
		cluster.shutdown();
	}
}
package com.liming.Repartition;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class SentenceSpout extends BaseRichSpout{

	private SpoutOutputCollector collector = null;
	
	private Values [] values = {
			new Values("xiaoming","i am so shuai"),
			new Values("xiaoming","do you like me"),
			new Values("xiaohua","i do not like you"),
			new Values("xiaohua","you look like fengjie"),
			new Values("xiaoming","are you sure you do not like me"),
			new Values("xiaohua","yes i am"),
			new Values("xiaoming","ok i am sure")
	};
	
	private int index = 0;
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void nextTuple() {
		if(index<values.length){
			collector.emit(values[index]);
			index++;
		}
		Utils.sleep(100);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		Fields fields = new Fields("name","sentence");
		declarer.declare(fields);
	}

}
package com.liming.Repartition;

import java.util.Iterator;
import java.util.Map;

import backtype.storm.tuple.Fields;
import storm.trident.operation.BaseFilter;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

public class PrintFilter extends BaseFilter{

	private TridentOperationContext context = null;
	@Override
	public void prepare(Map conf, TridentOperationContext context) {
		super.prepare(conf, context);
		this.context = context;
	}
	
	@Override
	public boolean isKeep(TridentTuple tuple) {
		StringBuffer buf = new StringBuffer();
		
		Fields fields = tuple.getFields();
		Iterator<String> it = fields.iterator();

		while(it.hasNext()){
			String key = it.next();
			Object value = tuple.getValueByField(key);
			buf.append("---"+key+":"+value+"---");
		}
		System.out.println(buf.toString());
		
		return true;
	}
	
}

Each sentence is printed twice, because parallelismHint(2) runs two instances of the spout and each instance emits the full data set:

Case 9 - Reworking the case above: use separate partitions to handle xiaoming's and xiaohua's sentences, and count how many times each person speaks:

package com.liming.Repartition;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.trident.Stream;
import storm.trident.TridentTopology;
/**
 * Repartitioning case
 */
public class TridentDemo2 {
	public static void main(String[] args) {
		//--Create the topology
		TridentTopology topology = new TridentTopology();

		//--partitionBy sends tuples with the same "name" to the same partition,
		//--where partitionAggregate counts the sentences per name
		Stream s = topology.newStream("xx", new SentenceSpout())
				.partitionBy(new Fields("name"))
				.partitionAggregate(new Fields("name"), new SentenceAggerator(), new Fields("name","count"))
				.parallelismHint(2);

		s.each(s.getOutputFields(), new PrintFilter());

		//--Submit to a local cluster
		Config conf = new Config();
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("MyTopology", conf, topology.build());

		//--Run for 10 seconds, then kill the topology and shut down the cluster
		Utils.sleep(1000 * 10);
		cluster.killTopology("MyTopology");
		cluster.shutdown();
	}
}
package com.liming.Repartition;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class SentenceSpout extends BaseRichSpout{

	private SpoutOutputCollector collector = null;
	
	private Values [] values = {
			new Values("xiaoming","i am so shuai"),
			new Values("xiaoming","do you like me"),
			new Values("xiaohua","i do not like you"),
			new Values("xiaohua","you look like fengjie"),
			new Values("xiaoming","are you sure you do not like me"),
			new Values("xiaohua","yes i am"),
			new Values("xiaoming","ok i am sure")
	};
	
	private int index = 0;
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void nextTuple() {
		if(index<values.length){
			collector.emit(values[index]);
			index++;
		}
		Utils.sleep(100);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		Fields fields = new Fields("name","sentence");
		declarer.declare(fields);
	}

}
package com.liming.Repartition;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseAggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class SentenceAggerator extends BaseAggregator<ConcurrentHashMap<String,Integer>> {

	@Override
	public ConcurrentHashMap<String,Integer> init(Object batchId, TridentCollector collector) {
		return new ConcurrentHashMap<String,Integer>();
	}

	@Override
	public void aggregate(ConcurrentHashMap<String,Integer> val, TridentTuple tuple, TridentCollector collector) {
		String name = tuple.getStringByField("name");
		val.put(name, val.containsKey(name) ? val.get(name)+1 : 1);
	}

	@Override
	public void complete(ConcurrentHashMap<String,Integer> val, TridentCollector collector) {
		for(Map.Entry<String, Integer> entry : val.entrySet()){
			collector.emit(new Values(entry.getKey(),entry.getValue()));
		}
	}

}
package com.liming.Repartition;

import java.util.Iterator;
import java.util.Map;

import backtype.storm.tuple.Fields;
import storm.trident.operation.BaseFilter;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

public class PrintFilter extends BaseFilter{

	private TridentOperationContext context = null;
	@Override
	public void prepare(Map conf, TridentOperationContext context) {
		super.prepare(conf, context);
		this.context = context;
	}
	
	@Override
	public boolean isKeep(TridentTuple tuple) {
		StringBuffer buf = new StringBuffer();
		buf.append("---partition_id:"+context.getPartitionIndex());
		
		Fields fields = tuple.getFields();
		Iterator<String> it = fields.iterator();

		while(it.hasNext()){
			String key = it.next();
			Object value = tuple.getValueByField(key);
			buf.append("---"+key+":"+value+"---");
		}
		System.out.println(buf.toString());
		
		return true;
	}
	
}

The results are as follows; you can see that the tuples were handled in two different partitions:

3. Aggregate operation - Aggregation operations

Trident provides the aggregate() and persistentAggregate() methods for aggregating streams.

1. aggregate()
Performs a global aggregation independently on each batch.
With a ReducerAggregator or an Aggregator, the stream is first repartitioned into a single partition, and the aggregation then runs on that partition.
With a CombinerAggregator, Trident first aggregates each partition locally, then repartitions the partial results into a single partition to complete the global aggregation. CombinerAggregator is therefore more efficient and is the recommended choice.
Either way, aggregate() implicitly repartitions the stream, leaving a single partition afterwards.

Example: use aggregate() to compute a global count for each batch (Count is a builtin CombinerAggregator):
mystream.aggregate(new Count(), new Fields("count"))

2. persistentAggregate()
Aggregates the tuples of all batches and stores the result in a state source.
As with aggregate(), a ReducerAggregator or Aggregator causes the stream to be repartitioned into a single partition first, while a CombinerAggregator first aggregates each partition locally and only then repartitions into a single partition for the global step; CombinerAggregator is again more efficient and recommended.
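For example, a hedged sketch of persistentAggregate (MemoryMapState, from storm.trident.testing, is the in-memory State implementation that ships with Trident for testing; mystream is the same placeholder as above):

TridentState counts = mystream.persistentAggregate(
		new MemoryMapState.Factory(),  //--StateFactory: where the aggregated values are stored
		new Count(),                   //--CombinerAggregator: local pre-aggregation, then global
		new Fields("count"));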
** As with partitionAggregate, the aggregators passed to aggregate can be chained. However, if a CombinerAggregator is chained together with a non-CombinerAggregator, Trident cannot apply the local-aggregation optimization.
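A sketch of chained aggregation (the "b" field and the builtin Sum aggregator are illustrative):

Stream result = mystream.chainedAgg()
		.partitionAggregate(new Count(), new Fields("count"))
		.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
		.chainEnd();  //--emits one tuple per partition with the fields ["count", "sum"]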

4. Group operation - Operations on grouped streams

The groupBy operation first performs a partitionBy on the specified fields, so that tuples with the same values for those fields are sent to the same partition. Within each partition, the tuples are then grouped by the values of the specified fields.
!! Note that groupBy does not ignore batches: grouping happens within each batch, among the tuples of that batch that share the same values for the specified fields.
(See figure: GroupBy principle)

If you run an aggregation on a grouped stream, the aggregation is performed within each group, not over the entire batch.
The GroupedStream class also has a persistentAggregate method; its result is stored in a MapState whose keys are the grouping fields (the fields specified in groupBy). All of this is again kept in Trident state.
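A hedged sketch of the grouped variant, reusing the SentenceSpout from the example below and Trident's in-memory test state; the resulting MapState is keyed by "name":

TridentState perName = topology.newStream("xx", new SentenceSpout())
		.groupBy(new Fields("name"))
		.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));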

package com.liming.groupby;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.trident.Stream;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
/**
 * Group operation case
 */
public class TridentDemo4 {
	public static void main(String[] args) {
		//--Create the topology
		TridentTopology topology = new TridentTopology();

		//--group by "name", then count within each group
		Stream s = topology.newStream("xx", new SentenceSpout())
				.groupBy(new Fields("name"))
				.aggregate(new Count(), new Fields("count"));

		s.each(s.getOutputFields(), new PrintFilter());

		//--Submit to a local cluster
		Config conf = new Config();
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("MyTopology", conf, topology.build());

		//--Run for 10 seconds, then kill the topology and shut down the cluster
		Utils.sleep(1000 * 10);
		cluster.killTopology("MyTopology");
		cluster.shutdown();
	}
}
package com.liming.groupby;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class SentenceSpout extends BaseRichSpout{

	private SpoutOutputCollector collector = null;
	
	private Values [] values = {
			new Values("xiaoming","i am so shuai"),
			new Values("xiaoming","do you like me"),
			new Values("xiaohua","i do not like you"),
			new Values("xiaohua","you look like fengjie"),
			new Values("xiaoming","are you sure you do not like me"),
			new Values("xiaohua","yes i am"),
			new Values("xiaoming","ok i am sure")
	};
	
	private int index = 0;
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void nextTuple() {
		if(index<values.length){
			collector.emit(values[index]);
			index++;
		}
		Utils.sleep(100);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		Fields fields = new Fields("name","sentence");
		declarer.declare(fields);
	}

}
package com.liming.groupby;

import java.util.Iterator;
import java.util.Map;

import backtype.storm.tuple.Fields;
import storm.trident.operation.BaseFilter;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

public class PrintFilter extends BaseFilter{

	private TridentOperationContext context = null;
	@Override
	public void prepare(Map conf, TridentOperationContext context) {
		super.prepare(conf, context);
		this.context = context;
	}
	
	@Override
	public boolean isKeep(TridentTuple tuple) {
		StringBuffer buf = new StringBuffer();
		buf.append("---partition_id:"+context.getPartitionIndex());
		
		Fields fields = tuple.getFields();
		Iterator<String> it = fields.iterator();

		while(it.hasNext()){
			String key = it.next();
			Object value = tuple.getValueByField(key);
			buf.append("---"+key+":"+value+"---");
		}
		System.out.println(buf.toString());
		
		return true;
	}
	
}

Results are as follows:

5. Merge and join - Merges and joins

Trident lets you combine several streams. The simplest way is to merge them into a single stream, which is done with the merge method of TridentTopology.

E.g.:
topology.merge(stream1, stream2, stream3);
Trident names the fields of the merged stream after the fields of stream1.
All streams must have the same fields in order to be merged; for instance, if stream1, stream2 and stream3 all declare ["name", "sentence"], so does the merged stream.

The other way to combine streams is join (similar to the join operation in SQL).
Suppose stream1 has the fields ["key", "val1", "val2"] and stream2 has the fields ["x", "val1"].

E.g.:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
This join uses the "key" and "x" fields as the join fields. Because the input streams have overlapping field names ("val1" appears in both stream1 and stream2), Trident requires all fields of the output stream to be named explicitly. A tuple in the output stream contains the following fields (a concrete example follows the list):

1. The join fields: in this example, the output field "key" corresponds to "key" in stream1 and "x" in stream2.
2. The non-join fields of all input streams, in the order the streams were passed to join: here, "a" and "b" correspond to "val1" and "val2" in stream1, and "c" corresponds to "val1" in stream2.
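For instance (illustrative values only): if stream1 emits ["k1", 1, 2] and stream2 emits ["k1", 9], the joined stream emits ["k1", 1, 2, 9] with the fields ["key", "a", "b", "c"].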

**Note: the streams being joined must belong to the same topology.

package com.liming.merge;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.trident.Stream;
import storm.trident.TridentTopology;
/**
 * Merge case
 */
public class TridentDemo5 {
	public static void main(String[] args) {
		//--Create the topology
		TridentTopology topology = new TridentTopology();

		//--merge three streams with identical fields into one
		Stream s1 = topology.newStream("xx", new SentenceSpout());
		Stream s2 = topology.newStream("yy", new SentenceSpout());
		Stream s3 = topology.newStream("zz", new SentenceSpout());
		Stream s = topology.merge(s1, s2, s3);
		s.each(s.getOutputFields(), new PrintFilter());

		//--Submit to a local cluster
		Config conf = new Config();
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("MyTopology", conf, topology.build());

		//--Run for 10 seconds, then kill the topology and shut down the cluster
		Utils.sleep(1000 * 10);
		cluster.killTopology("MyTopology");
		cluster.shutdown();
	}
}
package com.liming.merge;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class SentenceSpout extends BaseRichSpout{

	private SpoutOutputCollector collector = null;
	
	private Values [] values = {
			new Values("xiaoming","i am so shuai"),
			new Values("xiaoming","do you like me"),
			new Values("xiaohua","i do not like you"),
			new Values("xiaohua","you look like fengjie"),
			new Values("xiaoming","are you sure you do not like me"),
			new Values("xiaohua","yes i am"),
			new Values("xiaoming","ok i am sure")
	};
	
	private int index = 0;
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void nextTuple() {
		if(index<values.length){
			collector.emit(values[index]);
			index++;
		}
		Utils.sleep(100);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		Fields fields = new Fields("name","sentence");
		declarer.declare(fields);
	}

}
package com.liming.merge;

import java.util.Iterator;
import java.util.Map;

import backtype.storm.tuple.Fields;
import storm.trident.operation.BaseFilter;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

public class PrintFilter extends BaseFilter{

	private TridentOperationContext context = null;
	@Override
	public void prepare(Map conf, TridentOperationContext context) {
		super.prepare(conf, context);
		this.context = context;
	}
	
	@Override
	public boolean isKeep(TridentTuple tuple) {
		StringBuffer buf = new StringBuffer();
		buf.append("---partition_id:"+context.getPartitionIndex());
		
		Fields fields = tuple.getFields();
		Iterator<String> it = fields.iterator();

		while(it.hasNext()){
			String key = it.next();
			Object value = tuple.getValueByField(key);
			buf.append("---"+key+":"+value+"---");
		}
		System.out.println(buf.toString());
		
		return true;
	}
	
}

Results are as follows:

When the streams' fields differ, use join instead of merge:

package com.liming.merge;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.trident.Stream;
import storm.trident.TridentTopology;
/**
 * Join case
 */
public class TridentDemo6 {
	public static void main(String[] args) {
		//--Create the topology
		TridentTopology topology = new TridentTopology();

		//--join on the "name" field; the output fields are the join field,
		//--then the non-join fields of s1 ("sentence") and s2 ("gender")
		Stream s1 = topology.newStream("xx", new SentenceSpout());
		Stream s2 = topology.newStream("yy", new GenderSpout());
		Stream s = topology.join(s1, new Fields("name"), s2, new Fields("name"), new Fields("name","sentence","gender"));
		s.each(s.getOutputFields(), new PrintFilter());

		//--Submit to a local cluster
		Config conf = new Config();
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("MyTopology", conf, topology.build());

		//--Run for 10 seconds, then kill the topology and shut down the cluster
		Utils.sleep(1000 * 10);
		cluster.killTopology("MyTopology");
		cluster.shutdown();
	}
}
package com.liming.merge;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class SentenceSpout extends BaseRichSpout{

	private SpoutOutputCollector collector = null;
	
	private Values [] values = {
			new Values("xiaoming","i am so shuai"),
			new Values("xiaoming","do you like me"),
			new Values("xiaohua","i do not like you"),
			new Values("xiaohua","you look like fengjie"),
			new Values("xiaoming","are you sure you do not like me"),
			new Values("xiaohua","yes i am"),
			new Values("xiaoming","ok i am sure")
	};
	
	private int index = 0;
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void nextTuple() {
		if(index<values.length){
			collector.emit(values[index]);
			index++;
		}
		Utils.sleep(100);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		Fields fields = new Fields("name","sentence");
		declarer.declare(fields);
	}

}
package com.liming.merge;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class GenderSpout extends BaseRichSpout{

	private SpoutOutputCollector collector = null;
	
	private Values [] values = {
			new Values("xiaoming","male"),
			new Values("xiaohua","female"),
	};
	
	private int index = 0;
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void nextTuple() {
		if(index<values.length){
			collector.emit(values[index]);
			index++;
		}
		Utils.sleep(100);
	}
//	@Override
//	public void nextTuple() {
//		collector.emit(values[index]);
//		index = index+1 == values.length ? 0 : index+1;
//		Utils.sleep(100);
//	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		//--this spout emits ("name","gender") tuples, so declare "gender" here
		Fields fields = new Fields("name","gender");
		declarer.declare(fields);
	}
	}

}
package com.liming.merge;

import java.util.Iterator;
import java.util.Map;

import backtype.storm.tuple.Fields;
import storm.trident.operation.BaseFilter;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

public class PrintFilter extends BaseFilter{

	private TridentOperationContext context = null;
	@Override
	public void prepare(Map conf, TridentOperationContext context) {
		super.prepare(conf, context);
		this.context = context;
	}
	
	@Override
	public boolean isKeep(TridentTuple tuple) {
		StringBuffer buf = new StringBuffer();
		buf.append("---partition_id:"+context.getPartitionIndex());
		
		Fields fields = tuple.getFields();
		Iterator<String> it = fields.iterator();

		while(it.hasNext()){
			String key = it.next();
			Object value = tuple.getValueByField(key);
			buf.append("---"+key+":"+value+"---");
		}
		System.out.println(buf.toString());
		
		return true;
	}
	
}

Results are as follows: