Effective Python 2E

Chapter 7: Concurrency and Parallelism

subprocess and threading (Items 52–57)

David Ye @ Houzz


Outline


When Concurrency is Necessary

Concurrency (並行):

  • enable many distinct paths of execution.
  • do many different things seemingly at the same time.

Parallelism (平行):

  • actually doing many different things at the same time.

Item 56: Know How to Recognize When Concurrency Is Necessary

  • do the blocking I/O concurrently

def game_logic(state, neighbors):
    ...
    # Do some blocking input/output in here:
    data = my_socket.recv(100)
    ...

  • Fan-out: start new concurrent units of work
  • Fan-in: wait for the concurrent units of work to finish

subprocess

Item 52: Use subprocess to Manage Child Processes

PEP-324

  • provides a higher-level API than os.system and friends (see the sketch below)
  • manages the child's input/output streams
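A minimal sketch of the contrast with os.system (the echo command is just an example):

import os
import subprocess

# os.system only returns an exit status; output goes straight to the terminal
status = os.system('echo Hello')

# subprocess.run returns a CompletedProcess with the exit code and captured streams
result = subprocess.run(['echo', 'Hello'], capture_output=True)
print(result.returncode)  # 0
print(result.stdout)      # b'Hello\n'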

subprocess.run: run and wait

res = subprocess.run(
    ['echo', 'Hello from the child process'],
    capture_output=True,
)
res.check_returncode()  # raises subprocess.CalledProcessError when the return code is not 0
print(res.stdout)       # b'Hello from the child process\n'
subprocess.run(["test", "-d", "someFolder"], check=True)
# CalledProcessError: Command '['test', '-d', 'someFolder']' returned non-zero exit status 1.

  • subprocess.Popen does not wait; it returns immediately
  • Popen.poll()
    • child has terminated: returns the return code
    • still running: returns None
proc = subprocess.Popen(['sleep', '1'])  # returns a Popen object immediately
while proc.poll() is None:
    print('Working...')
    # do some other useful work here
print('Exit status', proc.poll())

# Working...
# Working...
# Working...
# Exit status 0

Popen.communicate: wait and get the output

proc = subprocess.Popen(
    ...,
    stdout=subprocess.PIPE,  # a pipe to the child's stdout must be opened
)
try:
    outs, errs = proc.communicate(timeout=15)
except subprocess.TimeoutExpired:
    proc.kill()
    outs, errs = proc.communicate()

manage streams

def run_encrypt(data):
    env = os.environ.copy()
    env['password'] = 'zf7ShyBhZOraQDdE/FiZpm/m/8f9X+M1'
    proc = subprocess.Popen(
        ['openssl', 'enc', '-des3', '-pass', 'env:password'],
        env=env,
        stdin=subprocess.PIPE,   # open a pipe to the child's stdin
        stdout=subprocess.PIPE,  # open a pipe to the child's stdout
    )
    proc.stdin.write(data)  # write the data to stdin
    proc.stdin.flush()      # ensure that the child gets the input
    return proc

Chaining Parallel Processes

Pipe one process's stdout into another process's stdin

def run_hash(input_stdin):
    return subprocess.Popen(
        ['openssl', 'dgst', '-whirlpool', '-binary'],
        stdin=input_stdin,
        stdout=subprocess.PIPE,
    )
encrypt_procs = []
hash_procs = []
for _ in range(3):
    data = os.urandom(100)

    encrypt_proc = run_encrypt(data)
    encrypt_procs.append(encrypt_proc)

    # pipe encrypt_proc's stdout into hash_proc's stdin
    hash_proc = run_hash(encrypt_proc.stdout)
    hash_procs.append(hash_proc)

    # Hand the pipe over to hash_proc: closing our copy lets encrypt_proc
    # receive a SIGPIPE if hash_proc exits early, and setting stdout to None
    # keeps communicate() below from trying to read from the handed-off pipe.
    encrypt_proc.stdout.close()
    encrypt_proc.stdout = None
# fan in
for proc in encrypt_procs:
    proc.communicate()

for proc in hash_procs:
    out, _ = proc.communicate()
    print(out[-10:])

Timeout Subprocess

proc = subprocess.Popen(['sleep', '10'])
try:
    proc.communicate(timeout=0.1)  # raises TimeoutExpired after 0.1 seconds
except subprocess.TimeoutExpired:
    proc.terminate()  # send SIGTERM to the child process
    proc.wait()       # wait for the child to exit

print('Exit status', proc.poll())
# Exit status -15  (negative: terminated by signal 15, SIGTERM)

threading

In CPython, the global interpreter lock, or GIL, is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecodes at once.

One thread runs Python, while N others sleep or await I/O
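A minimal sketch of the consequence for CPU-bound work (factorize and the sample numbers are only an illustration): threads give little or no speedup, because only one thread can hold the GIL at a time.

import time
from threading import Thread

def factorize(number):
    # CPU-bound: no I/O, so the GIL is never released for long
    return [i for i in range(1, number + 1) if number % i == 0]

numbers = [2139079, 1214759, 1516637, 1852285]

start = time.time()
for number in numbers:  # serial
    factorize(number)
print('Serial :', time.time() - start)

start = time.time()
threads = [Thread(target=factorize, args=(n,)) for n in numbers]
for t in threads:  # "parallel" threads
    t.start()
for t in threads:
    t.join()
print('Threads:', time.time() - start)  # roughly the same (or slower) because of the GIL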


Item 53: Use Threads for Blocking I/O, Avoid for Parallelism

from threading import Thread

threads = []
# fan out: 5 threads to wait on blocking I/O
for _ in range(5):
    thread = Thread(target=slow_systemcall)
    thread.start()
    threads.append(thread)

# fan in
for thread in threads:
    thread.join()

Item 54: Use Lock to Prevent Data Races in Threads

The GIL makes individual bytecode operations thread-safe, not your Python data structures.

+= is not atomic

counter.count += 1

# ... is really three separate operations:
value = getattr(counter, 'count')
result = value + 1
setattr(counter, 'count', result)
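Because of that read-modify-write, increments from multiple threads can be lost. A minimal sketch of the race (the Counter class, worker function, and how_many value are illustrative, not from the slides); the LockedCounter below serializes the same operation and gives the expected total:

from threading import Thread

class Counter:
    def __init__(self):
        self.count = 0

    def increment(self, offset):
        self.count += offset  # not atomic: read, add, write

how_many = 10**5
counter = Counter()

def worker(how_many, counter):
    for _ in range(how_many):
        counter.increment(1)

threads = [Thread(target=worker, args=(how_many, counter)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

expected = how_many * 5
print(f'Counter should be {expected}, got {counter.count}')
# often less than expected; the race is timing-dependent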

from threading import Lock

class LockedCounter:
    def __init__(self):
        self.lock = Lock()  # one lock per counter instance
        self.count = 0

    def increment(self, offset):
        with self.lock:  # mutex lock: makes the read-modify-write atomic
            self.count += offset
counter = LockedCounter()
how_many = 10**5  # reads per worker (example value)

def worker(sensor_index, how_many, counter):
    for _ in range(how_many):
        ...  # read from the sensor
        counter.increment(1)

threads = []
# fan out: 5 threads to wait on I/O
for i in range(5):
    thread = Thread(target=worker, args=(i, how_many, counter))
    threads.append(thread)
    thread.start()

# fan in
for thread in threads:
    thread.join()

Item 55: Use Queue to Coordinate Work Between Threads

An I/O-blocked pipeline, e.g. download -> resize -> upload

A Queue gives you:

  • blocking get/put operations
  • a bounded buffer size
  • joining (waiting until all queued work is done)

from queue import Queue

queue = Queue(1)  # buffer size is 1

def consumer():
    print('consumer waiting')
    queue.get()  # blocks until something is put
    print('consumer get 1')
    queue.get()
    print('consumer get 2')

thread = Thread(target=consumer)
thread.start()

print('producer putting')
queue.put(object())
print('producer put 1')
queue.put(object())  # blocks while the buffer is full
print('producer put 2')
thread.join()
consumer waiting
producer putting
producer put 1
consumer get 1
producer put 2
consumer get 2

  • Queue.task_done(): mark the most recently fetched item as processed
  • Queue.join(): block until every item that was put has been marked done
# reuses the queue = Queue(1) created above
task_count = 2

def consumer():
    print('consumer waiting')
    for i in range(task_count):
        queue.get()
        print('consumer working', i + 1)
        print('consumer done', i + 1)
        queue.task_done()  # mark this task as done

thread = Thread(target=consumer)
thread.start()

print('producer putting')
for i in range(task_count):
    queue.put(object())
    print('producer put', i + 1)

queue.join()  # wait until every task is marked done
print('producer done')
thread.join()
consumer waiting
producer putting
producer put 1
consumer working 1
consumer done 1
consumer working 2
consumer done 2
producer put 2
producer done

Task Pipeline

An iterable queue

class ClosableQueue(Queue):
    SENTINEL = object()  # arbitrary end-of-input marker

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # end the iteration
                yield item
            finally:
                self.task_done()  # always mark the task done for every iteration

A worker gets items from this queue and puts results on the next queue

class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:  # get items until the SENTINEL arrives
            result = self.func(item)
            self.out_queue.put(result)

Start work!

download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()  # results end up here

threads = [
    # download, resize, upload are functions
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]

for thread in threads:
    thread.start()
for _ in range(n_jobs):
    download_queue.put(object())

download_queue.close()  # send the SENTINEL to the queue
download_queue.join()   # wait until every queued task is marked done
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()

for thread in threads:
    thread.join()

Use multiple workers for every stage

def start_threads(count, *args):
    threads = [StoppableWorker(*args) for _ in range(count)]
    for thread in threads:
        thread.start()
    return threads

def stop_threads(closable_queue, threads):
    for _ in threads:
        closable_queue.close()  # one SENTINEL per worker, so every worker can exit
    closable_queue.join()
    for thread in threads:
        thread.join()

Use the functions:

download_threads = start_threads(n_download_worker, download, download_queue, resize_queue)
resize_threads = start_threads(n_resize_worker, resize, resize_queue, upload_queue)
upload_threads = start_threads(n_upload_worker, upload, upload_queue, done_queue)

for _ in range(n_jobs):
    download_queue.put(object())

stop_threads(download_queue, download_threads)
stop_threads(resize_queue, resize_threads)
stop_threads(upload_queue, upload_threads)

Item 57: Avoid Creating New Thread Instances for On-demand Fan-out

  • Creating a new Thread instance for every unit of work:
    • extra memory, roughly 8 MB per thread
    • significant startup and context-switching overhead
  • Code changes needed:
    • exceptions in threads must be caught manually (by default they are NOT re-raised to the caller; see the sketch below)
    • locks must be added around shared state
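A minimal sketch of the exception pitfall (the fails function and Worker class are illustrative names, not from the book):

from threading import Thread

def fails():
    raise OSError('simulated I/O error')

thread = Thread(target=fails)
thread.start()
thread.join()  # the OSError is printed to stderr but NOT re-raised here

# To surface the error, you must plumb it back to the caller yourself:
class Worker(Thread):
    def __init__(self):
        super().__init__()
        self.exc = None

    def run(self):
        try:
            fails()
        except BaseException as e:
            self.exc = e  # stash the exception for the caller

worker = Worker()
worker.start()
worker.join()
if worker.exc:
    raise worker.exc  # now the caller sees the failure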

Solutions

  • Queue (Item 58)
    • cons: requires code changes; fixed worker count
  • Coroutines (newer; covered in the next part, with a teaser sketch below)
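As a teaser for the next part, a minimal sketch of coroutine fan-out/fan-in with asyncio; the game_logic body here is only a non-blocking stand-in:

import asyncio

async def game_logic(state, neighbors):
    # Do some non-blocking I/O in here
    await asyncio.sleep(0.1)
    return state

async def step():
    # fan out: one coroutine per cell, no thread per unit of work
    tasks = [game_logic(state, None) for state in range(10)]
    # fan in: wait for all of them to finish
    return await asyncio.gather(*tasks)

asyncio.run(step())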

The End

Thanks for listening!
