AsofMerger
==========

``AsofMerger`` performs vectorized as-of joins across multiple groups, with multiple dataframes per group, optionally including previous values.

Highlights
----------

- Multiple groups and multiple join positions per output row.
- Previous values per dataframe via ``n_prev``, with guard rows.
- Iterative usage that reuses internal buffers (use ``copy=False`` with care).

Example
-------

.. code-block:: python

    from oups import AsofMerger

    # main_values, df_groups and combinations are assumed to be defined beforehand.
    prefixes = [["L0_", "L1_"], ["R0_", "R1_"]]
    merger = AsofMerger(on="timestamp", prefixes=prefixes, n_prev=[1, 0])
    result = merger.merge(main=main_values, df_groups=df_groups, combinations=combinations)

Notes
-----

- ``main`` must be monotonically increasing.
- Each DataFrame's ``on`` column must be sorted in ascending order.
- When using ``copy=False`` across iterations, copy the results before the next call, since the internal buffers are reused.
- When chunks are yielded by ``Store.iter_intersections(..., n_prev=...)`` so that each chunk includes enough preceding rows, you typically do not need to bind ``_fill_values`` for resumability; binding ``_conf`` is sufficient.

Integration with Store and StatefulLoop
---------------------------------------

``AsofMerger`` is designed to compose with ``Store.iter_intersections`` and ``StatefulLoop`` for resumable pipelines. A minimal example:

.. code-block:: python
    :name: test_asof_merger_integration

    from pathlib import Path
    import tempfile

    import pandas as pd
    from pandas import Timedelta, Timestamp, date_range

    from oups import AsofMerger, StatefulLoop, Store, toplevel


    # Define a simple indexer for Store keys.
    @toplevel
    class Indexer:
        name: str


    with tempfile.TemporaryDirectory() as tmpdir:
        basepath = Path(tmpdir) / "store"
        store = Store(basepath, Indexer)
        key_a = Indexer("A")
        key_b = Indexer("B")
        out_key = Indexer("out")

        # Two small inputs (15min vs 30min cadence).
        t0 = Timestamp("2025-01-01 10:00")
        ts15 = date_range(t0, t0 + Timedelta(hours=1), freq="15min", inclusive="left")
        ts30 = date_range(t0, t0 + Timedelta(hours=1), freq="30min", inclusive="left")
        df_a = pd.DataFrame({"timestamp": ts15, "a": range(len(ts15))})
        df_b = pd.DataFrame({"timestamp": ts30, "b": [10 * v for v in range(len(ts30))]})
        store[key_a].write(ordered_on="timestamp", df=df_a, row_group_target_size=2)
        store[key_b].write(ordered_on="timestamp", df=df_b, row_group_target_size=2)

        # Set up loop and merger; bind only _conf (no need to bind _fill_values).
        loop = StatefulLoop(Path(tmpdir) / "loop_state.pkl")
        merger = AsofMerger(on="timestamp", n_dfs_per_group=2, allow_exact_matches=[True, True])
        loop.bind_object_state(merger, state=["_conf"])  # _fill_values usually not needed

        for item_ctx in loop.iterate(
            store.iter_intersections(keys=[key_a, key_b], start=t0, n_prev=[1, 0]),
            check_loop_usage=False,
        ):
            with item_ctx as chunk:
                dfa, dfb = chunk[key_a], chunk[key_b]
                if dfa.empty and dfb.empty:
                    continue
                # Main grid across the current chunk span (15min grid, left-inclusive).
                span_end = max(dfa.timestamp.max(), dfb.timestamp.max()) + Timedelta("15min")
                main = date_range(dfa.timestamp.min(), span_end, freq="15min", inclusive="left").to_numpy()
                merged = merger.merge(main=main, df_groups=[dfa, dfb], check_sorted=True)
                out_map = loop.buffer({"merged": merged})
                store[out_key].write(
                    ordered_on="timestamp",
                    df=out_map["merged"],
                    row_group_target_size=2,
                )

        # Validate against a one-shot merge over the full inputs.
        out_df = store[out_key].to_pandas()
        expected = AsofMerger(on="timestamp", n_dfs_per_group=2, allow_exact_matches=[True, True]).merge(
            main=date_range(ts15.min(), ts15.max() + Timedelta("15min"), freq="15min", inclusive="left").to_numpy(),
            df_groups=[df_a, df_b],
            check_sorted=True,
        )
        assert out_df.equals(expected)

Full reference implementation
-----------------------------

For a complete, tested integration showcasing resumability and row-group assertions, see the integration test ``tests/test_integration/test_asof_merger.py``.

See also
--------

- Concepts: :doc:`stateful_processing`
- Quickstart: :doc:`quickstart`
- StatefulLoop: :doc:`stateful_loop`
- Store: :doc:`store`
- SegmentedAggregator: :doc:`segmented_aggregator`
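
For intuition, the backward as-of matching that ``AsofMerger`` vectorizes across groups and dataframes can be approximated for a single dataframe with ``pandas.merge_asof``. This is a sketch under stated assumptions, not ``AsofMerger``'s implementation. In particular, the hypothetical ``prev_b`` column assumes that a "previous value" is the value of the row preceding the matched row, which may differ from how ``n_prev`` and guard rows actually behave.

.. code-block:: python

    import pandas as pd

    # Main grid: timestamps at which values are looked up "as of".
    main = pd.DataFrame(
        {"timestamp": pd.date_range("2025-01-01 10:00", periods=4, freq="15min")}
    )

    # A slower input (30min cadence), sorted ascending on the join column.
    df_b = pd.DataFrame(
        {
            "timestamp": pd.date_range("2025-01-01 10:00", periods=2, freq="30min"),
            "b": [0, 10],
        }
    )

    # Hypothetical "previous value" column (an assumption about n_prev=1):
    # the value of the row preceding each df_b row.
    df_b["prev_b"] = df_b["b"].shift(1)

    # Backward as-of join: each main timestamp takes the last df_b row whose
    # timestamp is <= it (exact matches are allowed by default).
    merged = pd.merge_asof(main, df_b, on="timestamp", direction="backward")
    print(merged)
    # b:      [0, 0, 10, 10]
    # prev_b: [NaN, NaN, 0.0, 0.0]

The 10:15 row reuses the 10:00 value because no newer ``df_b`` row exists yet; ``AsofMerger`` is described above as performing this kind of lookup for several dataframes per group in one vectorized pass.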