See the following:
PR 2159 applies an EXCLUSIVE TABLE LOCK to the db-table involved in the touch() operation. This serializes touch(), thereby guaranteeing that nobody else can be locking rows out from under us.
UPDATE : after performance investigation, PR2159 modified to use Script #4.
Several options are discussed in the associated BZ. They include
We want to measure how performant (or not) the three approaches to a fix are. The goal here is to acquire some 'median' time, to perform the update process, on a 'realistic' dataset.
NOTE: the test scripts used here are absolutely the worst-possible-case scenarios. They are deliberately designed to force deadlocks to happen as much as possible. In real-world Pulp3 use, observations suggest that reducing the number of workers to 5 eliminated deadlock-failures from workflows that were seeing "one or two" on every run.
(Yes, we put conclusions first - it's what you're looking for, right?)
Calling select-for-update on the rows we want to touch(), ordered by pulp_id, using raw SQL with SKIP LOCKED, gives us performance comparable to the unprotected update() call, no deadlocks, and preserves concurrency. This is Script #4 below.
Continue reading for gory technical details.
pulp file remote create --name perf --url https://fixtures.pulpproject.org/file-perf/PULP_MANIFEST --policy immediate
pulp file repository create --name perf --remote perf
pulp file repository sync --name perf
#!/bin/bash
# Make sure at least ${target_workers} pulpcore-worker units are running,
# starting pulpcore-worker@N for every missing N.
target_workers=20

# The glob is quoted so that systemctl (not the shell) matches unit names;
# unquoted, it could expand against files in the current directory.
num_workers=$(sudo systemctl status 'pulpcore-worker*' | grep -c "service - Pulp Worker")
echo "Current num-workers ${num_workers}"
if [ "${num_workers}" -lt "${target_workers}" ]
then
    for (( i=num_workers+1; i<=target_workers; i++ ))
    do
        echo "Starting worker ${i}"
        sudo systemctl start "pulpcore-worker@${i}"
    done
fi
$ sudo vi /var/lib/pgsql/data/postgresql.conf
max_connections = 500 # (change requires restart)
$
import _thread
from django.db import transaction
from pulpcore.plugin.models import Content, RepositoryVersion
from django.utils.timezone import now
def update_timestamp(index):
    """Set timestamp_of_interest on every Content row with one bulk update().

    ``index`` is accepted so the function can be used as a thread target; the
    query itself does not use it.
    """
    stamp = now()
    Content.objects.update(timestamp_of_interest=stamp)
    print(">>>done")


# Fire off eight updaters at once; nothing waits for them to finish.
for worker in range(8):
    _thread.start_new_thread(update_timestamp, (worker,))
pulpcore-manager shell
in two terminal windows
import _thread
from pulpcore.plugin.models import Content
def update_timestamp(index):
    """Run Content.objects.touch() once, printing a marker before and after.

    ``index`` identifies the calling thread in the printed markers.
    """
    entering = ">>>in update_timedstamp index {}".format(index)
    print(entering)
    Content.objects.touch()
    leaving = ">>>done {}".format(index)
    print(leaving)


# Start eight concurrent touch() callers; the launcher does not join them.
for worker in range(8):
    _thread.start_new_thread(update_timestamp, (worker,))
import time
from pulpcore.plugin.models import Content
# Wall-clock duration (seconds) of each touch() call, in call order.
durations = []

# Number of sequential touch() calls to time. The average below is derived
# from the samples actually collected, so changing this value cannot skew it.
RUNS = 100


def update_timestamp(index):
    """Time a single Content.objects.touch() call and record its duration.

    ``index`` is only used to label the progress output. The elapsed
    wall-clock time is appended to the module-level ``durations`` list.
    """
    print(">>>in update_timedstamp index {}".format(index))
    start = time.time()
    Content.objects.touch()
    end = time.time()
    durations.append(end - start)
    print(">>>done {}".format(index))


# Single-threaded baseline: no concurrency, so no lock contention.
for i in range(RUNS):
    update_timestamp(i)

avg = sum(durations) / len(durations)
print(f"Avg time : {avg}")
Avg time : 0.5344140982627869
from threading import Thread
import statistics
import time
from django.db import transaction
from pulpcore.plugin.models import Content, RepositoryVersion
from django.utils.timezone import now
from django.db import connection
# Per-call wall-clock durations in seconds, appended by every worker thread.
durations = []


def update_timestamp(index):
    """Time one bulk update() of all Content rows, ordered by pulp_id.

    ``index`` labels the progress output; the elapsed time is appended to
    the module-level ``durations`` list.
    """
    print(">>>in update_timedstamp index {}".format(index))
    started = time.time()
    ordered_rows = Content.objects.filter().order_by("pulp_id")
    ordered_rows.update(timestamp_of_interest=now())
    finished = time.time()
    durations.append(finished - started)
    print(">>>done {}".format(index))


