☯
IPLD
flowchart LR
%% Nodes: `Idle` is a blank circle, the stadium-shaped nodes carry status
%% labels, and the hexagons are the other parties in the flow
%% (storage provider, S3, agent, timer).
Idle((" "))
Active(["🚦 active"])
Exceed(["🚫 exceeds capacity"])
Expired(["🚫 expired"])
Receiving(["🚦 receiving"])
Ok(["🏁 stored"])
storage{{storage provider}}
s3{{S3}}
agent{{"💻 agent"}}
timer{{"⏱️ timer"}}
%% The only solid edge: `store/add` takes `Idle` to active.
Idle -- "📬 store/add" --> Active
%% All remaining edges are dotted. From active the flow goes to the storage
%% provider, which leads either to "exceeds capacity" or on to S3.
Active -. "⚙️ storage/allocate" .-> storage
storage -. "❌ store/exceed" .-> Exceed
storage -. "✅ s3/stat" .-> s3
%% S3 leads either straight to stored or to receiving.
s3 -. "✅ store/put" .-> Ok
s3 -. "❌ store/receive" .-> Receiving
%% Receiving fans out to the agent and to the timer; the timer edge ends in
%% expired, the agent edge ends in stored.
Receiving -. "⚙️ s3/put" .-> agent
Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> Expired
agent -. "♻️ s3/put" .-> Ok
flowchart LR
%% Full store lifecycle, laid out left to right.
%% Nodes: `Idle` is a blank circle, the stadium-shaped nodes carry status
%% labels, and the hexagons are the other parties in the flow
%% (storage provider, S3, agent, timer).
Idle((" "))
Active(["🚦 active"])
Exceed(["🚫 exceeds capacity"])
Expired(["🚫 expired"])
Receiving(["🚦 receiving"])
Ok(["🏁 stored"])
storage{{storage provider}}
s3{{S3}}
agent{{"💻 agent"}}
timer{{"⏱️ timer"}}
%% The only solid edge: `store/add` takes `Idle` to active.
Idle -- "📬 store/add" --> Active
%% All remaining edges are dotted. From active the flow goes to the storage
%% provider, which leads either to "exceeds capacity" or on to S3.
Active -. "⚙️ storage/allocate" .-> storage
storage -. "❌ store/exceed" .-> Exceed
storage -. "✅ s3/stat" .-> s3
%% S3 leads either straight to stored or to receiving.
s3 -. "✅ store/put" .-> Ok
s3 -. "❌ store/receive" .-> Receiving
%% Receiving fans out to the agent and to the timer; the timer edge ends in
%% expired, the agent edge ends in stored.
Receiving -. "⚙️ s3/put" .-> agent
Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> Expired
agent -. "♻️ s3/put" .-> Ok
flowchart TB
%% Node-only view, laid out top to bottom: the blank `Idle` circle plus the
%% five labelled status nodes. No edges are drawn here.
Idle((" "))
Active(["🚦 active"])
Exceed(["🚫 exceeds capacity"])
Expired(["🚫 expired"])
Receiving(["🚦 receiving"])
Ok(["🏁 stored"])
flowchart TB
%% Node-only view, laid out top to bottom: the blank `Idle` circle plus the
%% five labelled status nodes. No edges are drawn here.
Idle((" "))
Active(["🚦 active"])
Exceed(["🚫 exceeds capacity"])
Expired(["🚫 expired"])
Receiving(["🚦 receiving"])
Ok(["🏁 stored"])
/**
 * State kept for a single stored item. It is a union discriminated by
 * `status`; every variant carries its payload under a key named after the
 * status itself.
 */
type State =
  | { status: 'active'; active: CAR }
  | { status: 'error'; error: Expired | ExceedsLimit | NotFound }
  | { status: 'receiving'; receiving: Receiving }
  | { status: 'ok'; ok: CAR };

/**
 * Description of the archive being stored: a link plus a size.
 *
 * NOTE(review): the handlers in this file read `input.root`, while this
 * type declares `link` — confirm which field name is intended.
 */
type CAR = { link: IPLD.Link; size: number };

/** Error payload of the `error` status; only a `message` is declared. */
type Expired = { message: string };

/** Error payload of the `error` status; only a `message` is declared. */
type ExceedsLimit = { message: string };

