StatefulLoop
StatefulLoop orchestrates ordered, batched processing with resumable state and memory-aware buffering.
Typical usage
```python
from pathlib import Path
import tempfile

import pandas as pd

from oups.stateful_loop import StatefulLoop


# Define stateful function.
def process_fn(df, agg_state):
    agg_state["sum"] += df["value"].sum()


with tempfile.TemporaryDirectory() as tmpdir:
    tmp = Path(tmpdir) / "loop.pkl"
    loop = StatefulLoop(tmp)
    agg_state = {"sum": 0}  # function initial state
    # Bind function state to stateful loop.
    sum_fn = loop.bind_function_state(process_fn, state={"agg_state": agg_state})
    batches = [pd.DataFrame({"value": [1]}), pd.DataFrame({"value": [2]})]
    for item_ctx in loop.iterate(batches, check_loop_usage=False):
        with item_ctx as df:
            sum_fn(df)
            out = loop.buffer({"main": df}, memory_limit_mb=0.000001)  # raises Skip until flush
            # downstream only runs on flush (or last iteration)
    assert agg_state["sum"] == 3
```
A minimal loop, without bound state or buffering:

```python
from pathlib import Path
import tempfile

from oups.stateful_loop import StatefulLoop

with tempfile.TemporaryDirectory() as tmpdir:
    tmp = Path(tmpdir) / "loop.pkl"
    loop = StatefulLoop(tmp)
    out = []
    for item_ctx in loop.iterate([1, 2], check_loop_usage=False):
        with item_ctx as item:
            out.append(item)
    assert out == [1, 2]
```
Key ideas
- State binding: bind mutable parameters to functions (`bind_function_state`), or bind object attributes (`bind_object_state`).
- Resuming: if the loop persistence file exists at construction, initial state values are ignored and the stored state is used instead.
- Buffering: `buffer` accumulates DataFrames per call site and per key. It returns the concatenated DataFrames when the memory limit is exceeded or at the last iteration; otherwise it raises `Skip`, which the iteration context swallows.
- Validation: set `iterate(..., check_loop_usage=True)` to strictly enforce that `with item_ctx as ...` and `buffer` calls are placed at the top level of the loop body.
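The buffering and `Skip` behaviour described above can be illustrated with a small, self-contained model. This is a minimal sketch and not the oups implementation. The names `Skip`, `ToyIterationContext` and `ToyBuffer` are hypothetical, as are the `size_func` and `is_last` parameters. One `ToyBuffer` instance stands in for a single call site.

`concat_func` mirrors the parameter listed in the API. Lists replace DataFrames, and `len` replaces memory size, so the sketch runs without pandas. With DataFrames, one might pass `pandas.concat` and measure size with `DataFrame.memory_usage(deep=True).sum()`.

```python
# Hypothetical model of buffer-until-flush semantics; not the oups implementation.


class Skip(Exception):
    """Raised by the toy buffer to abandon the rest of the current iteration."""


class ToyIterationContext:
    """Hands out one item and swallows Skip so the loop moves on."""

    def __init__(self, item):
        self.item = item

    def __enter__(self):
        return self.item

    def __exit__(self, exc_type, exc, tb):
        # A truthy return suppresses the exception; only Skip is suppressed.
        return exc_type is not None and issubclass(exc_type, Skip)


class ToyBuffer:
    """Accumulates chunks per key until a size limit is exceeded or the last item."""

    def __init__(self, limit, concat_func, size_func):
        self.limit = limit
        self.concat_func = concat_func
        self.size_func = size_func
        self.pending = {}  # key -> list of buffered chunks

    def buffer(self, data, is_last):
        for key, chunk in data.items():
            self.pending.setdefault(key, []).append(chunk)
        used = sum(self.size_func(c) for chunks in self.pending.values() for c in chunks)
        if used <= self.limit and not is_last:
            raise Skip  # under the limit: skip downstream code for this item
        # Flush: concatenate each key's chunks, then empty the buffer.
        out = {key: self.concat_func(chunks) for key, chunks in self.pending.items()}
        self.pending = {}
        return out


def concat_lists(chunks):
    return [x for chunk in chunks for x in chunk]


buf = ToyBuffer(limit=3, concat_func=concat_lists, size_func=len)
items = [[1], [2], [3, 4], [5]]
flushed = []
for i, item in enumerate(items):
    with ToyIterationContext(item) as chunk:
        out = buf.buffer({"main": chunk}, is_last=(i == len(items) - 1))
        flushed.append(out["main"])  # downstream code: runs only on flush

print(flushed)  # [[1, 2, 3, 4], [5]]
```

The sketch behaves as follows:

- The first two items are held back by `Skip`.
- The third item pushes the buffered size to 4, above the limit of 3, and triggers a flush.
- The final item is flushed because it is the last iteration.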
Behavioral notes
- `IterationContext` swallows `Skip` to continue to the next iteration without running downstream code.
- When `check_loop_usage` is enabled (or on the first run, when the persistence file does not yet exist), illegal placements of `buffer()` (e.g., inside nested blocks or nested loops) are rejected early by an AST validator.
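The idea of rejecting illegal `buffer()` placements with an AST check can be sketched using Python's standard `ast` module. This is not the oups validator, whose exact rules are not given here. The function `find_illegal_buffer_calls` is hypothetical and applies an assumed, simplified rule: a `.buffer(...)` call is allowed only as a direct statement (assignment or bare expression) of a `with` block. That `with` block must itself sit directly in a `for` loop over `<expr>.iterate(...)`.

```python
import ast


def _is_method_call(node, name):
    """True if node is a call of the form <expr>.<name>(...)."""
    return (
        isinstance(node, ast.Call)
        and isinstance(node.func, ast.Attribute)
        and node.func.attr == name
    )


def find_illegal_buffer_calls(source):
    """Return line numbers of .buffer(...) calls outside the allowed placement.

    Simplified rule (an assumption): a buffer call is allowed only as a direct
    statement (assignment or bare expression) of a `with` block that is itself
    a direct statement of a `for` loop over `<expr>.iterate(...)`.
    """
    tree = ast.parse(source)
    allowed = set()
    for loop in ast.walk(tree):
        if not (isinstance(loop, ast.For) and _is_method_call(loop.iter, "iterate")):
            continue
        for stmt in loop.body:
            if not isinstance(stmt, ast.With):
                continue
            for inner in stmt.body:
                if isinstance(inner, (ast.Assign, ast.Expr)) and _is_method_call(inner.value, "buffer"):
                    allowed.add(id(inner.value))
    # Every buffer call not explicitly allowed is reported by line number.
    return sorted(
        node.lineno
        for node in ast.walk(tree)
        if _is_method_call(node, "buffer") and id(node) not in allowed
    )


ok = """
for item_ctx in loop.iterate(batches):
    with item_ctx as df:
        out = loop.buffer({"main": df})
"""
nested_if = """
for item_ctx in loop.iterate(batches):
    with item_ctx as df:
        if len(df):
            out = loop.buffer({"main": df})
"""
nested_loop = """
for item_ctx in loop.iterate(batches):
    with item_ctx as df:
        for part in parts:
            loop.buffer({"main": part})
"""
print(find_illegal_buffer_calls(ok))           # []
print(find_illegal_buffer_calls(nested_if))    # [5]
print(find_illegal_buffer_calls(nested_loop))  # [5]
```

Checking the source statically lets misplaced calls fail before any data is processed, rather than partway through a long run.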
API
- `StatefulLoop.iterate(iterable, check_loop_usage=None)`
- `StatefulLoop.buffer(data, memory_limit_mb=None, concat_func=pandas.concat)`
- `StatefulLoop.bind_function_state(func, state, name=None)`
- `StatefulLoop.bind_object_state(obj, state, name=None)`
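The state binding and resuming rules can be sketched with a hypothetical stand-in class. This is a model under stated assumptions, not the oups implementation. `ToyStatefulLoop`, its `save()` method and the pickle-based file layout are invented for illustration. Only the `bind_function_state(func, state, name=None)` signature is taken from the API above, and here it simply wraps the function with `functools.partial`. `bind_object_state` is not modelled.

```python
import functools
import pickle
import tempfile
from pathlib import Path


class ToyStatefulLoop:
    """Hypothetical stand-in showing state binding and resuming (not oups)."""

    def __init__(self, path):
        self.path = Path(path)
        # Stored state is read only if the persistence file exists at construction.
        self._stored = pickle.loads(self.path.read_bytes()) if self.path.exists() else None
        self._bound = {}

    def bind_function_state(self, func, state, name=None):
        name = name or func.__name__
        if self._stored is not None:
            # Resuming: the initial values passed in are ignored.
            state = self._stored[name]
        self._bound[name] = state
        # Mutable state objects are shared with the returned callable.
        return functools.partial(func, **state)

    def save(self):
        # Hypothetical: persist every bound state under its name.
        self.path.write_bytes(pickle.dumps(self._bound))


def process_fn(value, agg_state):
    agg_state["sum"] += value


with tempfile.TemporaryDirectory() as tmpdir:
    path = Path(tmpdir) / "loop.pkl"

    # First run: no persistence file, so the initial state {"sum": 0} is used.
    loop = ToyStatefulLoop(path)
    sum_fn = loop.bind_function_state(process_fn, state={"agg_state": {"sum": 0}})
    for value in [1, 2]:
        sum_fn(value)
    loop.save()

    # Second run: the file exists, so {"sum": 0} is ignored and {"sum": 3} is restored.
    loop = ToyStatefulLoop(path)
    sum_fn = loop.bind_function_state(process_fn, state={"agg_state": {"sum": 0}})
    sum_fn(4)
    resumed_sum = sum_fn.keywords["agg_state"]["sum"]

print(resumed_sum)  # 7
```

Because the bound state is a mutable dict shared with the wrapped function, in-place updates made during the loop are exactly what gets persisted and restored.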
See also
Concepts: Stateful Processing Concepts
Quickstart: Quickstart
Store: Store Architecture
AsofMerger: AsofMerger