Source code for pytorch_lightning.strategies.hpu_parallel
# Copyright The Lightning AI team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from typing import Any, Callable, Dict, List, Optional, Union

import torch.distributed
from torch.nn import Module
from torch.optim.optimizer import Optimizer

import pytorch_lightning as pl
from lightning_fabric.plugins import CheckpointIO, ClusterEnvironment
from lightning_fabric.utilities.distributed import group as _group
from pytorch_lightning.accelerators.hpu import _HPU_AVAILABLE
from pytorch_lightning.overrides import LightningDistributedModule
from pytorch_lightning.plugins.io.hpu_plugin import HPUCheckpointIO
from pytorch_lightning.plugins.io.wrapper import _WrappingCheckpointIO
from pytorch_lightning.plugins.precision import PrecisionPlugin
from pytorch_lightning.strategies.ddp import DDPStrategy
from pytorch_lightning.utilities.exceptions import MisconfigurationException
from pytorch_lightning.utilities.imports import _TORCH_LESSER_EQUAL_1_10_2
from pytorch_lightning.utilities.types import STEP_OUTPUT

if _HPU_AVAILABLE:
    import habana_frameworks.torch.core as htcore
    import habana_frameworks.torch.distributed.hccl  # noqa: F401

log = logging.getLogger(__name__)
class HPUParallelStrategy(DDPStrategy):
    """Strategy for distributed training on multiple HPU devices."""

    strategy_name = "hpu_parallel"

    def __init__(
        self,
        accelerator: Optional["pl.accelerators.Accelerator"] = None,
        parallel_devices: Optional[List[torch.device]] = None,
        cluster_environment: Optional[ClusterEnvironment] = None,
        checkpoint_io: Optional[CheckpointIO] = None,
        precision_plugin: Optional[PrecisionPlugin] = None,
        ddp_comm_state: Optional[object] = None,
        ddp_comm_hook: Optional[Callable] = None,
        ddp_comm_wrapper: Optional[Callable] = None,
        model_averaging_period: Optional[int] = None,
        process_group_backend: Optional[str] = "hccl",
        **kwargs: Any,
    ) -> None:
        if not _HPU_AVAILABLE:
            raise MisconfigurationException("`HPUParallelStrategy` requires HPU devices to run")

        super().__init__(
            accelerator=accelerator,
            parallel_devices=parallel_devices,
            cluster_environment=cluster_environment,
            checkpoint_io=checkpoint_io,
            precision_plugin=precision_plugin,
            ddp_comm_state=ddp_comm_state,
            ddp_comm_hook=ddp_comm_hook,
            ddp_comm_wrapper=ddp_comm_wrapper,
            model_averaging_period=model_averaging_period,
            process_group_backend=process_group_backend,
            **kwargs,
        )

    @property
    def checkpoint_io(self) -> CheckpointIO:
        if self._checkpoint_io is None:
            self._checkpoint_io = HPUCheckpointIO()
        elif isinstance(self._checkpoint_io, _WrappingCheckpointIO):
            self._checkpoint_io.checkpoint_io = HPUCheckpointIO()

        return self._checkpoint_io

    @checkpoint_io.setter
    def checkpoint_io(self, io: Optional[CheckpointIO]) -> None:
        self._checkpoint_io = io
    def setup_environment(self) -> None:
        os.environ["ID"] = str(self.local_rank)
        if self._process_group_backend == "hccl":
            # this env is used in overrides to check the backend initiated
            os.environ["HCCL_DISTRIBUTED_BACKEND"] = str(1)
        super().setup_environment()
    def determine_ddp_device_ids(self) -> None:
        return None

    def _pre_configure_ddp(self) -> None:
        # If unset, default `find_unused_parameters` to `True`.
        # Many models require setting this parameter to True, as there are corner cases
        # where not all parameter backward hooks are fired by the autograd engine even if requires_grad is True.
        # This flag does come with a performance hit, so it is suggested to disable it where possible.
        self._ddp_kwargs["find_unused_parameters"] = self._ddp_kwargs.get("find_unused_parameters", True)

        self._static_graph = False
        static_graph = self._ddp_kwargs.get("static_graph")
        if static_graph:
            # when _set_static_graph() is called, find_unused_parameters has no significance.
            # Reset find_unused_parameters to False, which is the default value for DDP.
            self._ddp_kwargs["find_unused_parameters"] = False
            self._static_graph = True
        if static_graph is not None:
            # DDP does not accept static_graph as a parameter, hence remove it from the kwargs
            del self._ddp_kwargs["static_graph"]

    def configure_ddp(self) -> None:
        # DDP does not accept static_graph as a parameter with torch < 1.11
        if _TORCH_LESSER_EQUAL_1_10_2:
            log.detail(f"{self.__class__.__name__}: configuring DistributedDataParallel")
            self._pre_configure_ddp()
            self.model = self._setup_model(LightningDistributedModule(self.model))  # type: ignore
            if self.root_device.type == "hpu" and self._static_graph:
                self._model._set_static_graph()  # type: ignore
            self._register_ddp_hooks()
        else:
            super().configure_ddp()
    def on_after_backward(self) -> None:
        # Break lazy accumulation of graph after fwd+bwd
        htcore.mark_step()
    def optimizer_step(
        self,
        optimizer: Optimizer,
        opt_idx: int,
        closure: Callable[[], Any],
        model: Optional[Union["pl.LightningModule", Module]] = None,
        **kwargs: Any,
    ) -> Any:
        optimizer_output = super().optimizer_step(optimizer, opt_idx, closure, model, **kwargs)
        # Break lazy accumulation of graph after the optimizer step
        htcore.mark_step()
        return optimizer_output
    def validation_step_end(self, step_output: STEP_OUTPUT) -> STEP_OUTPUT:
        # Break lazy accumulation of graph after every step
        htcore.mark_step()
        return step_output

    def test_step_end(self, step_output: STEP_OUTPUT) -> STEP_OUTPUT:
        # Break lazy accumulation of graph after every step
        htcore.mark_step()
        return step_output

    @classmethod
    def register_strategies(cls, strategy_registry: Dict) -> None:
        strategy_registry.register(
            cls.strategy_name,
            cls,
            description=f"{cls.__name__}",
        )
    def teardown(self) -> None:
        super().teardown()
        # Was set to the local rank
        os.environ.pop("ID", None)
        os.environ.pop("HCCL_DISTRIBUTED_BACKEND", None)
# The code below is taken from PyTorch `torch/distributed/distributed_c10d.py`.
# The distributed backend and tensor dtype updates for the Habana backend are done here before broadcasting.
def _hpu_broadcast_object_list(object_list, src=0, group=None, device=None):  # type: ignore
    from torch.distributed import _rank_not_in_group, Backend, broadcast, get_backend, get_rank
    from torch.distributed.distributed_c10d import _object_to_tensor, _tensor_to_object

    if _rank_not_in_group(group):
        return

    my_rank = get_rank()
    # Serialize object_list elements to tensors on src rank.
    if my_rank == src:
        tensor_list, size_list = zip(*[_object_to_tensor(obj, device) for obj in object_list])
        object_sizes_tensor = torch.cat(size_list)
    else:
        object_sizes_tensor = torch.empty(len(object_list), dtype=torch.long)

    # Current device selection.
    # To preserve backwards compatibility, ``device`` defaults to ``None``,
    # in which case we run the current logic of device selection, i.e.
    # ``current_device`` is CUDA if the backend is NCCL, otherwise the CPU device. In the
    # case it is not ``None``, we move the size and object tensors to be
    # broadcasted to this device.
    group_backend = get_backend(group)
    is_nccl_backend = group_backend == Backend.NCCL
    is_hpu_backend = os.environ.get("HCCL_DISTRIBUTED_BACKEND") == "1"
    if device is not None:
        if is_nccl_backend and device.type != "cuda":
            raise ValueError("device type must be cuda for nccl backend")
        current_device = device
    else:
        current_device = torch.device("cpu")
        if is_nccl_backend:
            # See note about using torch.cuda.current_device() here in
            # docstring. We cannot simply use my_rank since rank == device is
            # not necessarily true.
            current_device = torch.device("cuda", torch.cuda.current_device())
    if is_nccl_backend:
        object_sizes_tensor = object_sizes_tensor.to(current_device)
    elif is_hpu_backend:
        current_device = torch.device("hpu")
        # Workaround: HPU does not support long tensors for collectives
        if (object_sizes_tensor.type() == "torch.LongTensor") or (
            object_sizes_tensor.type() == "torch.hpu.LongTensor"
        ):
            object_sizes_tensor = object_sizes_tensor.int()
        else:
            print("unhandled hpu object_sizes_tensor type :: ", object_sizes_tensor.type())
        object_sizes_tensor = object_sizes_tensor.to(current_device)

    # Broadcast object sizes
    broadcast(object_sizes_tensor, src=src, group=group)

    # Concatenate and broadcast serialized object tensors
    if my_rank == src:
        object_tensor = torch.cat(tensor_list)
    else:
        object_tensor = torch.empty(
            torch.sum(object_sizes_tensor).int().item(),
            dtype=torch.uint8,
        )

    if is_nccl_backend or is_hpu_backend:
        object_tensor = object_tensor.to(current_device)
    broadcast(object_tensor, src=src, group=group)

    # Deserialize objects using their stored sizes.
    offset = 0
    if my_rank != src:
        for i, obj_size in enumerate(object_sizes_tensor):
            obj_view = object_tensor[offset : offset + obj_size]
            obj_view = obj_view.type(torch.uint8)
            if obj_view.device != torch.device("cpu"):
                obj_view = obj_view.cpu()
            offset += obj_size
            object_list[i] = _tensor_to_object(obj_view, obj_size)
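The listing above is the strategy implementation itself. As a minimal usage sketch (not part of this module), assuming a Gaudi host with the habana_frameworks packages installed: the strategy can be selected either by its registered name "hpu_parallel" or by passing an instance, and extra keyword arguments such as static_graph are stored as DDP kwargs and consumed by _pre_configure_ddp. MyLightningModule below is a hypothetical placeholder that defines its own train_dataloader.

import pytorch_lightning as pl
from pytorch_lightning.strategies import HPUParallelStrategy

# Placeholder LightningModule; substitute your own (assumed to define train_dataloader).
model = MyLightningModule()

trainer = pl.Trainer(
    accelerator="hpu",
    devices=8,  # number of HPU devices on the node
    # Extra kwargs are forwarded to DDP via `_ddp_kwargs` and read by `_pre_configure_ddp`.
    strategy=HPUParallelStrategy(static_graph=True),
    # equivalently: strategy="hpu_parallel"
)
trainer.fit(model)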