Quickstart
==========

This short guide shows how to process ordered data out-of-core, both to
produce training datasets and for live usage. It uses ``StatefulLoop`` to
iterate, process with state, buffer under a memory cap, and then persist
chunks with ``Store``.

.. code-block:: python
    :name: test_quickstart

    from dataclasses import dataclass
    from pathlib import Path
    import tempfile

    import pandas as pd

    from oups import StatefulLoop, Store, toplevel

    # 1) Produce ordered chunks (simulating an out-of-core stream).
    def iter_batches():
        for i in range(3):
            yield pd.DataFrame({
                "timestamp": pd.RangeIndex(i * 5, (i + 1) * 5),  # ordered
                "value": list(range(i * 5, (i + 1) * 5)),
            })

    # 2) Define an indexer to group results.
    @toplevel
    class Index:
        name: str

    # 3) A stateful function with a mutable state.
    def add_to_sum(df, agg_state):
        agg_state["sum"] += df["value"].sum()

    with tempfile.TemporaryDirectory() as tmpdir:
        # 4.1) Define where results will be stored.
        store = Store(Path(tmpdir) / "oups-data", Index)
        key = Index("demo")

        # 4.2) Define the initial function state and bind it to the
        # stateful loop.
        loop = StatefulLoop(Path(tmpdir) / "loop_persistence.pkl")
        agg_state = {"sum": 0}  # initial function state
        stateful_sum = loop.bind_function_state(
            add_to_sum, state={"agg_state": agg_state}
        )

        # 4.3) Iterate over ordered batches.
        for item_ctx in loop.iterate(iter_batches(), check_loop_usage=False):
            with item_ctx as df:
                stateful_sum(df)  # uses and updates state
                out = loop.buffer({"main": df}, memory_limit_mb=1.0)  # raises Skip until flush
                store[key].write(df=out["main"], ordered_on="timestamp")  # persists chunk

    # Verify the simple aggregation ran.
    assert agg_state["sum"] == sum(range(15))

Notes
-----

- ``StatefulLoop`` persists function/object state at the end of the loop so
  that later runs can resume from it.
- ``buffer`` raises ``Skip`` until the memory limit is hit or the last
  iteration is reached, then returns the concatenated chunks for downstream
  processing.
- ``store[key].write`` merges new data, maintains ordering, and coalesces row
  groups as configured.
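The buffer-then-flush pattern described in the notes can be sketched in plain pandas, independently of ``oups``. This is only an illustrative sketch: the ``ChunkBuffer`` class, its ``cap_bytes`` parameter, and the local ``Skip`` exception below are hypothetical stand-ins, not the library's actual API.

```python
import pandas as pd


class Skip(Exception):
    """Signals that the buffer is still below its memory cap."""


class ChunkBuffer:
    """Accumulate DataFrame chunks; release them once a byte cap is hit."""

    def __init__(self, cap_bytes):
        self.cap_bytes = cap_bytes
        self.chunks = []

    def add(self, df, last=False):
        self.chunks.append(df)
        size = sum(c.memory_usage(deep=True).sum() for c in self.chunks)
        if size < self.cap_bytes and not last:
            raise Skip  # not enough data buffered yet
        out = pd.concat(self.chunks, ignore_index=True)
        self.chunks = []  # reset for the next accumulation round
        return out


buf = ChunkBuffer(cap_bytes=200)  # tiny cap so the demo flushes mid-stream
batches = [pd.DataFrame({"value": range(i * 5, (i + 1) * 5)}) for i in range(3)]
flushed = []
for i, df in enumerate(batches):
    try:
        flushed.append(buf.add(df, last=(i == len(batches) - 1)))
    except Skip:
        pass  # keep buffering

# Every row comes out exactly once, in the original order.
total = pd.concat(flushed, ignore_index=True)
assert list(total["value"]) == list(range(15))
```

The ``last`` flag mirrors the note above: the final iteration always flushes, so no data is left stranded in the buffer when the stream ends.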