Salsa Cycle Handling

big picture

as a user

history of cycle handling in salsa

db.query(a)

...

db.query(b)

...

db.query(a)

in salsa

  • you tag a query q with #[salsa::recover(some_fn)]

    • if a cycle occurs that involves q
      • salsa executes some_fn, which yields a result
      • and that result is used in place of q's value
  • salsa::Cycle: describes the cycle, including its participants

  • DatabaseKeyIndex

    • a compact integer index that uniquely identifies a query instance (a query plus its key)
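To make the two types concrete, here is a minimal Rust sketch. It is not salsa's code: packing a query index and a key index into one u64, the Cycle struct with a participant_keys method, and the body of some_fn are all assumptions chosen for illustration.

```rust
/// Hypothetical layout: query index in the high 32 bits, interned key index in the low 32 bits.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
struct DatabaseKeyIndex(u64);

impl DatabaseKeyIndex {
    fn new(query_index: u32, key_index: u32) -> Self {
        DatabaseKeyIndex((u64::from(query_index) << 32) | u64::from(key_index))
    }

    fn query_index(self) -> u32 {
        (self.0 >> 32) as u32
    }

    fn key_index(self) -> u32 {
        self.0 as u32 // truncation keeps the low 32 bits
    }
}

/// Hypothetical stand-in for salsa::Cycle: the participants, in stack order.
#[derive(Clone, Debug)]
struct Cycle {
    participants: Vec<DatabaseKeyIndex>,
}

impl Cycle {
    fn participant_keys(&self) -> impl Iterator<Item = DatabaseKeyIndex> + '_ {
        self.participants.iter().copied()
    }
}

/// A recovery fn in the spirit of #[salsa::recover(some_fn)]:
/// given the cycle, it produces a fallback value for the query.
fn some_fn(cycle: &Cycle) -> String {
    format!("recovered from a cycle of {} queries", cycle.participant_keys().count())
}
```

Because the index is a plain integer, comparing or hashing participants is cheap, which matters when the runtime has to scan stacks for them.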

interesting example

  • build_all_the_modules() -> Vec<Result<Module, Error>>
    • build_module(a) -> Module
    • build_module(b)
    • build_module(c)
      • build_all_the_modules()
      • the cycle is detected here, so build_module(c) uses its recovery fn
  • build_module has #[salsa::recover]
  • you end up with a vector containing
    • ModuleA
    • ModuleB
    • Module::Err()
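The example can be reproduced with a small single-threaded toy. This is a sketch under stated assumptions, not salsa's implementation: Db, Key, CycleUnwind, and recover_build_module are hypothetical names, and the unwind is modeled with a Result instead of a panic. build_module(c) re-enters build_all_the_modules, the cycle is detected on the active-query stack, and build_module's recovery fn supplies an Err for c.

```rust
use std::collections::HashMap;

/// Hypothetical query keys for this toy.
#[derive(Clone, Debug, PartialEq, Eq)]
enum Key {
    AllModules,
    Module(char),
}

#[derive(Clone, Debug, PartialEq)]
struct Module(char);

#[derive(Clone, Debug, PartialEq)]
struct Error(String);

/// An unwind in flight: carries the cycle's participants (stands in for salsa::Cycle).
#[derive(Debug)]
struct CycleUnwind {
    participants: Vec<Key>,
}

struct Db {
    names: Vec<char>,
    active: Vec<Key>,                           // the active-query stack
    memo: HashMap<char, Result<Module, Error>>, // memoized build_module results
}

impl Db {
    /// Push `key`, or report a cycle if it is already active.
    fn push(&mut self, key: Key) -> Result<(), CycleUnwind> {
        if let Some(pos) = self.active.iter().position(|k| *k == key) {
            return Err(CycleUnwind { participants: self.active[pos..].to_vec() });
        }
        self.active.push(key);
        Ok(())
    }

    /// No recovery: a cycle unwind passes straight through.
    fn build_all_the_modules(&mut self) -> Result<Vec<Result<Module, Error>>, CycleUnwind> {
        self.push(Key::AllModules)?;
        let mut out = Vec::new();
        for name in self.names.clone() {
            match self.build_module(name) {
                Ok(m) => out.push(m),
                Err(cycle) => {
                    self.active.pop();
                    return Err(cycle);
                }
            }
        }
        self.active.pop();
        Ok(out)
    }

    /// Has recovery, in the spirit of #[salsa::recover(recover_build_module)].
    fn build_module(&mut self, name: char) -> Result<Result<Module, Error>, CycleUnwind> {
        if let Some(v) = self.memo.get(&name) {
            return Ok(v.clone());
        }
        let key = Key::Module(name);
        self.push(key.clone())?;
        let result: Result<Result<Module, Error>, CycleUnwind> = if name == 'c' {
            // Module c needs the full module list, which closes the cycle.
            self.build_all_the_modules().map(|_| Ok(Module(name)))
        } else {
            Ok(Ok(Module(name)))
        };
        self.active.pop();
        let value = match result {
            Ok(v) => v,
            // We are a cycle participant with recovery: use the fallback value.
            Err(cycle) if cycle.participants.contains(&key) => recover_build_module(&cycle, name),
            Err(cycle) => return Err(cycle),
        };
        self.memo.insert(name, value.clone());
        Ok(value)
    }
}

fn recover_build_module(cycle: &CycleUnwind, name: char) -> Result<Module, Error> {
    Err(Error(format!("module {} is in a cycle of {} queries", name, cycle.participants.len())))
}
```

Since the outer build_all_the_modules has no recovery, it simply reads the recovered value for c and finishes, producing Ok(a), Ok(b), and an Err for c.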

error recovery

easy case: cycle on one thread

  • just look up the stack to find all the participants
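In the single-threaded case, finding the participants is a scan of the active-query stack: every frame from the first frame for the re-entered query up to the top. A minimal sketch, with cycle_participants as a hypothetical helper:

```rust
/// Single-threaded case: `stack` is the active-query stack, bottom first.
/// If `key` is already on it, the cycle participants are every frame from
/// that frame up to the top; otherwise there is no cycle.
fn cycle_participants<K: PartialEq + Clone>(stack: &[K], key: &K) -> Option<Vec<K>> {
    let start = stack.iter().position(|k| k == key)?;
    Some(stack[start..].to_vec())
}
```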

hard case: cycles across threads

  • Thread A
    • query A1
    • query A2
    • query A3
    • query B2 (already active, on thread B)
      • get a lock on the dependency graph (shared across all threads)
      • insert the edge A -> B into the graph with:
        • blocked_on_id: B
        • blocked_on_key: B2
        • stack: [A1, A2, A3]
      • waits on the condvar
      • wakes up, reads the result from the wait_results map, and completes normally
  • Thread B
    • query B1
    • query B2
      • when it completes, it detects that another thread is blocked on this result
      • calls the runtime's unblock_runtimes_blocked_on(B2, result) method
      • this will push the result into the wait_results map
      • and then call notify_one on the condvar, which wakes up thread A
    • query B3
      • completes normally
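The blocking protocol above can be sketched with std's Mutex and Condvar. Assumptions: one lock guards the whole dependency graph, each edge carries its own Condvar, and a toy memo map lives under the same lock so that a result finished before the waiter blocks is not missed. block_on, DependencyGraph, and the u64 result type are hypothetical; only unblock_runtimes_blocked_on, wait_results, and the edge fields come from the notes.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};

type RuntimeId = u32; // hypothetical: one id per thread's runtime
type QueryKey = &'static str;

/// "Runtime X is blocked on `blocked_on_key`, which is active on `blocked_on_id`."
#[allow(dead_code)] // blocked_on_id and stack are only needed for cycle detection
struct Edge {
    blocked_on_id: RuntimeId,
    blocked_on_key: QueryKey,
    stack: Vec<QueryKey>,
    condvar: Arc<Condvar>,
}

/// Shared by all threads behind one Mutex.
#[derive(Default)]
struct DependencyGraph {
    edges: HashMap<RuntimeId, Edge>,
    wait_results: HashMap<RuntimeId, u64>,
    memo: HashMap<QueryKey, u64>, // toy memo table under the same lock
}

/// Thread A: wait for `key`, which is active on runtime `owner`.
fn block_on(lock: &Mutex<DependencyGraph>, me: RuntimeId, owner: RuntimeId, key: QueryKey, stack: Vec<QueryKey>) -> u64 {
    let mut graph = lock.lock().unwrap();
    if let Some(&v) = graph.memo.get(&key) {
        return v; // the owner already finished: nothing to wait for
    }
    let condvar = Arc::new(Condvar::new());
    graph.edges.insert(me, Edge { blocked_on_id: owner, blocked_on_key: key, stack, condvar: Arc::clone(&condvar) });
    // wait() releases the lock while sleeping; loop to tolerate spurious wakeups.
    loop {
        if let Some(v) = graph.wait_results.remove(&me) {
            return v;
        }
        graph = condvar.wait(graph).unwrap();
    }
}

/// Thread B: `key` completed with `result`; hand it to every runtime blocked on it.
fn unblock_runtimes_blocked_on(lock: &Mutex<DependencyGraph>, key: QueryKey, result: u64) {
    let mut graph = lock.lock().unwrap();
    graph.memo.insert(key, result);
    let waiting: Vec<RuntimeId> = graph
        .edges
        .iter()
        .filter(|(_, e)| e.blocked_on_key == key)
        .map(|(id, _)| *id)
        .collect();
    for id in waiting {
        let edge = graph.edges.remove(&id).unwrap();
        graph.wait_results.insert(id, result);
        edge.condvar.notify_one();
    }
}
```

Keeping the memo check and the edge insertion under the same lock is what rules out a lost wakeup: thread B cannot publish the result between thread A's check and thread A's wait.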

  • Thread A: query A1
  • Thread A: query A2 // suppose A2 has recovery
  • Thread B: query B1
  • Thread B: query B2
  • Thread A: query A3
  • Thread A: query B2 (already active, on thread B)
    • get a lock on the dependency graph (shared across all threads)
    • insert the edge A -> B into the graph with:
      • blocked_on_id: B
      • blocked_on_key: B2
      • stack: [A1, A2, A3]
    • waits on the condvar
  • Thread B: query A2
    • calls block_on_or_unwind(A2)
      • get a lock on the dependency graph (shared across all threads)
      • walks the dependency graph between runtimes:
        • is thread A (transitively) blocked on the current thread (thread B)?
        • if so, we have a cycle
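The graph walk can be sketched as follows, assuming (as a simplification) that each runtime is blocked on at most one other runtime, so the graph reduces to a map from a runtime id to the runtime it waits on. depends_on, try_block, and BlockOutcome are hypothetical names.

```rust
use std::collections::HashMap;

type RuntimeId = u32;

#[derive(Debug, PartialEq)]
enum BlockOutcome {
    Blocked,
    Cycle,
}

/// `blocked_on[r]` is the runtime that `r` is currently waiting on.
/// Returns true if `from` is, directly or transitively, blocked on `to`.
fn depends_on(blocked_on: &HashMap<RuntimeId, RuntimeId>, from: RuntimeId, to: RuntimeId) -> bool {
    let mut current = from;
    while let Some(&next) = blocked_on.get(&current) {
        if next == to {
            return true;
        }
        current = next;
    }
    false
}

/// Runtime `me` wants to wait for a query that is active on runtime `owner`.
fn try_block(blocked_on: &mut HashMap<RuntimeId, RuntimeId>, me: RuntimeId, owner: RuntimeId) -> BlockOutcome {
    // Cycle if the owner is ourselves or is already (transitively) waiting on us.
    if owner == me || depends_on(blocked_on, owner, me) {
        return BlockOutcome::Cycle;
    }
    blocked_on.insert(me, owner);
    BlockOutcome::Blocked
}
```

Refusing to insert an edge that would close a loop keeps the graph acyclic, which is what guarantees the walk in depends_on terminates.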

step 1:

Identify the cycle participants:

  • v = vec![A2, A3, B2, B3]
  • A2.cycle = Some(), A3.cycle = Some()
    • every other stack frame, the cycle flag is None
  • wake up the threads with recovery
    • we are in thread B
    • so we wake up thread A
      • give it a WaitResult::Cycle
  • thread B: adds an edge B -> A to the dependency graph, then blocks, releasing the lock
  • thread A: acquires the lock, sees that a cycle occurred, and initiates unwinding
  • thread A: unwinds query A3
  • thread A: unwinds query A2
  • thread A: catches the unwind and executes A2's recovery fn
    • this gives us a value V for A2, which we store as normal
  • thread A: resumes executing query A1, which completes using the value V for A2
  • thread A: notifies the threads waiting on A2
  • thread B wakes up and resumes executing; it reads A2's value V from the memo map
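Thread A's side of this can be sketched with std::panic. We assume the unwind carries the cycle as a panic payload started with resume_unwind (which skips the panic hook, so nothing is printed) and is caught with catch_unwind in the query that has recovery. The query functions, the Cycle struct, and the recover_a2 fallback are hypothetical, and memoization and waking thread B are omitted.

```rust
use std::panic::{self, AssertUnwindSafe};

/// Panic payload for a cycle unwind (hypothetical stand-in for salsa::Cycle).
#[derive(Debug, Clone)]
struct Cycle {
    participants: Vec<&'static str>,
}

impl Cycle {
    /// resume_unwind starts unwinding without invoking the panic hook.
    fn throw(self) -> ! {
        panic::resume_unwind(Box::new(self))
    }
}

/// Query A3: woke up with WaitResult::Cycle, so it initiates the unwind.
fn query_a3(cycle: Cycle) -> u32 {
    cycle.throw()
}

/// Query A2 has recovery: catch the unwind and use the recovered value V instead.
fn query_a2(cycle: Cycle) -> u32 {
    match panic::catch_unwind(AssertUnwindSafe(|| query_a3(cycle) + 1)) {
        Ok(v) => v,
        Err(payload) => match payload.downcast::<Cycle>() {
            Ok(cycle) => recover_a2(&cycle),
            // Not a cycle unwind: keep propagating the original panic.
            Err(other) => panic::resume_unwind(other),
        },
    }
}

/// Hypothetical recovery fn for A2.
fn recover_a2(cycle: &Cycle) -> u32 {
    100 + cycle.participants.len() as u32
}

/// Query A1: no recovery and not a participant, so it resumes with V.
fn query_a1(cycle: Cycle) -> u32 {
    query_a2(cycle) * 2
}
```

This relies on the default panic=unwind strategy; a crate built with panic=abort could not catch the unwind at A2.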

suppose A2/B2 both have recovery

  • v = vec![A2, A3, B2, B3]
  • A2.cycle = Some(), A3.cycle = Some(), B2.cycle = Some(), B3.cycle = Some()
    • every other stack frame, the cycle flag is None
  • wake up the threads with recovery
    • we are in thread B
    • so we wake up thread A
      • give it a WaitResult::Cycle
  • thread B: initiates unwinding
  • thread B: unwinds B3
  • thread B: unwinds B2, sees that it has recovery, and executes the recovery fn, storing the result as normal
  • thread A: unwinds A3
  • thread A: continues as before
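Both walkthroughs are consistent with the following marking rule, which we assume here rather than take from the notes: on each thread's portion of the cycle, the first frame that has recovery, and every frame above it, gets its cycle flag set, and a thread with any flagged frame unwinds. ActiveQuery, mark_participants, must_unwind, and segment are hypothetical names.

```rust
/// One frame on a thread's active-query stack (hypothetical shape).
#[derive(Debug)]
struct ActiveQuery {
    key: &'static str,
    has_recovery: bool,
    cycle: Option<Vec<&'static str>>, // Some(participants) once marked
}

/// `segment` is this thread's part of the cycle, bottom to top.
/// Assumed rule: the first frame with recovery and every frame above it get the flag.
fn mark_participants(segment: &mut [ActiveQuery], participants: &[&'static str]) {
    for aq in segment.iter_mut().skip_while(|aq| !aq.has_recovery) {
        aq.cycle = Some(participants.to_vec());
    }
}

/// A thread unwinds if any of its frames was marked.
fn must_unwind(segment: &[ActiveQuery]) -> bool {
    segment.iter().any(|aq| aq.cycle.is_some())
}

/// Builds a segment from (key, has_recovery) pairs with no flags set.
fn segment(frames: &[(&'static str, bool)]) -> Vec<ActiveQuery> {
    frames
        .iter()
        .map(|&(key, has_recovery)| ActiveQuery { key, has_recovery, cycle: None })
        .collect()
}
```

With only A2 having recovery, thread B's segment stays unmarked and thread B just blocks; with B2 also having recovery, thread B's frames are marked and thread B unwinds on its own.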

guarantee:

  • if your query participates in a cycle
  • and it has recovery

then its recovery fn runs and you will see the recovered value

this holds no matter where the cycle started


what if A1 and A2 both had recovery set?

  • execute A2's recovery and store the result in the memo map
  • but then:
    • the code for A1 had invoked the A2 function
    • as we pop A2's stack frame from the runtime, we check whether the caller's (A1's) cycle flag is set
      • if so, we initiate unwinding again
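The re-check can be sketched with Result-based unwinding (a simplification of the panic-based unwind): after a callee's frame is popped, the caller's cycle flag is checked, and if it is set, the value just read is only a fallback, so we unwind again. Runtime, execute, and Unwind are hypothetical names; memoization is omitted.

```rust
/// An unwind in flight (stands in for the panic-based unwind).
#[derive(Debug, Clone, Copy, PartialEq)]
struct Unwind;

/// Toy runtime for one thread: the stack records each frame's cycle flag.
struct Runtime {
    stack: Vec<(&'static str, bool)>, // (query, cycle flag)
    log: Vec<String>,
}

impl Runtime {
    /// Runs `key`. If its body unwinds, the query recovers only when it has a
    /// fallback (`recovery`) and its own cycle flag is set.
    fn execute(
        &mut self,
        key: &'static str,
        cycle_flag: bool,
        recovery: Option<u32>,
        body: impl FnOnce(&mut Self) -> Result<u32, Unwind>,
    ) -> Result<u32, Unwind> {
        self.stack.push((key, cycle_flag));
        let result = body(self);
        self.stack.pop();
        let value = match (result, recovery) {
            (Ok(v), _) => v,
            (Err(Unwind), Some(fallback)) if cycle_flag => {
                self.log.push(format!("{key}: recovered"));
                fallback
            }
            (Err(u), _) => return Err(u),
        };
        // Our frame is popped; the caller (new top of stack) reads our value.
        // If the caller's cycle flag is set, unwind again.
        match self.stack.last() {
            Some(&(caller, true)) => {
                self.log.push(format!("{caller}: unwinding again"));
                Err(Unwind)
            }
            _ => Ok(value),
        }
    }
}
```

If A1's flag is not set (A1 is not a cycle participant), the same code lets A1 continue with A2's recovered value instead of unwinding.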

Entity system

silly compiler:

parser(input-text) = vec[item]
item = class | function

old system

  • parse_file() -> Vec<Item>
  • process_all_items
    • let p = parse_file().len();
    • for i in 0..p { process_item(i); }
  • get_item(i)
    • parse_file
    • returns item i
  • process_item(i)
    • get_item(i)
    • doing parse_file()[i] directly here instead would be very wrong: process_item(i) would then depend on the whole parse result
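The cost of index-based identity can be seen with a small sketch. Assuming get_item(i) acts as a firewall (process_item(i) re-runs only when get_item(i) returns a different value), the re-executed calls are exactly the indices whose item changed. changed_indices is a hypothetical helper, and items are plain strings here.

```rust
/// Old system: an item's identity is its index into parse_file()'s Vec.
/// Assuming get_item(i) is a firewall, this returns the indices i whose
/// process_item(i) would re-run after going from `old` to `new`.
fn changed_indices(old: &[&str], new: &[&str]) -> Vec<usize> {
    (0..old.len().max(new.len()))
        .filter(|&i| old.get(i) != new.get(i))
        .collect()
}
```

Editing one item re-runs one process_item, but inserting an item at the front shifts every index, so every process_item re-runs.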

new system

entity Item {
    ... class | function ...
}
  • parser:
    • allocates items by doing Item::new(db, data)
      • returns an Item
      • but an Item is just an integer (newtype'd)
    • return vec[item]
      • returning a vector of ids
  • process_all_items
    • for item in parser() { process_item(item); }
  • process_item(item: Item)
    • item.field1(db): reads field1 of the entity, recording a dependency on that field
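A sketch of the entity idea, assuming an entity is a u32 index into a table owned by the database and that each field stores the revision in which it last changed (the "record when this field last changed" box in the graph). Item::new and field1 follow the notes; ItemData, Db, parser's signature, set_field1, and changed_at are hypothetical, not salsa's API.

```rust
/// An entity id: just a newtype'd integer.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
struct Item(u32);

#[derive(Clone, Debug, PartialEq)]
enum ItemData {
    Class(String),
    Function(String),
}

/// One stored field plus the revision in which it last changed.
struct Field {
    value: ItemData,
    changed_at: u64,
}

#[derive(Default)]
struct Db {
    revision: u64,
    items: Vec<Field>,
}

impl Item {
    /// Allocates a new entity and returns its id.
    fn new(db: &mut Db, data: ItemData) -> Item {
        db.items.push(Field { value: data, changed_at: db.revision });
        Item((db.items.len() - 1) as u32)
    }

    /// A real system would also record the dependency on this field here.
    fn field1(self, db: &Db) -> &ItemData {
        &db.items[self.0 as usize].value
    }

    fn changed_at(self, db: &Db) -> u64 {
        db.items[self.0 as usize].changed_at
    }

    /// Bumps changed_at only if the value actually differs.
    fn set_field1(self, db: &mut Db, data: ItemData) {
        let revision = db.revision;
        let field = &mut db.items[self.0 as usize];
        if field.value != data {
            field.value = data;
            field.changed_at = revision;
        }
    }
}

/// The parser returns a vector of ids, not of item data.
fn parser(db: &mut Db, source: &[ItemData]) -> Vec<Item> {
    source.iter().cloned().map(|data| Item::new(db, data)).collect()
}
```

Because process_item(Item 0) depends only on Item 0's field, a change to Item 1 leaves Item 0's changed_at untouched, so process_item(Item 0) need not re-run.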

graph:

[parser-query: vec[item]]
      ^   |
      |   v
[entity: Item 0]
    [data: record when this field last changed] <- process_item(Item 0)

[entity: Item 1]

[entity: Item 0]
