LightningFlow
- class lightning.app.core.flow.LightningFlow
Bases: object
The LightningFlow is used by the LightningApp to coordinate and manage long-running jobs contained in LightningWork components.
A LightningFlow is characterized by:
A set of state variables.
Long-running jobs (LightningWork).
Its children, LightningFlow or LightningWork, with their state variables.
State variables
A LightningFlow is a special class whose attributes must be JSON-serializable (e.g., int, float, bool, list, dict, …).
It also may not reach into global variables unless they are constant.
All attributes must be defined in the __init__ method; they may be reassigned to different values throughout the lifetime of the object. However, defining new attributes outside of __init__ is not allowed.
Attributes taken together represent the state of the component. Components can retrieve their own state and that of their children recursively at any time. They can also apply an externally provided state to themselves and, recursively, to their children.
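The state rules above can be illustrated without Lightning installed. The following is a minimal standard-library sketch, not Lightning's implementation: `ToyComponent`, `get_state`, and `set_state` are hypothetical names, and the `"vars"`/`"children"` layout is an assumption chosen for illustration. It shows attributes fixed in `__init__`, a recursive state read, a JSON round trip, and a recursive state write.

```python
import json


class ToyComponent:
    """Hypothetical stand-in for a component; not the Lightning API."""

    def __init__(self, **children):
        self.counter = 0           # state variable, JSON-serializable
        self._children = children  # child components, keyed by name

    def get_state(self):
        # Collect this component's variables and its children's, recursively.
        return {
            "vars": {"counter": self.counter},
            "children": {k: c.get_state() for k, c in self._children.items()},
        }

    def set_state(self, state):
        # Apply an externally provided state to this component and its children.
        self.counter = state["vars"]["counter"]
        for name, child_state in state["children"].items():
            self._children[name].set_state(child_state)


root = ToyComponent(child=ToyComponent())
root.counter = 3

# The whole tree survives a JSON round trip because every value is serializable.
snapshot = json.loads(json.dumps(root.get_state()))

restored = ToyComponent(child=ToyComponent())
restored.set_state(snapshot)
```

Because the state is plain JSON, it can be stored or sent elsewhere and applied later, which is the property the serializability requirement protects.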
Execution model and work
The entry point for execution is the run method at the root component. The run method of the root component may call the run method of its children, and the children may call the run methods of their children, and so on.
The run method of the root component is called repeatedly in a while loop until the app is terminated. In this programming model (reminiscent of React or Vue from the JavaScript world, or Streamlit in Python), the values of the state variables, or their changes, are translated into actions throughout the component hierarchy. This means the flow of execution will only be affected by state changes in a component or one of its children, and otherwise remains idempotent.
The actions themselves are self-contained within LightningWork. LightningWork components are typically used for long-running jobs, like downloading a dataset, performing a query, or starting a computationally heavy script. While one may access any state variable in a LightningWork from a LightningFlow, one may not directly call methods of other components from within a LightningWork, as a LightningWork can’t have any children. This limitation allows applications to be distributed at scale.
Component hierarchy and App
Given the above characteristics, a root LightningFlow, potentially containing children components, can be passed to an App object and its execution can be distributed (each LightningWork will be run within its own process or different arrangements).
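The execution model described above can be sketched with plain Python. This is a toy illustration under stated assumptions, not how Lightning schedules components: `ToyWork` and `ToyRootFlow` are hypothetical classes, and the `for` loop stands in for the app's endless loop. It shows that calling `run` repeatedly only triggers an action when the state calls for it.

```python
class ToyWork:
    """Hypothetical stand-in for a long-running job; not the Lightning API."""

    def __init__(self):
        self.has_run = False
        self.calls = 0

    def run(self):
        self.calls += 1
        self.has_run = True


class ToyRootFlow:
    """Hypothetical root component whose run() is called repeatedly."""

    def __init__(self):
        self.work = ToyWork()
        self.done = False

    def run(self):
        # The action fires only while the state says it has not happened yet.
        if not self.work.has_run:
            self.work.run()
        else:
            self.done = True


flow = ToyRootFlow()
for _ in range(5):  # stands in for the app's "while True" loop
    flow.run()
```

After the first iteration the state no longer changes, so further calls to `run` have no additional effect on the work.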
Example
>>> from lightning.app import LightningFlow
>>> class RootFlow(LightningFlow):
...     def __init__(self):
...         super().__init__()
...         self.counter = 0
...     def run(self):
...         self.counter += 1
...
>>> flow = RootFlow()
>>> flow.run()
>>> assert flow.counter == 1
>>> assert flow.state["vars"]["counter"] == 1
- configure_api()
Configure the API routes of the LightningFlow.
Returns a list of HttpMethod such as Post or Get.
    from lightning.app import LightningFlow
    from lightning.app.api import Post
    from pydantic import BaseModel

    class HandlerModel(BaseModel):
        name: str

    class Flow(LightningFlow):
        def __init__(self):
            super().__init__()
            self.names = []

        def handler(self, config: HandlerModel) -> None:
            self.names.append(config.name)

        def configure_api(self):
            return [Post("/v1/api/request", self.handler)]
Once the app is running, you can access the Swagger UI of the app under the /docs route.
- Return type:
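A client could call the route from the example above roughly as follows. This is a hedged sketch using only the standard library: `build_name_request` is a hypothetical helper, the address `http://127.0.0.1:7501` is an assumption about where the app is reachable, and the JSON body assumes the route accepts the fields of `HandlerModel` directly. The request is built but not sent, so the snippet runs without a live app.

```python
import json
from urllib.request import Request


def build_name_request(base_url: str, name: str) -> Request:
    """Build (but do not send) a POST request for the handler route."""
    body = json.dumps({"name": name}).encode("utf-8")
    return Request(
        url=base_url.rstrip("/") + "/v1/api/request",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Assumption: the app is reachable at this address on your machine.
request = build_name_request("http://127.0.0.1:7501", "my_own_name")
# To send it against a running app: urllib.request.urlopen(request)
```

The Swagger UI under /docs is the place to confirm the exact request schema the running app expects.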
- configure_commands()
Configure the commands of this LightningFlow.
Returns a list of dictionaries mapping a command name to a flow method.
    from lightning.app import LightningFlow

    class Flow(LightningFlow):
        def __init__(self):
            super().__init__()
            self.names = []

        def configure_commands(self):
            # A list of dictionaries, each mapping a command name to a method.
            return [{"my_command_name": self.my_remote_method}]

        def my_remote_method(self, name):
            self.names.append(name)
Once the app is running with the following command:
    lightning run app app.py
you can invoke the command with:
    lightning my_command_name --args name=my_own_name
- Return type:
- configure_layout()
Configure the UI layout of this LightningFlow.
You can either:
Return a single Frontend object to serve a user interface for this Flow.
Return a single dictionary to expose the UI of a child flow.
Return a list of dictionaries to arrange the children of this flow in one or multiple tabs.
- Return type: Union[Dict[str, Any], List[Dict[str, Any]], Frontend]
Example: Serve a static directory (with at least a file index.html inside).
    from lightning.app import LightningFlow
    from lightning.app.frontend import StaticWebFrontend

    class Flow(LightningFlow):
        ...

        def configure_layout(self):
            return StaticWebFrontend("path/to/folder/to/serve")
Example: Serve a streamlit UI (needs the streamlit package to be installed).
    from lightning.app import LightningFlow
    from lightning.app.frontend import StreamlitFrontend

    class Flow(LightningFlow):
        ...

        def configure_layout(self):
            return StreamlitFrontend(render_fn=my_streamlit_ui)

    def my_streamlit_ui(state):
        # add your streamlit code here!
        import streamlit as st
Example: Arrange the UI of my children in tabs (default UI by Lightning).
    class Flow(LightningFlow):
        def configure_layout(self):
            return [
                dict(name="First Tab", content=self.child0),
                dict(name="Second Tab", content=self.child1),
                dict(name="Lightning", content="https://lightning.ai"),
            ]
If you don’t implement configure_layout, Lightning will collect all children and display their UI in a tab (if they have their own configure_layout implemented).
Note
This hook gets called at the time of app creation and then again as part of the loop. If desired, the returned layout configuration can depend on the state. The only exception is flows that return a Frontend. These need to be provided at the time of app creation in order for the runtime to start the server.
Learn more about adding UI
- experimental_iterate(iterable, run_once=True, user_key='')
This method should always be used with any kind of iterable to ensure it is fault tolerant.
If you want your iterable to always be consumed from scratch, you shouldn’t use this method.
- Parameters:
- Return type:
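The idea of fault-tolerant iteration can be sketched as a generator that records its position in a state dictionary, so that a restart resumes instead of starting over. This is a hypothetical illustration, not Lightning's implementation: `resumable_iterate`, the `state` dictionary, and the `"cursor"` key are invented for this example, and treating `run_once=True` as "do not iterate again once finished" is an assumption about the parameter's meaning.

```python
def resumable_iterate(iterable, state, key="cursor", run_once=True):
    """Yield items, recording progress in `state` so a restart can resume.

    Hypothetical illustration only; not Lightning's implementation.
    """
    items = list(iterable)
    start = state.get(key, 0)
    if start >= len(items):
        if run_once:
            return  # already fully consumed: yield nothing
        start = 0   # otherwise begin a fresh pass
    for index in range(start, len(items)):
        # Record progress before handing the item out, so an interruption
        # does not replay items that were already yielded.
        state[key] = index + 1
        yield items[index]


state = {}
seen = []

# First attempt: simulate a crash after two items.
for item in resumable_iterate("abcd", state):
    seen.append(item)
    if len(seen) == 2:
        break

# After the restart, iteration continues where it stopped.
for item in resumable_iterate("abcd", state):
    seen.append(item)

# A further pass yields nothing because run_once=True.
extra = list(resumable_iterate("abcd", state))
```

Recording the position before yielding means an item being processed at the time of a failure is not retried; recording it afterwards would retry that item instead. Which behavior is preferable depends on whether the processing step is safe to repeat.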
- load_state_dict(flow_state, children_states, strict=True)
Reloads the state of this flow and its children.
    from lightning.app import LightningFlow, LightningWork

    class Work(LightningWork):
        def __init__(self):
            super().__init__()
            self.counter = 0

        def run(self):
            self.counter += 1

    class Flow(LightningFlow):
        def run(self):
            # Dynamically create a work.
            if not getattr(self, "w", None):
                self.w = Work()
            self.w.run()

        def load_state_dict(self, flow_state, children_states, strict) -> None:
            # 1: Re-instantiate the dynamic work.
            self.w = Work()

            # 2: Make any state modifications / migrations.
            ...

            # 3: Call the parent ``load_state_dict`` to
            # recursively reload the states.
            super().load_state_dict(
                flow_state,
                children_states,
                strict,
            )
- named_works(recurse=True)
Return its LightningWork components with their names.
- Return type:
- schedule(cron_pattern, start_time=None, user_key=None)
The schedule method is used to run a part of the flow logic at regular intervals.
    from lightning.app import LightningFlow

    class Flow(LightningFlow):
        def run(self):
            if self.schedule("hourly"):
                print("run some code every hour")
- Parameters:
- Return type:
A best practice is to avoid running a dynamic flow or work under the self.schedule method. Instead, instantiate them within the condition, but run them outside.
    from lightning.app import LightningFlow
    from lightning.app.structures import List

    class SchedulerDAG(LightningFlow):
        def __init__(self):
            super().__init__()
            self.dags = List()

        def run(self):
            if self.schedule("hourly"):
                self.dags.append(DAG(...))

            for dag in self.dags:
                payload = dag.run()
Learn more about Scheduling
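The reason for instantiating inside the schedule condition but running outside it can be seen in a toy model. The sketch below does not use Lightning: `ToyDag`, `ToyScheduler`, and `schedule_due` are hypothetical, and a simple loop counter stands in for a real cron pattern.

```python
class ToyDag:
    """Hypothetical dynamic component; not the Lightning API."""

    def __init__(self):
        self.runs = 0

    def run(self):
        self.runs += 1


class ToyScheduler:
    """Hypothetical flow that creates a new ToyDag every `period` iterations."""

    def __init__(self, period):
        self.period = period
        self.tick = 0
        self.dags = []

    def schedule_due(self):
        # Stand-in for self.schedule(...): true once every `period` iterations.
        return self.tick % self.period == 0

    def run(self):
        if self.schedule_due():
            self.dags.append(ToyDag())  # instantiate inside the condition
        for dag in self.dags:           # run outside, on every iteration
            dag.run()
        self.tick += 1


scheduler = ToyScheduler(period=3)
for _ in range(6):  # stands in for the app's run loop
    scheduler.run()
```

Each component keeps being run on every loop iteration after it is created, so it can make progress between schedule ticks. Had `dag.run()` been placed inside the condition, each component would only be run on the iterations where the schedule fires.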
- works(recurse=True)
Return its LightningWork components.
- Return type:
- property flows: Dict[str, LightningFlow]
Return its children LightningFlow.