Run LightningWork in parallel¶
Audience: Users who want to run a LightningWork in parallel (asynchronously).
Prereqs: You must have finished the Basic levels.
When to run Components in parallel¶
Run a LightningWork in parallel when you want to execute work in the background or at the same time as another work. In machine learning, this comes up, for example, when data streams in while a model trains.
Toy example¶
By default, a Component must complete before the next one runs. We can enable one component to start in parallel, which allows the code to proceed without having to wait for the first one to finish.
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp, CloudCompute
class TrainComponent(LightningWork):
    """Toy work that counts upward, printing every step."""

    def run(self, message):
        """Print ``message`` followed by an increasing counter.

        Args:
            message: Text printed in front of each counter value.
        """
        # The bound is large enough that the loop runs effectively indefinitely.
        for i in range(100000000000):
            print(message, i)
class AnalyzeComponent(LightningWork):
    """Toy work that counts upward, printing every step."""

    def run(self, message):
        """Print ``message`` followed by an increasing counter.

        Args:
            message: Text printed in front of each counter value.
        """
        # The bound is large enough that the loop runs effectively indefinitely.
        for i in range(100000000000):
            print(message, i)
class LitWorkflow(LightningFlow):
    """Flow that runs two works with the default (non-parallel) settings."""

    def __init__(self) -> None:
        super().__init__()
        # Neither work passes ``parallel=True``, so both use the default behavior.
        self.train = TrainComponent(cloud_compute=CloudCompute('cpu'))
        self.analyze = AnalyzeComponent(cloud_compute=CloudCompute('cpu'))

    def run(self):
        """Call ``train`` and then ``analyze``."""
        # By default a component must complete before the next one runs, so
        # ``analyze`` has to wait for ``train``.
        self.train.run("machine A counting")
        self.analyze.run("machine B counting")


app = LightningApp(LitWorkflow())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp, CloudCompute
class TrainComponent(LightningWork):
    """Toy work that counts upward, printing every step."""

    def run(self, message):
        """Print ``message`` followed by an increasing counter.

        Args:
            message: Text printed in front of each counter value.
        """
        # The bound is large enough that the loop runs effectively indefinitely.
        for i in range(100000000000):
            print(message, i)
class AnalyzeComponent(LightningWork):
    """Toy work that counts upward, printing every step."""

    def run(self, message):
        """Print ``message`` followed by an increasing counter.

        Args:
            message: Text printed in front of each counter value.
        """
        # The bound is large enough that the loop runs effectively indefinitely.
        for i in range(100000000000):
            print(message, i)
class LitWorkflow(LightningFlow):
    """Flow that starts ``train`` in parallel and then runs ``analyze``."""

    def __init__(self) -> None:
        super().__init__()
        # ``parallel=True`` lets ``train`` start in parallel, so the flow can
        # proceed without waiting for it to finish.
        self.train = TrainComponent(cloud_compute=CloudCompute('cpu'), parallel=True)
        self.analyze = AnalyzeComponent(cloud_compute=CloudCompute('cpu'))

    def run(self):
        """Call ``train`` and then ``analyze``."""
        # ``train`` was created with ``parallel=True``, so this call does not
        # hold up the ``analyze`` call below.
        self.train.run("machine A counting")
        self.analyze.run("machine B counting")


app = LightningApp(LitWorkflow())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp, CloudCompute
class TrainComponent(LightningWork):
    """Toy work that counts upward, printing every step."""

    def run(self, message):
        """Print ``message`` followed by an increasing counter.

        Args:
            message: Text printed in front of each counter value.
        """
        # The bound is large enough that the loop runs effectively indefinitely.
        for i in range(100000000000):
            print(message, i)
class AnalyzeComponent(LightningWork):
    """Toy work that counts upward, printing every step."""

    def run(self, message):
        """Print ``message`` followed by an increasing counter.

        Args:
            message: Text printed in front of each counter value.
        """
        # The bound is large enough that the loop runs effectively indefinitely.
        for i in range(100000000000):
            print(message, i)
