AsofMerger

AsofMerger performs vectorized as-of joins across multiple groups, with multiple DataFrames per group, and can optionally include previous values from each DataFrame.
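For intuition about the as-of semantics only, a single backward as-of join of one main grid against one DataFrame is sketched below with pandas.merge_asof. This is plain pandas, not AsofMerger's API or implementation: each main row takes the last input row whose timestamp is at or before its own, so 10:15 picks up the 10:00 row.

```python
import pandas as pd

# Main grid: the "on" values for which output rows are built (sorted ascending).
main = pd.DataFrame({"timestamp": pd.date_range("2025-01-01 10:00", periods=4, freq="15min")})

# One input DataFrame on a coarser 30min cadence, also sorted on "timestamp".
right = pd.DataFrame(
    {
        "timestamp": pd.date_range("2025-01-01 10:00", periods=2, freq="30min"),
        "b": [0, 10],
    }
)

# Backward as-of join: last right row with timestamp <= main timestamp
# (exact matches allowed, as with allow_exact_matches=True).
merged = pd.merge_asof(main, right, on="timestamp", direction="backward", allow_exact_matches=True)
print(merged)
```

AsofMerger is described as doing this for several DataFrames and groups at once, in a vectorized way.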

Highlights

  • Multiple groups, multiple join positions per output row

  • Previous values per DataFrame via n_prev, with guard rows

  • Iterative usage that reuses internal buffers across calls (use copy=False with care)
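The general technique behind vectorized as-of lookups with previous values can be sketched with numpy.searchsorted. The helpers asof_indices and gather are hypothetical and only illustrate the idea; they are not AsofMerger's internals, and the -1 sentinel plus fill value is an assumed stand-in for whatever AsofMerger's guard rows actually do.

```python
import numpy as np


def asof_indices(on, main, n_prev=0, allow_exact_matches=True):
    """Hypothetical helper: row positions into `on` for each value in `main`.

    Column 0 holds the as-of match (last row with on <= main, or on < main
    when exact matches are excluded); column k holds the row k positions
    earlier. -1 marks positions before the first row.
    """
    on = np.asarray(on)
    main = np.asarray(main)
    side = "right" if allow_exact_matches else "left"
    # Insertion point minus one is the last row at (or strictly before) main.
    last = np.searchsorted(on, main, side=side) - 1
    # Broadcast to shape (len(main), n_prev + 1): current match, then previous rows.
    idx = last[:, None] - np.arange(n_prev + 1)[None, :]
    # Anything before the first row has no data: mark it with the -1 sentinel.
    idx[idx < 0] = -1
    return idx


def gather(values, idx, fill=np.nan):
    """Hypothetical helper: take values at idx, using `fill` where idx == -1."""
    values = np.asarray(values, dtype=float)
    out = np.full(idx.shape, fill, dtype=float)
    valid = idx >= 0
    out[valid] = values[idx[valid]]
    return out


# Toy input: "on" values in minutes (sorted ascending) and one value column.
on = np.array([0, 15, 30, 45])
a = np.arange(4)
main = np.array([-5, 10, 30, 50])  # monotonically increasing

idx = asof_indices(on, main, n_prev=1)
print(idx.tolist())  # [[-1, -1], [0, -1], [2, 1], [3, 2]]
print(gather(a, idx))
```

Because every lookup is a single searchsorted call plus broadcasting, the cost does not involve a Python-level loop over output rows.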

Example

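from oups import AsofMerger

# main_values, df_groups and combinations are placeholders to define beforehand;
# main_values must be monotonically increasing (see Notes).
prefixes = [["L0_", "L1_"], ["R0_", "R1_"]]
merger = AsofMerger(on="timestamp", prefixes=prefixes, n_prev=[1, 0])
result = merger.merge(main=main_values, df_groups=df_groups, combinations=combinations)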

Notes

  • main must be monotonically increasing.

  • Each DataFrame's on column must be sorted in ascending order.

  • When using copy=False across iterations, copy each result before the next merge call, since results may share internal buffers that the next call overwrites.

  • When chunks come from Store.iter_intersections(..., n_prev=...), each chunk already includes enough preceding rows, so binding _conf is usually sufficient for resumability; _fill_values typically does not need to be bound.
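The copy=False caveat can be illustrated with a toy object that reuses one preallocated buffer. BufferedMerger is hypothetical; it is assumed to resemble AsofMerger only in that a copy=False result shares memory with a buffer the next call overwrites.

```python
import numpy as np


class BufferedMerger:
    """Hypothetical stand-in that reuses one output buffer across calls."""

    def __init__(self, size):
        self._buffer = np.empty(size, dtype=float)

    def merge(self, values, copy=True):
        # Every call writes its result into the same preallocated buffer.
        self._buffer[:] = values
        # With copy=False the caller gets the buffer itself, not a snapshot.
        return self._buffer.copy() if copy else self._buffer


m = BufferedMerger(3)
first = m.merge([1.0, 2.0, 3.0], copy=False)
kept = first.copy()  # copy before the next call
second = m.merge([4.0, 5.0, 6.0], copy=False)

# `first` now shows the second call's data because it shares the buffer.
print(first.tolist())  # [4.0, 5.0, 6.0]
print(kept.tolist())   # [1.0, 2.0, 3.0]
```

Avoiding a fresh allocation per call is the point of buffer reuse; the price is that the caller owns the copying.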

Integration with Store and StatefulLoop

AsofMerger is designed to compose with Store.iter_intersections and StatefulLoop for resumable pipelines.

Minimal example:

from pathlib import Path
import tempfile
import pandas as pd
from pandas import Timedelta, Timestamp, date_range
from oups import Store, toplevel
from oups import StatefulLoop
from oups import AsofMerger

# Define a simple indexer for Store keys
@toplevel
class Indexer:
    name: str

with tempfile.TemporaryDirectory() as tmpdir:
    basepath = Path(tmpdir) / "store"
    store = Store(basepath, Indexer)
    key_a = Indexer("A")
    key_b = Indexer("B")
    out_key = Indexer("out")

    # Two small inputs (15min vs 30min cadence)
    t0 = Timestamp("2025-01-01 10:00")
    ts15 = date_range(t0, t0 + Timedelta(hours=1), freq="15min", inclusive="left")
    ts30 = date_range(t0, t0 + Timedelta(hours=1), freq="30min", inclusive="left")
    df_a = pd.DataFrame({"timestamp": ts15, "a": range(len(ts15))})
    df_b = pd.DataFrame({"timestamp": ts30, "b": [10 * v for v in range(len(ts30))]})
    store[key_a].write(ordered_on="timestamp", df=df_a, row_group_target_size=2)
    store[key_b].write(ordered_on="timestamp", df=df_b, row_group_target_size=2)

    # Set up loop and merger; bind only _conf (no need to bind _fill_values)
    loop = StatefulLoop(Path(tmpdir) / "loop_state.pkl")
    merger = AsofMerger(on="timestamp", n_dfs_per_group=2, allow_exact_matches=[True, True])
    loop.bind_object_state(merger, state=["_conf"])  # _fill_values usually not needed

    for item_ctx in loop.iterate(
        store.iter_intersections(keys=[key_a, key_b], start=t0, n_prev=[1, 0]),
        check_loop_usage=False,
    ):
        with item_ctx as chunk:
            dfa, dfb = chunk[key_a], chunk[key_b]
            if dfa.empty and dfb.empty:
                continue
            # Main grid across current chunk span (15min grid, left-inclusive)
            span_end = max(dfa.timestamp.max(), dfb.timestamp.max()) + Timedelta("15min")
            main = date_range(dfa.timestamp.min(), span_end, freq="15min", inclusive="left").to_numpy()
            merged = merger.merge(main=main, df_groups=[dfa, dfb], check_sorted=True)
            out_map = loop.buffer({"merged": merged})
            store[out_key].write(ordered_on="timestamp", df=out_map["merged"], row_group_target_size=2)

    # Validate against one-shot expected
    out_df = store[out_key].to_pandas()
    expected = AsofMerger(on="timestamp", n_dfs_per_group=2, allow_exact_matches=[True, True]).merge(
        main=date_range(ts15.min(), ts15.max() + Timedelta("15min"), freq="15min", inclusive="left").to_numpy(),
        df_groups=[df_a, df_b],
        check_sorted=True,
    )
    assert out_df.equals(expected)
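Why a chunk that carries enough preceding rows needs no carried-over fill state can be checked with plain pandas. In this sketch pandas.merge_asof stands in for AsofMerger, and the hypothetical chunked_asof helper stands in for chunked iteration with n_prev; neither is part of oups. Joining chunk by chunk with one preceding input row reproduces the one-shot result, while dropping that row leaves the first rows of some chunks unmatched.

```python
import pandas as pd

# Toy data: a 15min main grid and one 30min input, both sorted on "timestamp".
main = pd.DataFrame({"timestamp": pd.date_range("2025-01-01 10:00", periods=8, freq="15min")})
right = pd.DataFrame(
    {
        "timestamp": pd.date_range("2025-01-01 10:00", periods=4, freq="30min"),
        "b": [0, 10, 20, 30],
    }
)

# Reference result: one as-of join over the whole main grid.
one_shot = pd.merge_asof(main, right, on="timestamp")


def chunked_asof(main, right, chunk_size, n_prev=1):
    """Hypothetical helper: as-of join `main` chunk by chunk.

    Each chunk only sees the `right` rows within its own span plus the
    `n_prev` rows just before it, mimicking chunked iteration with n_prev.
    """
    parts = []
    for start in range(0, len(main), chunk_size):
        chunk = main.iloc[start:start + chunk_size]
        lo, hi = chunk["timestamp"].iloc[0], chunk["timestamp"].iloc[-1]
        inside = right[(right["timestamp"] >= lo) & (right["timestamp"] <= hi)]
        before = right[right["timestamp"] < lo].tail(n_prev)
        sub = pd.concat([before, inside])
        parts.append(pd.merge_asof(chunk, sub, on="timestamp"))
    return pd.concat(parts, ignore_index=True)


print(one_shot.equals(chunked_asof(main, right, chunk_size=3, n_prev=1)))  # True
# Without the preceding row, the first row of a chunk can lose its match.
print(chunked_asof(main, right, chunk_size=3, n_prev=0)["b"].isna().any())  # True
```

This mirrors the Notes' point that chunks with enough preceding rows make binding _fill_values unnecessary.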

Full reference implementation

For a complete, tested integration showcasing resumability and row-group assertions, see the integration test:

tests/test_integration/test_asof_merger.py

See also