# Salsa Cycling Handling
## big picture
* cycle handling in salsa, from a user's point of view
* history of cycle handling in salsa
```
db.query(a)
  ...
  db.query(b)
    ...
    db.query(a)  // <-- cycle: `a` is already executing higher up the stack
```
in salsa:
* you tag a query `q` with `#[salsa::recover(some_fn)]`
* if a cycle arises that involves `q`...
* salsa executes `some_fn`, which yields a result
* and that result is used in place of executing `q`
* `salsa::Cycle`
* `DatabaseKeyIndex`
* an integer that uniquely identifies a query
## interesting example
* build_all_the_modules() -> Vec<Result<Module, Error>>
* build_module(a) -> Module
* build_module(b)
* build_module(c)
* build_all_the_modules()
* use the recover fn here
* build_module has `#[salsa::recover]`
* you would wind up with a vector containing
* Ok(ModuleA)
* Ok(ModuleB)
* Err(..) -- the recovery value for the module in the cycle
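The shape of the result can be sketched like this (everything here -- `Module`, `CycleError`, the hard-coded contents -- is illustrative, not salsa API): modules a and b build normally, while the module caught in the cycle yields the recovery fn's Err.

```rust
// Illustrative stand-ins, not salsa's real types.
#[derive(Debug, PartialEq)]
struct Module(&'static str);

#[derive(Debug, PartialEq)]
struct CycleError;

fn build_all_the_modules() -> Vec<Result<Module, CycleError>> {
    vec![
        Ok(Module("a")),
        Ok(Module("b")),
        Err(CycleError), // the recovery value for the module in the cycle
    ]
}
```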
## error recovery
## easy case: cycle on one thread
* just look up the stack to find all the participants
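A minimal single-thread sketch of that idea (not salsa's real API -- in particular, real salsa unwinds to the recovering frame rather than returning inline): the runtime keeps a stack of active queries; re-entering an active query means a cycle, and the participants are exactly the stack suffix starting at the repeated query.

```rust
use std::collections::HashMap;

struct Runtime {
    stack: Vec<&'static str>,         // queries currently executing
    memo: HashMap<&'static str, i64>, // completed results
}

impl Runtime {
    fn new() -> Self {
        Runtime { stack: Vec::new(), memo: HashMap::new() }
    }

    fn query(&mut self, key: &'static str) -> i64 {
        if let Some(&v) = self.memo.get(key) {
            return v;
        }
        if let Some(pos) = self.stack.iter().position(|&k| k == key) {
            // Cycle: everything from `pos` up to the top of the stack
            // participates.
            let participants = self.stack[pos..].to_vec();
            return self.recover(&participants);
        }
        self.stack.push(key);
        let v = self.execute(key);
        self.stack.pop();
        self.memo.insert(key, v);
        v
    }

    // Stand-in for the user's query functions: "a" and "b" call each other.
    fn execute(&mut self, key: &'static str) -> i64 {
        match key {
            "a" => 1 + self.query("b"),
            "b" => 1 + self.query("a"),
            _ => 0,
        }
    }

    // Stand-in for the `#[salsa::recover(some_fn)]` fn: yield a fallback.
    fn recover(&mut self, participants: &[&'static str]) -> i64 {
        -(participants.len() as i64)
    }
}
```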
## hard case: cycles across threads
* Thread A
* query A1
* query A2
* query A3
* query B2 (already active, on thread B)
* get a lock on the dependency graph (shared across all threads)
* insert the edge A -> B into the graph with:
* blocked_on_id: B
* blocked_on_key: B2
* stack: [A1, A2, A3]
* waits on the condvar
* ...
* wakes up, reads from the `wait_results` map and completes normally
* Thread B
* query B1
* query B2
* it detects that someone is blocked on this result
* calls `unblock_runtimes_blocked_on(B2, result)` method on the runtime
* this will push the result into the `wait_results` map
* and then call `notify_one` on the condvar, which wakes up thread A
* query B3
* thread B completes normally
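The block-and-notify handshake can be sketched with std `Mutex`/`Condvar`; the `wait_results` name follows the notes, while the dependency-graph bookkeeping (the A -> B edge) is elided here.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

struct Shared {
    wait_results: Mutex<HashMap<&'static str, i64>>,
    condvar: Condvar,
}

fn demo() -> i64 {
    let shared = Arc::new(Shared {
        wait_results: Mutex::new(HashMap::new()),
        condvar: Condvar::new(),
    });

    let s = Arc::clone(&shared);
    let thread_a = thread::spawn(move || {
        // Thread A: B2 is active on thread B, so (after recording the
        // A -> B edge, elided) wait on the condvar until its result lands.
        let mut results = s.wait_results.lock().unwrap();
        while !results.contains_key("B2") {
            results = s.condvar.wait(results).unwrap();
        }
        results["B2"] // wake up, read from `wait_results`, complete normally
    });

    // Thread B: query B2 completes; push the result into `wait_results`
    // and notify the blocked runtime.
    let mut results = shared.wait_results.lock().unwrap();
    results.insert("B2", 42);
    drop(results);
    shared.condvar.notify_one();

    thread_a.join().unwrap()
}
```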
---
* Thread A: query A1
* Thread A: query A2 // suppose A2 has recovery
* Thread B: query B1
* Thread B: query B2
* Thread A: query A3
* Thread A: query B2 (already active, on thread B)
* get a lock on the dependency graph (shared across all threads)
* insert the edge A -> B into the graph with:
* blocked_on_id: B
* blocked_on_key: B2
* stack: [A1, A2, A3]
* waits on the condvar
* Thread B: query A2
* call `block_on_or_unwind(A2)`
* get a lock on the dependency graph (shared across all threads)
* walks the dependency graph between runtimes:
* is thread A blocked on the current thread? (Thread B)
* if so, we have a cycle
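The walk itself can be sketched like this (a simplification: threads named by strings, one `blocked_on` edge per thread): before `me` blocks on `target`, follow the blocked-on edges from `target`; if the walk reaches `me`, blocking would deadlock -- we have a cycle.

```rust
use std::collections::HashMap;

fn would_cycle(edges: &HashMap<&str, &str>, me: &str, target: &str) -> bool {
    let mut current = target;
    loop {
        if current == me {
            return true; // the chain leads back to us: cycle
        }
        match edges.get(current) {
            Some(&next) => current = next,
            None => return false, // chain ends without reaching us
        }
    }
}
```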
---
step 1:
Identify the cycle participants:
* `v = vec![A2, A3, B2, B3]`
* A2.cycle = Some(), A3.cycle = Some()
* every other stack frame, the cycle flag is None
* wake up the threads with recovery
* we are in thread B
* so we wake up thread A
* give it a `WaitResult::Cycle`
* thread B: add a dep B -> A and block, releases the lock
* thread A: acquires the lock, sees that a cycle occurred, initiates unwind
* thread A: unwind query A3
* thread A: unwind query A2
* thread A: catch the unwind, execute the recovery
* this gives us a value V for A2, which we store as normal
* thread A: resume executing -- query A1 completes with the value V for A2
* thread A: notify people waiting on A2
* thread B will wake up and resume executing, it will read from the memo map and see the value V
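The unwind-and-recover step above can be sketched with `catch_unwind`; here `CycleMarker` is a stand-in for `salsa::Cycle`, and -1 is an arbitrary fallback value standing in for V.

```rust
use std::panic;

// Salsa-style runtimes unwind the stack with a special payload rather than
// returning an error; a frame that has recovery catches the unwind and
// substitutes its recovery value.
struct CycleMarker;

fn query_a3() -> i64 {
    // A3 has no recovery: the cycle payload just propagates through it.
    panic::panic_any(CycleMarker)
}

fn query_a2() -> i64 {
    match panic::catch_unwind(query_a3) {
        Ok(v) => v,
        Err(payload) if payload.downcast_ref::<CycleMarker>().is_some() => {
            // This frame has recovery: produce the fallback value V.
            -1
        }
        // Not a cycle marker (e.g. a genuine panic): keep unwinding.
        Err(payload) => panic::resume_unwind(payload),
    }
}
```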
---
suppose A2/B2 both have recovery
* `v = vec![A2, A3, B2, B3]`
* A2.cycle = Some(), A3.cycle = Some()
* every other stack frame, the cycle flag is None
* wake up the threads with recovery
* we are in thread B
* so we wake up thread A
* give it a `WaitResult::Cycle`
* thread B: initiate unwinding
* thread B: unwind B3
* thread B: unwind B2 -- see that we have recovery and execute the recovery fn, storing the result as normal
* thread A: unwind A3
* thread A: ... as before ...
---
guarantee:
* if your query participates in a cycle
* and it has recovery
* then you will see the recovery
* true no matter where the cycle started
---
what if A1 and A2 both had recovery set?
* execute A2 recovery and store it in the memoized map
* but then:
* The code for A1 had invoked the A2 function
* as we pop the stack frame for A2 from the runtime, we check whether the cycle flag is set; if so, initiate unwinding again
---
# Entity system
silly compiler:
```
parser(input-text) = vec[item]
item = class | function
```
## old system
* `parse_file() -> Vec<Item>`
* `process_all_items`
* `let p = parse_file().len();`
* `for i in 0..p { process_item(i); }`
* get_item(i)
* parse_file
* returns item i
* process_item(i)
* get_item(i)
* `parse_file()[i]` -- very wrong: this makes process_item depend on the entire parse result, so any change to the file invalidates every item
## new system
```
entity Item {
... class | function ...
}
```
* parser:
* allocates items by doing `Item::new(db, data)`
* returns an `Item`
* but an `Item` is just an integer (newtype'd)
* return `vec[item]`
* returning a vector of ids
* `process_all_items`
* `for item in parser() { process_item(item); }`
* `process_item(item: Item)`
* `item.field1(db)` -- reads a field of the entity; the dependency is on just that field
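The entity idea can be sketched with hand-rolled stand-ins for what salsa would generate (`new_item` for `Item::new(db, data)`, `field1` for the field accessor): an `Item` is just a newtype'd integer, the data lives in a table keyed by that id, and the parser returns a vector of ids rather than payloads.

```rust
use std::collections::HashMap;

#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
struct Item(u32); // just an integer, newtype'd

#[derive(Default)]
struct Db {
    item_data: HashMap<Item, String>, // field storage, one record per entity
    next_id: u32,
}

impl Db {
    // Stand-in for `Item::new(db, data)`: allocate an id, store the data.
    fn new_item(&mut self, data: &str) -> Item {
        let item = Item(self.next_id);
        self.next_id += 1;
        self.item_data.insert(item, data.to_string());
        item
    }

    // Stand-in for `item.field1(db)`.
    fn field1(&self, item: Item) -> &str {
        &self.item_data[&item]
    }
}

// The parser allocates entities and returns a vector of ids.
fn parser(db: &mut Db) -> Vec<Item> {
    vec![db.new_item("class Foo"), db.new_item("fn bar")]
}
```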
graph:
```
[parser-query: vec[item]]
      ^   |
      |   v
[entity: Item 0]
    [data: record when this field last changed] <--- `process_item(Item 0)`
[entity: Item 1]
```