# Invocations & Receipts

> ℹ️ We extend IPLD Schema syntax with generics to reduce the number of duplicated fields across various type definitions

```ipldsch
# Task describes
type Task<Input> struct {
  # Version of the invocation spec
  v SemVer
  # Principal execution is requested from
  aud Principal
  # Invoker of the task
  iss Principal

  # Resource to be executed with
  with URI
  # Ability to be executed
  do Ability
  # Task input parameterizing execution
  # Usually a struct with named options, but also
  # can be any data structure if task one param
  input Input

  # Arbitrary metadata associated with the request.
  # Metadata should not affect result of the execution
  # but it could inform e.g. the region of the execution
  # or number of CPUs can be utilized.
  meta {String : Any} (implicit {})

  # UCAN proofs showing that issuer has required capabilities
  # for this task
  prf [&UCAN] (implicit [])

  # Optional nonce
  nnc optional String
  # Optional time to schedule task to given time.
  nbf optional Int

  # Signature from the issuer
  s Varsig
} representation map {}

type Receipt<Ok, Error, State> union {
  | TaskState<Input, State, Any> "pending"
  | TaskState<Input, Ok, Any> "succeeded"
  | TaskState<Input, Error, Any> "failed"
} representation inline {
  discriminantKey "status"
}

type TaskState<Input, State, Origin> struct {
  # Link to the task being executed
  task &Task<Input>
  # Current state of the task being executed
  state State
  # Previous state of the task. This forms a
  # verifiable trace of state changes.
  origin optional &Origin

  # Any other metadata related to task execution
  meta { String: Any } implicit ({})

  # Principal that issued this receipt
  iss Principal
  # Signature from `iss`.
  s Varsig

  # Proof that `iss` was authorized to perform the task on behalf of the
  # `aud`. Omitted when this `iss` and task `aud` are the same principal.
  prf [&UCAN] implicit ([])
} representation map {}

type TaskResultSelector struct {
  task: &Task<Any>
  select: Selector
}

type Selector struct {
  Select "task/wait"
  Select "task/catch"
  Select "task/finally"
  Select "task/progress"
} representation keyed

type Select union {
  | Bool "*"
  | Selection "only"
} representation kinded

type Selection { String: Select }

# Query is just a set of tasks to be executed.
# Tasks can be run concurrently except ones
# that explicitly take output of the other
type Query { String: TaskResultSelector }
```

We need to model a system in which invoking a task may produce a result that is either:

1. Plain data
2. An effect to be performed (either by a service or a client agent)
3. An error

So what happens when we perform `store/add` with some CAR CID?

> ℹ️ The flow chart below illustrates the finite-state machine ([FSM]) of a specific CAR in the user space

```mermaid
flowchart LR
  Idle("💤 void") -- store/add --> Active("⚙️ active") --> Storage{Enough storage?}
  Storage --> |Yes| S3{Is in S3 ?}
  Storage --> |No| OverCapacity("⛔️ over capacity")
  S3 --> |YES| Done("✅ done")
  S3 --> |No| WaitS3Put("🚦await put")
  WaitS3Put -- s3/put --> Active
  WaitS3Put -- s3/expire --> Expired("⛔️ expired")
  Active -- store/add --> Active
  Expired -- store/add --> Active
  OverCapacity -- store/add --> Active
  WaitS3Put -- store/add --> WaitS3Put
  Done -- store/add --> Done
```

A CAR record can be in the following states:

- 💤 void
  - error: not found
- ⚙️ active
  - queued: pending
- ⛔️ over capacity
  - error: over capacity
- ⛔️ expired
  - error: put expired
- 🚦 await put
  - blocked: awaiting s3 put
- ✅ done
  - ok: car is stored

We can invoke `store/add` in every state, and it will either transition the FSM to the active state

```mermaid
flowchart LR
  Idle("💤 void") -- store/add --> Active("⚙️ active")
  Active -- store/add --> Active
  Expired("⛔️ expired") -- store/add --> Active
  OverCapacity("⛔️ over capacity") -- store/add --> Active
```

Or the operation is a noop and the state will remain the same
```mermaid
flowchart TB
  WaitS3Put("🚦await put") -- store/add --> WaitS3Put
  Done("✅ Done") -- store/add --> Done
```

It is also worth calling out that when we invoke `store/add` and the requested CAR is not in S3, we block the task 🚦 and wait for either an S3 event or a session expiry.

