# Spring Data JPA - MySQL using streams (1) ## 場景 需要撈取從資料庫 (MySQL) 中某個時間點以後的使用者瀏覽紀錄,並發送活動通知。但是這些紀錄量非常的龐大,經常會造成 OOM (**out-of-memory**)。 過程中使用到的項目是 - Spring Data JPA + Hibernate - MySQL ```java! @Entity public class UserBrowsingHistory{ @Id long id; @Basic Instant createdTime; @ManyToOne User user; } ``` ```java! public interface UserBrowsingHistoryRepository extends JpaRepository<UserBrowsingHistory, Long>{ List<UserBrowsingHistory> findAllByCreatedTimeIsGreaterThan(Instant now); } ``` ```java= private final UserBrowsingHistoryRepository repository; public void sendPromotionMessage(){ repository.findAllByCreatedTimeIsGreaterThan(Instant.now()) .forEach(this::sendMessage); } ``` ## 為何會發生 OOM ? 滿足 createdTime 條件的瀏覽紀錄都會被從資料庫中讀取出來,`findAllByCreatedTimeIsGreaterThan` 是一口氣將資料都讀取進入記憶體中,所以才會造成這樣的問題。 ## 改 stream ? Spring Data JPA 支援使用 stream 回傳結果,那就來試看看吧! ```java public interface UserBrowsingHistoryRepository extends JpaRepository<UserBrowsingHistory, Long>{ Stream<UserBrowsingHistory> streamAllByCreatedTimeIsGreaterThan(Instant now); } ``` 然後記得使用 try-with-resources block 處理 >A Stream potentially wraps underlying data store specific resources and must therefore be closed after usage. You can either manually close the Stream using the close() method or by using a Java 7 try-with-resources block. ```java= private final UserBrowsingHistoryRepository repository; public void sendPromotionMessage(){ try(var historyStream = repository.streamAllByCreatedTimeIsGreaterThan(Instant.now())){ historyStream.forEach(this::sendMessage); } } ``` Ref: [Streaming query results](https://docs.spring.io/spring-data/jpa/docs/1.8.0.RELEASE/reference/html/#repositories.query-streaming) ## 疑?記憶體的使用情況似乎一樣沒有改善 都已經改用 stream 了,那表示資料的讀取已經不是一口氣從資料庫中取得了,那為什麼記憶體使用情況依然沒有得到改善? `org.springframework.data.jpa.provider.PersistenceProvider` 中可以發現使用的是 `ScrollableResults` 外表看似是使用 stream 但骨子裡走的還是一口氣取得資料的老路呢! ## 那... 真。stream 的方式要如何實現 參考 [ JDBC API Implementation Notes](https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-implementation-notes.html) ### ResultSet > By default, ResultSets are completely retrieved and stored in memory. In most cases this is the most efficient way to operate and, due to the design of the MySQL network protocol, is easier to implement. If you are working with ResultSets that have a large number of rows or large values and cannot allocate heap space in your JVM for the memory required, you can tell the driver to stream the results back one row at a time. ```java= stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); stmt.setFetchSize(Integer.MIN_VALUE); ``` 從這邊可以知道需要滿足三項條件 1. TYPE_FORWARD_ONLY 2. CONCUR_READ_ONLY 3. fetchSize = Integer.MIN_VALUE 好的,那就來看看我們要如何透過 Spring Data JPA 來滿足這些條件,啟動真正的 stream! ### TYPE_FORWARD_ONLY [Interface ResultSet Doc](https://docs.oracle.com/javase/8/docs/api/java/sql/ResultSet.html) >A default ResultSet object is not updatable and has a cursor that moves forward only. Thus, you can iterate through it only once and only from the first row to the last row. ### CONCUR_READ_ONLY 加上 `@Transactional(readOnly = true)` ```jav= private final UserBrowsingHistoryRepository repository; @Transactional(readOnly = true) public void sendPromotionMessage(){ try(var historyStream = repository.streamAllByCreatedTimeIsGreaterThan(Instant.now())){ historyStream.forEach(this::sendMessage); } } ``` ### fetchSize = Integer.MIN_VALUE 加上 Query Hint ```java public interface UserBrowsingHistoryRepository extends JpaRepository<UserBrowsingHistory, Long>{ @QueryHints( value = @QueryHint(name = HINT_FETCH_SIZE, value = Integer.MIN_VALUE+"")) Stream<UserBrowsingHistory> streamAllByCreatedTimeIsGreaterThan(Instant now); } ``` ## 大功告成 在這樣的設定下,我們就能使用 Spring Data JPA 針對 MySQL 做 stream 處理,再也不用擔心因為資料量太大而產生 OOM 的問題,下一篇中會再介紹這中間更細節的注意事項。 ###### tags: `blog` `java` `spring data jpa` `MySQL`