Develop a Directed Acyclic Graph (DAG)

Audience: Users coming from MLOps to Lightning Apps, looking for more flexibility.

A typical ML training workflow can be implemented with a simple DAG.
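
As a framework-independent illustration, the dependency structure of such a workflow can be sketched with Python's standard `graphlib` module. The stage names below are hypothetical placeholders, not part of the Lightning API; the sketch only shows how a DAG fixes the order in which the stages must run.

```python
from graphlib import TopologicalSorter

# Hypothetical stage names. Each key maps to the set of stages it depends on.
dag = {
    "process_data": set(),
    "train_model": {"process_data"},
    "serve_model": {"train_model"},
}

# static_order() yields every stage after all of its dependencies.
order = list(TopologicalSorter(dag).static_order())
print(order)
```

Because each stage depends only on the previous one, the resulting order is a single chain: data processing first, then training, then serving.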

The pseudocode below uses the Lightning framework: a LightningFlow orchestrates a serial workflow that processes the data, trains a model, and serves the model.

import lightning as L

class DAGFlow(L.LightningFlow):

    def __init__(self):
        super().__init__()
        self.processor = DataProcessorWork(...)
        self.train_work = TrainingWork(...)
        self.serve_work = ServeWork(...)

    def run(self):
        self.processor.run(...)
        self.train_work.run(...)
        self.serve_work.run(...)

The pseudocode below runs several works in parallel using the built-in Dict structure.

import lightning as L

class DAGFlow(L.LightningFlow):

    def __init__(self):
        super().__init__()
        ...
        self.train_works = L.structures.Dict(**{
            "1": TrainingWork(..., parallel=True),
            "2": TrainingWork(..., parallel=True),
            "3": TrainingWork(..., parallel=True),
            ...
            })
        ...

    def run(self):
        self.processor.run(...)

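        # Works created with parallel=True do not block, so the flow moves past
        # this loop immediately; self.serve_work.run therefore needs a guard.
        for work in self.train_works.values():
            work.run(...)

        # Wait until every training work has finished without errors. The flow
        # calls run() repeatedly, so returning early simply retries later.
        if not all(w.has_succeeded for w in self.train_works.values()):
            return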

        self.serve_work.run(...)
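
The guard above relies on the flow's run method being invoked repeatedly. Below is a minimal plain-Python sketch of that behavior, which runs without Lightning installed. `FakeWork`, `FakeFlow`, and `tick` are hypothetical stand-ins invented for this illustration; they are not Lightning classes or methods, and the real scheduling is handled by the framework.

```python
class FakeWork:
    """Hypothetical stand-in for a parallel work that needs a few ticks to finish."""

    def __init__(self, ticks_needed):
        self.ticks_needed = ticks_needed
        self.started = False
        self.has_succeeded = False

    def run(self):
        # Calling run() only starts the work; it never blocks the caller.
        self.started = True

    def tick(self):
        # Simulates background progress between two passes of the flow.
        if self.started and not self.has_succeeded:
            self.ticks_needed -= 1
            self.has_succeeded = self.ticks_needed <= 0


class FakeFlow:
    """Hypothetical stand-in for a flow whose run() is called in a loop."""

    def __init__(self):
        self.train_works = {"1": FakeWork(1), "2": FakeWork(3), "3": FakeWork(2)}
        self.served = False

    def run(self):
        for work in self.train_works.values():
            work.run()
        # Guard: leave early until every training work has succeeded.
        if not all(w.has_succeeded for w in self.train_works.values()):
            return
        self.served = True  # stands in for self.serve_work.run(...)


flow = FakeFlow()
passes = 0
while not flow.served:
    flow.run()
    passes += 1
    for work in flow.train_works.values():
        work.tick()
print(passes)
```

In this sketch the serving step is reached only on the pass after the slowest work has finished; every earlier pass returns at the guard.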

Next Steps