Hi community,
we are currently trying to run PyTorch Lightning on Azure (specs below) on a single node with four GPUs to train a transformer.
Training starts (refer to std_log_process_0.txt) and then runs into a CUDA out-of-memory error. Apparently none of the other GPUs have started processing so far (refer to std_log_process_1.txt, std_log_process_2.txt, std_log_process_3.txt). We couldn't find anything helpful to fix this issue, so we are counting on the community to help us out.
Compute specs on Azure: Standard_NC64as_T4_v3 (single node, 4 GPUs)
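(Side note for anyone trying to reproduce this: the snippet below is only a diagnostic sketch, not part of our actual code. Dropped at the top of the training script, it would print the Open MPI rank variables and the GPU each process binds to, which should make it obvious whether all four ranks really start.)

```python
# Hypothetical diagnostic sketch (not part of our training script): print the rank
# variables Open MPI / Azure ML set for this process and the GPU it would bind to.
import os
import torch

world_size = int(os.environ.get("OMPI_COMM_WORLD_SIZE", 1))
global_rank = int(os.environ.get("OMPI_COMM_WORLD_RANK", 0))
local_rank = int(os.environ.get("OMPI_COMM_WORLD_LOCAL_RANK", 0))

print(
    f"global_rank={global_rank} local_rank={local_rank} world_size={world_size} "
    f"visible_gpus={torch.cuda.device_count()}"
)

# With 4 processes on one node we would expect local_rank 0..3, each on its own GPU.
if torch.cuda.is_available():
    torch.cuda.set_device(local_rank)
    print(f"rank {global_rank} -> cuda:{local_rank} ({torch.cuda.get_device_name(local_rank)})")
```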
Environment (env.yaml):

```yaml
channels:
  - conda-forge
dependencies:
  - python=3.8.10
  - pip:
      - azureml-defaults
      - torch
      - torchvision
      - pytorch-lightning
      - pandas==1.4.1
      - datasets
      - transformers==4.26.1
      - tqdm
      - pyarrow
      - scikit-learn
      - deep-translator
      - evaluate
      - rouge_score
      - azure-ai-ml==1.4.0
      - deepspeed
```
Docker Image: openmpi4.1.0-cuda11.3-cudnn8-ubuntu20.04
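Since torch, pytorch-lightning and deepspeed are left unpinned in env.yaml, a small check like the one below (purely illustrative, not part of the submitted script) could confirm which versions the job actually resolves against the CUDA 11.3 base image:

```python
# Illustrative version check (hypothetical): log what the job environment resolved to,
# so we can see whether the torch wheel matches the CUDA 11.3 image.
import torch
import pytorch_lightning as pl
import deepspeed

print("torch:", torch.__version__, "| built against CUDA:", torch.version.cuda)
print("pytorch-lightning:", pl.__version__)
print("deepspeed:", deepspeed.__version__)
print("cuda available:", torch.cuda.is_available(), "| device count:", torch.cuda.device_count())
```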
```python
# trainer.py — imported below as `from trainer import CustomT5Trainer`
import pandas as pd
import pytorch_lightning as pl
from pytorch_lightning.callbacks.progress import TQDMProgressBar
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

from data_module import LightningDataModule
from model_module import LightningModel


class CustomT5Trainer:
    def __init__(self, strategy) -> None:
        self.model_name = "t5-small"
        self.model = AutoModelForSeq2SeqLM.from_pretrained(pretrained_model_name_or_path=self.model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path=self.model_name)
        self.strategy = strategy

    def train(
        self,
        train_df: pd.DataFrame,
        eval_df: pd.DataFrame,
        source_max_token_len: int = 512,
        target_max_token_len: int = 512,
        batch_size: int = 1,
        max_epochs: int = 5,
        use_gpu: bool = True,
        outputdir: str = "outputs",
        early_stopping_patience_epochs: int = 0,  # 0 to disable early stopping feature
        precision=32,
        logger="default",
        dataloader_num_workers: int = 64,
        save_only_last_epoch: bool = False,
    ):
        self.data_module = LightningDataModule(
            train_df,
            eval_df,
            self.tokenizer,
            batch_size=batch_size,
            source_max_token_len=source_max_token_len,
            target_max_token_len=target_max_token_len,
            num_workers=dataloader_num_workers,
        )
        self.T5Model = LightningModel(
            tokenizer=self.tokenizer,
            model=self.model,
            outputdir=outputdir,
            save_only_last_epoch=save_only_last_epoch,
        )
        callbacks = [TQDMProgressBar(refresh_rate=5)]
        if early_stopping_patience_epochs > 0:
            early_stop_callback = EarlyStopping(
                monitor="val_loss",
                min_delta=0.00,
                patience=early_stopping_patience_epochs,
                verbose=True,
                mode="min",
            )
            callbacks.append(early_stop_callback)
        loggers = True if logger == "default" else logger
        trainer = pl.Trainer(
            logger=loggers,
            callbacks=callbacks,
            max_epochs=max_epochs,
            precision=precision,
            log_every_n_steps=1,
            strategy=self.strategy,
            accelerator='cuda',
            devices=4,
        )
        # fit trainer
        trainer.fit(self.T5Model, self.data_module)
# training_script.py — entry point submitted via ScriptRunConfig in the notebook below
from sklearn.model_selection import train_test_split
from azureml.core import Datastore, Workspace
from trainer import CustomT5Trainer
from azure_helper import CustomAzureHelper
import torch
import os
import gc

gc.collect()
torch.cuda.empty_cache()

from pytorch_lightning.plugins.environments import ClusterEnvironment
from pytorch_lightning.strategies import DeepSpeedStrategy, DDPStrategy


class OpenMPIClusterEnvironment(ClusterEnvironment):
    def __init__(self, devices: int = 4) -> None:
        super().__init__()
        self.devices = devices

    @property
    def creates_processes_externally(self) -> bool:
        """Return True if the cluster is managed (you don't launch processes yourself)."""
        return True

    def world_size(self) -> int:
        return int(os.environ.get("OMPI_COMM_WORLD_SIZE"))

    def set_world_size(self, size: int) -> None:
        pass

    def global_rank(self) -> int:
        return int(os.environ.get("OMPI_COMM_WORLD_RANK"))

    def set_global_rank(self, rank: int) -> None:
        pass

    def local_rank(self) -> int:
        return int(os.environ.get("OMPI_COMM_WORLD_LOCAL_RANK"))

    def node_rank(self) -> int:
        print(f'node_rank : {int(os.environ.get("OMPI_COMM_WORLD_RANK", 0)) // int(self.devices)}')  # debugging
        # this may not exist, defaulting to 0
        return int(os.environ.get("OMPI_COMM_WORLD_RANK", 0)) // int(self.devices)

    @property
    def main_address(self) -> str:
        # AZ_BATCH_MASTER_NODE should be defined when num_nodes > 1
        if "AZ_BATCH_MASTER_NODE" in os.environ:
            print(f"main_address : {os.environ.get('AZ_BATCH_MASTER_NODE').split(':')[0]}")  # debugging
            return os.environ.get("AZ_BATCH_MASTER_NODE").split(':')[0]
        elif "AZ_BATCHAI_MPI_MASTER_NODE" in os.environ:
            print(f"main_address : {os.environ.get('AZ_BATCHAI_MPI_MASTER_NODE')}")  # debugging
            return os.environ.get("AZ_BATCHAI_MPI_MASTER_NODE")
        else:
            raise RuntimeError("main_address not found")

    @property
    def main_port(self) -> int:
        # AZ_BATCH_MASTER_NODE should be defined when num_nodes > 1
        if "AZ_BATCH_MASTER_NODE" in os.environ:
            print(f"main_port : {os.environ.get('AZ_BATCH_MASTER_NODE').split(':')[1]}")  # debugging
            return int(os.environ.get("AZ_BATCH_MASTER_NODE").split(':')[1])
        else:
            return 47586  # set port to arbitrary high number

    @staticmethod
    def detect() -> bool:
        return "OMPI_COMM_WORLD_SIZE" in os.environ


def train(trainer):
    ws = Workspace.from_config("config_atd.json")
    datastore = Datastore.get(ws, 'atd_datastore')
    dataset = ws.datasets['atd_dataset']
    df = dataset.to_pandas_dataframe()
    train_df, test_df = train_test_split(df, test_size=0.20)
    trainer.train(train_df=train_df, eval_df=test_df)


if __name__ == "__main__":
    strategy = DeepSpeedStrategy(
        stage=3,
        cluster_environment=OpenMPIClusterEnvironment(devices=4),
    )
    print('Initializing Custom Trainer')
    trainer = CustomT5Trainer(strategy)
    train(trainer)
    azure_helper = CustomAzureHelper()
    azure_helper.register_model(trainer.model_name,
                                trainer.model_name,
                                trainer.T5Model.outputdir)
```
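To make the intended mapping of the custom cluster environment explicit, here is a purely illustrative check (not part of our code) that fakes the Open MPI variables for one of the four ranks and asserts what the class above should return. It assumes training_script.py is importable, i.e. that the check is run from the same source_files/ directory:

```python
# Illustrative only: fake the Open MPI environment for global rank 2 of 4 and verify
# the values OpenMPIClusterEnvironment derives from it (assumes AZ_BATCH_MASTER_NODE
# is not set, so main_port falls back to the fixed port).
import os

os.environ["OMPI_COMM_WORLD_SIZE"] = "4"        # four processes in total
os.environ["OMPI_COMM_WORLD_RANK"] = "2"        # pretend we are global rank 2
os.environ["OMPI_COMM_WORLD_LOCAL_RANK"] = "2"  # third process on this node

from training_script import OpenMPIClusterEnvironment  # assumption: importable from source_files/

env = OpenMPIClusterEnvironment(devices=4)
assert OpenMPIClusterEnvironment.detect()
assert env.world_size() == 4
assert env.global_rank() == 2
assert env.local_rank() == 2
assert env.node_rank() == 0       # single node: 2 // 4 == 0
assert env.main_port == 47586     # fallback port when AZ_BATCH_MASTER_NODE is unset
print("cluster environment mapping looks consistent")
```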
This is the notebook we use to start the training:
```python
# notebook cell — submits training_script.py as a 4-process MPI job on the GPU cluster
from azureml.core import Datastore, Environment, Experiment, ScriptRunConfig, Workspace
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import MpiConfiguration

ws = Workspace.from_config("config_atd.json")
datastore = Datastore.get(ws, 'atd_datastore')

gpu_cluster_name = "gpu-compute-…-4x16"
try:
    gpu_cluster = ComputeTarget(workspace=ws, name=gpu_cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(vm_size='Standard_NC12s_v3',
                                                           max_nodes=2)
    gpu_cluster = ComputeTarget.create(ws, gpu_cluster_name, compute_config)
    gpu_cluster.wait_for_completion(show_output=True)

env = Environment.from_conda_specification('test', 'env.yaml')
env.docker.enabled = True
env.docker.base_image = (
    "mcr.microsoft.com/azureml/openmpi4.1.0-cuda11.3-cudnn8-ubuntu20.04"
)

cluster = ws.compute_targets[gpu_cluster_name]
job_config = MpiConfiguration(node_count=1, process_count_per_node=4)
src = ScriptRunConfig(
    source_directory='source_files/',
    script='training_script.py',
    compute_target=cluster,
    environment=env,
    distributed_job_config=job_config,
)
run = Experiment(ws, 'test').submit(src)
```
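For completeness: an alternative submission we have not tried would be to skip the MPI launcher and use azureml-core's PyTorchConfiguration, which (as far as we understand) exports the torch.distributed variables (MASTER_ADDR, MASTER_PORT, RANK, LOCAL_RANK, WORLD_SIZE) itself. The sketch below only swaps the job-config lines and reuses the names from the cell above:

```python
# Untested sketch: same submission, but with PyTorchConfiguration instead of
# MpiConfiguration so Azure ML sets the torch.distributed variables per process.
from azureml.core import Environment, Experiment, ScriptRunConfig, Workspace
from azureml.core.runconfig import PyTorchConfiguration

ws = Workspace.from_config("config_atd.json")
env = Environment.from_conda_specification('test', 'env.yaml')
cluster = ws.compute_targets["gpu-compute-…-4x16"]  # same placeholder name as above

job_config = PyTorchConfiguration(process_count=4, node_count=1)  # 4 processes on 1 node

src = ScriptRunConfig(
    source_directory='source_files/',
    script='training_script.py',
    compute_target=cluster,
    environment=env,
    distributed_job_config=job_config,
)
run = Experiment(ws, 'test').submit(src)
```

With that launcher the OMPI_* variables would presumably no longer be set, so the custom OpenMPIClusterEnvironment would likely have to be dropped in favour of Lightning's default environment detection.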
The output of the error logs for the four GPUs is attached:
[std_log_process_3.txt](https://github.com/Lightning-AI/lightning/files/11051402/std_log_process_3.txt)
mpi_log.txt
std_log_process_0.txt
std_log_process_1.txt
std_log_process_2.txt