# subprocess and threading (Items 52–57)

David Ye @ Houzz
- Concurrency (並行): *seemingly* doing many things at once; tasks take turns on shared resources.
- Parallelism (平行): *actually* doing many things at once; tasks run simultaneously, e.g. on multiple CPU cores.
Blocking I/O in the middle of otherwise simple logic is what creates the need for concurrency:

```python
def game_logic(state, neighbors):
    ...
    # Do some blocking input/output in here:
    data = my_socket.recv(100)
    ...
```
## subprocess

### Use `subprocess` to Manage Child Processes

Prefer `subprocess` over the older APIs such as `os.system`, etc.

`subprocess.run`: run and wait.
```python
import subprocess

res = subprocess.run(
    ['echo', 'Hello from the child process'],
    capture_output=True,
)
# raises subprocess.CalledProcessError when the return code is not 0
res.check_returncode()
print(res.stdout)  # b'Hello from the child process\n'
```
subprocess.run(["test", "-d", "someFolder"], check=True) # CalledProcessError: Command '['test', '-d', 'someFolder']' returned non-zero exit status 1.
### `subprocess.Popen`: will not wait

`Popen.poll()` returns `None` while the child is still running, and the exit status once it has finished.
```python
proc = subprocess.Popen(['sleep', '1'])  # returns a Popen object immediately
while proc.poll() is None:
    print('Working...')

print('Exit status', proc.poll())
# Working...
# Working...
# Working...
# Exit status 0
```
`Popen.communicate`: wait and get output.
```python
proc = subprocess.Popen(
    ...,
    stdout=subprocess.PIPE,  # open a pipe to the child's standard output
)
try:
    outs, errs = proc.communicate(timeout=15)
except subprocess.TimeoutExpired:
    proc.kill()
    outs, errs = proc.communicate()
```
### manage streams
```python
import os
import subprocess

def run_encrypt(data):
    env = os.environ.copy()
    env['password'] = 'zf7ShyBhZOraQDdE/FiZpm/m/8f9X+M1'
    proc = subprocess.Popen(
        ['openssl', 'enc', '-des3', '-pass', 'env:password'],
        env=env,
        stdin=subprocess.PIPE,   # open pipes to the child's standard streams
        stdout=subprocess.PIPE,
    )
    proc.stdin.write(data)  # write some data to the child's stdin
    proc.stdin.flush()      # ensure that the child gets the input
    return proc
```
### pipe one stdout to another stdin
```python
def run_hash(input_stdin):
    return subprocess.Popen(
        ['openssl', 'dgst', '-whirlpool', '-binary'],
        stdin=input_stdin,
        stdout=subprocess.PIPE,
    )
```
```python
encrypt_procs = []
hash_procs = []
for _ in range(3):
    data = os.urandom(100)
    encrypt_proc = run_encrypt(data)
    encrypt_procs.append(encrypt_proc)
    # pipe the encryptor's stdout into the hasher's stdin
    hash_proc = run_hash(encrypt_proc.stdout)
    hash_procs.append(hash_proc)
    # Allow encrypt_proc to receive a SIGPIPE if hash_proc exits.
    encrypt_proc.stdout.close()
    # Drop our reference so communicate() below won't try to read
    # the stream we just handed off to hash_proc.
    encrypt_proc.stdout = None
```
```python
# fan in
for proc in encrypt_procs:
    proc.communicate()
for proc in hash_procs:
    out, _ = proc.communicate()
    print(out[-10:])
```
```python
proc = subprocess.Popen(['sleep', '10'])
try:
    proc.communicate(timeout=0.1)  # raises TimeoutExpired after 0.1 sec
except subprocess.TimeoutExpired:
    proc.terminate()    # send SIGTERM to the child process
    print(proc.wait())  # wait for exit; -15 means killed by SIGTERM
```
## threading
> In CPython, the global interpreter lock, or GIL, is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecodes at once.

One thread runs Python bytecode while N others sleep or await I/O:
```python
import select
from threading import Thread

def slow_systemcall():
    select.select([], [], [], 0.1)  # blocking I/O releases the GIL

threads = []
for _ in range(5):  # fan out 5 threads to wait on I/O
    thread = Thread(target=slow_systemcall)
    thread.start()
    threads.append(thread)

for thread in threads:  # fan in
    thread.join()
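For contrast, here is a minimal sketch (with a hypothetical busy-loop workload) of why the GIL matters for CPU-bound code: two threads take roughly as long as doing the work twice serially, because only one thread executes bytecode at a time.

```python
import time
from threading import Thread

def countdown(n):
    while n > 0:  # pure bytecode work; the GIL is held throughout
        n -= 1

N = 5_000_000

start = time.time()
countdown(N)
countdown(N)
print('Serial:  ', time.time() - start)

start = time.time()
t1 = Thread(target=countdown, args=(N,))
t2 = Thread(target=countdown, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
print('Threaded:', time.time() - start)  # about the same, or worse
```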
### Use `Lock` to Prevent Data Races in Threads

The GIL makes individual bytecodes thread-safe, not Python data structures.

`+=` is not atomic: `counter.count += 1` is really three separate operations, and a thread can be suspended between any two of them:

```python
value = getattr(counter, 'count')
result = value + 1
setattr(counter, 'count', result)
```
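A quick sketch of the race in action (hypothetical `Counter` class; how often updates are lost varies by run and interpreter version): with enough unsynchronized increments, the final count can come up short.

```python
from threading import Thread

class Counter:
    def __init__(self):
        self.count = 0

counter = Counter()

def worker(how_many):
    for _ in range(how_many):
        counter.count += 1  # non-atomic read-modify-write

threads = [Thread(target=worker, args=(100_000,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.count)  # may be less than 500000 without a Lock
```

The fix is to guard the read-modify-write with a `Lock`: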
```python
from threading import Lock

class LockedCounter:
    def __init__(self):
        self.lock = Lock()  # one lock per counter instance
        self.count = 0

    def increment(self, offset):
        with self.lock:  # mutex: makes the read-modify-write atomic
            self.count += offset
```
```python
counter = LockedCounter()
how_many = 10**5  # reads per worker

def worker(sensor_index, how_many, counter):
    for _ in range(how_many):
        ...  # read from the sensor
        counter.increment(1)

threads = []
for i in range(5):  # fan out 5 threads
    thread = Thread(target=worker, args=(i, how_many, counter))
    threads.append(thread)
    thread.start()

for thread in threads:  # fan in
    thread.join()
```
### Use `Queue` to Coordinate Work Between Threads

An I/O-blocked pipeline, e.g. download -> resize -> upload.

A `Queue` blocks consumers until data is available, and its maximum buffer size bounds the producer:
```python
from queue import Queue
from threading import Thread

queue = Queue(1)  # buffer size is 1

def consumer():
    print('consumer waiting')
    queue.get()  # blocks until something is put
    print('consumer get 1')
    queue.get()
    print('consumer get 2')

thread = Thread(target=consumer)
thread.start()

print('producer putting')
queue.put(object())
print('producer put 1')
queue.put(object())
print('producer put 2')
thread.join()
```
```
consumer waiting
producer putting
producer put 1
consumer get 1
producer put 2
consumer get 2
```
### `Queue.task_done()` and `Queue.join()`

`Queue.task_done()`: mark the last `get` as done. `Queue.join()`: wait until every `put` has been marked done.

```python
task_count = 2

def consumer():
    print('consumer waiting')
    for i in range(task_count):
        queue.get()
        print('consumer working', i + 1)
        print('consumer done', i + 1)
        queue.task_done()  # mark this task done

thread = Thread(target=consumer)
thread.start()

print('producer putting')
for i in range(task_count):
    queue.put(object())
    print('producer put', i + 1)

queue.join()  # wait for all tasks to be done
print('producer done')
thread.join()
```
```
consumer waiting
producer putting
producer put 1
consumer working 1
consumer done 1
consumer working 2
consumer done 2
producer put 2
producer done
```
```python
class ClosableQueue(Queue):
    SENTINEL = object()  # arbitrary marker meaning "no more items"

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # end iteration
                yield item
            finally:
                self.task_done()  # always mark the task done
```
`get` from this queue and `put` to the next queue:

```python
class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:  # get
            result = self.func(item)
            self.out_queue.put(result)
```
```python
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()  # results end up here

threads = [  # download, resize, upload are functions
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]
for thread in threads:
    thread.start()
```
```python
for _ in range(n_jobs):
    download_queue.put(object())

download_queue.close()  # send the SENTINEL to the queue
download_queue.join()   # wait until every task is marked done
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
for thread in threads:
    thread.join()
```
```python
def start_threads(count, *args):
    threads = [StoppableWorker(*args) for _ in range(count)]
    for thread in threads:
        thread.start()
    return threads

def stop_threads(closable_queue, threads):
    for _ in threads:
        closable_queue.close()  # one SENTINEL per worker
    closable_queue.join()
    for thread in threads:
        thread.join()
```
Use the functions (stop the stages in pipeline order, so each queue drains before the next stage shuts down):

```python
download_threads = start_threads(
    n_download_worker, download, download_queue, resize_queue)
resize_threads = start_threads(
    n_resize_worker, resize, resize_queue, upload_queue)
upload_threads = start_threads(
    n_upload_worker, upload, upload_queue, done_queue)

for _ in range(n_jobs):
    download_queue.put(object())

stop_threads(download_queue, download_threads)
stop_threads(resize_queue, resize_threads)
stop_threads(upload_queue, upload_threads)
```
### Avoid Creating New `Thread` Instances for On-demand Fan-out

Starting a fresh `Thread` instance per unit of work is costly: each thread needs its own stack memory and startup time, and exceptions raised inside a worker are lost to the code that started it.
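A sketch of the pattern this item warns against (with a hypothetical `handle` task): spawning a brand-new `Thread` per item, every cycle, pays the startup and memory cost over and over.

```python
from threading import Thread

def handle(item):
    ...  # some I/O-bound work (hypothetical)

def fan_out(items):
    threads = []
    for item in items:
        # A new Thread per item: costly to start, each needs its
        # own stack memory, and any exception raised in handle()
        # is swallowed by the Thread, not re-raised in the caller.
        thread = Thread(target=handle, args=(item,))
        thread.start()
        threads.append(thread)
    for thread in threads:  # fan in
        thread.join()
```

Reusing a fixed pool of workers fed by a `Queue`, as in the pipeline above, avoids these per-item costs.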