/** Error payload of the `error` status; only a `message` is declared. */
type NotFound = { message: string };

/** Payload of the `receiving` status: a URL and string-valued headers. */
type Receiving = {
  url: string;
  headers: Record<string, string>;
};
flowchart LR
%% Full store lifecycle, laid out left to right.
%% Nodes: `Idle` is a blank circle, the stadium-shaped nodes carry status
%% labels, and the hexagons are the other parties in the flow
%% (storage provider, S3, agent, timer).
Idle((" "))
Active(["🚦 active"])
Exceed(["🚫 exceeds capacity"])
Expired(["🚫 expired"])
Receiving(["🚦 receiving"])
Ok(["🏁 stored"])
storage{{storage provider}}
s3{{S3}}
agent{{"💻 agent"}}
timer{{"⏱️ timer"}}
%% The only solid edge: `store/add` takes `Idle` to active.
Idle -- "📬 store/add" --> Active
%% All remaining edges are dotted. From active the flow goes to the storage
%% provider, which leads either to "exceeds capacity" or on to S3.
Active -. "⚙️ storage/allocate" .-> storage
storage -. "❌ store/exceed" .-> Exceed
storage -. "✅ s3/stat" .-> s3
%% S3 leads either straight to stored or to receiving.
s3 -. "✅ store/put" .-> Ok
s3 -. "❌ store/receive" .-> Receiving
%% Receiving fans out to the agent and to the timer; the timer edge ends in
%% expired, the agent edge ends in stored.
Receiving -. "⚙️ s3/put" .-> agent
Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> Expired
agent -. "♻️ s3/put" .-> Ok
store/add
store/exceed
store/receive
store/expire
store/put
/**
 * Every message the store state machine accepts. All five variants carry
 * a `CAR` as their input and differ only in the `do` tag.
 */
type Event =
  | Message<"store/add", CAR>
  | Message<"store/exceed", CAR>
  | Message<"store/receive", CAR>
  | Message<"store/expire", CAR>
  | Message<"store/put", CAR>

/**
 * Generic message envelope.
 *
 * @typeParam Do - literal name of the action; becomes the type of `do`,
 *   which is what makes `Event` a discriminated union.
 * @typeParam In - type of the `input` payload.
 */
type Message<Do extends string, In> = {
  with: DID
  // Must reference the `Do` type parameter; `DO` is not a declared type.
  do: Do
  input: In
}
// Every `Store.*` entry below is handed the same `post` callback through
// `provide`, so all five message kinds go through one handler.
Store.add.provide(post)
Store.exceed.provide(post)
Store.receive.provide(post)
Store.expire.provide(post)
Store.put.provide(post)
/**
 * Shared handler for every store message: derives the location of the
 * affected state and applies `update` to it inside a transaction.
 *
 * @param invocation - the incoming message; its `with` and `input.root`
 *   form the path.
 * @param context - must expose `store.transact(path, update, ...params)`.
 * @returns whatever `context.store.transact` returns, so the caller can
 *   await the transaction and observe its failure instead of losing it.
 *
 * NOTE(review): `CAR` in this file declares `link`, not `root` — confirm
 * which field name is intended.
 */
const post = (invocation: Event, context) => {
  // Derive path with-in application DAG model
  const path = [invocation.with, invocation.input.root]
  // Use `update` function to do atomic change at `path`
  // ⚠️ Note `update` MAY be called multiple
  // times and thus should be free of side effects.
  return context.store.transact(path, update, invocation)
}
// ⚠️ Note: `update` MAY be called multiple times for a single `transact`
// call, and thus should be free of side effects. Work that has side
// effects is returned from `update` as effects instead of being run there.
context.store.transact(path, update, invocation)
/**
 * Applies `update` to the state stored at `path` and retries until the
 * result has been swapped in.
 *
 * Each attempt reads `{cid, data}` from `STM`, computes the next state,
 * and calls `STM.swap` with the `cid` that was read (presumably a
 * compare-and-swap keyed on that cid — TODO confirm). Effects are handed
 * to the scheduler only after the swap succeeded, and the loop then ends.
 *
 * @param path - location of the state, passed through to `STM`.
 * @param update - called as `update(...params, data)`, which matches the
 *   `(event, state)` order `update` is declared with in this file. Must
 *   return `{state, effects?}` and may run more than once, so it has to be
 *   free of side effects.
 * @param params - extra arguments forwarded to `update` (the invocation).
 * @throws whatever `STM.get`, `update` or `Scheduler.enqueue` throw. Only
 *   a failing `STM.swap` triggers a retry, so a broken `update` can no
 *   longer spin forever.
 */
