[docs]classCollective(ABC):"""Interface for collective operations. Supports communications between multiple processes and multiple nodes. A collective owns a group. .. warning:: This API is experimental and subject to change """def__init__(self)->None:self._group:Optional[CollectibleGroup]=None@property@abstractmethoddefrank(self)->int:...@property@abstractmethoddefworld_size(self)->int:...@propertydefgroup(self)->CollectibleGroup:ifself._groupisNone:raiseRuntimeError(f"`{type(self).__name__}` does not own a group. HINT: try `collective.create_group().group`")returnself._group@abstractmethoddefbroadcast(self,tensor:Tensor,src:int)->Tensor:...@abstractmethoddefall_reduce(self,tensor:Tensor,op:str)->Tensor:...@abstractmethoddefreduce(self,tensor:Tensor,dst:int,op:str)->Tensor:...@abstractmethoddefall_gather(self,tensor_list:List[Tensor],tensor:Tensor)->List[Tensor]:...@abstractmethoddefgather(self,tensor:Tensor,gather_list:List[Tensor],dst:int=0)->List[Tensor]:...@abstractmethoddefscatter(self,tensor:Tensor,scatter_list:List[Tensor],src:int=0)->Tensor:...@abstractmethoddefreduce_scatter(self,output:Tensor,input_list:List[Tensor],op:str)->Tensor:...@abstractmethoddefall_to_all(self,output_tensor_list:List[Tensor],input_tensor_list:List[Tensor])->List[Tensor]:...@abstractmethoddefsend(self,tensor:Tensor,dst:int,tag:int=0)->None:...@abstractmethoddefrecv(self,tensor:Tensor,src:Optional[int]=None,tag:int=0)->Tensor:...@abstractmethoddefbarrier(self,device_ids:Optional[List[int]]=None)->None:...@classmethod@abstractmethoddefis_available(cls)->bool:...@classmethod@abstractmethoddefis_initialized(cls)->bool:...@classmethod@abstractmethoddefinit_group(cls,**kwargs:Any)->None:...@classmethod@abstractmethoddefnew_group(cls,**kwargs:Any)->CollectibleGroup:...@classmethod@abstractmethoddefdestroy_group(cls,group:CollectibleGroup)->None:...@classmethod@abstractmethoddef_convert_to_native_op(cls,op:str)->Any:...defsetup(self,**kwargs:Any)->Self:ifnotself.is_initialized():self.init_group(**kwargs)returnself
[docs]defcreate_group(self,**kwargs:Any)->Self:"""Create a group. This assumes that :meth:`~lightning_fabric.plugins.collectives.Collective.init_group` has been called already by the user. """ifself._groupisnotNone:raiseRuntimeError(f"`{type(self).__name__}` already owns a group.")self._group=self.new_group(**kwargs)returnself
defteardown(self)->Self:ifself._groupisNone:raiseRuntimeError(f"`{type(self).__name__}` does not own a group to destroy.")self.destroy_group(self._group)self._group=Nonereturnself
To analyze traffic and optimize your experience, we serve cookies on this site. By clicking or navigating, you agree to allow our usage of cookies. Read PyTorch Lightning's Privacy Policy.