Running DDP across two machines

Hi,

I am trying to run torch DDP across 2 machines (GPU servers) over TCP. Below is my setup.

#machine1
export MASTER_ADDR=10.10.10.24 
export MASTER_PORT=45547 
export WORLD_SIZE=2 
export NODE_RANK=0
export LOCAL_RANK=0

python3 test.py
#machine2
export MASTER_ADDR=10.10.10.24 
export MASTER_PORT=45547 
export WORLD_SIZE=2 
export NODE_RANK=0
export LOCAL_RANK=0

python3 test.py
#test.py
import os
import torch
from torch import nn
import torch.nn.functional as F
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
import pytorch_lightning as pl

class LightningMNISTClassifier(pl.LightningModule):

  def __init__(self):
    super(LightningMNISTClassifier, self).__init__()

    # mnist images are (1, 28, 28) (channels, width, height) 
    self.layer_1 = torch.nn.Linear(28 * 28, 128)
    self.layer_2 = torch.nn.Linear(128, 256)
    self.layer_3 = torch.nn.Linear(256, 10)

  def forward(self, x):
      batch_size, channels, width, height = x.size()

      # (b, 1, 28, 28) -> (b, 1*28*28)
      x = x.view(batch_size, -1)

      # layer 1 (b, 1*28*28) -> (b, 128)
      x = self.layer_1(x)
      x = torch.relu(x)

      # layer 2 (b, 128) -> (b, 256)
      x = self.layer_2(x)
      x = torch.relu(x)

      # layer 3 (b, 256) -> (b, 10)
      x = self.layer_3(x)

      # log-probability distribution over labels
      x = torch.log_softmax(x, dim=1)

      return x

  def cross_entropy_loss(self, logits, labels):
    return F.nll_loss(logits, labels)

  def training_step(self, train_batch, batch_idx):
      x, y = train_batch
      logits = self.forward(x)
      loss = self.cross_entropy_loss(logits, y)

      logs = {'train_loss': loss}
      return {'loss': loss, 'log': logs}

  def validation_step(self, val_batch, batch_idx):
      x, y = val_batch
      logits = self.forward(x)
      loss = self.cross_entropy_loss(logits, y)
      return {'val_loss': loss}

  def validation_epoch_end(self, outputs):
      # called at the end of the validation epoch
      # outputs is an array with what you returned in validation_step for each batch
      # outputs = [{'val_loss': batch_0_loss}, {'val_loss': batch_1_loss}, ..., {'val_loss': batch_n_loss}]
      avg_loss = torch.stack([x['val_loss'] for x in outputs]).mean()
      tensorboard_logs = {'val_loss': avg_loss}
      return {'avg_val_loss': avg_loss, 'log': tensorboard_logs}

  def prepare_data(self):
    # transforms for images
    transform=transforms.Compose([transforms.ToTensor(), 
                                  transforms.Normalize((0.1307,), (0.3081,))])
      
    # prepare transforms standard to MNIST
    mnist_train = MNIST(os.getcwd(), train=True, download=True, transform=transform)
    self.mnist_test = MNIST(os.getcwd(), train=False, download=True, transform=transform)
    
    self.mnist_train, self.mnist_val = random_split(mnist_train, [55000, 5000])

  def train_dataloader(self):
    return DataLoader(self.mnist_train, batch_size=64)

  def val_dataloader(self):
    return DataLoader(self.mnist_val, batch_size=64)

  def test_dataloader(self):
    return DataLoader(self.mnist_test, batch_size=64)

  def configure_optimizers(self):
    optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
    return optimizer

    


class MNISTDataModule(pl.LightningDataModule):

  def setup(self, stage):
    # transforms for images
    transform=transforms.Compose([transforms.ToTensor(), 
                                  transforms.Normalize((0.1307,), (0.3081,))])
      
    # prepare transforms standard to MNIST
    mnist_train = MNIST(os.getcwd(), train=True, download=True, transform=transform)
    self.mnist_test = MNIST(os.getcwd(), train=False, download=True, transform=transform)
    
    self.mnist_train, self.mnist_val = random_split(mnist_train, [55000, 5000])

  def train_dataloader(self):
    return DataLoader(self.mnist_train, batch_size=64)

  def val_dataloader(self):
    return DataLoader(self.mnist_val, batch_size=64)

  def test_dataloader(self):
    return DataLoader(self.mnist_test, batch_size=64)



mnist_dm = MNISTDataModule()
model = LightningMNISTClassifier()
# 4 GPUs per node x 2 nodes -> world size 8
trainer = pl.Trainer(accelerator='ddp', gpus=4, num_nodes=2)

trainer.fit(model, mnist_dm)
#output in terminal
TPU available: False, using: 0 TPU cores
initializing ddp: GLOBAL_RANK: 5, MEMBER: 6/8
initializing ddp: GLOBAL_RANK: 6, MEMBER: 7/8
initializing ddp: GLOBAL_RANK: 7, MEMBER: 8/8
initializing ddp: GLOBAL_RANK: 4, MEMBER: 5/8
#hang

It hangs there, which suggests the TCP connection is never established.
What would be the correct way to run DDP across 2 machines with my setup?

It works for me when I change the LOCAL_RANK of the 2nd machine. Also, I put export CUDA_VISIBLE_DEVICES=0 separately on each machine.
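
A quick way to see what each process actually picks up is to print the DDP-related variables at the top of test.py, e.g.:

#at the top of test.py
import os
for var in ("MASTER_ADDR", "MASTER_PORT", "WORLD_SIZE",
            "NODE_RANK", "LOCAL_RANK", "CUDA_VISIBLE_DEVICES"):
    print(var, "=", os.environ.get(var, "<unset>"))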

Hey @IsidoreSong @fzzfd

You need to set the NODE_RANK to 0 on the first machine and 1 on the second machine. And you need to remove LOCAL_RANK, because Lightning will set that to the correct value.

Please try again using:

export MASTER_ADDR=10.10.10.24 
export MASTER_PORT=45547 
export NODE_RANK=0  # and 1 on the other machine
python3 test.py

Make sure you unset the exported variables you already had (or restart the terminal session).
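
If it still hangs after that, you can rule Lightning out by testing the TCP rendezvous with plain torch.distributed. Here is a minimal sketch (the file name is made up; it assumes one process per machine, so WORLD_SIZE=2, and uses the gloo backend so no GPU is needed):

#check_rendezvous.py -- run once on each machine, with RANK=0 exported on the first and RANK=1 on the second
import os
import torch.distributed as dist

rank = int(os.environ["RANK"])
dist.init_process_group(
    backend="gloo",        # CPU-only backend, enough to test connectivity
    init_method="env://",  # reads MASTER_ADDR and MASTER_PORT from the environment
    world_size=2,
    rank=rank,
)
print(f"rank {rank}: rendezvous succeeded")
dist.barrier()             # returns only once both processes have connected
dist.destroy_process_group()

If both machines print the success line, the network path is fine and the problem is in the Lightning configuration; if this hangs too, check firewalls and that MASTER_PORT is reachable from the second machine.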

Yeah, you are right. I forgot to say that I was simulating multi-node training on a single machine with 2 GPUs.
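
For reference, one way to run that kind of simulation is to launch two processes on the one machine, giving each fake "node" its own GPU and NODE_RANK. A sketch (simulate_two_nodes.py is a made-up helper; it assumes test.py uses Trainer(accelerator='ddp', gpus=1, num_nodes=2)):

#simulate_two_nodes.py
import os
import subprocess

procs = []
for node_rank in (0, 1):
    env = dict(os.environ)
    env.pop("LOCAL_RANK", None)  # let Lightning manage LOCAL_RANK itself
    env.update({
        "MASTER_ADDR": "127.0.0.1",
        "MASTER_PORT": "45547",
        "NODE_RANK": str(node_rank),
        "CUDA_VISIBLE_DEVICES": str(node_rank),  # one GPU per fake node
    })
    procs.append(subprocess.Popen(["python3", "test.py"], env=env))

for p in procs:
    p.wait()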