const transact = async (path, update, ...params) => {
  while (true) {
    const {cid, data} = await STM.get(path)
    // Handlers that change nothing return only `{state}`; default the
    // effects so the spread below does not throw on `undefined`.
    const {state, effects = []} = update(...params, data)
    try {
      await STM.swap(path, cid, state)
    } catch {
      // NOTE(review): any swap failure is treated as "state changed under
      // us" and retried, as before — confirm STM has no permanent errors.
      continue
    }
    Scheduler.enqueue(...effects)
    return
  }
}
/**
 * Routes an event to the handler for its `do` tag.
 *
 * @param event - the incoming message.
 * @param state - current state; when omitted (`undefined`) it defaults to
 *   `init(event)`.
 * @returns whatever the selected handler returns.
 *
 * An unrecognised `do` value is passed to `panic('Invalid event')`.
 */
const update = (event: Event, state: State = init(event)) => {
  // `Message` carries its discriminant in `do`.
  switch (event.do) {
    case 'store/add': return add(event, state)
    case 'store/exceed': return exceed(event, state)
    case 'store/receive': return receive(event, state)
    case 'store/expire': return expire(event, state)
    case 'store/put': return put(event, state)
    default: return panic('Invalid event')
  }
}
/**
 * State used when nothing is known about the item yet: an `error` status
 * whose error message is "Not found".
 *
 * The return type is annotated so that `status` keeps its literal type and
 * the result is assignable to `State`.
 *
 * @param event - accepted but not used.
 */
const init = (event: Event): State =>
  ({ status: 'error', error: new Error('Not found') })
store/add
flowchart LR
%% Effect of `store/add` on each node. This diagram uses plain rounded
%% boxes and names the nodes OverCapacity and Done.
Idle(" ")
Active("🚦 active")
OverCapacity("⛔️ exceeds capacity")
Expired("⛔️ expired")
Done("🏁 stored")
Receiving("🚦 receiving")
%% Idle, exceeds capacity and expired all move to active ...
Idle -- "✉️ store/add" --> Active
Active -- "✉️ store/add" --> Active
OverCapacity -- "✉️ store/add" --> Active
Expired -- "✉️ store/add" --> Active
%% ... while stored and receiving (like active above) loop back to
%% themselves.
Done -- "✉️ store/add" --> Done
Receiving -- "✉️ store/add" --> Receiving
store/add
/**
 * Handles `store/add`.
 *
 * @param event - the `store/add` message; forwarded to `activate`.
 * @param state - current state.
 * @returns the result of `activate(event)` when the state is `error`;
 *   otherwise `{state}` with the state untouched.
 *
 * A status outside the four known ones goes to `panic('Invalid state')`.
 */
const add = (event: StoreAdd, state: State) => {
  switch (state.status) {
    // If we are in error state we just retry
    case 'error':
      return activate(event)
    // Otherwise store/add is noop
    case 'ok':
    case 'active':
    case 'receiving':
      return {state}
    default:
      return panic('Invalid state')
  }
}
flowchart LR
%% What happens from active: the storage provider either yields
%% `store/exceed` or continues to S3, and S3 yields `store/put` or
%% `store/receive`. All three resulting messages point at the same 📬
%% circle `I`.
Active(["🚦 active"])
storage{{storage provider}}
s3{{S3}}
Active -. "⚙️ storage/allocate" .-> storage
storage -. "❌ store/exceed" .-> I(("📬"))
storage -. "✅ s3/stat" .-> s3
s3 -. "✅ store/put" .-> I
s3 -. "❌ store/receive" .-> I
/**
 * Moves an item to `active` and describes the follow-up work as a single
 * async effect.
 *
 * The effect calls `allocate(input.root, input.size)`. If the result has
 * an `error` it sends `store/exceed`; otherwise it calls
 * `stat(input.root)` and sends `store/put` when the result is `ok`, or
 * `store/receive` when it is not.
 *
 * @param event - only its `input` is used.
 * @returns `{state, effects}`. `effects` is an array because `transact`
 *   in this file spreads it into `Scheduler.enqueue(...effects)` and the
 *   other handlers return arrays as well.
 *
 * NOTE(review): a second `activate` is declared further down in this file;
 * only one of the two definitions can stay.
 */
