本文共 5063 字,大约阅读时间需要 16 分钟。
1.接上篇内容:https://blog.csdn.net/seanme/article/details/802564602.本次介绍流式处理任务类型
* 流式任务类型:业务实现两个接口:抓取(fetchData)和处理(processData)数据 * a.流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; * b.非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业
3.相关配置(Spring Boot)
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;import com.dangdang.ddframe.job.config.JobCoreConfiguration;import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;import com.dangdang.ddframe.job.event.JobEventConfiguration;import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;import javax.annotation.PostConstruct;import javax.annotation.Resource;/** * 流式任务类型:业务实现两个接口:抓取(fetchData)和处理(processData)数据 ** a.流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; * b.非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业 * @param*/public abstract class DataflowJobTypeConfig implements DataflowJob { @Resource protected ZookeeperRegistryCenter regCenter; @Resource protected JobEventConfiguration jobEventConfiguration; /** * 作业启动时间的cron表达式 * * @return */ abstract protected String getCron(); /** * 作业 Listener * * @return */ protected ElasticJobListener[] getJobListener(){ return new ElasticJobListener[0]; } /** * 作业分片总数,default 1; * * @return */ protected int getShardingTotalCount() { return 1; } /** * 设置分片序列号和个性化参数对照表. * * * 分片序列号和参数用等号分隔, 多个键值对用逗号分隔. 类似map. 分片序列号从0开始, 不可大于或等于作业分片总数. 如: 0=a,1=b,2=c *
* * @return */ protected String getShardingItemParameters() { return ""; } /** * 执行任务的Class * @return */ abstract protected Class getJobClass(); @PostConstruct public void simpleJobScheduler() { new SpringJobScheduler(this, regCenter, getLiteJobConfiguration(this.getJobClass(), getCron(),getShardingTotalCount(), getShardingItemParameters()), jobEventConfiguration, getJobListener()).init(); } @SuppressWarnings("rawtypes") protected LiteJobConfiguration getLiteJobConfiguration(final Class jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) { return LiteJobConfiguration .newBuilder( new DataflowJobConfiguration( JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName(), true)) .overwrite(true).build(); }}
4.样例
import com.alibaba.fastjson.JSON;import com.dangdang.ddframe.job.api.ShardingContext;import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;import org.springframework.context.annotation.Configuration;import java.util.ArrayList;import java.util.List;/** * 示例-流式处理类型 */@Configuration("DemoFlowTask")public class DemoFlowTask extends DataflowJobTypeConfig{ @Override protected String getCron() { return "0 0/1 * * * ?"; } @Override protected Class getJobClass() { return DemoFlowTask.class; } private int testLoopCount=0;// just for test /** * 流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; * @param shardingContext * @return */ @Override public List fetchData(ShardingContext shardingContext) { if(testLoopCount>=2){ return null; //如果是返回NULL,则会停止停业 } String execTime = "12292088575058"; System.out.println(testLoopCount+"->testLoopCount *************1 DemoFlowTask fetchData executed:" + execTime + "*******************************" + JSON.toJSONString(shardingContext)); List demoList = new ArrayList (); demoList.add("" + execTime); testLoopCount++; try{ Thread.sleep(1000*60); }catch (Exception ex){ ex.printStackTrace(); } System.out.println("*************2 DemoFlowTask fetchData end executed:" + execTime + "*******************************" + JSON.toJSONString(shardingContext)); return demoList; } @Override public void processData(ShardingContext shardingContext, List list) { String testData=list != null? list.get(0):""; System.out.println("*************3 DemoFlowTask processData executed:"+testData+"*******************************"+JSON.toJSONString(shardingContext)); }}