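---
title: 'Spring Batch Study Notes'
disqus: hackmd
---

# X. Disclaimer

* These notes are based on the Spring Batch tutorial videos by 浪飛yes on Bilibili.

# 1. Basic Example

* Basic architecture of Spring Batch

![Spring Batch basic architecture](https://hackmd.io/_uploads/rJxk6fN6C.png)

## 1.1 Configure a DataSource for Spring Batch

* **Purpose**: the job repository stores each Job's `execution status`, `steps`, and `parameters` in this database
* You need to create a **DataSource** and a **TransactionManager**
    * Spring Batch **uses JdbcTemplate** under the hood, so a `JdbcTransactionManager` is a natural fit
* Code:

```java=
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.support.JdbcTransactionManager;

import javax.sql.DataSource;

@Configuration
public class DataSourceConfiguration {

    @Bean("BatchDataSource")
    public DataSource getDataSource() {
        HikariDataSource hikariDataSource = new HikariDataSource();
        hikariDataSource.setJdbcUrl("jdbc:postgresql://localhost:5432/batch");
        hikariDataSource.setDriverClassName("org.postgresql.Driver");
        // setUsername(...) / setPassword(...) as required by your database
        hikariDataSource.setConnectionTimeout(3000L);// max wait (ms) for a connection from the pool
        hikariDataSource.setReadOnly(false);// whether connections are read-only
        hikariDataSource.setIdleTimeout(3000L);// max idle time (ms); HikariCP enforces a 10 s minimum
        hikariDataSource.setMaxLifetime(60000L);// max lifetime (ms) of a pooled connection
        hikariDataSource.setMaximumPoolSize(10);// max number of connections in the pool
        hikariDataSource.setMinimumIdle(5);// min number of idle connections kept in the pool
        return hikariDataSource;
    }

    @Bean("BatchTransactionManager")
    public JdbcTransactionManager batchTransactionManager(@Qualifier("BatchDataSource") DataSource dataSource) {
        return new JdbcTransactionManager(dataSource);
    }
}
```

## 1.2 Implement a Simple Task

* Implement the **Tasklet** interface
* Code:

```java=
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

import java.util.Map;

// A task executed by a batch step
public class MessageTaskLet implements Tasklet {

    private final static Logger LOGGER = LoggerFactory.getLogger(MessageTaskLet.class);

    // Returns the processing status of this step as a RepeatStatus:
    // RepeatStatus.FINISHED    -> the step is done
    // RepeatStatus.CONTINUABLE -> the tasklet is called again (it loops until FINISHED is returned)
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Read the job parameters
        Map<String, Object> jobParameters =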
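                chunkContext.getStepContext().getJobParameters();
        LOGGER.info("Processing data, name = {}", jobParameters.get("name"));
        return RepeatStatus.FINISHED;// processing finished
    }
}
```

## 1.3 Configure the Job

* Enable Spring Batch with **@EnableBatchProcessing** and specify:
    * the **data source** (`dataSourceRef`)
    * the **transaction manager** (`transactionManagerRef`)
* Define the **Job** class
* Code:

```java=
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.support.JdbcTransactionManager;

@EnableBatchProcessing(dataSourceRef = "BatchDataSource", transactionManagerRef = "BatchTransactionManager")
@Configuration
public class SpringBatchConfiguration {

    private final static Logger LOGGER = LoggerFactory.getLogger(SpringBatchConfiguration.class);

    @Qualifier("BatchTransactionManager")
    @Autowired
    private JdbcTransactionManager jdbcTransactionManager;

    // After startup, Spring Boot picks up the Job beans and hands them to JobLauncherApplicationRunner
    // (an ApplicationRunner), which runs them through a JobLauncher.
    // Note (Spring Boot 3): declaring @EnableBatchProcessing makes Boot's batch auto-configuration back off,
    // so this automatic run only happens when the annotation is omitted.
    // A Job is an ordered list of Steps executed independently from start to finish.
    @Bean("myJob")
    public Job job(JobRepository jobRepository) {
        return new JobBuilder("myJob", jobRepository)
                // 1. The first step, executed on its own
                .start(new StepBuilder("myStep1", jobRepository)
                        // 1.1 Simple tasklet mode (the other mode is chunk-oriented processing)
                        .tasklet(new MessageTaskLet(), jdbcTransactionManager)
                        .build())
                .build();
    }
}
```

## 1.4 Run the Job

* Inject the **JobLauncher** and the **Job**
* Start the Job with the `run()` method of the **JobLauncher**
* **Code**:

```java=
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;
import java.util.Map;

@SpringBootTest
class BatchTest1ApplicationTests {

    @Autowired
    private JobLauncher jobLauncher;

    @Qualifier("myJob")
    @Autowired
    private Job myJob;

    @Test
    void contextLoads() throws Exception {
        // Job parameters
        Map<String, JobParameter<?>> map = new HashMap<>();
        map.put("name", new JobParameter<>("nicolas", String.class));
        // Launch the Job with the parameters
        jobLauncher.run(myJob, new JobParameters(map));
    }
}
```

* **The combination of Job name + (identifying) parameters must be unique**: once it has completed, the same combination cannot be run again

# 2. Job: the Job Object

* A Job is made up of **one** or **more** Steps
* It defines an ordered list of Steps that runs independently from start to finish
    * Ordered list: composed of different Steps
    * From start to finish: the next Step runs only after the previous one completes, in a defined order
    * Independent: a Job should not be affected by external dependencies
* **Job names must be unique**
* Launched by the **JobLauncher**
* **Job runs are independent of each other**
    * To guarantee this independence, Spring Batch uses a **Job Instance** (`job instance`) and a **Job Execution** (`job execution object`)
        * **Job Instance**: **one logical run of a Job**; a new Job Instance is created only for a **new combination of Job name and identifying parameters**
            * **Jobs are distinguished by name and identifying parameters**. Example: a daily data-synchronization requirement:
                * Job name: daily-syncjob
                * Identifying parameter: the current date
        * **Job Execution**: **every** run (attempt) of a Job `creates a Job Execution`, which **records what happened during that run**
* **Why separate Job Instance and Job Execution** => **a batch run can end in two ways**:
    * **Job => Job Instance => Job Execution**
    * **The Job succeeds on the first attempt**:
        * **one** Job Instance record
        * **one** Job Execution record
    * **The Job fails** (`and only succeeds after several attempts`):
        * **one** Job Instance record (`the parameters are recognized as identical, so it is not recorded again`)
        * **one or more** Job Execution records (`the same Job Instance is run several times`)
* **Summary**:
    * Job Instance = `Job name` + `identifying parameters`
    * Each run of a Job Instance creates one Job Execution
    * Completing a Job Instance may take one or more Job Executions

## 2.1 Job Parameters

* `JobLauncher.run()` starts a Job and returns a Job Execution
    * Implementation class: SimpleJobLauncher

### 2.1.1 The JobParameters Class

* **Purpose**: **wraps all parameters passed to a Job**
* Running a Job again with the same JobParameters values after it has completed `fails immediately`
    * **Reason**: in Spring Batch, a given **Job name** with the same **identifying parameters** `can complete successfully only once`
        * This keeps each Job Instance (`Job name` + `identifying parameters`) unique and each run independent
    * **Exception**: `org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException`
    * **Error message**: `A job instance already exists and is complete for identifying parameters ...`

### 2.1.2 Reading Parameters

#### 2.1.2.1 Using ChunkContext

* **Basic example**:

```java=
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

import java.util.Map;

// A task executed by a batch step
public class MessageTaskLet implements Tasklet {
    private final static Logger LOGGER = LoggerFactory.getLogger(MessageTaskLet.class);

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Read the job parameters
        Map<String, Object> jobParameters = chunkContext.getStepContext().getJobParameters();
        LOGGER.info("Processing data, jobParameters => name = {}", jobParameters.get("name"));
        return RepeatStatus.FINISHED;// processing finished
    }
}
```

* **Method**: **chunkContext.getStepContext().getJobParameters()**

#### 2.1.2.2 Late Binding with @Value (Common)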
* **Steps**:
    1. Annotate the bean with **@StepScope**
        * **@StepScope** (`late binding`): **the bean is not created at startup; it is created only when a step that uses it actually runs**
    2. Use the fixed SpEL expression **#{jobParameters['key name']}**
        * `jobParameters` is a variable that the step scope exposes to SpEL at runtime
* **Code**: the **key is "name"**

```java=
@StepScope
@Bean
public Tasklet getTasklet(@Value("#{jobParameters['name']}") String name) {// fixed SpEL syntax
    return new Tasklet() {
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            System.out.println(name);
            return RepeatStatus.FINISHED;
        }
    };
}
```

### 2.1.3 Custom Parameter Validation: JobParametersValidator

* Implement the `validate` method of the **JobParametersValidator** interface
* **When validation fails**, throw a **JobParametersInvalidException** (`validation is ended by throwing`)
    * You can also throw your own exception type (a subclass of JobParametersInvalidException, or an unchecked exception)
* **Custom parameter validator**:

```java=
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.JobParametersValidator;

// Goal: throw when the "name" value is null or ""
public class NameParametersValidator implements JobParametersValidator {
    @Override
    public void validate(JobParameters parameters) throws JobParametersInvalidException {
        String name = parameters.getString("name");
        if (name == null || name.isEmpty()) {
            throw new JobParametersInvalidException("Name is blank.");
        }
    }
}
```

* **Register** the custom validator:

```java=
@Bean("paramJob")
public Job paramJob(JobRepository jobRepository) {
    return new JobBuilder("paramJob", jobRepository)
            // 1. The first step, executed on its own
            .start(new StepBuilder("paramJobStep1", jobRepository)
                    // simple tasklet mode (the other mode is chunk-oriented processing)
                    .tasklet(new MessageTaskLet(), jdbcTransactionManager)
                    .build())
            // X. Specify the JobParameters validator
            .validator(new NameParametersValidator())
            .build();
}
```

#### 2.1.3.1 Spring Batch's Built-in Parameter Validators

* **Two built-in validators** (`both implement the JobParametersValidator interface`):
    * **DefaultJobParametersValidator** (`default parameter validator`)
    * **CompositeJobParametersValidator** (`composite parameter validator`)
        * Combines several validators
* **DefaultJobParametersValidator**
    * Maintains two key sets, **requiredKeys** and **optionalKeys**
        * **requiredKeys**: the **JobParameters must contain every key in this set**
        * **optionalKeys**: the **keys in this set may optionally be present**
    * **Code**:

```java=
public DefaultJobParametersValidator getDefaultJobParametersValidator() {
    DefaultJobParametersValidator defaultJobParametersValidator = new DefaultJobParametersValidator();
    // required key => name1
    defaultJobParametersValidator.setRequiredKeys(new String[]{"name1"});
    // optional key => age
    defaultJobParametersValidator.setOptionalKeys(new String[]{"age"});
    return defaultJobParametersValidator;
}
```

* **Error message when a required key is missing**: `The JobParameters do not contain required keys: [name1]`
* **CompositeJobParametersValidator**
    * Takes a `List<JobParametersValidator>` and runs each validator in turn
    * **Code**:

```java=
public CompositeJobParametersValidator getCompositeJobParametersValidator() {
    CompositeJobParametersValidator compositeJobParametersValidator = new CompositeJobParametersValidator();
    // Register the validators
    compositeJobParametersValidator.setValidators(List.of(
            getDefaultJobParametersValidator(),// built-in default validator
            new NameParametersValidator())// custom validator
    );
    return compositeJobParametersValidator;
}
```

### 2.1.4 Incrementing Parameters: JobParametersIncrementer

* **Note**: the **JobParametersIncrementer** is applied by **JobLauncherApplicationRunner** (`if you launch the Job yourself with JobLauncher.run(), the incrementer is not applied automatically`)
* Implement the `getNext` method of the **JobParametersIncrementer** interface
* Purpose: **produce a modified set of JobParameters**
    * Solves the problem of re-running a Job with the same parameters
* Source:

```java=
@FunctionalInterface
public interface JobParametersIncrementer {
    JobParameters getNext(@Nullable JobParameters parameters);
}
```

#### 2.1.4.1 Incrementing the run.id Parameter

* **RunIdIncrementer**: an incrementer for the `run.id` parameter
    * On each launch it maintains an identifying parameter named `run.id` and increments it by 1
* **Code**:

```java=
@Bean("paramJob")
public Job paramJob(JobRepository jobRepository) {
    return new
            JobBuilder("paramJob", jobRepository)
            // 1. The first step, executed on its own
            .start(new StepBuilder("paramJobStep1", jobRepository)
                    // simple tasklet mode (the other mode is chunk-oriented processing)
                    .tasklet(new MessageTaskLet(), jdbcTransactionManager)
                    .build())
            // X. Set the parameter incrementer
            .incrementer(new RunIdIncrementer())// auto-increments run.id
            .build();
}
```

#### 2.1.4.2 Custom Incrementer: Timestamp

* Implement **JobParametersIncrementer** and add a key-value pair
* Code:

```java=
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersIncrementer;

import java.time.LocalDateTime;

public class LocalDateTimeIncrementer implements JobParametersIncrementer {
    @Override
    public JobParameters getNext(JobParameters parameters) {
        // Use the current time as the "daily" parameter value
        return new JobParametersBuilder().addLocalDateTime("daily", LocalDateTime.now()).toJobParameters();
    }
}
```

## 2.2 Job Listeners

* **Purpose**: **observe a Job's execution** and plug business logic into two points: **before** and **after** the Job runs
    * **Before**: usually **initialization**, i.e. preparation work needed before the Job runs
    * **After**: various **cleanup tasks** once the work is done, e.g. `releasing resources`
* **JobExecutionListener interface**: **implements a Job listener**
    * **Note**: the listener is only reached after the Job's parameters pass the **JobParametersValidator**
* **Source**:

```java=
public interface JobExecutionListener {
    // before execution
    default void beforeJob(JobExecution jobExecution) {
    }

    // after execution
    default void afterJob(JobExecution jobExecution) {
    }
}
```

### 2.2.1 Interface-Based Implementation

* Implement the **JobExecutionListener interface**:
    * Goal: log the status
    * Code:

```java=
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

public class JobStatusListener implements JobExecutionListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobStatusListener.class);

    @Override
    public void beforeJob(JobExecution jobExecution) {
        LOGGER.info("Job starting, status = {}", jobExecution.getStatus());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        LOGGER.info("Job finished, status = {}", jobExecution.getStatus());
    }
}
```

* Step:

```java=
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

// A task executed by a batch step
public class MessageTaskLet implements Tasklet {
    private final static Logger LOGGER = LoggerFactory.getLogger(MessageTaskLet.class);

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Get the JobExecution through the StepExecution
        BatchStatus status = contribution.getStepExecution().getJobExecution().getStatus();
        LOGGER.info("Job running, status = {}", status);
        return RepeatStatus.FINISHED;// processing finished
    }
}
```

* **Usage**:

```java=
@Bean("paramJob")
public Job paramJob(JobRepository jobRepository) {
    return new JobBuilder("paramJob", jobRepository)
            .incrementer(new RunIdIncrementer())
            // 1. The first step, executed on its own
            .start(new StepBuilder("paramJobStep1", jobRepository)
                    // simple tasklet mode (the other mode is chunk-oriented processing)
                    .tasklet(new MessageTaskLet(), jdbcTransactionManager)
                    .build())
            // X. Register the Job listener
            .listener(new JobStatusListener())
            .build();
}
```

### 2.2.2 Annotation-Based Implementation

* **Concept**: **aspect-style logic** (the annotated methods are called at the corresponding points)
* **Annotations**:
    * **@BeforeJob**: before execution
    * **@AfterJob**: after execution
* **Registered the same way as the interface-based listener** (`.listener(...)`)
* **Code**:

```java=
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.annotation.AfterJob;
import org.springframework.batch.core.annotation.BeforeJob;

public class JobStatusListenerByAnnotation {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobStatusListenerByAnnotation.class);

    @BeforeJob
    public void beforeJob(JobExecution jobExecution) {
        LOGGER.info("A Job starting, status = {}", jobExecution.getStatus());
    }

    @AfterJob
    public void afterJob(JobExecution jobExecution) {
        LOGGER.info("A Job finished, status = {}", jobExecution.getStatus());
    }
}
```

## 2.3 Execution Context: ExecutionContext

* **Summary**:
    * **Step data is stored in the Step's ExecutionContext and can only be used within that Step**
    * **Job data is stored in the Job's ExecutionContext and is shared by all Steps**
    * **Inspect the database tables**: `batch_job_execution_context` and `batch_step_execution_context`
        * The **Job's** context data is stored in `batch_job_execution_context`
        * The **Step's** context data is stored in `batch_step_execution_context`

### 2.3.1 Context

* "Context" carries the sense of `environment`, `setting`, or `surroundings`
* Spring uses `Context` for this concept, e.g.:
    * The Spring container: `ApplicationContext`
    * In Spring Batch:
        * **JobContext**: holds the JobExecution, handles the Job's wrap-up work and its various callbacks
            * **JobContext** binds the **JobExecution** and provides the **Job** with its **execution environment** (`context`)
        * **StepContext**: holds the StepExecution, handles the Step's wrap-up work and its various callbacks
            * **StepContext** binds the **StepExecution** and provides the **Step** with its **execution environment** (`context`)

### 2.3.2 Execution Context

* **ExecutionContext**: Spring Batch's **execution context**
    * **Purpose**: **sharing data**
    * **There are two kinds of ExecutionContext**:
        * **The Job's ExecutionContext**:
            * **Scope**: one Job run; data is shared **across all Steps**
        * **The Step's ExecutionContext**:
            * **Scope**: one Step run; data is shared **within a single Step** (`among its ItemReader/ItemProcessor/ItemWriter`)

### 2.3.3 Job and Step Execution Chain

![Spring Batch Job and Step execution chain](https://hackmd.io/_uploads/BkVICzVpA.png)

### 2.3.4 Job and Step Reference Chain

* Job chain: Job -> JobInstance -> JobContext -> JobExecution -> ExecutionContext (`Job`)
* Step chain: Step ->
  StepContext -> StepExecution -> ExecutionContext (`Step`)

### 2.3.5 The JobContext API

* The **utility class** method `JobSynchronizationManager.getContext()` returns the **JobContext** bound to the current thread (it returns `null` outside a running Job)
* Code:

```java=
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.scope.context.JobContext;
import org.springframework.batch.core.scope.context.JobSynchronizationManager;

import java.util.Map;

public class UseJobContext {

    public void execute() {
        // Get the JobContext of the Job running on the current thread
        JobContext context = JobSynchronizationManager.getContext();
        JobExecution jobExecution = context.getJobExecution();
        Map<String, Object> jobParameters = context.getJobParameters();
        Map<String, Object> jobExecutionContext = context.getJobExecutionContext();
    }
}
```

### 2.3.6 The StepContext API

* **Code**:

```java=
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.scope.context.StepContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

import java.util.Map;

public class UseStepContext implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Get the Step context => StepContext
        StepContext stepContext = chunkContext.getStepContext();
        StepExecution stepExecution = stepContext.getStepExecution();
        // Execution context of the current Step (read-only view)
        Map<String, Object> stepExecutionContext = stepContext.getStepExecutionContext();
        // Execution context of the current Job run (read-only view)
        Map<String, Object> jobExecutionContext = stepContext.getJobExecutionContext();
        // Get the JobExecution through the StepExecution
        BatchStatus status = contribution.getStepExecution().getJobExecution().getStatus();
        // Get the Job parameters
        Map<String, Object> jobParameters = stepContext.getJobParameters();
        return RepeatStatus.FINISHED;
    }
}
```

### 2.3.7 The ExecutionContext API

* **Code**:

```java=
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.scope.context.StepContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;

public class UseExecutionContext implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Step => StepContext -> StepExecution -> ExecutionContext
        StepContext stepContext = chunkContext.getStepContext();
        StepExecution stepExecution = stepContext.getStepExecution();
        ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();
        stepExecutionContext.put("key1", "AAA");

        // Job => JobExecution -> ExecutionContext
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobExecutionContext = jobExecution.getExecutionContext();
        jobExecutionContext.put("key2", "AAA");
        return RepeatStatus.FINISHED;
    }
}
```

### 2.3.8 Example

* **Goal**: observe how data is shared through the **JobExecutionContext** and the **StepExecutionContext**
* **Steps**:
    * Define two steps
    * In **step1**, **store data**:
        * JobExecutionContext: key-step1-job = value-step1-job
        * StepExecutionContext: key-step1-step = value-step1-step
    * In **step2**, **print the ExecutionContexts** and observe
* **Code**:
    * step1:

```java=
public class ObserveStep1 implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        StepContext stepContext = chunkContext.getStepContext();
        // Step
        // This Map can be read but not modified (it is a read-only copy)
        Map<String, Object> map = stepContext.getStepExecutionContext();
        // Modify data through the ExecutionContext itself
        ExecutionContext stepExecutionContext = stepContext.getStepExecution().getExecutionContext();
        stepExecutionContext.put("key-step1-step", "value-step1-step");
        // Job
        ExecutionContext jobExecutionContext =
                stepContext.getStepExecution().getJobExecution().getExecutionContext();
        jobExecutionContext.put("key-step1-job", "value-step1-job");
        return RepeatStatus.FINISHED;
    }
}
```

* step2:

```java=
public class ObserveStep2 implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        StepContext stepContext = chunkContext.getStepContext();
        System.out.println("Step: " + stepContext.getStepExecution().getExecutionContext().get("key-step1-step"));
        System.out.println("-------");
        System.out.println("Job: " + stepContext.getJobExecutionContext().get("key-step1-job"));
        return RepeatStatus.FINISHED;
    }
}
```

* Configuration:

```java=
@Bean("contextJob")
public Job contextJob(JobRepository jobRepository) {
    return new JobBuilder("contextJob", jobRepository)
            .incrementer(new RunIdIncrementer())
            // 1. The first step, executed on its own
            .start(new StepBuilder("contextJobStep1", jobRepository)
                    .tasklet(new ObserveStep1(), jdbcTransactionManager)
                    .build())
            .next(new StepBuilder("contextJobStep2", jobRepository)
                    .tasklet(new ObserveStep2(), jdbcTransactionManager)
                    .build())
            .build();
}
```

* Result:

```=
Step: null
-------
Job: value-step1-job
```

* Data in a Step's context is not shared with other Steps, so the Step value is null
* Data in the Job's context is shared across the whole Job

# 3. Step: the Step Object

* **Two Step processing modes are supported**:
    * **Tasklet** mode
        * Simple
        * Implementing the **Tasklet interface** is enough to build a Step; it is invoked repeatedly until it returns `RepeatStatus.FINISHED`
    * **Chunk** mode
        * Consists of 2 to 3 components:
            1. ItemReader
            2. ItemProcessor (`optional`)
            3. ItemWriter

## 3.1 Tasklet Mode

* **Tasklet interface source**:

```java=
@FunctionalInterface
public interface Tasklet {
    @Nullable
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}
```

* **Override the execute method**
    * **Parameters**:
        * **StepContribution**: `the step's contribution object`
            * Mainly used to **set the Step's exit status**: `contribution.setExitStatus(ExitStatus status)`
                * e.g. **contribution.setExitStatus(ExitStatus.COMPLETED)**
        * **ChunkContext**: `chunk context`
            * **Holds the chunk's execution environment**; the `StepContext` (and through it the Job-level data) can be obtained from it
    * **Return value**:
        * **RepeatStatus** (`enum`): **the current Step's status**
            * RepeatStatus.**CONTINUABLE**: the current Step **runs again (loops)**
            * RepeatStatus.**FINISHED**: the current Step **is done**

## 3.2 Chunk Mode

* **Three modules** (`interfaces`):
    * **ItemReader\<T>**: read
    * **ItemProcessor\<I, O>** (`optional`): process
    * **ItemWriter\<T>**: write
* **Structure diagram**:

![Chunk structure diagram](https://hackmd.io/_uploads/r1VSi2BpC.png)

* **Sequence diagram**:

![Chunk sequence diagram](https://hackmd.io/_uploads/Byq0C2H6A.png)

* **item**: `one piece of data`

### 3.2.1 Example: Basic Chunk Usage

* **Code**:
    * **Interface implementations**
        * **ItemReader\<T>**

```java=
import org.springframework.batch.item.ItemReader;

public class SimpleTaskletChunkItemReader implements ItemReader {
    // Return an item three times, then null (end of input)
    private int size = 3;

    @Override
    public Object read() throws Exception {
        if (size > 0) {
            size--;
            System.out.println("============= Reader ==========");
            return "read-ret";
        } else {
            return null;
        }
    }
}
```

* **T**: **the type of item the reader outputs**
* **ItemProcessor\<I, O>**

```java=
import org.springframework.batch.item.ItemProcessor;

public class SimpleTaskletChunkItemProcessor implements ItemProcessor {
    @Override
    public Object process(Object item) throws Exception {
        // item = the incoming data
        System.out.println("============= Processor =========== " + item);
        // Output the processed data
        return "process-ret -> " + item;
    }
}
```

* **I**: **input type**
* **O**: **output type**
* **ItemWriter\<T>**

```java=
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;

import java.util.List;

public class SimpleTaskletChunkItemWriter implements ItemWriter {
    @Override
    public void write(Chunk chunk) throws Exception {
        List items = chunk.getItems();
        System.out.println(items);
    }
}
```

* **T**: **the type of item the writer receives**
* Configuration:

```java=
// Basic chunk-oriented processing example
@Bean("simpleChunkJob")
public Job
simpleChunkJob(JobRepository jobRepository) {
    return new JobBuilder("simpleChunkJob", jobRepository)
            .start(new StepBuilder("simpleChunkJob-step1", jobRepository)
                    .chunk(3, jdbcTransactionManager)
                    .reader(new SimpleTaskletChunkItemReader())
                    .processor(new SimpleTaskletChunkItemProcessor())
                    .writer(new SimpleTaskletChunkItemWriter())
                    .build())
            .build();
}
```

* **chunk(chunkSize, transactionManager)**:
    * **chunkSize**: the **maximum number of items handled in one chunk** (each chunk is processed and committed in one transaction)
* **Result**:
    * The reader is called `three times` -> the processor `three times` -> **everything is written at once** (`ItemWriter`)

```
============= Reader ==========
============= Reader ==========
============= Reader ==========
============= Processor =========== read-ret
============= Processor =========== read-ret
============= Processor =========== read-ret
[process-ret -> read-ret, process-ret -> read-ret, process-ret -> read-ret]
```

* **chunkSize** is **3**, so **each chunk holds at most 3 items** (`any further items go to the next chunk`)
* This read -> process -> write cycle **repeats chunk after chunk for as long as data is read**, until the **ItemReader** returns null; here the reader returns null after three items, so the cycle runs once
* **Characteristic**: the **ItemReader keeps reading** until it `returns null`
* **The ItemProcessor follows the ItemReader**: it is called once for every item the ItemReader reads

## 3.3 Step Listeners

* **Two kinds of Step listener**:
    * **StepExecutionListener**: listens `before and after a Step`
    * **ChunkListener**: listens `before and after each chunk`
* **Both are configured the same way**

### 3.3.1 The StepExecutionListener Interface

* **Source**:

```java=
public interface StepExecutionListener extends StepListener {

    default void beforeStep(StepExecution stepExecution) {
    }

    @Nullable
    default ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }
}
```

* ExitStatus: the Step's exit status; the value returned from `afterStep` is combined with the current one (returning `null` leaves it unchanged)
* Code:

```java=
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;

public class SimpleStepListener implements StepExecutionListener {
    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("----------Before Step----------");
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("----------After Step----------");
        return stepExecution.getExitStatus();
    }
}
```

* Configuration:

```java=
@Bean("step1")
public Step getStep1(JobRepository jobRepository) {
    return new StepBuilder("paramJobStep1", jobRepository)
            // simple tasklet mode (the other mode is chunk-oriented processing)
            .tasklet(new MessageTaskLet(), jdbcTransactionManager)
            // Register the Step listener
            .listener(new SimpleStepListener())
            .build();
}
```

## 3.4 Step Flow Control

* **Goal**: decide **which Step runs next based on a condition**
* **APIs that change the Step execution order**:
    * **start()**: defines the first Step
    * **next()**: defines the next Step
    * **on(pattern)**: if the **previous Step's exit status matches the pattern**, the transition defined after it is followed
        * **Pattern**: an **ExitStatus** code or a **custom exit status** string
        * `*`: wildcard
    * **from()**: defines **which Step the condition is evaluated from**; usually combined with `on()` and `to()`
    * **to()**: defines the next Step in the flow
    * **end()**: closes the conditional flow definition

### 3.4.1 Step Control Using the Step's Status: ExitStatus

* **Goal**: the Job runs **firstStep**; if it succeeds, run **successStep**; if it fails, run **failStep**
* Code:
    * **firstStep**:

```java=
public class FirstStep implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("======== FirstStep =========");
        // throw new Exception("Simulated FirstStep failure");// a failed step ends with ExitStatus FAILED
        return RepeatStatus.FINISHED;
    }
}
```

* **successStep**:

```java=
public class SuccessStep implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("========== successStep =========");
        return RepeatStatus.FINISHED;
    }
}
```

* **failStep**:

```java=
public class FailStep implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("========== FailStep ===========");
        return RepeatStatus.FINISHED;
    }
}
```

* **Configuration**:

```java=
@Bean("simpleStepControlJob")
public Job simpleStepControlJob(JobRepository jobRepository) {
    return new JobBuilder("simpleStepControlJob", jobRepository)
            .start(firstStep(jobRepository))
            // on: when the exit status matches, follow this transition
            .on("FAILED")
            .to(new StepBuilder("fail", jobRepository)
                    .tasklet(new FailStep(), jdbcTransactionManager)
                    .build())
            // from: the step whose exit status is evaluated
            .from(firstStep(jobRepository))
            // on: any other exit status
            .on("*")
            .to(new StepBuilder("success", jobRepository).tasklet(new SuccessStep(), jdbcTransactionManager).build())
            .end()
            .build();
}

@Bean("firstStep")
public Step firstStep(JobRepository jobRepository) {
    return new StepBuilder("first", jobRepository)
            .tasklet(new FirstStep(), jdbcTransactionManager)
            .build();
}
```

### 3.4.2 Step Control Using the Step's Status: Custom ExitStatus

* Implement the **JobExecutionDecider interface** to **return a custom status value**:

```java=
@FunctionalInterface
public interface JobExecutionDecider {
    FlowExecutionStatus decide(JobExecution jobExecution, @Nullable StepExecution stepExecution);
}
```

* **Goal**: run startStep first; if the returned status is A, run stepA; if it is B, run stepB; otherwise run defaultStep
* Code:
    * **Custom status value**: implement the **JobExecutionDecider interface**

```java=
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;

import java.util.Random;

public class MyStepExistsStatusDecider implements JobExecutionDecider {
    // Returns "A", "B" or "C", each with a probability of one third
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        int ret = new Random().nextInt(3);
        return switch (ret) {
            case 0 -> new FlowExecutionStatus("A");
            case 1 -> new FlowExecutionStatus("B");
            default -> new FlowExecutionStatus("C");
        };
    }
}
```

* **startStep**

```java=
public class StartStep implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("======== StartStep =========");
        return RepeatStatus.FINISHED;
    }
}
```

* **stepA**

```java=
public class StepA implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("========= StepA =========");
        return RepeatStatus.FINISHED;
    }
}
```

* **stepB**

```java=
public class StepB implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("========= StepB =========");
        return RepeatStatus.FINISHED;
    }
}
```

* **defaultStep**

```java=
public class DefaultStep implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("========== default Step ======");
        return RepeatStatus.FINISHED;
    }
}
```

* **Configuration**:

```java=
@Bean("CustomerStepControl")
public Job getCustomerStepControl(JobRepository jobRepository) {
    return new JobBuilder("CustomerStepControl", jobRepository)
            .start(getStartStep(jobRepository))
            // Custom status => next(decider)
            .next(getMyStepExistsStatusDecider())
            // 'A'
            .from(getMyStepExistsStatusDecider()).on("A")
            .to(new StepBuilder("StepA", jobRepository)
                    .tasklet(new StepA(), jdbcTransactionManager).build())
            // 'B'
            .from(getMyStepExistsStatusDecider()).on("B")
            .to(new StepBuilder("StepB", jobRepository)
                    .tasklet(new StepB(), jdbcTransactionManager).build())
            // default
            .from(getMyStepExistsStatusDecider()).on("*")
            .to(new StepBuilder("DefaultStep", jobRepository)
                    .tasklet(new DefaultStep(), jdbcTransactionManager).build())
            .end()
            .build();
}

@Bean("StartStep")
public Step getStartStep(JobRepository jobRepository) {
    return new StepBuilder("StartStep1", jobRepository)
            .tasklet(new StartStep(), jdbcTransactionManager).build();
}

// Custom decider
@Bean("MyStepExistsStatusDecider")
public MyStepExistsStatusDecider getMyStepExistsStatusDecider() {
    return new MyStepExistsStatusDecider();
}
```

## 3.5 ExitStatus

* **Purpose**: represents the execution status of a Step, chunk, or Job
* **Status values**:
    * UNKNOWN: unknown
    * EXECUTING: executing
    * COMPLETED: completed successfully
    * NOOP: no processing was done
    * FAILED: failed
    * STOPPED: stopped (interrupted)
* Once a Job is launched, **these statuses are set by the framework as the flow runs**:
    * **Successful end**: **COMPLETED**
    * **End with an exception**: **FAILED**
    * **No processing done** (`e.g. because the work was already complete`): **NOOP**
* Spring Batch **provides 3 methods that decide how the Job flow ends**:
    * **end()**: ends as a success => **COMPLETED**
    * **fail()**: ends as a failure => **FAILED**
    * **stopAndRestart(step)**: ends as stopped => **STOPPED**; a restart resumes from the given step

### 3.5.1 Example: Controlling the Job Status After a Step Fails

* **Goal**: when firstStep **throws an exception** (`uncomment the throw in FirstStep`), use `end`, `fail`, or `stopAndRestart` to decide the **Job's** final status (`the Step's status in the DB stays FAILED; only the Job's status changes`)
    * **.end()**: in the DB the **Step** is **FAILED** and the **Job** is **COMPLETED** (`FAILED -> COMPLETED`)
    * **.fail()**: in the DB the **Step** is **FAILED** and the **Job** is **FAILED** (`FAILED -> FAILED`)
    * **.stopAndRestart(successStep)**: in the DB the **Step** is **FAILED** and the **Job** is **STOPPED** (`FAILED -> STOPPED`)
* **Code**:
    * **Configuration**:

```java=
@Bean("job-control-existstatus")
public Job getJobControlExiststatus(JobRepository jobRepository,
                                    @Qualifier("FirstStepControl") Step firstStep,
                                    @Qualifier("SuccessfulStepControl") Step successStep) {
    return new JobBuilder("job-control-existstatus", jobRepository)
            .start(firstStep)
            // Force the Job's final status: in the DB the Step stays FAILED,
            // while the Job becomes COMPLETED, FAILED, or STOPPED
            // .on("FAILED").end()// turn the failure into a "successful end" (Step FAILED, Job COMPLETED)
            // .on("FAILED").fail()// keep it as a "failed end" (Job FAILED)
            .on("FAILED").stopAndRestart(successStep)// end as "stopped" (Job STOPPED); a restart resumes from successStep
            .from(firstStep).on("*").to(successStep)
            .end()
            .build();
}

@Bean("FirstStepControl")
public Step getFirstStepControl(JobRepository jobRepository) {
    return new StepBuilder("job-control-existstatus-firststep", jobRepository)
            .tasklet(new FirstStep(), jdbcTransactionManager).build();
}

@Bean("SuccessfulStepControl")
public Step getSuccessfulStepControl(JobRepository jobRepository) {
    return new StepBuilder("job-control-existstatus-successfulstep", jobRepository)
            .tasklet(new SuccessStep(), jdbcTransactionManager)
            .build();
}
```

## 3.6 FlowStep: Composing Steps

* A **Flow** is **a collection of Steps**
    * It is **made up of several sub-steps**; when the Job runs, it is treated like an ordinary Step
    * Useful for complex business logic (`one piece of logic split into sub-steps that run in order`)
* **Goal**: run `StepA -> StepB -> StepC`, where **StepB** itself consists of `StepB1 -> StepB2 -> StepB3`
* **Code**:
    * **Step configuration**:

```java=
@Bean("FlowStepA")
public Step getFlowStepA() {
    return new StepBuilder("Step-FlowStepA", jobRepository)
            .tasklet(new FlowStepA(), jdbcTransactionManager)
            .build();
}

@Bean("FlowStepB")
public Step getFlowStepB() {
    return new StepBuilder("Step-FlowStepB", jobRepository)
            // Wrap the Flow in a Step
            .flow(new FlowBuilder<Flow>("FlowB")
                    .start(new StepBuilder("Step-FlowB-1", jobRepository)
                            .tasklet(new FlowStepB1(), jdbcTransactionManager)
                            .build())
                    .next(new StepBuilder("Step-FlowB-2", jobRepository)
                            .tasklet(new FlowStepB2(), jdbcTransactionManager)
                            .build())
                    .next(new StepBuilder("Step-FlowB-3",
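                                    jobRepository)
                            .tasklet(new FlowStepB3(), jdbcTransactionManager)
                            .build())
                    .build())
            .build();
}

@Bean("FlowStepC")
public Step getFlowStepC() {
    return new StepBuilder("Step-FlowStepC", jobRepository)
            .tasklet(new FlowStepC(), jdbcTransactionManager)
            .build();
}
```

* In this Job chain, `start(Step)` returns a builder whose `next()` only accepts a `Step`, so the Flow has to be **wrapped** in a Step (a FlowStep); `JobBuilder` itself also offers `start(Flow)` for a Job that begins with a Flow
* **A Flow can contain both Flows and Steps**
* **Job configuration**:

```java=
@Bean("JobFlowStep")
public Job getJobFlowStep() {
    return new JobBuilder("JobFlowStep", jobRepository)
            .start(getFlowStepA())
            .next(getFlowStepB())
            .next(getFlowStepC())
            .build();
}
```

# 4. Spring Batch Database Tables

# 5. Job Control

## 5.1 Starting a Job

* **At Spring Boot startup**:
    * After Spring Boot starts, it calls the `run` method of **JobLauncherApplicationRunner**
    * Jobs run at startup by default; to disable this, set in application.properties:

```properties
spring.batch.job.enabled=false
```

* **From a unit test**:

```java=
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@SpringBootTest
class BatchTest1ApplicationTests {

    @Autowired
    private JobLauncher jobLauncher;

    @Qualifier("JobFlowStep")
    @Autowired
    private Job jobFlowStep;

    @Test
    void contextLoads() throws Exception {
        // Job parameters: a random UUID, so every run is a new Job Instance
        Map<String, JobParameter<?>> map = new HashMap<>();
        map.put("name", new JobParameter<>(UUID.randomUUID().toString(), String.class));
        // Launch the Job with the parameters
        jobLauncher.run(jobFlowStep, new JobParameters(map));
    }
}
```

* **Through a RESTful API**:
    * First disable running the Job at Spring Boot startup (`spring.batch.job.enabled=false`)
    * Code: two ways of building JobParameters

```java=
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@RestController
@RequestMapping("/job")
public class JobController {

    private final JobLauncher jobLauncher;
    private final JobExplorer jobExplorer;// read access to Job metadata
    private final Job paramJob;

    public JobController(JobLauncher jobLauncher, JobExplorer jobExplorer, @Qualifier("JobFlowStep") Job paramJob) {
        this.jobLauncher = jobLauncher;
        this.jobExplorer = jobExplorer;
        this.paramJob = paramJob;
    }

    @GetMapping("/start/{id}")
    public ResponseEntity<ExitStatus> startJob(@PathVariable("id") String id) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        // Way 1: JobParametersBuilder
        JobParameters jobParameters = new JobParametersBuilder(jobExplorer)
                // derive the next parameters from the Job's last run via its incrementer
                // (the Job must define an incrementer, otherwise this call fails)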
                .getNextJobParameters(paramJob)
                .addJobParameter("name", UUID.randomUUID().toString(), String.class)
                .toJobParameters();

        // Option 2: build the parameter map by hand (this is what the launch below uses)
        Map<String, JobParameter<?>> map = new HashMap<>();
        map.put("id", new JobParameter<>(id, String.class));

        // Launch the Job (pass jobParameters instead to use option 1)
        JobExecution run = jobLauncher.run(paramJob, new JobParameters(map));
        return ResponseEntity.ok(run.getExitStatus());
    }
}
```
## 5.2 Stopping a Job
* A Job can stop in one of **three ways**:
  * **Natural end**: the Job runs successfully and stops normally. Its status is **COMPLETED**.
  * **Abnormal end**: some error interrupts the Job while it is running. Its status is **usually FAILED**.
  * **Programmatic end**: the result of a Step **does not satisfy** the precondition of the next Step, so the Job is **stopped manually**. Its status is **usually set to STOPPED**.

### 5.2.1 Stopping a Job Programmatically
* **Scenario**:
  * A resource class with two fields:
    * totalCount = 100
    * readCount = 0
  * Two Steps:
    * Step1: accumulates readCount (simulates reading resources from the DB)
    * Step2: runs the business logic
  * **Conditions**:
    * If totalCount **==** readCount, the Job ends normally.
    * If totalCount **!=** readCount, the Job ends abnormally: Step2 is skipped and the Job stops immediately.
    * Once the data has been fixed, a restart begins again at Step1 and completes the Job.
* Two approaches:
  * A Step listener
  * The StepExecution stop flag

#### 5.2.1.1 Approach 1: Step Listener
* Code:
  * Step listener:
```java=
public class StopStepListener implements StepExecutionListener {

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Condition not met: report STOPPED so the Job flow can react to it
        if (ResourceCount.totalCount != ResourceCount.readCount) {
            return ExitStatus.STOPPED;
        }
        return stepExecution.getExitStatus();
    }
}
```
  * Step1
```java=
public class StopStep1 implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("======== StopStep1 ========");
        int[] numbers = {100, 50, 30, 10};
        Random random = new Random();
        int randomIndex = random.nextInt(numbers.length);
        // Simulate the amount of data read from the DB
        ResourceCount.readCount = numbers[randomIndex];
        return RepeatStatus.FINISHED;
    }
}
```
  * Step2
```java=
public class StopStep2 implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("======== StopStep2 ========");
        System.out.println("readCount : " + ResourceCount.readCount);
        System.out.println("totalCount : " +
                ResourceCount.totalCount);
        return RepeatStatus.FINISHED;
    }
}
```
* **Configuration**:
```java=
// Simulate stopping the Job
@Bean("StopStep1")
public Step getStopStep1() {
    return new StepBuilder("StopStep1", jobRepository)
            .tasklet(new StopStep1(), jdbcTransactionManager)
            .listener(new StopStepListener())
            .allowStartIfComplete(true) // let this Step run again on restart, even after COMPLETED
            .build();
}

@Bean("StopJobTest")
public Job getStopJobTest() {
    return new JobBuilder("StopJobTest", jobRepository)
            .start(getStopStep1())
            // On STOPPED the Job stops immediately; a restart begins again at Step1
            .on("STOPPED").stopAndRestart(getStopStep1())
            // Any other exit status of Step1 continues with Step2
            .from(getStopStep1()).on("*").to(new StepBuilder("StopStep2", jobRepository)
                    .tasklet(new StopStep2(), jdbcTransactionManager)
                    .build())
            .end()
            .build();
}
```
#### 5.2.1.2 Approach 2: StepExecution Stop Flag
* **Code**:
  * **Step1, which sets the StepExecution stop flag**:
```java=
public class StopStep1StopExecution implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("======== StopStep1StopExecution ========");
        int[] numbers = {100, 50};
        Random random = new Random();
        int randomIndex = random.nextInt(numbers.length);
        // 1. Simulate the amount of data read from the DB
        ResourceCount.readCount = numbers[randomIndex];
        // 2. Set the stop flag when the condition is not met
        if (ResourceCount.readCount != ResourceCount.totalCount) {
            // Signal the framework to stop the Job after this Step
            chunkContext.getStepContext().getStepExecution().setTerminateOnly();
        }
        return RepeatStatus.FINISHED;
    }
}
```
* **Step2**
```java=
public class StopStep2 implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("======== StopStep2 ========");
        System.out.println("readCount : " + ResourceCount.readCount);
        System.out.println("totalCount : " + ResourceCount.totalCount);
        return RepeatStatus.FINISHED;
    }
}
```
* **Configuration**:
```java=
@Bean("StopStep1_1")
public Step getStopStep1_1() {
    return new StepBuilder("StopStep1_1", jobRepository)
            .tasklet(new StopStep1StopExecution(), jdbcTransactionManager)
            .allowStartIfComplete(true) // let this Step run again on restart, even after COMPLETED
            .build();
}

@Bean("StopJobTest1")
public Job getStopJobTest1() {
    return new JobBuilder("StopJobTest1", jobRepository)
            .start(getStopStep1_1())
            .next(new StepBuilder("StopStep3", jobRepository)
                    .tasklet(new StopStep2(), jdbcTransactionManager)
                    .build())
            .build();
}
```
## 5.3 Restarting a Job
* By **default**, the following applies to the same JobInstance (`same Job name + same Job parameters`):
  * An execution that ended **FAILED** or **STOPPED** can be restarted **any number of times**.
  * An execution that ended **COMPLETED** **cannot be restarted**.

### 5.3.1 Preventing Restarts
* Intended for **one-shot jobs**: if the run fails, it must not be run again.
* **Method**: `.preventRestart()`
* **Code**:
```java=
@Bean("StopJobTest1")
public Job getStopJobTest1() {
    return new JobBuilder("StopJobTest1", jobRepository)
            // Forbid restarting this Job
            .preventRestart()
            .start(getStopStep1_1())
            .next(new StepBuilder("StopStep3", jobRepository)
                    .tasklet(new StopStep2(), jdbcTransactionManager)
                    .build())
            .build();
}
```
### 5.3.2 Limiting the Number of Restarts
* Intended for jobs that may only be **restarted a limited number of times**
  * e.g. download or read operations
* **Code**: set on the Step to limit how many times the Step can be started
```java=
@Bean("StopStep1_1")
public Step getStopStep1_1() {
    return new StepBuilder("StopStep1_1", jobRepository)
            // At most 2 starts in total: the first run + 1 restart
            .startLimit(2)
            .tasklet(new StopStep1StopExecution(), jdbcTransactionManager)
            .allowStartIfComplete(true) // let this Step run again on restart, even after COMPLETED
            .build();
}
```
### 5.3.3 Unlimited Restarts
* By default, the same **Job name + Job parameters** can `complete successfully only once`.
* **Adjust the Step configuration** so that the Step is executed on every restart:
  * **allowStartIfComplete(true)**: the Step runs again whenever its Job is run again, even if the Step already COMPLETED in an earlier execution. Without this setting, a COMPLETED Step is skipped on restart.
  * Whether the JobInstance itself may be run again is still governed by the rules in 5.3.
* **Code**:
```java=
@Bean("StopStep1_1")
public Step getStopStep1_1() {
    return new StepBuilder("StopStep1_1", jobRepository)
            // .startLimit(2) // would cap the number of starts; left out here
            // Let the Step run again on every restart, even after COMPLETED
            .allowStartIfComplete(true)
            .tasklet(new StopStep1StopExecution(), jdbcTransactionManager)
            .build();
}
```
# 6. ItemReader
* Used in chunk-oriented Steps
* Source:
```java=
@FunctionalInterface
public interface ItemReader<T> {

    @Nullable
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
```
* T: the type of item the reader returns
* Purpose: reads the data. `read()` returns `null` once the input is exhausted.

## 6.1 Reading Plain Text
* Plain text with a simple line-based or multi-line structure, e.g. user.txt:
```=
1#21323#18
2#asasa41#16
3#213adas23#20
4#90hak23#19
5#sda21323#15
```
* **FlatFileItemReader** is used by default.
* **Methods**:
  * **resource**: the file to read
  * Parsing each line:
    * **delimited**: split the line into fields
      * delimiter: the separator (`default ","`)
      * names: one name for each resulting column. The names must **match the property names** of the class set with **targetType**.
    * **fieldSetMapper**: field mapping (`for complex cases`)
      * Argument: an implementation of the **FieldSetMapper** interface
  * **targetType**: the class the data is bound to

### 6.1.1 Splitting Lines: the delimited Method
* **Requirement**: **read user.txt**
* Object:
```java=
@Data
public class User {
    private Long id;
    private String name;
    private Integer age;
}
```
* Data:
```=
1#21323#18
2#asasa41#16
3#213adas23#20
4#90hak23#19
5#sd#11
```
* **Code**:
  * **ItemWriter**
```java=
public class UserItemWriter implements ItemWriter<User> {

    @Override
    public void write(Chunk<? extends User> chunk) throws Exception {
        List<? extends User> items = chunk.getItems();
        items.forEach(System.out::println);
    }
}
```
  * Use the **delimited** method of **FlatFileItemReader** to split each line on "#":
```java=
@Bean("getUserByTxt")
public FlatFileItemReader<User> getUserByTxt() {
    return new FlatFileItemReaderBuilder<User>()
            .name("user-item-reader")
            // The input file
            .resource(new ClassPathResource("user.txt"))
            // Parse each line, splitting on "#" (the default delimiter is ",")
            .delimited().delimiter("#")
            // One name per column; the names must match the User property names
            .names("id", "name", "age")
            // Bind each line to a User object
            .targetType(User.class)
            .build();
}
```
  * **Step + Job configuration**:
```java=
@Bean("getUserTxtFileStep")
public Step getUserTxtFileStep() {
    return new StepBuilder("getUserTxtFile", jobRepository)
            .<User, User>chunk(1, jdbcTransactionManager) // one item per chunk
            .reader(getUserByTxt())
            .writer(new UserItemWriter())
            .build();
}

@Bean("getUserTxtFileJob")
public Job getUserTxtFileJob() {
    return new JobBuilder("getUserTxtFileJob", jobRepository)
            .start(getUserTxtFileStep())
            .build();
}
```
### 6.1.2 Field Mapping: the FieldSetMapper Interface
* Data user2.txt:
```=
1#21323#18#KKK#KKK
2#asasa41#16#JJJ#JJJ
3#213adas23#20#BBB#BBB
4#90hak23#19#III#QWw
5#sd#12#AAA#AAA
```
* Object:
```java=
@Data
public class User {
    private Long id;
    private String name;
    private Integer age;
    private String address;
}
```
* Code:
  * Use the **fieldSetMapper** method of **FlatFileItemReader** to map the data:
```java=
@Bean("getUserByTxt")
public FlatFileItemReader<User> getUserByTxt() {
    return new FlatFileItemReaderBuilder<User>()
            .name("user-item-reader")
            // The input file
            .resource(new ClassPathResource("user2.txt"))
            // Split on "#" (the default delimiter is ",")
            .delimited().delimiter("#")
            // Column names; with a custom mapper they only need to match what the mapper reads
            .names("id", "name", "age", "city", "area")
            // Automatic binding (commented out: it cannot combine city and area into address)
            // .targetType(User.class)
            // Manual binding => custom mapping logic
            .fieldSetMapper(new UserFieldSetMapper())
            .build();
}
```
  * **delimited()**: splits each line on "#"
  * **fieldSetMapper()** replaces targetType: the data is bound manually
  * **FieldSetMapper implementation**:
    * **Field names produced by the split**: `"id", "name", "age", "city", "area"`
```java=
public class UserFieldSetMapper implements FieldSetMapper<User> {

    @Override
    public User mapFieldSet(FieldSet fieldSet) throws BindException {
        // Columns: "id", "name", "age", "city", "area"
        User user = new User();
        user.setId(fieldSet.readLong("id"));
        user.setName(fieldSet.readString("name"));
        user.setAge(fieldSet.readInt("age"));
        // Combine two columns into one property
        user.setAddress(fieldSet.readString("city") + "_" + fieldSet.readString("area"));
        return user;
    }
}
```
  * **Step + Job configuration**:
```java=
@Bean("getUserTxtFileStep")
public Step getUserTxtFileStep() {
    return new StepBuilder("getUserTxtFile", jobRepository)
            .<User, User>chunk(1, jdbcTransactionManager) // one item per chunk
            .reader(getUserByTxt())
            .writer(new UserItemWriter())
            .build();
}

@Bean("getUserTxtFileJob")
public Job getUserTxtFileJob() {
    return new JobBuilder("getUserTxtFileJob", jobRepository)
            .start(getUserTxtFileStep())
            .build();
}
```
## 6.2 Reading a JSON File
* Use **JsonItemReader**
  * **.jsonObjectReader()**: **parses and binds the data**
* **Requirement**: **read the following JSON file**
```json=
[
  { "id":1,"name":"a1","age":14},
  { "id":2,"name":"a2","age":15},
  { "id":3,"name":"a3","age":46}
]
```
* Object:
```java=
@Data
public class User {
    private Long id;
    private String name;
    private Integer age;
}
```
* **Code**:
  * **Configure JsonItemReader**
```java=
@Bean("UserItemReaderJson")
public JsonItemReader<User> getUserItemReaderJson() {
    JacksonJsonObjectReader<User> jsonObjectReader = new JacksonJsonObjectReader<>(User.class);
    jsonObjectReader.setMapper(new ObjectMapper());
    return new JsonItemReaderBuilder<User>()
            .name("UserItemReaderJson")
            // The input file
            .resource(new ClassPathResource("user.json"))
            // Parse and bind each JSON object
            .jsonObjectReader(jsonObjectReader)
            .build();
}
```
  * **JSON parsers**:
    * **ObjectMapper** (Jackson) => **JacksonJsonObjectReader**
    * **Gson** => **GsonJsonObjectReader**
  * **Step + Job configuration**:
```java=
@Bean("getUserTxtFileStep")
public Step getUserTxtFileStep() {
    return new StepBuilder("getUserTxtFile", jobRepository)
            .<User, User>chunk(1, jdbcTransactionManager) // one item per chunk
            .reader(getUserItemReaderJson())
            .writer(new UserItemWriter())
            .build();
}

@Bean("getUserTxtFileJob")
public Job getUserTxtFileJob() {
    return
            new JobBuilder("getUserTxtFileJob", jobRepository)
            .start(getUserTxtFileStep())
            .build();
}
```
## 6.3 Reading from a DB
* There are two ways to read data from a DB:
  * **Cursor-based**: runs a single query, then reads the matching rows one at a time through an open cursor
    * **JdbcCursorItemReader**
      * .sql(): the SQL query
      * .dataSource(): the DB connection
      * .preparedStatementSetter(): sets the SQL parameters
      * .rowMapper(): maps each row to an object
  * **Paging**: reads page by page, using the given pageSize
    * **JdbcPagingItemReader**

### 6.3.1 Reading from a DB: Cursor-Based
* Reads the data one row at a time.
  * A **cursor** works like a pointer into the result set.
  * **As the cursor moves**, each row is read. With plain **JDBC** the row sits `in a ResultSet`, and Spring Batch uses a **RowMapper** to map it to an object.
    * table row -> object
* **Code**:
  * Implement **RowMapper** (`maps each row`):
```java=
public class UserRowMapper implements RowMapper<User> {

    @Override
    public User mapRow(ResultSet rs, int rowNum) throws SQLException {
        User user = new User();
        user.setId(rs.getLong("id"));
        user.setName(rs.getString("name"));
        user.setAge(rs.getInt("age"));
        return user;
    }
}
```
  * Configure **JdbcCursorItemReader**:
```java=
@Bean
public JdbcCursorItemReader<User> getJdbcCursorItemReader() {
    return new JdbcCursorItemReaderBuilder<User>()
            .name("UserJdbcCursorItemReader")
            // DB connection
            .dataSource(dataSource)
            // The query; rows are returned through a cursor, one at a time
            .sql("Select * from \"user\" where age > ?")
            // Bind the parameter => condition: age greater than 16
            .preparedStatementSetter(new ArgumentPreparedStatementSetter(new Object[]{16}))
            // Map each row to a User
            .rowMapper(new UserRowMapper())
            .build();
}
```
  * **Step + Job configuration**:
```java=
@Bean("getUserTxtFileStep")
public Step getUserTxtFileStep() {
    return new StepBuilder("getUserTxtFile", jobRepository)
            .<User, User>chunk(1, jdbcTransactionManager) // one item per chunk
            .reader(getJdbcCursorItemReader())
            .writer(new UserItemWriter())
            .build();
}

@Bean("getUserTxtFileJob")
public Job getUserTxtFileJob() {
    return new JobBuilder("getUserTxtFileJob", jobRepository)
            .start(getUserTxtFileStep())
            .build();
}
```
### 6.3.2 Reading from a DB: Paging
* **PagingQueryProvider**: **holds the paging logic**
* **Code**:
  * Configure **PagingQueryProvider**:
```java=
@Bean
public PagingQueryProvider getPagingQueryProvider() throws Exception {
    SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
    sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
    // Columns to select
    sqlPagingQueryProviderFactoryBean.setSelectClause("Select * ");
    // Table
    sqlPagingQueryProviderFactoryBean.setFromClause("from \"user\" ");
    // Condition; :age is a named placeholder
    sqlPagingQueryProviderFactoryBean.setWhereClause("where age > :age");
    // Sort key; paging needs a unique sort key
    sqlPagingQueryProviderFactoryBean.setSortKey("id");
    return sqlPagingQueryProviderFactoryBean.getObject();
}
```
  * Configure **JdbcPagingItemReader**:
```java=
@Bean
public JdbcPagingItemReader<User> getJdbcPagingItemReader() throws Exception {
    return new JdbcPagingItemReaderBuilder<User>()
            .name("UserJdbcPagingItemReader")
            // 1. DB connection
            .dataSource(dataSource)
            // 2. Map each row to a User
            .rowMapper(new UserRowMapper())
            // 3. The SQL
            // 3.1 Paging logic
            .queryProvider(getPagingQueryProvider())
            // 3.2 Value for the :age placeholder
            .parameterValues(Map.of("age", 16))
            // 3.3 Read 10 rows per page
            .pageSize(10)
            .build();
}
```
  * **Step + Job configuration**:
```java=
@Bean("getUserTxtFileStep")
public Step getUserTxtFileStep() throws Exception {
    return new StepBuilder("getUserTxtFile", jobRepository)
            .<User, User>chunk(1, jdbcTransactionManager) // one item per chunk
            .reader(getJdbcPagingItemReader())
            .writer(new UserItemWriter())
            .build();
}

@Bean("getUserTxtFileJob")
public Job getUserTxtFileJob() throws Exception {
    return new JobBuilder("getUserTxtFileJob", jobRepository)
            .start(getUserTxtFileStep())
            .build();
}
```
## 6.4 Read Errors
* **Three ways to handle them**:
  * Skip the exception
  * **Log the exception** (`recommended`)
  * Give up processing

### 6.4.1 Skipping Exceptions
* The ItemReader can **skip specified exceptions** according to rules, and the number of skips can be limited.
* The Step's **faultTolerant()** method **starts the fault-tolerance configuration**.
* **Code**:
```java=
@Bean
public Step handlerStep() throws Exception {
    return new StepBuilder("handlerStep", jobRepository)
            .<User, User>chunk(1, jdbcTransactionManager) // one item per chunk
            .reader(getJdbcPagingItemReader())
            .writer(new UserItemWriter())
            .faultTolerant() // enable fault tolerance
            .skip(Exception.class) // skip Exception ...
            .noSkip(RuntimeException.class) // ... but not RuntimeException
            .skipLimit(10) // maximum number of skips
            .skipPolicy(new SkipPolicy() {
                @Override
                public boolean shouldSkip(Throwable t, long skipCount) throws SkipLimitExceededException {
                    // Custom rule based on the exception type and the skip count
                    // (this placeholder never skips)
                    return false;
                }
            })
            .build();
}
```
### 6.4.2 Logging Exceptions
* When the **ItemReader** throws an exception while reading, **record the details** so that someone can step in later.
* **Approach**: use an **ItemReader listener**.
  * Implement the **ItemReadListener\<T>** interface (`which extends StepListener`):
```java=
public class ErrorItemReaderListener implements ItemReadListener<User> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ErrorItemReaderListener.class);

    @Override
    public void beforeRead() {
        // Called before each read; nothing to do here
    }

    @Override
    public void afterRead(User item) {
        // Called after each successful read; nothing to do here
    }

    @Override
    public void onReadError(Exception ex) {
        // Record the failure so that it can be investigated and fixed manually
        LOGGER.error("Failed to read item", ex);
    }
}
```
* **Configuration**: **register the ItemReadListener on the Step**, e.g. `.listener(new ErrorItemReaderListener())` (`a Step can take several listeners that extend StepListener`).

# 7. ItemProcessor
* After the **ItemReader** has read the data, there are **two options**:
  * **Pass the data straight on**
  * **Transform the data that was read**
    * This is done by an **ItemProcessor**. Spring Batch ships **ready-made ItemProcessor implementations**, and you can also write a **custom ItemProcessor**.

## 7.1 Built-in ItemProcessors
* These notes cover **four** of the **ItemProcessor implementations** that **Spring Batch** provides:
  * **ValidatingItemProcessor** (`validating processor`): validates the data and filters out items that fail
  * **ItemProcessorAdapter** (`adapter processor`): data transformation
  * **ScriptItemProcessor** (`script processor`)
  * **CompositeItemProcessor** (`composite processor`)
* **Setup code**:
  * **ItemReader**:
```java=
@Bean("getUserByTxt")
public FlatFileItemReader<User> getUserByTxt() {
    return new FlatFileItemReaderBuilder<User>()
            .name("user-item-reader")
            // The input file
            .resource(new ClassPathResource("user-vail.txt"))
            // Split on "#" (the default delimiter is ",")
            .delimited().delimiter("#")
            // One name per column; the names must match the User property names
            .names("id", "name", "age")
            // Bind each line to a User object automatically
            .targetType(User.class)
            .build();
}
```
  * **Step + Job configuration**:
```java=
@Bean("getUserTxtFileStep")
public Step getUserTxtFileStep() {
    return new StepBuilder("getUserTxtFile", jobRepository)
            .<User, User>chunk(1, jdbcTransactionManager) // one item per chunk
            .reader(getUserByTxt())
            .processor( ????? // replace ????? with one of the ItemProcessors from section 7.1
            )
            .writer(new UserItemWriter())
            .build();
}

@Bean("getUserTxtFileJob")
public Job getUserTxtFileJob() throws Exception {
    return new JobBuilder("getUserTxtFileJob", jobRepository)
            .start(getUserTxtFileStep())
            .build();
}
```
### 7.1.1 ValidatingItemProcessor: Validating Processor
* **Setup**
  * **Test data**:
```txt=
1##18
2##16
3#a1#20
4#akjfl#19
5#a387s4d#15
```
* **Object**:
```java=
@Data
public class User {
    private Long id;
    @NotBlank(message = "Name is Blank")
    private String name;
    private Integer age;
}
```
* **Add the validation dependency**: **spring-boot-starter-validation**
```xml=
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-validation</artifactId>
</dependency>
```
* **Code**:
  * Configure the validator with **BeanValidatingItemProcessor** (`a subclass of ValidatingItemProcessor`):
```java=
@Bean("UserBeanValidatingItemProcessor")
public ValidatingItemProcessor<User> getUserValidatingItemProcessor() {
    ValidatingItemProcessor<User> objectBeanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
    // true: items that fail validation are dropped (filtered) instead of raising an exception
    objectBeanValidatingItemProcessor.setFilter(true);
    return objectBeanValidatingItemProcessor;
}
```
### 7.1.2 ItemProcessorAdapter: Adapter Processor
* **Setup**
  * **Test data**:
```txt=
1#adsda#18
2#dsad#16
3#a1#20
4#akjfl#19
5#a387s4d#15
```
* **Object**:
```java=
@Data
public class User {
    private Long id;
    private String name;
    private Integer age;
}
```
* **Code**: **convert the name field to upper case**
  * A use case that **upper-cases the name field**:
```java=
public class UserNameToUpperCaseUseCase {
    public User execute(User user) {
        user.setName(user.getName().toUpperCase());
        return user;
    }
}
```
  * **Use ItemProcessorAdapter**:
```java=
// ItemProcessor => ItemProcessorAdapter for data transformation
// Goal: upper-case the name field by reusing existing logic
@Bean("UserItemProcessorAdapter")
public ItemProcessorAdapter<User, User> getItemProcessorAdapter() {
    ItemProcessorAdapter<User, User> itemProcessorAdapter = new ItemProcessorAdapter<>();
    // The object to delegate to
    itemProcessorAdapter.setTargetObject(getUserNameToUpperCaseUseCase());
    // The name of the method to call on that object
    itemProcessorAdapter.setTargetMethod("execute");
    return
            itemProcessorAdapter;
}

@Bean("UserNameToUpperCaseUseCase")
public UserNameToUpperCaseUseCase getUserNameToUpperCaseUseCase() {
    return new UserNameToUpperCaseUseCase();
}
```
### 7.1.3 ScriptItemProcessor: Script Processor
* **Purpose**: define the **processing logic** in a **.js script**
* **Notes**:
  * **item** is a `conventional variable name` that holds each item read by the ItemReader.
  * **Location**: put the .js file **under the resources directory**.
* **Other**:
  * In the author's setup it did not work with GraalVM's JS engine (`it may work on other JVMs`). Since Java 15 the JDK no longer bundles a JavaScript engine (Nashorn), so a JavaScript script engine has to be added to the classpath.
* **Code**: **upper-case the name field with a .js script**
  * **.js script that upper-cases the name field** (**item is the convention**):
```js=
item.setName(item.getName().toUpperCase());
item;
```
  * **Use ScriptItemProcessor**:
```java=
// ItemProcessor => ScriptItemProcessor: transform the data with a .js script
@Bean("UserScriptItemProcessor")
public ScriptItemProcessor<User, User> getUserScriptItemProcessor() {
    ScriptItemProcessor<User, User> userUserScriptItemProcessor = new ScriptItemProcessor<>();
    userUserScriptItemProcessor.setScript(new ClassPathResource("user_touppercase.js"));
    return userUserScriptItemProcessor;
}
```
### 7.1.4 CompositeItemProcessor: Composite Processor
* **Purpose**: **chains several ItemProcessors**
  * Similar to a **filter chain**
  * `Each ItemProcessor hands its result to the next one.`

![CompositeItemProcessor.drawio](https://hackmd.io/_uploads/Hky_UFZAA.png)
* **Setup**
  * **Test data**:
```txt=
1##18
2##16
3#a1#20
4#akjfl#19
5#a387s4d#15
```
* **Object**:
```java=
@Data
public class User {
    private Long id;
    @NotBlank(message = "Name is Blank")
    private String name;
    private Integer age;
}
```
* **Code**: **filter out items with a blank name, then upper-case the name**
  * **Filter out blank names** (`the User property needs the validation annotation`):
```java=
@Bean("UserBeanValidatingItemProcessor")
public ValidatingItemProcessor<User> getUserValidatingItemProcessor() {
    ValidatingItemProcessor<User> objectBeanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
    // true: items that fail validation are dropped (filtered) instead of raising an exception
    objectBeanValidatingItemProcessor.setFilter(true);
    return objectBeanValidatingItemProcessor;
}
```
  * **Upper-case the name**:
```java=
@Bean("UserNameToUpperCaseUseCase")
public UserNameToUpperCaseUseCase getUserNameToUpperCaseUseCase() {
    return new UserNameToUpperCaseUseCase();
}

@Bean("UserItemProcessorAdapter")
public
ItemProcessorAdapter<User, User> getItemProcessorAdapter() {
    ItemProcessorAdapter<User, User> itemProcessorAdapter = new ItemProcessorAdapter<>();
    // The object to delegate to
    itemProcessorAdapter.setTargetObject(getUserNameToUpperCaseUseCase());
    // The name of the method to call on that object
    itemProcessorAdapter.setTargetMethod("execute");
    return itemProcessorAdapter;
}
```
  * **The target object**:
```java=
public class UserNameToUpperCaseUseCase {
    public User execute(User user) {
        user.setName(user.getName().toUpperCase());
        return user;
    }
}
```
  * Use **CompositeItemProcessor** `to combine the two ItemProcessors above`:
```java=
// ItemProcessor => CompositeItemProcessor combining UserBeanValidatingItemProcessor and UserItemProcessorAdapter
@Bean("UserCompositeItemProcessor")
public CompositeItemProcessor<User, User> getCompositeItemProcessor() {
    return new CompositeItemProcessorBuilder<User, User>()
            .delegates(List.of(getUserValidatingItemProcessor(), getItemProcessorAdapter()))
            .build();
}
```
## 7.2 Custom ItemProcessor (Common)
* **How**: implement the **ItemProcessor\<I,O>** interface
* **Example**: **keep only items with an even id**
* **Setup**
  * **Test data**:
```txt=
1##18
2##16
3#a1#20
4#akjfl#19
5#a387s4d#15
```
* **Object**:
```java=
@Data
public class User {
    private Long id;
    private String name;
    private Integer age;
}
```
* **Code**:
  * Implement the **ItemProcessor\<I,O> interface**:
```java=
public class UserItemProcessor implements ItemProcessor<User, User> {

    @Override
    public User process(User item) throws Exception {
        // Keep items with an even id
        if (item.getId() % 2 == 0) {
            return item;
        }
        // Returning null filters the item out: it is not passed on to the ItemWriter
        return null;
    }
}
```
  * **Configure it the same way as before**, e.g. `.processor(new UserItemProcessor())`

# 8. ItemWriter
* **Setup**
  * **Test data**: user.txt
```txt=
1#afsa#18
2#ffa#16
3#a1#20
4#akjfl#19
5#a387s4d#15
```
* **Object**:
```java=
@Data
public class User {
    private Long id;
    private String name;
    private Integer age;
}
```
## 8.1 Writing a Plain Text File
* Use **FlatFileItemWriter\<T>**
* **Code**:
  * Write a plain text file with **FlatFileItemWriter\<T>**:
```java=
@Bean("UserFlatFileItemWriter")
public FlatFileItemWriter<User> getUserFlatFileItemWriter() {
    return new FlatFileItemWriterBuilder<User>()
            .name("UserFlatFileItemWriter")
            // 1. Output location: the current project directory
            .resource(new FileSystemResource("./user-out.txt"))
            // 2. Format each item as a line
            .formatted()
            // 2.1 The line format
            .format("id: %s, name: %s, age: %s")
            // 2.2 The User property names matching each %s
            .names("id", "name", "age")
            // X. Delete the file if nothing was written
            .shouldDeleteIfEmpty(true)
            // X. Delete the file if it already exists
            .shouldDeleteIfExists(true)
            // X. If the file already exists, keep it and append to it
            //    (append and shouldDeleteIfExists contradict each other; use one of them)
            .append(true)
            .build();
}
```
* **Other options**:
  * shouldDeleteIfEmpty(true): delete the file if nothing was written
  * shouldDeleteIfExists(true): delete the file if it already exists
  * append(true): if the file already exists, keep it and append the data to it

## 8.2 Writing a JSON File
* Use **JsonFileItemWriter\<T>**
* **Example**: write the data to a **.json file**
* **Code**:
  * Use **JsonFileItemWriter\<T>**:
```java=
@Bean("UserJsonFileItemWriter")
public JsonFileItemWriter<User> getUserJsonFileItemWriter() {
    JacksonJsonObjectMarshaller<User> jsonObjectMarshaller = new JacksonJsonObjectMarshaller<>();
    return new JsonFileItemWriterBuilder<User>()
            .name("UserJsonFileItemWriter")
            // 1. Output location: the current project directory
            .resource(new FileSystemResource("./user-out.json"))
            // 2. The JSON marshaller that turns each User into JSON
            .jsonObjectMarshaller(jsonObjectMarshaller)
            .build();
}
```
## 8.3 Writing to a DB
* Use **JdbcBatchItemWriter\<T>**
* **Example**: read the .txt file and write the data to the **DB**
* **Code**:
  * Use **JdbcBatchItemWriter\<T>**:
```java=
@Bean("UserJdbcBatchItemWriter")
public JdbcBatchItemWriter<User> getUserJdbcBatchItemWriter() {
    return new JdbcBatchItemWriterBuilder<User>()
            // 1. Data source
            .dataSource(dataSource)
            // 2. The INSERT statement
            .sql("INSERT INTO public.\"user\" (id, \"name\", age) VALUES(?, ?, ?);")
            // 3. Fill in the PreparedStatement through an ItemPreparedStatementSetter<User>
            .itemPreparedStatementSetter((user, ps) -> {
                ps.setLong(1, user.getId());
                ps.setString(2, user.getName());
                ps.setInt(3, user.getAge());
            })
            .build();
}
```
1. **Set the data source**
2. Write the INSERT SQL, using the **placeholder "?"** for each parameter
3. **Set the parameters**: implement the **ItemPreparedStatementSetter\<T>** interface

## 8.4 Writing to Multiple Targets
* **Purpose**: **write the same data to** the `DB`, a `JSON file`, and a `plain text file` at the same time
* Use **CompositeItemWriter\<T>**
* **Code**: write the data to the `DB`, `user-out.txt`, and `user-out.json`
  * Use **CompositeItemWriter\<T>**:
```java=
// CompositeItemWriter combines several writers and writes to all of them
@Bean
public CompositeItemWriter<User> getUserCompositeItemWriter() {
    return new CompositeItemWriterBuilder<User>()
            // The ItemWriters to combine
            .delegates(getUserJdbcBatchItemWriter(), getUserFlatFileItemWriter(), getUserJsonFileItemWriter())
            .build();
}
```
  * **Combines the three ItemWriters above**:
    * **getUserJdbcBatchItemWriter()**
    * **getUserFlatFileItemWriter()**
    * **getUserJsonFileItemWriter()**

# 9. Advanced Spring Batch
* Multi-threaded Step
  * One file
* Parallel Steps
* Partitioned Step (`for very large data sets`)
  * Several files
  * DB data

## 9.1 Multi-Threaded Step
* Steps run on a single thread by default.
* In a multi-threaded Step, the **Step should be configured as non-restartable** (`a rule`).
* It is implemented with Spring's **TaskExecutor**: `each chunk is processed on its own thread`.
  * **With a chunk size of 1** (`one item per chunk`) and 10 items there are 10 chunks, so 10 tasks are submitted.
* Setup:
  * Data:
```txt=
1#afsa#18
2#ffa#16
3#a1#20
4#akjfl#19
5#a387s4d#15
6#afsa#18
7#ffa#16
8#a1#20
9#akjfl#19
10#a387s4d#15
```
* **Object**:
```java=
@Data
public class User {
    private Long id;
    private String name;
    private Integer age;
    private String address;
}
```
* Code: read the data and print it, using several threads
  * **Read the plain text file**:
```java=
@Bean("getUserByTxt")
public FlatFileItemReader<User> getUserByTxt() {
    return new FlatFileItemReaderBuilder<User>()
            .name("user-item-reader")
            // The input file
            .resource(new ClassPathResource("user.txt"))
            // Split on "#" (the default delimiter is ",")
            .delimited().delimiter("#")
            // One name per column; the names must match the User property names
            .names("id", "name", "age")
            // Bind each line to a User object automatically
            .targetType(User.class)
            // Do not save reader state, so that threads cannot overwrite each other's state
            .saveState(false)
            .build();
}
```
  * **.saveState(false)**: turns **state saving off**, so the Step cannot be restarted from where it stopped.
    * Most **ItemReaders** `save their state`. On restart, **that state tells the Job where it stopped**.
    * In a **multi-threaded Step**, **the Step is accessed by several threads**, so **the saved state could be overwritten by different threads**.
    * FlatFileItemReader itself is not thread-safe. Wrapping it in SynchronizedItemStreamReader is the usual way to share it safely between threads.
  * **Configure the Step + Job**:
```java=
@Bean("getUserTxtFileStep")
public Step getUserTxtFileStep() {
    return new StepBuilder("getUserTxtFile", jobRepository)
            .<User, User>chunk(1, jdbcTransactionManager) // one item per chunk
            .reader(getUserByTxt())
            // .processor(new UserItemProcessor())
            .writer(chunk -> System.out.println(chunk.getItems()))
            .taskExecutor(new SimpleAsyncTaskExecutor()) // enable multi-threading
            .build();
}

@Bean("getUserTxtFileJob")
public Job getUserTxtFileJob() throws Exception {
    return new JobBuilder("getUserTxtFileJob", jobRepository)
            .start(getUserTxtFileStep())
            .build();
}
```
  * The Step **enables multi-threading** with **.taskExecutor(new SimpleAsyncTaskExecutor())**.

## 9.2 Parallel Steps
* **Parallel Steps**: **two** or **more** different Steps **run at the same time**.
```graphviz
digraph G {
    rankdir=LR;
    node [shape=box];
    A[label = "Step 1"]
    B[label = "Step 2"]
    C[label = "Step 3"]
    D[label = "Step 4"]
    A -> B
    A -> C
    B -> D
    C -> D
}
```
* After Step1 finishes, Step2 and Step3 run at the same time. Once both are done, Step4 runs.
* **Use case**: reading `2` or `more` unrelated files (`Step2 and Step3 read the files`), so that **several files are read simultaneously**
* **How**: in the Job, combine the Steps with **Flows**
* **Setup**:
  * **Data**: user-1.txt and user-2.json
```txt=
1#afsa#18
2#ffa#16
3#a1#20
4#akjfl#19
5#a387s4d#15
```
```json=
[
  {"id":11,"name":"21323","age":22},
  {"id":12,"name":"asadasa41","age":78},
  {"id":13,"name":"21as3adas23","age":14},
  {"id":14,"name":"90hsdak23","age":129},
  {"id":15,"name":"sdaad21323","age":89}
]
```
* **Object**:
```java=
@Data
public class User {
    private Long id;
    private String name;
    private Integer age;
    private String address;
}
```
* **Code**: read user-1.txt and user-2.json. Only the parallel part (Step2 and Step3) is shown.
  * **Step2** => reads user-1.txt:
```java=
// ItemReader for the plain text file
@Bean("getUserByTxt")
public FlatFileItemReader<User> getUserByTxt() {
    return new FlatFileItemReaderBuilder<User>()
            .name("user-item-reader")
            // The input file
            .resource(new ClassPathResource("user-1.txt"))
            // Split on "#" (the default delimiter is ",")
            .delimited().delimiter("#")
            // One name per column; the names must match the User property names
            .names("id", "name", "age")
            // Bind each line to a User object automatically
            .targetType(User.class)
            .build();
}
```
  * **Step3** => reads user-2.json:
```java=
@Bean("UserItemReaderJson")
public JsonItemReader<User> getUserItemReaderJson() {
    JacksonJsonObjectReader<User> jsonObjectReader = new JacksonJsonObjectReader<>(User.class);
    jsonObjectReader.setMapper(new ObjectMapper());
    return new JsonItemReaderBuilder<User>()
            .name("UserItemReaderJson")
            // The input file
            .resource(new ClassPathResource("user-2.json"))
            // Parse and bind each JSON object
            .jsonObjectReader(jsonObjectReader)
            .build();
}
```
  * **Configure Step2 and Step3**:
```java=
@Bean("getUserTxtFileStep2")
public Step getUserTxtFileStep2() {
    return new StepBuilder("getUserTxtFileStep2", jobRepository)
            .<User, User>chunk(2, jdbcTransactionManager) // two items per chunk
            .reader(getUserByTxt())
            .writer(chunk -> System.out.println(chunk.getItems()))
            .build();
}

@Bean("getUserJsonFileStep3")
public Step getUserJsonFileStep3() {
    return new StepBuilder("getUserJsonFileStep3", jobRepository)
            .<User, User>chunk(2, jdbcTransactionManager) // two items per chunk
            .reader(getUserItemReaderJson())
            .writer(chunk -> System.out.println(chunk.getItems()))
            .build();
}
```
  * **Configure the Job**: combine the Steps with Flows
```java=
// Combine Steps into parallel flows
@Bean("getUserTxtFileJob")
public Job getUserTxtFileJob() throws Exception {
    // Parallel branch 1: read the .txt file
    Flow flow2 = new FlowBuilder<Flow>("Flow2")
            .start(getUserTxtFileStep2())
            .build();
    // Parallel branch 2: read the .json file, split off so that it runs alongside flow2
    Flow flow3 = new FlowBuilder<Flow>("Flow3")
            .start(getUserJsonFileStep3())
            .split(new SimpleAsyncTaskExecutor()) // run the branches on separate threads
            .add(flow2) // the flow to run in parallel; both start together
            .build();

    return new JobBuilder("getUserTxtFileJob", jobRepository)
            .start(flow3)
            .end()
            .build();
}
```
  * **.split()** on a **Flow** separates the two branches: the task executor starts two threads, which run Step2 and Step3 respectively.

## 9.3 Partitioned Step
* Concept: Steps are split into **two levels**.
  * **Upper level** (`Master Step`): the main Step
    * **leads and manages several worker Steps**
  * **Lower level** (`Worker Step`): the worker Steps
    * **do the actual work**
    * Each one is a complete Step responsible for `reading`, `processing`, and `writing`.
* **Partitioned Step structure**

![Partitioned Step structure](https://hackmd.io/_uploads/SkWdid8C0.png)
* **Use case**: **processing massive data with divide and conquer**
  * **Master Step**: **splits** the data into `several small data sets`, then **starts several worker Steps**.
  * **Worker Step**: processes one **small data set**.
  * **The whole job finishes only when all worker Steps have finished**.

### 9.3.1 Master Step: Partitioner
* **The core component of the master Step**
* **Purpose**:
  * **Partition the data**: **split** the full data into several data sets
  * **Assign them to worker Steps**
* **Splitting rule**: implement the **Partitioner** interface
* **Source**:
```java=
@FunctionalInterface
public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}
```
* **Parameter**:
  * **gridSize**: the **number of partitions**, i.e. how many **worker Steps** to **start**
* **Return value**: a **Map**
  * **key**: the **name of the worker Step** (partition)
  * **value**: the **metadata** for that **worker Step** (`start position, amount of data`)
* **Built-in implementation**: e.g. **MultiResourcePartitioner**

### 9.3.2 Master Step: Partition Handler
* **Purpose**: **manages** the worker Steps in one place and **assigns them their work**
* **Management rule**: implement the **PartitionHandler** interface
* **Built-in implementation**: **TaskExecutorPartitionHandler**

### 9.3.3 Example: Reading Several Files into Memory
* **Flow**:
  1. Job
  2. Master Step
  3. **Partition handler**
     * **Has the partitioner prepare each worker Step** (`the core part`), writing the following into each **Step execution context**:
       1. the worker Step's `name`
       2. the `data file to process`
       3. `how` to process the data file
  4. All **worker Steps** run
     1. reader
     2. writer
* **Setup**:
  * **Data**
    * user1-10.txt
    * user11-20.txt
    * user21-30.txt
    * user31-40.txt
    * user41-50.txt
  * **Object**
```java=
@Data
public class User {
    private Long id;
    private String name;
    private Integer age;
    private String address;
}
```
#### 9.3.3.1 Worker Step
* **Define the ItemReader and Step**
  * **Key point**: the resource is taken from the **Step execution context**.
    * That value is set by the partitioner.
  * All worker Steps **share the same ItemReader bean**.
* **Code**:
  * **ItemReader**
```java=
@Bean("getUserByTxtPartitioner")
@StepScope // lazily created: one instance per Step execution
// Worker Steps share this ItemReader, so the file must not be hard-coded; it is injected instead.
// @Value("#{stepExecutionContext['file']}") reads the file from the worker Step's execution context.
// Prerequisite: the master Step's partitioner has put 'file' into that context.
public FlatFileItemReader<User> getUserByTxtPartitioner(@Value("#{stepExecutionContext['file']}") Resource resource) {
    return new FlatFileItemReaderBuilder<User>()
            .name("itemReader-partition")
            // The file is resolved at runtime
            .resource(resource)
            // Split on "#" (the default delimiter is ",")
            .delimited().delimiter("#")
            // One name per column; the names must match the User property names
            .names("id", "name", "age")
            // Bind each line to a User object automatically
            .targetType(User.class)
            .build();
}
```
  * Reads **file** from the **Step execution context** (`@Value("#{stepExecutionContext['file']}")`)
  * **Step**
```java=
@Bean("getUserWorkStep")
public Step getUserWorkStep() {
    return new StepBuilder("getUserWorkStep", jobRepository)
            .<User, User>chunk(10, jdbcTransactionManager) // 10 items per chunk
            // null is only a placeholder: the @StepScope proxy injects the real resource at runtime
            .reader(getUserByTxtPartitioner(null))
            .writer(chunk -> chunk.getItems().forEach(System.out::println))
            .build();
}
```
#### 9.3.3.2 Partitioner
* **Goal**: put the **resource path** into each **Step execution context**
  * `@Value("#{stepExecutionContext['file']}")`
* **Code**:
  * **A custom partitioner** that implements the **Partitioner** interface:
```java=
public class UserPartitioner implements Partitioner {

    private static final int RANGE = 10; // number of records per file

    // key   = worker Step (partition) name
    // value = that worker Step's execution context
    // worker Step 1 => file: user1-10.txt
    // worker Step 2 => file: user11-20.txt
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> map = new HashMap<>();
        // Local counters, so that every call starts again at user1-10.txt
        int begin = 1;
        int end = RANGE;
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext executionContext = new ExecutionContext();
            Resource classPathResource = new ClassPathResource("user" + begin + "-" + end + ".txt");
            try {
                // Putting the Resource object itself into the ExecutionContext fails,
                // so its URL is stored as a String; Spring converts it back into a Resource
                executionContext.put("file", classPathResource.getURL().toExternalForm());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            begin += RANGE;
            end += RANGE;
            map.put("user_partition_" + i, executionContext);
        }
        return map;
    }
}
```
  * The **value** stored in the **ExecutionContext** cannot be the Resource object here, so a **String path** is used instead (**Spring converts it into a Resource automatically**).
  * **Define the bean**:
```java=
@Bean
public UserPartitioner getUserPartitioner() {
    return new UserPartitioner();
}
```
#### 9.3.3.3 Partition Handler
* Use **TaskExecutorPartitionHandler**:
```java=
@Bean
public PartitionHandler getPartitionHandler() {
    TaskExecutorPartitionHandler taskExecutorPartitionHandler = new TaskExecutorPartitionHandler();
    // 1. Grid size: how many worker Steps to create
    taskExecutorPartitionHandler.setGridSize(5);
    // 2. Run each worker Step on its own thread
    //    (a SyncTaskExecutor would run them one after another on the calling thread)
    taskExecutorPartitionHandler.setTaskExecutor(new SimpleAsyncTaskExecutor());
    // 3. The worker Step
    taskExecutorPartitionHandler.setStep(getUserWorkStep());
    // 4. Check that the Step is not null (optional)
    try {
        taskExecutorPartitionHandler.afterPropertiesSet();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    return taskExecutorPartitionHandler;
}
```
#### 9.3.3.4 Master Step
* **Combines the partition handler and the partitioner**
* **Code**:
  * **Configure the Step + Job**:
```java=
// Combine the partition handler and the partitioner
@Bean("getPartitionStep")
public Step getPartitionStep() {
    return new StepBuilder("masterGetPartitionStep", jobRepository)
            .partitioner(getUserWorkStep().getName(), getUserPartitioner()) // partitioner
            .partitionHandler(getPartitionHandler()) // partition handler
            .build();
}

@Bean("getPartitionJob")
public Job getPartitionJob() {
    return new JobBuilder("getPartitionJob", jobRepository)
            .start(getPartitionStep())
            .build();
}
```
###### tags:`SpringBatch`
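The range arithmetic in `UserPartitioner` (section 9.3.3.2) is easy to get off by one. The following is a minimal, Spring-free sketch that reproduces only the file-naming logic so it can be checked in isolation. It is an illustration under assumptions, not part of the original setup:

* `PartitionPlan` and `plan` are hypothetical names.
* A plain `Map<String, String>` stands in for `Map<String, ExecutionContext>`.
* The value is the bare file name rather than the resource URL that the real partitioner stores.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, Spring-free sketch of the UserPartitioner range logic.
// Map<String, String> stands in for Map<String, ExecutionContext>;
// the value is the file name the real partitioner would turn into a URL.
class PartitionPlan {

    // Builds one entry per worker Step: "user_partition_i" -> "user<begin>-<end>.txt"
    static Map<String, String> plan(int gridSize, int range) {
        Map<String, String> partitions = new LinkedHashMap<>(); // keeps partition order for readability
        int begin = 1;      // first record number covered by the current file
        int end = range;    // last record number covered by the current file
        for (int i = 0; i < gridSize; i++) {
            partitions.put("user_partition_" + i, "user" + begin + "-" + end + ".txt");
            begin += range; // move to the next block of records
            end += range;
        }
        return partitions;
    }
}
```

For `gridSize = 5` and a range of 10, this yields `user_partition_0 -> user1-10.txt` through `user_partition_4 -> user41-50.txt`, which matches the five data files listed in 9.3.3. Because `begin` and `end` are local, repeated calls return the same mapping. If they were instance fields, a second call would continue at `user51-60.txt`.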