const activate = ({input}: StoreAdd) => ({
  state: { status: 'active', active: input },
  effects: [
    async () => {
      const alloc = await allocate(input.root, input.size)
      if (alloc.error) {
        await send({ 'store/exceed': input })
      } else {
        const info = await stat(input.root)
        await send(info.ok
          ? { 'store/put': input }
          : { 'store/receive': input })
      }
    }
  ]
})
// Variant of `activate` that builds its effects by combining the values
// returned from `allocate`, `stat` and `send` with `.and()`, `.catch()`
// and `.try()`, and returns three effects in an array.
// NOTE(review): `.and()`, `.catch()` and `.try()` are not defined in this
// file; the per-line readings below are inferred from the names and from
// the async variant of `activate` — confirm against the task library.
const activate = ({input}: StoreAdd) => {
// Note this variant passes only `input.size` to `allocate`.
const alloc = allocate(input.size)
// presumably: allocation failed -> send `store/exceed`
const exceed = alloc.catch()
.and(send({ 'store/exceed': input }))
// presumably: allocation succeeded -> stat the content
const info = alloc.and(stat(input.root))
// presumably: stat succeeded -> send `store/put`
const put = info.and(send({ 'store/put': input }))
// presumably: allocation succeeded but stat failed -> send `store/receive`
const receive = alloc
.and(info.catch())
.and(send({ 'store/receive': input }))
return {
state: { status: 'active', active: input },
// Each branch is finished with `.try()` before being returned.
effects: [exceed.try(), put.try(), receive.try()]
}
}
flowchart LR
%% The `store/exceed` path only: active goes to the storage provider, and
%% the storage provider leads to "exceeds capacity".
Active(["🚦 active"])
Exceed(["🚫 exceeds capacity"])
storage{{storage provider}}
Active -. "⚙️ storage/allocate" .-> storage
storage -. "❌ store/exceed" .-> Exceed
/**
 * Handles `store/exceed`.
 *
 * From `active` the state becomes an `error` carrying
 * `new Error('Not enough space')`. From `error`, `receiving` and `ok` the
 * state is returned as is. Any other status yields `undefined`.
 *
 * @param event - its `input` is destructured but not used.
 * @param state - current state.
 */
const exceed = ({input}, state) => {
  const {status} = state

  if (status === 'active') {
    const error = new Error('Not enough space')
    return { state: { status: 'error', error } }
  }

  if (status === 'error' || status === 'receiving' || status === 'ok') {
    return { state }
  }

  return undefined
}
flowchart LR
%% The `store/receive` path: active -> storage provider -> S3 -> receiving.
%% From receiving one edge goes to the agent (`s3/put`) and another to the
%% timer (`task/sleep`); the timer's `store/expire` ends at the 📬 circle
%% `I`.
Active(["🚦 active"])
Receiving(["🚦 receiving"])
storage{{storage provider}}
s3{{S3}}
agent{{"💻 agent"}}
timer{{"⏱️ timer"}}
Active -. "⚙️ storage/allocate" .-> storage
storage -. "✅ s3/stat" .-> s3
s3 -. "❌ store/receive" .-> Receiving
Receiving -. "⚙️ s3/put" .-> agent
Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> I(("📬"))
/**
 * Handles `store/receive`.
 *
 * From `active` the status becomes `receiving` and two effects are
 * returned: an `s3/put` invocation addressed to `issuer` with a URL from
 * `sign(input)`, and a `sleep(200)` followed by sending `store/expire`.
 * In every other known status the state is returned unchanged.
 *
 * @param event - `input` and `issuer` are used; `audience` is accepted
 *   but currently unused.
 * @param state - current state.
 *
 * NOTE(review): the new state keeps the `active` payload
 * (`{...state, status: 'receiving'}`) while `State` declares a
 * `receiving: Receiving` payload for this status — confirm the intended
 * shape. The unit of `sleep(200)` is not visible here either.
 */
