I'm trying to combine Lightning and torchdata (datapipes with DataLoader2), but I run into the error below when I try to train. I also end up having to kill leftover worker processes with pkill -9 -f "python src/train.py".
I think the problem is that teardown doesn't get called correctly, so the MultiProcessingReadingService is never properly finalized.
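For reference, this is the basic DataLoader2 pattern I'm trying to follow; the IterableWrapper pipeline below is just a placeholder for my real IceCube datapipe:

from torchdata.datapipes.iter import IterableWrapper
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService

# placeholder pipeline; the real one reads events from my IceCube database
datapipe = IterableWrapper(range(1000)).shuffle().sharding_filter()

rs = MultiProcessingReadingService(num_workers=4)
dl = DataLoader2(datapipe, reading_service=rs)

for batch in dl:
    pass  # training step goes here

# explicit cleanup of the worker processes once iteration is done
dl.shutdown()

Inside Lightning I only call shutdown() from the datamodule's teardown hook (shown further down), and that is where it fails.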
Has anyone tried to use torchdata together with Lightning? If so, I would love to see how it's implemented.
The error:
Traceback (most recent call last):
  File "src/train.py", line 115, in main
    metric_dict, _ = train(cfg)
  File "/lustre/hpc/icecube/moust/work/IceCubeEncoderTransformer/src/utils/utils.py", line 75, in wrap
    raise ex
  File "/lustre/hpc/icecube/moust/work/IceCubeEncoderTransformer/src/utils/utils.py", line 65, in wrap
    metric_dict, object_dict = task_func(cfg=cfg)
  File "src/train.py", line 87, in train
    trainer.fit(model=model, datamodule=datamodule, ckpt_path=cfg.get("ckpt_path"))
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/lightning/pytorch/trainer/trainer.py", line 520, in fit
    call._call_and_handle_interrupt(
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/lightning/pytorch/trainer/call.py", line 44, in _call_and_handle_interrupt
    return trainer_fn(*args, **kwargs)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/lightning/pytorch/trainer/trainer.py", line 559, in _fit_impl
    self._run(model, ckpt_path=ckpt_path)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/lightning/pytorch/trainer/trainer.py", line 948, in _run
    call._call_teardown_hook(self)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/lightning/pytorch/trainer/call.py", line 104, in _call_teardown_hook
    _call_lightning_datamodule_hook(trainer, "teardown", stage=fn)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/lightning/pytorch/trainer/call.py", line 162, in _call_lightning_datamodule_hook
    return fn(*args, **kwargs)
  File "/lustre/hpc/icecube/moust/work/IceCubeEncoderTransformer/src/data/datapipe_icecube_datamodule.py", line 342, in teardown
    self.icecube_train_dataloader.shutdown()
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/torchdata/dataloader2/dataloader2.py", line 253, in shutdown
    self.reading_service.finalize()
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/torchdata/dataloader2/reading_service.py", line 357, in finalize
    clean_me(process, req_queue, res_queue)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/torchdata/dataloader2/reading_service.py", line 349, in clean_me
    _ = res_queue.get(timeout=default_dl2_worker_join_timeout_in_s)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/multiprocessing/queues.py", line 116, in get
    return _ForkingPickler.loads(res)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/torch/multiprocessing/reductions.py", line 307, in rebuild_storage_fd
    fd = df.detach()
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/multiprocessing/resource_sharer.py", line 57, in detach
    with _resource_sharer.get_connection(self._id) as conn:
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/multiprocessing/resource_sharer.py", line 87, in get_connection
    c = Client(address, authkey=process.current_process().authkey)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/multiprocessing/connection.py", line 502, in Client
    c = SocketClient(address)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/multiprocessing/connection.py", line 630, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory
Set the environment variable HYDRA_FULL_ERROR=1 for a complete stack trace.
[rank: 0] Received SIGTERM: 15
[rank: 0] Received SIGTERM: 15
[rank: 0] Received SIGTERM: 15
[rank: 0] Received SIGTERM: 15
[rank: 0] Received SIGTERM: 15
[rank: 0] Received SIGTERM: 15
[rank: 0] Received SIGTERM: 15
My Datamodule
from typing import List, Optional

from lightning import LightningDataModule
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
from torchdata.datapipes.iter import IterDataPipe

# make_train_test_val_datapipe and upgrade_feature_transform are project-local helpers


class IceCubeDatamodule(LightningDataModule):
    def __init__(
        self,
        db_path: str,
        pulsemap: str,
        train_csv_file: str,
        test_csv_file: str,
        val_csv_file: str,
        input_cols: List[str],
        target_cols: List[str],
        truth_table: str = "truth",
        max_token_count: int = 64,
        num_workers: int = 16,
        multi_processing_reading_service_num_workers: int = 4,
        pin_memory: bool = False,
    ):
        super().__init__()
        # this line allows access to the init params via the 'self.hparams' attribute
        # and also ensures the init params are stored in the ckpt
        self.save_hyperparameters(logger=False)
        self.datapipe_train: Optional[IterDataPipe] = None
        self.datapipe_val: Optional[IterDataPipe] = None
        self.datapipe_test: Optional[IterDataPipe] = None
        # a single reading service shared by the train/val/test DataLoader2 instances
        self.rs = MultiProcessingReadingService(
            num_workers=self.hparams.multi_processing_reading_service_num_workers
        )

    def setup(self, stage: Optional[str] = None):
        if not self.datapipe_train and not self.datapipe_val and not self.datapipe_test:
            self.datapipe_train, self.datapipe_val, self.datapipe_test = make_train_test_val_datapipe(
                train_csv_file=self.hparams.train_csv_file,
                test_csv_file=self.hparams.test_csv_file,
                val_csv_file=self.hparams.val_csv_file,
                db_path=self.hparams.db_path,
                input_cols=self.hparams.input_cols,
                pulsemap=self.hparams.pulsemap,
                target_cols=self.hparams.target_cols,
                truth_table=self.hparams.truth_table,
                max_token_count=self.hparams.max_token_count,
                feature_transform=upgrade_feature_transform,
                truth_transform=None,
            )

    def train_dataloader(self):
        self.icecube_train_dataloader = DataLoader2(
            datapipe=self.datapipe_train,
            reading_service=self.rs,
        )
        return self.icecube_train_dataloader

    def val_dataloader(self):
        self.icecube_val_dataloader = DataLoader2(
            datapipe=self.datapipe_val,
            reading_service=self.rs,
        )
        return self.icecube_val_dataloader

    def test_dataloader(self):
        self.icecube_test_dataloader = DataLoader2(
            datapipe=self.datapipe_test,
            reading_service=self.rs,
        )
        return self.icecube_test_dataloader

    def teardown(self, stage: Optional[str] = None):
        """Clean up after fit or test."""
        self.icecube_train_dataloader.shutdown()
        self.icecube_test_dataloader.shutdown()
        self.icecube_val_dataloader.shutdown()
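One workaround I'm considering (just a sketch, I haven't verified that it fixes the FileNotFoundError) is to give each DataLoader2 its own reading service instead of sharing self.rs across train/val/test, and to guard teardown so it only shuts down the loaders that were actually created, roughly like this (replacing the corresponding methods above):

    def train_dataloader(self):
        # hypothetical change: a fresh reading service per DataLoader2 instead of the shared self.rs
        rs = MultiProcessingReadingService(
            num_workers=self.hparams.multi_processing_reading_service_num_workers
        )
        self.icecube_train_dataloader = DataLoader2(
            datapipe=self.datapipe_train,
            reading_service=rs,
        )
        return self.icecube_train_dataloader

    def teardown(self, stage: Optional[str] = None):
        """Only shut down loaders that exist; e.g. the test loader is never built during fit."""
        for name in ("icecube_train_dataloader", "icecube_val_dataloader", "icecube_test_dataloader"):
            loader = getattr(self, name, None)
            if loader is not None:
                loader.shutdown()

If anyone knows whether sharing a single MultiProcessingReadingService between several DataLoader2 instances is supported at all, that would also help.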