Python Multiprocessing Pool and KeyboardInterrupt Revisited
Earlier today I was in the process of cleaning out some Chrome bookmarks when I came across a post by John M. Reese I bookmarked titled, Python: Using KeyboardInterrupt with a Multiprocessing Pool. I had bookmarked John’s post a number of months ago as it referenced my previous post, Python Multiprocessing and KeyboardInterrupt, however, not until today had I been able to look at his findings.
John suggests that by having the worker processes ignore SIGINT, the signal that results in python’s KeyboardInterrupt, the entire problem can be solved. Astute readers will note that I actually used the same approach in my second update to my aforementioned post, which suffered from the problem that intermediate results could not be processed, i.e., jobs that completed prior to the keyboard interrupt. While, John’s solution did educate me as to the existence of the initializer and initargs parameters to the multiprocessing.Pool function, his solution in-fact does not work. The only reason it appears to work is due to the
time.sleep(10) in his try block. In most code this
sleep call would not exist, rather the code would immediately call
join() on the pool object.
In the absence of the delay introduced by the
sleep call, John’s code still suffers from the original problem which is the KeyboardInterrupt exception does not reach the main process until all of the jobs have completed. The proper solution to the problem would be to fix the multiprocessing library to allow the
join function to be interrupted. Until then, my suggestion of rolling your own pool functionality is the best solution I am aware of.
Below is a verbatim copy of my original solution for your convenience:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
#!/usr/bin/env python import multiprocessing, os, signal, time, Queue def do_work(): print 'Work Started: %d' % os.getpid() time.sleep(2) return 'Success' def manual_function(job_queue, result_queue): signal.signal(signal.SIGINT, signal.SIG_IGN) while not job_queue.empty(): try: job = job_queue.get(block=False) result_queue.put(do_work()) except Queue.Empty: pass #except KeyboardInterrupt: pass def main(): job_queue = multiprocessing.Queue() result_queue = multiprocessing.Queue() for i in range(6): job_queue.put(None) workers =  for i in range(3): tmp = multiprocessing.Process(target=manual_function, args=(job_queue, result_queue)) tmp.start() workers.append(tmp) try: for worker in workers: worker.join() except KeyboardInterrupt: print 'parent received ctrl-c' for worker in workers: worker.terminate() worker.join() while not result_queue.empty(): print result_queue.get(block=False) if __name__ == "__main__": main()