class LitWorkflow(LightningFlow):
    """Flow that starts ``train`` in parallel and then runs ``analyze``."""

    def __init__(self) -> None:
        super().__init__()
        # ``parallel=True`` lets ``train`` start in parallel, so the flow can
        # proceed without waiting for it to finish.
        self.train = TrainComponent(cloud_compute=CloudCompute('cpu'), parallel=True)
        self.analyze = AnalyzeComponent(cloud_compute=CloudCompute('cpu'))

    def run(self):
        """Call ``train`` and then ``analyze``."""
        # ``train`` was created with ``parallel=True``, so this call does not
        # hold up the ``analyze`` call below.
        self.train.run("machine A counting")
        self.analyze.run("machine B counting")


app = LightningApp(LitWorkflow())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp, CloudCompute
class TrainComponent(LightningWork):
    """Toy work that counts upward, printing every step."""

    def run(self, message):
        """Print ``message`` followed by an increasing counter.

        Args:
            message: Text printed in front of each counter value.
        """
        # The bound is large enough that the loop runs effectively indefinitely.
        for i in range(100000000000):
            print(message, i)
class AnalyzeComponent(LightningWork):
    """Toy work that counts upward, printing every step."""

    def run(self, message):
        """Print ``message`` followed by an increasing counter.

        Args:
            message: Text printed in front of each counter value.
        """
        # The bound is large enough that the loop runs effectively indefinitely.
        for i in range(100000000000):
            print(message, i)
class LitWorkflow(LightningFlow):
    """Flow that starts ``train`` in parallel and then runs ``analyze``."""

    def __init__(self) -> None:
        super().__init__()
        # ``parallel=True`` lets ``train`` start in parallel, so the flow can
        # proceed without waiting for it to finish.
        self.train = TrainComponent(cloud_compute=CloudCompute('cpu'), parallel=True)
        self.analyze = AnalyzeComponent(cloud_compute=CloudCompute('cpu'))

    def run(self):
        """Call ``train`` and then ``analyze``."""
        # ``train`` was created with ``parallel=True``, so this call does not
        # hold up the ``analyze`` call below.
        self.train.run("machine A counting")
        self.analyze.run("machine B counting")


app = LightningApp(LitWorkflow())
Multiple components in parallel¶
In this example, we start all 3 components at once. The first two start in parallel, which allows the third component to run without waiting for the others to finish.
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp, CloudCompute
class TrainComponent(LightningWork):
    """Toy work that counts upward, printing every step."""

    def run(self, message):
        """Print ``message`` followed by an increasing counter.

        Args:
            message: Text printed in front of each counter value.
        """
        # The bound is large enough that the loop runs effectively indefinitely.
        for i in range(100000000000):
            print(message, i)
class AnalyzeComponent(LightningWork):
    """Toy work that counts upward, printing every step."""

    def run(self, message):
        """Print ``message`` followed by an increasing counter.

        Args:
            message: Text printed in front of each counter value.
        """
        # The bound is large enough that the loop runs effectively indefinitely.
        for i in range(100000000000):
            print(message, i)
class LitWorkflow(LightningFlow):
    """Flow that runs three works with the default (non-parallel) settings."""

    def __init__(self) -> None:
        super().__init__()
        # No work passes ``parallel=True``, so all three use the default behavior.
        self.train = TrainComponent(cloud_compute=CloudCompute('cpu'))
        self.baseline_1 = TrainComponent(cloud_compute=CloudCompute('cpu'))
        self.analyze = AnalyzeComponent(cloud_compute=CloudCompute('cpu'))

    def run(self):
        """Call ``train``, ``baseline_1`` and ``analyze`` in that order."""
        # By default a component must complete before the next one runs, so
        # each call waits for the one before it.
        self.train.run("machine A counting")
        self.baseline_1.run("machine C counting")
        self.analyze.run("machine B counting")


app = LightningApp(LitWorkflow())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp, CloudCompute
class TrainComponent(LightningWork):
    """Toy work that counts upward, printing every step."""

    def run(self, message):
        """Print ``message`` followed by an increasing counter.

        Args:
            message: Text printed in front of each counter value.
        """
        # The bound is large enough that the loop runs effectively indefinitely.
        for i in range(100000000000):
            print(message, i)
class AnalyzeComponent(LightningWork):
    """Toy work that counts upward, printing every step."""

    def run(self, message):
        """Print ``message`` followed by an increasing counter.

        Args:
            message: Text printed in front of each counter value.
        """
        # The bound is large enough that the loop runs effectively indefinitely.
        for i in range(100000000000):
            print(message, i)
