Quickstart

This short guide shows how to process ordered data out-of-core for both training dataset production and live usage. It uses StatefulLoop to iterate, process with state, buffer under a memory cap, and then persist chunks with Store.

 from dataclasses import dataclass
 from pathlib import Path
 import tempfile
 import pandas as pd
 from oups import StatefulLoop, Store, toplevel

 # 1) Produce ordered chunks (simulate out-of-core/stream).
 def iter_batches():
     """Yield three 5-row DataFrames whose "value" column covers 0..14 overall."""
     for i in range(3):
         yield pd.DataFrame({
             "timestamp": pd.RangeIndex(i*5, (i+1)*5),  # ordered, monotonically increasing across batches
             "value": list(range(i*5, (i+1)*5)),
         })

 # 2) Define indexer to group results.
 @toplevel
 class Index:
     name: str

 # 3) A stateful function, with a mutable state.
 def add_to_sum(df, agg_state):
     """Accumulate the sum of df["value"] into agg_state["sum"] (mutated in place)."""
     agg_state["sum"] += df["value"].sum()

 with tempfile.TemporaryDirectory() as tmpdir:
     # 4.1) Define where results will be stored.
     store = Store(Path(tmpdir) / "oups-data", Index)
     key = Index("demo")
     # 4.2) Define initial function state and bind function state to
     # stateful loop.
     loop = StatefulLoop(Path(tmpdir) / "loop_persistence.pkl")
     agg_state = {"sum": 0} # function initial state
     stateful_sum = loop.bind_function_state(add_to_sum, state={"agg_state": agg_state})
     # 4.3) Iterate over ordered batches.
     for item_ctx in loop.iterate(iter_batches(), check_loop_usage=False):
         with item_ctx as df:
             stateful_sum(df)  # uses and updates state
             # NOTE(review): presumably item_ctx swallows the Skip raised by
             # buffer() so write() only runs on a flush — confirm against oups docs.
             out = loop.buffer({"main": df}, memory_limit_mb=1.0)  # raises Skip until flush
             store[key].write(df=out["main"], ordered_on="timestamp")  # persists chunk

     # Verify the simple aggregation ran
     # (three batches of 5 values covering 0..14, so the total is sum(range(15)) == 105).
     assert agg_state["sum"] == sum(range(15))

Notes

  • StatefulLoop persists function/object state at the end of the loop to resume later runs.

  • buffer raises Skip until either the memory limit is hit or the loop reaches its final iteration; it then returns the concatenated chunks for downstream processing.

  • store[key].write merges new data, maintains ordering, and coalesces row groups as configured.