AsofMerger
AsofMerger performs vectorized as-of joins across multiple groups and multiple dataframes per group, optionally with previous values.
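For background, an as-of join picks, for each entry of a main key vector, the most recent row of an input whose key is at or before that entry. The sketch below illustrates the single-dataframe case with plain `pandas.merge_asof`; it is an analogy only, not the AsofMerger API:

```python
import pandas as pd

left = pd.DataFrame({"t": [1, 5, 10], "q": ["a", "b", "c"]})
right = pd.DataFrame({"t": [2, 6], "v": [100, 200]})

# For each left row, take the most recent right row with right.t <= left.t.
# Both frames must be sorted on the key column.
out = pd.merge_asof(left, right, on="t")
# out["v"] → [NaN, 100.0, 200.0]
```

AsofMerger generalizes this to multiple groups and multiple dataframes per group in one vectorized pass.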
Highlights
- Multiple groups, multiple join positions per output row
- Previous values per dataframe via `n_prev`, with guard rows
- Iterative usage by reusing internal buffers (set `copy=False` with care)
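The "previous values" idea behind `n_prev` can be emulated with plain pandas; this sketch uses `merge_asof` plus `shift` and is only an analogy (the exact column layout AsofMerger produces may differ):

```python
import pandas as pd

df = pd.DataFrame({"t": [1, 3, 5], "v": [10, 20, 30]})
main = pd.DataFrame({"t": [2, 4, 6]})

# As-of value at each main key.
cur = pd.merge_asof(main, df, on="t")
# Value one row earlier than the matched row, emulated by shifting `v`
# before the as-of lookup (the first match has no predecessor → NaN).
prev = pd.merge_asof(main, df.assign(v=df["v"].shift()), on="t")
```

With `n_prev=1`, a chunked pipeline needs one guard row of history per dataframe so that the first output row of each chunk still has a predecessor to draw from.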
Example
```python
from oups import AsofMerger

prefixes = [["L0_", "L1_"], ["R0_", "R1_"]]
merger = AsofMerger(on="timestamp", prefixes=prefixes, n_prev=[1, 0])
result = merger.merge(main=main_values, df_groups=df_groups, combinations=combinations)
```
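Assuming the prefixes label the output columns contributed by each joined dataframe, the resulting column layout can be emulated with plain `pandas.merge_asof` (an illustration only, not the AsofMerger implementation):

```python
import pandas as pd

main = pd.DataFrame({"timestamp": pd.to_datetime(["2025-01-01 10:15", "2025-01-01 10:45"])})
left = pd.DataFrame({"timestamp": pd.to_datetime(["2025-01-01 10:00"]), "x": [1]})
right = pd.DataFrame({"timestamp": pd.to_datetime(["2025-01-01 10:30"]), "y": [2]})

# Join each dataframe as-of onto the main key vector, prefixing its columns.
out = main
for prefix, df in [("L0_", left), ("R0_", right)]:
    renamed = df.rename(columns={c: prefix + c for c in df.columns if c != "timestamp"})
    out = pd.merge_asof(out, renamed, on="timestamp")
# out columns: timestamp, L0_x, R0_y
```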
Notes
- `main` must be monotonically increasing.
- DataFrames' `on` columns must be sorted ascending.
- When using `copy=False` across iterations, copy results before the next call.
- When yielding data chunks via `Store.iter_intersections(..., n_prev=...)` so that each chunk includes enough preceding rows, you typically do not need to bind `_fill_values` for resumability; binding `_conf` is sufficient.
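The `copy=False` caveat is the usual buffer-reuse aliasing hazard: a result that is a view of an internal buffer is silently overwritten by the next call. A minimal NumPy sketch (unrelated to the oups internals) shows the failure mode and the fix:

```python
import numpy as np

buf = np.empty(3)

def fill(values):
    # Writes into a shared buffer and returns a view of it,
    # mimicking an object that reuses internal buffers (copy=False).
    buf[:] = values
    return buf

a = fill([1.0, 2.0, 3.0])
safe = fill([4.0, 5.0, 6.0]).copy()  # snapshot before the buffer is reused
b = fill([7.0, 8.0, 9.0])
# `a` is a view of the shared buffer, so it now shows the latest values;
# `safe` kept its data because it was copied before the next call.
```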
Integration with Store and StatefulLoop
AsofMerger is designed to compose with `Store.iter_intersections` and `StatefulLoop` for resumable pipelines.
Minimal example:
```python
from pathlib import Path
import tempfile

import pandas as pd
from pandas import Timedelta, Timestamp, date_range

from oups import Store, toplevel
from oups import StatefulLoop
from oups import AsofMerger


# Define a simple indexer for Store keys
@toplevel
class Indexer:
    name: str


with tempfile.TemporaryDirectory() as tmpdir:
    basepath = Path(tmpdir) / "store"
    store = Store(basepath, Indexer)
    key_a = Indexer("A")
    key_b = Indexer("B")
    out_key = Indexer("out")

    # Two small inputs (15min vs 30min cadence)
    t0 = Timestamp("2025-01-01 10:00")
    ts15 = date_range(t0, t0 + Timedelta(hours=1), freq="15min", inclusive="left")
    ts30 = date_range(t0, t0 + Timedelta(hours=1), freq="30min", inclusive="left")
    df_a = pd.DataFrame({"timestamp": ts15, "a": range(len(ts15))})
    df_b = pd.DataFrame({"timestamp": ts30, "b": [10 * v for v in range(len(ts30))]})
    store[key_a].write(ordered_on="timestamp", df=df_a, row_group_target_size=2)
    store[key_b].write(ordered_on="timestamp", df=df_b, row_group_target_size=2)

    # Set up loop and merger; bind only _conf (no need to bind _fill_values)
    loop = StatefulLoop(Path(tmpdir) / "loop_state.pkl")
    merger = AsofMerger(on="timestamp", n_dfs_per_group=2, allow_exact_matches=[True, True])
    loop.bind_object_state(merger, state=["_conf"])  # _fill_values usually not needed

    for item_ctx in loop.iterate(
        store.iter_intersections(keys=[key_a, key_b], start=t0, n_prev=[1, 0]),
        check_loop_usage=False,
    ):
        with item_ctx as chunk:
            dfa, dfb = chunk[key_a], chunk[key_b]
            if dfa.empty and dfb.empty:
                continue
            # Main grid across current chunk span (15min grid, left-inclusive)
            span_end = max(dfa.timestamp.max(), dfb.timestamp.max()) + Timedelta("15min")
            main = date_range(dfa.timestamp.min(), span_end, freq="15min", inclusive="left").to_numpy()
            merged = merger.merge(main=main, df_groups=[dfa, dfb], check_sorted=True)
            out_map = loop.buffer({"merged": merged})
            store[out_key].write(ordered_on="timestamp", df=out_map["merged"], row_group_target_size=2)

    # Validate against one-shot expected
    out_df = store[out_key].to_pandas()
    expected = AsofMerger(on="timestamp", n_dfs_per_group=2, allow_exact_matches=[True, True]).merge(
        main=date_range(ts15.min(), ts15.max() + Timedelta("15min"), freq="15min", inclusive="left").to_numpy(),
        df_groups=[df_a, df_b],
        check_sorted=True,
    )
    assert out_df.equals(expected)
```
Full reference implementation
For a complete, tested integration showcasing resumability and row-group assertions, see the integration test `tests/test_integration/test_asof_merger.py`.
See also
- Concepts: Stateful Processing Concepts
- Quickstart: Quickstart
- StatefulLoop: StatefulLoop
- Store: Store Architecture
- SegmentedAggregator: SegmentedAggregator (planned)