Source code for lightning.fabric.plugins.environments.slurm
# Copyright The Lightning AI team.## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.importloggingimportosimportreimportshutilimportsignalimportsysfromtypingimportOptionalfromlightning.fabric.plugins.environments.cluster_environmentimportClusterEnvironmentfromlightning.fabric.utilities.importsimport_IS_WINDOWSfromlightning.fabric.utilities.rank_zeroimportrank_zero_warnfromlightning.fabric.utilities.warningsimportPossibleUserWarninglog=logging.getLogger(__name__)
class SLURMEnvironment(ClusterEnvironment):
    """Cluster environment for training on a cluster managed by SLURM.

    Args:
        auto_requeue: Whether automatic job resubmission is enabled or not. How and under which conditions a job
            gets rescheduled gets determined by the owner of this plugin.
        requeue_signal: The signal that SLURM will send to indicate that the job should be requeued. Defaults to
            SIGUSR1 on Unix.

    """

    def __init__(self, auto_requeue: bool = True, requeue_signal: Optional[signal.Signals] = None) -> None:
        super().__init__()
        self.auto_requeue = auto_requeue
        # SIGUSR1 is the conventional requeue notification on Unix; Windows has no SIGUSR1, so it stays ``None``.
        if requeue_signal is None and not _IS_WINDOWS:
            requeue_signal = signal.SIGUSR1
        self.requeue_signal = requeue_signal
        self._validate_srun_used()
        self._validate_srun_variables()

    @property
    def creates_processes_externally(self) -> bool:
        # SLURM launches one process per task itself (via srun); Lightning must not spawn its own.
        return True

    @property
    def main_address(self) -> str:
        """Resolve the root node's host name from ``SLURM_NODELIST`` and export it as ``MASTER_ADDR``."""
        nodelist = os.environ.get("SLURM_NODELIST", "127.0.0.1")
        root_node = self.resolve_root_node_address(nodelist)
        os.environ["MASTER_ADDR"] = root_node
        log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}")
        return root_node

    @property
    def main_port(self) -> int:
        """Return a rendezvous port that every process in the SLURM job can agree on."""
        # -----------------------
        # SLURM JOB = PORT number
        # -----------------------
        # Derive the default port from the job id so that every process computes the same value.
        job_id = os.environ.get("SLURM_JOB_ID")
        if job_id is None:
            default_port = 12910
        else:
            # Use the last 4 digits of the job id, shifted into the 10k+ range of unprivileged ports.
            default_port = int(job_id[-4:]) + 15000

        # -----------------------
        # PORT NUMBER = MASTER_PORT
        # -----------------------
        # An explicitly set MASTER_PORT always wins; otherwise export the derived default.
        if "MASTER_PORT" in os.environ:
            default_port = int(os.environ["MASTER_PORT"])
        else:
            os.environ["MASTER_PORT"] = str(default_port)
        log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}")
        return default_port
[docs]@staticmethoddefdetect()->bool:"""Returns ``True`` if the current process was launched on a SLURM cluster. It is possible to use the SLURM scheduler to request resources and then launch processes manually using a different environment. For this, the user can set the job name in SLURM to 'bash' or 'interactive' (srun --job- name=interactive). This will then avoid the detection of ``SLURMEnvironment`` and another environment can be detected automatically. """SLURMEnvironment._validate_srun_used()return_is_srun_used()
@staticmethoddefjob_name()->Optional[str]:returnos.environ.get("SLURM_JOB_NAME")@staticmethoddefjob_id()->Optional[int]:# in interactive mode, don't make logs use the same job idif_is_slurm_interactive_mode():returnNonejob_id=os.environ.get("SLURM_JOB_ID")ifjob_idisNone:returnNonetry:returnint(job_id)exceptValueError:returnNone
[docs]@staticmethoddefresolve_root_node_address(nodes:str)->str:"""The node selection format in SLURM supports several formats. This function selects the first host name from - a space-separated list of host names, e.g., 'host0 host1 host3' yields 'host0' as the root - a comma-separated list of host names, e.g., 'host0,host1,host3' yields 'host0' as the root - the range notation with brackets, e.g., 'host[5-9]' yields 'host5' as the root """nodes=re.sub(r"\[(.*?)[,-].*\]","\\1",nodes)# Take the first node of every node rangenodes=re.sub(r"\[(.*?)\]","\\1",nodes)# handle special case where node range is single numberreturnnodes.split(" ")[0].split(",")[0]
@staticmethoddef_validate_srun_used()->None:"""Checks if the `srun` command is available and used. Parallel jobs (multi-GPU, multi-node) in SLURM are launched by prepending `srun` in front of the Python command. Not doing so will result in processes hanging, which is a frequent user error. Lightning will emit a warning if `srun` is found but not used. """if_IS_WINDOWS:returnsrun_exists=shutil.which("srun")isnotNoneifsrun_existsandnot_is_srun_used():hint=" ".join(["srun",os.path.basename(sys.executable),*sys.argv])[:64]rank_zero_warn("The `srun` command is available on your system but is not used. HINT: If your intention is to run"f" Lightning on SLURM, prepend your python command with `srun` like so: {hint} ...",category=PossibleUserWarning,)@staticmethoddef_validate_srun_variables()->None:"""Checks for conflicting or incorrectly set variables set through `srun` and raises a useful error message. Right now, we only check for the most common user errors. See `the srun docs <https://slurm.schedmd.com/srun.html>`_ for a complete list of supported srun variables. """ntasks=int(os.environ.get("SLURM_NTASKS","1"))ifntasks>1and"SLURM_NTASKS_PER_NODE"notinos.environ:raiseRuntimeError(f"You set `--ntasks={ntasks}` in your SLURM bash script, but this variable is not supported."f" HINT: Use `--ntasks-per-node={ntasks}` instead.")
To analyze traffic and optimize your experience, we serve cookies on this site. By clicking or navigating, you agree to allow our usage of cookies. Read PyTorch Lightning's Privacy Policy.