| dip | title | author | date | status |
| -------- | -------- | -------- | -------- | -------- |
| 1000 | DB3 Multi-chain Streaming SQL ETL (MSSE): Efficiently moving on-chain data to off-chain in real time | [imotai](https://github.com/imotai) | 2022-07-15 | wip |

## Why MSSE

* Multi-chain: processing data from many chains in real time is becoming an unavoidable requirement.
  * The trend in public blockchains is multi-chain coexistence, and L2 chains will make this even more pronounced.
  * The same application is often deployed across several chains at once.
* Streaming: stream processing not only improves data freshness, it also allows pre-processing, which lowers API response times.
* SQL: SQL has become the most common programming language for data processing.

## Problems MSSE must solve

* How to **generate** streaming data on top of JSON-RPC
* How to build a data model over JSON-RPC responses
* How to run streaming SQL on top of that data model

## Generating streaming data with Triggers

### Trigger

* Block-based trigger: on L1 chains, transactions are confirmed block by block, so stream processing for L1 is driven by new blocks.
* Transaction-based trigger: TBD

Example of a trigger that uses the node's built-in subscription mechanism:

```yaml=
trigger:
  name: "eth_trigger"
  type: "subscribe"            # push-based: use the node's subscription API
  rpc_call: "eth_subscribe"
```

Example of a trigger that polls on a fixed period:

```yaml=
trigger:
  name: "eth_trigger"
  type: "interval"             # pull-based: check for new blocks periodically
  rpc_call: "eth_blockNumber"
  interval: "11s"              # the period, written as "1s", "1m", "1h", etc.
```

The trigger drives fetching of the latest block.

## Defining the data model with a Multiple Streams Source

* "Multiple streams" means one trigger can produce several streams, e.g. a block stream, a transaction stream, and a log stream.
* All streams are lazy: if no SQL query touches a stream, no extra data is fetched for it. For example, if a query only uses the block stream, the RPC layer will not fetch the transaction stream.
* Nested access is supported. The `data` field of the log stream is unparsed, but it usually needs further processing, so SQL can address it as `select xxxx from log.burn_event`.

Definition:

```yaml=
mstream:
  name: "eth"
  trigger: "eth_trigger"       # uses the trigger defined above
  node_url: "https://xxxx.xx"  # https and ws are both supported
  schema: ""                   # the schema of the JSON-RPC result
```

### Defining a contract-event Stream Source

A contract may define multiple events, so each event needs its own stream definition:

```yaml=
stream:
  name: "transaction"
  trigger: "eth_trigger"       # uses the trigger defined above
  node_url: "https://xxxx.xx"  # https and ws are both supported
  schema: ""                   # the schema of the JSON-RPC result
  rpc_call: "eth_getBlockByNumber"
```

## A streaming SQL engine driven by multiple streams

### Catalog

TBD

### Design

The streaming engine is built on the following pieces:

* https://github.com/db3-teams/timely-dataflow — DB3's work is to extend its streams and support new operators
* Logical plans are described with https://substrait.io/
* Logical plans are lowered into dataflow physical plans
* State is stored in rocksdb-cloud

### Usecase

#### multiple streams union
Consider a scenario: the Uniswap contracts are deployed on Ethereum, Optimism, and other chains, and we want a real-time API that reports Uniswap trading volume over the last 1 minute, 30 minutes, 1 hour, and 12 hours.

```sql=
CREATE MATERIALIZED VIEW uniswap_vol AS
select sum(transfer.amount) OVER w1 as one_minute_vol,
       sum(transfer.amount) OVER w2 as thirty_minutes_vol,
       sum(transfer.amount) OVER w3 as one_hour_vol,
       sum(transfer.amount) OVER w4 as twelve_hours_vol,
       transfer.time as `time`
from (
    select * from eth.log.Transfer
    where eth.log.addr = "0x1F98431c8aD98523631AE4a59f267346ea31F984"
    union
    select * from optimism.log.Transfer
    where optimism.log.addr = "0x1F98431c8aD98523631AE4a59f267346ea31F984"
) as transfer
WINDOW w1 AS (PARTITION BY 1 ORDER BY transfer.time RANGE BETWEEN 1m PRECEDING AND CURRENT ROW),
       w2 AS (PARTITION BY 1 ORDER BY transfer.time RANGE BETWEEN 30m PRECEDING AND CURRENT ROW),
       w3 AS (PARTITION BY 1 ORDER BY transfer.time RANGE BETWEEN 1h PRECEDING AND CURRENT ROW),
       w4 AS (PARTITION BY 1 ORDER BY transfer.time RANGE BETWEEN 12h PRECEDING AND CURRENT ROW)
```

By contrast, The Graph, currently the best-known solution for this, requires an implementation like the following:

```typescript=
/* eslint-disable prefer-const */
import { BigDecimal, BigInt, EthereumEvent } from '@graphprotocol/graph-ts'
import { Bundle, Pair, PairDayData, Token, TokenDayData, UniswapDayData, UniswapFactory } from '../types/schema'
import { PairHourData } from './../types/schema'
import { FACTORY_ADDRESS, ONE_BI, ZERO_BD, ZERO_BI } from './helpers'

export function updateUniswapDayData(event: EthereumEvent): UniswapDayData {
  let uniswap = UniswapFactory.load(FACTORY_ADDRESS)
  let timestamp = event.block.timestamp.toI32()
  let dayID = timestamp / 86400
  let dayStartTimestamp = dayID * 86400
  let uniswapDayData = UniswapDayData.load(dayID.toString())
  if (uniswapDayData === null) {
    uniswapDayData = new UniswapDayData(dayID.toString())
    uniswapDayData.date = dayStartTimestamp
    uniswapDayData.dailyVolumeUSD = ZERO_BD
    uniswapDayData.dailyVolumeETH = ZERO_BD
    uniswapDayData.totalVolumeUSD = ZERO_BD
    uniswapDayData.totalVolumeETH = ZERO_BD
    uniswapDayData.dailyVolumeUntracked = ZERO_BD
  }
  uniswapDayData.totalLiquidityUSD = uniswap.totalLiquidityUSD
  uniswapDayData.totalLiquidityETH = uniswap.totalLiquidityETH
  uniswapDayData.txCount = uniswap.txCount
  uniswapDayData.save()
  return uniswapDayData as UniswapDayData
}

export function updatePairDayData(event: EthereumEvent): PairDayData {
  let timestamp = event.block.timestamp.toI32()
  let dayID = timestamp / 86400
  let dayStartTimestamp = dayID * 86400
  let dayPairID = event.address
    .toHexString()
    .concat('-')
    .concat(BigInt.fromI32(dayID).toString())
  let pair = Pair.load(event.address.toHexString())
  let pairDayData = PairDayData.load(dayPairID)
  if (pairDayData === null) {
    pairDayData = new PairDayData(dayPairID)
    pairDayData.date = dayStartTimestamp
    pairDayData.token0 = pair.token0
    pairDayData.token1 = pair.token1
    pairDayData.pairAddress = event.address
    pairDayData.dailyVolumeToken0 = ZERO_BD
    pairDayData.dailyVolumeToken1 = ZERO_BD
    pairDayData.dailyVolumeUSD = ZERO_BD
    pairDayData.dailyTxns = ZERO_BI
  }
  pairDayData.totalSupply = pair.totalSupply
  pairDayData.reserve0 = pair.reserve0
  pairDayData.reserve1 = pair.reserve1
  pairDayData.reserveUSD = pair.reserveUSD
  pairDayData.dailyTxns = pairDayData.dailyTxns.plus(ONE_BI)
  pairDayData.save()
  return pairDayData as PairDayData
}

export function updatePairHourData(event: EthereumEvent): PairHourData {
  let timestamp = event.block.timestamp.toI32()
  let hourIndex = timestamp / 3600 // get unique hour within unix history
  let hourStartUnix = hourIndex * 3600 // want the rounded effect
  let hourPairID = event.address
    .toHexString()
    .concat('-')
    .concat(BigInt.fromI32(hourIndex).toString())
  let pair = Pair.load(event.address.toHexString())
  let pairHourData = PairHourData.load(hourPairID)
  if (pairHourData === null) {
    pairHourData = new PairHourData(hourPairID)
    pairHourData.hourStartUnix = hourStartUnix
    pairHourData.pair = event.address.toHexString()
    pairHourData.hourlyVolumeToken0 = ZERO_BD
    pairHourData.hourlyVolumeToken1 = ZERO_BD
    pairHourData.hourlyVolumeUSD = ZERO_BD
    pairHourData.hourlyTxns = ZERO_BI
  }
  pairHourData.totalSupply = pair.totalSupply
  pairHourData.reserve0 = pair.reserve0
  pairHourData.reserve1 = pair.reserve1
  pairHourData.reserveUSD = pair.reserveUSD
  pairHourData.hourlyTxns = pairHourData.hourlyTxns.plus(ONE_BI)
  pairHourData.save()
  return pairHourData as PairHourData
}
```

#### multiple streams join

For example: query the price trend of an NFT collection, with the NFT contract address as input. We assume all of the NFTs live on a single chain (Ethereum).

TBD

## State manager

TBD

## Block data rollback

TBD

## Reference

* https://flink.apache.org/flink-applications.html
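To make the `interval` trigger concrete, here is a minimal sketch of the polling loop it implies: parse the `interval` string, poll the chain head on that period, and emit each newly observed block number exactly once. This is an illustration only, under stated assumptions; `parseInterval`, `startIntervalTrigger`, and the injected `fetchBlockNumber` callback (which would wrap a JSON-RPC `eth_blockNumber` call) are hypothetical names, not DB3 APIs.

```typescript
// Parse interval specs like "11s", "1m", "1h" (the formats allowed in the
// trigger config above) into milliseconds.
function parseInterval(spec: string): number {
  const m = spec.match(/^(\d+)(s|m|h)$/)
  if (!m) throw new Error(`bad interval: ${spec}`)
  const unit = { s: 1_000, m: 60_000, h: 3_600_000 }[m[2] as 's' | 'm' | 'h']
  return parseInt(m[1], 10) * unit
}

// Drive the block stream: remember the last seen head and emit only new
// block numbers downstream. `fetchBlockNumber` is assumed to wrap a
// JSON-RPC eth_blockNumber call against the configured node_url.
function startIntervalTrigger(
  fetchBlockNumber: () => Promise<number>,
  interval: string,
  onNewBlock: (blockNumber: number) => void,
): ReturnType<typeof setInterval> {
  let lastSeen: number | null = null
  return setInterval(async () => {
    const current = await fetchBlockNumber()
    if (lastSeen !== null) {
      // Emit every block produced since the previous poll, in order.
      for (let b = lastSeen + 1; b <= current; b++) onNewBlock(b)
    }
    lastSeen = current
  }, parseInterval(interval))
}
```

A `subscribe`-type trigger would replace this polling loop with an `eth_subscribe` call over a WebSocket connection, pushing new heads instead of pulling them.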