# touch() and deadlocks
## Problem Statement
* touch() wants to update the last-noticed-time for many rows in Content and Artifact
* postgres-update acquires a row-lock for every row being updated
* postgres-update does not guarantee ordering for such an update
* syncing multiple repositories with overlapping content can therefore cause deadlocks
* The test scripts (and the previous 'fix') assume that .order_by() works with update() in Django. Alas, this is not the case for Postgres (see [ordered queryset](https://docs.djangoproject.com/en/4.0/ref/models/querysets/#ordered-queryset)).
See the following:
* [BZ 2021406](https://bugzilla.redhat.com/show_bug.cgi?id=2021406)
* [Issue 2157](https://github.com/pulp/pulpcore/issues/2157)
* [PR 2159](https://github.com/pulp/pulpcore/pull/2159)
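To make the failure mode concrete, here is a toy model of the problem statement above, not Pulp or Postgres code. It assumes each transaction takes one row lock per turn and holds all of its locks until it has taken every one; the function name `deadlocks` and the round-robin scheduling are illustrative choices only.

```python
def deadlocks(lock_orders):
    """Return True if the transactions end up waiting on each other forever.

    lock_orders[i] is the sequence of row ids that transaction i locks, one per
    turn. Locks are held until the transaction has locked every row ("commit").
    """
    owner = {}                            # row id -> transaction holding the lock
    position = [0] * len(lock_orders)     # next row each transaction wants
    finished = [False] * len(lock_orders)
    while not all(finished):
        progressed = False
        for txn, order in enumerate(lock_orders):
            if finished[txn]:
                continue
            if position[txn] == len(order):
                # Commit: release every row lock held by this transaction.
                for row in order:
                    owner.pop(row, None)
                finished[txn] = True
                progressed = True
            elif owner.get(order[position[txn]], txn) == txn:
                # Row is free (or already ours): take the lock and move on.
                owner[order[position[txn]]] = txn
                position[txn] += 1
                progressed = True
        if not progressed:
            return True                   # every unfinished transaction is blocked
    return False


print(deadlocks([[1, 2, 3], [3, 2, 1]]))  # opposite lock orders -> True
print(deadlocks([[1, 2, 3], [1, 2, 3]]))  # same lock order -> False
```

With a shared lock order the second transaction simply waits behind the first. With differing orders, each can end up holding a row the other needs, which is the situation an unordered UPDATE allows.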
## Current State
* [PR 2159](https://github.com/pulp/pulpcore/pull/2159) applies an EXCLUSIVE TABLE LOCK to the db-table involved in the touch() operation. This serializes touch(), thereby guaranteeing that nobody else can be locking rows out from under us.
* **UPDATE** : after the performance investigation, PR 2159 was modified to use [Script #4](https://hackmd.io/eBAa4F3lRzaV65sQ7zu_yw?both#Script-v4---raw-SQL-skip-locks).
### Pros
* Simple
* Guarantees correctness
* Protects against non-touch paths as well
### Cons
* Adds a bottleneck to the system
* Delays **all** updates/inserts to the affected table, even for rows not involved in the touch()
## Other Options
Several options are discussed in the associated BZ. They include
* Forcing select-for-update prior to the touch()
* see [2021406#c16](https://bugzilla.redhat.com/show_bug.cgi?id=2021406#c16) for comments/performance anecdote from hyu
* "Optimistic" approach (ie, catch deadlock exceptions and "just" retry)
* see [2021406#c18](https://bugzilla.redhat.com/show_bug.cgi?id=2021406#c18) for comments/performance anecdote from hyu
## Performance Investigation
We want to measure how well (or badly) the three approaches to a fix perform. The goal is to get a 'median' time for the update process on a 'realistic' dataset.
**NOTE**: the test scripts used here are absolutely the worst-possible-case scenarios. They are deliberately designed to force deadlocks to happen as much as possible. In real-world Pulp3 use, observations suggest that reducing the number of workers to 5 eliminated deadlock-failures from workflows that were seeing "one or two" on every run.
### CONCLUSIONS
(Yes, we put conclusions first - it's what you're looking for, right?)
Calling select-for-update on the rows we want to touch(), ordered by pulp_id, using raw SQL with SKIP LOCKED, gives us performance comparable to the unprotected update() call, no deadlocks, and preserves concurrency. This is [Script #4](https://hackmd.io/eBAa4F3lRzaV65sQ7zu_yw?both#Script-v4---raw-SQL-skip-locks) below.
Continue reading for gory technical details.
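For a quick side-by-side, the sketch below collects the median times reported in the sections that follow (10 runs of 10 threads, rounded to four decimals) and expresses each as a ratio to the unprotected update(). The dictionary labels and the helper name `relative_to_baseline` are made up for this summary; the "two shells" column uses the first shell's median.

```python
# Median seconds copied from the results sections of this document.
MEDIANS = {
    "unprotected update()":     {"one shell": 4.1966,  "two shells": 7.5112},
    "table lock":               {"one shell": 3.6762,  "two shells": 10.5328},
    "v3: raw SQL FOR UPDATE":   {"one shell": 11.4971, "two shells": 35.9104},
    "v4: raw SQL, SKIP LOCKED": {"one shell": 1.2657,  "two shells": 1.8811},
    "retry on deadlock":        {"one shell": 15.4158, "two shells": 67.7313},
}


def relative_to_baseline(medians, baseline_key):
    """Divide every median by the baseline's median for the same scenario."""
    baseline = medians[baseline_key]
    return {
        name: {scenario: round(value / baseline[scenario], 2)
               for scenario, value in times.items()}
        for name, times in medians.items()
    }


for name, ratios in relative_to_baseline(MEDIANS, "unprotected update()").items():
    print(f"{name:26s} {ratios}")
```

A ratio below 1.0 means the approach was faster than the unprotected update in that scenario. Keep in mind that the unprotected baseline also hit deadlocks, so its timings are not those of a fully successful run.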
### Setup
* Need to have a large number of entries in Content
* [file-perf](https://fixtures.pulpproject.org/file-perf/) has 20K entries
* Use this script to set up the repository:
```
pulp file remote create --name perf --url https://fixtures.pulpproject.org/file-perf/PULP_MANIFEST --policy immediate
pulp file repository create --name perf --remote perf
pulp file repository sync --name perf
```
* need to be running a lot of workers, to allow for lots of concurrency
```
#!/bin/bash
# Start additional pulpcore workers until 20 are running.
num_workers=$(sudo systemctl status pulpcore-worker* | grep "service - Pulp Worker" | wc -l)
echo "Current num-workers ${num_workers}"
if [ ${num_workers} -lt 20 ]
then
    for (( i=${num_workers}+1; i<=20; i++ ))
    do
        echo "Starting worker ${i}"
        sudo systemctl start pulpcore-worker@${i}
    done
fi
```
* need to allow for A LOT of postgres connections, to allow for lots of concurrency
```
$ sudo vi /var/lib/pgsql/data/postgresql.conf
max_connections = 500 # (change requires restart)
$
```
* need a script that can force the deadlock to happen
* see [2021406#c16](https://bugzilla.redhat.com/show_bug.cgi?id=2021406#c16)
* Script from that comment:
```
import _thread
from django.db import transaction
from pulpcore.plugin.models import Content, RepositoryVersion
from django.utils.timezone import now


def update_timestamp(index):
    # One unordered UPDATE across every Content row.
    Content.objects.update(timestamp_of_interest=now())
    print(">>>done")


# Eight concurrent updaters per shell.
for i in range(8):
    _thread.start_new_thread(update_timestamp, (i,))
```
* run `pulpcore-manager shell` in two terminal windows
* Enter the script above into each
* Hit 'enter' at the same(ish) time
* Watch deadlock warnings start popping up
* Alternate script from the GH issue:
```
import _thread
from pulpcore.plugin.models import Content


def update_timestamp(index):
    print(">>>in update_timestamp index {}".format(index))
    Content.objects.touch()
    print(">>>done {}".format(index))


for i in range(8):
    _thread.start_new_thread(update_timestamp, (i,))
```
### Base time to do touch, no concurrency
* current core/main, just timing touch(), sequential calls, on the entire Content table with 20K File repo sync'd:
```
import time
from pulpcore.plugin.models import Content

durations = []


def update_timestamp(index):
    print(">>>in update_timestamp index {}".format(index))
    start = time.time()
    Content.objects.touch()
    end = time.time()
    durations.append(end - start)
    print(">>>done {}".format(index))


# 100 sequential calls: no concurrency, so no lock contention.
for i in range(100):
    update_timestamp(i)

avg = sum(durations) / len(durations)
print(f"Avg time : {avg}")
```
* Result : ```Avg time : 0.5344140982627869```
* so ~500ms is the absolute minimum time to execute the update of ~20K rows in one update
* Script to Just Do It, ignoring deadlocks:
```
from threading import Thread
import statistics
import time
from django.db import transaction
from pulpcore.plugin.models import Content, RepositoryVersion
from django.utils.timezone import now
from django.db import connection

durations = []


def update_timestamp(index):
    print(">>>in update_timestamp index {}".format(index))
    start = time.time()
    # order_by() has no effect on the UPDATE under Postgres; no deadlock protection here.
    Content.objects.filter().order_by("pulp_id").update(timestamp_of_interest=now())
    end = time.time()
    durations.append(end - start)
    print(">>>done {}".format(index))


# 10 runs of 10 concurrent threads each.
for r in range(10):
    threads = []
    for i in range(10):
        threads.append(Thread(target=update_timestamp, args=(i,)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print("Avg time : {}".format(sum(durations) / len(durations)))
print("Median time : {}".format(statistics.median(durations)))
print("StdDev : {}".format(statistics.stdev(durations)))
```
* Running in one shell, 10 runs, 10 threads@:
* Results:
* Deadlocks occasionally
```
Avg time : 3.9687717117247034
Median time : 4.196575880050659
StdDev : 2.008056319970726
```
* Running in 2 shells, 10 runs, 10 threads@:
* Results:
* Deadlocks Galore
* first shell
```
Avg time : 6.688158090297993
Median time : 7.511222839355469
StdDev : 2.943007946559003
```
* second shell
```
Avg time : 6.688158090297993
Median time : 7.511222839355469
StdDev : 2.943007946559003
```
* Thoughts
* In this instance, "no deadlocks" implies that the simultaneous updates happened to lock/release rows in a non-deadlocking order, by chance
* With 10 threads, the avg time is roughly 7x slower than with 1 (~4.0s vs ~0.53s), even without forcing threads to wait
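All of the timing scripts in this document repeat the same scaffolding: start N threads, join them, repeat for several runs, then print average, median, and standard deviation. As a sketch only, that scaffolding could be factored into two helpers. The names `time_batches` and `summarize` are hypothetical and are not part of the scripts used to produce the numbers here.

```python
import statistics
import time
from threading import Thread


def time_batches(work, runs=10, threads_per_run=10):
    """Call work(index) from threads_per_run threads, runs times; return all durations."""
    durations = []

    def timed(index):
        start = time.perf_counter()
        work(index)
        # list.append is atomic in CPython, so threads can share the list.
        durations.append(time.perf_counter() - start)

    for _ in range(runs):
        batch = [Thread(target=timed, args=(i,)) for i in range(threads_per_run)]
        for t in batch:
            t.start()
        for t in batch:
            t.join()
    return durations


def summarize(durations):
    """Same three statistics the scripts in this document print."""
    return {
        "avg": statistics.mean(durations),
        "median": statistics.median(durations),
        "stdev": statistics.stdev(durations),
    }


if __name__ == "__main__":
    # Stand-in workload; in `pulpcore-manager shell` the callable would wrap the update under test.
    print(summarize(time_batches(lambda i: time.sleep(0.01), runs=2, threads_per_run=3)))
```

In the shell, something like `summarize(time_batches(lambda i: Content.objects.touch()))` would reproduce the 10x10 measurement with less copy and paste.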
### Table-lock
* This is implemented in the PR
* Script I used to test the deadlock-fix:
```
import _thread
from django.db import transaction
from pulpcore.plugin.models import Content, RepositoryVersion
from django.utils.timezone import now
from django.db import connection


def update_timestamp(index):
    print(">>>in update_timestamp index {}".format(index))
    cursor = connection.cursor()
    with transaction.atomic():
        # The table lock is held until the transaction commits.
        cursor.execute("LOCK TABLE core_content IN EXCLUSIVE MODE")
        Content.objects.filter().order_by("pulp_id").update(timestamp_of_interest=now())
    print(">>>done {}".format(index))


for i in range(8):
    _thread.start_new_thread(update_timestamp, (i,))
```
* Script for testing timing using a table-lock
```
from threading import Thread
import statistics
import time
from django.db import transaction, connection
from pulpcore.plugin.models import Content, RepositoryVersion
from django.utils.timezone import now

durations = []


def update_timestamp(index):
    print(">>>in update_timestamp index {}".format(index))
    start = time.time()
    cursor = connection.cursor()
    with transaction.atomic():
        # The table lock is held until the transaction commits.
        cursor.execute("LOCK TABLE core_content IN EXCLUSIVE MODE")
        Content.objects.filter().order_by("pulp_id").update(timestamp_of_interest=now())
    cursor.close()
    end = time.time()
    durations.append(end - start)
    print(">>>done {}".format(index))


for r in range(10):
    threads = []
    for i in range(10):
        threads.append(Thread(target=update_timestamp, args=(i,)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print("Avg time : {}".format(sum(durations) / len(durations)))
print("Median time : {}".format(statistics.median(durations)))
print("StdDev : {}".format(statistics.stdev(durations)))
```
* Running in one shell, 10 runs, 10 threads@:
* Results:
```
Avg time : 3.9036862729470942
Median time : 3.6761786937713623
StdDev : 2.2266992520878244
```
* Running in 2 shells, 10 runs, 10 threads@:
* Results:
* first shell
```
Avg time : 10.609893147945405
Median time : 10.532786250114441
StdDev : 4.209340841786183
```
* second shell
```
Avg time : 11.153922910690307
Median time : 10.65884256362915
StdDev : 3.4015462449104144
```
* Thoughts
* With one shell, timings are similar to "do nothing"
* With two shells conflicting, timings are ~50% slower than "do nothing" - but with no deadlock problems
* **NOTE** : if you are seeing occasional "psycopg2.OperationalError: FATAL: remaining connection slots are reserved for non-replication superuser connections" - update number of allowed-connections to postgres as noted under "Setup". Still need to understand why these scripts aren't returning/closing connections in a timely fashion.
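One possible contributor to the connection exhaustion, offered as an untested hypothesis for these runs: Django keeps a separate database connection per thread, and threads started by hand in a shell do not pass through the request cycle that normally closes connections. If that is what is happening, closing the connection explicitly at the end of each thread target should help. The sketch below shows the pattern with a stand-in `close` callable so it runs without Django; the decorator name `closing_connection` is hypothetical.

```python
from functools import wraps


def closing_connection(close):
    """Decorator factory: run the thread target, then always call close()."""
    def decorate(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                close()   # runs in the worker thread, even if fn raised
        return wrapper
    return decorate


# Stand-in for a real connection: just count how often close() is called.
closed = []


@closing_connection(lambda: closed.append(True))
def update_timestamp(index):
    return index * 2


print(update_timestamp(21), len(closed))
```

In the `pulpcore-manager shell` the stand-in would be replaced by `lambda: connection.close()` with `from django.db import connection`. The lambda matters: it defers the lookup until the worker thread runs, so the worker's own connection is the one that gets closed.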
### Select-for-update
* See [2021406#c10](https://bugzilla.redhat.com/show_bug.cgi?id=2021406#c10) for Hao's experience attempting this
* See [2021406#c11](https://bugzilla.redhat.com/show_bug.cgi?id=2021406#c11) where he may explain why it didn't work
#### Script v1:
```
import _thread
from django.db import transaction
from pulpcore.plugin.models import Content, RepositoryVersion
from django.utils.timezone import now


def update_timestamp(index):
    content_q = Content.objects.filter().values_list("pk", flat=True)[0:30000]
    Content.objects.select_for_update().filter(
        pk__in=content_q
    ).update(timestamp_of_interest=now())
    print(">>>done")
```
#### Script v2 - skip_locked
```
from threading import Thread
from pulpcore.plugin.models import Content, RepositoryVersion
from django.utils.timezone import now

durations = []


def update_timestamp(index):
    content_q = Content.objects.filter().order_by("pk").values_list("pk", flat=True)[0:30000]
    Content.objects.filter(
        pk__in=content_q
    ).select_for_update(skip_locked=True).update(timestamp_of_interest=now())
    print(">>>done")


for r in range(10):
    threads = []
    for i in range(10):
        threads.append(Thread(target=update_timestamp, args=(i,)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```
* Results
* still deadlocks.
#### Script v3 - raw SQL
```
from threading import Thread
import statistics
import time
from pulpcore.plugin.models import Content, RepositoryVersion
from django.db import transaction, connection
from django.utils.timezone import now

durations = []


def update_timestamp(index):
    print(">>>in update_timestamp index {}".format(index))
    start = time.time()
    content_q = Content.objects.filter().values_list("pk", flat=True)[0:30000]
    pulp_ids = ["'" + str(pk) + "'" for pk in content_q]
    joined_string = ",".join(pulp_ids)
    cursor = connection.cursor()
    with transaction.atomic():
        # Lock the rows in pulp_id order first, then update them.
        #cursor.execute('SELECT * FROM core_content ORDER BY "pulp_id" FOR UPDATE')
        cursor.execute('SELECT * FROM core_content WHERE pulp_id in (' + joined_string + ') ORDER BY "pulp_id" FOR UPDATE')
        Content.objects.filter().update(timestamp_of_interest=now())
    cursor.close()
    end = time.time()
    durations.append(end - start)
    print(">>>done")


for r in range(10):
    threads = []
    for i in range(10):
        threads.append(Thread(target=update_timestamp, args=(i,)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print("Avg time : {}".format(sum(durations) / len(durations)))
print("Median time : {}".format(statistics.median(durations)))
print("StdDev : {}".format(statistics.stdev(durations)))
```
* Results
* Does NOT deadlock
* Ten runs, one thread each time
```
Avg time : 0.7034803628921509
Median time : 0.6625971794128418
StdDev : 0.14323956360501708
```
* Running in one shell, 10 runs, 10 threads@:
```
Avg time : 11.52471222639084
Median time : 11.497084856033325
StdDev : 6.669561361683516
```
* Running in 2 shells, 10 runs, 10 threads@:
* first shell
```
Avg time : 34.451250185966494
Median time : 35.91042387485504
StdDev : 11.700707926209752
```
* second shell
```
Avg time : 34.48055074453354
Median time : 35.82683265209198
StdDev : 12.753260859963618
```
#### Script v4 - raw SQL, skip-locks <== We Have A Winner!
The important role of touch() is to mark a row with a "last time we looked" value, which orphan-cleanup relies on to ensure it doesn't "clean up" Artifacts or Content that we are processing but have not quite gotten around to associating with a repository. In this context, it doesn't matter **who** touch()es a row in a given moment, just that **somebody** does. If someone already has a row locked, we assume that row's timestamp_of_interest is going to be roughly "now()" whether 'we' do it or 'the other thread' does it, and can safely skip updating that row. Let's see what happens if we skip locking/updating rows that someone else already holds...
**NOTE** : Adding 'SKIP LOCKED' to a separate 'SELECT FOR UPDATE', followed by an update(), fails to prevent deadlock. This is because the SELECT ... FOR UPDATE skips the locked rows, but the separate update() tries to acquire them anyway. Therefore, for this example we combine the SELECT-FOR-UPDATE and the UPDATE into one SQL statement.
```
from threading import Thread
import statistics
import time
from pulpcore.plugin.models import Content, RepositoryVersion
from django.db import transaction, connection
from django.utils.timezone import now

durations = []


def update_timestamp(index):
    print(">>>in update_timestamp index {}".format(index))
    start = time.time()
    content_q = Content.objects.filter().values_list("pk", flat=True)[0:30000]
    pulp_ids = ["'" + str(pk) + "'" for pk in content_q]
    joined_string = ",".join(pulp_ids)
    cursor = connection.cursor()
    with transaction.atomic():
        # Lock (in pulp_id order, skipping rows already locked) and update in ONE statement.
        stmt = (
            'UPDATE core_content SET timestamp_of_interest = NOW() '
            ' WHERE pulp_id IN ('
            ' SELECT pulp_id '
            ' FROM core_content '
            ' WHERE pulp_id in (' + joined_string + ') '
            ' ORDER BY pulp_id '
            ' FOR UPDATE SKIP LOCKED)'
        )
        cursor.execute(stmt)
    cursor.close()
    end = time.time()
    durations.append(end - start)
    print(">>>done")


for r in range(10):
    threads = []
    for i in range(10):
        threads.append(Thread(target=update_timestamp, args=(i,)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print("Avg time : {}".format(sum(durations) / len(durations)))
print("Median time : {}".format(statistics.median(durations)))
print("StdDev : {}".format(statistics.stdev(durations)))
```
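The script above builds the `IN (...)` list by quoting and concatenating ids by hand. A variant that leaves quoting to the database driver might look like the sketch below. It is untested against Pulp; the helper name `build_touch_statement` and its `table` parameter are hypothetical, and it assumes the `%s` placeholder style that Django cursors accept.

```python
def build_touch_statement(pulp_ids, table="core_content"):
    """Return (sql, params) for a single-statement, ordered, skip-locked touch.

    The ids are passed as parameters (%s placeholders) instead of being
    spliced into the SQL text.
    """
    params = [str(pk) for pk in pulp_ids]
    if not params:
        return None, []   # "IN ()" is not valid SQL; nothing to touch
    placeholders = ", ".join(["%s"] * len(params))
    sql = (
        f"UPDATE {table} SET timestamp_of_interest = NOW() "
        f"WHERE pulp_id IN ("
        f"SELECT pulp_id FROM {table} "
        f"WHERE pulp_id IN ({placeholders}) "
        f"ORDER BY pulp_id "
        f"FOR UPDATE SKIP LOCKED)"
    )
    return sql, params


sql, params = build_touch_statement(["id-1", "id-2"])
print(sql)
print(params)
```

Inside the `transaction.atomic()` block this would be run as `cursor.execute(sql, params)`. The statement shape is unchanged from Script v4: one UPDATE whose subquery locks rows in pulp_id order and skips any that are already locked.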
* Results
* Ten runs, one thread each time
```
Avg time : 0.6527105093002319
Median time : 0.6612157821655273
StdDev : 0.11319572562790735
```
* Running in one shell, 10 runs, 10 threads@
```
Avg time : 1.354753210544586
Median time : 1.2656549215316772
StdDev : 0.43396279702310464
```
* Running in 2 shells, 10 runs, 10 threads@:
* first shell
```
Avg time : 1.9388477754592897
Median time : 1.8810685873031616
StdDev : 0.5169462138555866
```
* second shell
```
Avg time : 2.03319078207016
Median time : 1.81693696975708
StdDev : 0.6529352612394995
```
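The argument at the start of this section (if someone else holds the row, they are about to touch it, so skipping is safe) can be checked with a toy model. This is plain Python threading, not Postgres: each "row" is a `threading.Lock`, and a non-blocking acquire stands in for SKIP LOCKED. All names here are illustrative.

```python
import threading


def touch_skip_locked(row_locks, touched_by, worker):
    """Touch every row whose lock is free right now; skip rows held by someone else."""
    held = []
    for row, lock in enumerate(row_locks):      # fixed order, like ORDER BY pulp_id
        if lock.acquire(blocking=False):        # never wait: the SKIP LOCKED analogue
            held.append(lock)
            touched_by[row].append(worker)      # only the lock holder writes here
    for lock in held:                           # "commit": release everything at the end
        lock.release()


rows = 1000
row_locks = [threading.Lock() for _ in range(rows)]
touched_by = [[] for _ in range(rows)]
workers = [
    threading.Thread(target=touch_skip_locked, args=(row_locks, touched_by, w))
    for w in range(8)
]
for t in workers:
    t.start()
for t in workers:
    t.join()

# Every row was touched by at least one worker, and no worker ever waited.
print(all(touched_by))
```

Because no worker ever blocks, a deadlock is impossible by construction, and because the first attempt on any row always finds it free, every row gets touched by at least one worker. How the work is split between workers varies from run to run.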
#### Other discussion
* See [2021406#c16](https://bugzilla.redhat.com/show_bug.cgi?id=2021406#c16) for more attempts.
* "raw sql" select-for-update (this is Script v3)
* what about skip_locked=True? (this is Script v2)
* see [2021406#c15](https://bugzilla.redhat.com/show_bug.cgi?id=2021406#c15)
### Optimistic approach (ie, fail-and-retry)
* See [2021406#c18](https://bugzilla.redhat.com/show_bug.cgi?id=2021406#c18) for Hao's experience there.
* Script:
```
import _thread
import time
import random
from django.db import transaction
from pulpcore.plugin.models import Content
from django.utils.timezone import now
from django.db.utils import OperationalError

current = now()


def update_timestamp(index):
    tried = 0
    tries = 10
    while True:
        tried += 1
        try:
            Content.objects.filter().order_by("pk").update(timestamp_of_interest=current)
            print("Thread %d done" % index)
            break
        except OperationalError as error:
            # Deadlock: wait a random 1-3 seconds and retry, up to `tries` attempts.
            delay = random.uniform(1, 3)
            if tried >= tries:
                print("Thread %d: deadlock detected give up. Tried %d" % (index, tried))
                break
            print("Thread %d: deadlock detected. trying %d in %f" % (index, tried, delay))
            time.sleep(delay)
```
* Script for testing retry-on-deadlock timings
```
from threading import Thread
import random
import statistics
import time
from django.db import transaction
from pulpcore.plugin.models import Content, RepositoryVersion
from django.utils.timezone import now
from django.db import connection
from django.db.utils import OperationalError

durations = []
current = now()


def update_timestamp(index):
    print(">>>in update_timestamp index {}".format(index))
    start = time.time()
    tried = 0
    tries = 10
    while True:
        tried += 1
        try:
            Content.objects.filter().order_by("pk").update(timestamp_of_interest=current)
            print("Thread %d done" % index)
            break
        except OperationalError as error:
            # Deadlock: wait a random 1-3 seconds and retry, up to `tries` attempts.
            delay = random.uniform(1, 3)
            if tried >= tries:
                print("Thread %d: deadlock detected give up. Tried %d" % (index, tried))
                break
            print("Thread %d: deadlock detected. trying %d in %f" % (index, tried, delay))
            time.sleep(delay)
    # The measured duration includes all retries and sleeps.
    end = time.time()
    durations.append(end - start)
    print(">>>done {}".format(index))


for r in range(10):
    threads = []
    for i in range(10):
        threads.append(Thread(target=update_timestamp, args=(i,)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print("Avg time : {}".format(sum(durations) / len(durations)))
print("Median time : {}".format(statistics.median(durations)))
print("StdDev : {}".format(statistics.stdev(durations)))
```
* Running in one shell, 10 runs, 10 threads@:
* Results:
```
Avg time : 16.73605344772339
Median time : 15.415827512741089
StdDev : 13.358905676352004
```
* Deadlocks hit?
* `sudo tail -f /var/lib/pgsql/data/log/postgresql-Wed.log | grep -i deadlock `
* **YES** - 197(!!) times, due to retries
* saw some runs give up after 10 retries
* Running in 2 shells, 10 runs, 10 threads@:
* Results:
* first shell
```
Avg time : 64.14785997629166
Median time : 67.73133504390717
StdDev : 18.53609656000721
```
* second shell
```
Avg time : 66.60404789447784
Median time : 68.0915459394455
StdDev : 15.597686384535603
```
* Conclusions
* can still fail, even at 10 retries, in pathologically-bad cases
* logs are Awash in scary deadlock-failure messages
* median time is terrible (again, in the pathologically-bad case)
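For completeness, the retry loop used in the scripts above can be written as a reusable helper. The sketch below differs from those scripts in two deliberate ways: it uses exponential backoff with jitter instead of a fixed 1 to 3 second random delay, and it re-raises once the attempts are exhausted instead of printing and giving up silently. `DeadlockDetected` is a stand-in exception so the example runs without Django; in the shell, `retry_on` would be `(OperationalError,)`. None of this was part of the measured runs.

```python
import random
import time


class DeadlockDetected(Exception):
    """Stand-in for django.db.utils.OperationalError in this sketch."""


def retry_on_deadlock(operation, tries=10, base_delay=0.1, max_delay=3.0,
                      retry_on=(DeadlockDetected,), sleep=time.sleep):
    """Run operation(); on a retryable error wait a jittered, growing delay and retry.

    Returns (result, attempts). Re-raises the last error once tries are used up.
    """
    for attempt in range(1, tries + 1):
        try:
            return operation(), attempt
        except retry_on:
            if attempt == tries:
                raise
            cap = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, cap))   # random delay up to the current cap


# Demo: fail twice with a simulated deadlock, then succeed.
failures = iter([True, True, False])


def flaky_update():
    if next(failures):
        raise DeadlockDetected("deadlock detected")
    return "updated"


print(retry_on_deadlock(flaky_update, sleep=lambda seconds: None))  # ('updated', 3)
```

Even with better backoff, this approach keeps the drawbacks listed above: the database still detects and logs every deadlock, and the worst case still fails after the final attempt.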