Error when shutting down DataLoader2 and ReadingService from torchdata

I'm trying to combine Lightning and torchdata using datapipes and DataLoader2, but I run into the error below when I try to train. I also end up having to kill some leftover processes with pkill -9 -f "python src/train.py".

I think the problem is that teardown doesn't manage to finalize the MultiProcessingReadingService cleanly.

Has anyone tried to use torchdata together with Lightning? If so, I would love to see how it's implemented.
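
For context, this is the basic DataLoader2 pattern I'm building on (a minimal sketch with a stand-in datapipe; my real pipeline reads from the database instead):

from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
from torchdata.datapipes.iter import IterableWrapper

# stand-in pipeline; the real one reads pulse data from db_path
datapipe = IterableWrapper(range(100)).shuffle().sharding_filter()

rs = MultiProcessingReadingService(num_workers=4)
dl = DataLoader2(datapipe, reading_service=rs)

for batch in dl:
    pass  # training step goes here

# the same shutdown() call is what fails inside my DataModule's teardown
dl.shutdown()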

The error:

Traceback (most recent call last):
  File "src/train.py", line 115, in main
    metric_dict, _ = train(cfg)
  File "/lustre/hpc/icecube/moust/work/IceCubeEncoderTransformer/src/utils/utils.py", line 75, in wrap
    raise ex
  File "/lustre/hpc/icecube/moust/work/IceCubeEncoderTransformer/src/utils/utils.py", line 65, in wrap
    metric_dict, object_dict = task_func(cfg=cfg)
  File "src/train.py", line 87, in train
    trainer.fit(model=model, datamodule=datamodule, ckpt_path=cfg.get("ckpt_path"))
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/lightning/pytorch/trainer/trainer.py", line 520, in fit
    call._call_and_handle_interrupt(
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/lightning/pytorch/trainer/call.py", line 44, in _call_and_handle_interrupt
    return trainer_fn(*args, **kwargs)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/lightning/pytorch/trainer/trainer.py", line 559, in _fit_impl
    self._run(model, ckpt_path=ckpt_path)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/lightning/pytorch/trainer/trainer.py", line 948, in _run
    call._call_teardown_hook(self)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/lightning/pytorch/trainer/call.py", line 104, in _call_teardown_hook
    _call_lightning_datamodule_hook(trainer, "teardown", stage=fn)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/lightning/pytorch/trainer/call.py", line 162, in _call_lightning_datamodule_hook
    return fn(*args, **kwargs)
  File "/lustre/hpc/icecube/moust/work/IceCubeEncoderTransformer/src/data/datapipe_icecube_datamodule.py", line 342, in teardown
    self.icecube_train_dataloader.shutdown()
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/torchdata/dataloader2/dataloader2.py", line 253, in shutdown
    self.reading_service.finalize()
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/torchdata/dataloader2/reading_service.py", line 357, in finalize
    clean_me(process, req_queue, res_queue)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/torchdata/dataloader2/reading_service.py", line 349, in clean_me
    _ = res_queue.get(timeout=default_dl2_worker_join_timeout_in_s)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/multiprocessing/queues.py", line 116, in get
    return _ForkingPickler.loads(res)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/site-packages/torch/multiprocessing/reductions.py", line 307, in rebuild_storage_fd
    fd = df.detach()
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/multiprocessing/resource_sharer.py", line 57, in detach
    with _resource_sharer.get_connection(self._id) as conn:
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/multiprocessing/resource_sharer.py", line 87, in get_connection
    c = Client(address, authkey=process.current_process().authkey)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/multiprocessing/connection.py", line 502, in Client
    c = SocketClient(address)
  File "/groups/icecube/moust/miniconda3/envs/icet2/lib/python3.8/multiprocessing/connection.py", line 630, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory

Set the environment variable HYDRA_FULL_ERROR=1 for a complete stack trace.
[rank: 0] Received SIGTERM: 15
[rank: 0] Received SIGTERM: 15
[rank: 0] Received SIGTERM: 15
[rank: 0] Received SIGTERM: 15
[rank: 0] Received SIGTERM: 15
[rank: 0] Received SIGTERM: 15
[rank: 0] Received SIGTERM: 15

My DataModule:

from typing import List, Optional

from lightning.pytorch import LightningDataModule
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
from torchdata.datapipes.iter import IterDataPipe

# make_train_test_val_datapipe and upgrade_feature_transform are
# project-local helpers (imports omitted here)


class IceCubeDatamodule(LightningDataModule):
    def __init__(
        self,
        db_path: str,
        pulsemap: str,
        train_csv_file: str, 
        test_csv_file: str,
        val_csv_file: str,
        input_cols: List[str],
        target_cols: List[str],
        truth_table: str = "truth",
        max_token_count: int = 64,
        num_workers: int = 16,
        multi_processing_reading_service_num_workers: int = 4,
        pin_memory: bool = False,
    ):
        super().__init__()

        # save_hyperparameters() makes the init params accessible via the
        # 'self.hparams' attribute and ensures they are stored in checkpoints
        self.save_hyperparameters(logger=False)

        self.datapipe_train: Optional[IterDataPipe] = None
        self.datapipe_val: Optional[IterDataPipe] = None
        self.datapipe_test: Optional[IterDataPipe] = None

        self.rs = MultiProcessingReadingService(
            num_workers = self.hparams.multi_processing_reading_service_num_workers
        )

    def setup(self, stage: Optional[str] = None):
        if not self.datapipe_train and not self.datapipe_val and not self.datapipe_test:
            self.datapipe_train, self.datapipe_val, self.datapipe_test = make_train_test_val_datapipe(
                train_csv_file = self.hparams.train_csv_file,
                test_csv_file = self.hparams.test_csv_file,
                val_csv_file = self.hparams.val_csv_file,
                db_path = self.hparams.db_path, 
                input_cols = self.hparams.input_cols, 
                pulsemap = self.hparams.pulsemap, 
                target_cols = self.hparams.target_cols, 
                truth_table = self.hparams.truth_table, 
                max_token_count = self.hparams.max_token_count,
                feature_transform = upgrade_feature_transform,
                truth_transform = None,
            )

    def train_dataloader(self):
        self.icecube_train_dataloader = DataLoader2(
            datapipe = self.datapipe_train,
            reading_service = self.rs,
        )
        return self.icecube_train_dataloader

    def val_dataloader(self):
        self.icecube_val_dataloader = DataLoader2(
            datapipe = self.datapipe_val,
            reading_service = self.rs,
        )
        return self.icecube_val_dataloader

    def test_dataloader(self):
        self.icecube_test_dataloader = DataLoader2(
            datapipe = self.datapipe_test,
            reading_service = self.rs,
        )
        return self.icecube_test_dataloader
            
    def teardown(self, stage: Optional[str] = None):
        """Clean up after fit or test."""
        self.icecube_train_dataloader.shutdown()
        self.icecube_test_dataloader.shutdown()
        self.icecube_val_dataloader.shutdown()
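
One variation I've been meaning to try (untested, and just a guess at the cause): give each DataLoader2 its own reading service instead of sharing self.rs between the train/val/test loaders, and make teardown defensive so it only shuts down loaders that were actually created for the current stage. A sketch of what I mean:

    def train_dataloader(self):
        # hypothesis: sharing one MultiProcessingReadingService across
        # several DataLoader2 instances is what breaks finalize(), so
        # create a fresh one per loader
        rs = MultiProcessingReadingService(
            num_workers = self.hparams.multi_processing_reading_service_num_workers
        )
        self.icecube_train_dataloader = DataLoader2(
            datapipe = self.datapipe_train,
            reading_service = rs,
        )
        return self.icecube_train_dataloader

    def teardown(self, stage: Optional[str] = None):
        # after fit() only the train/val loaders exist, so guard each
        # shutdown instead of assuming all three were created
        for name in (
            "icecube_train_dataloader",
            "icecube_val_dataloader",
            "icecube_test_dataloader",
        ):
            dl = getattr(self, name, None)
            if dl is not None:
                dl.shutdown()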