### web3.storage ###### as ## Finite State Machine (FSM) <!-- Put the link to this slide here so people can follow --> https://hackmd.io/@gozala/w3-fsm --- ## Concurrency is Hard ###### in subtle ways --- ### Software transactional memory (STM) ##### is an alternative to lock-based synchronization --- # STM `☯` IPLD --- # 🚚 🚛 🚚 🚛 ```mermaid flowchart LR Idle((" ")) Active(["🚦 active"]) Exceed(["🚫 exceeds capacity"]) Expired(["🚫 expired"]) Receiving(["🚦 receiving"]) Ok(["🏁 stored"]) storage{{storage provider}} s3{{S3}} agent{{"💻 agent"}} timer{{"⏱️ timer"}} Idle -- "📬 store/add" --> Active Active -. "⚙️ storage/allocate" .-> storage storage -. "❌ store/exceed" .-> Exceed storage -. "✅ s3/stat" .-> s3 s3 -. "✅ store/put" .-> Ok s3 -. "❌ store/receive" .-> Receiving Receiving -. "⚙️ s3/put" .-> agent Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> Expired agent -. "♻️ s3/put" .-> Ok ``` --- ## State ```mermaid flowchart LR Idle((" ")) Active(["🚦 active"]) Exceed(["🚫 exceeds capacity"]) Expired(["🚫 expired"]) Receiving(["🚦 receiving"]) Ok(["🏁 stored"]) storage{{storage provider}} s3{{S3}} agent{{"💻 agent"}} timer{{"⏱️ timer"}} Idle -- "📬 store/add" --> Active Active -. "⚙️ storage/allocate" .-> storage storage -. "❌ store/exceed" .-> Exceed storage -. "✅ s3/stat" .-> s3 s3 -. "✅ store/put" .-> Ok s3 -. "❌ store/receive" .-> Receiving Receiving -. "⚙️ s3/put" .-> agent Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> Expired agent -. "♻️ s3/put" .-> Ok ``` ```mermaid flowchart TB Idle((" ")) Active(["🚦 active"]) Exceed(["🚫 exceeds capacity"]) Expired(["🚫 expired"]) Receiving(["🚦 receiving"]) Ok(["🏁 stored"]) ``` --- ## State ```mermaid flowchart TB Idle((" ")) Active(["🚦 active"]) Exceed(["🚫 exceeds capacity"]) Expired(["🚫 expired"]) Receiving(["🚦 receiving"]) Ok(["🏁 stored"]) ``` ```ts type State = | {status: "active", active: CAR} | {status: "error", error: Expired|ExceedsLimit|NotFound} | {status: "receiving", receiving: Receiving} | {status: "ok", ok: CAR} type CAR = { link: IPLD.Link, size: number } type Expired = { message: string } type ExceedsLimit = { message: string } type NotFound = { message: string } type Receiving = { url: string headers: {[key:string]: string} } ``` --- ## Events ```mermaid flowchart LR Idle((" ")) Active(["🚦 active"]) Exceed(["🚫 exceeds capacity"]) Expired(["🚫 expired"]) Receiving(["🚦 receiving"]) Ok(["🏁 stored"]) storage{{storage provider}} s3{{S3}} agent{{"💻 agent"}} timer{{"⏱️ timer"}} Idle -- "📬 store/add" --> Active Active -. "⚙️ storage/allocate" .-> storage storage -. "❌ store/exceed" .-> Exceed storage -. "✅ s3/stat" .-> s3 s3 -. "✅ store/put" .-> Ok s3 -. "❌ store/receive" .-> Receiving Receiving -. "⚙️ s3/put" .-> agent Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> Expired agent -. "♻️ s3/put" .-> Ok ``` <small><center> ✉️ `store/add` ✉️ `store/exceed` ✉️ `store/receive` ✉️ `store/expire` ✉️ `store/put` </center></small> --- ## Events ```ts type Event = | Message<"store/add", CAR> | Message<"store/exceed", CAR> | Message<"store/receive", CAR> | Message<"store/expire", CAR> | Message<"store/put", CAR> type Message<Do extends string, In> = { with: DID do: DO input: In } ``` --- ## Wiring ```ts Store.add.provide(post) Store.exceed.provide(post) Store.receive.provide(post) Store.expire.provide(post) Store.put.provide(post) const post = (invocation:Event, context) => { // Derive path with-in application DAG model const path = [invocation.with, invocation.input.root] // Use `update` function to do atomic change at `path` // ⚠️ Note `update` MAY be called multiple // times and thus should be free of side effects. context.store.transact(path, update, invocation) } ``` --- ## STM ```ts // ⚠️ Note `update` MAY be called multiple // times and thus should be free of side effects. context.store.transact(path, update, invocation) ``` ```ts const transact = async (path, update, ...params) => { while (true) { try { const {cid, data} = await STM.get(path) const {state, effects} = update(data, ...params) await STM.swap(path, cid, state) Scheduler.enqueue(...effects) } catch(_) {} } } ``` --- # Update ♽ ```ts const update = (event:Event, state = init(event):State) => { switch (message.type) { case 'store/add': return add(event, state) case 'store/exceed': return exceed(event, state) case 'store/receive': return receive(event, state) case 'store/expire': return expire(event, state) case 'store/put': return put(event, state) default: return panic('Invalid event') } } const init = (event: Event) => ({status:'error', error:Error(`Not found`)}) ``` --- ### ✉️ `store/add` ```mermaid flowchart LR Idle(" ") Active("🚦 active") OverCapacity("⛔️ exceeds capacity") Expired("⛔️ expired") Done("🏁 stored") Receiving("🚦 receiving") Idle -- "✉️ store/add" --> Active Active -- "✉️ store/add" --> Active OverCapacity -- "✉️ store/add" --> Active Expired -- "✉️ store/add" --> Active Done -- "✉️ store/add" --> Done Receiving -- "✉️ store/add" --> Receiving ``` --- ### ✉️ `store/add` ```ts const add (event:StoreAdd, state:State) => { switch (state.status) { // If we are in error state we just retry case 'error': return activate(event) // Otherwise store/add is noop case 'ok': case 'active': case 'receiving': return {state} default: return panic('Invalid state') } } ``` --- ### 🚦 activate ```mermaid flowchart LR Active(["🚦 active"]) storage{{storage provider}} s3{{S3}} Active -. "⚙️ storage/allocate" .-> storage storage -. "❌ store/exceed" .-> I(("📬")) storage -. "✅ s3/stat" .-> s3 s3 -. "✅ store/put" .-> I s3 -. "❌ store/receive" .-> I ``` --- --- ```ts const activate = ({input}: StoreAdd) => ({ state: { status: 'active', active: input }, effects: async () => { const alloc = await allocate(input.root, input.size) if (alloc.error) { await send({ 'store/exceed', input }) } else { const info = await stat(input.root) await send(info.ok ? {'store/put':input} : {'store/receive':input}) } } }) ``` --- ```ts const activate = ({input}: StoreAdd) => { const alloc = allocate(input.size) const exceed = alloc.catch() .and(send({ 'store/exceed': input })) const info = alloc.and(stat(input.root)) const put = info.and(send({ 'store/put': input })) const receive = alloc .and(info.catch()) .and(send({ 'store/receive': input })) return { state: { status: 'active', active: input }, effects: [exceed.try(), put.try(), receive.try()] } } ``` --- ```mermaid flowchart LR Active(["🚦 active"]) Exceed(["🚫 exceeds capacity"]) storage{{storage provider}} Active -. "⚙️ storage/allocate" .-> storage storage -. "❌ store/exceed" .-> Exceed ``` --- ```ts const exceed = ({input}, state) => { switch (state.status) { case 'active': return { state: { status: 'error', error: new Error('Not enough space') } } case 'error': case 'receiving': case 'ok': return { state } } } ``` --- ```mermaid flowchart LR Active(["🚦 active"]) Receiving(["🚦 receiving"]) storage{{storage provider}} s3{{S3}} agent{{"💻 agent"}} timer{{"⏱️ timer"}} Active -. "⚙️ storage/allocate" .-> storage storage -. "✅ s3/stat" .-> s3 s3 -. "❌ store/receive" .-> Receiving Receiving -. "⚙️ s3/put" .-> agent Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> I(("📬")) ``` --- ```ts const receive = ({input:{root}, issuer, audience}, state) => { switch (state.status) { case 'active': return { state: { ...state, status: 'receiving' }, effects: [ invoke({with:issuer, do:'s3/put', url:sign(input)}), sleep(200).and(send({ 'store/expire': input })) ] } case 'error': return { state } case 'receiving': return { state } case 'ok': return { state } } } ``` --- ```mermaid flowchart LR Expired(["🚫 expired"]) Receiving(["🚦 receiving"]) timer{{"⏱️ timer"}} Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> Expired ``` --- ```ts const expire = ({input}, state) => { switch (state.status) { case 'receiving': return { state: {status: 'error', error: new Error('Expired')}, effects: [deallocate(input.size)] } case 'error': return { state } case 'active': return { state } case 'ok': return { state } } ``` --- ```mermaid flowchart LR Idle((" ")) Active(["🚦 active"]) Receiving(["🚦 receiving"]) Ok(["🏁 stored"]) storage{{storage provider}} s3{{S3}} agent{{"💻 agent"}} Active -. "⚙️ storage/allocate" .-> storage storage -. "✅ s3/stat" .-> s3 s3 -. "✅ store/put" .-> Ok s3 -. "❌ store/receive" .-> Receiving Receiving -. "⚙️ s3/put" .-> agent agent -. "♻️ s3/put" .-> Ok ``` --- ```ts const put = ({ input }, state) => { switch (state.status) { case 'active': case 'receiving': return { state: { ...state, status: 'ok' } case 'error': return { state } case 'active': return { state } case 'ok': return { state } } ``` --- ```mermaid flowchart LR Idle((" ")) Active(["🚦 active"]) Exceed(["🚫 exceeds capacity"]) Expired(["🚫 expired"]) Receiving(["🚦 receiving"]) Ok(["🏁 stored"]) storage{{storage provider}} s3{{S3}} agent{{"💻 agent"}} timer{{"⏱️ timer"}} Idle -- "📬 store/add" --> Active Active -. "⚙️ storage/allocate" .-> storage storage -. "❌ store/exceed" .-> Exceed storage -. "✅ s3/stat" .-> s3 s3 -. "✅ store/put" .-> Ok s3 -. "❌ store/receive" .-> Receiving Receiving -. "⚙️ s3/put" .-> agent Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> Expired agent -. "♻️ s3/put" .-> Ok ```
{"metaMigratedFrom":"YAML","metaMigratedAt":"2023-06-17T18:34:33.941Z","title":"web3.storage Finite State Machine (FSM)","breaks":true,"description":"View the slide with \"Slide Mode\".","contributors":"[{\"id\":\"fff58d5d-df29-4dd3-93ef-42fd0194a26f\",\"add\":23569,\"del\":12962}]"}
    810 views