Single-node multi-GPU DeepSpeed training fails with CUDA OOM on Azure

Hi community,
we are currently trying to run PyTorch Lightning on Azure (specs below) on a single node with four GPUs to train a transformer.
Training starts (see std_log_process_0.txt) and then fails with a CUDA out-of-memory error. Apparently none of the other GPUs has started processing yet (see std_log_process_1.txt, std_log_process_2.txt, std_log_process_3.txt). We couldn't find anything helpful to fix this issue, so we are counting on the community to help us out.
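For reference, this is the kind of per-rank check we could add to the entry script to confirm which processes actually see a GPU and how much memory they are using (a minimal diagnostic sketch, not part of the code below):

```python
import os
import torch

# Sketch: print what each MPI-launched process sees. The OMPI_COMM_WORLD_*
# variables are set by the Open MPI launcher used by AzureML's MpiConfiguration.
rank = os.environ.get("OMPI_COMM_WORLD_RANK", "?")
local_rank = os.environ.get("OMPI_COMM_WORLD_LOCAL_RANK", "?")
world_size = os.environ.get("OMPI_COMM_WORLD_SIZE", "?")

print(f"rank={rank} local_rank={local_rank} world_size={world_size}")
print(f"cuda available: {torch.cuda.is_available()}, device count: {torch.cuda.device_count()}")

if torch.cuda.is_available():
    device = int(local_rank) if local_rank != "?" else 0
    print(f"rank {rank} -> cuda:{device}, "
          f"allocated={torch.cuda.memory_allocated(device) / 1e9:.2f} GB, "
          f"reserved={torch.cuda.memory_reserved(device) / 1e9:.2f} GB")
```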

Compute specs on Azure: Standard_NC64as_T4_v3 (single node, 4× NVIDIA T4 with 16 GB each)

Environment:
```yaml
channels:
  - conda-forge
dependencies:
  - python=3.8.10
  - pip:
    - azureml-defaults
    - torch
    - torchvision
    - pytorch-lightning
    - pandas==1.4.1
    - datasets
    - transformers==4.26.1
    - tqdm
    - pyarrow
    - scikit-learn
    - deep-translator
    - evaluate
    - rouge_score
    - azure-ai-ml==1.4.0
    - deepspeed
```

Docker image: `mcr.microsoft.com/azureml/openmpi4.1.0-cuda11.3-cudnn8-ubuntu20.04`
This is `trainer.py`, which wraps the tokenizer, model, data module, and the Lightning `Trainer`:

```python
import pandas as pd
import pytorch_lightning as pl
from pytorch_lightning.callbacks.progress import TQDMProgressBar
from pytorch_lightning.callbacks.early_stopping import EarlyStopping

from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

from data_module import LightningDataModule
from model_module import LightningModel

class CustomT5Trainer:

    def __init__(self, strategy) -> None:
        self.model_name = "t5-small"

        self.model = AutoModelForSeq2SeqLM.from_pretrained(pretrained_model_name_or_path=self.model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path=self.model_name)

        self.strategy = strategy

    def train(
        self,
        train_df: pd.DataFrame,
        eval_df: pd.DataFrame,
        source_max_token_len: int = 512,
        target_max_token_len: int = 512,
        batch_size: int = 1,
        max_epochs: int = 5,
        use_gpu: bool = True,
        outputdir: str = "outputs",
        early_stopping_patience_epochs: int = 0,  # 0 to disable early stopping feature
        precision=32,
        logger="default",
        dataloader_num_workers: int = 64,
        save_only_last_epoch: bool = False,
    ):
        self.data_module = LightningDataModule(
            train_df,
            eval_df,
            self.tokenizer,
            batch_size=batch_size,
            source_max_token_len=source_max_token_len,
            target_max_token_len=target_max_token_len,
            num_workers=dataloader_num_workers,
        )

        self.T5Model = LightningModel(
            tokenizer=self.tokenizer,
            model=self.model,
            outputdir=outputdir,
            save_only_last_epoch=save_only_last_epoch,
        )

        callbacks = [TQDMProgressBar(refresh_rate=5)]

        if early_stopping_patience_epochs > 0:
            early_stop_callback = EarlyStopping(
                monitor="val_loss",
                min_delta=0.00,
                patience=early_stopping_patience_epochs,
                verbose=True,
                mode="min",
            )
            callbacks.append(early_stop_callback)

        loggers = True if logger == "default" else logger

        trainer = pl.Trainer(
            logger=loggers,
            callbacks=callbacks,
            max_epochs=max_epochs,
            precision=precision,
            log_every_n_steps=1,
            strategy=self.strategy,
            accelerator='cuda',
            devices=4,
        )

        # fit trainer
        trainer.fit(self.T5Model, self.data_module)
```

And this is `training_script.py`, the entry script submitted in the notebook below; it defines the custom OpenMPI cluster environment and wires everything together:

```python
from sklearn.model_selection import train_test_split
from azureml.core import Datastore, Workspace

from trainer import CustomT5Trainer
from azure_helper import CustomAzureHelper
import torch

import os

import gc
gc.collect()
torch.cuda.empty_cache()

from pytorch_lightning.plugins.environments import ClusterEnvironment
from pytorch_lightning.strategies import DeepSpeedStrategy, DDPStrategy

class OpenMPIClusterEnvironment(ClusterEnvironment):
    def __init__(self, devices: int = 4) -> None:
        super().__init__()
        self.devices = devices

    @property
    def creates_processes_externally(self) -> bool:
        """Return True if the cluster is managed (you don't launch processes yourself)"""
        return True

    def world_size(self) -> int:
        return int(os.environ.get("OMPI_COMM_WORLD_SIZE"))

    def set_world_size(self, size: int) -> None:
        pass

    def global_rank(self) -> int:
        return int(os.environ.get("OMPI_COMM_WORLD_RANK"))

    def set_global_rank(self, rank: int) -> None:
        pass

    def local_rank(self) -> int:
        return int(os.environ.get("OMPI_COMM_WORLD_LOCAL_RANK"))

    def node_rank(self) -> int:
        print(f'node_rank : {int(os.environ.get("OMPI_COMM_WORLD_RANK", 0)) // int(self.devices)}')  # debugging
        # this may not exist, defaulting to 0
        return int(os.environ.get("OMPI_COMM_WORLD_RANK", 0)) // int(self.devices)

    @property
    def main_address(self) -> str:
        # AZ_BATCH_MASTER_NODE should be defined when num_nodes > 1
        if "AZ_BATCH_MASTER_NODE" in os.environ:
            print(f"main_address : {os.environ.get('AZ_BATCH_MASTER_NODE').split(':')[0]}")  # debugging
            return os.environ.get("AZ_BATCH_MASTER_NODE").split(':')[0]
        elif "AZ_BATCHAI_MPI_MASTER_NODE" in os.environ:
            print(f"main_address : {os.environ.get('AZ_BATCHAI_MPI_MASTER_NODE')}")  # debugging
            return os.environ.get("AZ_BATCHAI_MPI_MASTER_NODE")
        else:
            raise RuntimeError("main_address not found")

    @property
    def main_port(self) -> int:
        # AZ_BATCH_MASTER_NODE should be defined when num_nodes > 1
        if "AZ_BATCH_MASTER_NODE" in os.environ:
            print(f"main_port : {os.environ.get('AZ_BATCH_MASTER_NODE').split(':')[1]}")  # debugging
            return int(os.environ.get("AZ_BATCH_MASTER_NODE").split(':')[1])
        else:
            return int(47586)  # set port to arbitrary high number

    @staticmethod
    def detect() -> bool:
        return "OMPI_COMM_WORLD_SIZE" in os.environ

def train(trainer):
    ws = Workspace.from_config("config_atd.json")
    datastore = Datastore.get(ws, 'atd_datastore')

    dataset = ws.datasets['atd_dataset']
    df = dataset.to_pandas_dataframe()

    train_df, test_df = train_test_split(df, test_size=0.20)
    trainer.train(train_df=train_df, eval_df=test_df)


if __name__ == "__main__":
    strategy = DeepSpeedStrategy(
        stage=3,
        cluster_environment=OpenMPIClusterEnvironment(devices=4),
    )

    print('Initializing Custom Trainer')
    trainer = CustomT5Trainer(strategy)
    train(trainer)

    azure_helper = CustomAzureHelper()
    azure_helper.register_model(trainer.model_name,
                                trainer.model_name,
                                trainer.T5Model.outputdir)

```
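In case it matters for the diagnosis: we have not tried ZeRO offloading or 16-bit precision yet. This is roughly what we would change if that turns out to be the recommended direction (a sketch only, using the standard `DeepSpeedStrategy` offload arguments and the classes defined above):

```python
from pytorch_lightning.strategies import DeepSpeedStrategy

# Sketch: same custom cluster environment as above, but with ZeRO stage-3
# CPU offloading enabled. offload_optimizer / offload_parameters are standard
# DeepSpeedStrategy options; we have not verified that this resolves our OOM.
strategy = DeepSpeedStrategy(
    stage=3,
    offload_optimizer=True,   # keep optimizer states in CPU memory
    offload_parameters=True,  # keep (sharded) parameters in CPU memory
    cluster_environment=OpenMPIClusterEnvironment(devices=4),
)

trainer = CustomT5Trainer(strategy)
# 16-bit precision would then be forwarded to pl.Trainer via our train() wrapper:
# trainer.train(train_df=train_df, eval_df=test_df, precision=16)
```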

This is the notebook we use for starting the training:

```python
from azureml.core import Workspace, Datastore, Environment, Experiment, ScriptRunConfig
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import MpiConfiguration

ws = Workspace.from_config("config_atd.json")
datastore = Datastore.get(ws, 'atd_datastore')

gpu_cluster_name = "gpu-compute-…-4x16"

try:
    gpu_cluster = ComputeTarget(workspace=ws, name=gpu_cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(vm_size='Standard_NC12s_v3',
                                                           max_nodes=2)
    gpu_cluster = ComputeTarget.create(ws, gpu_cluster_name, compute_config)

gpu_cluster.wait_for_completion(show_output=True)

env = Environment.from_conda_specification('test', 'env.yaml')

env.docker.enabled = True
env.docker.base_image = (
    "mcr.microsoft.com/azureml/openmpi4.1.0-cuda11.3-cudnn8-ubuntu20.04"
)

cluster = ws.compute_targets[gpu_cluster_name]

job_config = MpiConfiguration(node_count=1, process_count_per_node=4)

src = ScriptRunConfig(
    source_directory='source_files/',
    script='training_script.py',
    compute_target=cluster,
    environment=env,
    distributed_job_config=job_config,
)

run = Experiment(ws, 'test').submit(src)
```
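For completeness, this is how we understand the mapping between the `MpiConfiguration(node_count=1, process_count_per_node=4)` launch and the environment variables our `OpenMPIClusterEnvironment` reads (a small sanity-check snippet we could drop into the entry script, not something that is currently in it):

```python
import os

# Expected values with node_count=1, process_count_per_node=4 (our understanding):
#   OMPI_COMM_WORLD_SIZE       = 4     -> world_size()
#   OMPI_COMM_WORLD_RANK       = 0..3  -> global_rank()
#   OMPI_COMM_WORLD_LOCAL_RANK = 0..3  -> local_rank()
#   node_rank() = OMPI_COMM_WORLD_RANK // 4 = 0 for every process
ompi_env = {k: v for k, v in os.environ.items() if k.startswith("OMPI_COMM_WORLD")}
print(ompi_env)
assert int(os.environ.get("OMPI_COMM_WORLD_SIZE", "0")) == 4, "expected 4 MPI processes"
```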

The output logs for the four GPU processes are attached:

[std_log_process_3.txt](https://github.com/Lightning-AI/lightning/files/11051402/std_log_process_3.txt)
mpi_log.txt
std_log_process_0.txt
std_log_process_1.txt
std_log_process_2.txt