subprocess
and threading
(Items 52–57)
David Ye @ Houzz
Concurrency: tasks make progress in overlapping time periods, interleaved on one CPU.
Parallelism: tasks actually run at the same time, on multiple CPUs or cores.
def game_logic(state, neighbors):
    ...
    # Do some blocking input/output in here:
    data = my_socket.recv(100)
    ...
subprocess
Use subprocess to Manage Child Processes (instead of os.system etc.)
subprocess.run: run and wait
res = subprocess.run(
    ['echo', 'Hello from the child process'],
    capture_output=True,
)
# raises subprocess.CalledProcessError if the return code is not 0
res.check_returncode()
print(res.stdout)
# b'Hello from the child process\n'
subprocess.run(["test", "-d", "someFolder"], check=True)
# CalledProcessError: Command '['test', '-d', 'someFolder']' returned non-zero exit status 1.
subprocess.Popen: starts the child and does not wait.
Popen.poll() returns None while the child is still running, otherwise its exit code.
proc = subprocess.Popen(['sleep', '1'])  # returns a Popen object immediately
while proc.poll() is None:
    print('Working...')
    # Some time-consuming work here
    ...
print('Exit status', proc.poll())
# Working...
# Working...
# Working...
# Exit status 0
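Because Popen returns immediately, many children can run at once. A minimal sketch, not from the original slides (the timing comment is approximate):

import subprocess
import time

start = time.time()
procs = [subprocess.Popen(['sleep', '1']) for _ in range(10)]  # fan out
for proc in procs:  # fan in: wait for every child
    proc.communicate()
print(f'Finished in {time.time() - start:.1f} seconds')
# Finished in roughly 1.0 seconds, not 10: the children ran in parallel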
Popen.communicate: wait and get the output
proc = subprocess.Popen(
    ..., stdout=subprocess.PIPE,  # open a pipe to the child's stdout
)
try:
    outs, errs = proc.communicate(timeout=15)
except subprocess.TimeoutExpired:
    proc.kill()
    outs, errs = proc.communicate()
manage streams
def run_encrypt(data):
    env = os.environ.copy()
    env['password'] = 'zf7ShyBhZOraQDdE/FiZpm/m/8f9X+M1'
    proc = subprocess.Popen(
        ['openssl', 'enc', '-des3', '-pass', 'env:password'],
        env=env,
        stdin=subprocess.PIPE,   # open a pipe to the child's stdin
        stdout=subprocess.PIPE,  # and one to its stdout
    )
    proc.stdin.write(data)  # send some data to stdin
    proc.stdin.flush()      # ensure the child gets the input
    return proc
pipe one stdout to another stdin
def run_hash(input_stdin):
    return subprocess.Popen(
        ['openssl', 'dgst', '-whirlpool', '-binary'],
        stdin=input_stdin,
        stdout=subprocess.PIPE,
    )
encrypt_procs = []
hash_procs = []
for _ in range(3):  # fan out
    data = os.urandom(100)
    encrypt_proc = run_encrypt(data)
    encrypt_procs.append(encrypt_proc)
    # pipe encrypt_proc's stdout into hash_proc's stdin
    hash_proc = run_hash(encrypt_proc.stdout)
    hash_procs.append(hash_proc)
    # Allow encrypt_proc to receive a SIGPIPE if hash_proc exits.
    encrypt_proc.stdout.close()
    # Drop our reference so communicate() below won't try to read
    # the pipe that hash_proc now owns.
    encrypt_proc.stdout = None
# fan in
for proc in encrypt_procs:
    proc.communicate()
for proc in hash_procs:
    out, _ = proc.communicate()
    print(out[-10:])
proc = subprocess.Popen(['sleep', '10'])
try:
    proc.communicate(timeout=0.1)  # raises TimeoutExpired after 0.1 sec
except subprocess.TimeoutExpired:
    proc.terminate()  # send SIGTERM to the child process
    proc.wait()       # wait for it to exit and reap the return code
print('Exit status', proc.poll())
# Exit status -15  (negative: killed by signal 15, SIGTERM)
threading
In CPython, the global interpreter lock, or GIL, is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecodes at once.
One thread runs Python, while N others sleep or await I/O
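To see the consequence, a hedged sketch in the spirit of the book's factorization benchmark (exact timings vary by machine): threads give no speedup for CPU-bound Python code.

import time
from threading import Thread

def cpu_bound(n):
    # Pure-Python busy work; the GIL is held the whole time
    total = 0
    for i in range(n):
        total += i * i
    return total

start = time.time()
for _ in range(4):
    cpu_bound(2_000_000)
print(f'Serial:   {time.time() - start:.2f}s')

start = time.time()
threads = [Thread(target=cpu_bound, args=(2_000_000,)) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(f'Threaded: {time.time() - start:.2f}s')  # about the same, or slower

Blocking system calls, by contrast, release the GIL, which is why the fan-out below still helps: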
import select
import socket
from threading import Thread

def slow_systemcall():
    # A 0.1s blocking select, as in the book's example
    select.select([socket.socket()], [], [], 0.1)

threads = []  # fan out 5 threads to wait on I/O
for _ in range(5):
    thread = Thread(target=slow_systemcall)
    thread.start()
    threads.append(thread)
for thread in threads:  # fan in
    thread.join()
Lock
Use Lock to Prevent Data Races in Threads.
The GIL makes individual bytecodes atomic; it does not protect your Python data structures.
+= is not atomic:
counter.count += 1
# is really three separate operations:
value = getattr(counter, 'count')
result = value + 1
setattr(counter, 'count', result)
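What that non-atomicity can cost, in a minimal sketch not from the original slides (whether the count actually comes up short depends on the interpreter's thread switch interval):

from threading import Thread

class Counter:  # same as LockedCounter below, but without the lock
    def __init__(self):
        self.count = 0

    def increment(self, offset):
        self.count += offset  # racy read-modify-write

counter = Counter()
how_many = 10**5

def unlocked_worker():
    for _ in range(how_many):
        counter.increment(1)

threads = [Thread(target=unlocked_worker) for _ in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(f'Expected {5 * how_many}, got {counter.count}')
# e.g. Expected 500000, got 481384

Guarding the update with a Lock restores the correct total: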
from threading import Lock

class LockedCounter:
    def __init__(self):
        self.lock = Lock()  # one lock per counter instance
        self.count = 0

    def increment(self, offset):
        with self.lock:  # mutex: makes the read-modify-write atomic
            self.count += offset
counter = LockedCounter()
how_many = 10**5

def worker(sensor_index, how_many, counter):
    for _ in range(how_many):
        ...  # read from the sensor
        counter.increment(1)

threads = []  # fan out 5 threads to wait for I/O
for i in range(5):
    thread = Thread(target=worker, args=(i, how_many, counter))
    threads.append(thread)
    thread.start()
for thread in threads:  # fan in
    thread.join()
Queue
Use Queue to Coordinate Work Between Threads.
For an I/O-blocked pipeline, e.g. download -> resize -> upload, a Queue can buffer a bounded amount of work between stages, blocking producers and consumers as needed:
from queue import Queue

queue = Queue(1)  # buffer size is 1

def consumer():
    print('consumer waiting')
    queue.get()  # blocks until something is put
    print('consumer get 1')
    queue.get()
    print('consumer get 2')

thread = Thread(target=consumer)
thread.start()
print('producer putting')
queue.put(object())  # blocks while the buffer is full
print('producer put 1')
queue.put(object())
print('producer put 2')
thread.join()
consumer waiting
producer putting
producer put 1
consumer get 1
producer put 2
consumer get 2
Queue.task_done(): mark the most recent get as done.
Queue.join(): block until every item that was put has been marked done.
queue = Queue()
task_count = 2

def consumer():
    print('consumer waiting')
    for i in range(task_count):
        queue.get()
        print('consumer working', i + 1)
        print('consumer done', i + 1)
        queue.task_done()  # mark this task done

thread = Thread(target=consumer)
thread.start()
print('producer putting')
for i in range(task_count):
    queue.put(object())
    print('producer put', i + 1)
queue.join()  # wait until every task is marked done
print('producer done')
thread.join()
consumer waiting
producer putting
producer put 1
consumer working 1
consumer done 1
consumer working 2
consumer done 2
producer put 2
producer done
class ClosableQueue(Queue):
    SENTINEL = object()  # arbitrary marker object

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # end the iteration
                yield item
            finally:
                self.task_done()  # mark done for every get, sentinel included
A worker: get from this queue and put to the next queue
class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:  # get (stops at the sentinel)
            result = self.func(item)
            self.out_queue.put(result)
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()  # results end up here

threads = [
    # download, resize, upload are ordinary functions
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]
for thread in threads:
    thread.start()
for _ in range(n_jobs):
    download_queue.put(object())
download_queue.close()  # send the SENTINEL into the queue
download_queue.join()   # wait until the queue is drained and marked done
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
for thread in threads:
    thread.join()
def start_threads(count, *args):
    threads = [StoppableWorker(*args) for _ in range(count)]
    for thread in threads:
        thread.start()
    return threads

def stop_threads(closable_queue, threads):
    for _ in threads:
        closable_queue.close()  # one SENTINEL per worker
    closable_queue.join()
    for thread in threads:
        thread.join()
use the functions:
download_threads = start_threads(n_download_worker, download, download_queue, resize_queue)
resize_threads = start_threads(n_resize_worker, resize, resize_queue, upload_queue)
upload_threads = start_threads(n_upload_worker, upload, upload_queue, done_queue)
for _ in range(n_jobs):
    download_queue.put(object())
stop_threads(download_queue, download_threads)
stop_threads(resize_queue, resize_threads)
stop_threads(upload_queue, upload_threads)
Avoid Creating New Thread Instances for On-demand Fan-out.
A Thread instance per unit of work is costly: every thread needs memory for its own stack, takes time to start, and exceptions raised inside it are not re-raised by join().
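One illustration of that last point, as a minimal sketch not from the original slides: Thread.join() returns silently even when the target raised.

from threading import Thread

def faulty():
    raise ValueError('boom')

thread = Thread(target=faulty)
thread.start()
thread.join()  # returns normally; the traceback only goes to stderr
print('join() never re-raised the exception')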
Thanks for listening!