I think we could model the whole thing in terms of "transactions" that:

1. Update a state
2. Schedule more subtasks

```ts
interface Task<State> {
  perform(): Promise<Transaction<State>>
}

interface Transaction<State> {
  /**
   * New state of the task
   */
  state: State
  /**
   * Concurrent tasks to schedule
   */
  tasks: Task<State>[]
}
```

```ts
provide(Stare.add, await (invocation, { agent }) => {
  await agent.invoke(Store.post({
    message: invocation.capability,
    proof: [invocation]
  }))

  return Store.get(invocation.capability.root)
})

const Expired = Schema.struct({
  type: 'expired'
  root: Schema.link()
})

const S3Put = Schema.struct({
  type: 's3-put'
  root: Schema.link()
})

const Add = Schema.struct({
  type: 'store/add',
  root: Schema.link()
  ...
})

Store.post = Store.add
  .or(Stare.expire)
  .or(S3.put)
  .derive({
    to: capability({
      can: 'store/post',
      with: Schema.did()
      nb: {
        message: Expired.or(S3Put).or(Add)
      }),
    derives: (put, parent) => {
      retur true
    }
  })

provide(Store.add, post)
provide(Store.expire, post)
provide(S3.put, post)

/**
 * @param {InferCapability<Store.add>|InferCapability<Store.expire>} capability
 */
function post async ({state,invocation,context}) => {
  const { message } = invocation.capability.nb
  const { agent, space, s3 } = context
  switch (message.type) {
    case 'store/add': {
      switch (state.status) {
        case 'active':
        case 'pending':
          return Transact.state({ from: state, to: state })
        case 'error':
        case 'idle':
          // If we don't have enough storage space transition to ⛔️ state
          if (!await space.hasCapacityFor(input.size)) {
            return state.transact({
              to: {
                status: 'error',
                error: new Error('Not enough storage capacity')
              },
              tasks: []
            })
          // If we have enough space but no such CAR in S3
          // we create presigned put URL and dispatch
          // several concurrent tasks
          } else if (!await s3.has(input.root)) {
            const { url, headers, timeout } = s3.createPresignedPut(input)
            // timeout after S3 presigned URL expires
            const expire = Task
              .sleep(timeout)
              .and(agent.invoke(Store.post({ mesasge: { type: "expired" } }))

            // schedule a task on a client
            // to upload car to S3
            const request = UCAN.invoke({
              issuer: agent,
              audience: invocation.issuer,
              capabilities: [{
                with: 'did:web:web3.storage',
                can: 's3/put'
                nb: { url, headers, input }
              }]
            })

            return Task.transact({
              from: state,
              to: { status: 'pending', url, headers }
              tasks: [
                Task.spawn(expire),
                Task.spawn(request)
              ]
            })
          } else {
            return Task.transact({
              from: state,
              to: { status: 'done', ok: {} }
              tasks: []
            })
          }
      }
    }
    }
    case 'expired': {
      switch (state.status) {
        case 'pending':
          return Task.transact({
            from: state,
            to: {
              status: 'error'
              error: new Error('S3 URL expired')
            }
          })
        case 'failed':
        case 'active':
          return Task.noop()
        case 'idle':
          return panic("This should never happen")
      }
    case 's3/put':
      switch (state.status) {
        case 'pending':
          return Task.trasact({
            to: {
              status: 'active',
            }
            tasks: [Store.post({ root: state.root, type: 'store/add' })]
          })
        case 'failed':
          Return Task.noop()
        case 'done':
          return Task.noop()
      }
    }
  }
})
```

### Take 2

```mermaid
flowchart LR
  Idle("⛔️ not found") -- store/add --> Active("🚦 active") --> Storage{Enough storage?}
  Storage --> |Yes| S3{Is in S3 ?}
  Storage --> |No| OverCapacity("⛔️ exceed capacity")
  S3 --> |YES| Done("✅ stored")
  S3 --> |No| WaitS3Put("🚦 receiving")
  WaitS3Put -- store/put --> Active
  WaitS3Put -- store/expire --> Expired("⛔️ expired")
  Active -- store/receive --> S3
  Active -- store/exceed --> OverCapacity
  Expired -- store/add --> Active
  OverCapacity -- store/add --> Active
```

First, let's define our states

