Quickstart
This short guide shows how to process ordered data out-of-core for both training dataset production and live usage. It uses StatefulLoop to iterate, process with state, buffer under a memory cap, and then persist chunks with Store.
from dataclasses import dataclass
from pathlib import Path
import tempfile
import pandas as pd
from oups import StatefulLoop, Store, toplevel
# 1) Produce ordered chunks (simulate out-of-core/stream).
def iter_batches():
    """Yield three ordered 5-row DataFrames with 'timestamp' and 'value' columns."""
    for start in range(0, 15, 5):
        stop = start + 5
        yield pd.DataFrame({
            "timestamp": pd.RangeIndex(start, stop),  # strictly increasing, so batches stay ordered
            "value": list(range(start, stop)),
        })
# 2) Define indexer to group results.
@toplevel
class Index:
    """Key used by Store to locate a dataset; a single 'name' field identifies it."""

    name: str
# 3) A stateful function, with a mutable state.
def add_to_sum(df, agg_state):
    """Add the sum of df['value'] into the mutable agg_state dict (in place)."""
    batch_total = df["value"].sum()
    agg_state["sum"] += batch_total
with tempfile.TemporaryDirectory() as tmpdir:
    base = Path(tmpdir)
    # 4.1) Define where results will be stored.
    store = Store(base / "oups-data", Index)
    key = Index("demo")
    # 4.2) Define initial function state and bind function state to
    # stateful loop.
    loop = StatefulLoop(base / "loop_persistence.pkl")
    agg_state = {"sum": 0}  # function initial state
    stateful_sum = loop.bind_function_state(add_to_sum, state={"agg_state": agg_state})
    # 4.3) Iterate over ordered batches.
    for item_ctx in loop.iterate(iter_batches(), check_loop_usage=False):
        with item_ctx as df:
            stateful_sum(df)  # uses and updates state
            out = loop.buffer({"main": df}, memory_limit_mb=1.0)  # raises Skip until flush
            store[key].write(df=out["main"], ordered_on="timestamp")  # persists chunk

# Verify the simple aggregation ran
assert agg_state["sum"] == sum(range(15))
Notes
`StatefulLoop` persists function/object state at the end of the loop so later runs can resume. `buffer` raises `Skip` until the memory limit is hit or the last iteration is reached, then returns the concatenated chunks for downstream processing. `Store[key].write` merges new data with existing data, maintains ordering, and coalesces row groups as configured.