LightningFlow

class lightning_app.core.flow.LightningFlow[source]

Bases: object

The LightningFlow is used by the LightningApp to coordinate and manage the long-running jobs it contains, the LightningWork components.

A LightningFlow is characterized by:

  • A set of state variables.

  • Long-running jobs (LightningWork).

  • Its children LightningFlow or LightningWork with their state variables.

State variables

LightningFlow classes are special: their attributes must be JSON-serializable (e.g., int, float, bool, list, dict, …).

They also may not reach into global variables unless those are constants.

All attributes must be defined in the __init__ method. They may later be assigned different values throughout the lifetime of the object, but defining new attributes outside of __init__ is not allowed.

Taken together, the attributes represent the state of the component. A component can retrieve its own state and that of its children recursively at any time. It can also apply an externally provided state to itself and, recursively, to its children.
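The rules above can be illustrated with a small plain-Python sketch. It does not use Lightning and is not how LightningFlow is implemented: ToyComponent, freeze, set_state, and the "children" key are hypothetical names chosen for illustration. Only the "vars" key mirrors the state layout that appears in the doctest further down this page.

```python
import json


class ToyComponent:
    """Hypothetical stand-in for a component; not the Lightning implementation."""

    def __init__(self):
        # Bookkeeping fields are written directly to bypass the checks below.
        object.__setattr__(self, "_frozen", False)
        object.__setattr__(self, "_children", {})

    def freeze(self):
        # Call at the end of a subclass __init__ to forbid new attributes.
        object.__setattr__(self, "_frozen", True)

    def __setattr__(self, name, value):
        if self._frozen and name not in self.__dict__:
            raise AttributeError(f"{name!r} must be defined in __init__")
        if isinstance(value, ToyComponent):
            self._children[name] = value
        else:
            json.dumps(value)  # raises TypeError if not JSON-serializable
        object.__setattr__(self, name, value)

    @property
    def state(self):
        # Collect own variables, then recurse into the children.
        own = {
            key: value
            for key, value in self.__dict__.items()
            if not key.startswith("_") and not isinstance(value, ToyComponent)
        }
        children = {name: child.state for name, child in self._children.items()}
        return {"vars": own, "children": children}

    def set_state(self, state):
        # Apply an externally provided state to this component and its children.
        for key, value in state["vars"].items():
            setattr(self, key, value)
        for name, child_state in state["children"].items():
            self._children[name].set_state(child_state)


class Child(ToyComponent):
    def __init__(self):
        super().__init__()
        self.progress = 0.0
        self.freeze()


class Root(ToyComponent):
    def __init__(self):
        super().__init__()
        self.counter = 0
        self.child = Child()
        self.freeze()


root = Root()
root.counter += 1  # reassigning an existing attribute is allowed
snapshot = root.state
```

In this toy version, assigning a new attribute after freeze() raises AttributeError, and assigning a value that json.dumps cannot encode raises TypeError.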

Execution model and work

The entry point for execution is the run method at the root component. The run method of the root component may call the run method of its children, and the children may call the run methods of their children and so on.

The run method of the root component is called repeatedly in a loop until the app is terminated. In this programming model (reminiscent of React or Vue from the JavaScript world, or of Streamlit in Python), the values of the state variables, or their changes, are translated into actions throughout the component hierarchy. This means the flow of execution is only affected by state changes in a component or one of its children, and otherwise remains idempotent.
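As a rough illustration of this model, the plain-Python sketch below drives a run method from a bounded loop. ToyRoot, toy_loop, and the flag names are hypothetical, and the loop is capped so that the example terminates, whereas the real loop keeps going until the app is terminated.

```python
class ToyRoot:
    """Hypothetical root component whose run() reacts only to its own state."""

    def __init__(self):
        self.data_ready = False
        self.model_trained = False
        self.actions = []  # records which actions were triggered, in order

    def run(self):
        if not self.data_ready:
            self.actions.append("download")
            self.data_ready = True
        elif not self.model_trained:
            self.actions.append("train")
            self.model_trained = True
        # Otherwise nothing changed, so this call does nothing.


def toy_loop(root, iterations):
    # Stand-in for the repeated calls to the root component's run method.
    for _ in range(iterations):
        root.run()


root = ToyRoot()
toy_loop(root, iterations=10)
```

Although run is called ten times here, each action is recorded only once, because the later calls see no state change to react to.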

The actions themselves are self-contained within LightningWork components. These are typically used for long-running jobs, such as downloading a dataset, performing a query, or starting a computationally heavy script. While a LightningFlow may access any state variable of a LightningWork, a LightningWork may not directly call methods of other components, because a LightningWork cannot have children. This limitation allows applications to be distributed at scale.

Component hierarchy and App

Given the above characteristics, a root LightningFlow, potentially containing child components, can be passed to an App object, and its execution can be distributed (each LightningWork runs within its own process, or in a different arrangement).

>>> from lightning import LightningFlow
>>> class RootFlow(LightningFlow):
...     def __init__(self):
...         super().__init__()
...         self.counter = 0
...     def run(self):
...         self.counter += 1
...
>>> flow = RootFlow()
>>> flow.run()
>>> assert flow.counter == 1
>>> assert flow.state["vars"]["counter"] == 1

configure_api()[source]

Configure the API routes of the LightningFlow.

Returns a list of HttpMethod such as Post or Get.

from lightning_app import LightningFlow
from lightning_app.api import Post

from pydantic import BaseModel


class HandlerModel(BaseModel):
    name: str


class Flow(LightningFlow):
    def __init__(self):
        super().__init__()
        self.names = []

    def handler(self, config: HandlerModel) -> None:
        self.names.append(config.name)

    def configure_api(self):
        return [Post("/v1/api/request", self.handler)]

Once the app is running, you can access the Swagger UI of the app under the /docs route.

configure_commands()[source]

Configure the commands of this LightningFlow.

Returns a list of dictionaries mapping a command name to a flow method.

class Flow(LightningFlow):
    def __init__(self):
        super().__init__()
        self.names = []

    def configure_commands(self):
        return [{"my_command_name": self.my_remote_method}]

    def my_remote_method(self, name):
        self.names.append(name)

Start the app with the first command below. Once it is running, invoke the command with the second:

lightning run app app.py
lightning my_command_name --args name=my_own_name

configure_layout()[source]

Configure the UI layout of this LightningFlow.

You can either

  1. Return a single Frontend object to serve a user interface for this Flow.

  2. Return a single dictionary to expose the UI of a child flow.

  3. Return a list of dictionaries to arrange the children of this flow in one or multiple tabs.

Example: Serve a static directory (with at least a file index.html inside).

from lightning_app.frontend import StaticWebFrontend


class Flow(LightningFlow):
    ...

    def configure_layout(self):
        return StaticWebFrontend("path/to/folder/to/serve")

Example: Serve a streamlit UI (needs the streamlit package to be installed).

from lightning_app.frontend import StreamlitFrontend


class Flow(LightningFlow):
    ...

    def configure_layout(self):
        return StreamlitFrontend(render_fn=my_streamlit_ui)


def my_streamlit_ui(state):
    # add your streamlit code here!
    import streamlit as st

Example: Arrange the UI of my children in tabs (default UI by Lightning).

class Flow(LightningFlow):
    def configure_layout(self):
        return [
            dict(name="First Tab", content=self.child0),
            dict(name="Second Tab", content=self.child1),
            dict(name="Lightning", content="https://lightning.ai"),
        ]

If you don’t implement configure_layout, Lightning will collect all children and display their UI in a tab (if they have their own configure_layout implemented).

Note

This hook is called at the time of app creation and then again as part of the loop. If desired, the returned layout configuration can depend on the state. The only exceptions are flows that return a Frontend: these must be provided at the time of app creation so that the runtime can start the server.

Learn more about adding UI


Return type

Union[Dict[str, Any], List[Dict[str, Any]], Frontend]

experimental_iterate(iterable, run_once=True, user_key='')[source]

This method should be used with any kind of iterable to ensure that the iteration is fault tolerant.

If you want your iterable to always be consumed from scratch, you shouldn’t use this method.

Parameters
  • iterable (Iterable) – Iterable to iterate over. The iterable shouldn’t have side effects or be random.

  • run_once (bool) – Whether to run the entire iteration only once. Otherwise, it would restart from the beginning.

  • user_key (str) – Key to be used to track the caching mechanism.

Return type

Generator
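One way to picture the fault tolerance described above is an iterator that records its progress outside the loop, so that a restarted loop skips the items it already handled. The sketch below is plain Python written under that assumption. It is not Lightning's implementation, and toy_resumable_iterate, progress, and key are hypothetical names.

```python
def toy_resumable_iterate(iterable, progress, key, run_once=True):
    """Yield items, recording in progress[key] how many were fully handled."""
    entry = progress.setdefault(key, {"index": 0, "done": False})
    if entry["done"]:
        if run_once:
            return  # the whole iteration already ran once; yield nothing
        entry.update(index=0, done=False)  # otherwise restart from the beginning
    for position, item in enumerate(iterable):
        if position < entry["index"]:
            continue  # handled before the interruption, so skip it
        yield item
        # Reached only when the caller asks for the next item,
        # i.e. after the caller finished handling this one.
        entry["index"] = position + 1
    entry["done"] = True


progress = {}
handled = []
for item in toy_resumable_iterate(range(5), progress, "loop"):
    if item == 2:
        break  # simulate an interruption before item 2 is handled
    handled.append(item)

# A second pass resumes at item 2 instead of starting over.
for item in toy_resumable_iterate(range(5), progress, "loop"):
    handled.append(item)
```

The skip-by-position approach only works if the iterable yields the same items in the same order each time, which is consistent with the requirement that it have no side effects and not be random.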

named_works(recurse=True)[source]

Return the LightningWork components of this flow along with their names.

Return type

List[Tuple[str, LightningWork]]

run(*args, **kwargs)[source]

Override with your own logic.

Return type

None

schedule(cron_pattern, start_time=None, user_key=None)[source]

The schedule method is used to run part of the flow logic on a recurring schedule.

from lightning_app import LightningFlow


class Flow(LightningFlow):
    def run(self):
        if self.schedule("hourly"):
            print("run some code every hour")

Parameters

A best practice is to avoid running a dynamic flow or work under the self.schedule method. Instead, instantiate them within the condition, but run them outside.

from lightning_app import LightningFlow
from lightning_app.structures import List


class SchedulerDAG(LightningFlow):
    def __init__(self):
        super().__init__()
        self.dags = List()

    def run(self):
        if self.schedule("hourly"):
            self.dags.append(DAG(...))

        for dag in self.dags:
            payload = dag.run()

Learn more about Scheduling


Return type

bool

works(recurse=True)[source]

Return the LightningWork components of this flow.

Return type

List[LightningWork]

property flows[source]

Return the child LightningFlow components of this flow.

property name[source]

Return the current LightningFlow name.

property state[source]

Return the current flow state along with that of its children.