Level 1: Package code in a Lightning component
Prereqs: You know basic Python.
Goal: In this guide you’ll learn to develop a Lightning component.
Why you need Lightning components
A Lightning component is a self-contained, modular machine-learning component that you can plug into your existing ML workflows. A Lightning component organizes arbitrary code so it can run on the cloud, and it manages its own infrastructure, cloud costs, networking, and more. Connect components using your current workflow management tools or our next-generation reactive orchestrator.
Components run on the cloud or your laptop without code changes 🤯🤯.
Organizing your code into Lightning components offers these benefits:
Build systems, not scripts
The Lightning structure enforces best practices, so you don’t have to be an expert production engineer. Although it feels like you’re writing a script, you are actually building a production-ready system.
Cost control
The component run-time has been optimized for cost management to support the largest machine-learning workloads. Lower your cloud bill with machines that shut down or spin up faster.
For beginners: Code like an expert
Lightning embeds the best practices of building production-ready, full-stack AI apps into your coding experience. You can write code like you normally do, and the Lightning structure ensures your code is implicitly production-ready… even if you’re just doing research.
For experts: Scale with full control
If you know what you are doing, Lightning gives you full control to manage your own scaling logic, fault tolerance, and even pre-provisioning, all from Python.
Integrate into your current workflow tools
Lightning components are self-contained pieces of functionality. Add them to your current workflow tools to quickly fill in gaps in your ML workflow such as monitoring drift, training LLMs and more. You can (optionally) use the Lightning App to integrate components into a cohesive workflow.
Packaged code
Lightning Apps bundle components into an app that runs in any environment. The same code runs on your laptop, on any cloud, or on a private cluster. You don’t have to think about the cluster or know anything about the cloud.
Rapid iteration
Iterate through ideas in hours, not months. You don’t have to learn a million other concepts, such as Kubernetes, cost management and auto-scaling, because the components handle them for you.
Modularity
Components are modular and interoperable by design. Leverage our vibrant community of components so you don’t have to build every piece of the system yourself.
Install Lightning
First, install Lightning.
pip install lightning
# Apple Silicon Macs (M1/M2/M3): set these variables before installing
export GRPC_PYTHON_BUILD_SYSTEM_OPENSSL=1
export GRPC_PYTHON_BUILD_SYSTEM_ZLIB=1
pip install lightning
# Windows: prepare the environment first
# 1. install pip
# 2. install git
# 3. set up an alias for Python: python=python3
# 4. add the root folder of Lightning to the PATH environment variable
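To confirm the install worked, you can ask Python which version of the lightning package is present. The following is a minimal sketch that uses only the standard library's importlib.metadata. The name installed_version is a helper we made up for this guide and is not part of Lightning.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str = "lightning") -> Optional[str]:
    """Return the installed version string of `package`, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = installed_version()
    if version is None:
        print("lightning is not installed; run: pip install lightning")
    else:
        print(f"lightning {version} is installed")
```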
Build your first component
A Lightning component organizes arbitrary code so it can run on the cloud, and it manages its own infrastructure, cloud costs, networking, and more.
Run one of these components!
# app.py
from lightning.app import LightningWork, LightningApp
class YourComponent(LightningWork):
    def run(self):
        print('RUN ANY PYTHON CODE HERE')
component = YourComponent()
app = LightningApp(component)
# app.py
from lightning.app import LightningWork, LightningApp, CloudCompute
class YourComponent(LightningWork):
    def run(self):
        print('RUN ANY PYTHON CODE HERE')
# run on a cloud machine ("cpu", "gpu", ...)
compute = CloudCompute("gpu")
component = YourComponent(cloud_compute=compute)
app = LightningApp(component)
# app.py
from lightning import Trainer
from lightning.app import LightningWork, LightningApp, CloudCompute
from lightning.app.components import LightningTrainerMultiNode
from lightning.pytorch.demos.boring_classes import BoringModel
class LightningTrainerDistributed(LightningWork):
    def run(self):
        model = BoringModel()
        trainer = Trainer(max_epochs=10, strategy="ddp")
        trainer.fit(model)
# 16 GPUs: (4 nodes x 4 V100)
component = LightningTrainerMultiNode(
    LightningTrainerDistributed,
    num_nodes=4,
    cloud_compute=CloudCompute("gpu-fast-multi"),  # 4 x V100
)
app = LightningApp(component)
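A multi-node component uses the number of nodes times the number of GPUs on each machine. Each 'gpu-fast-multi' machine has 4 x V100 GPUs, as noted in the code, so num_nodes=4 gives 16 GPUs in total. The sketch below only does that arithmetic. total_gpus is a hypothetical helper and not a Lightning API.

```python
def total_gpus(num_nodes: int, gpus_per_node: int) -> int:
    """GPUs across the whole cluster: every node contributes the same count."""
    if num_nodes < 1 or gpus_per_node < 1:
        raise ValueError("num_nodes and gpus_per_node must be positive")
    return num_nodes * gpus_per_node


# 'gpu-fast-multi' machines have 4 x V100 each (per the comments in this guide)
GPU_FAST_MULTI = 4
print(total_gpus(4, GPU_FAST_MULTI))  # num_nodes=4 on gpu-fast-multi
print(total_gpus(8, GPU_FAST_MULTI))  # num_nodes=8 on gpu-fast-multi
```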
# app.py
# !pip install torch
from lightning.app import LightningWork, LightningApp, CloudCompute
import torch
class PyTorchComponent(LightningWork):
    def run(self):
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        model = torch.nn.Sequential(torch.nn.Linear(1, 1),
                                    torch.nn.ReLU(),
                                    torch.nn.Linear(1, 1))
        model.to(device)
        criterion = torch.nn.MSELoss()
        optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
        for step in range(10000):
            model.zero_grad()
            x = torch.tensor([0.8]).to(device)
            target = torch.tensor([1.0]).to(device)
            output = model(x)
            loss = criterion(output, target)
            print(f'step: {step}. loss {loss}')
            loss.backward()
            optimizer.step()
compute = CloudCompute('gpu')
component = PyTorchComponent(cloud_compute=compute)
app = LightningApp(component)
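The loop in PyTorchComponent follows the usual training pattern: clear gradients, run the forward pass, compute the loss, backpropagate, and step the optimizer. The sketch below runs the same steps by hand without any dependencies. It uses a simplified model, a single linear unit output = w * x + b, which has no hidden ReLU layer, unlike the Sequential above. It fits x = 0.8 to target = 1.0 with plain SGD and lr = 0.1, as in the component.

```python
def train_single_unit(x: float = 0.8, target: float = 1.0,
                      lr: float = 0.1, steps: int = 100):
    """Fit output = w * x + b to one (x, target) pair with hand-written SGD."""
    w, b = 0.0, 0.0
    losses = []
    for step in range(steps):
        # gradients are recomputed from scratch every step, which is what
        # model.zero_grad() guarantees in the PyTorch version
        output = w * x + b                  # forward pass
        loss = (output - target) ** 2       # MSE on a single sample
        grad_output = 2 * (output - target)
        grad_w = grad_output * x            # backward pass (chain rule)
        grad_b = grad_output
        w -= lr * grad_w                    # optimizer step: plain SGD
        b -= lr * grad_b
        losses.append(loss)
    return w, b, losses


w, b, losses = train_single_unit()
print(f"first loss {losses[0]:.4f}, last loss {losses[-1]:.2e}")
```

Writing the gradients out makes it clear that optimizer.step() only applies parameter -= lr * gradient. The PyTorch version gets the gradients from loss.backward() instead of computing them by hand.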
# app.py
# !pip install torch
from lightning.app import LightningWork, LightningApp, CloudCompute
from lightning.app.components import MultiNode
import torch
from torch.nn.parallel.distributed import DistributedDataParallel
def distributed_train(local_rank: int, main_address: str, main_port: int, num_nodes: int, node_rank: int, nprocs: int):
    # 1. SET UP DISTRIBUTED ENVIRONMENT
    global_rank = local_rank + node_rank * nprocs
    world_size = num_nodes * nprocs
    if torch.distributed.is_available() and not torch.distributed.is_initialized():
        torch.distributed.init_process_group(
            "nccl" if torch.cuda.is_available() else "gloo",
            rank=global_rank,
            world_size=world_size,
            init_method=f"tcp://{main_address}:{main_port}",
        )
    # 2. PREPARE DISTRIBUTED MODEL
    model = torch.nn.Linear(32, 2)
    device = torch.device(f"cuda:{local_rank}") if torch.cuda.is_available() else torch.device("cpu")
    # move the model to its device before wrapping it: DDP with device_ids expects it there already
    model = DistributedDataParallel(model.to(device), device_ids=[local_rank] if torch.cuda.is_available() else None)
    # 3. SET UP LOSS AND OPTIMIZER
    criterion = torch.nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    # 4. TRAIN THE MODEL FOR 50 STEPS
    for step in range(50):
        model.zero_grad()
        x = torch.randn(64, 32).to(device)
        output = model(x)
        loss = criterion(output, torch.ones_like(output))
        print(f"global_rank: {global_rank} step: {step} loss: {loss}")
        loss.backward()
        optimizer.step()
    # 5. VERIFY ALL COPIES OF THE MODEL HAVE THE SAME WEIGHTS AT END OF TRAINING
    weight = model.module.weight.clone()
    torch.distributed.all_reduce(weight)
    assert torch.equal(model.module.weight, weight / world_size)
    print("Multi Node Distributed Training Done!")
class PyTorchDistributed(LightningWork):
    def run(self, main_address: str, main_port: int, num_nodes: int, node_rank: int):
        nprocs = torch.cuda.device_count() if torch.cuda.is_available() else 1
        torch.multiprocessing.spawn(
            distributed_train,
            args=(main_address, main_port, num_nodes, node_rank, nprocs),
            nprocs=nprocs
        )
# 32 GPUs: (8 nodes x 4 V100)
compute = CloudCompute("gpu-fast-multi")  # 4 x V100
component = MultiNode(PyTorchDistributed, num_nodes=8, cloud_compute=compute)
app = LightningApp(component)
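In distributed_train, each process computes its global rank from its node and local rank with global_rank = local_rank + node_rank * nprocs. The world size is num_nodes * nprocs. The pure-Python sketch below lists that mapping for the 8-node, 4-GPU-per-node setup. It shows that every process gets a unique rank from 0 to world_size - 1. rank_layout is a hypothetical helper for illustration only.

```python
from typing import Dict, Tuple


def rank_layout(num_nodes: int, nprocs: int) -> Dict[int, Tuple[int, int]]:
    """Map each global rank to its (node_rank, local_rank) pair."""
    layout = {}
    for node_rank in range(num_nodes):
        for local_rank in range(nprocs):
            # same formula as in distributed_train
            global_rank = local_rank + node_rank * nprocs
            layout[global_rank] = (node_rank, local_rank)
    return layout


layout = rank_layout(num_nodes=8, nprocs=4)
world_size = 8 * 4
assert sorted(layout) == list(range(world_size))  # unique, contiguous ranks
print(layout[0], layout[5], layout[31])
```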
# app.py
# !pip install torchvision
from lightning.app import LightningApp, CloudCompute
from lightning.app.components.serve import PythonServer, Image, Number
import base64, io, torchvision, torch
from PIL import Image as PILImage
class PyTorchServer(PythonServer):
    def setup(self):
        self._model = torchvision.models.resnet18(pretrained=True)
        self._device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self._model.to(self._device)
        self._model.eval()  # inference mode: use the running batch-norm statistics
    def predict(self, request):
        image = base64.b64decode(request.image.encode("utf-8"))
        image = PILImage.open(io.BytesIO(image)).convert("RGB")  # the model expects 3 channels
        transforms = torchvision.transforms.Compose([
            torchvision.transforms.Resize(224),
            torchvision.transforms.ToTensor(),
            torchvision.transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
        image = transforms(image)
        image = image.to(self._device)
        with torch.no_grad():  # no gradients are needed for inference
            prediction = self._model(image.unsqueeze(0))
        return {"prediction": prediction.argmax().item()}
component = PyTorchServer(
    input_type=Image, output_type=Number, cloud_compute=CloudCompute('gpu')
)
app = LightningApp(component)
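The server decodes request.image with base64.b64decode, so a client has to send the image bytes base64-encoded under the image key. The sketch below builds that payload with the standard library. It then checks that the server's decoding step recovers the original bytes. The /predict route and the host are assumptions, not values taken from this guide, so adjust the URL to wherever your app is actually served.

```python
import base64
import json
import urllib.request


def make_payload(image_bytes: bytes) -> dict:
    """Encode raw image bytes the way PyTorchServer.predict expects to decode them."""
    return {"image": base64.b64encode(image_bytes).decode("utf-8")}


def post_image(url: str, image_bytes: bytes) -> dict:
    """POST the payload as JSON; the URL (e.g. ending in /predict) is an assumption."""
    body = json.dumps(make_payload(image_bytes)).encode("utf-8")
    request = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())


# Round trip: mirror the server-side decoding done in predict()
fake_image = b"\x89PNG\r\n\x1a\n not a real image, just bytes"
payload = make_payload(fake_image)
decoded = base64.b64decode(payload["image"].encode("utf-8"))
assert decoded == fake_image
# post_image("http://<your-app-host>/predict", open("cat.png", "rb").read())  # hypothetical URL and file
```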
# app.py
# !curl https://raw.githubusercontent.com/Lightning-AI/lightning/master/examples/app/multi_node/pl_boring_script.py -o pl_boring_script.py
from lightning.app import LightningApp, CloudCompute
from lightning.app.components.training import LightningTrainerScript
# run script that trains PyTorch with the Lightning Trainer
model_script = 'pl_boring_script.py'
component = LightningTrainerScript(
    model_script,
    num_nodes=1,
    cloud_compute=CloudCompute("gpu")
)
app = LightningApp(component)
# app.py
# !pip install scikit-learn xgboost
from lightning.app import LightningWork, LightningApp
from sklearn import datasets
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
class XGBoostComponent(LightningWork):
    def run(self):
        iris = datasets.load_iris()
        X, y = iris.data, iris.target
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
        bst = XGBClassifier(verbosity=3)
        bst.fit(X_train, y_train)
        preds = bst.predict(X_test)
        print(f'preds: {preds}')
app = LightningApp(XGBoostComponent())
# app.py
# !pip install streamlit omegaconf scipy
# !pip install torch
from lightning.app import LightningApp
from lightning.app.components import ServeStreamlit
import torch
from io import BytesIO
from functools import partial
from scipy.io.wavfile import write
import streamlit as st
class StreamlitApp(ServeStreamlit):
    def build_model(self):
        sample_rate = 48000
        model, _ = torch.hub.load('snakers4/silero-models', model='silero_tts', speaker="v3_en")
        return partial(model.apply_tts, sample_rate=sample_rate, speaker="en_0"), sample_rate
    def render(self):
        st.title("Text To Speech")
        text = st.text_input("Text:", "Lightning Apps are the best!")
        if text:
            model, sample_rate = self.model
            audio_numpy = model(text).numpy()
            audio = BytesIO()
            write(audio, sample_rate, audio_numpy)
            audio.seek(0)
            st.audio(audio)
app = LightningApp(StreamlitApp())
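The render method writes the generated audio into an in-memory BytesIO buffer. It then rewinds the buffer with audio.seek(0), so anything that reads from it starts at the WAV header instead of the end, where writing stopped. The stdlib-only sketch below reproduces that pattern. It makes two substitutions of our own: it writes with Python's wave module instead of scipy.io.wavfile.write, and it uses a synthetic sine tone instead of the text-to-speech output.

```python
import io
import math
import struct
import wave


def sine_wav_buffer(freq_hz: float = 440.0, seconds: float = 0.1,
                    sample_rate: int = 48000) -> io.BytesIO:
    """Return an in-memory mono 16-bit WAV file containing a sine tone."""
    n_samples = int(seconds * sample_rate)
    samples = (
        int(32767 * 0.5 * math.sin(2 * math.pi * freq_hz * i / sample_rate))
        for i in range(n_samples)
    )
    frames = b"".join(struct.pack("<h", s) for s in samples)

    audio = io.BytesIO()
    # wave does not close file objects it did not open, so the buffer stays usable
    with wave.open(audio, "wb") as wav:
        wav.setnchannels(1)          # mono
        wav.setsampwidth(2)          # 16-bit samples
        wav.setframerate(sample_rate)
        wav.writeframes(frames)
    audio.seek(0)                    # rewind so readers start at the RIFF header
    return audio


buffer = sine_wav_buffer()
print(buffer.read(4))
```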
Key features
You now know enough to build a self-contained component that runs any Python code on the cloud and can be connected with other components to form a powerful Lightning App. Here are a few key features to supercharge your work:
# app.py
from lightning.app import LightningWork, LightningApp, CloudCompute
class YourComponent(LightningWork):
    def run(self):
        print('RUN ANY PYTHON CODE HERE')
# custom accelerators
compute = CloudCompute('gpu')
component = YourComponent(cloud_compute=compute)
app = LightningApp(component)
# OTHER ACCELERATORS:
# compute = CloudCompute('default')         # 1 CPU
# compute = CloudCompute('cpu-medium')      # 8 CPUs
# compute = CloudCompute('gpu')             # 1 T4 GPU
# compute = CloudCompute('gpu-fast-multi')  # 4 V100 GPUs
# compute = CloudCompute('p4d.24xlarge')    # AWS instance name (8 A100 GPUs)
# compute = ...
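The names above are passed directly to CloudCompute. If you want to choose one programmatically, a small lookup table built from those comments can help. This is a hypothetical helper, not a Lightning API. The hardware counts only restate the comments in this guide and may not match what your cluster actually offers.

```python
from typing import Dict, Optional, Tuple

# name -> (gpu_count, description), restated from the comments in this guide
ACCELERATORS: Dict[str, Tuple[int, str]] = {
    "default": (0, "1 CPU"),
    "cpu-medium": (0, "8 CPUs"),
    "gpu": (1, "1 T4 GPU"),
    "gpu-fast-multi": (4, "4 V100 GPUs"),
    "p4d.24xlarge": (8, "8 A100 GPUs"),
}


def smallest_machine_with(min_gpus: int) -> Optional[str]:
    """Return the machine name with the fewest GPUs that still has at least min_gpus."""
    candidates = [
        (gpus, name) for name, (gpus, _) in ACCELERATORS.items() if gpus >= min_gpus
    ]
    if not candidates:
        return None
    return min(candidates)[1]


name = smallest_machine_with(2)
print(name, ACCELERATORS[name][1])
# compute = CloudCompute(name)  # then use it as in the examples above
```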
# app.py
from lightning.app import LightningWork, LightningApp, CloudCompute
class YourComponent(LightningWork):
    def run(self):
        print('RUN ANY PYTHON CODE HERE')
# stop the machine when idle for 10 seconds
compute = CloudCompute('gpu', idle_timeout=10)
component = YourComponent(cloud_compute=compute)
app = LightningApp(component)
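Conceptually, idle_timeout works like a stopwatch. It resets whenever the work is busy and stops the machine once it has been idle longer than the limit. The plain-Python sketch below models that rule so the parameter's meaning is concrete. The semantics here are our assumption for illustration, not Lightning's actual implementation, and stop_time is a hypothetical helper.

```python
from typing import Iterable, Optional


def stop_time(busy_events: Iterable[float], idle_timeout: float,
              horizon: float) -> Optional[float]:
    """Return when the machine would be stopped, or None if not before `horizon`.

    Assumes the machine is active at t=0 and that busy_events are sorted
    timestamps (in seconds) at which the work did something.
    """
    last_active = 0.0
    for t in busy_events:
        if t - last_active > idle_timeout:
            # the machine sat idle too long before this event arrived
            return last_active + idle_timeout
        last_active = t
    deadline = last_active + idle_timeout
    return deadline if deadline <= horizon else None


print(stop_time([2, 5, 9], idle_timeout=10, horizon=60))
```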
# app.py
from lightning.app import LightningWork, LightningApp, CloudCompute
class YourComponent(LightningWork):
    def run(self):
        print('RUN ANY PYTHON CODE HERE')
# if the machine hasn't started after 60 seconds, cancel the work
compute = CloudCompute('gpu', wait_timeout=60)
component = YourComponent(cloud_compute=compute)
app = LightningApp(component)
# app.py
from lightning.app import LightningWork, LightningApp, CloudCompute
class YourComponent(LightningWork):
    def run(self):
        print('RUN ANY PYTHON CODE HERE')
# spot machines can be turned off without notice; use them for non-critical, resumable work
# request a spot machine; after 60 seconds of waiting, switch to full price
compute = CloudCompute('gpu', wait_timeout=60, spot=True)
component = YourComponent(cloud_compute=compute)
app = LightningApp(component)
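Combining spot=True with wait_timeout gives a fallback policy: ask for a spot machine, and if none has started within wait_timeout seconds, switch to a full-price machine. The plain-Python sketch below models only that decision. choose_machine and its inputs are hypothetical. In the component above, you express this policy by passing spot=True and wait_timeout to CloudCompute.

```python
from typing import Optional


def choose_machine(spot_started_after: Optional[float], wait_timeout: float) -> str:
    """Decide which kind of machine ends up running the work.

    spot_started_after: seconds until a spot machine became available,
    or None if no spot capacity ever showed up.
    """
    if spot_started_after is not None and spot_started_after <= wait_timeout:
        return "spot"
    # waited wait_timeout seconds without a spot machine: pay full price
    return "full-price"


print(choose_machine(12.0, wait_timeout=60))  # spot capacity found quickly
print(choose_machine(None, wait_timeout=60))  # no spot capacity at all
```

Even when a spot machine is found, it can be turned off without notice. Keep this kind of work resumable, for example by saving progress regularly.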
# app.py
from lightning.app import LightningWork, LightningApp, CloudCompute
class YourComponent(LightningWork):
    def run(self):
        print('RUN ANY PYTHON CODE HERE')
# use 100 GB of space on that machine (max size: 64 TB)
compute = CloudCompute('gpu', disk_size=100)
component = YourComponent(cloud_compute=compute)
app = LightningApp(component)
# app.py
from lightning.app import LightningWork, LightningApp, CloudCompute
from lightning.app.storage import Mount
import os
class YourComponent(LightningWork):
    def run(self):
        print(os.listdir('/foo'))
# mount the files on the s3 bucket under this path
mount = Mount(source="s3://lightning-example-public/", mount_path="/foo")
compute = CloudCompute(mounts=mount)
component = YourComponent(cloud_compute=compute)
app = LightningApp(component)
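With the mount above, objects in the bucket appear as files under mount_path, so the work can read them with ordinary file APIs such as os.listdir. The sketch below converts an S3 URI into the local path the work would open. It assumes the source prefix maps directly onto /foo, which is our reading of the comment and not a documented guarantee. to_mounted_path is a hypothetical helper.

```python
import posixpath
from urllib.parse import urlparse


def to_mounted_path(s3_uri: str, source: str, mount_path: str) -> str:
    """Translate s3://bucket/prefix/key into mount_path/key for a mounted source."""
    uri, src = urlparse(s3_uri), urlparse(source)
    if uri.scheme != "s3" or uri.netloc != src.netloc:
        raise ValueError(f"{s3_uri} is not inside {source}")
    prefix = src.path.lstrip("/")
    if prefix and not prefix.endswith("/"):
        prefix += "/"  # avoid matching 'data' against 'database/...'
    key = uri.path.lstrip("/")
    if not key.startswith(prefix):
        raise ValueError(f"{s3_uri} is not inside {source}")
    return posixpath.join(mount_path, key[len(prefix):])


print(to_mounted_path("s3://lightning-example-public/a/b.txt",
                      "s3://lightning-example-public/", "/foo"))
```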
# app.py
from lightning.app import LightningWork, LightningApp, BuildConfig
class YourComponent(LightningWork):
    def run(self):
        print('RUN ANY PYTHON CODE HERE')
# custom image (from any provider)
config = BuildConfig(image="gcr.io/google-samples/hello-app:1.0")
component = YourComponent(cloud_build_config=config)
app = LightningApp(component)
Next: Explore real component implementations
In this section we introduced components. Let’s explore real component implementations in-depth.