changed 2 years ago
Published Linked with GitHub

web3.storage

as

Finite State Machine (FSM)

https://hackmd.io/@gozala/w3-fsm


Concurrency is Hard

in subtle ways

Software transactional memory (STM)

is an alternative to lock-based synchronization

STM IPLD


🚚 🚛 🚚 🚛

flowchart LR
   Idle((" "))
   Active(["🚦 active"])
   Exceed(["🚫 exceeds capacity"])
   Expired(["🚫 expired"])
   Receiving(["🚦 receiving"])
   Ok(["🏁 stored"])
   
   storage{{storage provider}}
   s3{{S3}}
   agent{{"💻 agent"}}
   timer{{"⏱️ timer"}}
   
   
   Idle -- "📬 store/add" --> Active
   
   Active -. "⚙️ storage/allocate" .-> storage
     storage -. "❌ store/exceed" .-> Exceed
     storage -. "✅ s3/stat" .-> s3
     
     s3 -. "✅ store/put" .-> Ok
     s3 -. "❌ store/receive" .-> Receiving
     
  Receiving -. "⚙️ s3/put" .-> agent
  Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> Expired
  
  agent -. "♻️ s3/put" .-> Ok

State

flowchart LR
   Idle((" "))
   Active(["🚦 active"])
   Exceed(["🚫 exceeds capacity"])
   Expired(["🚫 expired"])
   Receiving(["🚦 receiving"])
   Ok(["🏁 stored"])
   
   storage{{storage provider}}
   s3{{S3}}
   agent{{"💻 agent"}}
   timer{{"⏱️ timer"}}
   
   
   Idle -- "📬 store/add" --> Active
   
   Active -. "⚙️ storage/allocate" .-> storage
     storage -. "❌ store/exceed" .-> Exceed
     storage -. "✅ s3/stat" .-> s3
     
     s3 -. "✅ store/put" .-> Ok
     s3 -. "❌ store/receive" .-> Receiving
     
  Receiving -. "⚙️ s3/put" .-> agent
  Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> Expired
  
  agent -. "♻️ s3/put" .-> Ok
flowchart TB
   Idle((" "))
   Active(["🚦 active"])
   Exceed(["🚫 exceeds capacity"])
   Expired(["🚫 expired"])
   Receiving(["🚦 receiving"])
   Ok(["🏁 stored"])

State

flowchart TB
   Idle((" "))
   Active(["🚦 active"])
   Exceed(["🚫 exceeds capacity"])
   Expired(["🚫 expired"])
   Receiving(["🚦 receiving"])
   Ok(["🏁 stored"])
/** Error payload recorded when a pending upload was expired. */
type Expired = { message: string };

/** Error payload recorded when the allocation was refused. */
type ExceedsLimit = { message: string };

/** Error payload used when there is no state for the requested item. */
type NotFound = { message: string };

/**
 * Descriptor of the item being stored: a link plus its size.
 * NOTE(review): the unit of `size` is not stated here — presumably bytes.
 */
type CAR = {
  link: IPLD.Link;
  size: number;
};

/**
 * Payload of the "receiving" state: a URL and the headers that go with it.
 * NOTE(review): presumably the signed upload target handed to the agent —
 * confirm against the `s3/put` effect.
 */
type Receiving = {
  url: string;
  headers: Record<string, string>;
};

/**
 * State of the store state machine, a union discriminated by `status`.
 * Each variant carries its payload under a key named after the status.
 */
type State =
  | { status: "active"; active: CAR }
  | { status: "error"; error: Expired | ExceedsLimit | NotFound }
  | { status: "receiving"; receiving: Receiving }
  | { status: "ok"; ok: CAR };

Events

flowchart LR
   Idle((" "))
   Active(["🚦 active"])
   Exceed(["🚫 exceeds capacity"])
   Expired(["🚫 expired"])
   Receiving(["🚦 receiving"])
   Ok(["🏁 stored"])
   
   storage{{storage provider}}
   s3{{S3}}
   agent{{"💻 agent"}}
   timer{{"⏱️ timer"}}
   
   
   Idle -- "📬 store/add" --> Active
   
   Active -. "⚙️ storage/allocate" .-> storage
     storage -. "❌ store/exceed" .-> Exceed
     storage -. "✅ s3/stat" .-> s3
     
     s3 -. "✅ store/put" .-> Ok
     s3 -. "❌ store/receive" .-> Receiving
     
  Receiving -. "⚙️ s3/put" .-> agent
  Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> Expired
  
  agent -. "♻️ s3/put" .-> Ok


✉️ store/add
✉️ store/exceed
✉️ store/receive
✉️ store/expire
✉️ store/put


