Shortcuts

Source code for pytorch_lightning.plugins.environments.lsf_environment

# Copyright The PyTorch Lightning team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import socket

from pytorch_lightning import _logger as log
from pytorch_lightning.plugins.environments import ClusterEnvironment


[docs]class LSFEnvironment(ClusterEnvironment): """An environment for running on clusters managed by the LSF resource manager. It is expected that any execution using this ClusterEnvironment was executed using the Job Step Manager i.e. ``jsrun``. This plugin expects the following environment variables. LSB_JOBID: The LSF assigned job ID LSB_HOSTS: The hosts used in the job. This string is expected to have the format "batch <rank_0_host> ...." JSM_NAMESPACE_LOCAL_RANK: The node local rank for the task. This environment variable is set by jsrun JSM_NAMESPACE_SIZE: The world size for the task. This environment variable is set by jsrun """ def __init__(self): self._master_address = self._get_master_address() self._master_port = self._get_master_port() log.debug(f"MASTER_ADDR: {self._master_address}") log.debug(f"MASTER_PORT: {self._master_port}")
[docs] @staticmethod def is_using_lsf() -> bool: """Returns ``True`` if the current process was launched using the jsrun command.""" required_env_vars = ("LSB_JOBID", "LSB_HOSTS", "JSM_NAMESPACE_LOCAL_RANK", "JSM_NAMESPACE_SIZE") return all(v in os.environ for v in required_env_vars)
@property def creates_processes_externally(self) -> bool: return True
[docs] def master_address(self): """The master address is read from a list of hosts contained in the environment variable `LSB_HOSTS`.""" return self._master_address
[docs] def master_port(self): """THe master port gets calculated from the LSF job ID.""" return self._master_port
[docs] def world_size(self): """The world size is read from the environment variable `JSM_NAMESPACE_SIZE`.""" var = "JSM_NAMESPACE_SIZE" world_size = os.environ.get(var) if world_size is None: raise ValueError( f"Cannot determine world size from environment variable {var}." " Make sure you run your executable with `jsrun`" ) return int(world_size)
def set_world_size(self, size: int) -> None: log.debug("LSFEnvironment.set_world_size was called, but setting world size is not allowed. Ignored.")
[docs] def global_rank(self): """The world size is read from the environment variable `JSM_NAMESPACE_RANK`.""" var = "JSM_NAMESPACE_RANK" global_rank = os.environ.get(var) if global_rank is None: raise ValueError( f"Cannot determine global rank from environment variable {var}." " Make sure you run your executable with `jsrun`" ) return int(global_rank)
def set_global_rank(self, rank: int) -> None: log.debug("LSFEnvironment.set_global_rank was called, but setting global rank is not allowed. Ignored.")
[docs] def local_rank(self): """The local rank is read from the environment variable `JSM_NAMESPACE_LOCAL_RANK`.""" var = "JSM_NAMESPACE_LOCAL_RANK" local_rank = os.environ.get(var) if local_rank is None: raise ValueError( f"Cannot determine local rank from environment variable {var}." " Make sure you run your executable with `jsrun`" ) return int(local_rank)
[docs] def node_rank(self): """The node rank is determined by the position of the current hostname in the list of hosts stored in the environment variable `LSB_HOSTS`.""" hosts = self._read_hosts() count = {} for host in hosts: if "batch" in host or "login" in host: continue if host not in count: count[host] = len(count) return count[socket.gethostname()]
@staticmethod def _read_hosts(): hosts = os.environ.get("LSB_HOSTS") if not hosts: raise ValueError("Could not find hosts in environment variable LSB_HOSTS") hosts = hosts.split() if len(hosts) < 2: raise ValueError( 'Cannot parse hosts from LSB_HOSTS environment variable. Expected format: "batch <rank_0_host> ..."' ) return hosts def _get_master_address(self): hosts = self._read_hosts() return hosts[1] @staticmethod def _get_master_port(): """A helper function for accessing the master port. Uses the LSF job ID so all ranks can compute the master port. """ # check for user-specified master port port = os.environ.get("MASTER_PORT") if not port: jobid = os.environ.get("LSB_JOBID") if not jobid: raise ValueError("Could not find job id in environment variable LSB_JOBID") port = int(jobid) # all ports should be in the 10k+ range port = int(port) % 1000 + 10000 log.debug(f"calculated LSF master port: {port}") else: log.debug(f"using externally specified master port: {port}") return int(port)

© Copyright Copyright (c) 2018-2023, William Falcon et al...

Built with Sphinx using a theme provided by Read the Docs.