| dip | title | author | date | status |
| -------- | -------- | -------- | -------- | -------- |
| 1000 | DB3 Multi-chain Streaming SQL ETL (MSSE): Efficiently moving on-chain data off-chain in real time | [imotai](https://github.com/imotai) | 2022-07-15 | wip |
## Why MSSE
* multi-chain: real-time processing of multi-chain data is becoming an inevitable requirement
* the trend among public chains is multi-chain coexistence, and future L2 chains will make it even more pronounced
* the same application will often be deployed to several public chains at once
* streaming: stream processing not only improves freshness, it also allows preprocessing, which lowers API response times
* sql: SQL has become the most common programming language for data processing
## Problems MSSE needs to solve
* how to **generate** streaming data on top of JSON-RPC
* how to build a data model over the data returned by JSON-RPC
* how to run streaming SQL on top of that data model
## Generating streaming data by defining Triggers
### Trigger
* Block Based Trigger: on L1 chains transactions are confirmed block by block, so for L1 the streaming pipeline is driven per block
* Transaction Based Trigger: TBD

An example of a trigger that uses the node's built-in subscription:
```yaml=
trigger:
  name: "eth_trigger"
  type: "subscribe"  # use the node's subscription mechanism
  rpc_call: "eth_subscribe"
```
A trigger that polls on a fixed interval instead:
```yaml=
trigger:
  name: "eth_trigger"
  type: "interval"  # poll periodically
  rpc_call: "eth_blockNumber"
  interval: "11s"  # polling period, written as "1s", "1m", "1h", etc.
```
A trigger drives the fetching of the latest block.
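As a sketch of what the interval trigger's polling loop does (the function names below are hypothetical, not the actual db3 implementation): on each tick it calls `eth_blockNumber` and emits every block height it has not seen yet.

```typescript
// Sketch of the "interval" trigger loop (hypothetical names, not real db3 code):
// each tick asks the node for the chain head and emits every unseen height.
type BlockNumberCall = () => number; // stands in for the eth_blockNumber RPC

function pollOnce(
  lastSeen: number,
  rpc: BlockNumberCall
): { lastSeen: number; newBlocks: number[] } {
  const head = rpc();
  const newBlocks: number[] = [];
  // emit each block exactly once, in order, even if several arrived between ticks
  for (let h = lastSeen + 1; h <= head; h++) {
    newBlocks.push(h);
  }
  return { lastSeen: Math.max(lastSeen, head), newBlocks };
}

// usage: the chain advanced from block 100 to 103 between two ticks
const tick = pollOnce(100, () => 103);
console.log(tick.newBlocks); // blocks 101, 102 and 103 are pushed downstream
```

Tracking `lastSeen` this way also makes the loop idempotent if a tick fires while the head has not moved.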
## Defining the data model with a Multiple Streams Source
* "multiple streams" here means that one trigger can produce several streams, e.g. a block stream, a transaction stream, and a log stream
* every stream is lazy: if no SQL statement touches a stream, no extra data is fetched for it; for example, if a query only uses the block stream, the transaction stream is never fetched over RPC
* nested access is supported: the `data` field in the log stream is left undecoded, but it often needs further processing, so a query can use `select xxxx from log.burn_event`

Definition:
```yaml=
mstream:
  name: "eth"
  trigger: "eth_trigger"  # use the trigger defined above
  node_url: "https://xxxx.xx"  # both https and ws endpoints are supported
  schema: ""  # the schema of the JSON-RPC result
```
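The lazy behavior described above could be sketched like this (the names are hypothetical; a real engine would derive the referenced streams from the SQL plan):

```typescript
// Sketch of lazy multi-stream fan-out (hypothetical names): one trigger event
// fans out into block / transaction / log streams, but an RPC fetch only runs
// for the streams a query actually references.
type Fetcher = () => string;

class MultiStreamSource {
  fetched: string[] = []; // records which RPC fetches actually ran
  private fetchers: Map<string, Fetcher> = new Map();

  register(stream: string, fetcher: Fetcher): void {
    this.fetchers.set(stream, fetcher);
  }

  // `referenced` comes from analyzing the SQL: only those streams materialize
  onTrigger(referenced: string[]): Map<string, string> {
    const out = new Map<string, string>();
    for (const name of referenced) {
      const f = this.fetchers.get(name);
      if (f) {
        this.fetched.push(name); // a real RPC call would happen only here
        out.set(name, f());
      }
    }
    return out;
  }
}

// usage: a query that touches only the block stream never fetches transactions
const source = new MultiStreamSource();
source.register("block", () => "eth_getBlockByNumber result");
source.register("transaction", () => "eth_getTransactionByHash result");
source.onTrigger(["block"]);
console.log(source.fetched); // only "block" was fetched
```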
### Defining a contract-event Stream Source
A contract may define more than one event, so each data source is declared as its own stream:
```yaml=
stream:
  name: "transaction"
  trigger: "eth_trigger"  # use the trigger defined above
  node_url: "https://xxxx.xx"  # both https and ws endpoints are supported
  schema: ""  # the schema of the JSON-RPC result
  rpc_call: "eth_getBlockByNumber"  # RPC method used to fetch the data
```
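The nested access to the undecoded `data` field mentioned earlier could work by routing each raw log to a per-event derived stream. A minimal sketch, using hypothetical names, a fake signature hash, and a trivial decoder in place of real ABI decoding:

```typescript
// Sketch of per-event derived streams (hypothetical names): each event in the
// contract ABI gets its own stream, selected by the log's first topic, and the
// raw `data` payload is decoded only for that event's stream.
type RawLog = { topic0: string; data: string };
type Decoder = (data: string) => Record<string, string>;

class EventStreams {
  private decoders: Map<string, { event: string; decode: Decoder }> = new Map();

  // register one derived stream per event signature hash
  register(topic0: string, event: string, decode: Decoder): void {
    this.decoders.set(topic0, { event, decode });
  }

  // route a raw log into the matching event stream, decoding its payload
  route(log: RawLog): { event: string; fields: Record<string, string> } | null {
    const d = this.decoders.get(log.topic0);
    return d ? { event: d.event, fields: d.decode(log.data) } : null;
  }
}

// usage with a fake signature hash and a trivial decoder
const streams = new EventStreams();
streams.register("0xaaaa", "burn_event", (data) => ({ amount: data }));
const routed = streams.route({ topic0: "0xaaaa", data: "42" });
console.log(routed); // the log lands in the burn_event stream with decoded fields
```

A query over `log.burn_event` would then read from the decoded stream instead of the raw log stream.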
## A streaming SQL engine driven by multiple streams
### Catalog
TBD
### Design
The whole streaming engine is built on the following:
* https://github.com/db3-teams/timely-dataflow, so db3's job is to extend streams and support new operators
* logical plans will be described with https://substrait.io/
* logical plans are translated into dataflow physical plans
* state will be stored with rocksdb-cloud
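The logical-plan-to-dataflow translation step could look roughly like this toy sketch (node and operator names are purely illustrative, not substrait or timely-dataflow APIs):

```typescript
// Toy logical plan and its translation into a chain of dataflow operators
// (illustrative names only, not the substrait or timely-dataflow APIs).
type LogicalPlan =
  | { op: "scan"; stream: string }
  | { op: "filter"; predicate: string; input: LogicalPlan }
  | { op: "window_agg"; fn: string; input: LogicalPlan };

function toDataflow(plan: LogicalPlan): string[] {
  // translate bottom-up so operators appear in execution order
  if (plan.op === "scan") return ["StreamSource(" + plan.stream + ")"];
  const upstream = toDataflow(plan.input);
  if (plan.op === "filter") return [...upstream, "FilterOp(" + plan.predicate + ")"];
  return [...upstream, "WindowAggOp(" + plan.fn + ")"];
}

// usage: a windowed sum over the filtered Transfer log stream
const plan: LogicalPlan = {
  op: "window_agg",
  fn: "sum(amount)",
  input: {
    op: "filter",
    predicate: "addr = POOL_ADDR",
    input: { op: "scan", stream: "eth.log.Transfer" },
  },
};
console.log(toDataflow(plan));
// ["StreamSource(eth.log.Transfer)", "FilterOp(addr = POOL_ADDR)", "WindowAggOp(sum(amount))"]
```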
### Usecase
#### multiple streams union
Suppose the uniswap contracts are deployed on Ethereum, Optimism, and other public chains at the same time, and we want to build a real-time API that reports uniswap trading volume over the last 1 minute, 30 minutes, 1 hour, and 12 hours.
```sql=
CREATE MATERIALIZED VIEW uniswap_vol AS
select sum(transfer.amount) OVER w1 as one_minute_vol,
       sum(transfer.amount) OVER w2 as thirty_minutes_vol,
       sum(transfer.amount) OVER w3 as one_hour_vol,
       sum(transfer.amount) OVER w4 as twelve_hours_vol,
       transfer.time as `time`
from (
    select * from eth.log.Transfer where eth.log.addr = "0x1F98431c8aD98523631AE4a59f267346ea31F984"
    union
    select * from optimism.log.Transfer where optimism.log.addr = "0x1F98431c8aD98523631AE4a59f267346ea31F984"
) as transfer
WINDOW w1 AS (PARTITION BY 1 ORDER BY transfer.time RANGE BETWEEN 1m PRECEDING AND CURRENT ROW),
       w2 AS (PARTITION BY 1 ORDER BY transfer.time RANGE BETWEEN 30m PRECEDING AND CURRENT ROW),
       w3 AS (PARTITION BY 1 ORDER BY transfer.time RANGE BETWEEN 1h PRECEDING AND CURRENT ROW),
       w4 AS (PARTITION BY 1 ORDER BY transfer.time RANGE BETWEEN 12h PRECEDING AND CURRENT ROW)
```
By contrast, The Graph, currently the best implementation of this kind, needs code like the following:
```typescript=
/* eslint-disable prefer-const */
import { BigDecimal, BigInt, EthereumEvent } from '@graphprotocol/graph-ts'
import { Bundle, Pair, PairDayData, Token, TokenDayData, UniswapDayData, UniswapFactory } from '../types/schema'
import { PairHourData } from './../types/schema'
import { FACTORY_ADDRESS, ONE_BI, ZERO_BD, ZERO_BI } from './helpers'
export function updateUniswapDayData(event: EthereumEvent): UniswapDayData {
  let uniswap = UniswapFactory.load(FACTORY_ADDRESS)
  let timestamp = event.block.timestamp.toI32()
  let dayID = timestamp / 86400
  let dayStartTimestamp = dayID * 86400
  let uniswapDayData = UniswapDayData.load(dayID.toString())
  if (uniswapDayData === null) {
    uniswapDayData = new UniswapDayData(dayID.toString())
    uniswapDayData.date = dayStartTimestamp
    uniswapDayData.dailyVolumeUSD = ZERO_BD
    uniswapDayData.dailyVolumeETH = ZERO_BD
    uniswapDayData.totalVolumeUSD = ZERO_BD
    uniswapDayData.totalVolumeETH = ZERO_BD
    uniswapDayData.dailyVolumeUntracked = ZERO_BD
  }
  uniswapDayData.totalLiquidityUSD = uniswap.totalLiquidityUSD
  uniswapDayData.totalLiquidityETH = uniswap.totalLiquidityETH
  uniswapDayData.txCount = uniswap.txCount
  uniswapDayData.save()
  return uniswapDayData as UniswapDayData
}

export function updatePairDayData(event: EthereumEvent): PairDayData {
  let timestamp = event.block.timestamp.toI32()
  let dayID = timestamp / 86400
  let dayStartTimestamp = dayID * 86400
  let dayPairID = event.address
    .toHexString()
    .concat('-')
    .concat(BigInt.fromI32(dayID).toString())
  let pair = Pair.load(event.address.toHexString())
  let pairDayData = PairDayData.load(dayPairID)
  if (pairDayData === null) {
    pairDayData = new PairDayData(dayPairID)
    pairDayData.date = dayStartTimestamp
    pairDayData.token0 = pair.token0
    pairDayData.token1 = pair.token1
    pairDayData.pairAddress = event.address
    pairDayData.dailyVolumeToken0 = ZERO_BD
    pairDayData.dailyVolumeToken1 = ZERO_BD
    pairDayData.dailyVolumeUSD = ZERO_BD
    pairDayData.dailyTxns = ZERO_BI
  }
  pairDayData.totalSupply = pair.totalSupply
  pairDayData.reserve0 = pair.reserve0
  pairDayData.reserve1 = pair.reserve1
  pairDayData.reserveUSD = pair.reserveUSD
  pairDayData.dailyTxns = pairDayData.dailyTxns.plus(ONE_BI)
  pairDayData.save()
  return pairDayData as PairDayData
}

export function updatePairHourData(event: EthereumEvent): PairHourData {
  let timestamp = event.block.timestamp.toI32()
  let hourIndex = timestamp / 3600 // get unique hour within unix history
  let hourStartUnix = hourIndex * 3600 // want the rounded effect
  let hourPairID = event.address
    .toHexString()
    .concat('-')
    .concat(BigInt.fromI32(hourIndex).toString())
  let pair = Pair.load(event.address.toHexString())
  let pairHourData = PairHourData.load(hourPairID)
  if (pairHourData === null) {
    pairHourData = new PairHourData(hourPairID)
    pairHourData.hourStartUnix = hourStartUnix
    pairHourData.pair = event.address.toHexString()
    pairHourData.hourlyVolumeToken0 = ZERO_BD
    pairHourData.hourlyVolumeToken1 = ZERO_BD
    pairHourData.hourlyVolumeUSD = ZERO_BD
    pairHourData.hourlyTxns = ZERO_BI
  }
  pairHourData.totalSupply = pair.totalSupply
  pairHourData.reserve0 = pair.reserve0
  pairHourData.reserve1 = pair.reserve1
  pairHourData.reserveUSD = pair.reserveUSD
  pairHourData.hourlyTxns = pairHourData.hourlyTxns.plus(ONE_BI)
  pairHourData.save()
  return pairHourData as PairHourData
}
```
#### multiple streams join
For example, querying the price trend of a class of NFTs, taking the NFT contract address as input; we assume all the NFTs live on the same public chain, Ethereum. TBD
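As a sketch of the join such a query would need (record shapes are hypothetical; a streaming engine would maintain this incrementally rather than in batch): pair each NFT Transfer with the sale that happened in the same transaction, keyed by transaction hash.

```typescript
// Batch sketch of a two-stream join keyed by tx hash (hypothetical shapes):
// one side is indexed by the join key, then the other side probes it.
type Transfer = { tx: string; tokenId: number };
type Sale = { tx: string; priceEth: number };

function joinByTx(
  transfers: Transfer[],
  sales: Sale[]
): { tokenId: number; priceEth: number }[] {
  const byTx = new Map<string, Sale>();
  for (const s of sales) byTx.set(s.tx, s); // build side: index sales by tx hash
  const out: { tokenId: number; priceEth: number }[] = [];
  for (const t of transfers) {
    const s = byTx.get(t.tx);
    if (s) out.push({ tokenId: t.tokenId, priceEth: s.priceEth }); // matched row
  }
  return out;
}

// usage: one transfer has a matching sale, the other does not
const rows = joinByTx(
  [{ tx: "0x1", tokenId: 7 }, { tx: "0x2", tokenId: 9 }],
  [{ tx: "0x1", priceEth: 1.5 }]
);
console.log(rows); // only tokenId 7 joins to a price
```

A streaming version would buffer unmatched rows on both sides and evict them when a time window expires.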
## State manager
TBD
## Block data rollback
TBD
## Reference
* https://flink.apache.org/flink-applications.html