```ts
type State =
  | { status: "ok", ok: CAR }
  | { status: "error", error: Expired|ExceedsLimit|NotFound }
  | { status: "receiving", receiving: Receiving }
  | { status: "active", active: CAR }

type CAR = {
  link: IPLD.Link,
  size: number
}

type Expired = { message: string }
type ExceedsLimit = { message: string }
type NotFound = { message: string }
type Receiving = {
  url: string
  headers: {[key:string]: string}
}
```

Now let's define the events that cause state transitions

```ts
type Event =
  | StoreAdd
  | StoreExpire
  | StoreExceed
  | StoreReceive

// Request to add a car from user
type StoreAdd = {
  with: DID
  do: 'store/add'
  input: CAR
}

type StoreExpire = {
  with: DID
  do: 'store/expire'
  input: { link: IPLD.Link }
}

type StoreExceed = {
  with: DID
  do: 'store/exceed'
  input: CAR
}

type StoreReceive = {
  with: DID
  do: 'store/receive'
  input: CAR
}

type StorePut = {
  do: 'store/put'
  with: DID
  input: CAR
}
```

Finally, we can define our state machine and invocation handler(s)

```ts
Store.add.provide(post)
Store.receive.provide(post)
Store.exceed.provide(post)
Store.expire.provide(post)
Store.put.provide(post)

const post = (invocation:Event, context) => {
  // Derive path with-in application DAG model
  const at = [invocation.with, invocation.input.root]
  // Atomically update state at given path
  // with provided update function. Returns
  // new state.
  // ⚠️ Note that update MAY be called multiple
  // times and thus should be free of side effects.
  context.store.transact(at, update, invocation)
}

const update = (event:Event, state = init(event): State) => {
  switch (message.type) {
    case 'store/add':
      return add(event, state)
    case 'store/receive':
      return receive(event, state)
    case 'store/exceed':
      return exceed(event, state)
    case 'store/expire':
      return expire(event, state)
    case 'store/put':
      return put(event, state)
    default:
      return panic('Invalid event')
  }
}

const init = (event: Event) => ({
  status: 'error',
  error: new Error(`Not found`)
})

const add (event:StoreAdd, state:State) => {
  switch (state.status) {
    // If we are in error state we just retry
    case 'error':
      return activate(event)
    // Otherwise store/add is noop
    case 'ok':
    case 'active':
    case 'receiving':
      return {state}
    default:
      return panic('Invalid state')
  }
}

const activate = (event: StoreAdd) => {
  // First we need to allocate storage space
  const allocate = invoke({
    with: event.with,
    do: 'store/allocate',
    input: { size: event.input.size }
  })

  // If allocation succeeded we'll check if
  // we have matching CAR
  const get = invoke({
    with: event.with,
    do: 's3/get',
    input: event.input
  })

  // We trigger store/put after we check S3
  const get = invoke({
    with: event.with,
    do: 'store/put'
    input: {
      ...input,
      _: wait({ ok: get })
    }
  })

  // If allocation fails will transition to
  // error state.
  const fail = invoke({
    with: event.with,
    do: 'store/exceed'
    input: event.input
  })

  return {
    // Now we transition to 'active' state
    state: { status: 'active', active: input }
    // And schedule followup tasks to transition
    fx: [
      // to get state if allocation went ok
      Task.wait({ ok: allocate }).and(get),
      // or to a failed state if allocation failed
      Task.wait({ error: allocate }).and(fail)
    ]
  }
}

const receive = (event: StoreAwait, state:State) => {
  switch (state.status) {
    // If we're in active state
    case 'active': {
      // and there we have such car in S3 we're done
      if (event.has) {
        return { state: { status: 'ok', ok: {} }}
      }
      // Otherwise we issue presigned URL and
      // invoke s3/put on the client.
      else {
        const { url, headers } = presignS3Put()
        return {
          state: { status: 'pending', pending: { url, headers } },
          fx: [
            event.source.issuer.invoke({
              do: 's3/put',
              with: event.source.issuer.did(),
              input: { url, headers }
            })
          ]
        }
      }
    }
    case 'pending':
    case 'ok':
    case 'error':
      return { state }
    default:
      return panic('Unknown state')
  }
}

const overCapacity = (event: StoreOverCapacity) => {
  return {
    state: {
      status: 'error',
      error: { message: "Not enough storage" }
    }
  }
}

const expire = (event:S3Expire, state:State) => {
}

const put = (event:S3Put, state:State) => {
}
```

[FSM]:https://learnyousomeerlang.com/finite-state-machines