Source code for abcpy.backends.mpi

import numpy as np
import cloudpickle
import time
import pickle

from mpi4py import MPI
from abcpy.backends import Backend, PDS, BDS

class BackendMPIMaster(Backend):
    """Defines the behavior of the master process

    This class defines the behavior of the master process (the one with rank==0) in MPI.
    """

    #Define some operation codes to make it more readable
    OP_PARALLELIZE, OP_MAP, OP_COLLECT, OP_BROADCAST, OP_DELETEPDS, OP_DELETEBDS, OP_FINISH = [1, 2, 3, 4, 5, 6, 7]

    finalized = False

    def __init__(self, master_node_ranks=[0], chunk_size=1):
        """
        Parameters
        ----------
        master_node_ranks: Python list
            list of ranks computation should not happen on.
            Should include the master so it doesn't get overwhelmed with work.

        chunk_size: Integer
            size of one block of data to be sent to free executors
        """

        self.comm = MPI.COMM_WORLD
        self.size = self.comm.Get_size()
        self.rank = self.comm.Get_rank()

        self.master_node_ranks = master_node_ranks

        #Initialize the current_pds_id and bds_id
        self.__current_pds_id = 0
        self.__current_bds_id = 0

        #Initialize a BDS store for both master & slave.
        self.bds_store = {}
        self.pds_store = {}

        #Initialize a store for the pds data that
        #.. hasn't been sent to the workers yet
        self.pds_pending_store = {}

        self.chunk_size = chunk_size

    def __command_slaves(self, command, data):
        """Tell slaves to enter relevant execution block

        This method handles the sending of the command to the slaves
        telling them what operation to perform next.

        Parameters
        ----------
        command: operation code of OP_xxx
            One of the operation codes defined in the class definition as OP_xxx
            which tells the slaves what operation they're performing.
        data: tuple
            Any of the data required for the operation which needs to be bundled
            in the data packet sent.
        """

        if command == self.OP_PARALLELIZE:
            #In parallelize we receive data as (pds_id)
            data_packet = (command, data[0])

        elif command == self.OP_MAP:
            #In map we receive data as (pds_id, pds_id_new, func)
            #Use cloudpickle to dump the function into a string.
            # function_packed = self.__sanitize_and_pack_func()
            function_packed = cloudpickle.dumps(data[2], pickle.HIGHEST_PROTOCOL)
            data_packet = (command, data[0], data[1], function_packed)

        elif command == self.OP_BROADCAST:
            data_packet = (command, data[0])

        elif command == self.OP_COLLECT:
            #In collect we receive data as (pds_id)
            data_packet = (command, data[0])

        elif command == self.OP_DELETEPDS or command == self.OP_DELETEBDS:
            #In deletepds we receive data as (pds_id) or (bds_id)
            data_packet = (command, data[0])

        elif command == self.OP_FINISH:
            data_packet = (command,)

        _ = self.comm.bcast(data_packet, root=0)


    def __generate_new_pds_id(self):
        """
        This method generates a new pds_id to associate a PDS with its remote counterpart
        that slaves use to store & index data based on the pds_id they receive

        Returns
        -------
        Returns a unique integer id.
        """

        self.__current_pds_id += 1
        return self.__current_pds_id


    def __generate_new_bds_id(self):
        """
        This method generates a new bds_id to associate a BDS with its remote counterpart
        that slaves use to store & index data based on the bds_id they receive

        Returns
        -------
        Returns a unique integer id.
        """

        self.__current_bds_id += 1
        return self.__current_bds_id

    def parallelize(self, python_list):
        """
        This method distributes the list on the available workers and returns a
        reference object.

        In this MPI backend the data is not scattered immediately: the master keeps
        the list as a queue and hands out chunks to free workers on demand during a
        map (see orchestrate_map).

        MASTER: python_list is the real data that is to be split up.

        Parameters
        ----------
        python_list: Python list
            the list that should get distributed on the worker nodes

        Returns
        -------
        PDSMPI class (parallel data set)
            A reference object that represents the parallelized list
        """

        # Tell the slaves to enter parallelize()
        pds_id = self.__generate_new_pds_id()
        self.__command_slaves(self.OP_PARALLELIZE, (pds_id,))

        #Don't send any data. Just keep it as a queue we're going to pop.
        self.pds_store[pds_id] = list(python_list)

        pds = PDSMPI([], pds_id, self)

        return pds

    def orchestrate_map(self, pds_id):
        """Orchestrates the slaves/workers to perform a map function

        This works by keeping track of the workers who haven't finished executing,
        waiting for them to request the next chunk of data when they are free,
        responding to them with the data and then sending them a sentinel
        signalling that they can exit.

        (A standalone, serial sketch of this chunking protocol is given after the
        class definition.)
        """

        is_map_done = [True if i in self.master_node_ranks else False for i in range(self.size)]
        status = MPI.Status()

        #Copy it to the pending store. This is so when the master accesses
        #the PDS data it's not empty.
        self.pds_pending_store[pds_id] = list(self.pds_store[pds_id])

        #While we have some ranks that haven't finished
        while sum(is_map_done) < self.size:
            #Wait for a request from anyone
            data_request = self.comm.recv(
                source=MPI.ANY_SOURCE,
                tag=MPI.ANY_TAG,
                status=status,
            )
            request_from_rank = status.source

            if data_request != pds_id:
                print("Ignoring stale PDS data request from",
                      request_from_rank, ":", data_request, "/", pds_id)
                continue

            #Pointer so we don't have to keep doing dict lookups
            current_pds_items = self.pds_pending_store[pds_id]
            num_current_pds_items = len(current_pds_items)

            #Everyone's already exhausted all the data.
            #Send a sentinel and mark the node as finished
            if num_current_pds_items == 0:
                self.comm.send(None, dest=request_from_rank, tag=pds_id)
                is_map_done[request_from_rank] = True
            else:
                #Create the chunk of data to send. Pop off items and tag them with an id
                #so we can sort them later.
                chunk_to_send = []
                for i in range(self.chunk_size):
                    #Guard against popping from an already exhausted list when the
                    #remaining data is smaller than one chunk.
                    if not current_pds_items:
                        break
                    chunk_to_send += [(num_current_pds_items - i, current_pds_items.pop())]

                self.comm.send(chunk_to_send, dest=request_from_rank, tag=pds_id)

    def map(self, func, pds):
        """
        A distributed implementation of map that works on parallel data sets (PDS).

        On every element of pds the function func is called.

        Parameters
        ----------
        func: Python func
            A function that can be applied to every element of the pds
        pds: PDS class
            A parallel data set to which func should be applied

        Returns
        -------
        PDSMPI class
            a new parallel data set that contains the result of the map
        """

        # Tell the slaves to enter the map() with the current pds_id & func.
        #Get pds_id of dataset we want to operate on
        pds_id = pds.pds_id

        #Generate a new pds_id to be used by the slaves for the resultant PDS
        pds_id_new = self.__generate_new_pds_id()

        data = (pds_id, pds_id_new, func)
        self.__command_slaves(self.OP_MAP, data)

        self.orchestrate_map(pds_id)

        pds_res = PDSMPI([], pds_id_new, self)

        return pds_res

    def collect(self, pds):
        """
        Gather the pds from all the workers, send it to the master and return it as a
        standard Python list.

        Parameters
        ----------
        pds: PDS class
            a parallel data set

        Returns
        -------
        Python list
            all elements of pds as a list
        """

        # Tell the slaves to enter collect with the pds's pds_id
        self.__command_slaves(self.OP_COLLECT, (pds.pds_id,))

        all_data = self.comm.gather(pds.python_list, root=0)

        #Initialize lists to accumulate results
        all_data_indices, all_data_items = [], []

        for node_data in all_data:
            for item in node_data:
                all_data_indices += [item[0]]
                all_data_items += [item[1]]

        #Sort the accumulated data according to the indices we tagged
        #them with when distributing
        rdd_sorted = [all_data_items[i] for i in np.argsort(all_data_indices)]

        return rdd_sorted

    def broadcast(self, value):
        # Tell the slaves to enter broadcast()
        bds_id = self.__generate_new_bds_id()
        self.__command_slaves(self.OP_BROADCAST, (bds_id,))

        _ = self.comm.bcast(value, root=0)

        bds = BDSMPI(value, bds_id, self)
        return bds

    def delete_remote_pds(self, pds_id):
        """
        A public function for the PDS objects on the master to call when they go out of
        scope or are deleted in order to ensure the same happens on the slaves.

        Parameters
        ----------
        pds_id: int
            A pds_id identifying the remote PDS on the slaves to delete.
        """

        if not self.finalized:
            self.__command_slaves(self.OP_DELETEPDS, (pds_id,))

    def delete_remote_bds(self, bds_id):
        """
        Public function for the BDS objects on the master to call when they go out of
        scope or are deleted in order to ensure they are deleted on the slaves as well.

        Parameters
        ----------
        bds_id: int
            A bds_id identifying the remote BDS on the slaves to delete.
        """

        if not self.finalized:
            #The master deallocates its BDS data. Explicit because
            #.. bds_store and BDSMPI object are disconnected.
            del backend.bds_store[bds_id]
            self.__command_slaves(self.OP_DELETEBDS, (bds_id,))

    def __del__(self):
        """
        Overriding the delete function to explicitly call MPI.Finalize().
        This is also required so we can tell the slaves to get out of the
        while loop they are in and exit gracefully, and they themselves call
        finalize when they die.
        """

        #Tell the slaves they can exit gracefully.
        self.__command_slaves(self.OP_FINISH, None)

        #Finalize the connection because the slaves should have finished.
        MPI.Finalize()
        self.finalized = True

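
# A standalone, serial sketch (no MPI, not part of the backend itself) of the
# chunking protocol documented in orchestrate_map() and collect() above: the
# master pops items off its pending queue in chunks, tags each item with its
# position before handing it to a worker, and finally restores the original
# order with numpy's argsort. All names below are hypothetical and exist only
# for this illustration.
def _sketch_chunk_tagging_and_sorting():
    pending = list(range(10))   # stands in for pds_pending_store[pds_id] on the master
    chunk_size = 3
    gathered = []               # stands in for the (index, result) pairs gathered from the workers

    while pending:
        num_remaining = len(pending)
        chunk = []
        for i in range(chunk_size):
            if not pending:
                break
            #Tag every item with an id before sending, exactly as orchestrate_map() does
            chunk.append((num_remaining - i, pending.pop()))
        #A worker applying func(x) = x ** 2 to its chunk
        gathered += [(idx, item ** 2) for idx, item in chunk]

    #The same re-ordering collect() performs on the master
    indices = [idx for idx, _ in gathered]
    items = [item for _, item in gathered]
    result = [items[i] for i in np.argsort(indices)]
    assert result == [x ** 2 for x in range(10)]
    return result
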

class BackendMPISlave(Backend):
    """Defines the behavior of the slaves/worker processes

    This class defines how the slaves should behave during operation.
    Slaves are those processes (not nodes, as in Spark) that have rank!=0
    and whose ranks are not listed in master_node_ranks (the non-worker ranks).
    """

    OP_PARALLELIZE, OP_MAP, OP_COLLECT, OP_BROADCAST, OP_DELETEPDS, OP_DELETEBDS, OP_FINISH = [1, 2, 3, 4, 5, 6, 7]

    def __init__(self):
        self.comm = MPI.COMM_WORLD
        self.size = self.comm.Get_size()
        self.rank = self.comm.Get_rank()

        #Define the vars that will hold the pds ids received from master to operate on
        self.__rec_pds_id = None
        self.__rec_pds_id_result = None

        #Initialize a BDS store for both master & slave.
        self.bds_store = {}

        #Go into an infinite loop waiting for commands from the master.
        self.slave_run()

    def slave_run(self):
        """
        This method is the infinite loop a slave enters directly from init.
        It makes the slave wait for a command to perform from the master and
        then calls the appropriate function.

        This method also takes care of the synchronization of data between the
        master and the slaves by matching PDSs based on the pds_ids sent by the master
        with the command.

        Commands received from the master are of the form of a tuple.
        The first component of the tuple is always the operation to be performed
        and the rest are conditional on the operation.

        (op, pds_id) where op == OP_PARALLELIZE for parallelize
        (op, pds_id, pds_id_result, func) where op == OP_MAP for map
        (op, pds_id) where op == OP_COLLECT for a collect operation
        (op, pds_id) where op == OP_DELETEPDS for a delete of the remote PDS on slaves
        (op,) where op == OP_FINISH for the slave to break out of the loop and terminate

        (A minimal, self-contained sketch of this command loop is given after the
        class definition.)
        """

        # Initialize PDS data store here because only slaves need to do it.
        self.pds_store = {}

        while True:
            data = self.comm.bcast(None, root=0)

            op = data[0]
            if op == self.OP_PARALLELIZE:
                pds_id = data[1]
                self.__rec_pds_id = pds_id
                pds_id, pds_id_new = self.__get_received_pds_id()
                self.pds_store[pds_id] = None

            elif op == self.OP_MAP:
                pds_id, pds_id_result, function_packed = data[1:]
                self.__rec_pds_id, self.__rec_pds_id_result = pds_id, pds_id_result

                #Use cloudpickle to convert back function string to a function
                func = cloudpickle.loads(function_packed)

                #Enter the map so we can grab data and perform the func.
                #Func sent before and not during for performance reasons
                pds_res = self.map(func)

                # Store the result under the newly generated pds_id
                self.pds_store[pds_res.pds_id] = pds_res

            elif op == self.OP_BROADCAST:
                self.__bds_id = data[1]
                self.broadcast(None)

            elif op == self.OP_COLLECT:
                pds_id = data[1]

                # Access an existing PDS from data store
                pds = self.pds_store[pds_id]

                self.collect(pds)

            elif op == self.OP_DELETEPDS:
                pds_id = data[1]
                del self.pds_store[pds_id]

            elif op == self.OP_DELETEBDS:
                bds_id = data[1]
                del self.bds_store[bds_id]

            elif op == self.OP_FINISH:
                quit()

            else:
                raise Exception("Slave received unknown command code")

    def __get_received_pds_id(self):
        """
        Function to retrieve the pds_id(s) we received from the master to associate
        our slave's created PDS with the master's.
        """

        return self.__rec_pds_id, self.__rec_pds_id_result

    def parallelize(self):
        pass

    def map(self, func):
        """
        A distributed implementation of map that works on parallel data sets (PDS).

        On every element of pds the function func is called.

        Parameters
        ----------
        func: Python func
            A function that can be applied to every element of the pds

        Returns
        -------
        PDSMPI class
            a new parallel data set that contains the result of the map
        """

        map_start = time.time()

        #Get the PDS id we operate on and the new one to store the result in
        pds_id, pds_id_new = self.__get_received_pds_id()

        rdd = []
        while True:
            #Ask for a chunk of data since we're free
            data_chunks = self.comm.sendrecv(pds_id, 0, pds_id)

            #If we receive a sentinel, we're done and can exit
            if data_chunks is None:
                break

            #Accumulate the indices and *processed* chunks
            for chunk in data_chunks:
                data_index, data_item = chunk
                rdd += [(data_index, func(data_item))]

        pds_res = PDSMPI(rdd, pds_id_new, self)

        return pds_res

    def collect(self, pds):
        """
        Gather the pds from all the workers, send it to the master and return it as a
        standard Python list.

        Parameters
        ----------
        pds: PDS class
            a parallel data set

        Returns
        -------
        Python list
            all elements of pds as a list
        """

        #Send the data we have back to the master
        _ = self.comm.gather(pds.python_list, root=0)

    def broadcast(self, value):
        """
        Value is ignored for the slaves. We get the data from the master.
        """

        value = self.comm.bcast(None, root=0)
        self.bds_store[self.__bds_id] = value

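
# A minimal, self-contained sketch (not part of the backend) of the
# broadcast-driven command loop that BackendMPIMaster.__command_slaves() and
# BackendMPISlave.slave_run() implement above. The op codes and the payload are
# placeholders chosen for this illustration; launched under MPI, e.g.
# `mpirun -np 2 python sketch.py`, rank 0 drives all the other ranks.
def _sketch_command_loop():
    comm = MPI.COMM_WORLD
    OP_ECHO, OP_FINISH = 1, 2   # hypothetical op codes for this sketch only

    if comm.Get_rank() == 0:
        #"Master": broadcast command tuples, then tell everyone to stop.
        comm.bcast((OP_ECHO, "hello"), root=0)
        comm.bcast((OP_FINISH,), root=0)
    else:
        #"Slave": block on bcast and dispatch on the op code, as slave_run() does.
        while True:
            data = comm.bcast(None, root=0)
            op = data[0]
            if op == OP_ECHO:
                print("rank", comm.Get_rank(), "received", data[1])
            elif op == OP_FINISH:
                break
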

class BackendMPI(BackendMPIMaster if MPI.COMM_WORLD.Get_rank() == 0 else BackendMPISlave):
    """A backend parallelized by using MPI

    The backend conditionally inherits either the BackendMPIMaster class
    or the BackendMPISlave class depending on its rank. This lets
    BackendMPI have a uniform interface for the user but allows for a
    logical split between functions performed by the master
    and the slaves.

    (A minimal usage sketch follows the class definition.)
    """

    def __init__(self, master_node_ranks=[0]):
        self.comm = MPI.COMM_WORLD
        self.size = self.comm.Get_size()
        self.rank = self.comm.Get_rank()

        if self.size < 2:
            raise ValueError('A minimum of 2 ranks are required for the MPI backend')

        #Set the global backend
        globals()['backend'] = self

        #Call the appropriate constructors and pass the required data
        if self.rank == 0:
            super().__init__(master_node_ranks)
        else:
            super().__init__()
            raise Exception("Slaves exited main loop.")

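
# A minimal usage sketch (not part of the original module). It assumes the
# script is launched under MPI with at least two ranks, e.g.
# `mpirun -np 4 python script.py`. Rank 0 becomes the master and runs the body
# below; every other rank turns into a slave inside BackendMPI() and stays in
# its command loop until the backend is destroyed.
def _example_backend_mpi_usage():
    backend = BackendMPI()
    pds = backend.parallelize(list(range(10)))         # data stays queued on the master
    pds_squared = backend.map(lambda x: x ** 2, pds)   # slaves pull chunks and apply the function
    print(backend.collect(pds_squared))                # gathered and re-ordered on the master
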

class PDSMPI(PDS):
    """
    This is an MPI wrapper for a Python parallel data set.
    """

    def __init__(self, python_list, pds_id, backend_obj):
        self.python_list = python_list
        self.pds_id = pds_id
        self.backend_obj = backend_obj

    def __del__(self):
        """
        Destructor to be called when a PDS falls out of scope and/or is being deleted.
        Uses the backend to send a message to destroy the slaves' copy of the pds.
        """

        try:
            self.backend_obj.delete_remote_pds(self.pds_id)
        except AttributeError:
            #Catch "delete_remote_pds not defined" for slaves and ignore.
            pass


class BDSMPI(BDS):
    """
    This is an MPI wrapper for a broadcast data set (BDS).

    (A usage sketch follows the class definition.)
    """

    def __init__(self, object, bds_id, backend_obj):
        #The BDS data is no longer saved in the BDS object.
        #It will access & store the data only from the current backend
        self.bds_id = bds_id
        backend.bds_store[self.bds_id] = object
        # self.backend_obj = backend_obj

    def value(self):
        """
        This method returns the actual object that the broadcast data set represents.
        """

        return backend.bds_store[self.bds_id]

    def __del__(self):
        """
        Destructor to be called when a BDS falls out of scope and/or is being deleted.
        Uses the backend to send a message to destroy the slaves' copy of the bds.
        """

        try:
            backend.delete_remote_bds(self.bds_id)
        except AttributeError:
            #Catch "delete_remote_bds not defined" for slaves and ignore.
            pass

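
# A hedged usage sketch (not part of the original module) of broadcast data:
# the master broadcasts a value once, and functions mapped on the slaves read
# it back through BDS.value(), which looks the value up in the per-process
# bds_store instead of shipping it with every chunk. The names below are
# chosen for this illustration only.
def _example_broadcast_usage():
    backend = BackendMPI()
    offsets = backend.broadcast([100, 200, 300])   # BDSMPI handle, usable on master and slaves
    pds = backend.parallelize([0, 1, 2])
    shifted = backend.map(lambda i: offsets.value()[i], pds)   # slaves read the broadcast value
    print(backend.collect(shifted))                # expected: [100, 200, 300]
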

class BackendMPITestHelper:
    """
    Helper class for some of the test cases to be able to access and verify class members.

    (A short usage sketch follows the class definition.)
    """

    def check_pds(self, k):
        """Checks if a PDS exists in the PDS data store. Used to verify deletion and creation
        """

        return k in backend.pds_store.keys()

    def check_bds(self, k):
        """Checks if a BDS exists in the BDS data store. Used to verify deletion and creation
        """

        return k in backend.bds_store.keys()
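

# A short usage sketch (not part of the original module): the helper checks
# whether a given pds_id / bds_id is still present in the calling process's
# stores, so the check is typically mapped onto the workers where the remote
# copies live. Run under MPI as in the earlier sketches; names are illustrative only.
def _example_test_helper_usage():
    backend = BackendMPI()
    pds = backend.parallelize([1, 2, 3])
    pds_id = pds.pds_id
    #Run the check on the workers; each element reports whether its worker
    #still holds an entry for this pds_id.
    checks = backend.map(lambda _: BackendMPITestHelper().check_pds(pds_id), pds)
    print(backend.collect(checks))   # expected: [True, True, True]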