
Async & loading

Goal

Run stdlib-thread work with streamtree.asyncio.submit, poll TaskHandle on reruns, and map status to loading / ready / error subtrees with match_task or match_task_many.

Install

streamtree.asyncio ships in the default install; no extra is required. The [asyncio] extra is kept only as metadata for future backends (see Dependency strategy).

Minimal submit + poll

Full small demo:

"""Background task demo: ``streamtree.asyncio.submit`` + poll ``status`` / ``result``.

For rerun-polled **progress** from the worker (0.5+), use ``set_task_progress(key=..., value=...)``
with the same ``key`` as ``submit``, and read ``TaskHandle.progress()`` on the main thread.

**0.7+:** use ``submit_many`` for several independent callables, and cooperative cancel
(``TaskHandle.cancel()`` while **running**, then ``is_task_cancel_requested`` /
``complete_cancelled`` inside long workers) — see ``streamtree.asyncio`` module docstring.
"""

from __future__ import annotations

import time

from streamtree import component, render
from streamtree.asyncio import submit, submit_many
from streamtree.elements import Markdown, Page, Text, VStack


@component
def AsyncDemo() -> object:
    h = submit(lambda: time.sleep(0.3) or 42, key="demo_slow_job")
    status = h.status()
    if status == "done":
        return VStack(Text(f"Result: {h.result()}"), Markdown("_Task finished._"))
    if status == "error":
        return Markdown(h.error() or "error")
    return VStack(Text(f"Status: {status}"), Markdown("Rerun to poll until **done**."))


@component
def ManySmallJobs() -> object:
    a, b = submit_many((("job_a", lambda: 10), ("job_b", lambda: 20)))
    if a.status() == "done" and b.status() == "done":
        return Text(f"submit_many: {a.result()} + {b.result()} = {a.result() + b.result()}")
    return Text(f"Many jobs: {a.status()=!s} {b.status()=!s} (rerun to poll)")


if __name__ == "__main__":
    render(Page(VStack(AsyncDemo(), Markdown("---"), ManySmallJobs())))

Run:

streamlit run examples/async_bg.py

Declarative loading UI

Use streamtree.loading.match_task to choose which element subtree to show for each TaskHandle.status. To orchestrate several handles at once, use match_task_many; for stable ordering of the handles, use submit_many_ordered (see Performance).

"""``match_task`` + :func:`streamtree.asyncio.submit` for loading / ready / error subtrees."""

from __future__ import annotations

import time

from streamtree import asyncio, component, render
from streamtree.core.element import Element
from streamtree.elements import Page, Text, VStack
from streamtree.loading import match_task


@component
def LoaderDemo() -> Element:
    def fetch() -> int:
        time.sleep(0.15)
        return 42

    handle = asyncio.submit(fetch, key="loader_demo_value")

    return match_task(
        handle,
        loading=VStack(Text("Loading…")),
        ready=lambda n: VStack(Text(f"Ready: result={n!r}")),
        error=VStack(Text("Something went wrong.")),
        cancelled=VStack(Text("Cancelled.")),
    )


if __name__ == "__main__":
    render(Page(LoaderDemo()))
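When several handles feed one view, their statuses have to be collapsed into a single loading / ready / error decision. The signature of match_task_many is not shown on this page, so the sketch below does not call it. It is a stdlib-only illustration of one plausible precedence rule (error first, then cancelled, then ready only when every task is done). The names combined_status and pick_view, the rule itself, and the plain strings standing in for element subtrees are all assumptions.

```python
from __future__ import annotations

from typing import Iterable, Sequence


def combined_status(statuses: Iterable[str]) -> str:
    """Collapse several task statuses into one (hypothetical precedence rule)."""
    seen = set(statuses)
    if "error" in seen:
        return "error"
    if "cancelled" in seen:
        return "cancelled"
    if seen == {"done"}:
        return "done"
    # Anything else (still running, or no tasks at all) is treated as loading.
    return "loading"


def pick_view(statuses: Sequence[str], results: Sequence[object]) -> str:
    """Return a placeholder string for the subtree that would be shown."""
    fallback = {
        "loading": "Loading…",
        "error": "Something went wrong.",
        "cancelled": "Cancelled.",
    }
    state = combined_status(statuses)
    if state == "done":
        return f"Ready: results={list(results)!r}"
    return fallback[state]
```

Under this rule a single failed task switches the whole view to the error subtree even if the others are still running; a different precedence may suit other screens.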

Cancellation & cleanup

  • TaskHandle.cancel() — cooperative cancel for running work; pair with is_task_cancel_requested / complete_cancelled in the worker.
  • dismiss_task / dismiss_tasks — remove terminal session entries so a key can be reused without stale collisions.
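Cooperative cancellation means the worker has to check a flag itself and stop at a safe point; nothing interrupts the thread from outside. The sketch below shows that general pattern with threading.Event from the standard library. It does not use streamtree: the Event plays the role that is_task_cancel_requested appears to play, writing "cancelled" into the outcome dict plays the role of complete_cancelled, and worker, cancel_requested, and outcome are hypothetical names.

```python
from __future__ import annotations

import threading
import time


def worker(cancel_requested: threading.Event, outcome: dict[str, object]) -> None:
    """Do work in small steps and check the cancel flag between steps."""
    for step in range(100):
        if cancel_requested.is_set():
            # Stop early and record that the task ended as cancelled.
            outcome["status"] = "cancelled"
            outcome["steps"] = step
            return
        time.sleep(0.01)  # one unit of simulated work
    outcome["status"] = "done"
    outcome["steps"] = 100


cancel_requested = threading.Event()
outcome: dict[str, object] = {}

thread = threading.Thread(target=worker, args=(cancel_requested, outcome))
thread.start()
time.sleep(0.05)        # let the worker make some progress
cancel_requested.set()  # main thread asks for cancellation
thread.join()           # worker notices the flag at its next check
```

The shorter each step, the sooner a cancel request takes effect; a worker that never checks the flag runs to completion regardless.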

Dev inspection

streamtree.asyncio.summarize_async_tasks() returns JSON-serializable rows (status, progress preview, cancel flags) for debugging panels or logs.

See also