class LitWorkflow(LightningFlow):
    """Flow that starts two works in parallel and then runs a third."""

    def __init__(self) -> None:
        super().__init__()
        # ``parallel=True`` on the first two works lets them start in parallel,
        # so the flow can proceed without waiting for either to finish.
        self.train = TrainComponent(cloud_compute=CloudCompute('cpu'), parallel=True)
        self.baseline_1 = TrainComponent(cloud_compute=CloudCompute('cpu'), parallel=True)
        self.analyze = AnalyzeComponent(cloud_compute=CloudCompute('cpu'))

    def run(self):
        """Call ``train``, ``baseline_1`` and ``analyze`` in that order."""
        # The first two works are parallel, so ``analyze`` runs without
        # waiting for them to finish.
        self.train.run("machine A counting")
        self.baseline_1.run("machine C counting")
        self.analyze.run("machine B counting")


app = LightningApp(LitWorkflow())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp, CloudCompute
class TrainComponent(LightningWork):
    """Toy work that counts upward, printing every step."""

    def run(self, message):
        """Print ``message`` followed by an increasing counter.

        Args:
            message: Text printed in front of each counter value.
        """
        # The bound is large enough that the loop runs effectively indefinitely.
        for i in range(100000000000):
            print(message, i)
class AnalyzeComponent(LightningWork):
    """Toy work that counts upward, printing every step."""

    def run(self, message):
        """Print ``message`` followed by an increasing counter.

        Args:
            message: Text printed in front of each counter value.
        """
        # The bound is large enough that the loop runs effectively indefinitely.
        for i in range(100000000000):
            print(message, i)
class LitWorkflow(LightningFlow):
    """Flow that starts two works in parallel and then runs a third."""

    def __init__(self) -> None:
        super().__init__()
        # ``parallel=True`` on the first two works lets them start in parallel,
        # so the flow can proceed without waiting for either to finish.
        self.train = TrainComponent(cloud_compute=CloudCompute('cpu'), parallel=True)
        self.baseline_1 = TrainComponent(cloud_compute=CloudCompute('cpu'), parallel=True)
        self.analyze = AnalyzeComponent(cloud_compute=CloudCompute('cpu'))

    def run(self):
        """Call ``train``, ``baseline_1`` and ``analyze`` in that order."""
        # The first two works are parallel, so ``analyze`` runs without
        # waiting for them to finish.
        self.train.run("machine A counting")
        self.baseline_1.run("machine C counting")
        self.analyze.run("machine B counting")


app = LightningApp(LitWorkflow())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp, CloudCompute
class TrainComponent(LightningWork):
    """Toy work that counts upward, printing every step."""

    def run(self, message):
        """Print ``message`` followed by an increasing counter.

        Args:
            message: Text printed in front of each counter value.
        """
        # The bound is large enough that the loop runs effectively indefinitely.
        for i in range(100000000000):
            print(message, i)
class AnalyzeComponent(LightningWork):
    """Toy work that counts upward, printing every step."""

    def run(self, message):
        """Print ``message`` followed by an increasing counter.

        Args:
            message: Text printed in front of each counter value.
        """
        # The bound is large enough that the loop runs effectively indefinitely.
        for i in range(100000000000):
            print(message, i)
class LitWorkflow(LightningFlow):
    """Flow that starts two works in parallel and then runs a third."""

    def __init__(self) -> None:
        super().__init__()
        # ``parallel=True`` on the first two works lets them start in parallel,
        # so the flow can proceed without waiting for either to finish.
        self.train = TrainComponent(cloud_compute=CloudCompute('cpu'), parallel=True)
        self.baseline_1 = TrainComponent(cloud_compute=CloudCompute('cpu'), parallel=True)
        self.analyze = AnalyzeComponent(cloud_compute=CloudCompute('cpu'))

    def run(self):
        """Call ``train``, ``baseline_1`` and ``analyze`` in that order."""
        # The first two works are parallel, so ``analyze`` runs without
        # waiting for them to finish.
        self.train.run("machine A counting")
        self.baseline_1.run("machine C counting")
        self.analyze.run("machine B counting")


app = LightningApp(LitWorkflow())