Events

/**
 * Every event the store state machine reacts to. All of them carry a `CAR`
 * as input and differ only in the `do` discriminant.
 */
type Event =
  | Message<"store/add", CAR>
  | Message<"store/exceed", CAR>
  | Message<"store/receive", CAR>
  | Message<"store/expire", CAR>
  | Message<"store/put", CAR>

/**
 * Generic message envelope.
 *
 * @typeParam Do - literal name of the action; used as the discriminant.
 * @typeParam In - type of the payload carried in `input`.
 */
type Message<Do extends string, In> = {
  /** DID the message is addressed with. */
  with: DID
  // Must be the `Do` type parameter (previously the undeclared name `DO`),
  // otherwise the `Event` union cannot be narrowed by `do`.
  do: Do
  /** Payload of the message. */
  input: In
}

Wiring

/**
 * Shared handler for every `store/*` capability: routes the invocation into
 * a transaction on the state stored at a path derived from the invocation.
 *
 * @param invocation - the event being handled.
 * @param context - must expose `store.transact(path, update, invocation)`.
 * @returns whatever `context.store.transact` returns, so callers can await
 *   it and observe failures (presumably a promise — TODO confirm).
 */
const post = (invocation: Event, context) => {
  // Derive path within the application DAG model.
  // NOTE(review): this reads `input.root`, while `CAR` declares `link` —
  // confirm which field name is intended.
  const path = [invocation.with, invocation.input.root]

  // Use `update` function to do atomic change at `path`.
  // ⚠️ Note `update` MAY be called multiple
  // times and thus should be free of side effects.
  return context.store.transact(path, update, invocation)
}

// Wire the handler only after `post` is initialised: referencing a `const`
// before its declaration throws a ReferenceError (temporal dead zone).
Store.add.provide(post)
Store.exceed.provide(post)
Store.receive.provide(post)
Store.expire.provide(post)
Store.put.provide(post)

STM

  // ⚠️ Note `update` MAY be called multiple
  // times and thus should be free of side effects.
  context.store.transact(path, update, invocation)
/**
 * Optimistic transaction: read the state at `path`, compute the next state
 * with `update`, and swap it in. The swap is retried from a fresh read when
 * it fails; effects are scheduled only after the swap succeeded.
 *
 * @param path - location of the state inside the store.
 * @param update - function called as `update(...params, data)`. It may run
 *   several times, so it should be free of side effects.
 * @param params - extra arguments forwarded to `update` (e.g. the event).
 * @returns resolves once the new state is swapped in and effects enqueued.
 */
const transact = async (path, update, ...params) => {
  while (true) {
    // Failures of `get` / `update` are not retried: they propagate to the
    // caller instead of being swallowed in an endless loop.
    const { cid, data } = await STM.get(path)

    // `update` in this file takes `(event, state)`, so the current state
    // goes last; an `undefined` state lets its default parameter apply.
    const { state, effects } = update(...params, data)

    try {
      await STM.swap(path, cid, state)
    } catch (_) {
      // NOTE(review): assumes `swap` rejects when `cid` is stale (someone
      // else committed first) — confirm; start over from a fresh read.
      continue
    }

    // Handlers may omit `effects` or return a single effect instead of a
    // list; normalise so the spread below cannot throw.
    const tasks =
      effects == null ? [] : Array.isArray(effects) ? effects : [effects]
    Scheduler.enqueue(...tasks)

    // Committed: leave the retry loop.
    return
  }
}

Update ♽

/**
 * Top-level transition function: dispatches an event to the handler for
 * its action.
 *
 * @param event - the event to apply; its `do` field selects the handler.
 * @param state - current state; defaults to `init(event)` when omitted.
 * @returns whatever the selected handler returns; unknown actions are
 *   passed to `panic`.
 */
const update = (event: Event, state: State = init(event)) => {
  // The discriminant of a `Message` is `do`; there is no `message.type`.
  switch (event.do) {
    case 'store/add': return add(event, state)
    case 'store/exceed': return exceed(event, state)
    case 'store/receive': return receive(event, state)
    case 'store/expire': return expire(event, state)
    case 'store/put': return put(event, state)
    default: return panic('Invalid event')
  }
}

/**
 * Builds the initial state: an error state holding a "Not found" error.
 *
 * @param event - accepted but not read.
 * @returns an object with `status: 'error'` and the error under `error`.
 */
const init = (event: Event) => {
  const error = Error('Not found')
  return { status: 'error', error }
}

✉️ store/add

