学习 Storm,开始编写小例子
import java.io.File; import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.io.FileUtils;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordCountTopology {
public static class DataSourceSpout extends BaseRichSpout{
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
/**
* 此方法只调用一次
*/
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.conf = conf;
this.collector = collector;
this.context = context;
}
/**
* 死循环调用,心跳
*/
int i=0;
public void nextTuple() {
//读取指定文件目录
Collection<File> listFiles = FileUtils.listFiles(new File("d:\\test"), new String[]{"txt"}, true);
for(File file:listFiles){
try {
//获取每个文件的所有数据
List<String> readLines = FileUtils.readLines(file);
//文件被读取过以后进行重命名
FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));
// file.renameTo(new File(file.getAbsolutePath()+System.currentTimeMillis()));
for (String line : readLines) {
//把每一行数据发射出去
this.collector.emit(new Values(line));
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/**
* 声明输出内容
*/
public void declareOutputFields(OutputFieldsDeclarer declare) {
declare.declare(new Fields("line"));
}
}
public static class Splitbolt extends BaseRichBolt{
private Map stormConf;
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
// TODO Auto-generated method stub
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
}
public void execute(Tuple input) {
//获取每一行数据
String line = input.getStringByField("line");
//把数据切分成一个个单词
String[] wordsStrings = line.split("\t");
//把每个单词都发射出去
for (String word : wordsStrings) {
this.collector.emit(new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("words"));
}
}
/**
* 计算每个单词出现次数
* @author tangyw
*
*/
public static class Countbolt extends BaseRichBolt{
private Map stormConf;
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
// TODO Auto-generated method stub
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
}
HashMap<String, Integer> hashMap = new HashMap<String, Integer>();
public void execute(Tuple input) {
//获取每一个单词
String word = input.getStringByField("words");
//对所有的单词汇总
Integer valueInteger = hashMap.get(word);
if (valueInteger==null) {
valueInteger=0;
}
valueInteger++;
hashMap.put(word, valueInteger);
//把结果打印出来
System.out.println("----------------");
for (Entry<String, Integer> entry : hashMap.entrySet()) {
System.out.println(entry);
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
/**
 * Wires the spout and the two bolts into a topology named {@code "topology"}
 * and submits it to an in-process {@link LocalCluster}.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    TopologyBuilder topologyBuilder = new TopologyBuilder();
    topologyBuilder.setSpout("spout_id", new DataSourceSpout());
    topologyBuilder.setBolt("bolt_id", new Splitbolt()).shuffleGrouping("spout_id");
    // Group by the "words" field so that every occurrence of a word reaches
    // the same Countbolt task; a shuffle grouping would split a word's count
    // across tasks as soon as the bolt runs with more than one task.
    topologyBuilder.setBolt("bolt_id_count", new Countbolt())
            .fieldsGrouping("bolt_id", new Fields("words"));

    LocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology("topology", new Config(), topologyBuilder.createTopology());
}
}