Spring Batch ### 簡介 在企業領域中,許多應用程式需要進行大量處理,以在關鍵任務環境中執行業務操作。這些業務操作包括: 1. 自動化、複雜的大量資訊處理,通常在無需使用者介入的情況下進行最有效率的處理。這些操作通常包括基於時間的事件(例如月結計算、通知或函件等)。 2. 定期應用於非常大的數據集的複雜業務規則,這些業務規則需要重複處理。例如,這可能包括保險福利確定或利率調整等任務。 3. 整合來自內部和外部系統的資訊,通常需要格式化、驗證並以交易方式處理到記錄系統中。批次處理被用於每天為企業處理數十億筆交易。 4. Spring Batch在保留了人們對Spring Framework的期望特性(高效率、基於POJO的開發方法和簡便易用)的基礎上,使開發人員可以在必要時輕鬆訪問和使用更高級的企業服務。Spring Batch並不是一個排程框架。在商業和開源領域都有許多優秀的企業排程器(例如Quartz、Tivoli、Control-M等)可供使用。Spring Batch旨在與排程器配合使用,而不是取代排程器。 5. Spring Batch提供了可重複使用的功能,這在處理大量記錄時非常重要,包括日誌追蹤、交易管理、作業處理統計、作業重啟、略過以及資源管理等。它還提供了更高級的技術服務和功能,通過優化和分區技術實現了極高容量和高性能的批次作業。我們可以在簡單的用例中使用Spring Batch(例如將文件讀取到數據庫中或運行存儲過程),也可以在複雜、高容量的情況下使用(例如在不同數據庫之間移動大量數據,進行轉換等等)。高容量的批次作業可以以高度可擴展的方式使用這個框架來處理大量信息。 ### 使用情境: 一般情況下,一個批次程式通常會執行以下操作: 1. 從資料庫、檔案或佇列中讀取大量記錄。 2. 以某種方式處理數據。 3. 將處理後的數據以修改後的形式寫回。 Spring Batch 自動化了這種基本的批次迭代,提供了處理類似交易集的能力,通常在離線環境中進行,無需任何使用者交互使用。 業務情境: Spring Batch 支援以下業務情境: 1. 定期提交批次處理作業。 2. 同時進行批次處理:對作業進行並行處理。 3. 分階段的企業消息驅動處理。 4. 大規模並行批次處理。 5. 失敗後手動或定期重新啟動。 6. 依賴步驟的順序處理(帶有工作流擴展)。 7. 部分處理:跳過記錄(例如在回滾時)。 8. 整批交易,用於批次大小較小或已存在存儲過程或腳本的情況。 ### Spring Batch 架構 Spring Batch 設計時考慮了可擴展性以及各種不同的最終用戶。以下圖像顯示了支持最終用戶開發者的可擴展性和易用性的分層架構。 ![](https://hackmd.io/_uploads/SyUubi6lp.png) 這種分層架構突顯了三個主要的高層級組件:應用程序、核心和基礎設施。Application包含了所有由使用Spring Batch的開發人員編寫的批處理作業和自定義代碼。Batch core包含了啟動和控制批處理作業所需的核心運行時類。它包括了JobLauncher、Job和Step的實現。Application和core都建立在一個共同的基礎設施之上。這個基礎設施包含了通用的readers和writers,以及services(如RetryTemplate),這些服務既被應用程序開發人員(如ItemReader和ItemWriter等讀取器和寫入器)使用,也被核心框架本身(retry,它是自己的Library)使用。 ### 一般批次處理原則與指南 在建立批次解決方案時,應考慮以下關鍵原則、指南和一般考慮事項。 請記住,批次架構通常會影響on-line架構,反之亦然。在可能的情況下,使用共同的基礎設施元件來考慮這兩種架構和環境。 盡量簡化,避免在單個批次應用程序中構建複雜的邏輯結構。 保持數據的處理和存儲在物理上緊密相連(換句話說,將數據保留在處理發生的地方)。 最小化系統資源使用,尤其是I/O。盡可能在內部內存中執行盡可能多的操作。 審查應用程序的I/O(分析SQL語句),以確保避免不必要的物理I/O。特別是需要查找以下四個常見缺陷: 1. 在每筆交易中讀取數據,而數據可以在第一次讀取後被緩存或保留在工作存儲中。 2. 對於在同一事務中早些時候讀取的數據再次進行讀取。 3. 導致不必要的表格或索引掃描。 4. 在SQL語句的WHERE子句中未指定關鍵值。 在批次運行中不要做相同的事情兩次。例如,如果我們需要為報告目的進行數據總結,我們應該(如果可能)在初始處理數據時增加存儲的總數,以便您的報告應用程序不必重新處理相同的數據。 在批次應用程序的開始時分配足夠的內存,以避免在過程中進行耗時的重新分配。 始終假設關於數據完整性的最壞情況。插入足夠的檢查和記錄驗證來維護數據的完整性。 在可能的情況下實施內部驗證的校驗和校驗和核對。例如,平面文件應該具有包含文件中的記錄總數和關鍵字段的聚合的尾部記錄。 在類似生產環境中,盡早計劃並執行壓力測試,並使用現實數據量。 在大型批次系統中,備份可能具有挑戰性,特別是如果系統在24小時7天不間斷地與在線應用程序同時運行。通常在線設計中會妥善處理數據庫備份,但應該同樣重視文件備份。如果系統依賴於平面文件,則應該建立並記錄文件備份程序,並定期進行測試。 批次處理策略 為了幫助設計和實施批次系統,應以樣本結構圖和代碼模板的形式提供基本批次應用程序構建塊和模式給設計師和撰寫人員。在開始設計批次作業時,應將業務邏輯分解為一系列步驟,這些步驟可以使用以下標準構建塊來實現: 1. 轉換應用程序:為來自外部系統提供或生成的每種類型的文件,必須創建一個轉換應用程序,將提供的交易記錄轉換為處理所需的標準格式。這種類型的批次應用程序可以部分或完全由翻譯實用程序模塊組成(參見基本批次服務)。 2. 驗證應用程序:驗證應用程序確保所有輸入和輸出記錄都是正確和一致的。驗證通常基於文件標頭和尾部、校驗和驗證算法以及記錄級的交叉檢查。 3. 提取應用程序:提取應用程序從數據庫或輸入文件中讀取一組記錄,根據預定義的規則選擇記錄,並將記錄寫入輸出文件。 4. 提取/更新應用程序:提取/更新應用程序從數據庫或輸入文件中讀取記錄,並根據每個輸入記錄中找到的數據對數據庫或輸出文件進行更改。 5. 處理和更新應用程序:處理和更新應用程序對來自提取或驗證應用程序的輸入交易進行處理。處理通常包括從數據庫中讀取以獲得處理所需的數據,可能更新數據庫並為輸出處理創建記錄。 6. 輸出/格式應用程序:輸出/格式應用程序讀取輸入文件,根據標準格式重組此記錄中的數據,並生成用於列印或傳輸到另一個程序或系統的輸出文件。 此外,還應提供一個基本的應用程式框架,用於無法使用前述構建塊來構建的業務邏輯。 除了主要的構建塊之外,每個應用程式還可以使用一個或多個標準的實用程序步驟,例如: 1. 排序:讀取輸入文件並生成輸出文件,其中記錄根據記錄中的排序鍵字段重新排序。排序通常由標準系統實用程序執行。 2. 分割:讀取單個輸入文件並根據字段值將每個記錄寫入多個輸出文件中。分割可以通過定制或通過參數驅動的標準系統實用程序執行。 3. 合併:從多個輸入文件中讀取記錄並生成一個包含來自輸入文件的組合數據的輸出文件。合併可以通過定制或通過參數驅動的標準系統實用程序執行。 批次應用程序還可以根據其輸入來源進行分類: 1. 基於數據庫的應用程序由從數據庫檢索的行或值驅動。 2. 基於文件的應用程序由從文件中檢索的記錄或值驅動。 3. 基於消息的應用程序由從消息隊列檢索的消息驅動。 任何批次系統的基礎都是處理策略。影響策略選擇的因素包括:預估的批次系統容量、與在線系統或其他批次系統的同時性、可用的批次。 批次的典型處理選項(按實施複雜度遞增順序)包括: 1. 在離線模式下在批次視窗內進行正常處理。 2. 同時進行批次處理或在線處理。 3. 同時處理許多不同的批次運行或作業。 4. 分割(同時處理相同作業的許多實例)。 5. 前述選項的組合。 以下是這些選項可能由商業排程器支援: 本節的其餘部分將更詳細地討論這些處理選項。請注意,一般來說,批次處理所採用的提交和鎖定策略取決於所執行的處理類型,而在線鎖定策略也應該使用相同的原則。因此,在設計整體架構時,批次架構不能僅僅是一個事後的想法。 鎖定策略可以是僅使用常規數據庫鎖定,或在架構中實施額外的自定義鎖定服務。鎖定服務將跟蹤數據庫鎖定(例如,通過在專用數據庫表中存儲必要的信息)並給予或拒絕對請求數據庫操作的應用程序的權限。在鎖定情況下,也可以通過此架構實施重試邏輯,以避免在發生鎖定情況時中止批次作業。 1. 在批次視窗中進行正常處理 對於在單獨的批次視窗中運行的簡單批次過程,在線用戶或其他批次過程不需要的數據更新,並不需要併發,可以在批次運行結束時進行單次提交。 在大多數情況下,一種更健壯的方法更為適用。請記住,隨著時間的推移,無論在複雜性還是處理的數據量方面,批次系統都有增長的趨勢。如果沒有設置鎖定策略,系統仍依賴於單個提交點,修改批次程序可能會很痛苦。因此,即使在最簡單的批次系統中,也要考慮需要提交邏輯以進行重啟恢復選項,以及本節後面描述的更複雜情況的相關信息。 2. 同時進行批次或在線處理 處理可同時由在線用戶進行更新的數據的批次應用程序不應鎖定可能被在線用戶需要超過幾秒鐘的數據(無論是在數據庫中還是在文件中)。此外,應在每幾筆交易後將更新提交到數據庫中。這樣做可以最小化對其他進程不可用的數據部分和數據不可用的過去時間。 最小化物理鎖定的另一個選擇是實施具有樂觀鎖定模式或悲觀鎖定模式的邏輯行級鎖定。 樂觀鎖定假設記錄爭用的可能性很低。通常意味著在批次和在線處理中都同時使用的每個數據庫表中插入一個時間戳記列。當應用程序提取要進行處理的行時,它還會提取時間戳記。然後,當應用程序嘗試更新處理的行時,更新將在WHERE子句中使用原始時間戳記。如果時間戳記匹配,則更新數據和時間戳記。如果時間戳記不匹配,則表示在提取和更新嘗試之間另一個應用。 ![](https://hackmd.io/_uploads/Skb72s6x6.png) 架構應具有足夠的靈活性,以允許動態配置分區的數量。您應該考慮自動配置和用戶控制的配置。自動配置可以基於諸如輸入文件大小和輸入記錄數之類的參數進行。 4.1 分區方法 選擇分區方法必須根據具體情況進行。以下列表描述了一些可能的分區方法: 1. 固定且均勻分割記錄集 這涉及將輸入記錄集分成偶數部分(例如,10部分,每個部分正好包含整個記錄集的1/10)。然後,將由批次/提取應用程序的一個實例處理每個部分。 要使用此方法,需要預處理來拆分記錄集。此拆分的結果是一個下限和上限放置號,您可以將其用作批次/提取應用程序的輸入,以將其處理限制為僅處理其部分。 預處理可能是一個很大的開銷,因為它必須計算並確定記錄集的每個部分的邊界。 2. 按關鍵列分割 這涉及通過關鍵列(例如位置代碼)將輸入記錄集分割並將來自每個關鍵的數據分配給批次實例。為了實現這一點,列值可以是: - 通過分區表(稍後在本節中描述)為批次實例分配。 - 通過值的一部分(例如0000-0999,1000-1999等)為批次實例分配。 在選項1下,添加新值意味著必須手動重新配置批次或提取,以確保將新值添加到特定實例中。 在選項2下,這確保了批次作業的每個實例都處理所有值。但是,由一個實例處理的值的數量取決於列值的分佈(在0000-0999範圍內可能有大量位置,而在1000-1999範圍內可能只有少量)。在這種選項下,應該以分區為考慮來設計數據範圍。 在兩種選項下,無法實現將記錄均勻分配到批次實例的最佳方式。不能實現批次實例數量的動態配置。 3. 按視圖分割 這種方法基本上是按數據庫級別的關鍵列分割。它涉及將記錄集分成視圖。這些視圖在每個批次應用程序的實例在其處理過程中使用。分割是通過對數據進行分組來完成的。 使用此選項,必須配置批次應用程序的每個實例以命中特定視圖(而不是主表)。此外,隨著新數據值的添加,必須將此新數據組包含到視圖中。沒有動態配置能力,因為實例數量的變化會導致視圖的變化。 4. 新增處理指示器 這涉及在輸入表格中新增一個作為指示器的新列。作為預處理步驟,將所有指示器標記為未處理。在批次應用程序的記錄擷取階段,只有在個別記錄被標記為未處理的情況下才會讀取這些記錄,一旦讀取(並鎖定),它就會被標記為正在處理中。當該記錄完成時,指示器會被更新為完成或出錯。您可以啟動許多批次應用程序的實例而無需進行更改,因為額外的列確保了記錄只被處理一次。 使用此選項,表格的I/O會動態增加。在更新批次應用程序的情況下,此影響會減小,因為無論如何都必須進行寫入。 5. 將表格提取為平面文件 這種方法涉及將表格提取為一個平面文件。然後可以將該文件拆分為多個段,並將其用作批次實例的輸入。 使用此選項,將表格提取到文件並將其拆分的額外開銷可能會抵消多重分區的效果。可以通過更改文件拆分腳本來實現動態配置。 6. 使用哈希列 此方案涉及向用於檢索驅動記錄的數據庫表中添加一個哈希列(鍵或索引)。此哈希列具有指示器,以確定批次應用程序的哪個實例處理此特定行。例如,如果要啟動三個批次實例,則將'A'指示器標記為由實例1處理的行,'B'指示器標記為由實例2處理的行,'C'指示器標記為由實例3處理的行。 用於檢索記錄的程序將具有額外的WHERE子句,以選擇由特定指示器標記的所有行。在此表中進行插入將涉及添加標記字段,該字段將默認為其中一個實例(例如'A')。 將用於更新指示器的簡單批次應用程序,例如重新分配不同實例之間的負載。當添加了足夠多的新行時,可以運行此批次(在批次窗口以外的任何時間)以將新行重新分發給其他實例。 對於批次應用程序的其他實例,只需運行批次應用程序(如前文所述)即可將指示器重新分發以配合新的實例數量。 4.2 資料庫和應用程式設計原則 支援多分區應用程式的架構,可以運行在分區資料庫表上並使用關鍵列方法,應該包括一個用於儲存分區參數的中央分區儲存庫。 這提供了靈活性並確保可維護性。 儲存庫通常由一個稱為分區表的單一表組成。 儲存在分區表中的資訊是靜態的,通常應由資料庫管理員進行維護。 該表應包括多分區應用程式的每個分區的資訊行。 該表應具有程式ID代碼、分區號碼(分區的邏輯ID)、該分區的資料庫關鍵列的低值以及該分區的資料庫關鍵列的高值的列。 在程式啟動時,程式ID和分區號碼應從架構(特別是從控制處理任務)傳遞給應用程式。 如果使用關鍵列方法,將使用這些變數來讀取分區表,以確定應用程式要處理的資料範圍。 此外,在整個處理過程中必須使用分區號碼來: 用於新增至輸出檔案或資料庫更新中,以使合併過程正常運作。 向批次日誌報告正常處理,並向架構錯誤處理程序報告任何錯誤。 4.3 最小化死鎖 當應用程式並行運行或被分割時,可能會發生對資料庫資源的爭用和死鎖。 資料庫設計團隊必須盡可能消除潛在的爭用情況,作為資料庫設計的一部分,這是至關重要的。 此外,開發人員必須確保資料庫索引表的設計考慮了死鎖預防和效能。 死鎖或熱點經常發生在管理或架構表中,例如日誌表、控製表和鎖定表。 這些的影響也應該考慮。 識別架構中可能瓶頸的可能性非常重要的現實壓力測試。 為了將對資料的衝突影響最小化,架構應在連接到資料庫或遇到死鎖時提供服務(例如等待並重試間隔)。 這意味著有一個內建機制來對特定的資料庫回傳代碼做出反應,而不是立即發出錯誤,而是等待一定的時間並重試資料庫操作。 4.4 參數傳遞與驗證 分區架構應對應用程式開發人員相對透明。 架構應執行與以分區模式運行應用程式相關的所有任務,包括: 在應用程式啟動之前檢索分區參數。 在應用程式啟動之前驗證分區參數。 在啟動時將參數傳遞給應用程式。 驗證應包括檢查以確保: 應用程式具有足夠的分區來覆蓋整個資料範圍。 分區之間沒有間隙。 如果資料庫被分割區,可能需要進行一些額外的驗證,以確保單一分割區不會跨越資料庫分割。 此外,架構應考慮分割區的合併。 關鍵問題包括: 在進入下一個作業步驟之前,是否必須完成所有分區? 如果其中一個分割區中止會發生什麼事? ### Spring Batch 5.0的主要更新如下: 1. Java 17要求 2. 主要依賴項升級 3. 批處理基礎設施配置更新 4. 批處理測試配置更新 5. 工作參數處理更新 6. 執行上下文序列化更新 7. SystemCommandTasklet更新 8. 新功能:修剪(Pruning) 批處理基礎設施配置更新 Spring Batch 5 包括以下基礎設施配置的更新: - 數據源和事務管理器需求更新 - 事務管理器Bean曝光 - 在EnableBatchProcessing中的新注釋屬性 - 用於基礎設施Bean的新配置類 - JobExplorer和JobOperator中的事務支持 數據源和事務管理器需求更新 在過去,Spring Batch提供了基於映射的作業存儲庫和作業探查器實現,以便使用內存中的作業存儲庫。這些實現在版本4中已被棄用,並在版本5中完全移除。建議的替代方案是使用基於JDBC的實現,並使用嵌入式數據庫,如H2、HSQL等。 在此版本中,@EnableBatchProcessing註釋配置了一個基於JDBC的JobRepository,它需要在應用程序上下文中定義DataSource和PlatformTransactionManager bean。DataSource bean可以參考一個嵌入式數據庫,以便與內存中的作業存儲庫一起使用。 事務管理器Bean曝光 在4.3版本之前,@EnableBatchProcessing註釋在應用程序上下文中暴露了一個事務管理器bean。雖然這在許多情況下很方便,但無條件暴露事務管理器可能會干擾用戶定義的事務管理器。在此版本中,@EnableBatchProcessing不再在應用程序上下文中暴露事務管理器bean。 在此版本中,@EnableBatchProcessing註釋提供了新的屬性,可以指定要使用哪些組件和參數來配置批處理基礎設施的Bean。例如,現在可以像下面這樣指定Spring Batch應該在作業存儲庫中配置哪個數據源和事務管理器: ![](https://hackmd.io/_uploads/BkQc13aep.png) 在此示例中,`batchDataSource` 和 `batchTransactionManager` 指的是應用程序上下文中的Bean,它們將用於配置作業存儲庫和作業探查器。不再需要定義自定義的 `BatchConfigurer`,這在此版本中已被移除。 在此版本中,引入了一個名為 `DefaultBatchConfiguration` 的新配置類,可以作為配置基礎設施Bean的一種替代方法,而不是使用 `@EnableBatchProcessing`。該類提供了帶有默認配置的基礎設施Bean,可以根據需要進行自定義。以下片段展示了使用這個類的典型用法: ![](https://hackmd.io/_uploads/HJH2en6lp.png) ### job參數處理更新 支援任何類型作為作業參數 這個版本新增了對於任何類型作為作業參數的支援,而不僅僅是像在 v4 中只有四種預定義的類型(long、double、string、date)。這個變化對於作業參數在數據庫中的持久化方式產生了影響(不再有四個獨立的列對應每種預定義的類型)。請檢查 BATCH_JOB_EXECUTION_PARAMS 中的列變化以了解 DDL 變更。參數的類型的完整限定名現在以字符串形式進行持久化,同樣參數的值也是如此。字符串文字會使用標準的Spring轉換服務轉換為參數類型。標準的轉換服務可以通過任何需要的轉換器來擴充,以便將用戶特定的類型轉換為字符串文字,反之亦然。 默認job參數轉換 在v4中,作業參數的默認表示法如下所示: [+|-]parameterName(parameterType)=value 其中parameterType可以是[string, long, double, date]中的一種。這種表示法有一定的限制,並且與環境變量不太相容,也不太友好於Spring Boot。 在v5中,有兩種指定作業參數的方法: 預設表示法 現在的預設表示法如下所示: parameterName=parameterValue,parameterType,identificationFlag 其中,parameterType 是參數類型的完整限定名。Spring Batch 提供了 DefaultJobParametersConverter 來支援這種表示法。 擴展表示法 雖然默認表示法適用於大多數情況,但當值包含逗號等特殊符號時可能不太方便。在這種情況下,可以使用擴展表示法,它受到Spring Boot的Json應用程序屬性的啟發,並且其格式如下: parameterName='{"value": "parameterValue", "type":"parameterType", "identifying": "booleanValue"}' 這裡的 parameterType 是參數類型的完整限定名。Spring Batch 提供了 JsonJobParametersConverter 以支援這種表示法。 執行上下文序列化更新 從版本5開始,DefaultExecutionContextSerializer 已經更新為將上下文序列化/反序列化為Base64格式。 此外,由 @EnableBatchProcessing 或 DefaultBatchConfiguration 配置的默認 ExecutionContextSerializer 已從 JacksonExecutionContextStringSerializer 更改為 DefaultExecutionContextSerializer。對於 Jackson 的依賴現在變成是可選的。要使用 JacksonExecutionContextStringSerializer,需要將 jackson-core 添加到類路徑中。 SystemCommandTasklet 更新 在這個版本中,對 SystemCommandTasklet 進行了重新審視,並進行了以下更改: 引入了一個名為 CommandRunner 的新策略接口,以將命令執行與任務執行解耦。默認實現是 JvmCommandRunner,它使用 java.lang.Runtime#exec API 執行系統命令。可以實現此接口以使用任何其他API來運行系統命令。 運行命令的方法現在接受一個字符串陣列,表示命令及其參數。不再需要對命令進行分詞或進行任何預處理。這個變化使API更直觀,也更不容易出錯。 批處理測試配置更新 Spring Batch 5 包括以下測試配置的更新: - 測試工具中取消自動連線 - 遷移到 JUnit Jupiter 取消測試工具中的自動連線 在4.3版本之前,JobLauncherTestUtils 和 JobRepositoryTestUtils 會自動連線到正在測試的作業以及測試數據源,以方便設置測試基礎設施。儘管對於大多數用例來說這是方便的,但事實證明在定義了多個作業或多個數據源的測試上下文中會引起幾個問題。 在這個版本中,我們進行了一些變更,以移除這些依賴的自動連線,以避免在手動導入這些工具或通過 @SpringBatchTest 註釋進行導入時出現任何問題。 新功能 在 JobExplorer 和 JobOperator 中增加了事務支援 這個版本引入了對通過 JobExplorerFactoryBean 創建的 JobExplorer 的事務支援。現在可以指定在查詢批處理元數據時使用哪個事務管理器來執行只讀事務,同時也可以自定義事務屬性。 同樣的事務支援也被添加到了 JobOperator 中,通過一個名為 JobOperatorFactoryBean 的新工廠Bean。 EnableBatchProcessing 的自動註冊 JobOperator 從版本4開始,EnableBatchProcessing註釋提供了啟動Spring Batch作業所需的所有基本基礎設施Bean。但它並未註冊一個作業運算器(JobOperator)Bean,而這個Bean是停止、重新啟動和放棄作業執行的主要入口點。 雖然這些工具並不像啟動作業那麼常用,但自動在應用程序上下文中添加一個作業運算器可以避免最終用戶手動配置此類Bean,這對於他們來說非常有用。 改進的Java records支援 在v4.3中首次引入了對於將Java records作為分塊導向步驟中的項目的支援,但由於v4的基準為Java 8,因此該支援受到了限制。最初的支援是基於反射技巧來創建Java records並填充它們的數據,而無需訪問在Java 16中最終確定的java.lang.Record API。 現在v5以Java 17為基準,我們通過在框架的不同部分利用Record API來提升了Spring Batch對於records的支援。例如,FlatFileItemReaderBuilder現在能夠檢測項目類型是record還是普通類,並相應地配置相應的FieldSetMapper實現(對於records使用RecordFieldSetMapper,對於普通類使用BeanWrapperFieldSetMapper)。這裡的目標是使所需的FieldSetMapper類型對用戶透明。 使用Micrometer進行批量跟蹤 通過升級到Micrometer 1.10,你現在可以在批量指標之外獲得批量跟蹤。Spring Batch將為每個作業創建一個span,以及在作業內的每個步驟創建一個span。這些跟蹤元數據可以被收集並在例如Zipkin等儀表板上查看。 此外,這個版本還引入了新的指標,例如當前活動步驟,以及通過提供的JobLauncher獲取的作業啟動次數。 改進的Java records支援 在v4.3中首次引入了對於將Java records作為分塊導向步驟中的項目的支援,但由於v4的基準為Java 8,因此該支援受到了限制。最初的支援是基於反射技巧來創建Java records並填充它們的數據,而無需訪問在Java 16中最終確定的java.lang.Record API。 現在v5以Java 17為基準,我們通過在框架的不同部分利用Record API來提升了Spring Batch對於records的支援。例如,FlatFileItemReaderBuilder現在能夠檢測項目類型是record還是普通類,並相應地配置相應的FieldSetMapper實現(對於records使用RecordFieldSetMapper,對於普通類使用BeanWrapperFieldSetMapper)。這裡的目標是使所需的FieldSetMapper類型對用戶透明。 使用Micrometer進行批量跟蹤 通過升級到Micrometer 1.10,你現在可以在批量指標之外獲得批量跟蹤。Spring Batch將為每個作業創建一個span,以及在作業內的每個步驟創建一個span。這些跟蹤元數據可以被收集並在例如Zipkin等儀表板上查看。 此外,這個版本還引入了新的指標,例如當前活動步驟,以及通過提供的JobLauncher獲取的作業啟動次數。 默認使用UTF-8編碼 多年來,報告了與字符編碼相關的多個問題,這些問題出現在框架的不同領域,比如基於文件的項目讀取器和寫入器之間的默認編碼不一致,處理執行上下文中的多字節字符時的序列化/反序列化問題等。 在與JEP 400相同的精神下,並遵循UTF-8宣言,這個版本在框架的所有領域中將默認編碼更新為UTF-8,並確保可以根據需要配置此默認值。 完整的GraalVM原生支持 從v4.2開始,開始了支持使用GraalVM原生映像編譯Spring Batch應用程序的工作,並在v4.3中作為實驗功能發布。 在這個版本中,通過為GraalVM提供必要的運行時提示來顯著改進了原生支持,現在已經被視為不再處於測試階段。 執行上下文元數據的改進 除了Spring Batch已經在執行上下文中保留的與運行時信息相關的內容(比如步驟類型、重新啟動標誌等)之外,這個版本在執行上下文中添加了一個重要的細節,即用於序列化上下文的Spring Batch版本。 雖然這似乎是一個細節,但在調試與執行上下文序列化和反序列化相關的升級問題時,它具有巨大的附加價值。 改進的文檔 在這個版本中,文檔已經更新為使用Spring Asciidoctor Backend。這個後端確保了所有項目遵循相同的文檔風格。為了與其他項目保持一致,本版本更新了Spring Batch的參考文檔以使用這個後端。 批次處理的領域語言 對於任何有經驗的批次處理架構師來說,Spring Batch 中使用的批次處理的整體概念應該是熟悉且舒適的。其中包括了“作業”和“步驟”,以及開發人員提供的稱為 ItemReader 和 ItemWriter 的處理單元。 然而,由於Spring框架的模式、操作、模板、回調和慣用法,存在以下機會: - 在遵循清晰關注點分離方面有顯著的改進。 - 明確劃分的架構層和提供的服務作為介面。 - 簡單且默認的實現,允許快速採用並在開箱即用時提供便利。 - 大幅度增強了可擴展性。 下圖是批次處理參考架構的簡化版本,這個架構已經在幾十年的實施中得到證明。它提供了構成批次處理領域語言的組件的概述。這個架構框架是一個藍圖,在過去幾十年的平台(例如在主機上的 COBOL、Unix 上的 C,現在的 Java 等)的多代實施中得到了證實。JCL 和 COBOL 開發人員可能對這些概念感到熟悉,就像C、C#和Java開發人員一樣。 Spring Batch 提供了對在強大、可維護的系統中通常找到的層、組件和技術服務的實際實現,這些系統用於解決從簡單到複雜的批次處理應用程序的創建需求,並提供了基礎架構和擴展以應對非常複雜的處理需求。 ![](https://hackmd.io/_uploads/rk9JRTpgT.png) 前面的圖表突顯了構成Spring Batch領域語言的關鍵概念。一個作業(Job)包含一個或多個步驟(Step),每個步驟都具有確切的一個ItemReader、一個ItemProcessor和一個ItemWriter。需要使用JobLauncher來啟動作業,並需要在JobRepository中存儲有關當前運行過程的元數據。 Job 本節描述了與批處理作業概念相關的模式。Job是一個實體,它封裝了整個批次處理過程。與其他Spring項目一樣,可以使用XML配置文件或基於Java的配置來將作業連接在一起。這個配置可以稱為“Job配置”。然而,Job只是整體層次結構的頂部,如下圖所示: ![](https://hackmd.io/_uploads/rJKA9BCgT.png) 在Spring Batch中,作業(Job)只是Step實例的容器。它組合了在流程中邏輯上屬於一起的多個步驟,並允許配置所有步驟都全局通用的屬性,例如可重新啟動性。作業配置包括: - 作業的名稱。 - Step實例的定義和排序。 - 作業是否可以重新啟動。 對於那些使用Java配置的人來說,Spring Batch 提供了Job接口的預設實現,即SimpleJob類,它在Job之上創建了一些標準功能。在使用基於Java的配置時,可以使用一系列的建構器來實例化一個Job,如下例所示: ![](https://hackmd.io/_uploads/r1QPhHClp.png) JobInstance JobInstance 指的是一個邏輯作業運行的概念。考慮一個批次處理Job,應該在一天的結尾運行一次,例如前面圖表中的 EndOfDay Job。有一個 EndOfDay Job,但必須分別跟蹤每次Job的個別運行。對於這個Job,每天都會有一個邏輯上的 JobInstance。例如,有一個1月1日運行,一個1月2日運行,依此類推。如果1月1日的運行第一次失敗,並在隔天重新運行,它仍然是1月1日的運行。(通常這也對應著它正在處理的數據,意味著1月1日的運行處理1月1日的數據)。因此,每個 JobInstance 可以有多個執行(JobExecution 在本章後面會進一步討論),並且只有一個 JobInstance(它對應於特定的作業和識別 JobParameters)可以在給定時間運行。 JobInstance 的定義對於要加載的數據完全沒有關聯。完全取決於 ItemReader 的實現來確定如何加載數據。例如,在 EndOfDay 的情況下,數據上可能有一列指示數據所屬的有效日期或計劃日期。因此,1月1日的運行只會加載1號的數據,而1月2日的運行只會使用2號的數據。由於這種確定很可能是一個業務決策,因此由 ItemReader 來決定。然而,使用相同的 JobInstance 決定了是否使用來自先前執行的“狀態”(即 ExecutionContext,在本章後面將進一步討論)。使用新的 JobInstance 意味著“從頭開始”,而使用現有的實例通常意味著“從你上次停下來的地方開始”。 JobParameters 在討論了 JobInstance 及其與 Job 的區別之後,自然而然的問題是:“如何將一個 JobInstance 區分為另一個?” 答案是:JobParameters。一個 JobParameters 對象包含一組用於啟動批次處理作業的參數。它們可以用於在運行期間進行識別,甚至作為參考數據,如下圖所示: ![](https://hackmd.io/_uploads/Skv6yICe6.png) 在前面的例子中,有兩個實例,一個是1月1日的,另一個是1月2日的,實際上只有一個作業,但它有兩個 JobParameter 對象:一個是以 01-01-2017 作為作業參數啟動的,另一個是以 01-02-2017 作為參數啟動的。因此,可以定義合約為:JobInstance = Job + identifying JobParameters。這使得開發人員能夠有效地控制如何定義 JobInstance,因為他們控制傳入的參數是什麼。 注意:並非所有的作業參數都需要用於identifying JobInstance。預設情況下,它們確實會這樣做。但是,框架還允許提交具有不會對 JobInstance 識別有所貢獻的參數的作業。 JobExecution(作業執行) JobExecution 指的是運行一個作業的單次嘗試的技術概念。一次執行可能以失敗或成功告終,但與特定執行對應的 JobInstance 除非執行成功完成,否則不被認為是完成的。以之前描述的 EndOfDay 作業為例,考慮一個日期為 01-01-2017 的 JobInstance,第一次運行時失敗了。如果使用與第一次運行相同的識別作業參數(01-01-2017)再次運行它,將創建一個新的 JobExecution。但是,仍然只有一個 JobInstance。 一個 Job 定義了作業的內容以及如何執行,而 JobInstance 則是一個純粹的組織對象,用於將executions一起分組,主要是為了啟用正確的重新啟動語義。然而,JobExecution 是實際運行過程中發生的事情的主要存儲機制,它包含許多必須受控並持久化的屬性,如下表所示: ![](https://hackmd.io/_uploads/S1RS48AlT.png) ![](https://hackmd.io/_uploads/B1wvNIRx6.png) ### Property Status: BatchStatus: 對象用於指示執行的狀態。在運行時,它是 `BatchStatus#STARTED`。如果失敗,它是 `BatchStatus#FAILED`。如果成功完成,它是 `BatchStatus#COMPLETED`。 startTime: 一個 `java.time.LocalDateTime` 表示執行開始時的當前系統時間。如果工作尚未開始,此欄位將為空。 endTime: 一個 `java.time.LocalDateTime` 代表了執行完成時的當前系統時間,無論成功與否。如果工作尚未完成,則此欄位為空。 exitStatus: 退出狀態,指示執行的結果。這是最重要的,因為它包含返回給呼叫者的退出代碼。有關更多詳細信息,請參閱第五章。如果`Job`尚未完成,則此字段為空。 createTime: 一個 `java.time.LocalDateTime` 代表了 `JobExecution` 首次持久化時的當前系統時間。Job可能尚未開始(因此沒有開始時間),但它始終具有一個 createTime,這是框架管理Job級別 `ExecutionContexts` 所需的。 lastUpdated: 一個 `java.time.LocalDateTime` 代表了最後一次持久化 JobExecution 的時間。如果作業尚未開始,則此字段為空。 executionContext: 包含需要在執行之間持久化的任何用戶數據的「`property bag`」。 failureExceptions: 作業執行期間遇到的異常列表。如果作業失敗時遇到多個異常,這些信息可能很有用。 這些屬性很重要,因為它們被持久化,可以用來完全確定執行的狀態。例如,如果 01-01 的 EndOfDay 作業在晚上 9:00 執行,並在 9:30 失敗,將在批處理元數據表中進行以下記錄: ![](https://hackmd.io/_uploads/S12FhY0lT.png) 注意: 為了清晰和格式化的緣故,可能已經縮寫或刪除了列名稱。 現在,由於作業失敗,假設需要整個晚上才能確定問題,因此`batch-windows`現在已經關閉。進一步假設窗口從晚上 9:00 開始,`Job`再次為 01-01 啟動,從上次停止的地方開始執行,並在 9:30 成功完成。由於現在已經是第二天,必須運行 01-02 `Job`,它在稍後的 9:31 開始,並在其正常的一小時內於 10:30 完成。除非兩個`Job`嘗試訪問相同的數據可能導致在數據庫層面出現鎖定問題,否則並沒有要求必須在另一個作業`Job`之後啟動`JobInstance`。完全由調度程序來確定應該何時運行`Job`。由於它們是獨立的`JobInstance`,Spring Batch 不會阻止它們並行運行(嘗試在另一個``JobInstance``正在運行時運行相同的`JobInstance`將引發 `JobExecutionAlreadyRunningException`)。現在應該在 `JobInstance` 和 `JobParameters` 表中多一條記錄,在 `JobExecution` 表中多兩條記錄,如下表所示: ![](https://hackmd.io/_uploads/HyI7WcClp.png) ![](https://hackmd.io/_uploads/SypIW50ga.png) 注意:為了清晰和格式化的緣故,可能已經縮寫或刪除了列名稱。 # Step 一個`Step`是一個`domain object`,封裝了批次作業的獨立、順序階段。因此,每個`Job`完全由一個或多個`Step`組成。一個`Step`包含了定義和控制實際batch所需的所有信息。這是一個必然模糊的描述,因為任何給定`Step`的內容都取決於編寫作業的開發人員。一個`Step`可以像開發人員希望的那樣簡單或複雜。一個簡單的`Step`可以從文件中將數據加載到數據庫中,需要很少或沒有代碼(取決於所使用的實現方式)。一個更複雜的`Step`可能會有複雜的業務規則,這些規則作為處理的一部分應用。和`Job`一樣,一個`Step`具有一個獨立的`Step`執行(StepExecution),它與唯一的``Step``執行(JobExecution)相關聯,如下圖所示: ![](https://hackmd.io/_uploads/rkU0X5Aea.png) StepExecution StepExecution 代表對一個`Step`執行的單次嘗試。每次運行一個`Step`時都會創建一個新的 StepExecution,類似於 JobExecution。然而,如果一個`Step`無法執行,因為前面的`Step`失敗,則不會為它保留執行記錄。只有在實際開始`Step`時才會創建 StepExecution。 `Step`執行由 StepExecution 類的對象表示。每個執行都包含對應`Step`和 JobExecution 的引用以及與事務相關的數據,例如提交和回滾計數以及開始和結束時間。此外,每個`Step`執行還包含一個 ExecutionContext,其中包含開發人員需要在批次運行之間保留的任何數據,例如重新啟動所需的統計信息或狀態信息。以下表格列出了 StepExecution 的屬性: StepExecution StepExecution 代表對一個`Step`執行的單次嘗試。每次運行一個`Step`時都會創建一個新的 StepExecution,類似於 JobExecution。然而,如果一個`Step`無法執行,因為前面的`Step`失敗,則不會為它保留執行記錄。只有在實際開始`Step`時才會創建 StepExecution。 `Step excutions` 由 StepExecution 類的對象表示。每個`excution`都包含對應`Step`和 `JobExecution` 的引用以及與事務相關的數據,例如提交和回滾計數以及開始和結束時間。此外,每個`Step`執行還包含一個 `ExecutionContext`,其中包含開發人員需要在批次運行之間保留的任何數據,例如重新啟動所需的統計信息或狀態信息。以下表格列出了 `StepExecution` 的屬性: ![](https://hackmd.io/_uploads/rkwiU9Cxp.png) ![](https://hackmd.io/_uploads/Byw0z2Rga.png) 這些是 `StepExecution` 的屬性和定義: - **Status(狀態)**:一個 `BatchStatus` 對象,指示執行的狀態。當正在運行時,狀態是 `BatchStatus.STARTED`。如果失敗,狀態是 `BatchStatus.FAILED`。如果成功完成,狀態是 `BatchStatus.COMPLETED`。 - **startTime(開始時間)**:一個 `java.time.LocalDateTime` 代表了執行開始時的當前系統時間。如果`Step`尚未開始,此欄位為空。 - **endTime(結束時間)**:一個 `java.time.LocalDateTime` 代表了執行完成時的當前系統時間,無論其是否成功。如果`Step`尚未退出,此欄位為空。 - **exitStatus(退出狀態)**:指示執行結果的 `ExitStatus`。這是最重要的,因為它包含了返回給呼叫者的退出代碼。有關更多詳細信息,請參閱第五章。如果`Step`尚未退出,此欄位為空。 - **executionContext(執行上下文)**:包含需要在執行之間保留的任何用戶數據的「property-bag」。 - **readCount(讀取計數)**:已成功讀取的項目數量。 - **writeCount(寫入計數)**:已成功寫入的項目數量。 - **commitCount(提交計數)**:已為此執行提交的事務數量。 - **rollbackCount(回滾計數)**:由`Step`控制的業務事務已回滾的次數。 - **readSkipCount(讀取跳過計數)**:由於讀取失敗導致的跳過項目的次數。 - **processSkipCount(處理跳過計數)**:由於處理失敗導致的跳過項目的次數。 - **filterCount(過濾計數)**:被 ItemProcessor “過濾”的項目數量。 - **writeSkipCount(寫入跳過計數)**:由於寫入失敗導致的跳過項目的次數。 這些屬性提供了有關 `StepExecution` 的詳細信息,可以用於監控和追蹤執行情況,以及處理可能出現的錯誤或異常情況。 `ExecutionContext` `ExecutionContext` 代表了一個由鍵/值對構成的集合,由框架持久化並控制,為開發人員提供了一個存儲與 `StepExecution` 或 `JobExecution` 物件相關的持久化狀態的地方(對於熟悉 Quartz 的人來說,它與 JobDataMap 非常相似)。 最佳的使用案例是用於方便重新啟動。以平面文件輸入為例,當處理單獨的行時,框架會在提交點定期持久化 `ExecutionContext`。這樣做可以讓 ItemReader 在執行過程中發生嚴重錯誤甚至斷電時保存其狀態。所需的只是將當前已讀取的行數放入上下文中,正如以下示例所示,剩下的工作就由框架完成: ```java // 將當前已讀取的行數存入 ExecutionContext 中 ExecutionContext executionContext = stepExecution.getExecutionContext(); executionContext.putInt("currentLine", currentLine); 或是 executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition()); ``` 這樣,在需要重新啟動作業時,可以從上下文中獲取已保存的狀態,而不需要從頭開始處理所有數據。這在處理大量數據時尤其有用,可以節省大量時間。 以Job stereotypes部分的EndOfDate示例為例,假設有一個`Step`叫做「載入數據(loadData)」,它將一個文件載入到數據庫中。在第一次運行失敗後,元數據表將如以下示例所示: ![](https://hackmd.io/_uploads/B1atIn0ea.png) 在前述情況下,該`Step`執行了30分鐘,處理了40,321個「元素」,這在這種情況下可能代表文件中的行。這個值在每次提交之前由框架更新,可以包含與 `ExecutionContext` 中的條目相對應的多行。要在提交之前收到通知,需要使用各種 `StepListener` 實現之一(或者一個 `ItemStream`),這些將在本指南後面進行更詳細的討論。與前一個示例一樣,假設`Job`在第二天重新啟動。重新啟動時,將從數據庫中重建上次執行的 `ExecutionContext` 中的值。當 `ItemReader` 被打開時,它可以檢查它是否在上下文中有任何存儲的狀態,並從那裡初始化自己,如以下示例所示: if (executionContext.containsKey(getKey(LINES_READ_COUNT))) { log.debug("Initializing for restart. Restart data is: " + executionContext); long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT)); LineReader reader = getReader(); Object record = ""; while (reader.getPosition() < lineCount && record != null) { record = readLine(); } } 在這種情況下,在前述代碼運行後,當前行數為40,322,讓`Step`從上次停止的地方重新開始。你也可以使用 `ExecutionContext` 來保存關於執行本身的統計信息。例如,如果一個平面文件包含跨多行存在的待處理訂單,可能需要保存已處理的訂單數量(這與讀取的行數非常不同),以便在步驟結束時發送一封電子郵件,其中包含處理的總訂單數。 框架負責為開發人員存儲這些信息,以正確地將其與單個`Job`實例進行範圍化。了解是否應該使用現有的 `ExecutionContext` 可能非常困難。例如,在上述的「日結」示例中,當 01-01 的運行再次開始時,框架識別到這是相同的作業實例,並且在個別步驟的基礎上,從數據庫中提取 `ExecutionContext`,並將其(作為 `StepExecution` 的一部分)交給步驟本身。 相反,在 01-02 的運行中,框架識別到這是一個不同的實例,因此必須將一個空的上下文傳遞給`Step`。框架為開發人員做出了許多這樣的決定,以確保在正確的時間為它們提供狀態。還要注意,每個時刻每個 `StepExecution` 只存在一個 `ExecutionContext`。`ExecutionContext` 的用戶應該小心,因為這創建了一個共享的鍵空間。因此,在放置值時應謹慎,以確保不會覆蓋任何數據。然而,`Step`絕對不會在上下文中存儲任何數據,因此不會對框架造成不利影響。 請注意,每個 `JobExecution` 至少有一個 `ExecutionContext`,而每個 `StepExecution` 也有一個。例如,考慮以下代碼片段: ```java ExecutionContext ecStep = stepExecution.getExecutionContext(); ExecutionContext ecJob = jobExecution.getExecutionContext(); //ecStep does not equal ecJob ... ``` 如註釋中所述,`ecStep` 不等於 `ecJob`。它們是兩個不同的 `ExecutionContexts`。對於`Step`範圍的 `ExecutionContext`,它會在每次提交點(commit point)中保存,而對於作業範圍的 `ExecutionContext`,它會在每次`Step`執行之間保存。 `JobRepository` 是前面提到的所有原型的持久化機制。它為 `JobLauncher`、`Job` 和 `Step` 實現提供了 CRUD(創建、讀取、更新、刪除) 操作。當首次啟動`Job`時,將從存儲庫中獲取一個 `JobExecution`。此外,在執行過程中,通過將 `StepExecution` 和 `JobExecution` 實現傳遞給存儲庫,它們也會被持久化。 在使用 Java 配置時,`@EnableBatchProcessing` 注解會自動配置 `JobRepository` 作為其中的一個組件。 `JobLauncher` 代表一個簡單的界面,用於以給定的 `JobParameters` 启动作业。以下是一個示例: ```java public interface JobLauncher { public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException; } ``` 預期實現應該從 `JobRepository` 中獲取一個有效的 `JobExecution`,然後執行該`Job`。 ItemReader(項目讀取器)是一個抽象,代表一次從`Step`中獲取輸入的過程,每次一個項目。當ItemReader已經提供了所有可以提供的項目時,它通過返回null來表示這一點。您可以在「讀取器和寫入器」中找到有關ItemReader接口及其各種實現的更多詳細信息。 ItemWriter(項目寫入器)是一個抽象,代表一次將批量或一組項目輸出到`Step`中的過程。通常,ItemWriter不知道應該接收下一個什麼樣的輸入,它只知道當前調用中傳遞的項目。我們可以在「讀取器和寫入器」中找到有關ItemWriter接口及其各種實現的更多詳細信息。 ItemProcessor(項目處理器)是一個抽象,代表對項目進行業務處理的過程。雖然ItemReader讀取一個項目,ItemWriter寫一個項目,但ItemProcessor提供了一個訪問點來進行轉換或應用其他業務處理。如果在處理項目時確定該項目無效,則返回null表示不應該將該項目寫出。我們可以在「讀取器和寫入器」中找到有關ItemProcessor接口的更多詳細信息。 # 項目讀取器和項目寫入器 所有批處理可以以其最簡單的形式描述為讀取大量數據,執行某種類型的計算或轉換,然後將結果寫出。Spring Batch 提供了三個關鍵的接口來幫助執行大量的讀取和寫入:ItemReader、ItemProcessor 和 ItemWriter。 ## ItemReader 儘管它是一個簡單的概念,但項目讀取器是從許多不同類型的輸入提供數據的手段。最常見的例子包括: 1. 平面文件:平面文件項目讀取器從一個平面文件中讀取數據行,通常這些數據行以文件中的固定位置定義的字段來描述記錄,或者通過某些特殊字符(如逗號)來進行分隔。 2. XML:XML 項目讀取器獨立於用於解析、映射和驗證對象的技術。輸入數據允許對 XML 文件進行與 XSD 模式的驗證。 3. 數據庫:訪問數據庫資源以返回結果集,這些結果集可以映射到對象進行處理。默認的 SQL 項目讀取器實現會調用 RowMapper 來返回對象,跟蹤當需要重新啟動時的當前行,存儲基本統計信息,並提供一些稍後會進一步解釋的事務增強功能。 還有許多其他的可能性,但我們在本章節中專注於基本的幾種。可以在附錄 A 中找到所有可用的 ItemReader 實現的完整列表。 ItemReader 是一個用於通用輸入操作的基本接口,如下面的接口定義所示: ```java public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; } ``` `read` 方法定義了 `ItemReader` 最基本的契約。調用它會返回一個項目,如果沒有更多的項目可供使用,則返回 null。一個項目可以代表文件中的一行,數據庫中的一行,或者是 XML 文件中的一個元素。通常預期這些會被映射為可用的域對象(如 Trade、Foo 或其他),但在契約中並沒有這方面的要求。 預期 `ItemReader` 接口的實現是僅向前的。然而,如果底層資源是事務性的(例如 JMS 佇列),那麼在回滾情況下,調用 `read` 可能會在後續調用中返回相同的邏輯項目。還值得一提的是,由於 `ItemReader` 沒有要處理的項目,並不會引發異常。例如,如果配置了返回 0 個結果的查詢的數據庫 `ItemReader`,則在第一次調用 `read` 時會返回 null。 ItemWriter 與 ItemReader 在功能上類似,但執行的是相反的操作。它們仍然需要定位、打開和關閉資源,但它們的區別在於 ItemWriter 進行寫出,而不是讀取。對於數據庫或佇列,這些操作可能是插入、更新或發送操作。輸出的序列化格式對於每個批次作業都是特定的。 與 ItemReader 類似,ItemWriter 是一個相當通用的接口,如下面的接口定義所示: ```java public interface ItemWriter<T> { void write(Chunk<? extends T> items) throws Exception; } ``` 就像在 ItemReader 上的 `read` 一樣,`write` 提供了 ItemWriter 的基本契約。它嘗試寫出傳遞的項目列表,只要它處於打開狀態。由於通常預期將項目一起“批量”組合成一個區塊,然後進行輸出,該接口接受項目列表,而不是單個項目。在寫出列表之後,可以執行可能需要的任何刷新操作,然後再從 `write` 方法返回。例如,如果要寫入 Hibernate DAO,可以進行多次對 `write` 的調用,每次針對一個項目進行。然後,在返回之前,寫入程序可以在 Hibernate 會話上調用 flush。 在這個情境中,"flush" 是指將緩存中的數據寫入到持久性存儲中的操作。在Hibernate等ORM(對象關係映射)框架中,當你對數據進行操作時,它可能會暫時存儲在一個緩存中,以提高性能。但是,這些更改可能不會立即寫入到數據庫中,而是在某個時候進行批量處理。 當你調用 "flush" 時,它會將緩存中的數據寫入到數據庫中,確保它們得到持久化。 總的來說,"flush" 操作是將內存中的數據同步到持久性存儲的一個步驟。這在需要確保數據的完整性和一致性時非常重要。 ItemStream 雖然 ItemReader 和 ItemWriter 各自都很好地完成了它們的個別任務,但它們都存在一個共同的問題,這需要另一個接口來解決。通常,在批處理作業的範圍內,讀取器和寫入器需要被打開、關閉,並需要一個機制來持久化狀態。ItemStream 接口就是為此而存在的,如下面的例子所示: ```java public interface ItemStream { void open(ExecutionContext executionContext) throws ItemStreamException; void update(ExecutionContext executionContext) throws ItemStreamException; void close() throws ItemStreamException; } ``` 在描述每個方法之前,我們應該提一下 ExecutionContext。如果 ItemReader 的客戶端也實現了 ItemStream,則在調用 read 之前,應該先調用 open,以打開任何資源,例如文件或獲取連接。對於實現 ItemStream 的 ItemWriter,也適用類似的限制。如第二章所述,如果在 ExecutionContext 中找到了預期的數據,則可以使用它來在非初始狀態下啟動 ItemReader 或 ItemWriter。相反,close 用於確保在 open 過程中分配的任何資源都能安全地被釋放。update 主要用於確保當前正在持有的任何狀態被加載到提供的 ExecutionContext 中。該方法在提交之前被調用,以確保當前狀態在提交到數據庫之前得到持久化。 在 ItemStream 的客戶端是一個 Step 的特殊情況下(來自 Spring Batch Core),為每個 StepExecution 創建一個 ExecutionContext,以允許用戶存儲特定執行的狀態,並期望在重新啟動相同的 JobInstance 時返回該狀態。對於熟悉 Quartz 的人來說,語義非常類似於 Quartz 的 JobDataMap。 **委託模式和在`Step`中註冊** 需要注意的是,`CompositeItemWriter` 是委派模式的一個例子,在Spring Batch中這是很常見的。這些代理本身可能會實現回調接口,比如 `StepListener`。如果它們確實實現了這些接口,並且作為Job中的一個步驟的一部分與Spring Batch Core一起使用,那麼它們幾乎肯定需要手動在步驟中進行註冊。 如果讀取器、寫入器或處理器直接連接到步驟,並且實現了 `ItemStream` 或 `StepListener` 接口,它們將自動註冊。然而,由於代理不是步驟所知道的,它們需要作為listeners或streams(如果適用,可能兩者都需要)注入。 **Java 配置** ```java @Bean public Job ioSampleJob(JobRepository jobRepository) { return new JobBuilder("ioSampleJob", jobRepository) .start(step1()) .build(); } @Bean public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("step1", jobRepository) .<String, String>chunk(2, transactionManager) .reader(fooReader()) .processor(fooProcessor()) .writer(compositeItemWriter()) .stream(barWriter()) .build(); } @Bean public CustomCompositeItemWriter compositeItemWriter() { CustomCompositeItemWriter writer = new CustomCompositeItemWriter(); writer.setDelegate(barWriter()); return writer; } @Bean public BarWriter barWriter() { return new BarWriter(); } ``` (註:這是一段Spring的Java配置,用於配置一個批處理作業,包括一個Job和一個Step。在這裡,`compositeItemWriter` 使用了 `barWriter` 作為代理,並在 `step1` 方法中被註冊為一個流。) 這段敘述在談論使用了「委派模式」的情況,這在Spring Batch中很常見。委派模式指的是一個對象把工作委託給另一個對象來執行。 在這段代碼中,涉及到了一個 `CompositeItemWriter`,它是一個寫入器(writer)的組合,可以一次性處理多個寫入操作。在這個範例中,它把具體的寫入操作委託給了一個名為 `barWriter` 的對象。 接著提到了在Spring Batch中,如果一個讀取器、寫入器或處理器直接連接到一個步驟(Step)中,並且實現了 `ItemStream` 或 `StepListener` 接口,它們會被自動註冊。 但是對於像 `CompositeItemWriter` 這樣的委派模式,它的代理對象並不被`Step`直接知曉,所以需要手動進行註冊。這樣就確保了委派的對象也能參與到步驟的執行中。 簡單來說,這段代碼片段是在配置一個Spring Batch的作業(Job)和步驟(Step),並使用了委派模式來處理寫入操作。 **Flat Files(平面文件)** 平面文件一直以來都是交換大量數據的最常見機制之一。與XML不同的是,XML有一個共識標準來定義其結構(XSD),而讀取平面文件的人必須事先完全了解文件的結構。一般來說,所有的平面文件可以分為兩種類型:分隔符和固定長度。分隔符文件是指字段之間由分隔符(如逗號)分隔的文件。固定長度文件的字段是固定長度的。 **FieldSet** 在Spring Batch中使用平面文件(無論是輸入還是輸出)時,最重要的之一是FieldSet類。許多架構和庫都包含了幫助你從文件中讀取數據的抽象,但它們通常返回一個字符串或字符串對象數組。這只能解決一半的問題。FieldSet是Spring Batch用於實現對文件資源的字段綁定的抽象。它允許開發人員以與處理數據庫輸入類似的方式處理文件輸入。FieldSet在概念上類似於JDBC ResultSet。FieldSet僅需要一個參數:一個字符串數組(標記)。此外,您還可以配置字段的名稱,以便可以按索引或名稱訪問字段,如ResultSet的模式所示,如以下示例所示: ```java String[] tokens = new String[]{"foo", "1", "true"}; FieldSet fs = new DefaultFieldSet(tokens); String name = fs.readString(0); int value = fs.readInt(1); boolean booleanValue = fs.readBoolean(2); ``` FieldSet接口還有許多其他選項,例如Date、long、BigDecimal等等。FieldSet的最大優勢在於它提供了對平面文件輸入的一致解析。與每個批處理作業可能以潛在的意外方式進行不同的解析相比,它可以保持一致,無論是在處理由於格式異常引起的錯誤時還是進行簡單的數據轉換時。 **FlatFileItemReader** 平面文件是任何包含最多二維(表格)數據的文件。Spring Batch框架中讀取平面文件的功能由名為FlatFileItemReader的類提供,該類提供了讀取和解析平面文件的基本功能。FlatFileItemReader的兩個最重要的必需依賴項是Resource和LineMapper。LineMapper接口將在後面的部分中進一步探討。resource屬性表示Spring Core資源。有關如何創建此類bean的文檔可以在Spring框架的第五章資源中找到。因此,本指南不會詳細介紹如何創建Resource對象,僅展示以下簡單示例: ```java Resource resource = new FileSystemResource("resources/trades.csv"); ``` 在複雜的批處理環境中,目錄結構通常由企業應用集成(EAI)基礎設施管理,其中為外部接口建立了放置區,用於在FTP位置和批處理位置之間移動文件。文件移動工具超出了Spring Batch架構的範圍,但在批處理作業流中包含文件移動工具作為作業流中的步驟並不罕見。批處理架構只需要知道如何定位要處理的文件。Spring Batch從這個起點開始將數據輸送到管道中。但是,Spring Integration提供了許多這些類型的服務。 在FlatFileItemReader中的其他屬性允許您進一步指定數據的解釋方式,如下表所述: ![](https://hackmd.io/_uploads/HyAaMJ1Za.png) - comments (String[]): 指定表示註釋行的行前綴。 - encoding (String): 指定要使用的文本編碼。默認值是 UTF-8。 - lineMapper (LineMapper): 將字符串轉換為表示項目的對象。 - linesToSkip (int): 文件頂部要忽略的行數。 - recordSeparatorPolicy (RecordSeparatorPolicy): 用於確定行結尾的位置,並進行一些操作,例如在引號字符串內部時繼續跨越行結尾。 - resource (Resource): 從中讀取的資源。 - skippedLinesCallback (LineCallbackHandler): 一個接口,將要跳過的文件中行的原始內容傳遞給它。如果 linesToSkip 設置為 2,則會兩次調用此接口。 - strict (boolean): 在嚴格模式下,如果輸入資源不存在,讀取器會在 ExecutionContext 中拋出異常。否則,它會記錄該問題並繼續進行。 LineMapper 與 RowMapper 類似,它接受低層級的構造,例如 ResultSet,並返回一個對象,扁平文件處理需要相同的構造將一個字符串行轉換為一個對象,如下面的接口定義所示: ```java public interface LineMapper<T> { T mapLine(String line, int lineNumber) throws Exception; } ``` 基本契約是,給定當前行及其關聯的行號,映射器應返回結果域對象。這與 RowMapper 類似,因為每行都與其行號相關聯,就像 ResultSet 中的每行與其行號相關聯一樣。這使得可以將行號與生成的域對象進行關聯比較或進行更詳細的記錄。但是,與 RowMapper 不同的是,LineMapper 獲得了一行原始文本,正如上面所討論的,它只完成了一半工作。必須將該行分為一個 FieldSet,然後可以在後面的文檔中將其映射到一個對象。 LineTokenizer 為了將輸入的一行轉換為 FieldSet,需要一個抽象,因為可能會有許多格式的扁平文件數據需要轉換為 FieldSet。在 Spring Batch 中,這個接口就是 LineTokenizer: ```java public interface LineTokenizer { FieldSet tokenize(String line); } ``` LineTokenizer 的契約是,給定一行輸入(理論上字符串可以包含多行),將返回代表該行的 FieldSet。然後可以將此 FieldSet 傳遞給 FieldSetMapper。Spring Batch 包含以下 LineTokenizer 實現: - DelimitedLineTokenizer: 用於記錄中字段由分隔符分隔的文件。最常見的分隔符是逗號,但也經常使用管道符號或分號。 - FixedLengthTokenizer: 用於記錄中字段都具有“固定寬度”的文件。必須為每種記錄類型定義每個字段的寬度。 - PatternMatchingCompositeLineTokenizer: 通過檢查模式來確定在特定行上應使用列表中的哪個 LineTokenizer。