Python multiprocessing.Pool ignores class method


I wrote a program class for my research, and I've attempted to parallelize it. When I've used Python 2.7's multiprocessing.Process with a JoinableQueue and managed data, the program hangs with defunct processes.

import multiprocessing as mp
import itertools
import traceback

class ParamFit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # calculate data
        pass

    def _calc_parallel(self, index):
        self._calc_bond(index)

    def run(self):
        for ts, force in itertools.izip(self.coortrj, self.forcevec):
            try:
                consumers = [mp.Process(target=self._calc_parallel,
                             args=(force,)) for i in range(nprocs)]
                for w in consumers:
                    w.start()

                # enqueue jobs
                for i in range(self.totalsites):
                    self.tasks.put(i)

                # add a poison pill for each consumer
                for i in range(nprocs):
                    self.tasks.put(None)

                self.tasks.close()
                self.tasks.join()

#               for w in consumers:
#                   w.join()
            except:
                traceback.print_exc()
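One likely contributor to the hang, as a debugging aside: JoinableQueue.join() only returns once every put() has been matched by a task_done() call in a worker, so a consumer loop that never calls task_done() leaves the parent blocked on tasks.join() while the children exit and turn defunct. A minimal sketch of the consumer loop JoinableQueue expects (the loop body is an assumption, not the code above):

def _calc_parallel(self, force):
    while True:
        index = self.tasks.get()
        if index is None:           # poison pill: stop this worker
            self.tasks.task_done()
            break
        self._calc_bond(index)
        self.tasks.task_done()      # required, or self.tasks.join() never returns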

_calc_parallel also calls other class methods.

I have tried to use multiprocessing.Pool for the same purpose, using the copy_reg workaround for pickling instance methods found elsewhere on http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods.

import multiprocessing as mp
import itertools
import traceback

class ParamFit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # calculate data
        pass

    def _use_force(force):
        # calculate data
        pass

    def _calc_parallel(self, index, force):
        self._calc_bond(index)
        self._use_force(force)

    def run(self):
        try:
            pool = mp.Pool(processes=nprocs, maxtasksperchild=2)
            args = itertools.izip(range(self.totalsites), itertools.repeat(force))
            pool.map_async(self._calc_parallel, args)
            pool.close()
            pool.join()
        except:
            traceback.print_exc()
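For reference, the copy_reg recipe that link describes is roughly the following sketch (Python 2 only; the exact registration code isn't shown above). It registers a reducer so that bound methods can be pickled by name and rebuilt in the child:

import copy_reg
import types

def _pickle_method(method):
    # reduce a bound method to (name, instance, class)
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    # walk the MRO so subclass overrides are found
    for klass in cls.mro():
        try:
            func = klass.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)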

However, pool.map_async does not seem to call self._calc_parallel. I know in both cases (Process and Pool) that I'm overlooking something, but I'm not exactly clear on what. I am typically processing over 40,000 elements.
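One way to make such a failure visible, as a minimal debugging sketch against the snippet above: map_async returns immediately with an AsyncResult and reports nothing on its own, so asking for the result surfaces errors that would otherwise pass silently.

result = pool.map_async(self._calc_parallel, args)
# either re-raises a worker-side error (e.g. PicklingError for a bound method
# on Python 2) or raises multiprocessing.TimeoutError if the pool has wedged
result.get(timeout=60)
pool.close()
pool.join()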

Thanks for any help.

Update

After reading over several other posts, I also tried pathos.multiprocessing.

import pathos.multiprocessing as mp
import itertools
import traceback

class ParamFit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # calculate data
        pass

    def _use_force(force):
        # calculate data
        pass

    def _calc_parallel(self, index, force):
        self._calc_bond(index)
        self._use_force(force)

    def run(self):
        try:
            pool = mp.ProcessingPool(nprocs)
            args = itertools.izip(range(self.totalsites), itertools.repeat(force))
            pool.amap(lambda x: self._calc_parallel(*x), args)
        except:
            traceback.print_exc()

And, as with my previous attempts, this also seems to speed through without actually calling the method.
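An aside that may explain the speed-through: amap is asynchronous and returns a result object immediately, so unless something blocks on that result, run() returns before any work has been done. A minimal sketch:

result = pool.amap(lambda x: self._calc_parallel(*x), args)
result.get()  # block until the asynchronous map actually completes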

Update 2

I decided to revamp my code and split my behemoth class into smaller, more manageable components. However, if I use pathos.multiprocessing, I run into a different situation than previously posted (see link). My new code has an object that can be used for the calculation and, via its methods, should return a value.

import itertools
import pandas as pd
import pathos.multiprocessing as mp

class ForceData(object):
    def __init__(self, *args, **kwargs):
        # setup data
        self.value = pd.DataFrame()

    def calculateBondData(self, index):
        # calculation
        return self.value

    def calculateNonBondedData(self, index):
        # calculation
        return self.value

    def calculateAll(self, index):
        # because self.value is a pandas.DataFrame, it is changed internally
        self.calculateBondData(index)
        self.calculateNonBondedData(index)
        return self.value

class ForceMatrix(object):
    def __init__(self, *args, **kwargs):
        # initialize data
        self._matrix = pd.DataFrame()

    def map(self, data):
        for value in data.get():
            for i, j in itertools.product(value.index, repeat=2):
                self._matrix.loc[[i], [j]] += value.values

def calculate(self, *args, **kwargs):
    # setup initial information
    fd = ForceData()
    matrix = ForceMatrix()
    pool = mp.ProcessingPool()
    data = pool.amap(fd.calculateAll, range(x))
    matrix.map(data)
    return matrix

I thought that a separate function func(dataobj, force) would help, but that doesn't seem to work either. At the current rate, I estimate that the complete calculation on a single processor would take over 1000 hours, which is far too long for something that should be quicker.

Update 3 (4/30/15)

Because of @MikeMcKerns' helpful insights, I may have settled upon a possible solution. On an iMac (quad-core) or a 16-core node of a cluster, I have found that, for a coarse-grain (CG) system with no bonds, a double itertools.imap seems to be the best solution: at 1000 CG sites it clocks in at approximately 5.2 s per trajectory frame. When I move onto a system that includes bond details (3000 CG sites representing water), I found that, on the iMac (using 1 core), itertools.imap followed by pathos.ThreadingPool.uimap (4 threads) clocks in at approximately 85 s/frame; if I attempt the process pool (4 cores x 2) / thread pool (4 threads) combination suggested in the comments by @MikeMcKerns, the computation time increases 2.5-fold. On the 16-core cluster (32 pp/16 tp), this CG system also goes slowly (approx. 160 s/frame). A CG system with 42,778 sites and numerous bonds may clock in at around 58 min/frame on the iMac (1 core/4 threads). I have yet to test this large system on a 16-core node of the cluster, so I'm unsure whether using the process pool/thread pool combination would speed things up further.

Examples:

# CG system with no bond details
for i in range(nframes):
    data1 = itertools.imap(func1, range(nsites))
    data2 = itertools.imap(func2, data1)
    for values in data2:
        func3(values)

# system with bond details
import pathos.multiprocessing as mp

tpool = mp.ThreadingPool(mp.cpu_count())
for i in range(nframes):
    data1 = itertools.imap(func1, range(nsites))
    data2 = tpool.uimap(func2, data1)
    for values in data2:
        func3(values)

# This seems to be the slowest of the bunch on the iMac and possibly on 16
# cores of a node.
ppool = mp.ProcessingPool(mp.cpu_count() * 2)
tpool = mp.ThreadingPool(mp.cpu_count())
for i in range(nframes):
    data1 = ppool.uimap(func1, range(nsites))
    data2 = tpool.uimap(func2, data1)
    for values in data2:
        func3(values)

I suspect that the larger the system, the more benefit I may gain from multiprocessing. I do know that the large CG system (42,778 sites) takes approximately 0.08 s/site, compared with 0.02 s/site (3000 CG sites) or 0.05 s/site (1000 sites with no bonds).

Amidst my striving to shave off computation times, I discovered areas where I could trim down some of the computations (e.g., global variables and algorithm changes), but if I could reduce things down further by full-scale multiprocessing, that would be great.

your options limited if using python 2.7

Here's a short example of calling an object's method with arguments using a pool.

The first problem is that only a function defined at the top level of a module can be pickled. Unix-based systems have a way of getting around this limitation, but it shouldn't be relied upon. So you must define a function that takes the object you want, plus the arguments needed to call the relevant method.

For instance:

def do_square(args):
    squarer, x = args  # unpack args
    return squarer.square(x)

The Squarer class might look like this:

class Squarer(object):
    def square(self, x):
        return x * x

Now to apply the square function in parallel:

from multiprocessing import Pool

if __name__ == "__main__":
    # Pool work must be done inside the if statement, otherwise a recursive
    # cycle of Pool spawning is created.
    pool = Pool()
    sq = Squarer()
    # create args as a sequence of tuples
    args = [(sq, i) for i in range(10)]
    print pool.map(do_square, args)

    pool.close()
    pool.join()

Note that the Squarer is not stateful. It receives data, processes it, and returns the result. This is because the child's state is separate from the parent's: changes in one will not be reflected in the other unless queues, pipes, or other shared-state classes made available by multiprocessing are used. In general, it is better to return computed data from the child process and then do something with that data in the parent process, rather than to try to store the data in a shared variable that the child process has access to.

An example of a stateful Squarer not working with multiprocessing:

class StatefulSquarer(object):

    def __init__(self):
        self.results = []

    def square(self, x):
        self.results.append(x * x)

if __name__ == "__main__":

    print("without pool")
    sq = StatefulSquarer()
    map(do_square, [(sq, i) for i in range(10)])
    print(sq.results)

    print("with pool")
    pool = Pool()
    sq = StatefulSquarer()
    pool.map(do_square, [(sq, i) for i in range(10)])
    print(sq.results)

    pool.close()
    pool.join()

If you wanted to make this work, then a better solution would be something like:

for result in pool.map(do_square, [(sq, i) for i in range(10)]):
    sq.results.append(result)

What if your class is big, but immutable? Every time you start a new task in map, this huge object has to be copied over to the child process. You can, however, save time by copying it over to the child process only once.

from multiprocessing import Pool

def child_init(sq_):
    global sq
    sq = sq_

def do_square(x):
    return sq.square(x)

class Squarer(object):
    def square(self, x):
        return x * x

if __name__ == "__main__":
    sq = Squarer()
    pool = Pool(initializer=child_init, initargs=(sq,))

    print(pool.map(do_square, range(10)))

    pool.close()
    pool.join()
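Note that the initializer runs once per worker when the pool starts, so each child receives its own copy of sq; any changes made to sq in the parent after that point will not be visible in the children.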
