reactive.poll
reactive.poll(
poll_func,=1,
interval_secs*,
=eq,
equals=0,
priority=MISSING,
session )
Create a reactive polling object.
Polling is a technique that approximates "real-time" or streaming updates, as if a data source were pushing notifications each time it is updated. The data source does not actually push notifications; a polling object repeatedly checks for changes in an efficient way at specified intervals. If a change is detected, the polling object runs a function to re-read the data source.
A reactive polling object is constructed using two functions: a polling function, which is a fast-running, inexpensive function that is used to determine whether some data source has changed (such as the timestamp of a file, or a SELECT MAX(updated) FROM table
query); and a slower-running reading function that actually loads and returns the data that is desired. The poll()
function is intended to be used as a decorator: the poll function is passed as the poll_func
arg to @poll()
, while the data reading function is the target of the decorator.
Reactive consumers can invoke the resulting polling object to get the current data, and will automatically invalidate when the polling function detects a change. Polling objects also cache the results of the read function; for this reason, apps where all sessions depend on the same data source may want to declare the polling object at the top level of app.py (outside of the server function).
Both poll_func
and the decorated (data reading) function can read reactive values and calc
objects. Any invalidations triggered by reactive dependencies will apply to the reactive polling object immediately (not waiting for the interval_secs
delay to expire).
Parameters
poll_func : Callable[[], Any] | Callable[[], Awaitable[Any]]
-
A function to be called frequently to determine whether a data source has changed. The return value should be something that can be compared inexpensively using
==
. Both regular functions and co-routine functions are allowed. Note that thepoll_func
should NOT return a bool that indicates whether the data source has changed. Rather, eachpoll_func
return value will be checked for equality with its precedingpoll_func
return value (using==
semantics by default), and if it differs, the data source will be considered changed. interval_secs : float = 1
-
The number of seconds to wait after each
poll_func
invocation before polling again. Note: depending on what other tasks are executing, the actual wait time may far exceed this value. equals : Callable[[Any, Any], bool] = eq
-
The function that will be used to compare each
poll_func
return value with its immediate predecessor. priority : int = 0
-
Reactive polling is implemented using an
effect
to callpoll_func
on a timer; use thepriority
argument to control the order of this Effect’s execution versus other Effects in your app. Seeeffect
for more details. session : MISSING_TYPE | Session | None = MISSING
-
A
Session
instance. If not provided, a session is inferred viaget_current_session
. If there is no current session (i.e.poll
is being created outside of the server function), the lifetime of this reactive poll object will not be tied to any specific session.
Returns
: Callable[[Callable[[],
T
]], Callable[[],T
]]-
A decorator that should be applied to a no-argument function that (expensively) reads whatever data is desired. (This function may be a regular function or a co-routine function.) The result of the decorator is a reactive
calc
that always returns up-to-date data, and invalidates callers when changes are detected via polling.
See Also
Examples
#| standalone: true
#| components: [editor, viewer]
#| layout: vertical
#| viewerHeight: 400
## file: app.py
import asyncio
import random
import sqlite3
from datetime import datetime
from typing import Any, Awaitable
import pandas as pd
from shiny import reactive
from shiny.express import input, render, ui
SYMBOLS = ["AAA", "BBB", "CCC", "DDD", "EEE", "FFF"]
def timestamp() -> str:
return datetime.now().strftime("%x %X")
def rand_price() -> float:
return round(random.random() * 250, 2)
# === Initialize the database =========================================
def init_db(con: sqlite3.Connection) -> None:
cur = con.cursor()
try:
cur.executescript(
"""
CREATE TABLE stock_quotes (timestamp text, symbol text, price real);
CREATE INDEX idx_timestamp ON stock_quotes (timestamp);
"""
)
cur.executemany(
"INSERT INTO stock_quotes (timestamp, symbol, price) VALUES (?, ?, ?)",
[(timestamp(), symbol, rand_price()) for symbol in SYMBOLS],
)
con.commit()
finally:
cur.close()
conn = sqlite3.connect(":memory:")
init_db(conn)
# === Randomly update the database with an asyncio.task ==============
def update_db(con: sqlite3.Connection) -> None:
"""Update a single stock price entry at random"""
cur = con.cursor()
try:
sym = SYMBOLS[random.randint(0, len(SYMBOLS) - 1)]
print(f"Updating {sym}")
cur.execute(
"UPDATE stock_quotes SET timestamp = ?, price = ? WHERE symbol = ?",
(timestamp(), rand_price(), sym),
)
con.commit()
finally:
cur.close()
async def update_db_task(con: sqlite3.Connection) -> Awaitable[None]:
"""Task that alternates between sleeping and updating prices"""
while True:
await asyncio.sleep(random.random() * 1.5)
update_db(con)
_ = asyncio.create_task(update_db_task(conn))
# === Create the reactive.poll object ===============================
def tbl_last_modified() -> Any:
df = pd.read_sql_query("SELECT MAX(timestamp) AS timestamp FROM stock_quotes", conn)
return df["timestamp"].to_list()
@reactive.poll(tbl_last_modified, 0.5)
def stock_quotes() -> pd.DataFrame:
return pd.read_sql_query("SELECT timestamp, symbol, price FROM stock_quotes", conn)
with ui.card():
ui.markdown(
"""
# `shiny.reactive.poll` demo
This example app shows how to stream results from a database (in this
case, an in-memory sqlite3) with the help of `shiny.reactive.poll`.
"""
)
ui.input_selectize("symbols", "Filter by symbol", [""] + SYMBOLS, multiple=True)
@render.data_frame
def table():
df = stock_quotes()
if input.symbols():
df = df[df["symbol"].isin(input.symbols())]
return df