---
title: 'SpringBatch Learning Notes'
disqus: hackmd
---
# X. Disclaimer
* These notes are based on the Spring Batch tutorial videos by 浪飛yes on bilibili
# 1. Basic Example
* Basic Spring Batch architecture

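## 1.1 Configure a DataSource for Spring Batch
* **Purpose**: stores the job metadata: each job's `execution status`, `steps`, and `parameters`
* You need to create a **DataSource** and a **TransactionManager**
* Spring Batch's JDBC job repository **uses JdbcTemplate** under the hood, so a `JdbcTransactionManager` is used
* Code: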
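```java=
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.support.JdbcTransactionManager;
import javax.sql.DataSource;

@Configuration
public class DataSourceConfiguration {
    @Bean("BatchDataSource")
    public DataSource getDataSource() {
        HikariDataSource hikariDataSource = new HikariDataSource();
        hikariDataSource.setJdbcUrl("jdbc:postgresql://localhost:5432/batch");
        hikariDataSource.setDriverClassName("org.postgresql.Driver");
        // username/password are omitted here; set them for your own database
        hikariDataSource.setConnectionTimeout(3000L);// max wait (ms) to obtain a connection
        hikariDataSource.setReadOnly(false);// connections are not read-only
        hikariDataSource.setIdleTimeout(3000L);// max idle time (ms); HikariCP treats values under 10 s as too small and falls back to its default
        hikariDataSource.setMaxLifetime(60000L);// max lifetime (ms) of a pooled connection
        hikariDataSource.setMaximumPoolSize(10);// max number of connections in the pool
        hikariDataSource.setMinimumIdle(5);// min number of idle connections kept in the pool
        return hikariDataSource;
    }

    @Bean("BatchTransactionManager")
    public JdbcTransactionManager batchTransactionManager(@Qualifier("BatchDataSource") DataSource dataSource) {
        return new JdbcTransactionManager(dataSource);
    }
}
```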
## 1.2 Implement a Simple Task
* Implement the **Tasklet** interface
* Code:
```java=
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import java.util.Map;

// The task (tasklet) executed by a batch step
public class MessageTaskLet implements Tasklet {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageTaskLet.class);

    // Returns the processing status of this step (RepeatStatus):
    // RepeatStatus.FINISHED    -> the step is done
    // RepeatStatus.CONTINUABLE -> the step is executed again (it keeps looping until FINISHED is returned)
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Read the job parameters
        Map<String, Object> jobParameters = chunkContext.getStepContext().getJobParameters();
        LOGGER.info("Processing data, name = {}", jobParameters.get("name"));
        return RepeatStatus.FINISHED; // done
    }
}
```
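## 1.3 Configure the Job
* Enable Spring Batch with **@EnableBatchProcessing** and configure:
    * the **data source** (`dataSourceRef`)
    * the **transaction manager** (`transactionManagerRef`)
* Define the **Job** bean
* Code:
```java=
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.support.JdbcTransactionManager;

@EnableBatchProcessing(dataSourceRef = "BatchDataSource", transactionManagerRef = "BatchTransactionManager")
@Configuration
public class SpringBatchConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpringBatchConfiguration.class);
    @Qualifier("BatchTransactionManager")
    @Autowired
    private JdbcTransactionManager jdbcTransactionManager;

    // After Spring Boot starts, Job beans are handed to JobLauncherApplicationRunner (an ApplicationRunner),
    // which launches them through a JobLauncher. Note: in Spring Boot 3, @EnableBatchProcessing makes Boot's
    // batch auto-configuration back off, so this runner may not be registered; check your Boot version.
    // A job is an ordered list of steps that runs independently from start to finish.
    @Bean("myJob")
    public Job job(JobRepository jobRepository) {
        return new JobBuilder("myJob", jobRepository)
                // 1. the first step, executed on its own
                .start(new StepBuilder("myStep1", jobRepository)
                        // 1.1 simple tasklet mode (the alternative is chunk-oriented processing)
                        .tasklet(new MessageTaskLet(), jdbcTransactionManager)
                        .build())
                .build();
    }
}
```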
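## 1.4 Run the Job
* Inject the **JobLauncher** and the **Job**
* Start the job with **JobLauncher**'s `run()` method
* **Code**:
```java=
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.HashMap;
import java.util.Map;

@SpringBootTest
class BatchTest1ApplicationTests {
    @Autowired
    private JobLauncher jobLauncher;
    @Qualifier("myJob")
    @Autowired
    private Job myJob;

    @Test
    void contextLoads() throws Exception {
        // Job parameters
        Map<String, JobParameter<?>> map = new HashMap<>();
        map.put("name", new JobParameter<>("nicolas", String.class));
        // Launch the job with the parameters
        jobLauncher.run(myJob, new JobParameters(map));
    }
}
```
* **The combination of job name + identifying parameters must be unique: once that combination has completed successfully, launching it again fails (see 2.1.1)**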
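# 2. Job: the Job Object
* A job is made up of **one** or **more** steps
* It defines an ordered list of steps that runs independently from start to finish
    * Ordered list: composed of different steps
    * Start to finish: the next step runs only after the previous one completes, in a defined order
    * Independent: a job should not be affected by external dependencies
* **Job names must be unique**
* Launched by a **JobLauncher**
* **Job runs are independent of each other**
    * Spring Batch keeps each run independent through the **JobInstance** (`job instance`) and the **JobExecution** (`job execution object`)
    * **JobInstance**: represents **one logical run of the job**. A new JobInstance is created only for a new combination of job name + identifying parameters; relaunching the same combination reuses the existing instance
        * **A job run is identified by the job name and its identifying parameters**. Example: a business requirement to synchronize data every day
            * Job name: daily-syncjob
            * Identifying parameter: the current date
    * **JobExecution**: **every launch** of a job `creates a new JobExecution`, which **records what happened during that run**
* **Why JobInstance + JobExecution?** => **because a batch run can go one of two ways**:
    * **Job => JobInstance => JobExecution**
    * **The job succeeds on the first attempt**:
        * **one** JobInstance record
        * **one** JobExecution record
    * **The job fails and only succeeds after several attempts**:
        * **one** JobInstance record (`same identifying parameters, so it is not recorded again`)
        * **one or more** JobExecution records (`the same job instance is run several times`)
* **Summary**:
    * JobInstance = `job name` + `identifying parameters`
    * Each launch of a JobInstance creates one JobExecution
    * One JobInstance, before it completes, may produce one or more JobExecutions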
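To make the JobInstance / JobExecution relationship concrete, here is a toy plain-Java model. It is not the Spring Batch API: `ToyJobRepository`, `InstanceKey`, and the string statuses are hypothetical names for illustration, and the real job repository tracks much more (timestamps, step executions, restart rules).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical identity of a JobInstance: job name + identifying parameters
record InstanceKey(String jobName, Map<String, String> params) {}

// Toy model (not Spring Batch): one JobInstance per key, one JobExecution per launch
class ToyJobRepository {
    // JobInstance -> statuses of its JobExecutions, in launch order
    private final Map<InstanceKey, List<String>> executions = new HashMap<>();

    String launch(String jobName, Map<String, String> params, boolean succeeds) {
        InstanceKey key = new InstanceKey(jobName, Map.copyOf(params));
        // an existing instance is reused; a new one is created only for a new key
        List<String> history = executions.computeIfAbsent(key, k -> new ArrayList<>());
        if (history.contains("COMPLETED")) {
            // mirrors the idea behind JobInstanceAlreadyCompleteException
            throw new IllegalStateException("A job instance already exists and is complete for " + key);
        }
        String status = succeeds ? "COMPLETED" : "FAILED";
        history.add(status); // every launch creates a new JobExecution
        return status;
    }

    int instanceCount() {
        return executions.size();
    }

    int executionCount(String jobName, Map<String, String> params) {
        return executions.getOrDefault(new InstanceKey(jobName, Map.copyOf(params)), List.of()).size();
    }
}
```

A failed launch followed by a successful one leaves one instance with two executions; a third launch with the same key is rejected, while a new date creates a second instance.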
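## 2.1 Job Parameters
* `JobLauncher.run()` launches the job and returns a JobExecution
* Implementation class: SimpleJobLauncher (deprecated in Spring Batch 5 in favor of `TaskExecutorJobLauncher`)
### 2.1.1 The JobParameters Class
* **Purpose**: **wraps all parameters passed to a job**
* Launching again with the same JobParameters values after that run has completed `fails immediately`
    * **Reason**: in Spring Batch, a given **job name** + **identifying parameters** `can complete successfully only once`
    * This keeps each JobInstance (`job name` + `identifying parameters`) unique and each run independent
    * **Exception**: `org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException`
    * **Error message**: `A job instance already exists and is complete for identifying parameters ...`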
### 2.1.2 Reading Parameters
#### 2.1.2.1 Using ChunkContext
* **Basic example**:
```java=
// The task (tasklet) executed by a batch step
public class MessageTaskLet implements Tasklet {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageTaskLet.class);

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Read the job parameters
        Map<String, Object> jobParameters = chunkContext.getStepContext().getJobParameters();
        LOGGER.info("Processing data, jobParameters => name = {}", jobParameters.get("name"));
        return RepeatStatus.FINISHED; // done
    }
}
```
* **Method**: **chunkContext.getStepContext().getJobParameters()**
#### 2.1.2.2 Late Binding with @Value (common)
* **Steps**:
    1. Annotate the bean with **@StepScope**
        * **@StepScope** (`late binding`): **the actual bean is not created at startup (a proxy stands in for it); it is created when the step that uses it runs**
    2. Inject the value with the fixed SpEL expression **#{jobParameters['key name']}**
        * Inside a step-scoped bean, Spring Batch makes `jobParameters` available to the SpEL expression
* **Code**: **the key is "name"**
```java=
@StepScope
@Bean
public Tasklet getTasklet(@Value("#{jobParameters['name']}") String name) { // fixed SpEL syntax
    return new Tasklet() {
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            System.out.println(name);
            return RepeatStatus.FINISHED;
        }
    };
}
```
### 2.1.3 Custom Parameter Validation: JobParametersValidator
* Implement the `validate` method of the **JobParametersValidator** interface
* When **validation fails**, throw a **JobParametersInvalidException** (`the launch is aborted by the exception`)
    * A custom exception also works, as long as it is a subclass of JobParametersInvalidException or an unchecked exception
* **Custom parameter validator**
```java=
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.JobParametersValidator;

// Goal: throw an exception when the value is null or ""
public class NameParametersValidator implements JobParametersValidator {
    @Override
    public void validate(JobParameters parameters) throws JobParametersInvalidException {
        String name = parameters.getString("name");
        if (name == null || name.isEmpty()) {
            throw new JobParametersInvalidException("Name is blank.");
        }
    }
}
```
* **Register** the custom validator
```java=
@Bean("paramJob")
public Job paramJob(JobRepository jobRepository) {
    return new JobBuilder("paramJob", jobRepository)
            // 1. the first step, executed on its own
            .start(new StepBuilder("paramJobStep1", jobRepository)
                    // simple tasklet mode (the alternative is chunk-oriented processing)
                    .tasklet(new MessageTaskLet(), jdbcTransactionManager)
                    .build())
            // X. set the JobParameters validator
            .validator(new NameParametersValidator())
            .build();
}
```
### 2.1.4 Spring Batch's Built-in Parameter Validators
* **Two built-in validators**, `both implement JobParametersValidator`:
    * **DefaultJobParametersValidator** (`default validator`)
    * **CompositeJobParametersValidator** (`composite validator`)
        * combines several validators
* **DefaultJobParametersValidator**
    * Maintains two key sets: **requiredKeys** and **optionalKeys**
        * **requiredKeys**: the **JobParameters must contain every key in this set**
        * **optionalKeys**: **the keys in this set are optional**
    * **Code**:
```java=
public DefaultJobParametersValidator getDefaultJobParametersValidator() {
    DefaultJobParametersValidator defaultJobParametersValidator = new DefaultJobParametersValidator();
    // required key => name1
    defaultJobParametersValidator.setRequiredKeys(new String[]{"name1"});
    // optional key => age
    defaultJobParametersValidator.setOptionalKeys(new String[]{"age"});
    return defaultJobParametersValidator;
}
```
* **Error message when a required key is missing**: `The JobParameters do not contain required keys: [name1]`
* **CompositeJobParametersValidator**
    * Takes a `List<JobParametersValidator>` and runs each validator in turn during validation
    * **Code**:
```java=
public CompositeJobParametersValidator getCompositeJobParametersValidator() {
    CompositeJobParametersValidator compositeJobParametersValidator = new CompositeJobParametersValidator();
    // set the validators
    compositeJobParametersValidator.setValidators(List.of(
            getDefaultJobParametersValidator(), // built-in validator
            new NameParametersValidator())      // custom validator
    );
    return compositeJobParametersValidator;
}
```
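The two built-in validators can be pictured with the following plain-Java sketch. The classes are hypothetical stand-ins, not Spring Batch types: `RequiredKeysValidator` reproduces only the required-keys check described above (the real `DefaultJobParametersValidator` also looks at `optionalKeys`), and `CompositeValidator` runs its validators in order, so the first failure aborts validation.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical validator contract: throws IllegalArgumentException when parameters are invalid
interface ParamValidator {
    void validate(Map<String, String> params);
}

// Sketch of the required-keys rule of DefaultJobParametersValidator
class RequiredKeysValidator implements ParamValidator {
    private final Set<String> requiredKeys;

    RequiredKeysValidator(String... keys) {
        this.requiredKeys = Set.of(keys);
    }

    @Override
    public void validate(Map<String, String> params) {
        Set<String> missing = new TreeSet<>(requiredKeys); // sorted for a stable message
        missing.removeAll(params.keySet());
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("The JobParameters do not contain required keys: " + missing);
        }
    }
}

// Same rule as NameParametersValidator: "name" must be non-null and non-empty
class NameNotBlankValidator implements ParamValidator {
    @Override
    public void validate(Map<String, String> params) {
        String name = params.get("name");
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Name is blank.");
        }
    }
}

// Sketch of CompositeJobParametersValidator: delegates to each validator in order
class CompositeValidator implements ParamValidator {
    private final List<ParamValidator> validators;

    CompositeValidator(List<ParamValidator> validators) {
        this.validators = List.copyOf(validators);
    }

    @Override
    public void validate(Map<String, String> params) {
        for (ParamValidator validator : validators) {
            validator.validate(params); // the first failing validator throws and stops the loop
        }
    }
}
```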
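### 2.1.5 Incrementing Parameters: JobParametersIncrementer
* **Note**: the **JobParametersIncrementer** is applied by callers such as **JobLauncherApplicationRunner**, `JobOperator.startNextInstance()`, and `JobParametersBuilder.getNextJobParameters()`; calling `JobLauncher.run()` yourself does **not** apply it
* Implement the `getNext` method of the **JobParametersIncrementer** interface
* Purpose: **derive a new set of JobParameters from the previous one**
    * avoids the failure caused by launching with the same parameters again
* Source:
```java=
@FunctionalInterface
public interface JobParametersIncrementer {
    JobParameters getNext(@Nullable JobParameters parameters);
}
```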
#### 2.1.5.1 Incrementing the run.id Parameter
* **RunIdIncrementer**: an incrementer for the `run.id` parameter
    * It maintains an identifying parameter named `run.id` and increments it by 1 on every launch
* **Code**:
```java=
@Bean("paramJob")
public Job paramJob(JobRepository jobRepository) {
    return new JobBuilder("paramJob", jobRepository)
            // 1. the first step, executed on its own
            .start(new StepBuilder("paramJobStep1", jobRepository)
                    // simple tasklet mode (the alternative is chunk-oriented processing)
                    .tasklet(new MessageTaskLet(), jdbcTransactionManager)
                    .build())
            // X. set the parameter incrementer
            .incrementer(new RunIdIncrementer()) // run.id + 1 on every launch
            .build();
}
```
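Conceptually, `RunIdIncrementer` takes the previous parameters and bumps `run.id`. The plain-Java sketch below models that with a `Map`; `RunIdModel` is a hypothetical name, and details of the real class (parameter types, handling of a missing or malformed `run.id`) may differ by version.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the RunIdIncrementer idea (not the Spring Batch class)
final class RunIdModel {
    static final String KEY = "run.id";

    private RunIdModel() {}

    // Returns a copy of the previous parameters with run.id incremented (1 on the first run)
    static Map<String, Object> next(Map<String, Object> previous) {
        Map<String, Object> next = previous == null ? new HashMap<>() : new HashMap<>(previous);
        Object current = next.get(KEY);
        long id = (current instanceof Long last) ? last + 1 : 1L;
        next.put(KEY, id);
        return next;
    }
}
```

Because the other parameters are copied over, each launch differs only in `run.id`, which is enough to make it a new JobInstance.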
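#### 2.1.5.2 Custom: Timestamp
* Implement **JobParametersIncrementer** and add a key-value pair (here the current time)
* Code:
```java=
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.lang.Nullable;
import java.time.LocalDateTime;

public class LocalDateTimeIncrementer implements JobParametersIncrementer {
    @Override
    public JobParameters getNext(@Nullable JobParameters parameters) {
        // keep the previous parameters (if any) and set "daily" to the current time
        JobParameters previous = parameters == null ? new JobParameters() : parameters;
        return new JobParametersBuilder(previous)
                .addLocalDateTime("daily", LocalDateTime.now())
                .toJobParameters();
    }
}
```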
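## 2.2 Job Listeners
* **Purpose**: **observe job execution** and plug business logic in at two points: **before** and **after** the job runs
    * **Before**: usually **initialization**, i.e. the preparation the job needs
    * **After**: **cleanup** once the work is done, such as `releasing resources`
* **JobExecutionListener interface**: **implements a job listener**
* **Note**: the **job** must first pass the **JobParametersValidator** before the **listener** is invoked
* **Source**:
```java=
public interface JobExecutionListener {
    // before the job runs
    default void beforeJob(JobExecution jobExecution) {
    }
    // after the job runs
    default void afterJob(JobExecution jobExecution) {
    }
}
```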
### 2.2.1 Implementing the Interface
* Implement the **JobExecutionListener interface**:
    * Goal: log the job status
    * Code:
```java=
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

public class JobStatusListener implements JobExecutionListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobStatusListener.class);

    @Override
    public void beforeJob(JobExecution jobExecution) {
        LOGGER.info("Job started, status = {}", jobExecution.getStatus());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        LOGGER.info("Job finished, status = {}", jobExecution.getStatus());
    }
}
```
* step:
```java=
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

// The task (tasklet) executed by a batch step
public class MessageTaskLet implements Tasklet {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageTaskLet.class);

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Get the JobExecution through the StepExecution
        BatchStatus status = contribution.getStepExecution().getJobExecution().getStatus();
        LOGGER.info("Job running, status = {}", status);
        return RepeatStatus.FINISHED; // done
    }
}
```
* **Usage**:
```java=
@Bean("paramJob")
public Job paramJob(JobRepository jobRepository) {
    return new JobBuilder("paramJob", jobRepository)
            .incrementer(new RunIdIncrementer())
            // 1. the first step, executed on its own
            .start(new StepBuilder("paramJobStep1", jobRepository)
                    // simple tasklet mode (the alternative is chunk-oriented processing)
                    .tasklet(new MessageTaskLet(), jdbcTransactionManager)
                    .build())
            // X. register the job listener
            .listener(new JobStatusListener())
            .build();
}
```
### 2.2.2 Using Annotations
* **Idea**: similar to **aspect (AOP-style) logic**: annotated methods are called around the job
* **Annotations**:
    * **@BeforeJob**: before execution
    * **@AfterJob**: after execution
* **Registered the same way as the interface version**, e.g. `.listener(new JobStatusListenerByAnnotation())`
* **Code**:
```java=
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.annotation.AfterJob;
import org.springframework.batch.core.annotation.BeforeJob;

public class JobStatusListenerByAnnotation {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobStatusListenerByAnnotation.class);

    @BeforeJob
    public void beforeJob(JobExecution jobExecution) {
        LOGGER.info("A Job started, status = {}", jobExecution.getStatus());
    }

    @AfterJob
    public void afterJob(JobExecution jobExecution) {
        LOGGER.info("A Job finished, status = {}", jobExecution.getStatus());
    }
}
```
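The ordering described in this section (parameter validation first, then `beforeJob`, the steps, and `afterJob`) can be summarized with a small plain-Java model. `ListenerOrderModel` is hypothetical; it only records the order of calls and assumes, as Spring Batch does, that `afterJob` is invoked whether the steps succeed or fail.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of when job listeners are called (not Spring Batch code)
final class ListenerOrderModel {
    private ListenerOrderModel() {}

    static List<String> run(boolean paramsValid, boolean stepFails) {
        List<String> calls = new ArrayList<>();
        if (!paramsValid) {
            calls.add("validation failed"); // the job never starts, so no listener is called
            return calls;
        }
        calls.add("beforeJob");
        try {
            calls.add("step1");
            if (stepFails) {
                throw new IllegalStateException("step1 failed");
            }
            calls.add("status=COMPLETED");
        } catch (IllegalStateException e) {
            calls.add("status=FAILED");
        } finally {
            calls.add("afterJob"); // called on success and on failure
        }
        return calls;
    }
}
```

This is why `afterJob` is a safe place for cleanup such as releasing resources: it runs even when a step fails.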
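## 2.3 Execution Context: ExecutionContext
* **Summary**:
    * **Step data is stored in the step's ExecutionContext and can only be used within that step**
    * **Job data is stored in the job's ExecutionContext and is shared by all steps**
* **Tables to inspect in the database**: `batch_job_execution_context` and `batch_step_execution_context`
    * **Job** context data is stored in `batch_job_execution_context`
    * **Step** context data is stored in `batch_step_execution_context`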
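### 2.3.1 Context
* "Context" means the `environment`, `setting`, or `surroundings` in which something happens
* Spring uses the word `Context` for this idea, for example:
    * the Spring container: `ApplicationContext`
    * in Spring Batch:
        * **JobContext**: wraps the JobExecution, handles the job's wrap-up work and its various callbacks
            * **JobContext** is bound to the **JobExecution** and provides the **execution environment** (`context`) for the **Job**
        * **StepContext**: wraps the StepExecution, handles the step's wrap-up work and its various callbacks
            * **StepContext** is bound to the **StepExecution** and provides the **execution environment** (`context`) for the **Step**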
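### 2.3.2 Execution Context
* **ExecutionContext**: Spring Batch's **execution context**
* **Purpose**: **sharing data**
* **Two kinds of ExecutionContext**:
    * **The job's ExecutionContext**:
        * **Scope**: one job run; data is shared **across all steps**
    * **The step's ExecutionContext**:
        * **Scope**: one step run; data is shared **within a single step** (`between the ItemReader/ItemProcessor/ItemWriter components`)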
### 2.3.3 Job and Step Execution Chain

### 2.3.4 Job and Step Reference Chains
* Job chain: Job -> JobInstance -> JobContext -> JobExecution -> ExecutionContext (`Job`)
* Step chain: Step -> StepContext -> StepExecution -> ExecutionContext (`Step`)
### 2.3.5 JobContext API
* Obtain the current **JobContext** through a **utility class**: `JobSynchronizationManager.getContext();` (it is bound to the thread that is executing the job; elsewhere it returns `null`)
* Code:
```java=
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.scope.context.JobContext;
import org.springframework.batch.core.scope.context.JobSynchronizationManager;
import java.util.Map;

public class UseJobContext {
    public void execute() {
        // Get the JobContext bound to the current thread (null if no job is running on this thread)
        JobContext context = JobSynchronizationManager.getContext();
        if (context == null) {
            return;
        }
        JobExecution jobExecution = context.getJobExecution();
        Map<String, Object> jobParameters = context.getJobParameters();
        Map<String, Object> jobExecutionContext = context.getJobExecutionContext();
    }
}
```
### 2.3.6 StepContext API
* **Code**:
```java=
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.scope.context.StepContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import java.util.Map;

public class UseStepContext implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Get the step context => StepContext
        StepContext stepContext = chunkContext.getStepContext();
        StepExecution stepExecution = stepContext.getStepExecution();
        // Execution context of the current step (read-only view)
        Map<String, Object> stepExecutionContext = stepContext.getStepExecutionContext();
        // Execution context of the current job execution (read-only view)
        Map<String, Object> jobExecutionContext = stepContext.getJobExecutionContext();
        // Get the JobExecution through the StepExecution
        BatchStatus status = contribution.getStepExecution().getJobExecution().getStatus();
        // Job parameters
        Map<String, Object> jobParameters = stepContext.getJobParameters();
        return RepeatStatus.FINISHED;
    }
}
```
### 2.3.7 ExecutionContext API
* **Code**:
```java=
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.scope.context.StepContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;

public class UseExecutionContext implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Step => StepContext -> StepExecution -> ExecutionContext
        StepContext stepContext = chunkContext.getStepContext();
        StepExecution stepExecution = stepContext.getStepExecution();
        ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();
        stepExecutionContext.put("key1", "AAA");
        // Job => JobExecution -> ExecutionContext
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobExecutionContext = jobExecution.getExecutionContext();
        jobExecutionContext.put("key2", "AAA");
        return RepeatStatus.FINISHED;
    }
}
```
### 2.3.8 Example
* **Goal**: observe how data is shared through the **job ExecutionContext** and the **step ExecutionContext**
* **Steps**:
    * Define two steps
    * In **step1**, **store data**:
        * job ExecutionContext: key-step1-job = value-step1-job
        * step ExecutionContext: key-step1-step = value-step1-step
    * In **step2**, **print the ExecutionContexts** and observe
* **Code**:
    * step1:
```java=
public class ObserveStep1 implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        StepContext stepContext = chunkContext.getStepContext();
        // Step
        // This Map can be read but not modified
        Map<String, Object> map = stepContext.getStepExecutionContext();
        // Write data through the ExecutionContext itself
        ExecutionContext stepExecutionContext = stepContext.getStepExecution().getExecutionContext();
        stepExecutionContext.put("key-step1-step", "value-step1-step");
        // Job
        ExecutionContext jobExecutionContext = stepContext.getStepExecution().getJobExecution().getExecutionContext();
        jobExecutionContext.put("key-step1-job", "value-step1-job");
        return RepeatStatus.FINISHED;
    }
}
```
* step2:
```java=
public class ObserveStep2 implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        StepContext stepContext = chunkContext.getStepContext();
        System.out.println("Step: " + stepContext.getStepExecution().getExecutionContext().get("key-step1-step"));
        System.out.println("-------");
        System.out.println("Job: " + stepContext.getJobExecutionContext().get("key-step1-job"));
        return RepeatStatus.FINISHED;
    }
}
```
* Configuration:
```java=
@Bean("contextJob")
public Job contextJob(JobRepository jobRepository) {
    return new JobBuilder("contextJob", jobRepository)
            .incrementer(new RunIdIncrementer())
            // 1. the first step, executed on its own
            .start(new StepBuilder("contextJobStep1", jobRepository)
                    .tasklet(new ObserveStep1(), jdbcTransactionManager)
                    .build())
            .next(new StepBuilder("contextJobStep2", jobRepository)
                    .tasklet(new ObserveStep2(), jdbcTransactionManager)
                    .build())
            .build();
}
```
* Result:
```=
Step: null
-------
Job: value-step1-job
```
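* Data stored in a step's context is not shared with other steps, so step2 reads `null` for the step key
* Data stored in the job's context is shared across the whole job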
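The result above can be reproduced with a plain-Java model in which the job owns one map that every step sees, while each step execution starts with its own empty map. The class and method names are hypothetical. The `promote` method mirrors the idea behind Spring Batch's `ExecutionContextPromotionListener`, which copies selected step-level keys into the job-level context (check its documentation for which exit statuses trigger the copy).

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (not Spring Batch): one job-level map shared by all steps,
// plus a fresh map for every step execution
final class ContextScopeModel {
    private ContextScopeModel() {}

    // Copies the chosen step-level keys into the job-level map
    static void promote(Map<String, Object> stepCtx, Map<String, Object> jobCtx, String... keys) {
        for (String key : keys) {
            if (stepCtx.containsKey(key)) {
                jobCtx.put(key, stepCtx.get(key));
            }
        }
    }

    // Returns what step2 can see after step1 has run
    static Map<String, Object> runTwoSteps(boolean promoteStepKey) {
        Map<String, Object> jobContext = new HashMap<>();   // one per job execution
        Map<String, Object> step1Context = new HashMap<>(); // step1's own context
        step1Context.put("key-step1-step", "value-step1-step");
        jobContext.put("key-step1-job", "value-step1-job");
        if (promoteStepKey) {
            promote(step1Context, jobContext, "key-step1-step");
        }
        Map<String, Object> step2Context = new HashMap<>(); // step2 starts with an empty context
        Map<String, Object> seenByStep2 = new HashMap<>();
        seenByStep2.put("step", step2Context.get("key-step1-step"));
        seenByStep2.put("job", jobContext.get("key-step1-job"));
        seenByStep2.put("promoted", jobContext.get("key-step1-step"));
        return seenByStep2;
    }
}
```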
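# 3. Step: the Step Object
* **Two step processing modes are supported**:
    * **Tasklet** mode
        * simple
        * implement the **Tasklet interface** to build a step; it is executed repeatedly until it returns `RepeatStatus.FINISHED`
    * **Chunk** mode
        * consists of 2 to 3 components:
            1. ItemReader
            2. ItemProcessor (`optional`)
            3. ItemWriter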
## 3.1 Tasklet Mode
* **Tasklet interface source**:
```java=
@FunctionalInterface
public interface Tasklet {
    @Nullable
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}
```
* **Override the execute method**
* **Parameters**:
    * **StepContribution**: `the step's contribution object`
        * mainly used to **set the step's exit status**: `contribution.setExitStatus(ExitStatus status)`
            * **contribution.setExitStatus(ExitStatus.COMPLETED)**
    * **ChunkContext**: `chunk context`
        * **holds the chunk's execution environment**; gives access to the `StepContext` and, through it, to the job's data
* **Return value**:
    * **RepeatStatus** (`enum`): **the current step's status**
        * RepeatStatus.**CONTINUABLE**: the step **is executed again**
        * RepeatStatus.**FINISHED**: the step **is finished**
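A plain-Java model of how the returned status drives a tasklet step: the step keeps calling `execute` while it returns `CONTINUABLE` and stops on `FINISHED`. The enum and classes below are hypothetical stand-ins, not the Spring Batch types, and leave out transactions, listeners, and exception handling.

```java
// Hypothetical stand-in for RepeatStatus
enum ToyRepeatStatus { CONTINUABLE, FINISHED }

@FunctionalInterface
interface ToyTasklet {
    ToyRepeatStatus execute();
}

final class TaskletLoopModel {
    private TaskletLoopModel() {}

    // Calls the tasklet until it reports FINISHED; returns how many calls were made
    static int runUntilFinished(ToyTasklet tasklet) {
        int calls = 0;
        ToyRepeatStatus status;
        do {
            status = tasklet.execute();
            calls++;
        } while (status == ToyRepeatStatus.CONTINUABLE);
        return calls;
    }
}

// Example: handles one page per call and asks to be called again until three pages are done
final class PagedTasklet implements ToyTasklet {
    private int pagesLeft = 3;

    @Override
    public ToyRepeatStatus execute() {
        pagesLeft--;
        return pagesLeft > 0 ? ToyRepeatStatus.CONTINUABLE : ToyRepeatStatus.FINISHED;
    }
}
```

A tasklet that always returns `CONTINUABLE` would therefore never end, which is the "infinite loop" risk mentioned in section 1.2.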
## 3.2 Chunk Mode
* **Three modules** (`interfaces`):
    * **ItemReader\<T>**: reading
    * **ItemProcessor\<I, O>** (`optional`): processing
    * **ItemWriter\<T>**: writing
* **Structure diagram**:

* **Sequence diagram**:

* **item**: `one piece of data`
### 3.2.1 Example: Basic Chunk Usage
* **Code**:
* **Interface implementations**
* **ItemReader\<T>**
```java=
import org.springframework.batch.item.ItemReader;

public class SimpleTaskletChunkItemReader implements ItemReader<String> {
    // return three items, then signal the end of input
    private int size = 3;

    @Override
    public String read() throws Exception {
        if (size > 0) {
            size--;
            System.out.println("============= Reader ==========");
            return "read-ret";
        }
        return null; // null tells Spring Batch there is no more input
    }
}
```
* **T**: **the type of item it produces (output type)**
* **ItemProcessor\<I, O>**
```java=
import org.springframework.batch.item.ItemProcessor;

public class SimpleTaskletChunkItemProcessor implements ItemProcessor<String, String> {
    @Override
    public String process(String item) throws Exception {
        // item = the data that was read
        System.out.println("============= Processor =========== " + item);
        // output after processing
        return "process-ret -> " + item;
    }
}
```
* **I**: **input type**
* **O**: **output type**
* **ItemWriter\<T>**
```java=
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import java.util.List;

public class SimpleTaskletChunkItemWriter implements ItemWriter<String> {
    @Override
    public void write(Chunk<? extends String> chunk) throws Exception {
        List<? extends String> items = chunk.getItems();
        System.out.println(items);
    }
}
```
* **T**: **input type**
* Configuration
```java=
// basic chunk-mode example
@Bean("simpleChunkJob")
public Job simpleChunkJob(JobRepository jobRepository) {
    return new JobBuilder("simpleChunkJob", jobRepository)
            .start(new StepBuilder("simpleChunkJob-step1", jobRepository)
                    .<String, String>chunk(3, jdbcTransactionManager)
                    .reader(new SimpleTaskletChunkItemReader())
                    .processor(new SimpleTaskletChunkItemProcessor())
                    .writer(new SimpleTaskletChunkItemWriter())
                    .build())
            .build();
}
```
* **chunk(chunkSize, transactionManager)**:
    * **chunkSize**: the **maximum number of items** handled in **one chunk** (one transaction)
* **Result**:
    * The reader is called `three times` -> the processor `three times` -> then the ItemWriter **writes them all at once**
```
============= Reader ==========
============= Reader ==========
============= Reader ==========
============= Processor =========== read-ret
============= Processor =========== read-ret
============= Processor =========== read-ret
[process-ret -> read-ret, process-ret -> read-ret, process-ret -> read-ret]
```
* **chunkSize** is **3**, so **one chunk holds at most 3 items** (`further items go into the next chunk`)
* **This cycle repeats as long as the reader keeps returning data**, until the **ItemReader** returns null; this reader returns null on its fourth call, so exactly one chunk is written
* **Characteristic**: **the ItemReader keeps reading** until `it returns null`
* **The ItemProcessor follows the ItemReader**: it runs once for every item the ItemReader returns
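The read/process/write cycle above can be sketched in plain Java. This is a simplified model, not Spring Batch code: `ChunkLoopModel` is a hypothetical name, the reader is a `Supplier` that returns `null` at end of input, and transactions, retries, and skips are ignored. As in Spring Batch, a `null` returned by the processor filters the item out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java model of chunk-oriented processing (not Spring Batch classes)
final class ChunkLoopModel {
    private ChunkLoopModel() {}

    // Returns the chunks handed to the "writer", one entry per write call
    static <I, O> List<List<O>> run(Supplier<I> reader, Function<I, O> processor, int chunkSize) {
        List<List<O>> writes = new ArrayList<>();
        boolean endOfInput = false;
        while (!endOfInput) {
            // 1. read up to chunkSize items; null means "no more input"
            List<I> items = new ArrayList<>();
            while (items.size() < chunkSize) {
                I item = reader.get();
                if (item == null) {
                    endOfInput = true;
                    break;
                }
                items.add(item);
            }
            if (items.isEmpty()) {
                break; // nothing read, nothing to write
            }
            // 2. process every item that was read; a null result filters the item out
            List<O> outputs = new ArrayList<>();
            for (I item : items) {
                O out = processor.apply(item);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // 3. write the whole chunk in one call
            writes.add(outputs);
        }
        return writes;
    }
}
```

With 7 input items and a chunk size of 3 this produces three writes of 3, 3, and 1 items; with the three-item reader from the example it produces a single write.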
## 3.3 Step Listeners
* **Two kinds of step listeners**:
    * **StepExecutionListener**: listens `before and after a step`
    * **ChunkListener**: listens `before and after each chunk`
* **Both are registered the same way**
### 3.3.1 The StepExecutionListener Interface
* **Source**:
```java=
public interface StepExecutionListener extends StepListener {
    default void beforeStep(StepExecution stepExecution) {
    }

    @Nullable
    default ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }
}
```
* ExitStatus: the step's exit status (returning `null` from `afterStep` leaves the current status unchanged)
* Code:
```java=
public class SimpleStepListener implements StepExecutionListener {
    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("----------Before Step----------");
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("----------After Step----------");
        return stepExecution.getExitStatus();
    }
}
```
* Configuration:
```java=
@Bean("step1")
public Step getStep1(JobRepository jobRepository) {
    return new StepBuilder("paramJobStep1", jobRepository)
            // simple tasklet mode (the alternative is chunk-oriented processing)
            .tasklet(new MessageTaskLet(), jdbcTransactionManager)
            // register the step listener
            .listener(new SimpleStepListener())
            .build();
}
```
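## 3.4 Step Flow Control
* **Goal**: decide `which step runs next` based on **conditions**
* **Available API** (changes the order in which steps run):
    * **start()**: defines the first step
    * **next()**: defines the next step
    * **on(condition)**: if the **condition is met** (`matched against the previous step's exit status`), run the step defined after it
        * **Condition**: an **ExitStatus** or a **custom ExitStatus** string
            * `*`: wildcard matching zero or more characters (`?` matches exactly one character)
    * **from()**: defines **which step the condition check starts from**; usually used together with `on()` and `to()`
    * **to()**: defines the next step in the flow
    * **end()**: ends the conditional branch definition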
### 3.4.1 Step Flow Control with the Step's Status: ExitStatus
* **Goal**: the job runs **firstStep**; if it succeeds, run **successStep**; if it fails, run **failStep**
* Code:
    * **firstStep**:
```java=
public class FirstStep implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("======== FirstStep =========");
        // throw new Exception("simulate a FirstStep failure"); // a failed step's ExitStatus is FAILED
        return RepeatStatus.FINISHED;
    }
}
```
* **successStep**:
```java=
public class SuccessStep implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("========== successStep =========");
        return RepeatStatus.FINISHED;
    }
}
```
* **failStep**:
```java=
public class FailStep implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("========== FailStep ===========");
        return RepeatStatus.FINISHED;
    }
}
```
* **Configuration**:
```java=
@Bean("simpleStepControlJob")
public Job simpleStepControlJob(JobRepository jobRepository) {
    return new JobBuilder("simpleStepControlJob", jobRepository)
            .start(firstStep(jobRepository))
            // on: if the condition matches, run what follows
            .on("FAILED")
            .to(new StepBuilder("fail", jobRepository)
                    .tasklet(new FailStep(), jdbcTransactionManager)
                    .build())
            // from: which step the next condition applies to
            .from(firstStep(jobRepository))
            // on: if the condition matches, run what follows
            .on("*")
            .to(new StepBuilder("success", jobRepository)
                    .tasklet(new SuccessStep(), jdbcTransactionManager)
                    .build())
            .end()
            .build();
}

@Bean("firstStep")
public Step firstStep(JobRepository jobRepository) {
    return new StepBuilder("first", jobRepository)
            .tasklet(new FirstStep(), jdbcTransactionManager)
            .build();
}
```
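To see how `on(...)` patterns pick the next step, here is a simplified plain-Java model. `TransitionModel` is hypothetical: it gives an exact pattern priority over wildcard ones and otherwise checks patterns in the order they were added, which is coarser than Spring Batch's own ranking of more specific patterns over less specific ones.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Simplified model of on(pattern).to(step) routing (not Spring Batch code)
final class TransitionModel {
    // pattern -> next step name, in declaration order
    private final Map<String, String> transitions = new LinkedHashMap<>();

    TransitionModel on(String pattern, String nextStep) {
        transitions.put(pattern, nextStep);
        return this;
    }

    String next(String exitCode) {
        String exact = transitions.get(exitCode);
        if (exact != null) {
            return exact; // an exact match beats any wildcard
        }
        for (Map.Entry<String, String> t : transitions.entrySet()) {
            if (matches(t.getKey(), exitCode)) {
                return t.getValue();
            }
        }
        throw new IllegalStateException("No transition for exit code " + exitCode);
    }

    // '*' = zero or more characters, '?' = exactly one character, everything else is literal
    private static boolean matches(String pattern, String text) {
        StringBuilder regex = new StringBuilder();
        for (char c : pattern.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.matches(regex.toString(), text);
    }
}
```

For the job above, `FAILED` routes to `fail` and any other exit code (e.g. `COMPLETED`) falls through to `*` and routes to `success`.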
### 3.4.2 Step Flow Control with a Custom ExitStatus
* Implement the **JobExecutionDecider interface** to **return custom status values**
```java=
@FunctionalInterface
public interface JobExecutionDecider {
    FlowExecutionStatus decide(JobExecution jobExecution, @Nullable StepExecution stepExecution);
}
```
* **Goal**: run startStep first; if the returned value is A, run stepA; if it is B, run stepB; otherwise run defaultStep
* Code:
    * **Custom status values**: implement the **JobExecutionDecider interface**
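```java=
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import java.util.Random;

public class MyStepExistsStatusDecider implements JobExecutionDecider {
    // Returns A, B or C, each with a one-in-three chance
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        int ret = new Random().nextInt(3);
        return switch (ret) {
            case 0 -> new FlowExecutionStatus("A");
            case 1 -> new FlowExecutionStatus("B");
            default -> new FlowExecutionStatus("C");
        };
    }
}
```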
* **startStep**
```java=
public class StartStep implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("======== StartStep =========");
        return RepeatStatus.FINISHED;
    }
}
```
* **stepA**
```java=
public class StepA implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("========= StepA =========");
        return RepeatStatus.FINISHED;
    }
}
```
* **stepB**
```java=
public class StepB implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("========= StepB =========");
        return RepeatStatus.FINISHED;
    }
}
```
* **defaultStep**
```java=
public class DefaultStep implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("========== default Step ======");
        return RepeatStatus.FINISHED;
    }
}
```
* **Configuration**:
```java=
@Bean("CustomerStepControl")
public Job getCustomerStepControl(JobRepository jobRepository) {
    return new JobBuilder("CustomerStepControl", jobRepository)
            .start(getStartStep(jobRepository))
            // custom step status => next(decider)
            .next(getMyStepExistsStatusDecider())
            // 'A'
            .from(getMyStepExistsStatusDecider()).on("A")
            .to(new StepBuilder("StepA", jobRepository)
                    .tasklet(new StepA(), jdbcTransactionManager).build())
            // 'B'
            .from(getMyStepExistsStatusDecider()).on("B")
            .to(new StepBuilder("StepB", jobRepository)
                    .tasklet(new StepB(), jdbcTransactionManager).build())
            // anything else
            .from(getMyStepExistsStatusDecider()).on("*")
            .to(new StepBuilder("DefaultStep", jobRepository)
                    .tasklet(new DefaultStep(), jdbcTransactionManager).build())
            .end()
            .build();
}

@Bean("StartStep")
public Step getStartStep(JobRepository jobRepository) {
    return new StepBuilder("StartStep1", jobRepository)
            .tasklet(new StartStep(), jdbcTransactionManager).build();
}

// Custom decider. Inside a @Configuration class, repeated calls to this @Bean method
// return the same singleton, so every from(...) above refers to the same decider.
@Bean("MyStepExistsStatusDecider")
public MyStepExistsStatusDecider getMyStepExistsStatusDecider() {
    return new MyStepExistsStatusDecider();
}
```
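## 3.5 ExitStatus
* **Purpose**: represents the execution status of a step, chunk, or job
* **Status values**:
    * UNKNOWN: unknown
    * EXECUTING: running
    * COMPLETED: finished successfully
    * NOOP: nothing was processed
    * FAILED: failed
    * STOPPED: stopped (interrupted)
* Once the job has started, **these statuses are set by the framework itself**
    * **Normal completion**: **COMPLETED**
    * **Abnormal termination**: **FAILED**
    * **No-op** (`nothing was processed, e.g. because it had already completed in an earlier run`): **NOOP**
* Spring Batch **provides three methods to decide how the job ends**:
    * **end()**: ends successfully => **COMPLETED**
    * **fail()**: ends with an error => **FAILED**
    * **stopAndRestart(step)**: stops the job => **STOPPED**; a later restart resumes from the given step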
### 3.5.1 Example: Controlling the Job Status After a Step
* **Goal**: when firstStep **throws an exception**, use `end`, `fail`, or `stopAndRestart` to decide the **job**'s final status (`the step's status in the DB stays FAILED; only the job's status changes`)
    * **.end()**: in the DB the **step** is **FAILED** and the **job** is **COMPLETED** (`FAILED -> COMPLETED`)
    * **.fail()**: in the DB the **step** is **FAILED** and the **job** is **FAILED** (`FAILED -> FAILED`)
    * **.stopAndRestart(successStep)**: in the DB the **step** is **FAILED** and the **job** is **STOPPED** (`FAILED -> STOPPED`)
* **Code**:
    * **Configuration**:
```java=
@Bean("job-control-existstatus")
public Job getJobControlExiststatus(JobRepository jobRepository,
                                    @Qualifier("FirstStepControl") Step firstStep,
                                    @Qualifier("SuccessfulStepControl") Step successStep) {
    return new JobBuilder("job-control-existstatus", jobRepository)
            .start(firstStep)
            // Force the job's final status: the step stays FAILED in the DB,
            // while the job ends as COMPLETED, FAILED or STOPPED depending on the choice below
            // .on("FAILED").end()  // turn the failure into a normal end (step FAILED, job COMPLETED)
            // .on("FAILED").fail() // keep it an abnormal end (step FAILED, job FAILED)
            .on("FAILED").stopAndRestart(successStep) // end as STOPPED; a later restart begins at successStep
            .from(firstStep).on("*").to(successStep)
            .end()
            .build();
}

@Bean("FirstStepControl")
public Step getFirstStepControl(JobRepository jobRepository) {
    return new StepBuilder("job-control-existstatus-firststep", jobRepository)
            .tasklet(new FirstStep(), jdbcTransactionManager).build();
}

@Bean("SuccessfulStepControl")
public Step getSuccessfulStepControl(JobRepository jobRepository) {
    return new StepBuilder("job-control-existstatus-successfulstep", jobRepository)
            .tasklet(new SuccessStep(), jdbcTransactionManager)
            .build();
}
```
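## 3.6 FlowStep: Composing Steps
* **Flow**: **a collection of steps**
    * made up of **several sub-steps**; once wrapped in a step, the job treats it like an ordinary step
    * useful for complex business logic (`one piece of logic that has to be split into sub-steps executed in order`)
* **Goal**: run `StepA -> StepB -> StepC`, where **StepB** itself consists of `StepB1 -> StepB2 -> StepB3`
* **Code**:
    * **Step configuration**: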
@Bean("FlowStepA")
public Step getFlowStepA() {
return new StepBuilder("Step-FlowStepA", jobRepository)
.tasklet(new FlowStepA(), jdbcTransactionManager)
.build();
}
@Bean("FlowStepB")
public Step getFlowStepB() {
return new StepBuilder("Step-FlowStepB", jobRepository)
//使用Step來包裝Flow
.flow(new FlowBuilder<Flow>("FlowB")
.start(new StepBuilder("Step-FlowB-1", jobRepository)
.tasklet(new FlowStepB1(), jdbcTransactionManager)
.build())
.next(new StepBuilder("Step-FlowB-2", jobRepository)
.tasklet(new FlowStepB2(), jdbcTransactionManager)
.build())
.next(new StepBuilder("Step-FlowB-3", jobRepository)
.tasklet(new FlowStepB3(), jdbcTransactionManager)
.build())
.build())
.build();
}
@Bean("FlowStepC")
public Step getFlowStepC() {
return new StepBuilder("Step-FlowStepC", jobRepository)
.tasklet(new FlowStepC(), jdbcTransactionManager)
.build();
}
```
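* Here the flow is **wrapped in a step** (`StepBuilder.flow(...)`) so that the job's `start(Step)` / `next(Step)` chain can treat it like any other step (`JobBuilder` also has a `start(Flow)` overload for starting a job directly with a flow)
* **A flow can contain other flows as well as steps**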
* **Job configuration**:
```java=
@Bean("JobFlowStep")
public Job getJobFlowStep() {
    return new JobBuilder("JobFlowStep", jobRepository)
            .start(getFlowStepA())
            .next(getFlowStepB())
            .next(getFlowStepC())
            .build();
}
```
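The nesting can be modeled in plain Java as a composite: the job only sees a list of steps, and a flow step runs its own sub-steps in order when executed. `ToyStep`, `SimpleToyStep`, `FlowToyStep`, and `FlowModel` are hypothetical names; the sketch ignores exit statuses and transitions inside the flow.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical step abstraction: appends what it did to a shared log
interface ToyStep {
    void execute(List<String> log);
}

// An ordinary step: just records its name
record SimpleToyStep(String name) implements ToyStep {
    @Override
    public void execute(List<String> log) {
        log.add(name);
    }
}

// A step that wraps a flow: runs its sub-steps in order (sub-steps may be flows too)
record FlowToyStep(String name, List<ToyStep> subSteps) implements ToyStep {
    @Override
    public void execute(List<String> log) {
        for (ToyStep step : subSteps) {
            step.execute(log);
        }
    }
}

final class FlowModel {
    private FlowModel() {}

    // The job only knows its top-level steps
    static List<String> runJob(List<ToyStep> jobSteps) {
        List<String> log = new ArrayList<>();
        for (ToyStep step : jobSteps) {
            step.execute(log);
        }
        return log;
    }
}
```

Running `StepA`, a flow step wrapping `StepB1..StepB3`, and `StepC` yields the order `StepA, StepB1, StepB2, StepB3, StepC`, matching the goal of this section.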
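# 4. Spring Batch Database Tables
# 5. Job Control
## 5.1 Launching a Job
* **Launch on Spring Boot startup**:
    * After Spring Boot starts, it calls the `run` method of **JobLauncherApplicationRunner**
    * Jobs are launched automatically by default; to keep Spring Boot from running them at startup, set in application.properties: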
```properties
spring.batch.job.enabled=false
```
* **Launch from a unit test**:
```java=
@SpringBootTest
class BatchTest1ApplicationTests {
@Autowired
private JobLauncher jobLauncher;
@Qualifier("JobFlowStep")
@Autowired
private Job paramJob;
@Test
void contextLoads() throws Exception {
//Job parameters
Map<String, JobParameter<?>> map = new HashMap<>();
map.put("name", new JobParameter<>(UUID.randomUUID().toString(), String.class));
//Launch the Job with the parameters
jobLauncher.run(paramJob, new JobParameters(map));
}
}
```
* **Launch via a REST API**:
* First disable launching on Spring Boot startup (see above)
* Code: two ways to build JobParameters
```java=
@RestController
@RequestMapping("/job")//the value of @RestController is a bean name, not a URL path
public class JobController {
private final JobLauncher jobLauncher;
private final JobExplorer jobExplorer;//read-only access to Job metadata (instances, executions, parameters)
private final Job paramJob;
public JobController(JobLauncher jobLauncher, JobExplorer jobExplorer, @Qualifier("JobFlowStep") Job paramJob) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.paramJob = paramJob;
}
@GetMapping("/start/{id}")
public ResponseEntity<ExitStatus> startJob(@PathVariable("id") String id) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
//Way 1: JobParametersBuilder
//getNextJobParameters derives the next parameters from the last run using the Job's JobParametersIncrementer;
//the Job must define one (e.g. .incrementer(new RunIdIncrementer())), otherwise this call fails
JobParameters jobParameters = new JobParametersBuilder(jobExplorer)
.getNextJobParameters(paramJob)
.addJobParameter("name", UUID.randomUUID().toString(), String.class)
.toJobParameters();
//Way 2: build the parameter Map directly (this is the one used to launch below)
Map<String, JobParameter<?>> map = new HashMap<>();
map.put("id", new JobParameter<>(id, String.class));
//Launch the Job
JobExecution run = jobLauncher.run(paramJob, new JobParameters(map));
return ResponseEntity.ok(run.getExitStatus());
}
}
```
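## 5.2 Stopping a Job
* A Job can stop in **three ways**:
* **Natural completion**: the Job runs successfully and stops normally; its status is **COMPLETED**
* **Abnormal termination**: the Job is interrupted by some error during execution; its status is **usually** **FAILED**
* **Programmatic stop**: the result of a Step **does not meet** the precondition of the next Step, so the Job is **stopped manually**; its status is **typically set** to **STOPPED**
### 5.2.1 Stopping a Job: Programmatic Stop
* **Scenario**:
* A resource class with two fields
* totalCount = 100
* readCount = 0
* Two Steps:
* Step1: accumulates readCount => simulates reading resources from the DB
* Step2: runs the business logic
* **Conditions**:
* totalCount **==** readCount => finish normally
* totalCount **!=** readCount => abnormal; do not run Step2, stop the Job immediately
* After the data is fixed, restart from Step1 and complete the Job
* Two approaches:
* Step listener
* StepExecution stop flag
#### 5.2.1.1 Approach: Step Listener
* Code:
* Step listener: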
```java=
public class StopStepListener implements StepExecutionListener {
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
//Condition not met: end the Step with exit status STOPPED
if (ResourceCount.totalCount != ResourceCount.readCount) {
return ExitStatus.STOPPED;
}
return stepExecution.getExitStatus();
}
}
```
* Step1
```java=
public class StopStep1 implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("======== StopStep1 ========");
int[] numbers = {100, 50, 30, 10};
Random random = new Random();
int randomIndex = random.nextInt(numbers.length);
//Simulate querying data from the DB
ResourceCount.readCount = numbers[randomIndex];
return RepeatStatus.FINISHED;
}
}
```
* Step2
```java=
public class StopStep2 implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("======== StopStep2 ========");
System.out.println("readCount : " + ResourceCount.readCount);
System.out.println("totalCount : " + ResourceCount.totalCount);
return RepeatStatus.FINISHED;
}
}
```
* **Configuration**:
```java=
//Simulate stopping the Job
@Bean("StopStep1")
public Step getStopStep1() {
return new StepBuilder("StopStep1", jobRepository)
.tasklet(new StopStep1(), jdbcTransactionManager)
.listener(new StopStepListener())
.allowStartIfComplete(true)//allow the Step to run again even after it COMPLETED
.build();
}
@Bean("StopJobTest")
public Job gteStopJobTest() {
return new JobBuilder("StopJobTest", jobRepository)
.start(getStopStep1())
//On STOPPED: stop the Job immediately; when the Job is restarted, it resumes at Step1
.on("STOPPED").stopAndRestart(getStopStep1())
//Any other exit status of Step1: continue with Step2
.from(getStopStep1()).on("*").to(new StepBuilder("StopStep2", jobRepository).tasklet(new StopStep2(), jdbcTransactionManager).build())
.end()
.build();
}
```
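The listener approach boils down to a small transition table. The plain-Java model below (no Spring Batch classes; `StopFlowModel` is a hypothetical name) mimics `StopStepListener` and the `.on("STOPPED")` / `.on("*")` transitions to show which Steps run on each launch. It is only a sketch: persistence and restart bookkeeping, which Spring Batch handles through the JobRepository, are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the StopJobTest transitions:
//   StopStep1 --"STOPPED"--> stop the Job (a restart begins again at StopStep1)
//   StopStep1 --"*"-------->  StopStep2
final class StopFlowModel {
    static final int TOTAL_COUNT = 100;

    // Mirrors StopStepListener.afterStep: STOPPED when the counts differ
    static String exitStatusOfStep1(int readCount) {
        return readCount != TOTAL_COUNT ? "STOPPED" : "COMPLETED";
    }

    // Simulates one launch and returns the Steps that executed, in order
    static List<String> launch(int readCount) {
        List<String> executed = new ArrayList<>();
        executed.add("StopStep1");
        if (exitStatusOfStep1(readCount).equals("STOPPED")) {
            return executed; // Job ends as STOPPED; StopStep2 never runs
        }
        executed.add("StopStep2");
        return executed;
    }
}
```

For example, a launch that reads 50 records runs only `StopStep1`; once the data is "fixed" (100 records), a relaunch runs `StopStep1` and then `StopStep2`.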
#### 5.2.1.2 Approach: StepExecution Stop Flag
* **Code**:
* **Step1: set the StepExecution stop flag**:
```java=
public class StopStep1StopExecution implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("======== StopStep1StopExecution ========");
int[] numbers = {100,50};
Random random = new Random();
int randomIndex = random.nextInt(numbers.length);
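//1. Simulate querying data from the DB
ResourceCount.readCount = numbers[randomIndex];
//2. Set the stop flag
if (ResourceCount.readCount != ResourceCount.totalCount) {
//Stop flag: the Step ends as STOPPED after this call, and so does the Job
chunkContext.getStepContext().getStepExecution().setTerminateOnly();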
}
return RepeatStatus.FINISHED;
}
}
```
* **Step2**
```java=
public class StopStep2 implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("======== StopStep2 ========");
System.out.println("readCount : " + ResourceCount.readCount);
System.out.println("totalCount : " + ResourceCount.totalCount);
return RepeatStatus.FINISHED;
}
}
```
* **Configuration**:
```java=
@Bean("StopStep1_1")
public Step getStopStep1_1() {
return new StepBuilder("StopStep1_1", jobRepository)
.tasklet(new StopStep1StopExecution(), jdbcTransactionManager)
.allowStartIfComplete(true)//allow the Step to run again even after it COMPLETED
.build();
}
@Bean("StopJobTest1")
public Job gteStopJobTest1() {
return new JobBuilder("StopJobTest1", jobRepository)
.start(getStopStep1_1())
.next(new StepBuilder("StopStep3", jobRepository).tasklet(new StopStep2(), jdbcTransactionManager).build())
.build();
}
```
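## 5.3 Restarting a Job
* **By default** (`same Job name + same Job parameters`), a Job instance whose status is **FAILED** or **STOPPED** can be restarted **any number of times**
* A Job instance whose status is **COMPLETED** **cannot be restarted**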
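These rules, together with `preventRestart()` from 5.3.1, can be summarized as a small decision function. This is a plain-Java sketch, not Spring Batch code: `RelaunchRules` and its string results are hypothetical labels, while the exception names are, as far as I know, the ones Spring Batch's launcher and repository raise. It assumes the Job has identifying parameters (same name + same parameters = same JobInstance).

```java
// Hypothetical decision table for relaunching an existing JobInstance
// (same Job name + same identifying parameters).
final class RelaunchRules {
    enum Status { COMPLETED, FAILED, STOPPED }

    static String relaunch(Status lastStatus, boolean restartable) {
        // Checked first by the launcher: a Job built with preventRestart() is not restartable
        if (!restartable) {
            return "JobRestartException";
        }
        // A COMPLETED instance with identifying parameters cannot run again
        if (lastStatus == Status.COMPLETED) {
            return "JobInstanceAlreadyCompleteException";
        }
        // FAILED or STOPPED: a new JobExecution is created for the same instance
        return "restarted";
    }
}
```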
### 5.3.1 Preventing Restart
* For **one-shot jobs**: if the run fails, it must not run again
* **Method**: `.preventRestart()`
* **Code**:
```java=
@Bean("StopJobTest1")
public Job gteStopJobTest1() {
return new JobBuilder("StopJobTest1", jobRepository)
//Prevent restart
.preventRestart()
.start(getStopStep1_1())
.next(new StepBuilder("StopStep3", jobRepository).tasklet(new StopStep2(), jdbcTransactionManager).build())
.build();
}
```
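### 5.3.2 Limiting Restarts
* For cases where the **number of restarts should be limited**
* e.g. download or read operations
* **Code**: set on the Step, limiting how many times the Step can be started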
```java=
@Bean("StopStep1_1")
public Step getStopStep1_1() {
return new StepBuilder("StopStep1_1", jobRepository)
//The Step may be started at most 2 times >> first start + one restart = 2
.startLimit(2)
.tasklet(new StopStep1StopExecution(), jdbcTransactionManager)
.allowStartIfComplete(true)//allow the Step to run again even after it COMPLETED
.build();
}
```
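To make the "first start + restart = 2" arithmetic concrete, here is a plain-Java model of the start-limit check (`StartLimitModel` is a hypothetical name). As far as I know, Spring Batch compares the number of existing StepExecutions of this Step in the JobInstance against `startLimit` and throws `StartLimitExceededException` once it is reached; the sketch returns `false` instead.

```java
// Hypothetical model of Step.startLimit: every launch that reaches the Step
// counts as one start; once the count reaches the limit, further starts are refused.
final class StartLimitModel {
    private final int startLimit;
    private int startCount = 0;

    StartLimitModel(int startLimit) {
        this.startLimit = startLimit;
    }

    // Returns true if the Step may start; false where Spring Batch would throw StartLimitExceededException
    boolean tryStart() {
        if (startCount >= startLimit) {
            return false;
        }
        startCount++;
        return true;
    }
}
```

With `startLimit(2)`, the first start and one restart succeed, and the third attempt is refused.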
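### 5.3.3 Unlimited Restarts
* By default, the same **Job name + Job parameters** can `complete successfully only once`
* **Adjust the Step configuration so the Job can be re-run indefinitely**
* **allowStartIfComplete(true)**: the Step runs again on every launch, even if it already COMPLETED
* Note: this only matters when the JobInstance can be launched again at all; if the last execution COMPLETED and the Job has identifying parameters, the launch is still rejected with JobInstanceAlreadyCompleteException
* **Code**:
```java=
@Bean("StopStep1_1")
public Step getStopStep1_1() {
return new StepBuilder("StopStep1_1", jobRepository)
//Limit to one restart
// .startLimit(2)
//Re-run the Step on every launch, even if it already COMPLETED
.allowStartIfComplete(true)
.tasklet(new StopStep1StopExecution(), jdbcTransactionManager)
.build();
}
```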
# 6. ItemReader
* Used in chunk-oriented Steps
* Source:
```java=
@FunctionalInterface
public interface ItemReader<T> {
@Nullable
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
```
* T: the type of the item returned
* Purpose: read data; `read()` returns `null` when there is no more input
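The null-terminated contract is easiest to see without Spring at all. In this sketch `SimpleReader`, `ListReader` and `ReadLoop` are hypothetical names: `SimpleReader` stands in for `ItemReader<T>` (the real `read()` also declares checked exceptions, omitted here), and `ReadLoop` drains a reader the way a chunk-oriented Step keeps calling `read()` until it gets `null`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for ItemReader<T>: read() returns the next item,
// or null once the input is exhausted.
interface SimpleReader<T> {
    T read();
}

// Reader over an in-memory list, e.g. lines already loaded from a file
final class ListReader<T> implements SimpleReader<T> {
    private final Iterator<T> iterator;

    ListReader(List<T> items) {
        this.iterator = items.iterator();
    }

    @Override
    public T read() {
        return iterator.hasNext() ? iterator.next() : null; // null signals "no more data"
    }
}

final class ReadLoop {
    // Keep calling read() until it returns null, as a chunk-oriented Step does
    static <T> List<T> readAll(SimpleReader<T> reader) {
        List<T> items = new ArrayList<>();
        for (T item = reader.read(); item != null; item = reader.read()) {
            items.add(item);
        }
        return items;
    }
}
```

Because `null` means "end of input", a reader must never return `null` for a valid record.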
## 6.1 Reading Plain Text
* Plain text with simple single-line or multi-line records, e.g. user.txt
```=
1#21323#18
2#asasa41#16
3#213adas23#20
4#90hak23#19
5#sda21323#15
```
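* **FlatFileItemReader** is used by default
* **Methods**:
* **resource**: the source file
* Parsing each line
* **delimited**: split the line
* delimiter: the separator (`"," by default`)
* names: a name for each token of the split line; the names must **match the property names** of the **targetType** class
* **fieldSetMapper**: field mapping (`for complex cases`)
* Parameter: an implementation of the **FieldSetMapper interface**
* **targetType**: the class the data is bound to
### 6.1.1 Splitting Lines: the delimited Method
* **Requirement**: **read user.txt**
* Entity: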
```java=
@Data
public class User {
private Long id;
private String name;
private Integer age;
}
```
* Data:
```=
1#21323#18
2#asasa41#16
3#213adas23#20
4#90hak23#19
5#sd#11
```
* **Code**:
* **ItemWriter**
```java=
public class UserItemWriter implements ItemWriter<User> {
@Override
public void write(Chunk<? extends User> chunk) throws Exception {
List<? extends User> items = chunk.getItems();
items.forEach(System.out::println);
}
}
```
* Use the **delimited method** of **FlatFileItemReader**: split on "#"
```java=
@Bean("getUserByTxt")
public FlatFileItemReader<User> getUserByTxt() {
return new FlatFileItemReaderBuilder<User>()
.name("user-item-reader")
//Source file
.resource(new ClassPathResource("user.txt"))
//Parse: split on "#" (default is ",")
.delimited().delimiter("#")
//Names for the split tokens; they must match User's property names
.names("id", "name", "age")
//Bind each line to a User object
.targetType(User.class)
.build();
}
```
* **Step + Job configuration**:
```java=
@Bean("getUserTxtFileStep")
public Step getUserTxtFileStep() {
return new StepBuilder("getUserTxtFile", jobRepository)
.<User, User>chunk(1, jdbcTransactionManager)//one item per chunk
.reader(getUserByTxt())
.writer(new UserItemWriter())
.build();
}
@Bean("getUserTxtFileJob")
public Job getUserTxtFileJob() {
return new JobBuilder("getUserTxtFileJob", jobRepository)
.start(getUserTxtFileStep())
.build();
}
```
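Roughly, `delimited().delimiter("#").names(...).targetType(User.class)` splits each line, pairs the tokens with the configured names, and binds them to the target's properties. The plain-Java sketch below (`DelimitedLineDemo` is a hypothetical name; a `Map` stands in for Spring Batch's `FieldSet`) shows those two stages. The token-count check is an assumption modeled on the tokenizer's strict mode, which, as far as I know, rejects lines with the wrong number of tokens by default.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Plain-Java sketch of delimited tokenizing followed by binding by name.
final class DelimitedLineDemo {
    // Simplified User without Lombok
    static final class User {
        Long id;
        String name;
        Integer age;
    }

    // Split one line into a name -> token map (a stand-in for a FieldSet)
    static Map<String, String> tokenize(String line, String delimiter, String... names) {
        String[] tokens = line.split(Pattern.quote(delimiter), -1); // -1 keeps empty tokens such as "1##18"
        if (tokens.length != names.length) {
            throw new IllegalArgumentException(
                    "expected " + names.length + " tokens but found " + tokens.length + ": " + line);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], tokens[i]);
        }
        return fields;
    }

    // Bind the named tokens to User properties, converting types as needed
    static User toUser(Map<String, String> fields) {
        User user = new User();
        user.id = Long.valueOf(fields.get("id"));
        user.name = fields.get("name");
        user.age = Integer.valueOf(fields.get("age"));
        return user;
    }
}
```

For the line `3#213adas23#20`, this yields id 3, name `213adas23`, age 20; a line like `1##18` keeps an empty name token.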
### 6.1.2 Field Mapping: the FieldSetMapper Interface
* Data: user2.txt
```=
1#21323#18#KKK#KKK
2#asasa41#16#JJJ#JJJ
3#213adas23#20#BBB#BBB
4#90hak23#19#III#QWw
5#sd#12#AAA#AAA
```
* Entity
```java=
@Data
public class User {
private Long id;
private String name;
private Integer age;
private String address;
}
```
* Code:
* Map the data with **FlatFileItemReader**'s fieldSetMapper method:
```java=
@Bean("getUserByTxt")
public FlatFileItemReader<User> getUserByTxt() {
return new FlatFileItemReaderBuilder<User>()
//Source file
.name("user-item-reader")
.resource(new ClassPathResource("user2.txt"))
//Parse: split on "#" (default is ",")
.delimited().delimiter("#")
//Names for the split tokens (city and area are combined into address by the mapper)
.names("id", "name", "age", "city", "area")
//Bind the data
// //Automatic binding => bind each line to a User object
// .targetType(User.class)
//Manual binding => custom mapping logic
.fieldSetMapper(new UserFieldSetMapper())
.build();
}
```
* **delimited()**: split the string on "#"
* Use **fieldSetMapper()** instead, to bind the data manually
* **FieldSetMapper implementation**:
* **Token names**: `"id", "name", "age", "city", "area"`
```java=
public class UserFieldSetMapper implements FieldSetMapper<User> {
@Override
public User mapFieldSet(FieldSet fieldSet) throws BindException {
//"id", "name", "age", "city", "area"
User user = new User();
user.setId(fieldSet.readLong("id"));
user.setName(fieldSet.readString("name"));
user.setAge(fieldSet.readInt("age"));
user.setAddress(fieldSet.readString("city") + "_" + fieldSet.readString("area"));
return user;
}
}
```
* **Step + Job configuration**:
```java=
@Bean("getUserTxtFileStep")
public Step getUserTxtFileStep() {
return new StepBuilder("getUserTxtFile", jobRepository)
.<User, User>chunk(1, jdbcTransactionManager)//one item per chunk
.reader(getUserByTxt())
.writer(new UserItemWriter())
.build();
}
@Bean("getUserTxtFileJob")
public Job getUserTxtFileJob() {
return new JobBuilder("getUserTxtFileJob", jobRepository)
.start(getUserTxtFileStep())
.build();
}
```
## 6.2 Reading JSON Files
* Use **JsonItemReader**
* **.jsonObjectReader()**: **parses and binds the data**
* **Requirement**: **read the following JSON file**
```json=
[
{ "id":1,"name":"a1","age":14},
{ "id":2,"name":"a2","age":15},
{ "id":3,"name":"a3","age":46}
]
```
* Entity
```java=
@Data
public class User {
private Long id;
private String name;
private Integer age;
}
```
* **Code**:
* **Configure the JsonItemReader**
```java=
@Bean("UserItemReaderJson")
public JsonItemReader<User> getUserItemReaderJson() {
JacksonJsonObjectReader<User> jsonObjectReader = new JacksonJsonObjectReader<>(User.class);
jsonObjectReader.setMapper(new ObjectMapper());
return new JsonItemReaderBuilder<User>()
.name("UserItemReaderJson")
//Source
.resource(new ClassPathResource("user.json"))
//Parse + bind
.jsonObjectReader(jsonObjectReader)
.build();
}
```
* **JSON parsers**:
* **Jackson ObjectMapper** => **JacksonJsonObjectReader**
* **Gson** => **GsonJsonObjectReader**
* **Step + Job configuration**:
```java=
@Bean("getUserTxtFileStep")
public Step getUserTxtFileStep() {
return new StepBuilder("getUserTxtFile", jobRepository)
.<User, User>chunk(1, jdbcTransactionManager)//one item per chunk
.reader(getUserItemReaderJson())
.writer(new UserItemWriter())
.build();
}
@Bean("getUserTxtFileJob")
public Job getUserTxtFileJob() {
return new JobBuilder("getUserTxtFileJob", jobRepository)
.start(getUserTxtFileStep())
.build();
}
```
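## 6.3 Reading from a DB
* Two ways to read data from a DB
* **Cursor-based**: one query selects all matching rows, which are then fetched one at a time through a cursor
* **JdbcCursorItemReader**
* .sql(): the SQL
* .dataSource(): the DB connection
* .preparedStatementSetter(): sets the SQL parameters
* .rowMapper(): maps rows to objects
* **Paging**: reads pageSize rows per query
* **JdbcPagingItemReader**
### 6.3.1 Reading from a DB: Cursor-based
* Reads rows one at a time
* **Cursor** => think of it as a pointer
* **While iterating the cursor**, each row read through **JDBC** is `available in the ResultSet`, and Spring JDBC's **RowMapper** **maps** it to an object
* table row -> object
* **Code**:
* Implement **RowMapper** (`handles the mapping`)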
```java=
public class UserRowMapper implements RowMapper<User> {
@Override
public User mapRow(ResultSet rs, int rowNum) throws SQLException {
User user = new User();
user.setId(rs.getLong("id"));
user.setName(rs.getString("name"));
user.setAge(rs.getInt("age"));
return user;
}
}
```
* Configure **JdbcCursorItemReader**
```java=
@Bean
public JdbcCursorItemReader<User> getJdbcCursorItemReader() {
return new JdbcCursorItemReaderBuilder<User>()
.name("UserJdbcCursorItemReader")
//DB connection
.dataSource(dataSource)
//Run the query once and return rows through a cursor (one row at a time)
.sql("Select * from \"user\" where age > ? ")
//Bind the parameter >> condition: age > 16
.preparedStatementSetter(new ArgumentPreparedStatementSetter(new Object[]{16}))
//Map each DB row to the entity
.rowMapper(new UserRowMapper())
.build();
}
```
* **Step + Job configuration**:
```java=
@Bean("getUserTxtFileStep")
public Step getUserTxtFileStep() {
return new StepBuilder("getUserTxtFile", jobRepository)
.<User, User>chunk(1, jdbcTransactionManager)//one item per chunk
.reader(getJdbcCursorItemReader())
.writer(new UserItemWriter())
.build();
}
@Bean("getUserTxtFileJob")
public Job getUserTxtFileJob() {
return new JobBuilder("getUserTxtFileJob", jobRepository)
.start(getUserTxtFileStep())
.build();
}
```
### 6.3.2 Reading from a DB: Paging
* **PagingQueryProvider**: the **paging logic**
* **Code**:
* Configure the **PagingQueryProvider**
```java=
@Bean
public PagingQueryProvider getPagingQueryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
//Columns
sqlPagingQueryProviderFactoryBean.setSelectClause("Select * ");
//Table
sqlPagingQueryProviderFactoryBean.setFromClause("from \"user\" ");
//Condition
sqlPagingQueryProviderFactoryBean.setWhereClause("where age > :age");//:age is a named placeholder
//Sort key: required for paging and should be unique
sqlPagingQueryProviderFactoryBean.setSortKey("id");
return sqlPagingQueryProviderFactoryBean.getObject();
}
```
* Configure **JdbcPagingItemReader**
```java=
@Bean
public JdbcPagingItemReader<User> getJdbcPagingItemReader() throws Exception {
return new JdbcPagingItemReaderBuilder<User>()
.name("UserJdbcPagingItemReader")
//1. DB connection
.dataSource(dataSource)
//2. Map each DB row to the entity
.rowMapper(new UserRowMapper())
//3. SQL
//3.1 Paging logic
.queryProvider(getPagingQueryProvider())
//3.2 Values for the named placeholders
.parameterValues(Map.of("age", 16))
//3.3 Read 10 rows per page
.pageSize(10)
.build();
}
```
* **Step + Job configuration**:
```java=
@Bean("getUserTxtFileStep")
public Step getUserTxtFileStep() throws Exception {//getJdbcPagingItemReader() declares throws Exception
return new StepBuilder("getUserTxtFile", jobRepository)
.<User, User>chunk(1, jdbcTransactionManager)//one item per chunk
.reader(getJdbcPagingItemReader())
.writer(new UserItemWriter())
.build();
}
@Bean("getUserTxtFileJob")
public Job getUserTxtFileJob() throws Exception {
return new JobBuilder("getUserTxtFileJob", jobRepository)
.start(getUserTxtFileStep())
.build();
}
```
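As far as I understand, `JdbcPagingItemReader` does not page with OFFSET: for every page after the first it adds a condition on the sort key (roughly `id > last id of the previous page`). That is why `setSortKey` is required and the key should be unique. The in-memory sketch below illustrates this keyset paging over the sample user data; `KeysetPagingDemo` and `Row` are hypothetical names, and no SQL is involved.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// In-memory sketch of sort-key (keyset) paging: each page is
// "WHERE age > :age AND id > :lastId ORDER BY id LIMIT pageSize".
final class KeysetPagingDemo {
    record Row(long id, int age) {}

    // Returns the ids of each page, in order
    static List<List<Long>> pages(List<Row> table, int minAge, int pageSize) {
        List<List<Long>> result = new ArrayList<>();
        long lastId = Long.MIN_VALUE; // no previous page yet
        while (true) {
            final long after = lastId;
            List<Row> page = table.stream()
                    .filter(r -> r.age() > minAge && r.id() > after)
                    .sorted(Comparator.comparingLong(Row::id))
                    .limit(pageSize)
                    .collect(Collectors.toList());
            if (page.isEmpty()) {
                return result; // no more rows: the reader would now return null
            }
            result.add(page.stream().map(Row::id).collect(Collectors.toList()));
            lastId = page.get(page.size() - 1).id(); // remember the sort key of the last row
        }
    }
}
```

With the five sample rows (ages 18, 16, 20, 19, 15), `age > 16` and a page size of 2 produce the pages `[1, 3]` and `[4]`.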
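## 6.4 Read Errors
* **Three ways to handle them**:
* Skip the error
* **Log the error** (`recommended`)
* Give up (fail the Step)
### 6.4.1 Skipping Errors
* An ItemReader can be configured to **skip specified exception types**, optionally with a limit on how many are skipped
* The Step's **faultTolerant()** method: **enables the fault-tolerance configuration**
* **Code**: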
```java=
@Bean
public Step handlerStep() throws Exception {
return new StepBuilder("handlerStep",jobRepository)
.<User, User>chunk(1, jdbcTransactionManager)//one item per chunk
.reader(getJdbcPagingItemReader())
.writer(new UserItemWriter())
.faultTolerant()//enable fault tolerance
.skip(Exception.class)//skip Exception
.noSkip(RuntimeException.class)//but do not skip RuntimeException
.skipLimit(10)//maximum number of skipped items
.skipPolicy(new SkipPolicy() {
@Override
public boolean shouldSkip(Throwable t, long skipCount) throws SkipLimitExceededException {
//Custom skip decision (exception type + skip count); this placeholder always returns false (never skip)
return false;
}
})
.build();
}
```
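The `skip` / `noSkip` / `skipLimit` combination can be modeled in plain Java. In this sketch (`SkipDecision` is a hypothetical name) the closest configured superclass of the exception decides whether it is skippable. That is a simplification of Spring Batch's limit-checking skip policy, whose classifier, as far as I know, also considers interfaces. A skippable exception is skipped while `skipCount < skipLimit`. Past the limit, Spring Batch throws `SkipLimitExceededException`; the sketch throws `IllegalStateException` instead.

```java
import java.util.Map;

// Simplified model of limit-checking skip logic.
final class SkipDecision {
    private final Map<Class<? extends Throwable>, Boolean> rules; // true = skip(...), false = noSkip(...)
    private final int skipLimit;

    SkipDecision(Map<Class<? extends Throwable>, Boolean> rules, int skipLimit) {
        this.rules = rules;
        this.skipLimit = skipLimit;
    }

    // Walk up the exception's class hierarchy; the closest configured class wins
    private boolean isSkippable(Throwable t) {
        for (Class<?> k = t.getClass(); k != null; k = k.getSuperclass()) {
            Boolean rule = rules.get(k);
            if (rule != null) {
                return rule;
            }
        }
        return false; // not configured at all: not skippable
    }

    boolean shouldSkip(Throwable t, long skipCount) {
        if (!isSkippable(t)) {
            return false; // the error propagates and the Step fails
        }
        if (skipCount < skipLimit) {
            return true;
        }
        throw new IllegalStateException("skip limit " + skipLimit + " exceeded"); // SkipLimitExceededException in Spring Batch
    }
}
```

With `skip(Exception.class)` and `noSkip(RuntimeException.class)`, an `IOException` is skipped but an `IllegalStateException` (a `RuntimeException`) is not.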
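### 6.4.2 Logging Errors
* When the **ItemReader** throws while reading, **record the details** so someone can follow up manually later
* **Approach**: use an **ItemReader listener**
* Implement the **ItemReadListener\<T>** interface (`which extends StepListener`)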
```java=
public class ErrorItemReaderListener implements ItemReadListener<User> {
@Override
public void beforeRead() {
ItemReadListener.super.beforeRead();
}
@Override
public void afterRead(User item) {
ItemReadListener.super.afterRead(item);
}
@Override
public void onReadError(Exception ex) {
ItemReadListener.super.onReadError(ex);
}
}
```
* **Configuration**: **register the ItemReadListener on the Step** (`a Step can have several listeners that extend StepListener`)
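On the chunk Step builder, the listener is registered with something like `.listener(new ErrorItemReaderListener())`. To show where the three callbacks fire, here is a plain-Java sketch of one read wrapped by a listener (`ReadListenerDemo` and its nested `Listener` are hypothetical stand-ins for the Spring types). As far as I know, `afterRead` is called only for a non-null item, and the error is rethrown after `onReadError`, so skip or fail handling still applies.

```java
import java.util.function.Supplier;

// Sketch of the callback order around each read() call.
final class ReadListenerDemo {
    interface Listener<T> {
        void beforeRead();
        void afterRead(T item);
        void onReadError(Exception ex);
    }

    // beforeRead -> read -> afterRead (non-null item) or onReadError (then rethrow)
    static <T> T readWithListener(Supplier<T> reader, Listener<T> listener) {
        listener.beforeRead();
        try {
            T item = reader.get();
            if (item != null) {
                listener.afterRead(item);
            }
            return item;
        } catch (RuntimeException ex) {
            listener.onReadError(ex); // e.g. log the bad record for manual follow-up
            throw ex;                 // the error still reaches the Step
        }
    }
}
```

In a real `onReadError`, logging the exception message (and, for flat files, the offending line if available) is what makes later manual repair practical.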
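# 7. ItemProcessor
* After the **ItemReader** reads the data, there are **two options**:
* **Pass the data straight on**
* **Transform the data that was read**
* For the latter, use an **ItemProcessor**: either one of the **built-in implementations** or a **custom ItemProcessor**
## 7.1 Built-in ItemProcessors
* Among its **ItemProcessor implementations**, **Spring Batch** provides these **four**:
* **ValidatingItemProcessor** (`validating processor`): validates the data and filters out items that fail
* **ItemProcessorAdapter** (`adapter processor`): data transformation by delegating to an existing method
* **ScriptItemProcessor** (`script processor`): processing logic written in a script
* **CompositeItemProcessor** (`composite processor`): chains several processors
* **Shared setup code**:
* **ItemReader**: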
```java=
@Bean("getUserByTxt")
public FlatFileItemReader<User> getUserByTxt() {
return new FlatFileItemReaderBuilder<User>()
//Source file
.name("user-item-reader")
.resource(new ClassPathResource("user-vail.txt"))
//Parse: split on "#" (default is ",")
.delimited().delimiter("#")
//Names for the split tokens; they must match User's property names
.names("id", "name", "age")
//Bind the data
//Automatic binding => bind each line to a User object
.targetType(User.class)
.build();
}
```
* **Step + Job configuration**:
```java=
@Bean("getUserTxtFileStep")
public Step getUserTxtFileStep() {
return new StepBuilder("getUserTxtFile", jobRepository)
.<User, User>chunk(1, jdbcTransactionManager)//one item per chunk
.reader(getUserByTxt())
.processor(getUserValidatingItemProcessor())//plug in any ItemProcessor from 7.1.1 - 7.1.4 here
.writer(new UserItemWriter())
.build();
}
@Bean("getUserTxtFileJob")
public Job getUserTxtFileJob() throws Exception {
return new JobBuilder("getUserTxtFileJob", jobRepository)
.start(getUserTxtFileStep())
.build();
}
```
### 7.1.1 ValidatingItemProcessor: Validating Processor
* **Setup**
* **Test data**:
```txt=
1##18
2##16
3#a1#20
4#akjfl#19
5#a387s4d#15
```
* **Entity**:
```java=
@Data
public class User {
private Long id;
@NotBlank(message = "Name is Blank")
private String name;
private Integer age;
}
```
* **Add the validation dependency**: **spring-boot-starter-validation**
```xml=
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
```
* **Code**:
* Configure the validator with **BeanValidatingItemProcessor** (`a subclass of ValidatingItemProcessor` that checks the Bean Validation annotations)
```java=
@Bean("UserBeanValidatingItemProcessor")
public ValidatingItemProcessor<User> getUserValidatingItemProcessor() {
ValidatingItemProcessor<User> objectBeanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
//true: drop invalid items (filter) instead of throwing a ValidationException
objectBeanValidatingItemProcessor.setFilter(true);
return objectBeanValidatingItemProcessor;
}
```
### 7.1.2 ItemProcessorAdapter: Adapter Processor
* **Setup**
* **Test data**:
```txt=
1#adsda#18
2#dsad#16
3#a1#20
4#akjfl#19
5#a387s4d#15
```
* **Entity**:
```java=
@Data
public class User {
private Long id;
private String name;
private Integer age;
}
```
* **Code**: **convert the name field to upper case**
* A use case that **upper-cases the name field**
```java=
public class UserNameToUpperCaseUseCase {
public User execute(User user) {
user.setName(user.getName().toUpperCase());
return user;
}
}
```
* **Use ItemProcessorAdapter**
```java=
//ItemProcessor => ItemProcessorAdapter: data transformation
//Goal: upper-case the value of the name field (reusing existing logic)
@Bean("UserItemProcessorAdapter")
public ItemProcessorAdapter<User, User> getItemProcessorAdapter() {
ItemProcessorAdapter<User, User> itemProcessorAdapter = new ItemProcessorAdapter<>();
//Target object
itemProcessorAdapter.setTargetObject(getUserNameToUpperCaseUseCase());
//Name of the method to call on the target object
itemProcessorAdapter.setTargetMethod("execute");
return itemProcessorAdapter;
}
@Bean("UserNameToUpperCaseUseCase")
public UserNameToUpperCaseUseCase getUserNameToUpperCaseUseCase() {
return new UserNameToUpperCaseUseCase();
}
```
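### 7.1.3 ScriptItemProcessor: Script Processor
* **Purpose**: define the **processing logic** in a **.js script**
* **Notes**:
* **item** is a `conventional variable name` bound to each item read by the ItemReader
* **Location**: put the .js file under the **resources directory**
* **Other**:
* Did not work with GraalVM's JS engine (`it may work on other JVMs`); the script runs through a JSR-223 script engine, and Nashorn is no longer bundled with JDK 15+
* **Code**: **upper-case the name field with a .js script**
* **.js script that upper-cases the name field**: **item is the conventional variable**
```js=
item.setName(item.getName().toUpperCase());
item;
```
* **Use ScriptItemProcessor**
```java=
//ItemProcessor => ScriptItemProcessor: data transformation with a .js script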
@Bean("UserScriptItemProcessor")
public ScriptItemProcessor<User, User> getUserScriptItemProcessor() {
ScriptItemProcessor<User, User> userUserScriptItemProcessor = new ScriptItemProcessor<>();
userUserScriptItemProcessor.setScript(new ClassPathResource("user_touppercase.js"));
return userUserScriptItemProcessor;
}
```
### 7.1.4 CompositeItemProcessor: Composite Processor
* **Purpose**: **combine several ItemProcessors**
* Works like a **filter chain**
* `each ItemProcessor passes its result on to the next one`

* **Setup**
* **Test data**:
```txt=
1##18
2##16
3#a1#20
4#akjfl#19
5#a387s4d#15
```
* **Entity**:
```java=
@Data
public class User {
private Long id;
@NotBlank(message = "Name is Blank")
private String name;
private Integer age;
}
```
* **Code**: **filter out items whose name is blank, then upper-case the name**
* **Filter out blank names** (`the User properties need Bean Validation annotations`)
```java=
@Bean("UserBeanValidatingItemProcessor")
public ValidatingItemProcessor<User> getUserValidatingItemProcessor() {
ValidatingItemProcessor<User> objectBeanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
//true: drop invalid items (filter) instead of throwing a ValidationException
objectBeanValidatingItemProcessor.setFilter(true);
return objectBeanValidatingItemProcessor;
}
```
* **Upper-case the name**
```java=
@Bean("UserNameToUpperCaseUseCase")
public UserNameToUpperCaseUseCase getUserNameToUpperCaseUseCase() {
return new UserNameToUpperCaseUseCase();
}
@Bean("UserItemProcessorAdapter")
public ItemProcessorAdapter<User, User> getItemProcessorAdapter() {
ItemProcessorAdapter<User, User> itemProcessorAdapter = new ItemProcessorAdapter<>();
//Target object
itemProcessorAdapter.setTargetObject(getUserNameToUpperCaseUseCase());
//Name of the method to call on the target object
itemProcessorAdapter.setTargetMethod("execute");
return itemProcessorAdapter;
}
```
* **Target object**
```java=
public class UserNameToUpperCaseUseCase {
public User execute(User user) {
user.setName(user.getName().toUpperCase());
return user;
}
}
```
* Use **CompositeItemProcessor** to `combine the two ItemProcessors above`
```java=
//ItemProcessor => CompositeItemProcessor combining UserBeanValidatingItemProcessor and UserItemProcessorAdapter
@Bean("UserCompositeItemProcessor")
public CompositeItemProcessor<User, User> getCompositeItemProcessor() {
return new CompositeItemProcessorBuilder<User, User>()
.delegates(List.of(getUserValidatingItemProcessor(),
getItemProcessorAdapter())
)
.build();
}
```
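The chaining rule can be sketched in plain Java. As far as I know, `CompositeItemProcessor` passes each result to the next delegate and stops as soon as one returns `null`, so a filtered item never reaches the later processors. In this sketch (`CompositeDemo` is a hypothetical name) the delegates work on the name string only: `NOT_BLANK` stands in for the Bean Validation filter and `TO_UPPER` for the adapter.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Sketch of composite processing: apply delegates in order; null short-circuits the chain.
final class CompositeDemo {
    static <T> T process(T item, List<UnaryOperator<T>> delegates) {
        T result = item;
        for (UnaryOperator<T> delegate : delegates) {
            if (result == null) {
                return null; // filtered by an earlier delegate
            }
            result = delegate.apply(result);
        }
        return result;
    }

    // Stand-ins for the two delegates in the note
    static final UnaryOperator<String> NOT_BLANK = s -> s.isBlank() ? null : s;
    static final UnaryOperator<String> TO_UPPER = String::toUpperCase;
}
```

Order matters: validation comes first, so the upper-casing step never sees an empty name.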
## 7.2 Custom ItemProcessor (common)
* **Approach**: implement the **ItemProcessor\<I,O>** interface
* **Example**: **keep only items with an even id**
* **Setup**
* **Test data**:
```txt=
1##18
2##16
3#a1#20
4#akjfl#19
5#a387s4d#15
```
* **Entity**:
```java=
@Data
public class User {
private Long id;
private String name;
private Integer age;
}
```
* **Code**:
* Implement the **ItemProcessor\<I,O> interface**
```java=
public class UsetItemProcessor implements ItemProcessor<User, User> {
@Override
public User process(User item) throws Exception {
if (item.getId() % 2 == 0) {
return item;
}
return null;//returning null filters the item out: it is not passed to the ItemWriter
}
}
```
* **Configure the Step as before**, passing it to `.processor(new UsetItemProcessor())`
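Returning `null` from `process` means "filter this item". The plain-Java sketch below models one chunk passing through read, process, and write: items for which the processor returns `null` never reach the writer. `ChunkFilterDemo` and its `User` record are hypothetical stand-ins, and `keepEvenIds` applies the same rule as `UsetItemProcessor`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch of one chunk: processed items go to the writer, null results are filtered out.
final class ChunkFilterDemo {
    record User(long id, String name, int age) {}

    // Same rule as UsetItemProcessor: keep even ids, drop the rest
    static User keepEvenIds(User u) {
        return u.id() % 2 == 0 ? u : null;
    }

    // Returns the items that would be handed to the ItemWriter
    static <I, O> List<O> processChunk(List<I> chunk, Function<I, O> processor) {
        List<O> toWrite = new ArrayList<>();
        for (I item : chunk) {
            O out = processor.apply(item);
            if (out != null) {
                toWrite.add(out); // null = filtered, counted as a filtered item rather than an error
            }
        }
        return toWrite;
    }
}
```

For the test data (ids 1 to 5), only the users with ids 2 and 4 are written.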
# 8. ItemWriter
* **Setup**
* **Test data**: user.txt
```txt=
1#afsa#18
2#ffa#16
3#a1#20
4#akjfl#19
5#a387s4d#15
```
* **Entity**:
```java=
@Data
public class User {
private Long id;
private String name;
private Integer age;
}
```
## 8.1 Writing Plain-Text Files
* Use **FlatFileItemWriter\<T>**
* **Code**:
* Write a plain-text file with **FlatFileItemWriter\<T>**
```java=
@Bean("UserFlatFileItemWriter")
public FlatFileItemWriter<User> getUserFlatFileItemWriter() {
return new FlatFileItemWriterBuilder<User>()
.name("UserFlatFileItemWriter")
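//1. Output location: the current working directory
.resource(new FileSystemResource("./user-out.txt"))
//2. Formatted output
.formatted()
//2.1 Output format
.format("id: %s, name: %s, age: %s")
//2.2 User property names that fill the %s placeholders, in order
.names("id","name","age")
//X. Delete the file if no items were written
.shouldDeleteIfEmpty(true)
//X. Delete the file if it already exists
.shouldDeleteIfExists(true)
//X. If the file already exists, keep it and append (this takes precedence over shouldDeleteIfExists)
.append(true)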
.build();
}
```
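* **Other**:
* shouldDeleteIfEmpty(true): delete the file if no items were written
* shouldDeleteIfExists(true): delete the file if it already exists
* append(true): if the file already exists, keep it and append the data; per the Javadoc this also turns shouldDeleteIfExists off, so in practice set only one of the two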
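As far as I know, `formatted().format(...).names(...)` reads the named properties of each item in order and passes them to `String.format`. The one-method sketch below (`FormatLineDemo` is a hypothetical name) shows the line that a single record produces.

```java
// Sketch of the line produced by format("id: %s, name: %s, age: %s") with names("id","name","age").
final class FormatLineDemo {
    static String formatLine(long id, String name, int age) {
        return String.format("id: %s, name: %s, age: %s", id, name, age);
    }
}
```

The record `1#afsa#18` therefore becomes the output line `id: 1, name: afsa, age: 18`.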
## 8.2 Writing JSON Files
* Use **JsonFileItemWriter\<T>**
* **Example**: write the data to a **.json file**
* **Code**:
* Use **JsonFileItemWriter\<T>**
```java=
@Bean("UserJsonFileItemWriter")
public JsonFileItemWriter<User> getUserJsonFileItemWriter() {
JacksonJsonObjectMarshaller<User> jsonObjectMarshaller = new JacksonJsonObjectMarshaller<>();
return new JsonFileItemWriterBuilder<User>()
.name("UserJsonFileItemWriter")
//1. Output location: the current working directory
.resource(new FileSystemResource("./user-out.json"))
//2. JSON marshaller that converts each User object to JSON
.jsonObjectMarshaller(jsonObjectMarshaller)
.build();
}
```
## 8.3 Writing to a DB
* Use **JdbcBatchItemWriter\<T>**
* **Example**: read the .txt file, then write the data to the **DB**
* **Code**:
* Use **JdbcBatchItemWriter\<T>**
```java=
@Bean("UserJdbcBatchItemWriter")
public JdbcBatchItemWriter<User> getUserJdbcBatchItemWriter() {
return new JdbcBatchItemWriterBuilder<User>()
//1. Data source
.dataSource(dataSource)
//2. INSERT statement
.sql("INSERT INTO public.\"user\" (id, \"name\", age) VALUES(?, ?, ?);")
//3. Set the PreparedStatement parameters by implementing ItemPreparedStatementSetter<User> (here as a lambda)
.itemPreparedStatementSetter((user, ps) -> {
ps.setLong(1, user.getId());
ps.setString(2, user.getName());
ps.setInt(3, user.getAge());
})
.build();
}
```
1. **Set the data source**
2. Write the INSERT SQL, using the **placeholder "?"** for each parameter
3. **Set the parameters**: implement the **ItemPreparedStatementSetter\<T>** interface
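## 8.4 Writing to Multiple Targets
* **Purpose**: **write the same data at once** to the `DB`, a `JSON file`, and a `plain-text file`
* Use **CompositeItemWriter\<T>**
* **Code**: write the data to the `DB`, `user-out.txt`, and `user-out.json`
* Use **CompositeItemWriter\<T>**
```java=
// CompositeItemWriter combines several writers and writes to all targets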
@Bean
public CompositeItemWriter<User> getUserCompositeItemWriter() {
return new CompositeItemWriterBuilder<User>()
//Delegate ItemWriters
.delegates(getUserJdbcBatchItemWriter(),
getUserFlatFileItemWriter(),
getUserJsonFileItemWriter())
.build();
}
```
* **Combines the three ItemWriters above**:
* **getUserJdbcBatchItemWriter()**
* **getUserFlatFileItemWriter()**
* **getUserJsonFileItemWriter()**
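# 9. Advanced SpringBatch
* Multi-threaded Step
* one file
* Parallel Steps
* Partitioned Step (`for very large data sets`)
* multiple files
* DB data
## 9.1 Multi-threaded Step
* Steps run single-threaded by default
* In a multi-threaded setup, the **Step** must be **treated as non-restartable** (`a rule`)
* Implemented with Spring's **TaskExecutor**; `each chunk is executed on its own thread`
* With **chunkSize = 1** (`one item per chunk`) and 10 records, SimpleAsyncTaskExecutor creates a new thread for each chunk, so the records are processed on up to 10 different threads
* Setup:
* Data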
```txt=
1#afsa#18
2#ffa#16
3#a1#20
4#akjfl#19
5#a387s4d#15
6#afsa#18
7#ffa#16
8#a1#20
9#akjfl#19
10#a387s4d#15
```
* **Entity**
```java=
@Data
public class User {
private Long id;
private String name;
private Integer age;
private String address;
}
```
* Code: read the data and print it, multi-threaded
* **Read the plain-text file**
```java=
@Bean("getUserByTxt")
public FlatFileItemReader<User> getUserByTxt() {
return new FlatFileItemReaderBuilder<User>()
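//Source file
.name("user-item-reader")
.resource(new ClassPathResource("user.txt"))
//Parse: split on "#" (default is ",")
.delimited().delimiter("#")
//Names for the split tokens; they must match User's property names
.names("id", "name", "age")
//Bind the data
//Automatic binding => bind each line to a User object
.targetType(User.class)
//Don't save reader state: several threads would overwrite each other's state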
.saveState(false)
.build();
}
```
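* **.saveState(false)**: turns off state saving, so the Step cannot resume where it stopped
* Most **ItemReaders** `save state`, and a job restart **uses that state to find where the job stopped**
* In a **multi-threaded** Step, the reader is **accessed by several threads**, so **their states can overwrite each other**
* **Configure Step + Job**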
```java=
@Bean("getUserTxtFileStep")
public Step getUserTxtFileStep() {
return new StepBuilder("getUserTxtFile", jobRepository)
.<User, User>chunk(1, jdbcTransactionManager)//one item per chunk
.reader(getUserByTxt())
// .processor(new UsetItemProcessor())
.writer(chunk -> System.out.println(chunk.getItems()))
.taskExecutor(new SimpleAsyncTaskExecutor())//enable multi-threaded execution
.build();
}
@Bean("getUserTxtFileJob")
public Job getUserTxtFileJob() throws Exception {
return new JobBuilder("getUserTxtFileJob", jobRepository)
.start(getUserTxtFileStep())
.build();
}
```
* Enable **multi-threading** on the Step: **.taskExecutor**(new SimpleAsyncTaskExecutor())
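In a multi-threaded Step, all worker threads share one reader instance, so `read()` has to be thread-safe. As far as I know, `FlatFileItemReader` is not thread-safe; Spring Batch offers `SynchronizedItemStreamReader` as a wrapper for this case. The plain-Java sketch below (`MultiThreadedStepDemo` is a hypothetical name) imitates the setup with a synchronized list reader shared by several pool threads, each reading one item at a time (chunk size 1), and shows that every record is written exactly once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of a multi-threaded chunk Step: several threads share ONE reader.
final class MultiThreadedStepDemo {
    // synchronized read(): without it, two threads could read the same item or corrupt the iterator
    static final class SynchronizedListReader<T> {
        private final Iterator<T> iterator;
        SynchronizedListReader(List<T> items) { this.iterator = items.iterator(); }
        synchronized T read() { return iterator.hasNext() ? iterator.next() : null; }
    }

    // Each worker loops: read one item (chunk size 1) and "write" it, until the reader returns null
    static List<Integer> run(List<Integer> items, int threads) {
        SynchronizedListReader<Integer> reader = new SynchronizedListReader<>(items);
        List<Integer> written = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                for (Integer item = reader.read(); item != null; item = reader.read()) {
                    written.add(item);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(written);
    }
}
```

The write order varies from run to run, which is one more reason restart state is meaningless here and `saveState(false)` is used.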
## 9.2 Parallel Steps
* **Parallel Steps**: **two** or **more** different Steps **run at the same time**
```graphviz
digraph G {
rankdir=LR;
node [shape=box];
A[label = "Step 1"]
B[label = "Step 2"]
C[label = "Step 3"]
D[label = "Step 4"]
A -> B
A -> C
B -> D
C -> D
}
```
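* After Step1 finishes, Step2 and Step3 run at the same time; when both are done, Step4 runs
* **Use case**: reading `2` or `more` unrelated files (`Step2 and Step3 read files`), **several files at once**
* **Approach**: combine the Steps with **Flows** in the Job
* **Setup**:
* **Data**: user-1.txt, user-2.json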
```txt=
1#afsa#18
2#ffa#16
3#a1#20
4#akjfl#19
5#a387s4d#15
```
```json=
[
{"id":11,"name":"21323","age":22},
{"id":12,"name":"asadasa41","age":78},
{"id":13,"name":"21as3adas23","age":14},
{"id":14,"name":"90hsdak23","age":129},
{"id":15,"name":"sdaad21323","age":89}
]
```
* **Entity**
```java=
@Data
public class User {
private Long id;
private String name;
private Integer age;
private String address;
}
```
* **Code**: read user-1.txt and user-2.json
* **Step2** => reads user-1.txt
```java=
//ItemReader: plain-text file
@Bean("getUserByTxt")
public FlatFileItemReader<User> getUserByTxt() {
return new FlatFileItemReaderBuilder<User>()
//Source file
.name("user-item-reader")
.resource(new ClassPathResource("user-1.txt"))
//Parse: split on "#" (default is ",")
.delimited().delimiter("#")
//Names for the split tokens; they must match User's property names
.names("id", "name", "age")
//Bind the data
//Automatic binding => bind each line to a User object
.targetType(User.class)
.build();
}
```
* **Step3** => reads user-2.json
```java=
@Bean("UserItemReaderJson")
public JsonItemReader<User> getUserItemReaderJson() {
JacksonJsonObjectReader<User> jsonObjectReader = new JacksonJsonObjectReader<>(User.class);
jsonObjectReader.setMapper(new ObjectMapper());
return new JsonItemReaderBuilder<User>()
.name("UserItemReaderJson")
//Source
.resource(new ClassPathResource("user-2.json"))
//Parse + bind
.jsonObjectReader(jsonObjectReader)
.build();
}
```
* **Configure Step2 and Step3**
```java=
@Bean("getUserTxtFileStep2")
public Step getUserTxtFileStep2() {
return new StepBuilder("getUserTxtFileStep2", jobRepository)
.<User, User>chunk(2, jdbcTransactionManager)//2 items per chunk
.reader(getUserByTxt())
.writer(chunk -> System.out.println(chunk.getItems()))
.build();
}
@Bean("getUserJsonFileStep3")
public Step getUserJsonFileStep3() {
return new StepBuilder("getUserJsonFileStep3", jobRepository)
.<User, User>chunk(2, jdbcTransactionManager)//2 items per chunk
.reader(getUserItemReaderJson())
.writer(chunk -> System.out.println(chunk.getItems()))
.build();
}
```
* **Configure the Job**: combine the Steps with Flows
```java=
//Combine Steps: parallel Steps
@Bean("getUserTxtFileJob")
public Job getUserTxtFileJob() throws Exception {
//Parallel branch 1: read the .txt file
Flow flow2 = new FlowBuilder<Flow>("Flow2")
.start(getUserTxtFileStep2()).build();
//Parallel branch 2: read the .json file, then split so both branches run concurrently
Flow flow3 = new FlowBuilder<Flow>("Flow3")
.start(getUserJsonFileStep3())
.split(new SimpleAsyncTaskExecutor())//run each branch on its own thread
.add(flow2)//branch to run in parallel; both start at the same time
return new JobBuilder("getUserTxtFileJob", jobRepository)
.start(flow3)
.end()
.build();
}
```
* **.split()** on a **Flow**: separates the two branches; the TaskExecutor starts two threads that run Step2 and Step3 respectively
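The intended ordering `Step1 -> (Step2 || Step3) -> Step4` can be sketched with plain `CompletableFuture`s, which makes the "wait for both branches" semantics of a split explicit. This is an analogy, not what Spring Batch runs internally, and `ParallelStepsDemo` is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Plain-Java sketch of Step1 -> (Step2 || Step3) -> Step4.
final class ParallelStepsDemo {
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>(); // thread-safe: branches append concurrently
        log.add("step1");
        CompletableFuture<Void> step2 = CompletableFuture.runAsync(() -> log.add("step2"));
        CompletableFuture<Void> step3 = CompletableFuture.runAsync(() -> log.add("step3"));
        CompletableFuture.allOf(step2, step3).join(); // the split finishes only when both branches finish
        log.add("step4");
        return log;
    }
}
```

Step2 and Step3 may finish in either order, but Step4 always comes last, just as the next Step after a split waits for all of its flows.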
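## 9.3 Partitioned Step
* Concept: Steps are split into **levels**
* **Upper level** (`Master Step`): the manager Step
* **Leads and manages several worker Steps**
* **Lower level** (`Worker Step`): the worker Steps
* **Do the actual work**
* Each is a complete Step responsible for `reading`, `processing`, and `writing`
* **Structure of a partitioned Step**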

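* **Use case**: **very large data sets, divide and conquer**
* **Master Step**: **splits** the data into `several small data sets`, then **launches several worker Steps**
* **Worker Step**: processes **one of the small data sets**
* The **whole job finishes** only when **all worker Steps have finished**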
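### 9.3.1 Master Step: the Partitioner
* **Core component of the master Step**
* **Purpose**:
* **Partition the data**: **split** the full data into several data sets
* **Assign them to worker Steps**
* **Splitting rule**: implement the **Partitioner** interface
* **Source**: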
```java=
@FunctionalInterface
public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
```
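* **Parameter**:
* **gridSize**: **number of partitions** => how many **worker Steps** to **launch**
* **Return value**: **Map**
* **key**: **name of the worker Step** (partition)
* **value**: the **ExecutionContext** (`metadata`) of that **worker Step** (`e.g. start position, data size, file to read`)
* **Built-in implementation**: e.g. **MultiResourcePartitioner**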
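The `partition(gridSize)` contract can be illustrated without Spring: produce `gridSize` entries, each keyed by a partition name and carrying its own context. In the plain-Java sketch below (`PartitionPlanDemo` is a hypothetical name), a `Map<String, String>` stands in for `ExecutionContext`, and the file naming follows the `user1-10.txt`, `user11-20.txt`, ... files used in 9.3.3. The counters are computed from the loop index, so calling it again gives the same plan.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of Partitioner.partition(gridSize): one context per worker Step.
final class PartitionPlanDemo {
    static Map<String, Map<String, String>> partition(int gridSize, int range) {
        Map<String, Map<String, String>> plan = new LinkedHashMap<>();
        for (int i = 0; i < gridSize; i++) {
            int begin = i * range + 1;  // 1, 11, 21, ...
            int end = (i + 1) * range;  // 10, 20, 30, ...
            // key: partition (worker Step) name; value: what that worker should read
            plan.put("user_partition_" + i, Map.of("file", "user" + begin + "-" + end + ".txt"));
        }
        return plan;
    }
}
```

With `gridSize = 5` and `range = 10`, the last partition `user_partition_4` gets `user41-50.txt`.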
### 9.3.2 Master Step: the Partition Handler
* **Purpose**: **manages** the worker Steps and **assigns them their work**
* **Rules**: implement the **PartitionHandler** interface
* **Common implementation**: **TaskExecutorPartitionHandler**
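### 9.3.3 Example: Reading Several Files into Memory
* **Flow**:
1. Job
2. Master Step
3. **Partition handler**
* **Directs the Partitioner to create the worker Steps** (`core`), putting the following into each **Step ExecutionContext**
1. the worker Step's `name`
2. `the data file to process`
3. `how to process` the data file
4. All **worker Steps** run
1. reader
2. writer
* **Setup**:
* **Data**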
* user1-10.txt
* user11-20.txt
* user21-30.txt
* user31-40.txt
* user41-50.txt
* **Entity**
```java=
@Data
public class User {
private Long id;
private String name;
private Integer age;
private String address;
}
```
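#### 9.3.3.1 Worker Step
* **Define the ItemReader and Step**
* **Key point**: the resource is taken from the **Step ExecutionContext**
* That value is set by the Partitioner
* All worker Steps **share the same ItemReader definition**
* **Code**:
* **ItemReader**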
```java=
@Bean("getUserByTxtPartitioner")
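@StepScope//late binding: a new instance is created for each Step execution
//Worker Steps share this ItemReader definition, so the file must not be hard-coded; it is injected instead
//@Value("#{stepExecutionContext['file']}") reads the resource to process from the Step ExecutionContext
//Prerequisite: the Step ExecutionContext must contain the value; the master Step's Partitioner sets it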
public FlatFileItemReader<User> getUserByTxtPartitioner(@Value("#{stepExecutionContext['file']}") Resource resource) {
return new FlatFileItemReaderBuilder<User>()
.name("itemReader-partition")
//需要動態獲取文件
.resource(resource)
//解析數據-指定解析器使用『#』分割,默認是『,』
.delimited().delimiter("#")
//分割後的字串Array,每個Column的命名,需要與User的屬性名對應
.names("id", "name", "age")
//封裝數據
//自動封裝 => 將讀取的數據,封裝到對象中
.targetType(User.class)
.build();
}
```
* Obtains **file** from the **Step execution context** (`@Value("#{stepExecutionContext['file']}")`)
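For a feel of what `delimited().delimiter("#").names("id", "name", "age")` does with one line, here is a JDK-only sketch: split on `#` and pair each token with a column name. The sample line `1#Tom#18` and the `tokenize` helper are hypothetical, and real Spring Batch tokenizing (quoting, type conversion into `User`) is richer than this. Similar to the tokenizer's default strict mode, a line with the wrong number of columns is rejected. Because only three names are declared, `User.address` is never populated and stays `null`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// JDK-only sketch of delimited tokenizing: split a line on '#', then name each column.
// Not Spring Batch's DelimitedLineTokenizer; the input line is made up.
public class DelimitedLineDemo {

    static final String[] NAMES = {"id", "name", "age"};

    public static Map<String, String> tokenize(String line) {
        String[] tokens = line.split("#", -1); // -1 keeps trailing empty columns
        if (tokens.length != NAMES.length) {
            throw new IllegalArgumentException(
                    "expected " + NAMES.length + " columns but got " + tokens.length + ": " + line);
        }
        Map<String, String> fieldSet = new LinkedHashMap<>(); // column name -> raw value
        for (int i = 0; i < NAMES.length; i++) {
            fieldSet.put(NAMES[i], tokens[i]);
        }
        return fieldSet;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("1#Tom#18")); // {id=1, name=Tom, age=18}
    }
}
```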
* **Step**
```java=
@Bean("getUserWorkStep")
public Step getUserWorkStep() {
return new StepBuilder("getUserWorkStep", jobRepository)
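.<User, User>chunk(10, jdbcTransactionManager)// read and write 10 items per chunk (one transaction per chunk)
.reader(getUserByTxtPartitioner(null))// null is only a placeholder: with @StepScope the real resource is injected per worker execution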
.writer(chunk -> chunk.getItems().forEach(System.out::println))
.build();
}
```
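`chunk(10, ...)` means items are read one by one and handed to the writer in groups of up to 10, with one transaction committed per chunk. The JDK-only sketch below shows just the grouping step; the `chunk` helper is hypothetical and has no transactions. Assuming each sample file holds 10 users, as the file names suggest, each worker Step writes exactly one chunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// JDK-only sketch of chunk-oriented grouping: read items one at a time,
// emit a chunk whenever `size` items have accumulated, then flush the remainder.
public class ChunkDemo {

    public static List<List<Integer>> chunk(List<Integer> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("chunk size must be positive");
        }
        List<List<Integer>> chunks = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        for (Integer item : items) {      // "read" one item
            current.add(item);
            if (current.size() == size) { // chunk is full: "write" it
                chunks.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {         // last, partially filled chunk
            chunks.add(current);
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            ids.add(i);
        }
        System.out.println(chunk(ids, 10).stream().map(List::size).collect(Collectors.toList())); // [10, 10, 5]
    }
}
```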
#### 9.3.3.2 Partitioner
* **Goal**: put the **resource path** of each file into the **Step execution context**
* `@Value("#{stepExecutionContext['file']}")`
* **Code**:
* **Custom partitioner** implementing the **Partitioner** interface
```java=
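import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

public class UserPartitioner implements Partitioner {
private static final int RANGE = 10;// number of users per file
//key: partition name (one per worker Step)
//value: the worker Step's execution context
//worker Step 1 => file to process => file: user1-10.txt
//worker Step 2 => file to process => file: user11-20.txt
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
//computed per call, so calling partition() again yields the same files instead of continuing from user51
int begin = i * RANGE + 1;
int end = begin + RANGE - 1;
ExecutionContext executionContext = new ExecutionContext();
Resource classPathResource = new ClassPathResource("user" + begin + "-" + end + ".txt");
try {
//store the URL as a String: a Resource object cannot be serialized when the context is persisted;
//Spring converts the String back into a Resource when it is injected with @Value
executionContext.putString("file", classPathResource.getURL().toExternalForm());
} catch (IOException e) {
//fail fast instead of leaving "file" unset for the worker Step
throw new UncheckedIOException("Missing partition file: " + classPathResource, e);
}
map.put("user_partition_" + i, executionContext);
}
return map;
}
}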
```
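* Values in the **ExecutionContext** must survive serialization when the context is persisted, so a **Resource** object does not work; store the **path as a String** instead (**Spring converts it back into a Resource** when injecting it through `@Value`)
* **Define the Bean**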
```java=
@Bean
public UserPartitioner getUserPartitioner() {
return new UserPartitioner();
}
```
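Why a `String` rather than a `Resource`? Assuming the Java-serialization-based `DefaultExecutionContextSerializer` (the default in Spring Batch 5), every value in the context has to be serializable when the context is saved to the job repository. The JDK-only demo below reproduces the failure with a hypothetical non-serializable `FakeResource` class and shows that a plain URL string serializes fine. The `file:/tmp/...` URL is made up.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// JDK-only demonstration: Java serialization rejects a map value whose class is not
// Serializable, but accepts a String. FakeResource is a hypothetical stand-in for a Resource.
public class ContextSerializationDemo {

    public static class FakeResource { // deliberately does NOT implement Serializable
        final String url;

        public FakeResource(String url) {
            this.url = url;
        }
    }

    // Returns true if the whole map can be written with Java serialization.
    public static boolean canSerialize(Map<String, Object> context) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(context);
            return true;
        } catch (NotSerializableException e) {
            return false; // some value (here: FakeResource) is not Serializable
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> withObject = new HashMap<>();
        withObject.put("file", new FakeResource("file:/tmp/user1-10.txt"));
        Map<String, Object> withString = new HashMap<>();
        withString.put("file", "file:/tmp/user1-10.txt");
        System.out.println(canSerialize(withObject)); // false
        System.out.println(canSerialize(withString)); // true
    }
}
```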
#### 9.3.3.3 Partition Handler
* Uses **TaskExecutorPartitionHandler**
```java=
@Bean
public PartitionHandler getPartitionHandler() {
TaskExecutorPartitionHandler taskExecutorPartitionHandler = new TaskExecutorPartitionHandler();
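//1. set gridSize: how many worker Steps (partitions) to create; 5 matches the five data files
taskExecutorPartitionHandler.setGridSize(5);
//2. run each worker Step on its own thread (SyncTaskExecutor would run them one after another in the calling thread)
taskExecutorPartitionHandler.setTaskExecutor(new SimpleAsyncTaskExecutor());
//3. the worker Step to execute for every partition
taskExecutorPartitionHandler.setStep(getUserWorkStep());
//4. validate the configuration, e.g. that the Step is set (optional)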
try {
taskExecutorPartitionHandler.afterPropertiesSet();
} catch (Exception e) {
throw new RuntimeException(e);
}
return taskExecutorPartitionHandler;
}
```
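The task executor decides whether partitions really run in parallel. As a JDK-only analogy (not the Spring classes themselves): an `Executor` written as `Runnable::run` behaves like `SyncTaskExecutor`, running every task in the caller's thread one after another, while `task -> new Thread(task).start()` behaves like `SimpleAsyncTaskExecutor`, starting a fresh thread per task. The `threadNames` helper is hypothetical and simply records which thread ran each simulated worker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

// JDK-only analogy of choosing a task executor for worker Steps.
public class ExecutorChoiceDemo {

    // Runs `tasks` simulated workers on the given executor and returns the thread name of each.
    public static List<String> threadNames(Executor executor, int tasks) {
        List<String> names = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await(); // wait for every "worker step", like the master Step does
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        Executor sync = Runnable::run;                      // like SyncTaskExecutor
        Executor async = task -> new Thread(task).start(); // like SimpleAsyncTaskExecutor
        String caller = Thread.currentThread().getName();
        System.out.println(threadNames(sync, 5).stream().allMatch(caller::equals));   // true
        System.out.println(threadNames(async, 5).stream().noneMatch(caller::equals)); // true
    }
}
```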
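#### 9.3.3.4 Master Step
* **Combines the partition handler and the partitioner**
* **Code**:
* **Define the Step and the Job**
```java=
// combine the partition handler and the partitioner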
@Bean("getPartitionStep")
public Step getPartitionStep() {
return new StepBuilder("masterGetPartitionStep", jobRepository)
.partitioner(getUserWorkStep().getName(), getUserPartitioner())// partitioner; the worker Step name is used when naming the worker executions
.partitionHandler(getPartitionHandler())// partition handler
.build();
}
@Bean("getPartitionJob")
public Job getPartitionJob() {
return new JobBuilder("getPartitionJob", jobRepository)
.start(getPartitionStep())
.build();
}
```
###### tags:`SpringBatch`