# Ten rounds; each round starts ten threads together and waits for all of them.
for r in range(10):
    threads = [Thread(target=update_timestamp, args=(i,)) for i in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

mean_time = sum(durations) / len(durations)
print("Avg time : {}".format(mean_time))
print("Median time : {}".format(statistics.median(durations)))
print("StdDev : {}".format(statistics.stdev(durations)))
Avg time : 3.9687717117247034
Median time : 4.196575880050659
StdDev : 2.008056319970726
Avg time : 6.688158090297993
Median time : 7.511222839355469
StdDev : 2.943007946559003
Avg time : 6.688158090297993
Median time : 7.511222839355469
StdDev : 2.943007946559003
import _thread
from django.db import transaction
from pulpcore.plugin.models import Content, RepositoryVersion
from django.utils.timezone import now
from django.db import connection
def update_timestamp(index):
    """Update every Content row while holding an exclusive lock on the table.

    The LOCK TABLE statement and the bulk update() run inside one
    transaction.atomic() block. ``index`` only labels the progress output.
    """
    print(">>>in update_timedstamp index {}".format(index))
    # The cursor is managed by a context manager so it is closed even when
    # the update raises; atomic() is entered first and therefore exits last.
    with transaction.atomic(), connection.cursor() as cursor:
        cursor.execute("LOCK TABLE core_content IN EXCLUSIVE MODE")
        Content.objects.filter().order_by("pulp_id").update(timestamp_of_interest=now())
    print(">>>done {}".format(index))


# Start eight concurrent updaters; the launcher does not join them.
for i in range(8):
    _thread.start_new_thread(update_timestamp, (i,))
from threading import Thread
import statistics
import time
from django.db import transaction, connection
from pulpcore.plugin.models import Content, RepositoryVersion
from django.utils.timezone import now
# Per-call wall-clock durations in seconds, appended by every worker thread.
durations = []


def update_timestamp(index):
    """Time one table-locked bulk update of all Content rows.

    Takes an EXCLUSIVE lock on core_content and runs the ordered update()
    inside one transaction.atomic() block. The elapsed wall-clock time is
    appended to the module-level ``durations`` list. ``index`` only labels
    the progress output.
    """
    print(">>>in update_timedstamp index {}".format(index))
    start = time.time()
    # Context-managed cursor: closed even if the update raises, instead of
    # relying on an explicit close() that an exception would skip.
    with transaction.atomic(), connection.cursor() as cursor:
        cursor.execute("LOCK TABLE core_content IN EXCLUSIVE MODE")
        Content.objects.filter().order_by("pulp_id").update(timestamp_of_interest=now())
    end = time.time()
    durations.append(end - start)
    print(">>>done {}".format(index))


# Ten rounds; each round starts ten threads together and waits for all of them.
for r in range(10):
    threads = []
    for i in range(10):
        threads.append(Thread(target=update_timestamp, args=(i,)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print("Avg time : {}".format(sum(durations) / len(durations)))
print("Median time : {}".format(statistics.median(durations)))
print("StdDev : {}".format(statistics.stdev(durations)))
Avg time : 3.9036862729470942
Median time : 3.6761786937713623
StdDev : 2.2266992520878244
Avg time : 10.609893147945405
Median time : 10.532786250114441
StdDev : 4.209340841786183
Avg time : 11.153922910690307
Median time : 10.65884256362915
StdDev : 3.4015462449104144
import _thread
from django.db import transaction
from pulpcore.plugin.models import Content, RepositoryVersion
from django.utils.timezone import now
def update_timestamp(index):
    """Update timestamp_of_interest via a select_for_update() queryset.

    The target rows are those whose pk appears in a slice of at most 30000
    pks taken from Content (no explicit ordering). ``index`` is unused.
    """
    pk_slice = Content.objects.filter().values_list("pk", flat=True)[0:30000]
    targets = Content.objects.select_for_update().filter(pk__in=pk_slice)
    targets.update(timestamp_of_interest=now())
    print(">>>done")
```
from threading import Thread
from pulpcore.plugin.models import Content, RepositoryVersion
from django.utils.timezone import now
# Declared for parity with the timed scripts; this variant records nothing.
durations = []


def update_timestamp(index):
    """Update timestamp_of_interest via select_for_update(skip_locked=True).

    The target rows are those whose pk appears in the first 30000 pks of
    Content ordered by pk. ``index`` is unused.
    """
    ordered_pks = Content.objects.filter().order_by("pk").values_list("pk", flat=True)[0:30000]
    targets = Content.objects.filter(pk__in=ordered_pks).select_for_update(skip_locked=True)
    targets.update(timestamp_of_interest=now())
    print(">>>done")


# Ten rounds; each round starts ten threads together and waits for all of them.
for r in range(10):
    threads = [Thread(target=update_timestamp, args=(i,)) for i in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```
from threading import Thread
import statistics
import time
from pulpcore.plugin.models import Content, RepositoryVersion
from django.db import transaction, connection
from django.utils.timezone import now
# Per-call wall-clock durations in seconds, appended by every worker thread.
durations = []


def update_timestamp(index):
    """Time a raw-SQL SELECT ... FOR UPDATE followed by a bulk update().

    Row locks are requested in pulp_id order for up to 30000 Content rows,
    then every Content row is updated, all inside one transaction.atomic()
    block. The elapsed wall-clock time is appended to the module-level
    ``durations`` list. ``index`` only labels the progress output.
    """
    print(">>>in update_timedstamp index {}".format(index))
    start = time.time()
    content_q = Content.objects.filter().values_list("pk", flat=True)[0:30000]
    # The ids come straight from the database, not from user input, so they
    # are inlined as quoted literals. NOTE(review): an empty table would
    # produce "IN ()", which is invalid SQL.
    pulp_ids = ["'" + str(pk) + "'" for pk in content_q]
    joined_string = ",".join(pulp_ids)
    # Context-managed cursor: closed even if a statement raises (for example
    # on a detected deadlock), instead of relying on an explicit close().
    with transaction.atomic(), connection.cursor() as cursor:
        # Alternative tried: lock the whole table's rows instead of a subset:
        # cursor.execute('SELECT * FROM core_content ORDER BY "pulp_id" FOR UPDATE')
        cursor.execute(
            'SELECT * FROM core_content WHERE pulp_id in ('
            + joined_string
            + ') ORDER BY "pulp_id" FOR UPDATE'
        )
        Content.objects.filter().update(timestamp_of_interest=now())
    end = time.time()
    durations.append(end - start)
    print(">>>done")


# Ten rounds; each round starts ten threads together and waits for all of them.
for r in range(10):
    threads = []
    for i in range(10):
        threads.append(Thread(target=update_timestamp, args=(i,)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print("Avg time : {}".format(sum(durations) / len(durations)))
print("Median time : {}".format(statistics.median(durations)))
print("StdDev : {}".format(statistics.stdev(durations)))
Avg time : 0.7034803628921509
Median time : 0.6625971794128418
StdDev : 0.14323956360501708
Avg time : 11.52471222639084
Median time : 11.497084856033325
StdDev : 6.669561361683516
Avg time : 34.451250185966494
Median time : 35.91042387485504
StdDev : 11.700707926209752
Avg time : 34.48055074453354
Median time : 35.82683265209198
StdDev : 12.753260859963618
The important role of touch() is to mark a row with a "last time we looked" value, which orphan-cleanup relies on to ensure it doesn't "clean up" Artifacts or Content that we are processing but have not quite gotten around to associating with a repository. In this context, it doesn't matter who touch()es a row in a given moment - just that somebody does. If someone already has a row locked, we assume that row's timestamp_of_interest is going to be roughly "now()" whether 'we' do it or 'the other thread' does it, and can safely ignore/skip updating that row. Let's see what happens if we skip locking/updating rows that someone else already holds…
NOTE : Adding 'SKIP LOCKED' to a separate 'SELECT FOR UPDATE', followed by an update(), fails to prevent deadlock. This is due to the SELECT-FOR-UPDATE skipping rows, but the separate update() trying to acquire them anyway. Therefore, for this example we combine the SELECT-FOR-UPDATE and the UPDATE into one SQL statement.
from threading import Thread
import statistics
import time
from pulpcore.plugin.models import Content, RepositoryVersion
from django.db import transaction, connection
from django.utils.timezone import now
# Per-call wall-clock durations in seconds, appended by every worker thread.
durations = []


def update_timestamp(index):
    """Time a single-statement UPDATE ... FOR UPDATE SKIP LOCKED (Script #4).

    One SQL statement both selects up to 30000 Content rows in pulp_id order
    with FOR UPDATE SKIP LOCKED and sets their timestamp_of_interest to
    NOW(). The elapsed wall-clock time is appended to the module-level
    ``durations`` list. ``index`` only labels the progress output.
    """
    print(">>>in update_timedstamp index {}".format(index))
    start = time.time()
    content_q = Content.objects.filter().values_list("pk", flat=True)[0:30000]
    # The ids come straight from the database, not from user input, so they
    # are inlined as quoted literals. NOTE(review): an empty table would
    # produce "in ()", which is invalid SQL.
    pulp_ids = ["'" + str(pk) + "'" for pk in content_q]
    joined_string = ",".join(pulp_ids)
    stmt = (
        'UPDATE core_content SET timestamp_of_interest = NOW() '
        ' WHERE pulp_id IN ('
        '  SELECT pulp_id '
        '  FROM core_content '
        '  WHERE pulp_id in (' + joined_string + ') '
        '  ORDER BY pulp_id '
        '  FOR UPDATE SKIP LOCKED)'
    )
    # Context-managed cursor: closed even if the statement raises, instead
    # of relying on an explicit close() that an exception would skip.
    with transaction.atomic(), connection.cursor() as cursor:
        cursor.execute(stmt)
    end = time.time()
    durations.append(end - start)
    print(">>>done")


# Ten rounds; each round starts ten threads together and waits for all of them.
for r in range(10):
    threads = []
    for i in range(10):
        threads.append(Thread(target=update_timestamp, args=(i,)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print("Avg time : {}".format(sum(durations) / len(durations)))
print("Median time : {}".format(statistics.median(durations)))
print("StdDev : {}".format(statistics.stdev(durations)))
Avg time : 0.6527105093002319
Median time : 0.6612157821655273
StdDev : 0.11319572562790735
Avg time : 1.354753210544586
Median time : 1.2656549215316772
StdDev : 0.43396279702310464
Avg time : 1.9388477754592897
Median time : 1.8810685873031616
StdDev : 0.5169462138555866
Avg time : 2.03319078207016
Median time : 1.81693696975708
StdDev : 0.6529352612394995
import _thread
import time
import random
from django.db import transaction
from pulpcore.plugin.models import Content
from django.utils.timezone import now
from django.db.utils import OperationalError
# One timestamp, captured once and written by every attempt.
current = now()


def update_timestamp(index):
    """Run the ordered bulk update, retrying on OperationalError.

    Makes at most ten attempts. After a failed attempt that is not the last,
    it sleeps for a random 1-3 seconds before retrying; after the tenth
    failure it reports giving up. ``index`` labels the printed messages.
    """
    tries = 10
    for tried in range(1, tries + 1):
        try:
            Content.objects.filter().order_by("pk").update(timestamp_of_interest=current)
        except OperationalError:
            delay = random.uniform(1, 3)
            if tried >= tries:
                print("Thread %d: deadlock detected give up. Tried %d" % (index, tried))
                break
            print("Thread %d: deadlock detected. trying %d in %f" % (index, tried, delay))
            time.sleep(delay)
        else:
            print("Thread %d done" % index)
            break
from threading import Thread
import random
import statistics
import time
from django.db import transaction
from pulpcore.plugin.models import Content, RepositoryVersion
from django.utils.timezone import now
from django.db import connection
from django.db.utils import OperationalError
# Per-call wall-clock durations in seconds, appended by every worker thread.
durations = []
# One timestamp, captured once and written by every attempt.
current = now()


def update_timestamp(index):
    """Time the ordered bulk update, including any retries and back-off.

    Makes at most ten attempts, sleeping a random 1-3 seconds after each
    OperationalError except the last. The total elapsed wall-clock time
    (sleeps included) is appended to the module-level ``durations`` list.
    ``index`` labels the printed messages.
    """
    print(">>>in update_timedstamp index {}".format(index))
    started = time.time()
    tries = 10
    for tried in range(1, tries + 1):
        try:
            Content.objects.filter().order_by("pk").update(timestamp_of_interest=current)
        except OperationalError:
            delay = random.uniform(1, 3)
            if tried >= tries:
                print("Thread %d: deadlock detected give up. Tried %d" % (index, tried))
                break
            print("Thread %d: deadlock detected. trying %d in %f" % (index, tried, delay))
            time.sleep(delay)
        else:
            print("Thread %d done" % index)
            break
    finished = time.time()
    durations.append(finished - started)
    print(">>>done {}".format(index))


# Ten rounds; each round starts ten threads together and waits for all of them.
for r in range(10):
    threads = [Thread(target=update_timestamp, args=(i,)) for i in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

mean_time = sum(durations) / len(durations)
print("Avg time : {}".format(mean_time))
print("Median time : {}".format(statistics.median(durations)))
print("StdDev : {}".format(statistics.stdev(durations)))
Avg time : 16.73605344772339
Median time : 15.415827512741089
StdDev : 13.358905676352004
sudo tail -f /var/lib/pgsql/data/log/postgresql-Wed.log | grep -i deadlock
Avg time : 64.14785997629166
Median time : 67.73133504390717
StdDev : 18.53609656000721
Avg time : 66.60404789447784
Median time : 68.0915459394455
StdDev : 15.597686384535603