Source code for pytorch_lightning.plugins.environments.lsf_environment
# Copyright The PyTorch Lightning team.## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.importosimportsocketfromtypingimportDict,Listfrompytorch_lightningimport_loggeraslogfrompytorch_lightning.plugins.environmentsimportClusterEnvironmentfrompytorch_lightning.utilities.cloud_ioimportget_filesystem
[docs]classLSFEnvironment(ClusterEnvironment):"""An environment for running on clusters managed by the LSF resource manager. It is expected that any execution using this ClusterEnvironment was executed using the Job Step Manager i.e. ``jsrun``. This plugin expects the following environment variables: ``LSB_JOBID`` The LSF assigned job ID ``LSB_DJOB_RANKFILE`` The OpenMPI compatible rank file for the LSF job ``JSM_NAMESPACE_LOCAL_RANK`` The node local rank for the task. This environment variable is set by ``jsrun`` ``JSM_NAMESPACE_SIZE`` The world size for the task. This environment variable is set by ``jsrun`` ``JSM_NAMESPACE_RANK`` The global rank for the task. This environment variable is set by ``jsrun`` """def__init__(self)->None:super().__init__()self._main_address=self._get_main_address()self._main_port=self._get_main_port()self._node_rank=self._get_node_rank()self._set_init_progress_group_env_vars()def_set_init_progress_group_env_vars(self)->None:# set environment variables needed for initializing torch distributed process groupos.environ["MASTER_ADDR"]=str(self._main_address)log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}")os.environ["MASTER_PORT"]=str(self._main_port)log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}")@propertydefcreates_processes_externally(self)->bool:"""LSF creates subprocesses, i.e., PyTorch Lightning does not need to spawn them."""returnTrue@propertydefmain_address(self)->str:"""The main address is read from an OpenMPI host rank file in the environment variable ``LSB_DJOB_RANKFILE``."""returnself._main_address@propertydefmain_port(self)->int:"""The main port is calculated from the LSF job ID."""returnself._main_port
[docs]@staticmethoddefdetect()->bool:"""Returns ``True`` if the current process was launched using the ``jsrun`` command."""required_env_vars={"LSB_JOBID","LSB_DJOB_RANKFILE","JSM_NAMESPACE_LOCAL_RANK","JSM_NAMESPACE_SIZE"}returnrequired_env_vars.issubset(os.environ.keys())
[docs]defworld_size(self)->int:"""The world size is read from the environment variable ``JSM_NAMESPACE_SIZE``."""world_size=os.environ.get("JSM_NAMESPACE_SIZE")ifworld_sizeisNone:raiseValueError("Cannot determine world size. Environment variable `JSM_NAMESPACE_SIZE` not found.""Make sure you run your executable with `jsrun`.")returnint(world_size)
defset_world_size(self,size:int)->None:log.debug("LSFEnvironment.set_world_size was called, but setting world size is not allowed. Ignored.")
[docs]defglobal_rank(self)->int:"""The world size is read from the environment variable ``JSM_NAMESPACE_RANK``."""global_rank=os.environ.get("JSM_NAMESPACE_RANK")ifglobal_rankisNone:raiseValueError("Cannot determine global rank. Environment variable `JSM_NAMESPACE_RANK` not found.""Make sure you run your executable with `jsrun`.")returnint(global_rank)
defset_global_rank(self,rank:int)->None:log.debug("LSFEnvironment.set_global_rank was called, but setting global rank is not allowed. Ignored.")
[docs]deflocal_rank(self)->int:"""The local rank is read from the environment variable `JSM_NAMESPACE_LOCAL_RANK`."""local_rank=os.environ.get("JSM_NAMESPACE_LOCAL_RANK")iflocal_rankisNone:raiseValueError("Cannot determine local rank. Environment variable `JSM_NAMESPACE_LOCAL_RANK` not found.""Make sure you run your executable with `jsrun`.")returnint(local_rank)
[docs]defnode_rank(self)->int:"""The node rank is determined by the position of the current hostname in the OpenMPI host rank file stored in ``LSB_DJOB_RANKFILE``."""returnself._node_rank
def_get_node_rank(self)->int:"""A helper method for getting the node rank. The node rank is determined by the position of the current node in the list of hosts used in the job. This is calculated by reading all hosts from ``LSB_DJOB_RANKFILE`` and finding this node's hostname in the list. """hosts=self._read_hosts()count:Dict[str,int]={}forhostinhosts:ifhostnotincount:count[host]=len(count)returncount[socket.gethostname()]@staticmethoddef_read_hosts()->List[str]:"""Read compute hosts that are a part of the compute job. LSF uses the Job Step Manager (JSM) to manage job steps. Job steps are executed by the JSM from "launch" nodes. Each job is assigned a launch node. This launch node will be the first node in the list contained in ``LSB_DJOB_RANKFILE``. """var="LSB_DJOB_RANKFILE"rankfile=os.environ.get(var)ifrankfileisNone:raiseValueError("Did not find the environment variable `LSB_DJOB_RANKFILE`")ifnotrankfile:raiseValueError("The environment variable `LSB_DJOB_RANKFILE` is empty")fs=get_filesystem(rankfile)withfs.open(rankfile,"r")asf:ret=[line.strip()forlineinf]# remove the launch node (i.e. the first node in LSB_DJOB_RANKFILE) from the listreturnret[1:]def_get_main_address(self)->str:"""A helper for getting the main address. The main address is assigned to the first node in the list of nodes used for the job. """hosts=self._read_hosts()returnhosts[0]@staticmethoddef_get_main_port()->int:"""A helper function for accessing the main port. Uses the LSF job ID so all ranks can compute the main port. """# check for user-specified main portif"MASTER_PORT"inos.environ:log.debug(f"Using externally specified main port: {os.environ['MASTER_PORT']}")returnint(os.environ["MASTER_PORT"])if"LSB_JOBID"inos.environ:port=int(os.environ["LSB_JOBID"])# all ports should be in the 10k+ rangeport=port%1000+10000log.debug(f"calculated LSF main port: {port}")returnportraiseValueError("Could not find job id in environment variable LSB_JOBID")
To analyze traffic and optimize your experience, we serve cookies on this site. By clicking or navigating, you agree to allow our usage of cookies. Read PyTorch Lightning's Privacy Policy.