flowchart LR
   Idle(" ")
   Active("🚦 active")
   OverCapacity("⛔️ exceeds capacity")
   Expired("⛔️ expired")
   Done("🏁 stored")
   Receiving("🚦 receiving")
   
   Idle -- "✉️ store/add" --> Active
   Active -- "✉️ store/add" --> Active
   OverCapacity -- "✉️ store/add" --> Active
   Expired -- "✉️ store/add" --> Active
   Done -- "✉️ store/add" --> Done
   Receiving -- "✉️ store/add" --> Receiving

✉️ store/add

/**
 * Handles `store/add`.
 *
 * @param event - the `store/add` event.
 * @param state - current state.
 * @returns the result of `activate(event)` when the state is an error
 *   (retry); `{ state }` unchanged for `ok`, `active` and `receiving`; any
 *   other status is passed to `panic`.
 */
const add = (event: StoreAdd, state: State) => {
  switch (state.status) {
    // If we are in error state we just retry
    case 'error':
      return activate(event)
    // Otherwise store/add is noop
    case 'ok':
    case 'active':
    case 'receiving':
      return { state }
    default:
      return panic('Invalid state')
  }
}

🚦 activate

flowchart LR
   Active(["🚦 active"])

   
   storage{{storage provider}}
   s3{{S3}}
   
   
   Active -. "⚙️ storage/allocate" .-> storage
     storage -. "❌ store/exceed" .-> I(("📬"))
     storage -. "✅ s3/stat" .-> s3
     
     s3 -. "✅ store/put" .-> I
     s3 -. "❌ store/receive" .-> I


/**
 * Moves the machine into the `active` state and describes the follow-up
 * work as a deferred async effect.
 *
 * The effect allocates space, then sends exactly one message:
 * `store/exceed` if allocation reported an error, `store/put` if `stat`
 * reported `ok`, otherwise `store/receive`.
 *
 * NOTE(review): `effects` is a single function here, while other handlers
 * in this file return arrays — confirm what the scheduler expects.
 * NOTE(review): this file also contains a second `activate` written with
 * task combinators; only one of the two can live in a real module.
 *
 * @param event - `store/add` event; only `input` is read.
 * @returns `{ state, effects }` with `state.active` set to `input`.
 */
const activate = ({ input }: StoreAdd) => ({
  state: { status: 'active', active: input },
  effects: async () => {
    const alloc = await allocate(input.root, input.size)
    if (alloc.error) {
      // Message objects are keyed by action name: `{ [action]: input }`.
      await send({ 'store/exceed': input })
    } else {
      const info = await stat(input.root)
      await send(info.ok
        ? { 'store/put': input }
        : { 'store/receive': input })
    }
  }
})

/**
 * Task-combinator formulation of `activate`: enters the `active` state and
 * returns three effects built from `allocate`, `stat` and `send`.
 *
 * NOTE(review): the semantics of `.and()`, `.catch()` and `.try()` are not
 * visible here; presumably `.catch()` selects the failure branch and
 * `.and()` sequences tasks — confirm against the task library.
 *
 * @param event - `store/add` event; only `input` is read.
 * @returns `{ state, effects }` where `effects` holds the exceed, put and
 *   receive tasks, in that order.
 */
const activate = ({input}: StoreAdd) => {
  const allocation = allocate(input.size)

  // Allocation failure branch -> `store/exceed`.
  const onExceed = allocation.catch()
    .and(send({ 'store/exceed': input }))

  // Allocation followed by a stat of the root.
  const lookup = allocation.and(stat(input.root))

  // Stat branch -> `store/put`.
  const onPut = lookup.and(send({ 'store/put': input }))

  // Allocation plus the stat failure branch -> `store/receive`.
  const onReceive = allocation
    .and(lookup.catch())
    .and(send({ 'store/receive': input }))

  const state = { status: 'active', active: input }
  const effects = [onExceed.try(), onPut.try(), onReceive.try()]
  return { state, effects }
}


flowchart LR

   Active(["🚦 active"])
   Exceed(["🚫 exceeds capacity"])
   
   storage{{storage provider}}
   
   
   Active -. "⚙️ storage/allocate" .-> storage
     storage -. "❌ store/exceed" .-> Exceed
   

/**
 * Handles `store/exceed`.
 *
 * An `active` state becomes an error state carrying a "Not enough space"
 * error. States `error`, `receiving` and `ok` are returned unchanged. Any
 * other status yields `undefined`.
 *
 * @param event - the event; `input` is destructured but not read.
 * @param state - current state.
 */
const exceed = ({ input }, state) => {
  const { status } = state

  if (status === 'active') {
    const error = new Error('Not enough space')
    return { state: { status: 'error', error } }
  }

  if (status === 'error' || status === 'receiving' || status === 'ok') {
    return { state }
  }

  return undefined
}

