Source code for lightning.pytorch.strategies.hpu_parallel
# Copyright The Lightning AI team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from typing import Any, Callable, Dict, List, Optional, Union

import torch.distributed
from torch.nn import Module
from torch.optim.optimizer import Optimizer

import lightning.pytorch as pl
from lightning.fabric.plugins import CheckpointIO, ClusterEnvironment
from lightning.fabric.utilities.distributed import group as _group
from lightning.pytorch.accelerators.hpu import _HPU_AVAILABLE
from lightning.pytorch.plugins.io.hpu_plugin import HPUCheckpointIO
from lightning.pytorch.plugins.io.wrapper import _WrappingCheckpointIO
from lightning.pytorch.plugins.precision import PrecisionPlugin
from lightning.pytorch.strategies.ddp import DDPStrategy
from lightning.pytorch.utilities.exceptions import MisconfigurationException

if _HPU_AVAILABLE:
    import habana_frameworks.torch.core as htcore
    import habana_frameworks.torch.distributed.hccl  # noqa: F401

log = logging.getLogger(__name__)
class HPUParallelStrategy(DDPStrategy):
    """Strategy for distributed training on multiple HPU devices.

    .. warning::  This is an :ref:`experimental <versioning:Experimental API>` feature.

    """

    strategy_name = "hpu_parallel"

    def __init__(
        self,
        accelerator: Optional["pl.accelerators.Accelerator"] = None,
        parallel_devices: Optional[List[torch.device]] = None,
        cluster_environment: Optional[ClusterEnvironment] = None,
        checkpoint_io: Optional[CheckpointIO] = None,
        precision_plugin: Optional[PrecisionPlugin] = None,
        ddp_comm_state: Optional[object] = None,
        ddp_comm_hook: Optional[Callable] = None,
        ddp_comm_wrapper: Optional[Callable] = None,
        model_averaging_period: Optional[int] = None,
        process_group_backend: Optional[str] = "hccl",
        **kwargs: Any,
    ) -> None:
        if not _HPU_AVAILABLE:
            raise MisconfigurationException("`HPUParallelStrategy` requires HPU devices to run")

        super().__init__(
            accelerator=accelerator,
            parallel_devices=parallel_devices,
            cluster_environment=cluster_environment,
            checkpoint_io=checkpoint_io,
            precision_plugin=precision_plugin,
            ddp_comm_state=ddp_comm_state,
            ddp_comm_hook=ddp_comm_hook,
            ddp_comm_wrapper=ddp_comm_wrapper,
            model_averaging_period=model_averaging_period,
            process_group_backend=process_group_backend,
            **kwargs,
        )

    @property
    def checkpoint_io(self) -> CheckpointIO:
        if self._checkpoint_io is None:
            self._checkpoint_io = HPUCheckpointIO()
        elif isinstance(self._checkpoint_io, _WrappingCheckpointIO):
            self._checkpoint_io.checkpoint_io = HPUCheckpointIO()

        return self._checkpoint_io

    @checkpoint_io.setter
    def checkpoint_io(self, io: Optional[CheckpointIO]) -> None:
        self._checkpoint_io = io
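    # Usage sketch (illustrative, not part of the original module): with HPU
    # hardware and the habana-frameworks packages available, the strategy is
    # typically passed to a Trainer, e.g.
    #
    #     trainer = pl.Trainer(accelerator="hpu", devices=8, strategy=HPUParallelStrategy())
    #
    # The device count is an example value; "hccl" is the default process-group backend.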
    def setup_environment(self) -> None:
        os.environ["ID"] = str(self.local_rank)
        if self._process_group_backend == "hccl":
            # this env var is checked in the broadcast override below to detect the HCCL backend
            os.environ["HCCL_DISTRIBUTED_BACKEND"] = str(1)
        super().setup_environment()
    def on_after_backward(self) -> None:
        # Break lazy accumulation of graph after fwd+bwd
        htcore.mark_step()
    def optimizer_step(
        self,
        optimizer: Optimizer,
        closure: Callable[[], Any],
        model: Optional[Union["pl.LightningModule", Module]] = None,
        **kwargs: Any,
    ) -> Any:
        optimizer_output = super().optimizer_step(optimizer, closure, model, **kwargs)
        # Break lazy accumulation of graph after optimizer
        htcore.mark_step()
        return optimizer_output
    def teardown(self) -> None:
        super().teardown()
        # Was set to local rank
        os.environ.pop("ID", None)
        os.environ.pop("HCCL_DISTRIBUTED_BACKEND", None)
# The code below is taken from PyTorch `torch/distributed/distributed_c10d.py`.
# The distributed-backend and tensor-type updates for the Habana backend are done here before broadcasting.
def _hpu_broadcast_object_list(object_list, src=0, group=None, device=None):  # type: ignore
    from torch.distributed import _rank_not_in_group, Backend, broadcast, get_backend, get_rank
    from torch.distributed.distributed_c10d import _object_to_tensor, _tensor_to_object

    if _rank_not_in_group(group):
        return

    my_rank = get_rank()
    # Serialize object_list elements to tensors on src rank.
    if my_rank == src:
        tensor_list, size_list = zip(*[_object_to_tensor(obj, device) for obj in object_list])
        object_sizes_tensor = torch.cat(size_list)
    else:
        object_sizes_tensor = torch.empty(len(object_list), dtype=torch.long)

    # Current device selection.
    # To preserve backwards compatibility, ``device`` defaults to ``None``,
    # in which case we run the current logic of device selection, i.e.
    # ``current_device`` is CUDA if the backend is NCCL, otherwise the CPU device. In the
    # case it is not ``None`` we move the size and object tensors to be
    # broadcasted to this device.
    group_backend = get_backend(group)
    is_nccl_backend = group_backend == Backend.NCCL
    is_hpu_backend = os.environ.get("HCCL_DISTRIBUTED_BACKEND") == "1"
    if device is not None:
        if is_nccl_backend and device.type != "cuda":
            raise ValueError("device type must be cuda for nccl backend")
        current_device = device
    else:
        current_device = torch.device("cpu")
        if is_nccl_backend:
            # See note about using torch.cuda.current_device() here in
            # docstring. We cannot simply use my_rank since rank == device is
            # not necessarily true.
            current_device = torch.device("cuda", torch.cuda.current_device())
    if is_nccl_backend:
        object_sizes_tensor = object_sizes_tensor.to(current_device)
    elif is_hpu_backend:
        current_device = torch.device("hpu")
        # Workaround: HPU does not support long tensors for collectives
        if (object_sizes_tensor.type() == "torch.LongTensor") or (
            object_sizes_tensor.type() == "torch.hpu.LongTensor"
        ):
            object_sizes_tensor = object_sizes_tensor.int()
        else:
            print("unhandled hpu object_sizes_tensor type :: ", object_sizes_tensor.type())
        object_sizes_tensor = object_sizes_tensor.to(current_device)

    # Broadcast object sizes
    broadcast(object_sizes_tensor, src=src, group=group)

    # Concatenate and broadcast serialized object tensors
    if my_rank == src:
        object_tensor = torch.cat(tensor_list)
    else:
        object_tensor = torch.empty(
            torch.sum(object_sizes_tensor).int().item(),
            dtype=torch.uint8,
        )

    if is_nccl_backend or is_hpu_backend:
        object_tensor = object_tensor.to(current_device)
    broadcast(object_tensor, src=src, group=group)
    # Deserialize objects using their stored sizes.
    offset = 0
    if my_rank != src:
        for i, obj_size in enumerate(object_sizes_tensor):
            obj_view = object_tensor[offset : offset + obj_size]
            obj_view = obj_view.type(torch.uint8)
            if obj_view.device != torch.device("cpu"):
                obj_view = obj_view.cpu()
            offset += obj_size
            object_list[i] = _tensor_to_object(obj_view, obj_size)
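# Usage sketch for the helper above (illustrative, not part of the original
# module). It assumes the process group has already been initialized with the
# "hccl" backend and that ``HCCL_DISTRIBUTED_BACKEND=1`` is set, as done in
# ``HPUParallelStrategy.setup_environment``; the payload dictionary is made up:
#
#     if torch.distributed.get_rank() == 0:
#         objects = [{"epoch": 3, "best_metric": 0.91}]  # example payload on the source rank
#     else:
#         objects = [None]  # placeholder, filled in-place on non-src ranks
#     _hpu_broadcast_object_list(objects, src=0)
#     # every rank now holds the deserialized Python object in objects[0]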