Issue
I am trying to implement a producer-consumer solution using threads within a subprocess in Python. When I try to subclass mp.Process, both the producer and consumer threads block forever, but everything works as expected when using a target function with mp.Process to achieve the same functionality. Can anyone please explain this disparity?
Minimal example using a target function:
import queue
import multiprocessing as mp
import threading

class Worker(threading.Thread):
    def __init__(self, output_queue):
        super().__init__()
        self.queue = output_queue

    def run(self):
        for i in range(3):
            print('producer executing queue.put', flush=True)
            self.queue.put(i)
            print('producer queue.put successful', flush=True)
        self.queue.put(None)

def gen():
    output_queue = queue.Queue(1)
    Worker(output_queue).start()
    while True:
        print('consumer executing queue.get', flush=True)
        x = output_queue.get()
        if x is None:
            break
        print(x, flush=True)
        print('consumer queue.get successful', flush=True)
    print('finished')

if __name__ == '__main__':
    mp.Process(target=gen).start()
Output:
producer executing queue.put
consumer executing queue.get
producer queue.put successful
0
consumer queue.get successful
producer executing queue.put
consumer executing queue.get
producer queue.put successful
producer executing queue.put
1
consumer queue.get successful
consumer executing queue.get
producer queue.put successful
2
consumer queue.get successful
consumer executing queue.get
finished
Same example with an mp.Process subclass:
import queue
import multiprocessing as mp
import threading

class Worker(threading.Thread):
    def __init__(self, output_queue):
        super().__init__()
        self.queue = output_queue

    def run(self):
        for i in range(3):
            print('producer executing queue.put', flush=True)
            self.queue.put(i)
            print('producer queue.put successful', flush=True)
        self.queue.put(None)

class Generator(mp.Process):
    def __init__(self):
        super().__init__()
        self.output_queue = queue.Queue(1)
        Worker(self.output_queue).start()

    def run(self):
        while True:
            print('consumer executing queue.get', flush=True)
            x = self.output_queue.get()
            if x is None:
                break
            print(x, flush=True)
            print('consumer queue.get successful', flush=True)
        print('finished')

if __name__ == '__main__':
    Generator().start()
Output:
producer executing queue.put
producer queue.put successful
producer executing queue.put
consumer executing queue.get
0
consumer queue.get successful
consumer executing queue.get
[Program gets stuck here]
[Edit]: I am running this with Python 3.7, and the following also works:
import queue
import multiprocessing as mp
import threading

class Worker(threading.Thread):
    def __init__(self, output_queue):
        super().__init__()
        self.queue = output_queue

    def run(self):
        for i in range(3):
            print('producer executing queue.put', flush=True)
            self.queue.put(i)
            print('producer queue.put successful', flush=True)
        self.queue.put(None)

def gen(output_queue, worker):
    worker.start()
    while True:
        print('consumer executing queue.get', flush=True)
        x = output_queue.get()
        if x is None:
            break
        print(x, flush=True)
        print('consumer queue.get successful', flush=True)
    print('finished')

if __name__ == '__main__':
    output_queue = queue.Queue(1)
    mp.Process(target=gen, args=(output_queue, Worker(output_queue))).start()
Solution
Since Booboo's answer doesn't explain the blocking issue (and it's not specific to macOS), I am adding this answer for anyone interested.
In the 2nd version of the code, the following sequence of events occurs, which explains the blocking:
- In the main process, Generator() calls Generator.__init__() to initialise the consumer. During this initialisation, output_queue is created and Worker (the producer thread) is started. Since this is all done by the main process, the producer starts filling the queue in the main process's memory space.
- Then Generator.start() is called, which starts the consumer process and, in doing so, creates a copy of the main process's memory space. While creating this copy, the output_queue from main's memory is also copied in whatever state it is in (it can be empty, full, or partially filled, depending on how many elements the producer managed to put by that time). Since output_queue is not a multiprocessing queue, the new copy doesn't share memory with the original one. Reference for how fork (the default process creation method on Unix) works: https://www.csl.mtu.edu/cs4411.ck/www/NOTES/process/fork/create.html
- At this point, Worker (the producer thread) is running in main's memory space and keeps trying to put elements into the queue in that space (since the queue size is limited to 1, the put call blocks).
- Also, Generator (the consumer process) keeps trying to read from the output_queue copy in its own memory. Since this copy was cloned as-is from main's copy when the consumer was created, the consumer is able to get 1 element out of the queue (which in this case is the number of elements the producer was able to produce before the copy was created). After reading this 1 element, the get call in the consumer blocks, since the queue in its memory space is now empty.
So in summary, there are two different copies of output_queue: one in main's memory space, to which the producer writes, and another in the consumer's memory space, from which the consumer reads. Since the 2nd copy was cloned from the 1st when 1 element was already there, the consumer is able to access that element.
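To see that copy-on-fork behaviour in isolation, here is a minimal sketch of my own (not from the question) that assumes the default fork start method on Linux: a plain queue.Queue filled in the parent is only a snapshot inside the child, so puts made after the fork never reach it.

import multiprocessing as mp
import queue

# Module-level queue, created before the fork.
q = queue.Queue()

def child():
    # The child only sees the snapshot of the queue taken at fork time.
    print('child sees', q.qsize(), 'item(s)')   # expected: 1

if __name__ == '__main__':
    q.put('before fork')             # copied into the child at fork
    p = mp.Process(target=child)
    p.start()                        # fork happens here (on Linux)
    q.put('after fork')              # lands only in the parent's copy
    p.join()
    print('parent sees', q.qsize(), 'item(s)')  # expected: 2

With the spawn start method the child would re-import the module and see an empty queue instead, which is another reason not to rely on inheriting a plain queue.Queue across processes.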
To further add, even the 3rd version of the code has 2 copies of output_queue, but since the producer in the 3rd version is started in the consumer's memory space (inside the gen function), both write to and read from the output_queue in the same memory.
To conclude, the 1st version of the code is probably the best thing to do, since it keeps the producer and consumer threads in the same memory and also avoids creating an unnecessary copy of the queue in main's memory. mp.Queue can also be used to avoid all these issues, as Booboo mentioned, but the unnecessary use of shared memory can be avoided by using the 1st version in the first place, since the queue is never accessed in main.
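For completeness, here is a rough sketch of what that mp.Queue route could look like with the subclass (my own untested variant, not part of the original question or Booboo's answer): because mp.Queue is backed by a pipe shared between processes, it no longer matters that the producer runs in the main process while the consumer reads in the child.

import multiprocessing as mp
import threading

class Worker(threading.Thread):
    def __init__(self, output_queue):
        super().__init__()
        self.queue = output_queue

    def run(self):
        for i in range(3):
            self.queue.put(i)     # produced in the main process
        self.queue.put(None)

class Generator(mp.Process):
    def __init__(self):
        super().__init__()
        self.output_queue = mp.Queue(1)    # shared across processes
        Worker(self.output_queue).start()  # producer still lives in main

    def run(self):
        while True:
            x = self.output_queue.get()    # consumed in the child process
            if x is None:
                break
            print(x, flush=True)
        print('finished', flush=True)

if __name__ == '__main__':
    g = Generator()
    g.start()
    g.join()

The trade-off, as noted above, is that this pays for inter-process communication even though the queue never really needs to cross a process boundary.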
Another option is to use the 2nd version (if subclassing is required) but start the producer inside the run method, so that it runs in the consumer's memory space, which makes it equivalent to the 1st version:
class Generator(mp.Process):
    def __init__(self):
        super().__init__()

    def run(self):
        self.output_queue = queue.Queue(1)
        Worker(self.output_queue).start()
        while True:
            print('consumer executing queue.get', flush=True)
            x = self.output_queue.get()
            if x is None:
                break
            print(x, flush=True)
            print('consumer queue.get successful', flush=True)
        print('finished')

if __name__ == '__main__':
    Generator().start()
Answered By - Abhinav Goyal