博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
storm定时任务【tick】
阅读量:5041 次
发布时间:2019-06-12

本文共 2770 字,大约阅读时间需要 9 分钟。

一. 简介
     storm作为流计算,处理数据通常以数据驱动。即只有当spout发射数据才会进行计算。那么如果想要做定时任务如何处理哪,例如有的bolt需要输出一段时间统计的结果,这里一段时间可能是几秒、几分钟或者几小时。如果还是以数据进行驱动的话必然会输出时间不可确定。同样也可以启一个线程一段时间执行一次,这也是一种解决方案。但是我希望有种更优雅的解决方案,就是这里说的tick。tick是由storm框架进行计时,到达设定时间会发送一个特殊的tuple:ticktuple,此时处理定时任务就可以了。
二. 代码
     如果是某一个bolt由定时需求的话,可以按照一下方式设置
  1. 继承BaseBasicBolt
  2. getComponentConfiguration方法设置发送ticktuple间隔时间(单位s)
  3. execute方法判断tuple类型,如果为ticktuple处理定时任务,如果不是处理其他任务。
以下是wordCount中CountBolt代码,每5s输出各单词统计的数据。
1 //继承 BaseBasicBolt 2 public class CountTickBolt extends BaseBasicBolt { 3     private static final Logger logger = LoggerFactory.getLogger(CountTickBolt.class); 4     private Map
count; 5 private Long time; 6 7 @Override 8 public Map
getComponentConfiguration() { 9 //设置发送ticktuple的时间间隔10 Config conf = new Config();11 conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);12 return conf;13 }14 15 @Override16 public void prepare(Map stormConf, TopologyContext context) {17 super.prepare(stormConf, context);18 count = new HashMap
();19 time = System.currentTimeMillis();20 }21 22 @Override23 public void cleanup() {24 super.cleanup();25 }26 27 @Override28 public void execute(Tuple input, BasicOutputCollector collector) {29 //判断是否为tickTuple30 if (input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) &&31 input.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){32 //是,处理定时任务33 Long nowTime = System.currentTimeMillis();34 Long useTime = nowTime - time;35 StringBuffer sb = new StringBuffer();36 sb.append("======== Use Time :" + useTime + "========\r\n");37 for (Map.Entry wordCount : count.entrySet()){38 sb.append(wordCount.getKey() + "------>" + wordCount.getValue() + "\r\n");39 }40 Long nnTime = System.currentTimeMillis();41 logger.info(sb.toString() + (nnTime - nowTime) );42 time = nnTime;43 }else {44 //否,处理其他数据45 String word = input.getString(0);46 if (count.containsKey(word)){47 int thisWordCont = count.get(word);48 count.put(word, ++thisWordCont);49 }else {50 count.put(word,1);51 }52 }53 }54 55 @Override56 public void declareOutputFields(OutputFieldsDeclarer declarer) {57 58 }
三. 总结
     以上是一个简单的介绍,需要说明的是由于设置时间间隔是秒级的,所以在处理时会有毫秒级的误差,通常是± 2ms。
  以下是没有介绍或者测试到的地方,在以后会补上。
  1. 如何设置此拓扑中所有的spout和bolt都定时处理。
  2. 由于是tuple类型数据,当正常tuple数据量过大时是否会造成tickTuple延时消费。
  WordCout源码:

转载于:https://www.cnblogs.com/nayt/p/6942707.html

你可能感兴趣的文章
图解算法时间复杂度
查看>>
UI_搭建MVC
查看>>
一个样例看清楚JQuery子元素选择器children()和find()的差别
查看>>
代码实现导航栏分割线
查看>>
Windows Phone开发(7):当好总舵主 转:http://blog.csdn.net/tcjiaan/article/details/7281421...
查看>>
VS 2010打开设计器出现错误
查看>>
SQLServer 镜像功能完全实现
查看>>
Vue-详解设置路由导航的两种方法
查看>>
一个mysql主从复制的配置案例
查看>>
大数据学习系列(8)-- WordCount+Block+Split+Shuffle+Map+Reduce技术详解
查看>>
dvwa网络渗透测试环境的搭建
查看>>
Win8 安装VS2012 和 Sql Server失败问题
查看>>
过点(2,4)作一直线在第一象限与两轴围成三角形,问三角形面积的最小值?...
查看>>
java aes CBC的填充方式发现
查看>>
使用ionic cordova build android --release --prod命令打包报有如下错误及解决方法
查看>>
BZOJ 2338 HNOI2011 数矩形 计算几何
查看>>
关于页面<!DOCTYPE>声明
查看>>
【AS3代码】播放FLV视频流的三步骤!
查看>>
C++标准库vector使用(更新中...)
查看>>
cocos2d-x 2.2.6 之 .xml文件数据读取
查看>>