博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Elastic Job入门示例-实现DataflowJob接口
阅读量:4041 次
发布时间:2019-05-24

本文共 5063 字,大约阅读时间需要 16 分钟。

1.接上篇内容:https://blog.csdn.net/seanme/article/details/80256460

2.本次介绍流式处理任务类型

 *  流式任务类型:业务实现两个接口:抓取(fetchData)和处理(processData)数据 *    a.流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; *    b.非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业 

3.相关配置(Spring Boot)

import com.dangdang.ddframe.job.api.dataflow.DataflowJob;import com.dangdang.ddframe.job.config.JobCoreConfiguration;import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;import com.dangdang.ddframe.job.event.JobEventConfiguration;import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;import javax.annotation.PostConstruct;import javax.annotation.Resource;/** *  流式任务类型:业务实现两个接口:抓取(fetchData)和处理(processData)数据 **     a.流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; *      b.非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业 * @param 
*/public abstract class DataflowJobTypeConfig
implements DataflowJob
{ @Resource protected ZookeeperRegistryCenter regCenter; @Resource protected JobEventConfiguration jobEventConfiguration; /** * 作业启动时间的cron表达式 * * @return */ abstract protected String getCron(); /** * 作业 Listener * * @return */ protected ElasticJobListener[] getJobListener(){ return new ElasticJobListener[0]; } /** * 作业分片总数,default 1; * * @return */ protected int getShardingTotalCount() { return 1; } /** * 设置分片序列号和个性化参数对照表. * *

* 分片序列号和参数用等号分隔, 多个键值对用逗号分隔. 类似map. 分片序列号从0开始, 不可大于或等于作业分片总数. 如: 0=a,1=b,2=c *

* * @return */ protected String getShardingItemParameters() { return ""; } /** * 执行任务的Class * @return */ abstract protected Class getJobClass(); @PostConstruct public void simpleJobScheduler() { new SpringJobScheduler(this, regCenter, getLiteJobConfiguration(this.getJobClass(), getCron(),getShardingTotalCount(), getShardingItemParameters()), jobEventConfiguration, getJobListener()).init(); } @SuppressWarnings("rawtypes") protected LiteJobConfiguration getLiteJobConfiguration(final Class
jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) { return LiteJobConfiguration .newBuilder( new DataflowJobConfiguration( JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName(), true)) .overwrite(true).build(); }}

4.样例

import com.alibaba.fastjson.JSON;import com.dangdang.ddframe.job.api.ShardingContext;import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;import org.springframework.context.annotation.Configuration;import java.util.ArrayList;import java.util.List;/** *  示例-流式处理类型 */@Configuration("DemoFlowTask")public class DemoFlowTask extends DataflowJobTypeConfig
{ @Override protected String getCron() { return "0 0/1 * * * ?"; } @Override protected Class getJobClass() { return DemoFlowTask.class; } private int testLoopCount=0;// just for test /** * 流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; * @param shardingContext * @return */ @Override public List
fetchData(ShardingContext shardingContext) { if(testLoopCount>=2){ return null; //如果是返回NULL,则会停止停业 } String execTime = "12292088575058"; System.out.println(testLoopCount+"->testLoopCount *************1 DemoFlowTask fetchData executed:" + execTime + "*******************************" + JSON.toJSONString(shardingContext)); List
demoList = new ArrayList
(); demoList.add("" + execTime); testLoopCount++; try{ Thread.sleep(1000*60); }catch (Exception ex){ ex.printStackTrace(); } System.out.println("*************2 DemoFlowTask fetchData end executed:" + execTime + "*******************************" + JSON.toJSONString(shardingContext)); return demoList; } @Override public void processData(ShardingContext shardingContext, List
list) { String testData=list != null? list.get(0):""; System.out.println("*************3 DemoFlowTask processData executed:"+testData+"*******************************"+JSON.toJSONString(shardingContext)); }}
你可能感兴趣的文章
CocoaPods实践之制作篇
查看>>
[Mac]Mac 操作系统 常见技巧
查看>>
苹果Swift编程语言入门教程【中文版】
查看>>
捕鱼忍者(ninja fishing)之游戏指南+游戏攻略+游戏体验
查看>>
iphone开发基础之objective-c学习
查看>>
iphone开发之SDK研究(待续)
查看>>
计算机网络复习要点
查看>>
Variable property attributes or Modifiers in iOS
查看>>
NSNotificationCenter 用法总结
查看>>
C primer plus 基础总结(一)
查看>>
剑指offer算法题分析与整理(一)
查看>>
剑指offer算法题分析与整理(三)
查看>>
Ubuntu 13.10使用fcitx输入法
查看>>
pidgin-lwqq 安装
查看>>
mint/ubuntu安装搜狗输入法
查看>>
C++动态申请数组和参数传递问题
查看>>
opencv学习——在MFC中读取和显示图像
查看>>
retext出现Could not parse file contents, check if you have the necessary module installed解决方案
查看>>
Matlab与CUDA C的混合编程配置出现的问题及解决方案
查看>>
PaperDownloader——文献命名6起来
查看>>