Multiprocessing Size and Rank
I have always thought Python did a great job exposing parallel processing with the multiprocessing package. The Pool class in particular makes it relatively simple to jump from the built-in map function, which is a good first step to accelerating loops, to utilizing all cores on a processor without jumping through any obscure hoops.
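A minimal sketch of that jump might look like this (square is just a stand-in workload):
import multiprocessing as mp

def square(x):
    return x * x

if __name__ == "__main__":
    data = list(range(8))
    serial = list(map(square, data))   # built-in map: one core
    p = mp.Pool()                      # defaults to cpu_count() workers
    parallel = p.map(square, data)     # same call shape, all cores
    p.close()
    p.join()
    assert serial == parallel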
All multiprocessing functions spawn new processes, so standard objects, even those in global scope, are not shared between processes. This means that all input gets pickled and sent to child processes, either manually with Pipe and Queue objects, or automatically with process Pools. These mechanisms work fine when data is small and the work is complex; when data grows and the work is simple, communication quickly becomes the bottleneck. I have recently been circumventing this with shared ctypes objects, which place values and arrays in shared memory. This means that all child processes can read and write these locations in memory without communication overhead, leaving just the normal concurrency issues.
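For instance, creating shared values and arrays might look like this (a minimal sketch; the type codes and sizes are arbitrary):
import multiprocessing as mp

# A shared double-precision array of one million elements. lock=False skips
# the synchronization wrapper, so writers must be kept from overlapping.
shared_arr = mp.Array('d', 10**6, lock=False)

# A shared integer guarded by a lock for synchronized updates
counter = mp.Value('i', 0)
with counter.get_lock():
    counter.value += 1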
Once the arrays are created, I usually process them with Pool.map, and give the worker function (pool_rank, pool_size) arguments so each process can safely work on a section of the array without concurrency issues.
import multiprocessing as mp

def worker(args):
    # Placeholder: each task carries its own (rank, size) pair
    rank, size = args
    # ... operate on the rank-th section of the array ...
    return 0

poolSize = mp.cpu_count()
p = mp.Pool(poolSize)
p.map(worker, [(rank, poolSize) for rank in range(poolSize)])
p.close()
p.join()
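Fleshing that out, the worker might derive its slice of a shared array from the (rank, size) pair. A minimal sketch, assuming a fork start method so children inherit the array created before the pool:
import multiprocessing as mp

# Created before the pool, so forked children inherit the same memory
shared = mp.Array('d', 1000, lock=False)

def worker(args):
    # The task, not the process, carries the rank, so every chunk is
    # handled exactly once no matter which process picks the task up
    rank, size = args
    n = len(shared)
    start = rank * n // size
    stop = (rank + 1) * n // size
    for i in range(start, stop):
        shared[i] = 2.0 * i  # placeholder work on this chunk
    return 0

if __name__ == "__main__":
    size = mp.cpu_count()
    p = mp.Pool(size)
    p.map(worker, [(rank, size) for rank in range(size)])
    p.close()
    p.join()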
However, I thought there might be a more elegant way to achieve this. After some digging, I found that each child process spawned by the Pool can be retrieved
curProc = mp.current_process()
and then identified through its (private, undocumented) _identity attribute
curProc._identity[0]
I decided to test this method of process rank detection, where the only function argument is the process pool size.
First Version
I created a simple myPID function to return the process rank from the multiprocessing API.
import multiprocessing as mp

def myPID():
    # Returns relative PID (rank) of a pool process
    return mp.current_process()._identity[0]

def helloWorker(np):
    # np = number of processes in pool
    pid = myPID()
    print("Hello from process %i of %i" % (pid, np))
    # do actual work
    return 0

# Create a pool of 8 processes, and run helloWorker
poolSize = 8
p = mp.Pool(poolSize)
ret = p.map(helloWorker, [poolSize]*poolSize)
p.close()
p.join()
Output
While this code looked cleaner, you’ll notice that the pool used process 1 twice. If I had used this to operate on an array, the first chunk would have been processed twice, while the last (8th) chunk would have been left untouched.
Hello from process 1 of 8
Hello from process 2 of 8
Hello from process 3 of 8
Hello from process 4 of 8
Hello from process 5 of 8
Hello from process 6 of 8
Hello from process 1 of 8
Hello from process 7 of 8
Second Version
To stop a process from being reused, the Pool can be initialized with the maxtasksperchild argument. When set to 1, a process exits after completing a single task and is never reused. In cases where there are more chunks of work than processes, new processes will be spawned to take their place.
import multiprocessing as mp

def myPID():
    # Returns relative PID (rank) of a pool process
    return mp.current_process()._identity[0]

def helloWorker(np):
    # np = number of processes in pool
    pid = myPID()
    print("Hello from process %i of %i" % (pid, np))
    # do actual work
    return 0

# Create a pool of 8 processes, and run helloWorker
np = 8
p = mp.Pool(np, maxtasksperchild=1)
ret = p.map(helloWorker, [np]*np)
p.close()
p.join()
Output
You can see that each process is run exactly once after specifying maxtasksperchild=1.
Hello from process 1 of 8
Hello from process 2 of 8
Hello from process 3 of 8
Hello from process 4 of 8
Hello from process 5 of 8
Hello from process 6 of 8
Hello from process 7 of 8
Hello from process 8 of 8
Comparing Both
As a final test, I decided to compare both methods.
import multiprocessing as mp

def myPID():
    # Returns relative PID (rank) of a pool process
    return mp.current_process()._identity[0]

def helloWorker(np):
    # np = number of processes in pool
    pid = myPID()
    print("Hello from process %i of %i" % (pid, np))
    # do actual work
    return 0

# Create a pool of 8 processes, and run helloWorker
np = 8

print("Default method that breaks indexing")
p = mp.Pool(np)
ret = p.map(helloWorker, [np]*np)
p.close()
p.join()

print("Using maxtasksperchild ensures that processes are not reused")
p = mp.Pool(np, maxtasksperchild=1)
ret = p.map(helloWorker, [np]*np)
p.close()
p.join()
Output
You are probably expecting this to work without issue.
Default method that breaks indexing
Hello from process 1 of 8
Hello from process 2 of 8
Hello from process 3 of 8
Hello from process 4 of 8
Hello from process 5 of 8
Hello from process 6 of 8
Hello from process 1 of 8
Hello from process 7 of 8
Using maxtasksperchild ensures that processes are not reused
Hello from process 9 of 8
Hello from process 10 of 8
Hello from process 11 of 8
Hello from process 12 of 8
Hello from process 13 of 8
Hello from process 14 of 8
Hello from process 15 of 8
Hello from process 16 of 8
Even though I shut down all child processes between pools with p.close() and p.join(), the process rank is never reset between pools: the counter behind _identity lives in the parent process and keeps incrementing for every worker it spawns. This means that while my first version of manually passing rank and size was not pretty, it was the most reliable.
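A quick sketch that makes the non-resetting counter visible (the exact ranks reported may vary with scheduling, but they only ever grow):
import multiprocessing as mp

def rank(_):
    return mp.current_process()._identity[0]

if __name__ == "__main__":
    for i in range(2):
        p = mp.Pool(2)
        print("pool %i ranks:" % i, sorted(set(p.map(rank, range(8)))))
        p.close()
        p.join()
    # Typically prints ranks from {1, 2} for the first pool and from
    # {3, 4} for the second: the parent's counter is never reset.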
26 May 2018