flowchart LR
   Active(["🚦 active"])
   Receiving(["🚦 receiving"])
   
   storage{{storage provider}}
   s3{{S3}}
   agent{{"💻 agent"}}
   timer{{"⏱️ timer"}}
   
   
   Active -. "⚙️ storage/allocate" .-> storage
     storage -. "✅ s3/stat" .-> s3
     
     s3 -. "❌ store/receive" .-> Receiving
  Receiving -. "⚙️ s3/put" .-> agent
  Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> I(("📬"))
  

/**
 * Handles `store/receive`.
 *
 * From `active` the machine moves to `receiving` and two effects are
 * produced: an `s3/put` invocation addressed to the issuer with a URL from
 * `sign(input)`, and a `sleep(200)` followed by sending `store/expire`.
 * States `error`, `receiving` and `ok` are returned unchanged.
 *
 * NOTE(review): the unit of `sleep(200)` is not visible here — confirm.
 * NOTE(review): the new state keeps the fields of the previous state and
 * only changes `status`; the `State` type declares a `receiving` payload —
 * confirm the intended shape.
 *
 * @param event - the event; `input` and `issuer` are read.
 * @param state - current state.
 */
const receive = ({ input, issuer }, state) => {
  // `input` has to be bound as a whole: the effects below pass it on.
  switch (state.status) {
    case 'active': return {
      state: { ...state, status: 'receiving' },
      effects: [
        invoke({ with: issuer, do: 's3/put', url: sign(input) }),
        sleep(200).and(send({ 'store/expire': input }))
      ]
    }
    case 'error': return { state }
    case 'receiving': return { state }
    case 'ok': return { state }
  }
}

flowchart LR

   Expired(["🚫 expired"])
   Receiving(["🚦 receiving"])
   

   timer{{"⏱️ timer"}}   
   

  Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> Expired

/**
 * Handles `store/expire`.
 *
 * A `receiving` state becomes an error state carrying an "Expired" error,
 * with one effect: `deallocate(input.size)`. States `error`, `active` and
 * `ok` are returned unchanged.
 *
 * @param event - the event; only `input.size` is read.
 * @param state - current state.
 */
const expire = ({ input }, state) => {
  switch (state.status) {
    case 'receiving': return {
      state: { status: 'error', error: new Error('Expired') },
      effects: [deallocate(input.size)]
    }
    case 'error': return { state }
    case 'active': return { state }
    case 'ok': return { state }
  }
}

flowchart LR
   Idle((" "))
   Active(["🚦 active"])
   Receiving(["🚦 receiving"])
   Ok(["🏁 stored"])
   
   storage{{storage provider}}
   s3{{S3}}
   agent{{"💻 agent"}}

   
   
   
   Active -. "⚙️ storage/allocate" .-> storage
     storage -. "✅ s3/stat" .-> s3
     
     s3 -. "✅ store/put" .-> Ok
     s3 -. "❌ store/receive" .-> Receiving
     
  Receiving -. "⚙️ s3/put" .-> agent
  
  agent -. "♻️ s3/put" .-> Ok

/**
 * Handles `store/put`.
 *
 * `active` and `receiving` states move to `ok`; `error` and `ok` states
 * are returned unchanged.
 *
 * NOTE(review): the new state keeps the fields of the previous state and
 * only changes `status`; the `State` type declares an `ok` payload —
 * confirm the intended shape.
 *
 * @param event - the event; `input` is destructured but not read.
 * @param state - current state.
 */
const put = ({ input }, state) => {
  switch (state.status) {
    case 'active':
    case 'receiving':
      return { state: { ...state, status: 'ok' } }
    // A second `case 'active'` used to follow here; it was unreachable.
    case 'error': return { state }
    case 'ok': return { state }
  }
}

flowchart LR
   Idle((" "))
   Active(["🚦 active"])
   Exceed(["🚫 exceeds capacity"])
   Expired(["🚫 expired"])
   Receiving(["🚦 receiving"])
   Ok(["🏁 stored"])
   
   storage{{storage provider}}
   s3{{S3}}
   agent{{"💻 agent"}}
   timer{{"⏱️ timer"}}
   
   
   Idle -- "📬 store/add" --> Active
   
   Active -. "⚙️ storage/allocate" .-> storage
     storage -. "❌ store/exceed" .-> Exceed
     storage -. "✅ s3/stat" .-> s3
     
     s3 -. "✅ store/put" .-> Ok
     s3 -. "❌ store/receive" .-> Receiving
     
  Receiving -. "⚙️ s3/put" .-> agent
  Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> Expired
  
  agent -. "♻️ s3/put" .-> Ok
Select a repo