const receive = ({input, issuer, audience}, state) => {
  // `input` has to be bound as a whole: the effects below use it, and the
  // former `input: {root}` pattern bound only `root`.
  switch (state.status) {
    case 'active':
      return {
        state: { ...state, status: 'receiving' },
        effects: [
          invoke({ with: issuer, do: 's3/put', url: sign(input) }),
          sleep(200).and(send({ 'store/expire': input }))
        ]
      }
    case 'error':
    case 'receiving':
    case 'ok':
      return { state }
  }
}
flowchart LR
%% The `store/expire` path only: receiving goes to the timer
%% (`task/sleep`), and the timer's `store/expire` leads to expired.
Expired(["🚫 expired"])
Receiving(["🚦 receiving"])
timer{{"⏱️ timer"}}
Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> Expired
/**
 * Handles `store/expire`.
 *
 * From `receiving` the state becomes an `error` carrying
 * `new Error('Expired')`, and one effect, `deallocate(input.size)`, is
 * returned. In `error`, `active` and `ok` the state is returned unchanged.
 *
 * @param event - only `input.size` is used.
 * @param state - current state.
 */
const expire = ({input}, state) => {
  switch (state.status) {
    case 'receiving':
      return {
        state: { status: 'error', error: new Error('Expired') },
        effects: [deallocate(input.size)]
      }
    case 'error':
    case 'active':
    case 'ok':
      return { state }
  }
}
flowchart LR
%% The two ways of reaching stored: directly from S3 via `store/put`, or
%% from receiving through the agent's `s3/put`.
%% `Idle` is declared but has no edges in this diagram.
Idle((" "))
Active(["🚦 active"])
Receiving(["🚦 receiving"])
Ok(["🏁 stored"])
storage{{storage provider}}
s3{{S3}}
agent{{"💻 agent"}}
Active -. "⚙️ storage/allocate" .-> storage
storage -. "✅ s3/stat" .-> s3
s3 -. "✅ store/put" .-> Ok
s3 -. "❌ store/receive" .-> Receiving
Receiving -. "⚙️ s3/put" .-> agent
agent -. "♻️ s3/put" .-> Ok
/**
 * Handles `store/put`.
 *
 * From `active` or `receiving` the status becomes `ok`; the other fields
 * of the state are carried over. In `error` and `ok` the state is returned
 * unchanged.
 *
 * @param event - its `input` is destructured but not used.
 * @param state - current state.
 *
 * NOTE(review): `State` declares an `ok: CAR` payload for this status,
 * while the spread keeps whatever payload key the previous state had —
 * confirm the intended shape.
 */
const put = ({ input }, state) => {
  switch (state.status) {
    case 'active':
    case 'receiving':
      return { state: { ...state, status: 'ok' } }
    case 'error':
    case 'ok':
      return { state }
  }
}
flowchart LR
%% Full store lifecycle, laid out left to right.
%% Nodes: `Idle` is a blank circle, the stadium-shaped nodes carry status
%% labels, and the hexagons are the other parties in the flow
%% (storage provider, S3, agent, timer).
Idle((" "))
Active(["🚦 active"])
Exceed(["🚫 exceeds capacity"])
Expired(["🚫 expired"])
Receiving(["🚦 receiving"])
Ok(["🏁 stored"])
storage{{storage provider}}
s3{{S3}}
agent{{"💻 agent"}}
timer{{"⏱️ timer"}}
%% The only solid edge: `store/add` takes `Idle` to active.
Idle -- "📬 store/add" --> Active
%% All remaining edges are dotted. From active the flow goes to the storage
%% provider, which leads either to "exceeds capacity" or on to S3.
Active -. "⚙️ storage/allocate" .-> storage
storage -. "❌ store/exceed" .-> Exceed
storage -. "✅ s3/stat" .-> s3
%% S3 leads either straight to stored or to receiving.
s3 -. "✅ store/put" .-> Ok
s3 -. "❌ store/receive" .-> Receiving
%% Receiving fans out to the agent and to the timer; the timer edge ends in
%% expired, the agent edge ends in stored.
Receiving -. "⚙️ s3/put" .-> agent
Receiving -. "⚙️ task/sleep" .-> timer -. "✅ store/expire" .-> Expired
agent -. "♻️ s3/put" .-> Ok