A Practical Minimal Example of Integrating Spring Batch with Spring Boot
Published: 2019-06-06


Spring Batch is a lightweight framework for parallel (batch) processing of large volumes of data.

Its purpose is similar to Hadoop's, but Hadoop targets heavyweight distributed environments (processing massive data sets), while Spring Batch lives inside a lightweight application framework (processing small to medium data sets).

This article builds the simplest runnable Spring Batch example to illustrate what the framework does.

For a deeper dive, please read the reference documentation in detail; if your English is not strong, right-click and translate the page into Chinese, as I do.

The technology stack is simple: Spring Boot + Spring Batch + JPA. The complete demo project is available at:

1. Create a new project springboot-batch with the basic pom.xml dependencies:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>name.ealen</groupId>
    <artifactId>springboot-batch</artifactId>
    <version>1.0</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
</project>
2. Spring Batch needs its metadata tables in the database, so execute the following script (taken from the official distribution) against your database.

-- do not edit this file
-- BATCH_JOB_INSTANCE: holds all information relevant to a JobInstance
-- JOB_INSTANCE_ID is assigned from BATCH_JOB_SEQ
-- JOB_NAME matches the job name used in the Spring configuration
-- JOB_KEY is an MD5 hash of the job parameters; because of this column, re-running a job that already
-- completed successfully with the same parameters throws JobInstanceAlreadyCompleteException
CREATE TABLE BATCH_JOB_INSTANCE  (
    JOB_INSTANCE_ID BIGINT  NOT NULL PRIMARY KEY ,
    VERSION BIGINT ,
    JOB_NAME VARCHAR(100) NOT NULL,
    JOB_KEY VARCHAR(32) NOT NULL,
    constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;

-- BATCH_JOB_EXECUTION: holds all information relevant to a JobExecution object
CREATE TABLE BATCH_JOB_EXECUTION  (
    JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
    VERSION BIGINT  ,
    JOB_INSTANCE_ID BIGINT NOT NULL,
    CREATE_TIME DATETIME NOT NULL,
    START_TIME DATETIME DEFAULT NULL ,
    END_TIME DATETIME DEFAULT NULL ,
    STATUS VARCHAR(10) ,
    EXIT_CODE VARCHAR(2500) ,
    EXIT_MESSAGE VARCHAR(2500) ,
    LAST_UPDATED DATETIME,
    JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
    constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
    references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;

-- BATCH_JOB_EXECUTION_PARAMS: holds all information relevant to the JobParameters object
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
    JOB_EXECUTION_ID BIGINT NOT NULL ,
    TYPE_CD VARCHAR(6) NOT NULL ,
    KEY_NAME VARCHAR(100) NOT NULL ,
    STRING_VAL VARCHAR(250) ,
    DATE_VAL DATETIME DEFAULT NULL ,
    LONG_VAL BIGINT ,
    DOUBLE_VAL DOUBLE PRECISION ,
    IDENTIFYING CHAR(1) NOT NULL ,
    constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

-- BATCH_STEP_EXECUTION: holds all information relevant to a StepExecution object
CREATE TABLE BATCH_STEP_EXECUTION  (
    STEP_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
    VERSION BIGINT NOT NULL,
    STEP_NAME VARCHAR(100) NOT NULL,
    JOB_EXECUTION_ID BIGINT NOT NULL,
    START_TIME DATETIME NOT NULL ,
    END_TIME DATETIME DEFAULT NULL ,
    STATUS VARCHAR(10) ,
    COMMIT_COUNT BIGINT ,
    READ_COUNT BIGINT ,
    FILTER_COUNT BIGINT ,
    WRITE_COUNT BIGINT ,
    READ_SKIP_COUNT BIGINT ,
    WRITE_SKIP_COUNT BIGINT ,
    PROCESS_SKIP_COUNT BIGINT ,
    ROLLBACK_COUNT BIGINT ,
    EXIT_CODE VARCHAR(2500) ,
    EXIT_MESSAGE VARCHAR(2500) ,
    LAST_UPDATED DATETIME,
    constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

-- BATCH_STEP_EXECUTION_CONTEXT: holds the ExecutionContext associated with each Step
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (
    STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT ,
    constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
    references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;

-- BATCH_JOB_EXECUTION_CONTEXT: holds the ExecutionContext associated with each Job
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
    JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT ,
    constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);
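Instead of running the script by hand, Spring Boot can also create these tables at startup. Below is a minimal application.properties sketch for that, assuming a local MySQL schema named springbatch; the datasource values are placeholders and not part of the original demo.

# placeholder datasource settings; adjust to your own environment
spring.datasource.url=jdbc:mysql://localhost:3306/springbatch?useSSL=false&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=root
# create the Spring Batch metadata tables automatically if they are missing
spring.batch.initialize-schema=always
# optionally stop Spring Boot from launching every configured Job at startup
#spring.batch.job.enabled=false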

3. The entity class for the test data: Access.java

package name.ealen.model;

import javax.persistence.*;

/**
 * Created by EalenXie on 2018/9/10 16:17.
 */
@Entity
@Table
public class Access {

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Integer id;
    private String username;
    private String shopName;
    private String categoryName;
    private String brandName;
    private String shopId;
    private String omit;
    private String updateTime;
    private boolean deleteStatus;
    private String createTime;
    private String description;

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getShopName() { return shopName; }
    public void setShopName(String shopName) { this.shopName = shopName; }
    public String getCategoryName() { return categoryName; }
    public void setCategoryName(String categoryName) { this.categoryName = categoryName; }
    public String getBrandName() { return brandName; }
    public void setBrandName(String brandName) { this.brandName = brandName; }
    public String getShopId() { return shopId; }
    public void setShopId(String shopId) { this.shopId = shopId; }
    public String getOmit() { return omit; }
    public void setOmit(String omit) { this.omit = omit; }
    public String getUpdateTime() { return updateTime; }
    public void setUpdateTime(String updateTime) { this.updateTime = updateTime; }
    public boolean isDeleteStatus() { return deleteStatus; }
    public void setDeleteStatus(boolean deleteStatus) { this.deleteStatus = deleteStatus; }
    public String getCreateTime() { return createTime; }
    public void setCreateTime(String createTime) { this.createTime = createTime; }
    public String getDescription() { return description; }
    public void setDescription(String description) { this.description = description; }

    @Override
    public String toString() {
        return "Access{" +
                "id=" + id +
                ", username='" + username + '\'' +
                ", shopName='" + shopName + '\'' +
                ", categoryName='" + categoryName + '\'' +
                ", brandName='" + brandName + '\'' +
                ", shopId='" + shopId + '\'' +
                ", omit='" + omit + '\'' +
                ", updateTime='" + updateTime + '\'' +
                ", deleteStatus=" + deleteStatus +
                ", createTime='" + createTime + '\'' +
                ", description='" + description + '\'' +
                '}';
    }
}
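For reference, a table matching this entity under Spring Boot's default snake_case column naming could look like the sketch below. This DDL is an assumption for illustration only; the actual demo data comes from access.sql in the repository.

-- hypothetical DDL derived from Access.java; adjust column types and lengths to your real data
CREATE TABLE access (
    id            INT NOT NULL PRIMARY KEY,
    username      VARCHAR(255),
    shop_name     VARCHAR(255),
    category_name VARCHAR(255),
    brand_name    VARCHAR(255),
    shop_id       VARCHAR(255),
    omit          VARCHAR(255),
    update_time   VARCHAR(255),
    delete_status BIT(1),
    create_time   VARCHAR(255),
    description   VARCHAR(255)
) ENGINE=InnoDB;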

4. Before configuring the simplest Job, prepare a few basics, such as a listener for the Job:

  Configure a TaskExecutor: ExecutorConfiguration.java

package name.ealen.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * Configures the TaskExecutor.
 */
@Configuration
public class ExecutorConfiguration {

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(50);
        threadPoolTaskExecutor.setMaxPoolSize(200);
        threadPoolTaskExecutor.setQueueCapacity(1000);
        threadPoolTaskExecutor.setThreadNamePrefix("Data-Job");
        return threadPoolTaskExecutor;
    }
}

  Prepare a simple listener for the Job; implementing JobExecutionListener is all that is needed:

package name.ealen.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Created by EalenXie on 2018/9/10 15:09.
 * A simple Job listener.
 */
@Component
public class JobListener implements JobExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(JobListener.class);

    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    private long startTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        log.info("job before " + jobExecution.getJobParameters());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        log.info("JOB STATUS : {}", jobExecution.getStatus());
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("JOB FINISHED");
            threadPoolTaskExecutor.destroy();
        } else if (jobExecution.getStatus() == BatchStatus.FAILED) {
            log.info("JOB FAILED");
        }
        log.info("Job Cost Time : {}ms", (System.currentTimeMillis() - startTime));
    }
}

5. Configure the most basic Job: a Job usually consists of one or more Steps (much like a workflow), and a Step usually consists of three parts (an ItemReader to read data, an ItemProcessor to process it, and an ItemWriter to write it).

package name.ealen.batch;

import name.ealen.listener.JobListener;
import name.ealen.model.Access;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import javax.persistence.EntityManagerFactory;

/**
 * Created by EalenXie on 2018/9/10 14:50.
 * @EnableBatchProcessing provides the base configuration for building batch jobs.
 */
@Configuration
@EnableBatchProcessing
public class DataBatchConfiguration {

    private static final Logger log = LoggerFactory.getLogger(DataBatchConfiguration.class);

    @Resource
    private JobBuilderFactory jobBuilderFactory;    // builds Jobs
    @Resource
    private StepBuilderFactory stepBuilderFactory;  // builds Steps
    @Resource
    private EntityManagerFactory emf;               // JPA EntityManagerFactory for data access
    @Resource
    private JobListener jobListener;                // the simple Job listener

    /**
     * A simple, basic Job usually consists of one or more Steps.
     */
    @Bean
    public Job dataHandleJob() {
        return jobBuilderFactory.get("dataHandleJob").
                incrementer(new RunIdIncrementer()).
                start(handleDataStep()).    // start() sets the first Step the Job executes
//                next(xxxStep()).
//                next(xxxStep()).
//                ...
                listener(jobListener).      // attach the simple JobListener
                build();
    }

    /**
     * A simple, basic Step has three main parts:
     * ItemReader    : reads the data
     * ItemProcessor : processes the data
     * ItemWriter    : writes the data
     */
    @Bean
    public Step handleDataStep() {
        return stepBuilderFactory.get("getData").
                <Access, Access>chunk(100). // <input, output>; a chunk is roughly like a SQL commit: process 100 items, then write them in one go
                faultTolerant().retryLimit(3).retry(Exception.class).skipLimit(100).skip(Exception.class). // retry on Exception up to 3 times and skip up to 100 failed items; beyond that the Job stops and is marked FAILED
                reader(getDataReader()).         // the ItemReader
                processor(getDataProcessor()).   // the ItemProcessor
                writer(getDataWriter()).         // the ItemWriter
                build();
    }

    @Bean
    public ItemReader<Access> getDataReader() {
        // Read the data; JPA, JDBC, JMS and other approaches could be used here.
        JpaPagingItemReader<Access> reader = new JpaPagingItemReader<>(); // JPA is used here, with a simple native SQL query
        String sqlQuery = "SELECT * FROM access";
        try {
            JpaNativeQueryProvider<Access> queryProvider = new JpaNativeQueryProvider<>();
            queryProvider.setSqlQuery(sqlQuery);
            queryProvider.setEntityClass(Access.class);
            queryProvider.afterPropertiesSet();
            reader.setEntityManagerFactory(emf);
            reader.setPageSize(3);
            reader.setQueryProvider(queryProvider);
            reader.afterPropertiesSet();
            // Every ItemReader and ItemWriter implementation stores its current state in the ExecutionContext before a commit;
            // call setSaveState(false) if you do not want that.
            reader.setSaveState(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return reader;
    }

    @Bean
    public ItemProcessor<Access, Access> getDataProcessor() {
        return new ItemProcessor<Access, Access>() {
            @Override
            public Access process(Access access) throws Exception {
                log.info("processor data : " + access.toString()); // simulated processing; here it just logs the item
                return access;
            }
        };
//        As a lambda this could also be written as:
//        return access -> {
//            log.info("processor data : " + access.toString());
//            return access;
//        };
    }

    @Bean
    public ItemWriter<Access> getDataWriter() {
        return list -> {
            for (Access access : list) {
                log.info("write data : " + access); // simulated write; the real write logic would go here
            }
        };
    }
}
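One thing worth noting: the ThreadPoolTaskExecutor from step 4 is only destroyed in the JobListener and is never actually attached to the Step above. If you want chunks to run concurrently, the step builder can be handed the executor. Below is a minimal sketch under that assumption; the bean name handleDataStepParallel is made up for illustration, it assumes the ThreadPoolTaskExecutor is injected into DataBatchConfiguration, and a multi-threaded step generally requires the reader not to save state (reader.setSaveState(false)).

    // Hypothetical variant of handleDataStep(): same reader/processor/writer, but chunks run on the Data-Job pool.
    @Bean
    public Step handleDataStepParallel() {
        return stepBuilderFactory.get("getDataParallel").
                <Access, Access>chunk(100).
                reader(getDataReader()).
                processor(getDataProcessor()).
                writer(getDataWriter()).
                taskExecutor(threadPoolTaskExecutor). // assumes @Resource private ThreadPoolTaskExecutor threadPoolTaskExecutor;
                throttleLimit(8).                     // cap the number of concurrent chunk executions
                build();
    }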

6. With the basic Job configured, load some sample data into the access table (the demo data, access.sql, is in the Git repository) and write a Spring Boot application class to test it.

  Note: declare each component of the Job with the @Bean annotation, so that the metadata tables record their execution properly:

package name.ealen;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Created by EalenXie on 2018/9/10 14:41.
 */
@SpringBootApplication
public class SpringBatchApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBatchApplication.class, args);
    }
}
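By default Spring Boot launches every configured Job once at startup. If you prefer to trigger the job yourself, and to sidestep the JobInstanceAlreadyCompleteException mentioned in the schema comments, a common pattern is to set spring.batch.job.enabled=false and run the job with a JobLauncher and fresh JobParameters (from a CommandLineRunner, a controller, a scheduler, and so on). Below is a minimal sketch using a hypothetical DataJobRunner that is not part of the original demo.

package name.ealen; // hypothetical helper class, for illustration only

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class DataJobRunner implements CommandLineRunner {

    @Resource
    private JobLauncher jobLauncher;
    @Resource
    private Job dataHandleJob;   // the Job bean defined in DataBatchConfiguration

    @Override
    public void run(String... args) throws Exception {
        // A new timestamp parameter makes every run a new JobInstance,
        // so repeated runs do not fail with JobInstanceAlreadyCompleteException.
        JobParameters parameters = new JobParametersBuilder()
                .addLong("run.time", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(dataHandleJob, parameters);
    }
}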

7. Run the application and you can see the basic data-handling output; both the processing and the writing are simulated here:

8. Check the metadata tables to verify how the Job executed:

  

As a side note, I previously wrote a post about another integration, and you can probably guess where this is going: Spring Batch behaves like a natural Job, and Quartz works perfectly well as the scheduler that drives it. Combining the two gives very good results.
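For instance, a Quartz job can simply delegate to Spring Batch's JobLauncher. Below is a minimal sketch, assuming Quartz is on the classpath (e.g. via spring-boot-starter-quartz) and reusing the dataHandleJob bean; the class, package and parameter names are illustrative rather than part of the original demo, and the JobDetail/Trigger registration is omitted.

package name.ealen.schedule; // hypothetical package, for illustration only

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.quartz.QuartzJobBean;

import javax.annotation.Resource;

/**
 * A Quartz job that triggers the Spring Batch job; schedule it with a JobDetail and Trigger as usual.
 */
public class DataHandleQuartzJob extends QuartzJobBean {

    @Resource
    private JobLauncher jobLauncher;
    @Resource
    private Job dataHandleJob;   // the Job bean defined in DataBatchConfiguration

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        try {
            // a unique parameter per firing, so each run creates a new JobInstance
            jobLauncher.run(dataHandleJob, new JobParametersBuilder()
                    .addLong("schedule.time", System.currentTimeMillis())
                    .toJobParameters());
        } catch (Exception e) {
            throw new JobExecutionException(e);
        }
    }
}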

Thank you all for your feedback and support.

 

Reposted from: https://www.cnblogs.com/ealenxie/p/9647703.html
