# Scheduler Redesign: Weighted Fair Queuing with Conflict Exclusion
## Problem Statement
The current internal async task queue has a max concurrency limit and sub-queues bounded to 1 for synchronisation. This is insufficient for real-world usage:
1. **Client DoS**: A single client initiated 1000 parallel clones, starving all other clients.
2. **Uniform cost assumption**: All jobs are treated equally, but a `linux.git` clone is far more intensive than a `git.git` clone.
3. **No foreground/background interaction**: Synchronous (foreground) work contributes to load, but the scheduler can't account for it because only async (background) jobs go through it.
## Design Overview
All work — foreground and background — goes through a single scheduler. The scheduler uses **cost-based fair queuing** to ensure no single client monopolises the system, with **conflict key exclusion** to prevent unsafe concurrent operations on the same resource.
### Core Concepts
**Job**: The unit of work, identified by `(job_type, job_id)`. Optionally carries a `fairness_key` for foreground work.
**Cost**: A number representing the relative system impact of a job, measured in wall time (seconds). The scheduler automatically learns the cost of each `(job_type, job_id)` pair using an exponential moving average of observed execution time. On first encounter, the `DefaultCost` from the job type config is used. Callers never specify cost explicitly.
**Accumulated cost**: A running total of cost consumed per fairness key. Every time a job is admitted, its estimated cost is added to its fairness key's accumulated cost. Within a priority tier, the scheduler prefers the job whose fairness key has the lowest accumulated cost, so whoever has consumed the least goes next.
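To illustrate how accumulated cost orders work, here is a minimal sketch written in the same Python style as the appendix simulation. It assumes a single slot and ignores priority tiers, concurrency limits, and conflicts; `admission_order` is a hypothetical helper, not part of the scheduler API.

```python
from collections import defaultdict


def admission_order(pending):
    """Return job names in the order a single-slot scheduler would admit them.

    pending: list of (fairness_key, job_name, estimated_cost) in arrival order.
    """
    accumulated = defaultdict(float)  # accumulated cost per fairness key
    remaining = list(enumerate(pending))
    order = []
    while remaining:
        # Lowest accumulated cost goes first; arrival index breaks ties.
        pick = min(remaining, key=lambda item: (accumulated[item[1][0]], item[0]))
        remaining.remove(pick)
        _, (key, name, cost) = pick
        accumulated[key] += cost
        order.append(name)
    return order


pending = [
    ("clientA", "a1", 10),
    ("clientA", "a2", 10),
    ("clientA", "a3", 10),
    ("clientB", "b1", 10),
]
print(admission_order(pending))  # ['a1', 'b1', 'a2', 'a3']
```

Although `b1` arrived last, it is admitted second: once `a1` is charged to `clientA`, `clientB` has the lower accumulated cost.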
**Fairness key**: An opaque string on the job, populated by the caller. For foreground jobs, this is typically the client IP or identity. For background jobs, this is empty. The scheduler doesn't know what it represents — it just uses it for ordering.
**Conflict group**: A named group on the job type config. Two jobs conflict if they share a `job_id` and belong to the same non-empty conflict group. For example, `sync-clone`, `repack`, and `pull` all belong to the `"git"` conflict group — at most one of these can run on a given repo at a time. `snapshot` has no conflict group, so it runs concurrently with anything. This avoids the error-prone per-type conflict lists and ensures symmetry automatically.
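The conflict rule can be written as a single symmetric predicate. The sketch below is illustrative only: the `CONFLICT_GROUPS` table and the `conflicts` function are hypothetical names, and the registry simply mirrors the example job types mentioned above.

```python
from dataclasses import dataclass

# Hypothetical registry: job type -> conflict group ("" means no group).
CONFLICT_GROUPS = {
    "sync-clone": "git",
    "repack": "git",
    "pull": "git",
    "snapshot": "",
}


@dataclass(frozen=True)
class Job:
    job_type: str
    job_id: str


def conflicts(a: Job, b: Job) -> bool:
    """True when a and b must not run at the same time."""
    group_a = CONFLICT_GROUPS.get(a.job_type, "")
    group_b = CONFLICT_GROUPS.get(b.job_type, "")
    return a.job_id == b.job_id and group_a != "" and group_a == group_b


clone = Job("sync-clone", "github.com/torvalds/linux")
repack = Job("repack", "github.com/torvalds/linux")
snapshot = Job("snapshot", "github.com/torvalds/linux")
other = Job("repack", "github.com/git/git")

print(conflicts(clone, repack))    # True: same repo, same group
print(conflicts(repack, clone))    # True: the rule is symmetric
print(conflicts(clone, snapshot))  # False: snapshot has no group
print(conflicts(repack, other))    # False: different repos
```

Because the predicate compares group membership rather than consulting per-type lists, swapping the arguments cannot change the answer.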
**Priority**: A `type Priority int` on the job type. The value serves double duty: higher values are dispatched first, and the value *is* the max concurrency for that priority tier. For example, `Priority(40)` means foreground jobs are dispatched before background, and at most 40 foreground jobs can run concurrently. `Priority(10)` means background jobs get at most 10 concurrent slots. This prevents background jobs from filling all slots and blocking foreground work.
### Job Model
```go
// JobType is a named string type for type safety. Constants are defined
// by the application, not the scheduler package, keeping the scheduler
// agnostic to domain concepts like git.
type JobType string
```
### Job Type Configuration
```go
type Priority int

const (
	PriorityBackground Priority = 10 // max 10 concurrent bg jobs
	PriorityForeground Priority = 40 // max 40 concurrent fg jobs
)

type ConflictGroup string

type JobTypeConfig struct {
	DefaultCost    int
	MaxConcurrency int           // max concurrent jobs of this type
	ConflictGroup  ConflictGroup // jobs in same group conflict on same job_id
	Priority       Priority      // higher = dispatched first, value = max concurrency for tier
}
```
Example registrations:
```go
// Application-level constants, not in the scheduler package
const (
	JobTypeSyncClone JobType = "sync-clone"
	JobTypeRepack    JobType = "repack"
	JobTypePull      JobType = "pull"
	JobTypeSnapshot  JobType = "snapshot"

	ConflictGroupGit ConflictGroup = "git"
)

scheduler.RegisterType(JobTypeSyncClone, JobTypeConfig{
	DefaultCost:    10,
	MaxConcurrency: 50, // above the foreground tier cap of 40, so the tier cap binds first
	ConflictGroup:  ConflictGroupGit,
	Priority:       PriorityForeground,
})

scheduler.RegisterType(JobTypeRepack, JobTypeConfig{
	DefaultCost:    20,
	MaxConcurrency: 3,
	ConflictGroup:  ConflictGroupGit,
	Priority:       PriorityBackground,
})

scheduler.RegisterType(JobTypePull, JobTypeConfig{
	DefaultCost:    10,
	MaxConcurrency: 3,
	ConflictGroup:  ConflictGroupGit,
	Priority:       PriorityBackground,
})

// No ConflictGroup: never conflicts with anything
scheduler.RegisterType(JobTypeSnapshot, JobTypeConfig{
	DefaultCost:    5,
	MaxConcurrency: 5,
	Priority:       PriorityBackground,
})
```
### Calling Patterns
Foreground (synchronous) — the caller blocks until the job completes:
```go
func (s *Scheduler) RunSync(
	ctx context.Context,
	jobType JobType,
	jobID string,
	fairnessKey string,
	fn func(ctx context.Context) error,
) error
```
```go
err := scheduler.RunSync(
	ctx,
	JobTypeSyncClone,
	"github.com/torvalds/linux",
	request.RemoteAddr,
	func(ctx context.Context) error {
		return cloneRepo(ctx, "github.com/torvalds/linux")
	},
)
```
Background (async) — returns immediately, job runs when admitted:
```go
func (s *Scheduler) Submit(
	jobType JobType,
	jobID string,
	fn func(ctx context.Context) error,
)
```
```go
scheduler.Submit(
	JobTypeRepack,
	"github.com/torvalds/linux",
	func(ctx context.Context) error {
		return repackRepo(ctx, "github.com/torvalds/linux")
	},
)
```
Both enter the same pending queue and the same admission logic. `RunSync` blocks on a completion signal before returning to the caller.
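The relationship between the two entry points can be sketched as follows, in the same Python style as the appendix simulation. This is a deliberately reduced model under stated assumptions: `MiniScheduler` is a hypothetical stand-in with a single worker thread and plain FIFO order, with no fairness, priority, or conflict logic. It only shows both calls feeding one pending queue, with the synchronous call waiting on a completion signal.

```python
import queue
import threading


class MiniScheduler:
    """Hypothetical stand-in: one worker drains a single pending queue."""

    def __init__(self):
        self._pending = queue.Queue()
        worker = threading.Thread(target=self._loop, daemon=True)
        worker.start()

    def _loop(self):
        while True:
            fn, done, result = self._pending.get()
            try:
                fn()
            except Exception as exc:  # hand the error back to a sync caller
                result["error"] = exc
            finally:
                if done is not None:
                    done.set()  # completion signal for run_sync
                self._pending.task_done()

    def submit(self, fn):
        """Background: enqueue and return immediately."""
        self._pending.put((fn, None, {}))

    def run_sync(self, fn):
        """Foreground: enqueue, then block until the job has finished."""
        done = threading.Event()
        result = {}
        self._pending.put((fn, done, result))
        done.wait()
        if "error" in result:
            raise result["error"]


scheduler = MiniScheduler()
events = []
scheduler.submit(lambda: events.append("repack done"))
scheduler.run_sync(lambda: events.append("clone done"))
print(events)  # ['repack done', 'clone done']
```

In this reduced model the background job runs first only because the queue is FIFO; the real scheduler would order the two by priority and accumulated cost.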
## Dispatch Algorithm
The entire scheduling algorithm:
```
repeat until a pass admits nothing:
    sort pending jobs by (-priority, accumulated_cost[fairness_key], arrival_time)
    for each job in sorted order:
        if count(running where priority == job.priority) >= int(job.priority) → skip
        if type_running_count >= type.max_concurrency → skip
        if any running job has same job_id AND same non-empty conflict_group → skip
        admit job
        estimated_cost = cost_estimates[(job_type, job_id)] or type.default_cost
        accumulated_cost[fairness_key] += estimated_cost
        restart the pass (accumulated costs changed, so the order may have changed)
```
When a job completes:
```
elapsed = wall time since job started
previous = cost_estimates[(job_type, job_id)] or type.default_cost
cost_estimates[(job_type, job_id)] = α * elapsed + (1-α) * previous
re-evaluate pending queue for newly admissible jobs
```
Key properties of this algorithm:
- **Priority**: foreground jobs are always considered before background jobs. Background is capped at its own tier's slot count, so it cannot fill the system and block foreground work.
- **Fairness**: within a priority level, jobs from the fairness key with the lowest accumulated cost go first. A client that has consumed a lot of capacity yields to one that has consumed little.
- **Cost-awareness**: expensive jobs advance accumulated cost faster, so they naturally yield to cheaper work from other clients. A `linux.git` clone that takes 60 seconds advances the client's accumulated cost by ~60, while a `git.git` clone that takes 5 seconds advances it by ~5.
- **Adaptive**: the scheduler automatically learns the cost of each `(job_type, job_id)` pair. No manual cost tuning required. After one execution, estimates are already meaningful.
- **Conflict safety**: conflicting jobs on the same resource stay in the pending queue, not consuming concurrency slots while they wait.
- **No head-of-line blocking**: if the next job by ordering is blocked (conflict or concurrency limit), the scheduler skips it and admits the next admissible job.
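The dispatch pass and completion update described in this section can be sketched in a few dozen lines of Python, in the style of the appendix simulation. This is a sketch under stated assumptions, not the implementation: `TypeConfig`, `Job`, and `Dispatcher` are hypothetical names, the tier caps are shrunk to 2 and 1 to keep the example small, and the order is recomputed after every admission because accumulated cost changes.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class TypeConfig:
    default_cost: float
    max_concurrency: int
    conflict_group: str
    priority: int  # also the concurrency cap for its tier


@dataclass
class Job:
    job_type: str
    job_id: str
    fairness_key: str = ""
    arrival: int = 0


class Dispatcher:
    def __init__(self, configs, alpha=0.3):
        self.configs = configs
        self.alpha = alpha
        self.pending = []
        self.running = []
        self.accumulated = defaultdict(float)  # per fairness key
        self.estimates = {}                    # per (job_type, job_id)

    def estimate(self, job):
        default = self.configs[job.job_type].default_cost
        return self.estimates.get((job.job_type, job.job_id), default)

    def _admissible(self, job):
        cfg = self.configs[job.job_type]
        tier = sum(1 for r in self.running
                   if self.configs[r.job_type].priority == cfg.priority)
        same_type = sum(1 for r in self.running if r.job_type == job.job_type)
        conflict = bool(cfg.conflict_group) and any(
            r.job_id == job.job_id
            and self.configs[r.job_type].conflict_group == cfg.conflict_group
            for r in self.running
        )
        return tier < cfg.priority and same_type < cfg.max_concurrency and not conflict

    def dispatch(self):
        """Admit every currently admissible job; return them in admission order."""
        admitted = []
        progress = True
        while progress:
            progress = False
            ordered = sorted(
                self.pending,
                key=lambda j: (-self.configs[j.job_type].priority,
                               self.accumulated[j.fairness_key], j.arrival),
            )
            for job in ordered:
                if self._admissible(job):  # blocked jobs are skipped, not waited on
                    self.pending.remove(job)
                    self.running.append(job)
                    self.accumulated[job.fairness_key] += self.estimate(job)
                    admitted.append(job)
                    progress = True
                    break
        return admitted

    def complete(self, job, elapsed):
        """Record observed wall time, then re-evaluate the pending queue."""
        self.running.remove(job)
        key = (job.job_type, job.job_id)
        self.estimates[key] = self.alpha * elapsed + (1 - self.alpha) * self.estimate(job)
        return self.dispatch()


configs = {
    "sync-clone": TypeConfig(10, 2, "git", priority=2),
    "repack": TypeConfig(20, 1, "git", priority=1),
}
d = Dispatcher(configs)
d.pending = [
    Job("repack", "linux", arrival=0),
    Job("sync-clone", "linux", "clientA", arrival=1),
    Job("sync-clone", "git", "clientB", arrival=2),
]
first = d.dispatch()
print([(j.job_type, j.job_id) for j in first])
# [('sync-clone', 'linux'), ('sync-clone', 'git')]; repack(linux) waits on the conflict

after = d.complete(first[0], elapsed=60)
print([(j.job_type, j.job_id) for j in after])
# [('repack', 'linux')]
```

The repack arrived first but is admitted last: foreground outranks it, and the conflict on `linux` keeps it pending without occupying a slot.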
## Cost Estimation
The scheduler maintains an exponential moving average of observed wall time per `(job_type, job_id)`:
```
estimatedCost = α * observedWallTime + (1-α) * estimatedCost
```
`α` is the smoothing factor (0–1) controlling how quickly estimates adapt. `α = 0.3` is a reasonable default: each run closes 30% of the remaining gap between the estimate and the observed time, so the estimate moves most of the way from `DefaultCost` to the true value within a handful of runs, while remaining stable against outliers (e.g., a single slow clone due to network congestion shifts the estimate by only 30% of the difference). It should be a configurable constant.
Wall time directly measures the resource being rationed: how long a job holds a concurrency slot. On first encounter of a `(job_type, job_id)` pair, `DefaultCost` from the job type config is used. After one execution, the estimate starts to reflect real data.
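A short worked example makes the convergence behaviour concrete. The sketch below applies the update formula above with `α = 0.3`; the function names and the 10-second default, 60-second clone, and 300-second outlier are illustrative assumptions.

```python
def ema_update(estimate, observed, alpha=0.3):
    """One exponential-moving-average step."""
    return alpha * observed + (1 - alpha) * estimate


def simulate(default_cost, observations, alpha=0.3):
    """Return the estimate after each observation, starting from default_cost."""
    estimate = default_cost
    history = []
    for observed in observations:
        estimate = ema_update(estimate, observed, alpha)
        history.append(estimate)
    return history


# A clone registered with a default cost of 10 that really takes 60 seconds.
cold_start = simulate(10, [60] * 5)
print([f"{value:.1f}" for value in cold_start])

# One 300-second outlier among 60-second runs, starting from a settled estimate.
outlier = simulate(60, [60, 300, 60, 60])
print([f"{value:.1f}" for value in outlier])
```

From a cold start the estimates are approximately 25, 35.5, 42.85, 48.0, and 51.6, so about 83% of the initial gap is closed after five runs. The single outlier lifts a settled estimate of 60 to about 132, which then decays to about 110 and 95 over the next two normal runs.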
The estimates map needs TTL-based cleanup, same as the accumulated cost map. Estimates could optionally be persisted across restarts to avoid cold-start inaccuracy, using the existing persistence layer.
## Accumulated Cost Lifecycle
The accumulated cost map needs periodic cleanup since fairness keys (client IPs) are ephemeral — thousands of agentic workstations may spin up and down. Options:
- **TTL-based eviction**: remove entries not seen for N minutes.
- **Periodic reset**: zero all counters every N minutes.
- **Advance idle keys**: when a key is seen again after being idle, advance it to the current global minimum (prevents penalising returning clients, prevents exploiting fresh counters).
Start with TTL-based eviction and refine based on production behaviour.
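One possible shape for this bookkeeping is sketched below. It combines TTL-based eviction with a variant of the idle-key idea: a key that is new or was evicted re-enters at the current minimum over live keys instead of at zero. `AccumulatedCosts` and its method names are hypothetical, and the injected clock exists only to make the example deterministic.

```python
import time


class AccumulatedCosts:
    """Per-fairness-key cost counters with TTL eviction (hypothetical helper)."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self.cost = {}       # fairness key -> accumulated cost
        self.last_seen = {}  # fairness key -> time of last admission

    def evict_idle(self):
        """Drop keys not seen within the TTL; return the evicted keys."""
        now = self.clock()
        stale = [k for k, seen in self.last_seen.items() if now - seen > self.ttl]
        for key in stale:
            del self.cost[key]
            del self.last_seen[key]
        return stale

    def charge(self, key, estimated_cost):
        """Add cost for an admitted job and return the key's new total.

        A new or previously evicted key starts at the minimum over live keys,
        so a fresh counter cannot jump ahead of everyone else.
        """
        if key not in self.cost:
            self.cost[key] = min(self.cost.values(), default=0.0)
        self.cost[key] += estimated_cost
        self.last_seen[key] = self.clock()
        return self.cost[key]


now = [0.0]  # fake clock, in seconds
costs = AccumulatedCosts(ttl_seconds=600, clock=lambda: now[0])
costs.charge("10.0.0.1", 50)               # first key: 0 + 50 = 50
now[0] = 500
second = costs.charge("10.0.0.2", 10)      # starts at 50, then 60
now[0] = 700
evicted = costs.evict_idle()               # 10.0.0.1 has been idle for 700 s
returned = costs.charge("10.0.0.1", 5)     # re-enters at 60, then 65
print(second, evicted, returned)
```

Whether a returning key should start at the minimum, at zero, or at its old value is a policy choice to settle from production behaviour; the sketch only shows that the bookkeeping for either option is small.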
## Go Implementation Notes
### Building Blocks
- `golang.org/x/sync/semaphore` — *not* needed. The weighted semaphore approach was considered and rejected in favour of simple concurrency counting, which avoids starvation issues with high-cost jobs.
- `container/heap` — useful for the priority queue ordering.
- `sync.Cond` or channel — for waking the dispatch loop when a job completes.
### Synchronisation Concern
The previous implementation conflated synchronisation (mutex per resource) with scheduling. In this design, synchronisation is handled by conflict groups within the scheduler. There is no external mutex — the scheduler itself ensures jobs in the same conflict group don't run concurrently on the same resource by keeping them in the pending queue.
### Persistence
The existing persistence layer for scheduled jobs (recording last execution time to avoid thundering herd on restart) remains unchanged. It's orthogonal to the scheduling algorithm.
### Prior Art
The Kubernetes API Priority and Fairness system (`k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing`) solves a very similar problem for the kube-apiserver. It uses priority levels, flow distinguishers (fairness keys), shuffle sharding, and work estimation in "seats" (cost). The KEP is worth reading for context:
https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/1040-priority-and-fairness/README.md
The k8s implementation is too coupled to the apiserver to use as a library, but the design concepts directly informed this approach. Our design is simpler: no shuffle sharding (explicit fairness keys instead), no dynamic reconfiguration, and we add conflict key exclusion which k8s doesn't have.
---
## Appendix: Simulation Code
The following Python simulation validates the design. It demonstrates:
1. Background saturated, foreground arrives — starts immediately due to priority-level concurrency cap
2. Two client burst + background + conflict interactions
3. Expensive vs cheap clones with cost-based fairness
4. Conflict key preventing unsafe concurrent access
5. Many background jobs across 4 job types, foreground still starts immediately
```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Job:
    job_type: str
    job_id: str
    fairness_key: str
    cost: int
    duration: int
    arrival_time: int = 0

    def __repr__(self):
        fk = self.fairness_key or "bg"
        return f"{self.job_type}({self.job_id}, fk={fk}, cost={self.cost}, dur={self.duration})"


@dataclass
class RunningJob:
    job: Job
    finish_time: int


@dataclass
class JobTypeConfig:
    max_concurrency: int = 100
    conflict_group: str = ""  # jobs in same group conflict on same job_id
    priority: int = 0  # higher = dispatched first, value = max concurrency for tier


class Scheduler:
    def __init__(self, max_concurrency):
        self.max_concurrency = max_concurrency
        self.all_jobs = []
        self.pending = []
        self.running = []
        self.virtual_time = defaultdict(int)  # accumulated cost per fairness key
        self.type_configs = {}
        self.completed = []
        self.log = []

    def register_type(self, name, config):
        self.type_configs[name] = config

    def submit(self, job):
        self.all_jobs.append(job)

    def _get_priority(self, job):
        config = self.type_configs.get(job.job_type)
        return config.priority if config else 0

    def _type_running_count(self, job_type):
        return sum(1 for rj in self.running if rj.job.job_type == job_type)

    def _priority_running_count(self, priority):
        return sum(1 for rj in self.running if self._get_priority(rj.job) == priority)

    def _has_conflict(self, job):
        config = self.type_configs.get(job.job_type)
        if not config or not config.conflict_group:
            return False
        for rj in self.running:
            if rj.job.job_id == job.job_id:
                running_config = self.type_configs.get(rj.job.job_type)
                if running_config and running_config.conflict_group == config.conflict_group:
                    return True
        return False

    def _arrive_jobs(self, current_time):
        newly_arrived = [j for j in self.all_jobs if j.arrival_time == current_time]
        for j in newly_arrived:
            self.all_jobs.remove(j)
            self.pending.append(j)
            self.log.append(f" T={current_time:>3}: ARRIVE {j}")

    def _dispatch(self, current_time):
        admitted_any = True
        while admitted_any and len(self.running) < self.max_concurrency:
            admitted_any = False
            scorable = []
            for i, job in enumerate(self.pending):
                priority = self._get_priority(job)
                vt = self.virtual_time[job.fairness_key]
                scorable.append((-priority, vt, job.arrival_time, i, job))
            scorable.sort()
            for _, vt, _, idx, job in scorable:
                config = self.type_configs.get(job.job_type)
                priority = self._get_priority(job)
                # Check priority level concurrency (priority value IS the max)
                if (
                    self._priority_running_count(priority)
                    >= priority
                ):
                    continue
                # Check type concurrency
                if (
                    config
                    and self._type_running_count(job.job_type)
                    >= config.max_concurrency
                ):
                    continue
                # Check conflict
                if self._has_conflict(job):
                    continue
                self.pending.remove(job)
                self.running.append(
                    RunningJob(job=job, finish_time=current_time + job.duration)
                )
                self.virtual_time[job.fairness_key] += job.cost
                fk = job.fairness_key or "bg"
                pri_run = self._priority_running_count(priority)
                self.log.append(
                    f" T={current_time:>3}: ADMIT {job} "
                    f"(pri={priority}, VT[{fk}]={self.virtual_time[job.fairness_key]}, "
                    f"pri_running={pri_run}, total={len(self.running)})"
                )
                admitted_any = True
                break

    def _complete_jobs(self, current_time):
        still_running = []
        for rj in self.running:
            if rj.finish_time <= current_time:
                self.completed.append((current_time, rj.job))
                self.log.append(f" T={current_time:>3}: DONE {rj.job}")
            else:
                still_running.append(rj)
        self.running = still_running

    def run(self, until):
        for t in range(until + 1):
            self._arrive_jobs(t)
            self._complete_jobs(t)
            self._dispatch(t)
        return self.log, self.completed, dict(self.virtual_time)

    def print_summary(self):
        print(f"\nFinal virtual times:")
        for k, v in sorted(self.virtual_time.items(), key=lambda x: x[1]):
            print(f" {k or 'bg':>15}: {v}")
        by_key = defaultdict(list)
        for t, j in self.completed:
            by_key[j.fairness_key or "bg"].append((t, j.job_type, j.job_id))
        print(f"\nCompletions by fairness key:")
        for k in sorted(by_key.keys()):
            times = [t for t, _, _ in by_key[k]]
            print(f" {k:>15}: completed at {times}")


def make_scheduler():
    """Standard scheduler config for all scenarios"""
    s = Scheduler(max_concurrency=8)
    s.register_type(
        "sync-clone",
        JobTypeConfig(
            max_concurrency=8,
            conflict_group="git",
            priority=8,  # fg: can use all 8 slots
        ),
    )
    s.register_type(
        "repack",
        JobTypeConfig(
            max_concurrency=3,
            conflict_group="git",
            priority=4,  # bg: capped at 4
        ),
    )
    s.register_type(
        "pull",
        JobTypeConfig(
            max_concurrency=3,
            conflict_group="git",
            priority=4,  # bg: capped at 4
        ),
    )
    return s


def scenario_1():
    print("=" * 90)
    print("SCENARIO 1: Background saturated, foreground arrives late")
    print("Global=8, bg priority capped at 4, fg uncapped")
    print("=" * 90)
    s = make_scheduler()
    # T=0: lots of bg work fills bg capacity
    for i in range(1, 7):
        s.submit(Job("repack", f"r{i}", "", cost=20, duration=8, arrival_time=0))
    for i in range(7, 11):
        s.submit(Job("pull", f"r{i}", "", cost=10, duration=6, arrival_time=0))
    # T=3: foreground arrives; it should start immediately
    s.submit(Job("sync-clone", "r99", "dev1", cost=10, duration=2, arrival_time=3))
    s.run(until=25)
    for line in s.log:
        print(line)
    s.print_summary()
    for t, j in s.completed:
        if j.fairness_key == "dev1":
            print(
                f"\n>>> Foreground arrived T=3, completed T={t}. "
                f"Wait = {t - 3 - j.duration} time units"
            )


def scenario_2():
    print("\n" + "=" * 90)
    print("SCENARIO 2: Two clients burst + background + conflicts")
    print("=" * 90)
    s = make_scheduler()
    # Background maintenance running
    for i in range(1, 5):
        s.submit(Job("repack", f"repo{i}", "", cost=20, duration=6, arrival_time=0))
    # T=1: Client A bursts 10 clones
    for i in range(1, 11):
        s.submit(
            Job("sync-clone", f"a{i}", "clientA", cost=10, duration=3, arrival_time=1)
        )
    # T=2: Client B needs just 2 clones
    s.submit(
        Job("sync-clone", "b1", "clientB", cost=10, duration=3, arrival_time=2)
    )
    s.submit(
        Job("sync-clone", "b2", "clientB", cost=10, duration=3, arrival_time=2)
    )
    s.run(until=30)
    for line in s.log:
        print(line)
    s.print_summary()


def scenario_3():
    print("\n" + "=" * 90)
    print("SCENARIO 3: Expensive vs cheap clones with fairness")
    print("=" * 90)
    s = make_scheduler()
    # Client A: one massive clone
    s.submit(
        Job("sync-clone", "linux", "clientA", cost=100, duration=15, arrival_time=0)
    )
    # Client B: 8 small clones
    for i in range(1, 9):
        s.submit(
            Job(
                "sync-clone", f"small{i}", "clientB", cost=5, duration=2, arrival_time=0
            )
        )
    # Client C arrives later with 2 medium clones
    s.submit(
        Job("sync-clone", "med1", "clientC", cost=20, duration=4, arrival_time=5)
    )
    s.submit(
        Job("sync-clone", "med2", "clientC", cost=20, duration=4, arrival_time=5)
    )
    s.run(until=25)
    for line in s.log:
        print(line)
    s.print_summary()


def scenario_4():
    print("\n" + "=" * 90)
    print("SCENARIO 4: Conflict key prevents unsafe concurrent access")
    print("=" * 90)
    s = make_scheduler()
    # Clone and repack on the same repo: they must not overlap
    s.submit(Job("sync-clone", "repo1", "dev1", cost=10, duration=3, arrival_time=0))
    s.submit(Job("repack", "repo1", "", cost=20, duration=4, arrival_time=0))
    # A second clone/repack pair on a different repo: it should proceed in
    # parallel with the repo1 pair, while still serialising within repo2
    s.submit(Job("sync-clone", "repo2", "dev2", cost=10, duration=3, arrival_time=0))
    s.submit(Job("repack", "repo2", "", cost=20, duration=4, arrival_time=0))
    s.run(until=20)
    for line in s.log:
        print(line)
    s.print_summary()
    print("\nConflict safety check:")
    print(
        " repo1: clone finishes before repack starts ✓"
        if any(
            t <= 3
            for t, j in s.completed
            if j.job_id == "repo1" and j.job_type == "sync-clone"
        )
        else " repo1: CONFLICT VIOLATION"
    )
    print(" repo2: jobs on a different repo ran in parallel with repo1's ✓")


def scenario_5():
    print("\n" + "=" * 90)
    print("SCENARIO 5: Many background types don't starve foreground")
    print("Multiple bg types each wanting lots of concurrency")
    print("=" * 90)
    s = Scheduler(max_concurrency=8)
    # Register 4 different background job types
    for bg_type in ["repack", "pull", "gc", "verify"]:
        s.register_type(
            bg_type,
            JobTypeConfig(max_concurrency=4, conflict_group="git", priority=4),
        )
    s.register_type(
        "sync-clone",
        JobTypeConfig(max_concurrency=8, conflict_group="git", priority=8),
    )
    # T=0: 3 jobs of each bg type = 12 bg jobs total
    for bg_type in ["repack", "pull", "gc", "verify"]:
        for i in range(1, 4):
            s.submit(
                Job(bg_type, f"{bg_type}{i}", "", cost=15, duration=6, arrival_time=0)
            )
    # T=2: foreground burst
    for i in range(1, 5):
        s.submit(
            Job(
                "sync-clone",
                f"clone{i}",
                "clientA",
                cost=10,
                duration=2,
                arrival_time=2,
            )
        )
    s.run(until=30)
    for line in s.log:
        print(line)
    s.print_summary()
    for t, j in s.completed:
        if j.fairness_key == "clientA" and j.job_id == "clone1":
            print(f"\n>>> First foreground job arrived T=2, completed T={t}")
            print(
                ">>> Despite 12 background jobs across 4 types, fg started immediately"
            )
            break


if __name__ == "__main__":
    scenario_1()
    scenario_2()
    scenario_3()
    scenario_4()
    scenario_5()
```