Python multiprocessing.Pool ignores class method
I wrote a program with a class for my research, and I've attempted to parallelize it. When I've used Python 2.7's multiprocessing.Process with a JoinableQueue and managed data, the program eventually hangs with defunct processes.
```python
import itertools
import multiprocessing as mp
import traceback

class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data
        pass

    def _calc_parallel(self, index):
        self._calc_bond(index)

    def run(self):
        for ts, force in itertools.izip(self.coortrj, self.forcevec):
            try:
                consumers = [mp.Process(target=self._calc_parallel, args=(force,))
                             for i in range(nprocs)]
                for w in consumers:
                    w.start()

                # Enqueue jobs
                for i in range(self.totalsites):
                    self.tasks.put(i)

                # Add a poison pill for each consumer
                for i in range(nprocs):
                    self.tasks.put(None)

                self.tasks.close()
                self.tasks.join()

                # for w in consumers:
                #     w.join()
            except:
                traceback.print_exc()
```
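(For reference, `JoinableQueue.join()` only returns once `task_done()` has been called for every item put on the queue, so each consumer would normally drain the queue with a loop along the lines of this sketch; the names here are illustrative, not my actual code:)

```python
# Sketch of a standard queue-draining worker (illustrative names).
# JoinableQueue.join() returns only after task_done() is called once per item.
def _consumer(self):
    while True:
        index = self.tasks.get()
        if index is None:            # poison pill
            self.tasks.task_done()
            break
        self._calc_parallel(index)
        self.tasks.task_done()
```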
`_calc_parallel` also calls other class methods.
I have even tried to use multiprocessing.Pool for this purpose by using the copy_reg option as found elsewhere (e.g., at http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods).
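For reference, the workaround from that link registers a custom pickler for bound methods via `copy_reg`; a minimal version of it looks roughly like this:

```python
import copy_reg
import types

def _pickle_method(method):
    # Reduce a bound method to (name, instance, class) so it can be pickled.
    return _unpickle_method, (method.im_func.__name__, method.im_self, method.im_class)

def _unpickle_method(func_name, obj, cls):
    # Walk the MRO in case the method is defined on a base class.
    for klass in cls.mro():
        if func_name in klass.__dict__:
            return klass.__dict__[func_name].__get__(obj, cls)
    raise AttributeError(func_name)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
```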
```python
import itertools
import multiprocessing as mp
import traceback

class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data
        pass

    def _use_force(self, force):
        # Calculate data
        pass

    def _calc_parallel(self, index, force):
        self._calc_bond(index)
        self._use_force(force)

    def run(self):
        try:
            pool = mp.Pool(processes=nprocs, maxtasksperchild=2)
            args = itertools.izip(range(self.totalsites), itertools.repeat(force))
            pool.map_async(self._calc_parallel, args)
            pool.close()
            pool.join()
        except:
            traceback.print_exc()
```
However, pool.map_async does not even seem to call self._calc_parallel. I know that in both cases (Process and Pool) I'm overlooking something, but I'm not exactly clear as to what. I am typically processing over 40,000 elements.
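(Side note for anyone debugging the same thing: `map_async` returns an `AsyncResult`, and an exception raised while pickling the tasks or inside the workers is only re-raised when `.get()` is called on that result, e.g.:)

```python
result = pool.map_async(self._calc_parallel, args)
pool.close()
pool.join()
result.get()  # re-raises e.g. a PicklingError that would otherwise stay silent
```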
Thanks for the help.
Update
After reading over several other posts, I also tried pathos.multiprocessing.
```python
import itertools
import traceback
import pathos.multiprocessing as mp

class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data
        pass

    def _use_force(self, force):
        # Calculate data
        pass

    def _calc_parallel(self, index, force):
        self._calc_bond(index)
        self._use_force(force)

    def run(self):
        try:
            pool = mp.ProcessingPool(nprocs)
            args = itertools.izip(range(self.totalsites), itertools.repeat(force))
            pool.amap(lambda x: self._calc_parallel(*x), args)
        except:
            traceback.print_exc()
```
And, as with my previous attempts, this also seems to speed through without actually calling the method.
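The same caveat applies to `amap`: it is asynchronous and hands back a result object immediately, so the loop can appear to speed through unless the result is actually collected, along the lines of:

```python
result = pool.amap(lambda x: self._calc_parallel(*x), args)
values = result.get()  # blocks until every task has actually run
```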
Update 2
I decided to revamp my code and split my behemoth class into smaller, more manageable components. However, if I use pathos.multiprocessing, I run into a different situation than the one previously posted (see the link above). My new code now has an object that can be used for the calculation and that, via its methods, should return a value.
```python
import itertools
import pandas as pd
import pathos.multiprocessing as mp

class ForceData(object):
    def __init__(self, *args, **kwargs):
        # Setup data
        self.value = pd.DataFrame()

    def calculateBondData(self, index):
        # Calculation
        return self.value

    def calculateNonBondedData(self, index):
        # Calculation
        return self.value

    def calculateAll(self, index):
        # Because self.value is a pandas.DataFrame, it is changed internally
        self.calculateBondData(index)
        self.calculateNonBondedData(index)
        return self.value

class ForceMatrix(object):
    def __init__(self, *args, **kwargs):
        # Initialize data
        self._matrix = pd.DataFrame()

    def map(self, data):
        for value in data.get():
            for i, j in itertools.product(value.index, repeat=2):
                self._matrix.loc[[i], [j]] += value.values

def calculate(self, *args, **kwargs):
    # Setup initial information.
    fd = ForceData()
    matrix = ForceMatrix()
    pool = mp.ProcessingPool()
    data = pool.amap(fd.calculateAll, range(x))
    matrix.map(data, force)
    return matrix
```
I also thought that a separate function `func(dataobj, force)` might work, but that doesn't seem to help either. At the current rate, I estimate that the complete calculation on a single processor would take over 1000 hours, which is far too long for something that should be quicker.
Update 3 (4/30/15)
Because of @MikeMcKerns' helpful insights, I may have settled upon a possible solution. On an iMac (quad-core) or a 16-core node of the cluster, I have found that, for a coarse-grain (CG) system with no bonds, a double `itertools.imap` seems to be the best solution (1000 CG sites) and clocks in at approximately 5.2 s per trajectory frame. When I move on to a system that includes bond details (3000 CG sites representing water), I found that, on the iMac (using 1 core), `itertools.imap` followed by `pathos.multiprocessing.ThreadingPool.uimap` (4 threads) clocks in at approximately 85 s/frame; if I attempt the process pool (4 cores x 2) / thread pool (4 threads) combination suggested in the comments by @MikeMcKerns, the computation time increases 2.5-fold. On the 16-core cluster (32 pp / 16 tp), the CG system also goes slowly (approx. 160 s/frame). A CG system with 42,778 sites and numerous bonds on the iMac (1 core / 4 threads) may clock in at around 58 min/frame. I have yet to test this large system on a 16-core node of the cluster, but I'm unsure whether using a process pool / thread pool combination would speed it up any further.
Examples:
```python
import itertools
import pathos.multiprocessing as mp

# CG system with no bond details
for i in range(nframes):
    data1 = itertools.imap(func1, range(nsites))
    data2 = itertools.imap(func2, data1)
    for values in data2:
        func3(values)

# System with bond details
tpool = mp.ThreadingPool(mp.cpu_count())

for i in range(nframes):
    data1 = itertools.imap(func1, range(nsites))
    data2 = tpool.uimap(func2, data1)
    for values in data2:
        func3(values)

# Seems to be the slowest of the bunch on the iMac and possibly on 16 cores of a node.
ppool = mp.ProcessingPool(mp.cpu_count() * 2)
tpool = mp.ThreadingPool(mp.cpu_count())

for i in range(nframes):
    data1 = ppool.uimap(func1, range(nsites))
    data2 = tpool.uimap(func2, data1)
    for values in data2:
        func3(values)
```
I suspect that the larger the system, the more benefit I may gain from multiprocessing. I know that the large CG system (42,778 sites) takes approximately 0.08 s/site, compared with 0.02 s/site (3000 CG sites) or 0.05 s/site (1000 sites, no bonds).
Amidst my striving to shave off computation times, I have discovered areas where I can trim down some of the computations (e.g., `global` variables and algorithm changes), but if I could reduce them down further with full-scale multiprocessing, that would be great.
Your options are fairly limited if you are using Python 2.7.
Here's a short example of calling an object's method with arguments via a pool.
The first problem is that only a function defined at the top level of a module can be pickled. Unix-based systems have a way of getting around this limitation, but it shouldn't be relied upon. So you must define a function that takes the object you want and the arguments needed to call the relevant method.
For instance:
```python
def do_square(args):
    squarer, x = args  # Unpack args
    return squarer.square(x)
```
The `Squarer` class might look like this:
```python
class Squarer(object):
    def square(self, x):
        return x * x
```
Now to apply the square function in parallel:
```python
from multiprocessing import Pool

if __name__ == "__main__":
    # Pool work must be done inside the if statement, otherwise a recursive
    # cycle of Pool spawning will be created.

    pool = Pool()
    sq = Squarer()

    # Create args as a sequence of tuples
    args = [(sq, i) for i in range(10)]

    print pool.map(do_square, args)

    pool.close()
    pool.join()
```
Note that the Squarer is not stateful: it receives data, processes it, and returns the result. This is because the child's state is separate from the parent's. Changes in one will not be reflected in the other unless queues, pipes, or the other shared-state classes made available by multiprocessing are used. In general it is better to return computed data from the child process and then do something with that data in the parent process, rather than to try to store the data in a shared variable that the child process has access to.
An example of a stateful Squarer not working with multiprocessing:
```python
class StatefulSquarer(object):
    def __init__(self):
        self.results = []

    def square(self, x):
        self.results.append(x * x)

if __name__ == "__main__":
    print("without pool")
    sq = StatefulSquarer()
    map(do_square, [(sq, i) for i in range(10)])
    print(sq.results)

    print("with pool")
    pool = Pool()
    sq = StatefulSquarer()
    pool.map(do_square, [(sq, i) for i in range(10)])
    print(sq.results)

    pool.close()
    pool.join()
```
If you wanted to make this work, a better solution would be something like:
```python
for result in pool.map(do_square, [(sq, i) for i in range(10)]):
    sq.results.append(result)
```
What if your class is big, but immutable? Every time a new task is started in map, this huge object has to be copied over to the child process. You can, however, save time by copying it over to the child process only once.
```python
from multiprocessing import Pool

def child_init(sq_):
    global sq
    sq = sq_

def do_square(x):
    return sq.square(x)

class Squarer(object):
    def square(self, x):
        return x * x

if __name__ == "__main__":
    sq = Squarer()
    pool = Pool(initializer=child_init, initargs=(sq,))

    print(pool.map(do_square, range(10)))

    pool.close()
    pool.join()
```
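Note that the initializer runs once in each worker process as it starts up, so `sq` is pickled and sent to each worker only once, rather than once per task.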