
touch() and deadlocks

Problem Statement

  • touch() wants to update the last-noticed-time for many rows in Content and Artifact
  • postgres-update acquires a row-lock for every row being updated
  • postgres-update does not guarantee ordering for such an update
  • syncing multiple repositories with overlapping content can therefore cause deadlocks
  • The test-scripts (and the previous 'fix') assume that .order_by() works with update() in Django. Alas, this is not the case for Postgres - see the Django docs on ordered querysets.
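
To make that last bullet concrete, a minimal sketch (the generated-SQL comment is our assumption, consistent with the Django docs referenced above):

```python
from django.utils.timezone import now

from pulpcore.plugin.models import Content

# On postgres, Django silently drops the ordering when compiling an UPDATE,
# so both of these calls send the same statement to the database:
Content.objects.order_by("pulp_id").update(timestamp_of_interest=now())
Content.objects.update(timestamp_of_interest=now())

# Both compile to roughly:
#   UPDATE core_content SET timestamp_of_interest = ...
# with no ORDER BY - so the order in which row-locks are acquired is unspecified.
```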


Current State

  • PR 2159 applies an EXCLUSIVE TABLE LOCK to the db-table involved in the touch() operation. This serializes touch(), thereby guaranteeing that nobody else can be locking rows out from under us.

  • UPDATE: after the performance investigation below, PR 2159 was modified to use Script #4.

Pros

  • Simple
  • Guarantees correctness
  • Protects against non-touch paths as well

Cons

  • Adds a bottleneck to the system
  • Delays all updates/inserts to the affected table, even for rows not involved in the touch()

Other Options

Several options are discussed in the associated BZ. They include:

  • Forcing select-for-update prior to the touch()
    • see 2021406#c16 for comments/performance anecdote from hyu
  • "Optimistic" approach (ie, catch deadlock exceptions and "just" retry)
    • see 2021406#c18 for comments/performance anecdote from hyu

Performance Investigation

We want to measure how performant (or not) the three approaches to a fix are. The goal is to get a 'median' time for the update process on a 'realistic' dataset.

NOTE: the test scripts used here are absolutely the worst-possible-case scenarios. They are deliberately designed to force deadlocks to happen as much as possible. In real-world Pulp3 use, observations suggest that reducing the number of workers to 5 eliminated deadlock-failures from workflows that were seeing "one or two" on every run.

CONCLUSIONS

(Yes, we put conclusions first - it's what you're looking for, right?)

Calling select-for-update on the rows we want to touch(), ordered by pulp_id, using raw SQL with SKIP LOCKED, gives us performance comparable to the unprotected update() call, no deadlocks, and preserves concurrency. This is Script #4 below.
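
For the impatient, the winning statement boils down to the following sketch (paraphrased from Script #4 below, minus the explicit pulp_id-list filter the test script uses):

```python
from django.db import connection, transaction

# Lock the rows in pulp_id order and update them in a single statement;
# rows somebody else already holds are skipped rather than waited on.
with transaction.atomic(), connection.cursor() as cursor:
    cursor.execute(
        "UPDATE core_content SET timestamp_of_interest = NOW() "
        " WHERE pulp_id IN ("
        "    SELECT pulp_id FROM core_content "
        "     ORDER BY pulp_id "
        "   FOR UPDATE SKIP LOCKED)"
    )
```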

Continue reading for gory technical details.

Setup

  • Need to have a large number of entries in Content
    • file-perf has 20K entries
    • Use this script to set up the repository:
```bash
pulp file remote create --name perf --url https://fixtures.pulpproject.org/file-perf/PULP_MANIFEST --policy immediate
pulp file repository create --name perf --remote perf
pulp file repository sync --name perf
```
  • Need to be running a lot of workers, to allow for lots of concurrency:
```bash
#!/bin/bash
num_workers=`sudo systemctl status pulpcore-worker* | grep "service - Pulp Worker" | wc -l`
echo "Current num-workers ${num_workers}"
if [ ${num_workers} -lt 20 ]
then
    for (( i=${num_workers}+1; i<=20; i++ ))
    do
        echo "Starting worker ${i}"
        sudo systemctl start pulpcore-worker@${i}
    done
fi
```
  • Need to allow for A LOT of postgres connections, to allow for lots of concurrency:
```
$ sudo vi /var/lib/pgsql/data/postgresql.conf
max_connections = 500                   # (change requires restart)
```
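
After editing, restart postgres so the new limit takes effect (service name may vary by distro):

```
$ sudo systemctl restart postgresql
```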
  • Need a script that can force the deadlock to happen:
```python
import _thread

from django.utils.timezone import now

from pulpcore.plugin.models import Content

def update_timestamp(index):
    # Unordered full-table UPDATE; concurrent callers acquire row-locks in
    # arbitrary order, which is exactly what provokes the deadlock.
    Content.objects.update(timestamp_of_interest=now())
    print(">>>done")

for i in range(8):
    _thread.start_new_thread(update_timestamp, (i,))
```
    • Run pulpcore-manager shell in two terminal windows
    • Enter the script above into each
    • Hit 'enter' at the same(ish) time
    • Watch deadlock warnings start popping up
    • Alternate script from the GH issue:
```python
import _thread

from pulpcore.plugin.models import Content

def update_timestamp(index):
    print(">>>in update_timestamp index {}".format(index))
    Content.objects.touch()
    print(">>>done {}".format(index))

for i in range(8):
    _thread.start_new_thread(update_timestamp, (i,))
```
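
When a thread loses the race, the error surfaces through Django looking roughly like this (pids/xids are placeholders; exact formatting varies by postgres version):

```
django.db.utils.OperationalError: deadlock detected
DETAIL:  Process <pid1> waits for ShareLock on transaction <xid>; blocked by process <pid2>.
```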

Base time to do touch, no concurrency

  • current core/main, just timing touch(), sequential calls, on the entire Content table with the 20K File repo sync'd:
```python
import time

from pulpcore.plugin.models import Content

durations = []

def update_timestamp(index):
    print(">>>in update_timestamp index {}".format(index))
    start = time.time()
    Content.objects.touch()
    end = time.time()
    durations.append(end - start)
    print(">>>done {}".format(index))

for i in range(100):
    update_timestamp(i)

avg = sum(durations) / len(durations)
print(f"Avg time : {avg}")
```
  • Result : Avg time : 0.5344140982627869
    • so ~500ms is the absolute minimum time to execute the update of ~20K rows in one update
  • Script to Just Do It, ignoring deadlocks:
```python
from threading import Thread
import statistics
import time

from django.utils.timezone import now

from pulpcore.plugin.models import Content

durations = []

def update_timestamp(index):
    print(">>>in update_timestamp index {}".format(index))
    start = time.time()
    # NOTE: the order_by() is silently dropped on postgres (see Problem Statement).
    Content.objects.filter().order_by("pulp_id").update(timestamp_of_interest=now())
    end = time.time()
    durations.append(end - start)
    print(">>>done {}".format(index))

for r in range(10):
    threads = []
    for i in range(10):
        threads.append(Thread(target=update_timestamp, args=(i,)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print("Avg time : {}".format(sum(durations) / len(durations)))
print("Median time : {}".format(statistics.median(durations)))
print("StdDev : {}".format(statistics.stdev(durations)))
```
  • Running in one shell, 10 runs, 10 threads each:
    • Results:
      • Deadlocks occasionally
        Avg time : 3.9687717117247034
        Median time : 4.196575880050659
        StdDev : 2.008056319970726
  • Running in 2 shells, 10 runs, 10 threads each:
    • Results:
      • Deadlocks Galore
      • first shell
        Avg time : 6.688158090297993
        Median time : 7.511222839355469
        StdDev : 2.943007946559003
      • second shell
        Avg time : 6.688158090297993
        Median time : 7.511222839355469
        StdDev : 2.943007946559003
  • Thoughts
    • In this instance, "no deadlocks" implies that the simultaneous updates happened to lock/release rows in non-deadlocking order, by chance
    • With 10 threads, the avg time is ~10x slower than with 1, even without forcing threads to wait

Table-lock

  • This is implemented in the PR
  • Script I used to test the deadlock-fix:
```python
import _thread

from django.db import connection, transaction
from django.utils.timezone import now

from pulpcore.plugin.models import Content

def update_timestamp(index):
    print(">>>in update_timestamp index {}".format(index))
    cursor = connection.cursor()
    with transaction.atomic():
        # Serialize all touch()-ers behind an exclusive table lock.
        cursor.execute("LOCK TABLE core_content IN EXCLUSIVE MODE")
        Content.objects.filter().order_by("pulp_id").update(timestamp_of_interest=now())
    cursor.close()
    print(">>>done {}".format(index))

for i in range(8):
    _thread.start_new_thread(update_timestamp, (i,))
```
  • Script for testing timing using a table-lock:
```python
from threading import Thread
import statistics
import time

from django.db import connection, transaction
from django.utils.timezone import now

from pulpcore.plugin.models import Content

durations = []

def update_timestamp(index):
    print(">>>in update_timestamp index {}".format(index))
    start = time.time()
    cursor = connection.cursor()
    with transaction.atomic():
        # Serialize all touch()-ers behind an exclusive table lock.
        cursor.execute("LOCK TABLE core_content IN EXCLUSIVE MODE")
        Content.objects.filter().order_by("pulp_id").update(timestamp_of_interest=now())
    cursor.close()
    end = time.time()
    durations.append(end - start)
    print(">>>done {}".format(index))

for r in range(10):
    threads = []
    for i in range(10):
        threads.append(Thread(target=update_timestamp, args=(i,)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print("Avg time : {}".format(sum(durations) / len(durations)))
print("Median time : {}".format(statistics.median(durations)))
print("StdDev : {}".format(statistics.stdev(durations)))
```

  • Running in one shell, 10 runs, 10 threads each:
    • Results:
        Avg time : 3.9036862729470942
        Median time : 3.6761786937713623
        StdDev : 2.2266992520878244
  • Running in 2 shells, 10 runs, 10 threads each:
    • Results:
      • first shell
        Avg time : 10.609893147945405
        Median time : 10.532786250114441
        StdDev : 4.209340841786183
      • second shell
        Avg time : 11.153922910690307
        Median time : 10.65884256362915
        StdDev : 3.4015462449104144
  • Thoughts
    • With one shell, timings are similar to "do nothing"
    • With two shells conflicting, timings are ~50% slower than "do nothing" - but with no deadlock problems
  • NOTE: if you are seeing occasional "psycopg2.OperationalError: FATAL: remaining connection slots are reserved for non-replication superuser connections" errors, bump the number of allowed postgres connections as noted under "Setup". We still need to understand why these scripts aren't returning/closing connections in a timely fashion.
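
One plausible cause (an assumption, not verified here): Django opens one connection per thread and only auto-closes connections at request boundaries, so bare threads hold theirs until the shell exits. A minimal sketch of closing the per-thread connection explicitly:

```python
from django.db import connection

def update_timestamp(index):
    try:
        ...  # the table-lock + update work from the script above
    finally:
        # Each thread gets its own Django connection; nothing reaps it in a
        # plain script, so hand it back ourselves.
        connection.close()
```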

Select-for-update

  • See 2021406#c10 for Hao's experience attempting this
  • See 2021406#c11 where he may explain why it didn't work

Script v1:

```python
import _thread

from django.utils.timezone import now

from pulpcore.plugin.models import Content

def update_timestamp(index):
    content_q = Content.objects.filter().values_list("pk", flat=True)[0:30000]
    # NOTE: select_for_update() has no effect on the subsequent update() -
    # FOR UPDATE is a SELECT clause, and update() issues a bare UPDATE.
    Content.objects.select_for_update().filter(
        pk__in=content_q
    ).update(timestamp_of_interest=now())
    print(">>>done")

# Launcher (as in the earlier scripts):
for i in range(8):
    _thread.start_new_thread(update_timestamp, (i,))
```

Script v2 - skip_locked

```python
from threading import Thread

from django.utils.timezone import now

from pulpcore.plugin.models import Content

def update_timestamp(index):
    content_q = Content.objects.filter().order_by("pk").values_list("pk", flat=True)[0:30000]
    Content.objects.filter(
        pk__in=content_q
    ).select_for_update(skip_locked=True).update(timestamp_of_interest=now())
    print(">>>done")

for r in range(10):
    threads = []
    for i in range(10):
        threads.append(Thread(target=update_timestamp, args=(i,)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```
  • Results
    • Still deadlocks - the skip-locked SELECT and the update() are separate statements, and the update() tries to lock the skipped rows anyway (see the NOTE under Script v4)

Script v3 - raw SQL

```python
from threading import Thread
import statistics
import time

from django.db import connection, transaction
from django.utils.timezone import now

from pulpcore.plugin.models import Content

durations = []

def update_timestamp(index):
    print(">>>in update_timestamp index {}".format(index))
    start = time.time()
    content_q = Content.objects.filter().values_list("pk", flat=True)[0:30000]
    pulp_ids = ["'" + str(id) + "'" for id in content_q]
    joined_string = ",".join(pulp_ids)
    cursor = connection.cursor()
    with transaction.atomic():
        # Acquire the row-locks in pulp_id order first; the unordered update()
        # then runs under locks that were taken in a consistent order.
        # cursor.execute('SELECT * FROM core_content ORDER BY "pulp_id" FOR UPDATE')
        cursor.execute('SELECT * FROM core_content WHERE pulp_id in (' + joined_string + ') ORDER BY "pulp_id" FOR UPDATE')
        Content.objects.filter().update(timestamp_of_interest=now())
    cursor.close()
    end = time.time()
    durations.append(end - start)
    print(">>>done")

for r in range(10):
    threads = []
    for i in range(10):
        threads.append(Thread(target=update_timestamp, args=(i,)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print("Avg time : {}".format(sum(durations) / len(durations)))
print("Median time : {}".format(statistics.median(durations)))
print("StdDev : {}".format(statistics.stdev(durations)))
```
  • Results
    • Does NOT deadlock
    • Ten runs, one thread each time:
        Avg time : 0.7034803628921509
        Median time : 0.6625971794128418
        StdDev : 0.14323956360501708
    • Running in one shell, 10 runs, 10 threads each:
        Avg time : 11.52471222639084
        Median time : 11.497084856033325
        StdDev : 6.669561361683516
    • Running in 2 shells, 10 runs, 10 threads each:
      • first shell
        Avg time : 34.451250185966494
        Median time : 35.91042387485504
        StdDev : 11.700707926209752
      • second shell
        Avg time : 34.48055074453354
        Median time : 35.82683265209198
        StdDev : 12.753260859963618

Script v4 - raw SQL, SKIP LOCKED <== We Have A Winner!

The important role of touch() is to mark a row with a "last time we looked" value, which orphan-cleanup relies on to ensure it doesn't "clean up" Artifacts or Content that we are processing but have not quite gotten around to associating with a repository. In this context, it doesn't matter who touch()es a row in a given moment - just that somebody does. If someone already has a row locked, we can assume that row's timestamp_of_interest is going to be roughly "now()" whether 'we' set it or 'the other thread' does, so we can safely skip updating that row. Let's see what happens if we skip locking/updating rows that someone else already holds.

NOTE: Adding 'SKIP LOCKED' to a separate 'SELECT ... FOR UPDATE', followed by an update(), fails to prevent deadlock. This is because the SELECT skips the locked rows, but the separate update() tries to acquire them anyway. Therefore, for this example we combine the SELECT ... FOR UPDATE and the UPDATE into one SQL statement.

```python
from threading import Thread
import statistics
import time

from django.db import connection, transaction

from pulpcore.plugin.models import Content

durations = []

def update_timestamp(index):
    print(">>>in update_timestamp index {}".format(index))
    start = time.time()
    content_q = Content.objects.filter().values_list("pk", flat=True)[0:30000]
    pulp_ids = ["'" + str(id) + "'" for id in content_q]
    joined_string = ",".join(pulp_ids)
    cursor = connection.cursor()
    with transaction.atomic():
        # One statement: lock the target rows in pulp_id order, skipping any
        # already held elsewhere, and update only the rows we locked.
        stmt = (
            'UPDATE core_content SET timestamp_of_interest = NOW() '
            ' WHERE pulp_id IN ('
            '    SELECT pulp_id '
            '      FROM core_content '
            '     WHERE pulp_id in (' + joined_string + ') '
            '     ORDER BY pulp_id '
            '   FOR UPDATE SKIP LOCKED)'
        )
        cursor.execute(stmt)
    cursor.close()
    end = time.time()
    durations.append(end - start)
    print(">>>done")

for r in range(10):
    threads = []
    for i in range(10):
        threads.append(Thread(target=update_timestamp, args=(i,)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print("Avg time : {}".format(sum(durations) / len(durations)))
print("Median time : {}".format(statistics.median(durations)))
print("StdDev : {}".format(statistics.stdev(durations)))
```
  • Results
    • Ten runs, one thread each time:
        Avg time : 0.6527105093002319
        Median time : 0.6612157821655273
        StdDev : 0.11319572562790735
    • Running in one shell, 10 runs, 10 threads each:
        Avg time : 1.354753210544586
        Median time : 1.2656549215316772
        StdDev : 0.43396279702310464
    • Running in 2 shells, 10 runs, 10 threads each:
      • first shell
        Avg time : 1.9388477754592897
        Median time : 1.8810685873031616
        StdDev : 0.5169462138555866
      • second shell
        Avg time : 2.03319078207016
        Median time : 1.81693696975708
        StdDev : 0.6529352612394995
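
To fold Script #4 back into the touch() API, one possible shape is a custom queryset method. This is a hypothetical sketch (not the actual PR 2159 code; BulkTouchQuerySet is our name, and a real implementation would derive the table name from self.model._meta.db_table instead of hardcoding core_content):

```python
from django.db import connection, models, transaction

class BulkTouchQuerySet(models.QuerySet):
    def touch(self):
        """Bulk-update timestamp_of_interest, skipping rows locked elsewhere."""
        pks = [str(pk) for pk in self.values_list("pk", flat=True)]
        if not pks:
            return
        with transaction.atomic(), connection.cursor() as cursor:
            # Same single-statement lock-in-order/skip-locked approach as
            # Script #4, parameterized instead of string-concatenated.
            cursor.execute(
                "UPDATE core_content SET timestamp_of_interest = NOW() "
                " WHERE pulp_id IN ("
                "    SELECT pulp_id FROM core_content "
                "     WHERE pulp_id = ANY(%s::uuid[]) "
                "     ORDER BY pulp_id "
                "   FOR UPDATE SKIP LOCKED)",
                [pks],
            )
```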

Other discussion

  • See 2021406#c16 for more attempts.
    • "raw sql" select-for-update (this is Script v3)
    • what about skipLocks=true? (this is Script v2)

Optimistic approach (ie, fail-and-retry)

```python
import _thread
import random
import time

from django.db.utils import OperationalError
from django.utils.timezone import now

from pulpcore.plugin.models import Content

current = now()

def update_timestamp(index):
    tried = 0
    tries = 10
    while True:
        tried += 1
        try:
            Content.objects.filter().order_by("pk").update(timestamp_of_interest=current)
            print("Thread %d done" % index)
            break
        except OperationalError:
            # Deadlock (or other transient failure): back off a random 1-3s
            # and retry, giving up after 10 attempts.
            delay = random.uniform(1, 3)
            if tried >= tries:
                print("Thread %d: deadlock detected, giving up. Tried %d" % (index, tried))
                break
            print("Thread %d: deadlock detected. trying %d in %f" % (index, tried, delay))
            time.sleep(delay)

# Launcher (as in the earlier scripts):
for i in range(8):
    _thread.start_new_thread(update_timestamp, (i,))
```
  • Script for testing retry-on-deadlock timings:
```python
from threading import Thread
import random
import statistics
import time

from django.db.utils import OperationalError
from django.utils.timezone import now

from pulpcore.plugin.models import Content

durations = []
current = now()

def update_timestamp(index):
    print(">>>in update_timestamp index {}".format(index))
    start = time.time()
    tried = 0
    tries = 10
    while True:
        tried += 1
        try:
            Content.objects.filter().order_by("pk").update(timestamp_of_interest=current)
            print("Thread %d done" % index)
            break
        except OperationalError:
            delay = random.uniform(1, 3)
            if tried >= tries:
                print("Thread %d: deadlock detected, giving up. Tried %d" % (index, tried))
                break
            print("Thread %d: deadlock detected. trying %d in %f" % (index, tried, delay))
            time.sleep(delay)
    end = time.time()
    durations.append(end - start)
    print(">>>done {}".format(index))

for r in range(10):
    threads = []
    for i in range(10):
        threads.append(Thread(target=update_timestamp, args=(i,)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print("Avg time : {}".format(sum(durations) / len(durations)))
print("Median time : {}".format(statistics.median(durations)))
print("StdDev : {}".format(statistics.stdev(durations)))
```
  • Running in one shell, 10 runs, 10 threads each:
    • Results:
        Avg time : 16.73605344772339
        Median time : 15.415827512741089
        StdDev : 13.358905676352004
    • Deadlocks hit?
      • sudo tail -f /var/lib/pgsql/data/log/postgresql-Wed.log | grep -i deadlock
      • YES - 197(!!) times, due to retries
        • saw some runs give up after 10 retries
  • Running in 2 shells, 10 runs, 10 threads each:
    • Results:
      • first shell
        Avg time : 64.14785997629166
        Median time : 67.73133504390717
        StdDev : 18.53609656000721
      • second shell
        Avg time : 66.60404789447784
        Median time : 68.0915459394455
        StdDev : 15.597686384535603
  • Conclusions
    • Can still fail, even at 10 retries, in pathologically-bad cases
    • Logs are awash in scary deadlock-failure messages
    • Median time is terrible (again, in the pathologically-bad case)