# Invocations & Receipts
> ℹ️ We extend the IPLD Schema syntax with generics to reduce the number of duplicated fields across various type definitions
```ipldsch
# Task describes an invocation: an ability (`do`) to be executed with a
# resource (`with`), requested by the issuer (`iss`) from the audience (`aud`).
type Task<Input> struct {
# Version of the invocation spec
v SemVer
# Principal execution is requested from
aud Principal
# Invoker of the task
iss Principal
# Resource to be executed with
with URI
# Ability to be executed
do Ability
# Task input parameterizing execution.
# Usually a struct with named options, but it can also
# be any data structure if the task takes a single param.
input Input
# Arbitrary metadata associated with the request.
# Metadata should not affect the result of the execution,
# but it could inform e.g. the region of the execution
# or the number of CPUs that can be utilized.
meta {String : Any} (implicit {})
# UCAN proofs showing that the issuer has the required capabilities
# for this task
prf [&UCAN] (implicit [])
# Optional nonce
nnc optional String
# Optional time at which the task should be scheduled.
nbf optional Int
# Signature from the issuer
s Varsig
} representation map {}
# Receipt reports the state of a task. It is a union of TaskState variants
# discriminated inline by the "status" key: "pending", "succeeded" or "failed".
#
# `Receipt` has no `Input` type parameter, so the task input is typed as `Any`
# here instead of referring to an undeclared `Input`.
type Receipt<Ok, Error, State> union {
  # Task is still running; carries intermediate `State`
  | TaskState<Any, State, Any> "pending"
  # Task completed; carries the `Ok` result
  | TaskState<Any, Ok, Any> "succeeded"
  # Task failed; carries the `Error`
  | TaskState<Any, Error, Any> "failed"
} representation inline {
  discriminantKey "status"
}
# TaskState is a signed record of the state of a task, optionally linked to the
# previous state so that successive records form a verifiable trace.
type TaskState<Input, State, Origin> struct {
  # Link to the task being executed
  task &Task<Input>
  # Current state of the task being executed
  state State
  # Previous state of the task. This forms a
  # verifiable trace of state changes.
  origin optional &Origin
  # Any other metadata related to task execution
  # (same implicit syntax as `Task.meta`)
  meta {String : Any} (implicit {})

  # Principal that issued this receipt
  iss Principal
  # Signature from `iss`.
  s Varsig
  # Proof that `iss` was authorized to perform the task on behalf of the
  # `aud`. Omitted when this `iss` and the task `aud` are the same principal.
  prf [&UCAN] (implicit [])
} representation map {}
# TaskResultSelector pairs a link to a task with a selector over its result.
type TaskResultSelector struct {
  # Link to the task whose result is selected from
  task &Task<Any>
  # What to select from that task
  select Selector
}
# Selector picks which outcome of a task to select from. `keyed` is a union
# representation, so this is declared as a union whose members are all `Select`,
# keyed by the outcome name.
type Selector union {
  | Select "task/wait"
  | Select "task/catch"
  | Select "task/finally"
  | Select "task/progress"
} representation keyed
# Select is either a Bool or a nested Selection.
# NOTE(review): `kinded` unions normally discriminate members by representation
# kind (e.g. `bool`, `map`) rather than by string keys like "*" / "only";
# confirm whether `keyed` was intended here.
type Select union {
| Bool "*"
| Selection "only"
} representation kinded
# Selection maps String keys to nested Select values.
type Selection { String: Select }
# Query is just a set of tasks to be executed.
# Tasks can be run concurrently, except ones
# that explicitly take the output of another.
type Query { String: TaskResultSelector }
```
We need to model a system in which invoking a task may produce a result that is one of:
1. Plain data
2. An effect to be performed (either by a service or a client agent)
3. An error
So what happens when we perform `store/add` with some CAR CID?
> ℹ️ The flow chart below illustrates finite-state machine ([FSM]) of the specific CAR in the user space
```mermaid
flowchart LR
Idle("💤 void") -- store/add -->
Active("⚙️ active") --> Storage{Enough storage?}
Storage --> |Yes| S3{Is in S3 ?}
Storage --> |No| OverCapacity("⛔️ over capacity")
S3 --> |YES| Done("✅ done")
S3 --> |No| WaitS3Put("🚦await put")
WaitS3Put -- s3/put --> Active
WaitS3Put -- s3/expire --> Expired("⛔️ expired")
Active -- store/add --> Active
Expired -- store/add --> Active
OverCapacity -- store/add --> Active
WaitS3Put -- store/add --> WaitS3Put
Done -- store/add --> Done
```
A CAR record can be in the following states:
- 💤 void - error: not found
- ⚙️ active - queued: pending
- ⛔️ over capacity - error: over capacity
- ⛔️ expired - error: put expired
- 🚦 await put - blocked: awaiting s3 put
- ✅ done - ok: car is stored
We can invoke `store/add` in every state, and it will either transition the FSM to the active state
```mermaid
flowchart LR
Idle("💤 void") -- store/add --> Active("⚙️ active")
Active -- store/add --> Active
Expired("⛔️ expired") -- store/add --> Active
OverCapacity("⛔️ over capacity") -- store/add --> Active
```
Or the operation is a noop and the state remains the same
```mermaid
flowchart TB
WaitS3Put("🚦await put") -- store/add --> WaitS3Put
Done("✅ Done") -- store/add --> Done
```
It is also worth calling out that when we invoke `store/add` and the requested CAR is not in S3, we block the task 🚦 and wait for either an S3 event or a session expiry.
I think we could model the whole thing in terms of "transactions" that
1. Update a state
2. Schedule more subtasks
```ts
/**
 * Outcome of performing a task: the state it produced plus any
 * follow-up work that should be scheduled.
 */
interface Transaction<State> {
  /** State the task transitioned to. */
  state: State
  /** Follow-up tasks to schedule; they may run concurrently. */
  tasks: Array<Task<State>>
}

/**
 * A unit of work that, when performed, asynchronously yields a
 * {@link Transaction} over the same `State`.
 */
interface Task<State> {
  perform(): Promise<Transaction<State>>
}
```
```ts
// Handle `store/add` by forwarding the invocation's capability as a
// `store/post` message (with the invocation itself as proof), then
// answering with `Store.get` for the capability's root.
provide(Store.add, async (invocation, { agent }) => {
  await agent.invoke(
    Store.post({
      message: invocation.capability,
      proof: [invocation]
    })
  )

  return Store.get(invocation.capability.root)
})
// Message signalling that the upload window for `root` has expired.
const Expired = Schema.struct({
  type: 'expired',
  root: Schema.link()
})
// Message signalling an S3 put for `root`.
const S3Put = Schema.struct({
  type: 's3-put',
  root: Schema.link()
})
// Message requesting that `root` be added to the store.
const Add = Schema.struct({
  type: 'store/add',
  root: Schema.link()
  // ... (remaining `store/add` fields elided in this sketch)
})
// `store/post` is derivable from any of `store/add`, `store/expire` or
// `s3/put`; its `message` is one of the Expired / S3Put / Add structs.
Store.post = Store.add
  .or(Store.expire)
  .or(S3.put)
  .derive({
    to: capability({
      can: 'store/post',
      with: Schema.did(),
      nb: {
        message: Expired.or(S3Put).or(Add)
      }
    }),
    // Sketch: derivation is unconditionally allowed.
    derives: (put, parent) => {
      return true
    }
  })
// Route all three capabilities through the shared `post` handler.
provide(Store.add, post)
provide(Store.expire, post)
provide(S3.put, post)
/**
 * Shared handler behind `store/add`, `store/expire` and `s3/put`.
 *
 * Dispatches on the type of the posted message and on the current status of
 * the CAR record, and answers with a transaction (next state plus follow-up
 * tasks) or a noop.
 *
 * @param {object} options
 * @param options.state - current state of the record; `state.status` is one of
 *   'idle' | 'active' | 'pending' | 'error' | 'done'
 * @param options.invocation - invocation whose `capability.nb.message` is
 *   dispatched on
 * @param options.context - provides `agent`, `space` and `s3`
 */
async function post({ state, invocation, context }) {
  const { message } = invocation.capability.nb
  const { agent, space, s3 } = context

  switch (message.type) {
    case 'store/add': {
      switch (state.status) {
        // Already in flight or stored: adding again leaves the state as is.
        case 'active':
        case 'pending':
        case 'done':
          return Task.transact({
            from: state,
            to: state,
            tasks: []
          })
        case 'error':
        case 'idle': {
          // If we don't have enough storage space transition to ⛔️ state
          if (!(await space.hasCapacityFor(message.size))) {
            return Task.transact({
              from: state,
              to: {
                status: 'error',
                error: new Error('Not enough storage capacity')
              },
              tasks: []
            })
          }

          // If we have enough space but no such CAR in S3
          // we create presigned put URL and dispatch
          // several concurrent tasks
          if (!(await s3.has(message.root))) {
            const { url, headers, timeout } = s3.createPresignedPut(message)

            // timeout after S3 presigned URL expires
            const expire = Task.sleep(timeout).and(
              agent.invoke(
                Store.post({
                  message: { type: 'expired', root: message.root }
                })
              )
            )

            // schedule a task on a client
            // to upload car to S3
            const request = UCAN.invoke({
              issuer: agent,
              audience: invocation.issuer,
              capabilities: [
                {
                  with: 'did:web:web3.storage',
                  can: 's3/put',
                  nb: { url, headers, input: message }
                }
              ]
            })

            return Task.transact({
              from: state,
              to: {
                status: 'pending',
                url,
                headers
              },
              tasks: [Task.spawn(expire), Task.spawn(request)]
            })
          }

          // Enough space and the CAR is already in S3: we are done.
          return Task.transact({
            from: state,
            to: {
              status: 'done',
              ok: {}
            },
            tasks: []
          })
        }
        // Never fall through into the next message type.
        default:
          return Task.noop()
      }
    }

    case 'expired': {
      switch (state.status) {
        // Upload window closed before the put arrived.
        case 'pending':
          return Task.transact({
            from: state,
            to: {
              status: 'error',
              error: new Error('S3 URL expired')
            },
            tasks: []
          })
        case 'error':
        case 'active':
          return Task.noop()
        case 'idle':
          return panic('This should never happen')
        default:
          return Task.noop()
      }
    }

    // Message type as declared by the `S3Put` schema.
    case 's3-put': {
      switch (state.status) {
        // Put arrived: go back to active and re-run `store/add`.
        case 'pending':
          return Task.transact({
            from: state,
            to: {
              status: 'active'
            },
            tasks: [
              Store.post({
                message: { type: 'store/add', root: message.root }
              })
            ]
          })
        case 'error':
        case 'done':
          return Task.noop()
        default:
          return Task.noop()
      }
    }
  }
}
```
### Take 2
```mermaid
flowchart LR
Idle("⛔️ not found") -- store/add -->
Active("🚦 active") --> Storage{Enough storage?}
Storage --> |Yes| S3{Is in S3 ?}
Storage --> |No| OverCapacity("⛔️ exceed capacity")
S3 --> |YES| Done("✅ stored")
S3 --> |No| WaitS3Put("🚦 receiving")
WaitS3Put -- store/put --> Active
WaitS3Put -- store/expire --> Expired("⛔️ expired")
Active -- store/receive --> S3
Active -- store/exceed --> OverCapacity
Expired -- store/add --> Active
OverCapacity -- store/add --> Active
```
First let's define our states
```ts
/** A CAR identified by its link, together with its size. */
type CAR = { link: IPLD.Link; size: number }

/** Error payloads; each carries a human readable message. */
type Expired = { message: string }
type ExceedsLimit = { message: string }
type NotFound = { message: string }

/** Details of an upload in progress: where to put the CAR and with which headers. */
type Receiving = {
  url: string
  headers: { [key: string]: string }
}

/**
 * State of a CAR record. `status` is the discriminant and the
 * payload lives under the key of the same name.
 */
type State =
  | { status: 'ok'; ok: CAR }
  | { status: 'error'; error: Expired | ExceedsLimit | NotFound }
  | { status: 'receiving'; receiving: Receiving }
  | { status: 'active'; active: CAR }
```
Now let's define events that cause state transitions
```ts
/**
 * Every event the state machine reacts to. `StorePut` is included because
 * `store/put` is provided and dispatched on alongside the others.
 */
type Event =
  | StoreAdd
  | StoreExpire
  | StoreExceed
  | StoreReceive
  | StorePut
/** Common shape of every store event: a resource, an ability and its input. */
type StoreEvent<Ability extends string, Input> = {
  with: DID
  do: Ability
  input: Input
}

/** Request from a user to add a CAR. */
type StoreAdd = StoreEvent<'store/add', CAR>

/** `store/expire` event; identifies the CAR by link only. */
type StoreExpire = StoreEvent<'store/expire', { link: IPLD.Link }>

/** `store/exceed` event for the given CAR. */
type StoreExceed = StoreEvent<'store/exceed', CAR>

/** `store/receive` event for the given CAR. */
type StoreReceive = StoreEvent<'store/receive', CAR>

/** `store/put` event for the given CAR. */
type StorePut = StoreEvent<'store/put', CAR>
```
Finally we can define our state machine and invocation handler(s)
```ts
/**
 * Invocation handler shared by all store events.
 *
 * @param invocation - event to apply
 * @param context - provides `store`, whose `transact` applies `update`
 * @returns whatever `context.store.transact` returns (the new state)
 */
const post = (invocation: Event, context) => {
  // Derive path within the application DAG model. Event inputs identify
  // the CAR by `link`.
  const at = [invocation.with, invocation.input.link]
  // Atomically update state at given path
  // with provided update function. Returns
  // new state.
  // ⚠️ Note that update MAY be called multiple
  // times and thus should be free of side effects.
  return context.store.transact(at, update, invocation)
}

// Registered after the declaration: `post` is a `const`, so referencing it
// earlier would hit the temporal dead zone.
Store.add.provide(post)
Store.receive.provide(post)
Store.exceed.provide(post)
Store.expire.provide(post)
Store.put.provide(post)
/**
 * State machine entry point: routes an event to the transition function for
 * its ability (`event.do`).
 *
 * @param event - event to apply
 * @param state - current state; defaults to the initial state from `init`
 */
const update = (event: Event, state: State = init(event)) => {
  switch (event.do) {
    case 'store/add':
      return add(event, state)
    case 'store/receive':
      return receive(event, state)
    case 'store/exceed':
      // The transition function for `store/exceed` is named `overCapacity`
      // and takes only the event.
      return overCapacity(event)
    case 'store/expire':
      return expire(event, state)
    case 'store/put':
      return put(event, state)
    default:
      return panic('Invalid event')
  }
}
/**
 * Initial state of a record nothing is known about: a "Not found" error.
 * Annotated as `State` so `status` keeps its literal type, and the error is a
 * plain `{ message }` object like the one `overCapacity` produces.
 *
 * @param event - event being applied (currently unused)
 */
const init = (event: Event): State => ({
  status: 'error',
  error: { message: 'Not found' }
})
/**
 * Transition for `store/add`.
 *
 * @param event - the `store/add` event
 * @param state - current state of the record
 * @returns result of `activate` when retrying from an error, otherwise the
 *   unchanged state
 */
const add = (event: StoreAdd, state: State) => {
  switch (state.status) {
    // If we are in error state we just retry
    case 'error':
      return activate(event)
    // Otherwise store/add is noop
    case 'ok':
    case 'active':
    case 'receiving':
      return { state }
    default:
      return panic('Invalid state')
  }
}
/**
 * Moves a record into the 'active' state and schedules the follow-up
 * invocations that drive it further.
 *
 * @param event - the `store/add` event being (re)tried
 * @returns the new 'active' state plus the effects (`fx`) to schedule
 */
const activate = (event: StoreAdd) => {
  // First we need to allocate storage space
  const allocate = invoke({
    with: event.with,
    do: 'store/allocate',
    input: { size: event.input.size }
  })

  // If allocation succeeded we'll check if
  // we have matching CAR
  const get = invoke({
    with: event.with,
    do: 's3/get',
    input: event.input
  })

  // We trigger store/put after we check S3
  // NOTE(review): `put` is built here but is not part of `fx` below —
  // confirm whether it should be scheduled as well.
  const put = invoke({
    with: event.with,
    do: 'store/put',
    input: {
      ...event.input,
      _: wait({ ok: get })
    }
  })

  // If allocation fails will transition to
  // error state.
  const fail = invoke({
    with: event.with,
    do: 'store/exceed',
    input: event.input
  })

  return {
    // Now we transition to 'active' state
    state: { status: 'active', active: event.input },
    // And schedule followup tasks to transition
    fx: [
      // to get state if allocation went ok
      Task.wait({ ok: allocate }).and(get),
      // or to a failed state if allocation failed
      Task.wait({ error: allocate }).and(fail)
    ]
  }
}
/**
 * Transition for `store/receive`.
 *
 * @param event - the `store/receive` event
 *   NOTE(review): reads `event.has` and `event.source.issuer`, which the
 *   `StoreReceive` type does not declare — confirm the event shape.
 * @param state - current state of the record
 */
const receive = (event: StoreReceive, state: State) => {
  switch (state.status) {
    // If we're in active state
    case 'active': {
      // and we have such a CAR in S3 we're done
      if (event.has) {
        return { state: { status: 'ok', ok: state.active } }
      }

      // Otherwise we issue presigned URL and
      // invoke s3/put on the client.
      const { url, headers } = presignS3Put()
      return {
        // 'receiving' is the status `State` declares for an upload in progress
        state: {
          status: 'receiving',
          receiving: { url, headers }
        },
        fx: [
          event.source.issuer.invoke({
            do: 's3/put',
            with: event.source.issuer.did(),
            input: { url, headers }
          })
        ]
      }
    }
    // In every other state store/receive is a noop
    case 'receiving':
    case 'ok':
    case 'error':
      return { state }
    default:
      return panic('Unknown state')
  }
}
/**
 * Transition for `store/exceed`: the record moves to an error state
 * regardless of where it was.
 *
 * @param event - the `store/exceed` event (currently unused)
 */
const overCapacity = (event: StoreExceed) => {
  return {
    state: {
      status: 'error',
      error: { message: "Not enough storage" }
    }
  }
}
/**
 * Transition for `store/expire`.
 *
 * TODO: not implemented yet. Per the "Take 2" flowchart, `store/expire`
 * moves a record from "receiving" to the "expired" error state.
 *
 * @param event - the `store/expire` event
 * @param state - current state of the record
 */
const expire = (event: StoreExpire, state: State) => {
}
/**
 * Transition for `store/put`.
 *
 * TODO: not implemented yet. Per the "Take 2" flowchart, `store/put`
 * moves a record from "receiving" back to "active".
 *
 * @param event - the `store/put` event
 * @param state - current state of the record
 */
const put = (event: StorePut, state: State) => {
}
```
[FSM]:https://learnyousomeerlang.com/finite-state-machines