StatefulLoop
============

``StatefulLoop`` orchestrates ordered, batched processing with resumable state and memory-aware buffering.

Typical usage
-------------

.. code-block:: python
    :name: test_stateful_loop_usage

    from pathlib import Path
    import tempfile

    import pandas as pd

    from oups.stateful_loop import StatefulLoop


    # Define stateful function.
    def process_fn(df, agg_state):
        agg_state["sum"] += df["value"].sum()


    with tempfile.TemporaryDirectory() as tmpdir:
        tmp = Path(tmpdir) / "loop.pkl"
        loop = StatefulLoop(tmp)
        agg_state = {"sum": 0}  # function initial state
        # Bind function state to stateful loop.
        sum_fn = loop.bind_function_state(process_fn, state={"agg_state": agg_state})

        batches = [pd.DataFrame({"value": [1]}), pd.DataFrame({"value": [2]})]
        for item_ctx in loop.iterate(batches, check_loop_usage=False):
            with item_ctx as df:
                sum_fn(df)
                out = loop.buffer({"main": df}, memory_limit_mb=0.000001)  # raises Skip until flush
                # downstream only runs on flush (or last iteration)

        assert agg_state["sum"] == 3

.. code-block:: python
    :name: test_stateful_loop_basic

    from pathlib import Path
    import tempfile

    from oups.stateful_loop import StatefulLoop

    with tempfile.TemporaryDirectory() as tmpdir:
        tmp = Path(tmpdir) / "loop.pkl"
        loop = StatefulLoop(tmp)
        out = []
        for item_ctx in loop.iterate([1, 2], check_loop_usage=False):
            with item_ctx as item:
                out.append(item)

        assert out == [1, 2]

Key ideas
---------

- **State binding**: bind mutable parameters to functions, or bind object attributes.
- **Resuming**: if the loop persistence file exists at construction, initial state values are ignored and stored state is used.
- **Buffering**: ``buffer`` accumulates DataFrames per-call-site and per-key; returns concatenated DataFrames when memory limit is exceeded or at the last iteration; otherwise raises ``Skip`` which the iteration context swallows.
- **Validation**: set ``iterate(..., check_loop_usage=True)`` to strictly enforce safe placement of ``with item_ctx as ...`` and ``buffer`` calls at top level in loop body.

Behavioral notes
----------------

- ``IterationContext`` swallows ``Skip`` to continue to the next iteration without running downstream code.
- When ``check_loop_usage`` is enabled (or on first run when a persistence file does not yet exist), illegal placements of ``buffer()`` (e.g., nested blocks, nested loops) are rejected early by an AST validator.

API
---

- ``StatefulLoop.iterate(iterable, check_loop_usage=None)``
- ``StatefulLoop.buffer(data, memory_limit_mb=None, concat_func=pandas.concat)``
- ``StatefulLoop.bind_function_state(func, state, name=None)``
- ``StatefulLoop.bind_object_state(obj, state, name=None)``

See also
--------

- Concepts: :doc:`stateful_processing`
- Quickstart: :doc:`quickstart`
- Store: :doc:`store`
- AsofMerger: :doc:`asof_merger`
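
Sketch: skip-and-flush control flow
-----------------------------------

The interplay between ``buffer`` raising ``Skip`` and the iteration context swallowing it can be pictured with a small, library-independent sketch. Every name below (``Skip``, ``ItemContext``, the free function ``buffer`` and its ``is_last`` and ``limit`` parameters) is a hypothetical stand-in, not the ``oups`` implementation, and the flush condition is simplified to an item count rather than a memory limit.

.. code-block:: python

    class Skip(Exception):
        """Hypothetical signal meaning 'nothing to emit yet'."""


    class ItemContext:
        """Hypothetical stand-in for the per-item context yielded by iterate()."""

        def __init__(self, item):
            self.item = item

        def __enter__(self):
            return self.item

        def __exit__(self, exc_type, exc, tb):
            # Returning True suppresses the exception; only Skip is swallowed,
            # any other exception still propagates.
            return exc_type is not None and issubclass(exc_type, Skip)


    def buffer(pending, item, is_last, limit=2):
        """Accumulate items; return them on flush, otherwise raise Skip."""
        pending.append(item)
        if len(pending) >= limit or is_last:
            flushed = list(pending)
            pending.clear()
            return flushed
        raise Skip


    pending, flushed_batches = [], []
    items = [1, 2, 3, 4, 5]
    for i, item in enumerate(items):
        with ItemContext(item) as value:
            out = buffer(pending, value, is_last=(i == len(items) - 1))
            # Reached only when buffer() returned, i.e. on a flush.
            flushed_batches.append(out)

    assert flushed_batches == [[1, 2], [3, 4], [5]]

Under these assumptions, code placed after the ``buffer`` call inside the ``with`` block runs only on iterations that flush, which mirrors the "downstream only runs on flush (or last iteration)" comment in the usage example.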