Store Architecture
==================

The ``oups.store`` module provides the core functionality for managing
collections of ordered parquet datasets. It consists of three main components
working together to provide efficient storage, indexing, and querying of
time-series data.

Overview
--------

The store architecture is designed around three key components:

1. **Indexer**: Provides a schema-based indexing system for organizing datasets
2. **OrderedParquetDataset**: Manages individual parquet datasets with ordering validation
3. **Store**: Provides a collection interface for multiple indexed datasets

Types and Parameters
--------------------

- **Ordered column (``ordered_on``)**: accepts a single column name (``str``)
  or a multi-index column name (``tuple[str]``).
- **Row group target size**: accepts an ``int`` (target number of rows) or a
  pandas frequency string (e.g., ``"1D"``) for time-based grouping.
- **Key-value metadata**: arbitrary ``dict[str, str]`` stored alongside
  parquet files; used for provenance and processing parameters.

Main Components
---------------

Indexer
~~~~~~~

The indexer system allows you to define hierarchical schemas for organizing
your datasets using dataclasses decorated with ``@toplevel`` and optionally
``@sublevel``. This provides a structured way to organize related datasets in
common directories.

**Motivation**

Datasets are gathered in a parent directory as a collection. Each materializes
as parquet files located in a child directory whose name is derived from a
user-defined index. By formalizing this index through dataclasses, index
management (user scope) is dissociated from path management (*oups* scope).

**Decorators**

*oups* provides two class decorators for defining an indexing logic:

- ``@toplevel`` is compulsory, and defines the naming logic of the first
  directory level
- ``@sublevel`` is optional, and can be used as many times as needed for
  sub-directories

**@toplevel Decorator**

The ``@toplevel`` decorator:

- Generates *paths* from attribute values (``__str__`` and ``to_path`` methods)
- Generates class instances (``from_path`` classmethod)
- Validates attribute values at instantiation
- Calls ``@dataclass`` with ``order`` and ``frozen`` parameters set to ``True``
- Accepts an optional ``fields_sep`` parameter (default ``-``) to define the
  field separator
- Only accepts ``int`` or ``str`` attribute types
- If an attribute is a ``@sublevel``-decorated class, it must be positioned last

**@sublevel Decorator**

The ``@sublevel`` decorator:

- Is an alias for ``@dataclass`` with ``order`` and ``frozen`` set to ``True``
- Only accepts ``int`` or ``str`` attribute types
- If another deeper sub-level is defined, it must be positioned as the last
  attribute

**Hierarchical Example**

.. code-block:: python
    :name: test_store_indexer_hierarchy

    from oups.store import toplevel, sublevel

    @sublevel
    class Sampling:
        frequency: str

    @toplevel
    class Measure:
        quantity: str
        city: str
        sampling: Sampling

    # Define different indexes for temperature in Berlin
    berlin_1D = Measure('temperature', 'berlin', Sampling('1D'))
    berlin_1W = Measure('temperature', 'berlin', Sampling('1W'))

    # When this indexer is connected to a Store, the directory structure
    # will look like:
    # temperature-berlin/
    # ├── 1D/
    # │   ├── file_0000.parquet
    # │   └── file_0001.parquet
    # └── 1W/
    #     ├── file_0000.parquet
    #     └── file_0001.parquet

**Simple Example**

.. code-block:: python
    :name: test_store_indexer_simple

    from oups.store import toplevel

    @toplevel
    class CityDateIndex:
        city: str
        date: str

    # This creates a schema where datasets are organized as:
    # city-date/  (e.g., "berlin-2023.01.01/")
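The generated helpers can be exercised directly. Below is a minimal sketch,
assuming ``str()`` renders the directory name with the default ``-`` field
separator and that ``from_path`` accepts that same string (``__str__``,
``to_path`` and ``from_path`` are listed above; see :doc:`api` for exact
signatures):

.. code-block:: python

    from oups.store import toplevel

    @toplevel
    class CityDateIndex:
        city: str
        date: str

    idx = CityDateIndex("berlin", "2023.01.01")

    # Path generation from attribute values, using the default "-" separator
    # (assumed rendering, matching the "berlin-2023.01.01/" example above).
    assert str(idx) == "berlin-2023.01.01"

    # Round-trip: rebuild an equal instance from the path string.
    # Equality holds because @toplevel applies @dataclass(order=True, frozen=True).
    assert CityDateIndex.from_path(str(idx)) == idx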
OrderedParquetDataset
~~~~~~~~~~~~~~~~~~~~~

``OrderedParquetDataset`` is the core class for managing individual parquet
datasets with strict ordering validation.

**Key Features:**

- **Ordered Storage**: Data is stored in row groups ordered by a specified column
- **Incremental Updates**: Efficiently merge new data with existing data
- **Row Group Management**: Automatic splitting and merging of row groups
- **Metadata Tracking**: Comprehensive metadata for each row group
- **Metadata Updates**: Add, update, or remove custom key-value metadata
- **Duplicate Handling**: Configurable duplicate detection and removal
- **Write Optimization**: Configurable row group sizes and merge strategies

**File Structure:**

.. code-block::

    parent_directory/
    ├── my_dataset/              # Dataset directory
    │   ├── file_0000.parquet    # Row group files
    │   └── file_0001.parquet
    ├── my_dataset_opdmd         # Metadata file
    └── my_dataset.lock          # Lock file

**Example:**

.. code-block:: python
    :name: test_store_ordered_parquet_dataset_example

    from pathlib import Path
    import tempfile

    import pandas as pd

    from oups.store import OrderedParquetDataset

    with tempfile.TemporaryDirectory() as tmpdir:
        # Create or load a dataset
        dataset = OrderedParquetDataset(
            str(Path(tmpdir) / "dataset"),
            ordered_on="timestamp",
        )

        # Write data
        df = pd.DataFrame({
            "timestamp": pd.date_range("2023-01-01", periods=3),
            "value": range(3),
        })
        dataset.write(df=df)

        # Read data back
        result = dataset.to_pandas()
        assert len(result) == 3

Store
~~~~~

The ``Store`` class provides a collection interface for managing multiple
``OrderedParquetDataset`` instances organized according to an indexer schema.

**Key Features:**

- **Schema-based Organization**: Uses indexer schemas for dataset discovery
- **Lazy Loading**: Datasets are loaded on demand
- **Collection Interface**: Dictionary-like access to datasets
- **Cross-dataset Operations**: Advanced querying across multiple datasets
- **Automatic Discovery**: Finds existing datasets matching the schema

**Example:**

.. code-block:: python
    :name: test_store_basic_usage

    from pathlib import Path
    import tempfile

    import pandas as pd

    from oups.store import Store
    from oups.store import toplevel

    @toplevel
    class CityYearIndex:
        city: str
        year: str

    with tempfile.TemporaryDirectory() as tmpdir:
        # Create store in a temporary directory
        store = Store(Path(tmpdir) / "data", CityYearIndex)

        # Access datasets via indexing (example):
        # berlin_2023 = store[CityYearIndex("berlin", "2023")]

        # Iterate over all datasets (example):
        # for key in store:
        #     dataset = store[key]
        #     print(f"Dataset {key} has {len(dataset)} row groups")

        # Basic write via Store[key]
        berlin_key = CityYearIndex("berlin", "2023")
        df = pd.DataFrame({
            "timestamp": pd.date_range("2023-01-01", periods=3, freq="D"),
            "temperature": [20, 21, 22],
            "humidity": [30, 31, 32],
        })
        store[berlin_key].write(df=df, ordered_on="timestamp")

        # Read back
        result_df = store[berlin_key].to_pandas()
        assert len(result_df) == 3
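Automatic discovery means a ``Store`` pointed at an existing directory finds
datasets already on disk. A short sketch, assuming iteration over a ``Store``
yields index instances as the commented loop above suggests:

.. code-block:: python

    from pathlib import Path
    import tempfile

    import pandas as pd

    from oups.store import Store, toplevel

    @toplevel
    class CityYearIndex:
        city: str
        year: str

    with tempfile.TemporaryDirectory() as tmpdir:
        store = Store(Path(tmpdir) / "data", CityYearIndex)

        # Write one dataset per city.
        for city in ("berlin", "paris"):
            df = pd.DataFrame({
                "timestamp": pd.date_range("2023-01-01", periods=2, freq="D"),
                "temperature": [20, 21],
            })
            store[CityYearIndex(city, "2023")].write(df=df, ordered_on="timestamp")

        # A fresh Store on the same directory rediscovers both datasets
        # from their on-disk paths (automatic discovery).
        reopened = Store(Path(tmpdir) / "data", CityYearIndex)
        assert len([key for key in reopened]) == 2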
Advanced Features
-----------------

Write Method (preferred)
~~~~~~~~~~~~~~~~~~~~~~~~

The preferred way to write data is to call ``write()`` on an
``OrderedParquetDataset`` obtained via ``Store[key]``. This provides advanced
data writing capabilities.

**Parameters:**

- ``row_group_target_size``: Control row group sizes (int or pandas frequency string)
- ``duplicates_on``: Specify columns for duplicate detection
- ``max_n_off_target_rgs``: Control row group coalescing behavior
- ``key_value_metadata``: Store custom metadata (supports add/update/remove operations)

**Example:**

.. code-block:: python
    :name: test_store_write_params

    from pathlib import Path
    import tempfile

    import pandas as pd

    from oups.store import Store, toplevel

    @toplevel
    class CityYearIndex:
        city: str
        year: str

    with tempfile.TemporaryDirectory() as tmpdir:
        store = Store(Path(tmpdir) / "data", CityYearIndex)
        berlin_key = CityYearIndex("berlin", "2023")
        df = pd.DataFrame({
            "timestamp": pd.date_range("2023-01-01", periods=2, freq="D"),
            "temperature": [20, 21],
        })

        # Write with time-based row groups and metadata
        store[berlin_key].write(
            df=df,
            ordered_on="timestamp",
            row_group_target_size="1D",
            duplicates_on=["timestamp"],
            key_value_metadata={
                "source": "weather_station",
                "version": "2.1",
                "processed_by": "stateful_loop",
            },
        )

        # Update existing metadata (add new, update existing, remove with None)
        new_df = pd.DataFrame({
            "timestamp": pd.date_range("2023-01-03", periods=1, freq="D"),
            "temperature": [22],
        })
        store[berlin_key].write(
            df=new_df,
            ordered_on="timestamp",
            key_value_metadata={
                "version": "2.2",
                "last_updated": "2023-12-01",
                "processed_by": None,
            },
        )

Advanced: top-level write helper
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

For advanced or scripting use cases, the top-level helper ``oups.store.write``
provides an equivalent API that writes directly to a path or an existing
``OrderedParquetDataset`` instance. Prefer ``store[key].write(...)`` for
typical workflows; use the helper when you need to write without constructing
a ``Store``.

.. code-block:: python
    :name: test_store_write_helper

    from pathlib import Path
    import tempfile

    import pandas as pd

    from oups.store import write, OrderedParquetDataset

    with tempfile.TemporaryDirectory() as tmpdir:
        path = Path(tmpdir) / "dataset"
        df = pd.DataFrame({
            "timestamp": pd.date_range("2023-01-01", periods=2, freq="D"),
            "value": [1, 2],
        })
        write(
            path,
            ordered_on="timestamp",
            df=df,
            row_group_target_size="1D",
            duplicates_on=["timestamp"],
        )

        # Verify
        ds = OrderedParquetDataset(str(path), ordered_on="timestamp")
        assert len(ds.to_pandas()) == 2

Incremental Updates
~~~~~~~~~~~~~~~~~~~

.. code-block:: python
    :name: test_store_incremental

    from pathlib import Path
    import tempfile

    import pandas as pd

    from oups.store import Store, toplevel

    @toplevel
    class CityYearIndex:
        city: str
        year: str

    with tempfile.TemporaryDirectory() as tmpdir:
        store = Store(Path(tmpdir) / "data", CityYearIndex)
        berlin_key = CityYearIndex("berlin", "2023")
        df = pd.DataFrame({
            "timestamp": pd.date_range("2023-02-01", periods=2, freq="D"),
            "temperature": [15, 16],
            "humidity": [40, 41],
        })
        store[berlin_key].write(df=df, ordered_on="timestamp")

        # Append newer rows with correct ordering
        new_df = pd.DataFrame({
            "timestamp": pd.date_range("2023-02-03", periods=2, freq="D"),
            "temperature": [17, 18],
            "humidity": [42, 43],
        })
        store[berlin_key].write(df=new_df, ordered_on="timestamp")

        result = store[berlin_key].to_pandas()
        assert len(result) == 4
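The examples above use a pandas frequency string for
``row_group_target_size``; an ``int`` row count works as well. The sketch
below pairs the ``int`` form with ``max_n_off_target_rgs`` when appending many
small batches. The exact coalescing trigger is only summarized in the
parameter list above, so treat the chosen values as illustrative:

.. code-block:: python

    from pathlib import Path
    import tempfile

    import pandas as pd

    from oups.store import Store, toplevel

    @toplevel
    class CityYearIndex:
        city: str
        year: str

    with tempfile.TemporaryDirectory() as tmpdir:
        store = Store(Path(tmpdir) / "data", CityYearIndex)
        key = CityYearIndex("berlin", "2023")

        # Append ten 1-row batches; without coalescing, each write could
        # leave behind an undersized row group.
        for day in range(10):
            df = pd.DataFrame({
                "timestamp": [pd.Timestamp("2023-03-01") + pd.Timedelta(days=day)],
                "temperature": [20 + day],
            })
            store[key].write(
                df=df,
                ordered_on="timestamp",
                row_group_target_size=5,   # target rows per row group (int form)
                max_n_off_target_rgs=2,    # bound on off-target row groups (semantics assumed)
            )

        # All rows are present regardless of how row groups were merged.
        assert len(store[key].to_pandas()) == 10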
iter_intersections
~~~~~~~~~~~~~~~~~~

The ``iter_intersections()`` method enables efficient querying across multiple
datasets with overlapping ranges.

**Key Features:**

- **Range Queries**: Query specific ranges (time, numeric, etc.) across
  multiple datasets
- **Intersection Detection**: Automatically finds overlapping row groups
- **Memory Efficient**: Streams data without loading entire datasets
- **Synchronized Iteration**: Iterates through multiple datasets in sync
- **Warm-up with Previous Rows (n_prev)**: Prepends up to ``n_prev`` previous
  rows, if available, to the first chunk yielded for each dataset. Accepts
  either a single ``int`` (applied to all datasets) or a list of ``int``
  values matching the order of the keys.

**Example:**

.. code-block:: python
    :name: test_store_manual_vs_store

    from pathlib import Path
    import tempfile

    import pandas as pd

    from oups.store import Store, toplevel

    @toplevel
    class CityYearIndex:
        city: str
        year: str

    with tempfile.TemporaryDirectory() as tmpdir:
        store = Store(Path(tmpdir) / "data", CityYearIndex)
        keys = [CityYearIndex("berlin", "2023"), CityYearIndex("paris", "2023")]

        # Seed small datasets with overlapping dates
        df_berlin = pd.DataFrame({
            "timestamp": pd.date_range("2023-01-01", periods=2, freq="D"),
            "v": [1, 2],
        })
        df_paris = pd.DataFrame({
            "timestamp": pd.date_range("2023-01-02", periods=2, freq="D"),
            "v": [3, 4],
        })
        store[keys[0]].write(df=df_berlin, ordered_on="timestamp")
        store[keys[1]].write(df=df_paris, ordered_on="timestamp")

        found = list(store.iter_intersections(
            keys,
            start=pd.Timestamp("2023-01-01"),
            n_prev=[1, 0],
            end_excl=pd.Timestamp("2023-01-04"),
        ))
        assert isinstance(found, list)

Best Practices
--------------

1. **Indexer Design**: Design your indexer schema to match your data access patterns
2. **Ordered Column**: Choose an appropriate column for ordering (typically a timestamp)
3. **Row Group Size**: Balance query performance against storage efficiency
4. **Duplicate Handling**: Use ``duplicates_on`` when data quality is a concern
5. **Metadata**: Use key-value metadata to store important dataset information

Comparison with manual approach
-------------------------------

**Traditional manual parquet management:**

.. code-block:: python

    import pandas as pd

    # Manual path management
    # (assuming city, year, month and new_data are defined)
    path = f"/data/{city}/{year}/{month}/data.parquet"

    # Manual duplicate handling
    existing_df = pd.read_parquet(path)
    new_df = pd.concat([existing_df, new_data])
    new_df = new_df.drop_duplicates().sort_values('timestamp')
    new_df.to_parquet(path)

**With oups Store:**

.. code-block:: python

    from oups.store import Store, toplevel

    @toplevel
    class DataIndex:
        city: str
        year: str
        month: str

    store = Store("/data", DataIndex)
    key = DataIndex("berlin", "2023", "01")

    # Automatic path management, duplicate handling, and ordering
    store[key].write(
        df=new_data,
        ordered_on='timestamp',
        duplicates_on=['timestamp', 'street'],
    )

Alternatives
------------

Several alternatives exist for managing dataset collections:

**Arctic (MongoDB-based)**

- Provides powerful time-series storage
- Requires MongoDB infrastructure
- More complex deployment and maintenance

**PyStore (Dask-based)**

- Supports parallelized operations
- Less flexible organizational schemas
- Performance concerns in some scenarios

**DuckDB or DataFusion**

- Excellent query performance
- SQL-based querying

**Direct Parquet + File Management**

- Maximum control over file structure
- Requires implementing indexing, updates, and concurrency manually
- This is how *oups* started

See Also
--------

- :doc:`api` - Complete API reference
- :doc:`quickstart` - End-to-end example across stateful_loop, stateful_ops, store