LightningFlow

class lightning.app.core.flow.LightningFlow[source]

Bases: object

The LightningFlow is used by the LightningApp to coordinate and manage the long-running jobs it contains, the LightningWork.

A LightningFlow is characterized by:

  • A set of state variables.

  • Long-running jobs (LightningWork).

  • Its children LightningFlow or LightningWork with their state variables.

State variables

LightningFlow classes are special in that their attributes must be JSON-serializable (e.g., int, float, bool, list, dict, …).

They also may not reach into global variables unless they are constant.

All attributes must be defined in the __init__ method; they may later be assigned different values throughout the lifetime of the object. However, defining new attributes outside of __init__ is not allowed.

Attributes taken together represent the state of the component. Components are capable of retrieving their state and that of their children recursively at any time. They are also capable of applying an externally provided state to themselves and, recursively, to their children.
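The state rules above can be illustrated without Lightning itself. The following is a minimal sketch in plain Python, not the library's implementation: MiniComponent, finish_init, get_state, and set_state are hypothetical names, and the "children" key in the returned dictionary is an assumption made for illustration.

```python
import json


class MiniComponent:
    """Hypothetical stand-in for a component (not the Lightning class)."""

    def __init__(self):
        self._children = {}
        self._init_done = False

    def finish_init(self):
        # Hypothetical helper: after this call, no new attribute may be created.
        self._init_done = True

    def __setattr__(self, name, value):
        if isinstance(value, MiniComponent):
            # Child components are tracked separately from state variables.
            self._children[name] = value
        elif not name.startswith("_"):
            if self.__dict__.get("_init_done") and name not in self.__dict__:
                raise AttributeError(f"cannot define {name!r} outside __init__")
            json.dumps(value)  # raises TypeError if not JSON-serializable
        super().__setattr__(name, value)

    def get_state(self):
        # Collect own state variables, then recurse into the children.
        own = {
            k: v
            for k, v in self.__dict__.items()
            if not k.startswith("_") and not isinstance(v, MiniComponent)
        }
        children = {n: c.get_state() for n, c in self._children.items()}
        return {"vars": own, "children": children}

    def set_state(self, state):
        # Apply an externally provided state to this component and its children.
        for k, v in state["vars"].items():
            setattr(self, k, v)
        for n, child_state in state["children"].items():
            self._children[n].set_state(child_state)


class Leaf(MiniComponent):
    def __init__(self):
        super().__init__()
        self.done = False
        self.finish_init()


class Root(MiniComponent):
    def __init__(self):
        super().__init__()
        self.counter = 0
        self.leaf = Leaf()
        self.finish_init()


root = Root()
root.counter += 1
snapshot = root.get_state()
```

In this sketch, assigning a non-serializable value (such as a set) raises TypeError, and creating a new attribute after finish_init raises AttributeError, which mirrors the two restrictions described above.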

Execution model and work

The entry point for execution is the run method at the root component. The run method of the root component may call the run method of its children, and the children may call the run methods of their children and so on.

The run method of the root component is called repeatedly in a while loop forever until the app gets terminated. In this programming model (reminiscent of React, Vue or Streamlit from the JavaScript world), the values of the state variables, or their changes, are translated into actions throughout the component hierarchy. This means the flow of execution will only be affected by state changes in a component or one of its children, and otherwise remain idempotent.
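As a rough illustration of this model (a plain-Python sketch, not how LightningApp is implemented), the loop below calls a root object's run method repeatedly. ToyFlow, run_loop, and max_iterations are hypothetical names; a real app loops until it is terminated rather than for a fixed number of iterations.

```python
class ToyFlow:
    """Hypothetical flow whose behavior depends only on its state."""

    def __init__(self):
        self.data_ready = False
        self.trained = False
        self.log = []

    def run(self):
        # Each call inspects the state and performs at most one transition.
        if not self.data_ready:
            self.log.append("download")
            self.data_ready = True
        elif not self.trained:
            self.log.append("train")
            self.trained = True
        # Once both flags are set, further calls change nothing.


def run_loop(root, max_iterations):
    # Stand-in for the app's endless loop, bounded so the sketch terminates.
    for _ in range(max_iterations):
        root.run()


flow = ToyFlow()
run_loop(flow, max_iterations=10)
```

Although run is called ten times here, each action is recorded only once, because later calls see an unchanged state and do nothing.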

The actions themselves are self-contained within LightningWork. The LightningWork are typically used for long-running jobs, like downloading a dataset, performing a query, or starting a computationally heavy script. While one may access any state variable in a LightningWork from a LightningFlow, one may not directly call methods of other components from within a LightningWork as LightningWork can’t have any children. This limitation allows applications to be distributed at scale.

Component hierarchy and App

Given the above characteristics, a root LightningFlow, potentially containing children components, can be passed to an App object and its execution can be distributed (each LightningWork will be run within its own process or different arrangements).

Example

>>> from lightning.app import LightningFlow
>>> class RootFlow(LightningFlow):
...     def __init__(self):
...         super().__init__()
...         self.counter = 0
...     def run(self):
...         self.counter += 1
...
>>> flow = RootFlow()
>>> flow.run()
>>> assert flow.counter == 1
>>> assert flow.state["vars"]["counter"] == 1

configure_api()[source]

Configure the API routes of the LightningFlow.

Returns a list of HttpMethod such as Post or Get.

from lightning.app import LightningFlow
from lightning.app.api import Post

from pydantic import BaseModel


class HandlerModel(BaseModel):
    name: str


class Flow(LightningFlow):
    def __init__(self):
        super().__init__()
        self.names = []

    def handler(self, config: HandlerModel) -> None:
        self.names.append(config.name)

    def configure_api(self):
        return [Post("/v1/api/request", self.handler)]

Once the app is running, you can access the Swagger UI of the app under the /docs route.

Return type:

None

configure_commands()[source]

Configure the commands of this LightningFlow.

Returns a list of dictionaries mapping a command name to a flow method.

class Flow(LightningFlow):
    def __init__(self):
        super().__init__()
        self.names = []

    def configure_commands(self):
        # A list of dictionaries, each mapping a command name to a method.
        return [{"my_command_name": self.my_remote_method}]

    def my_remote_method(self, name):
        self.names.append(name)

Once the app is running with the following command:

lightning_app run app app.py

the command can be invoked from the CLI:

lightning_app my_command_name --args name=my_own_name

Return type:

None

configure_layout()[source]

Configure the UI layout of this LightningFlow.

Return type:

Union[Dict[str, Any], List[Dict[str, Any]], Frontend]

You can either:

  1. Return a single Frontend object to serve a user interface for this Flow.

  2. Return a single dictionary to expose the UI of a child flow.

  3. Return a list of dictionaries to arrange the children of this flow in one or multiple tabs.

Example: Serve a static directory (with at least a file index.html inside).

from lightning.app.frontend import StaticWebFrontend


class Flow(LightningFlow):
    ...

    def configure_layout(self):
        return StaticWebFrontend("path/to/folder/to/serve")

Example: Serve a streamlit UI (needs the streamlit package to be installed).

from lightning.app.frontend import StreamlitFrontend


class Flow(LightningFlow):
    ...

    def configure_layout(self):
        return StreamlitFrontend(render_fn=my_streamlit_ui)


def my_streamlit_ui(state):
    # add your streamlit code here!
    import streamlit as st

Example: Arrange the UI of my children in tabs (default UI by Lightning).

class Flow(LightningFlow):
    def configure_layout(self):
        return [
            dict(name="First Tab", content=self.child0),
            dict(name="Second Tab", content=self.child1),
            dict(name="Lightning", content="https://lightning.ai"),
        ]

If you don’t implement configure_layout, Lightning will collect all children and display their UI in a tab (if they have their own configure_layout implemented).

Note

This hook gets called at the time of app creation and then again as part of the loop. If desired, the returned layout configuration can depend on the state. The only exceptions are flows that return a Frontend. These need to be provided at the time of app creation in order for the runtime to start the server.


experimental_iterate(iterable, run_once=True, user_key='')[source]

This method should always be used with any kind of iterable to ensure it is fault tolerant.

If you want your iterable to always be consumed from scratch, you shouldn’t use this method.

Parameters:
  • iterable (Iterable) – Iterable to iterate over. The iterable shouldn’t have side effects or be random.

  • run_once (bool) – Whether to run the entire iteration only once. Otherwise, it would restart from the beginning.

  • user_key (str) – Key to be used to track the caching mechanism.

Return type:

Generator
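One way to picture the idea behind experimental_iterate is an iterator that records how far it has progressed, so that a restart resumes instead of starting over. The sketch below is plain Python under that assumption and is not the library's implementation; resumable_iterate, its key argument, and the progress dictionary are hypothetical.

```python
def resumable_iterate(iterable, progress, key, run_once=True):
    """Yield items from `iterable`, remembering the position under `key`."""
    items = list(iterable)
    start = progress.get(key, 0)
    if start >= len(items):
        if run_once:
            return  # already fully consumed: yield nothing
        start = 0  # otherwise restart from the beginning
    for index in range(start, len(items)):
        # Record the position before handing the item out, so an
        # interrupted consumer does not receive the same item again.
        progress[key] = index + 1
        yield items[index]


progress = {}
letters = ["a", "b", "c"]
seen = []
for item in resumable_iterate(letters, progress, key="letters"):
    seen.append(item)
    if item == "b":
        break  # simulate an interruption after the second item

# A second pass resumes at "c" instead of starting over.
seen.extend(resumable_iterate(letters, progress, key="letters"))
```

Resuming by position only makes sense when the iterable produces the same items on every pass, which is consistent with the requirement that it have no side effects and not be random.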

fail(end_msg='')[source]

Method used to exit and fail the application.

Return type:

None

load_state_dict(flow_state, children_states, strict=True)[source]

Reloads the state of this flow and its children.

from lightning.app import LightningFlow, LightningWork


class Work(LightningWork):
    def __init__(self):
        super().__init__()
        self.counter = 0

    def run(self):
        self.counter += 1


class Flow(LightningFlow):
    def run(self):
        # dynamically create a work.
        if not getattr(self, "w", None):
            self.w = Work()

        self.w.run()

    def load_state_dict(self, flow_state, children_states, strict) -> None:
        # 1: Re-instantiate the dynamic work
        self.w = Work()

        # 2: Make any states modification / migration.
        ...

        # 3: Call the parent ``load_state_dict`` to
        # recursively reload the states.
        super().load_state_dict(
            flow_state,
            children_states,
            strict,
        )

Parameters:
  • flow_state (Dict[str, Any]) – The state of the current flow.

  • children_states (Dict[str, Any]) – The state of the dynamic children of this flow.

  • strict (bool) – Whether to raise an exception if a dynamic child hasn’t been re-created.

Return type:

None

named_works(recurse=True)[source]

Return the LightningWork instances of this flow along with their names.

Return type:

List[Tuple[str, LightningWork]]

run(*args, **kwargs)[source]

Override with your own logic.

Return type:

None

schedule(cron_pattern, start_time=None, user_key=None)[source]

The schedule method is used to run part of the flow logic at scheduled times.

from lightning.app import LightningFlow


class Flow(LightningFlow):
    def run(self):
        if self.schedule("hourly"):
            print("run some code every hour")

Parameters:

Return type:

bool

A best practice is to avoid running a dynamic flow or work under the self.schedule method. Instead, instantiate them within the condition, but run them outside.

from lightning.app import LightningFlow
from lightning.app.structures import List


class SchedulerDAG(LightningFlow):
    def __init__(self):
        super().__init__()
        self.dags = List()

    def run(self):
        if self.schedule("hourly"):
            self.dags.append(DAG(...))

        for dag in self.dags:
            payload = dag.run()
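To see why the children are run outside the condition, consider a toy model in plain Python. It assumes that the schedule condition is true only on the loop iteration where the period elapses, while a child needs run called on every iteration to make progress. ToyDag, ToyScheduler, and the tick-based due check are hypothetical and merely stand in for DAG and self.schedule.

```python
class ToyDag:
    """Hypothetical child that needs three run() calls to finish."""

    def __init__(self):
        self.steps_done = 0

    def run(self):
        if self.steps_done < 3:
            self.steps_done += 1


class ToyScheduler:
    def __init__(self, period):
        self.period = period
        self.tick = 0
        self.dags = []

    def due(self):
        # Stand-in for self.schedule(...): true once per period (assumption).
        return self.tick % self.period == 0

    def run(self):
        if self.due():
            self.dags.append(ToyDag())  # instantiate inside the condition
        for dag in self.dags:
            dag.run()  # run outside, on every loop iteration
        self.tick += 1


scheduler = ToyScheduler(period=5)
for _ in range(6):
    scheduler.run()

steps = [dag.steps_done for dag in scheduler.dags]
```

Had dag.run() been placed inside the condition, the first child would have advanced only on the iterations where the condition fired instead of on every pass through the loop.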


state_dict()[source]

Returns the current flow state but not its children.

Return type:

dict

stop(end_msg='')[source]

Method used to exit the application.

Return type:

None

works(recurse=True)[source]

Return its LightningWork.

Return type:

List[LightningWork]

property flows: Dict[str, LightningFlow][source]

Return its children LightningFlow.

property lightningignore: Tuple[str, ...][source]

Programmatic equivalent of the .lightningignore file.

property name: str[source]

Return the current LightningFlow name.

property ready: bool[source]

Override to customize when your App should be ready.

property state: dict[source]

Returns the current flow state along with that of its children.