StatefulLoop

StatefulLoop orchestrates ordered, batched processing with resumable state and memory-aware buffering.

Typical usage

 from pathlib import Path
 import tempfile
 import pandas as pd
 from oups.stateful_loop import StatefulLoop

 # Define stateful function.
 def process_fn(df, agg_state):
     agg_state["sum"] += df["value"].sum()

 with tempfile.TemporaryDirectory() as tmpdir:
     tmp = Path(tmpdir) / "loop.pkl"
     loop = StatefulLoop(tmp)
     agg_state = {"sum": 0} # function initial state
     # Bind function state to stateful loop.
     sum_fn = loop.bind_function_state(process_fn, state={"agg_state": agg_state})

     batches = [pd.DataFrame({"value": [1]}), pd.DataFrame({"value": [2]})]
     for item_ctx in loop.iterate(batches, check_loop_usage=False):
         with item_ctx as df:
             sum_fn(df)
             out = loop.buffer({"main": df}, memory_limit_mb=0.000001)  # raises Skip until flush
             # downstream only runs on flush (or last iteration)

     assert agg_state["sum"] == 3

Minimal iteration, without bound state or buffering:

 from pathlib import Path
 import tempfile
 from oups.stateful_loop import StatefulLoop

 with tempfile.TemporaryDirectory() as tmpdir:
     tmp = Path(tmpdir) / "loop.pkl"
     loop = StatefulLoop(tmp)
     out = []
     for item_ctx in loop.iterate([1, 2], check_loop_usage=False):
         with item_ctx as item:
             out.append(item)
     assert out == [1, 2]

Key ideas

  • State binding: bind mutable state to function parameters (bind_function_state) or to object attributes (bind_object_state).

  • Resuming: if the loop persistence file exists at construction, initial state values are ignored and stored state is used.

  • Buffering: buffer accumulates DataFrames per call site and per key. It returns the concatenated DataFrames when the memory limit is exceeded or at the last iteration; otherwise it raises Skip, which the iteration context swallows.

  • Validation: set iterate(..., check_loop_usage=True) to strictly enforce that with item_ctx as ... blocks and buffer calls sit at the top level of the loop body.
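
The interplay between buffering and Skip can be pictured with a small standalone sketch. It does not use oups and is not the library's implementation: ToyLoop, ItemContext, the local Skip class and the flush_every parameter are all hypothetical stand-ins, and the "memory limit" is replaced by a simple item count.

```python
class Skip(Exception):
    """Hypothetical stand-in for the signal raised while data is still buffered."""


class ItemContext:
    """Toy iteration context: yields the item and swallows Skip."""

    def __init__(self, item):
        self.item = item

    def __enter__(self):
        return self.item

    def __exit__(self, exc_type, exc, tb):
        # Returning True suppresses the exception; only Skip is swallowed,
        # any other exception propagates to the caller.
        return exc_type is not None and issubclass(exc_type, Skip)


class ToyLoop:
    """Accumulates items and releases them every `flush_every` items or at the end."""

    def __init__(self, flush_every):
        self.flush_every = flush_every
        self._pending = []
        self._is_last = False

    def iterate(self, items):
        items = list(items)
        for i, item in enumerate(items):
            self._is_last = i == len(items) - 1
            yield ItemContext(item)

    def buffer(self, item):
        self._pending.append(item)
        if len(self._pending) < self.flush_every and not self._is_last:
            raise Skip  # not enough data yet: downstream code is skipped
        flushed, self._pending = self._pending, []
        return flushed


loop = ToyLoop(flush_every=2)
flushes = []
for ctx in loop.iterate([1, 2, 3, 4, 5]):
    with ctx as item:
        batch = loop.buffer(item)
        flushes.append(batch)  # reached only on a flush or on the last item
```

Here flushes ends up as [[1, 2], [3, 4], [5]]: the first and third items are buffered silently, and the final item is flushed even though the threshold was not reached.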

Behavioral notes

  • IterationContext swallows Skip to continue to the next iteration without running downstream code.

  • When check_loop_usage is enabled (or on first run when a persistence file does not yet exist), illegal placements of buffer() (e.g., nested blocks, nested loops) are rejected early by an AST validator.
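
As a rough illustration of what an AST-based placement check might look like, the sketch below flags buffer calls that are not nested inside exactly one for loop and one with block. It is an assumption-laden toy, not the validator shipped with the library: the function illegal_buffer_lines, the BLOCKS tuple and the convention that the loop variable is named "loop" are all hypothetical.

```python
import ast
import textwrap

# Compound statements that count as "nesting" in this simplified check.
BLOCKS = (ast.For, ast.While, ast.If, ast.With, ast.Try)


def illegal_buffer_lines(source, loop_name="loop"):
    """Return line numbers of `<loop_name>.buffer(...)` calls whose enclosing
    compound statements are anything other than one `for` then one `with`."""
    tree = ast.parse(textwrap.dedent(source))
    bad = []

    def visit(node, blocks):
        is_buffer_call = (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "buffer"
            and isinstance(node.func.value, ast.Name)
            and node.func.value.id == loop_name
        )
        if is_buffer_call and blocks != ["For", "With"]:
            bad.append(node.lineno)
        if isinstance(node, BLOCKS):
            # Children of this node are one nesting level deeper.
            blocks = blocks + [type(node).__name__]
        for child in ast.iter_child_nodes(node):
            visit(child, blocks)

    visit(tree, [])
    return bad


GOOD = """
for item_ctx in loop.iterate(batches):
    with item_ctx as df:
        out = loop.buffer({"main": df})
"""

BAD = """
for item_ctx in loop.iterate(batches):
    with item_ctx as df:
        if len(df) > 0:
            out = loop.buffer({"main": df})
"""

good_result = illegal_buffer_lines(GOOD)
bad_result = illegal_buffer_lines(BAD)
```

With these inputs, good_result is an empty list and bad_result is [5], the line of the buffer call nested under the if. Because the check works on source text only, it can run before any item is processed, which matches the "rejected early" behavior described above.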

API

  • StatefulLoop.iterate(iterable, check_loop_usage=None)

  • StatefulLoop.buffer(data, memory_limit_mb=None, concat_func=pandas.concat)

  • StatefulLoop.bind_function_state(func, state, name=None)

  • StatefulLoop.bind_object_state(obj, state, name=None)

See also