Store Architecture

The oups.store module provides the core functionality for managing collections of ordered parquet datasets. It consists of three main components working together to provide efficient storage, indexing, and querying of time-series data.

Overview

The store architecture is designed around three key components:

  1. Indexer: Provides a schema-based indexing system for organizing datasets

  2. OrderedParquetDataset: Manages individual parquet datasets with ordering validation

  3. Store: Provides a collection interface for multiple indexed datasets

Types and Parameters

  • Ordered column (``ordered_on``): accepts a single column name (str) or a multi-index column name (tuple[str]).

  • Row group target size: accepts an int (target number of rows) or a pandas frequency string (e.g., "1D") for time-based grouping.

  • Key-value metadata: arbitrary dict[str, str] stored alongside parquet files; used for provenance and processing parameters.
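
A compact preview of these types, using the write() API documented later in this section (values are illustrative):

 from pathlib import Path
 import tempfile
 import pandas as pd
 from oups.store import OrderedParquetDataset

 with tempfile.TemporaryDirectory() as tmpdir:
     # ordered_on: a single column name here; a tuple[str] would target
     # a multi-index column. row_group_target_size: a pandas frequency
     # string here; an int row count also works.
     dataset = OrderedParquetDataset(str(Path(tmpdir) / "dataset"), ordered_on="timestamp")
     df = pd.DataFrame({
         "timestamp": pd.date_range("2023-01-01", periods=4, freq="D"),
         "value": range(4),
     })
     dataset.write(
         df=df,
         row_group_target_size="1D",
         key_value_metadata={"source": "doc-example"},
     )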

Main Components

Indexer

The indexer system allows you to define hierarchical schemas for organizing your datasets using dataclasses decorated with @toplevel and optionally @sublevel. This provides a structured way to organize related datasets in common directories.

Motivation

Datasets are gathered in a parent directory as a collection. Each dataset materializes as parquet files in a child directory whose name is derived from a user-defined index. By formalizing this index through dataclasses, index management (user scope) is decoupled from path management (oups scope).

Decorators

oups provides two class decorators for defining indexing logic:

  • @toplevel is compulsory and defines the naming logic of the first directory level

  • @sublevel is optional and can be used as many times as needed for sub-directories

@toplevel Decorator

The @toplevel decorator:

  • Generates paths from attribute values (__str__ and to_path methods)

  • Generates class instances (from_path classmethod)

  • Validates attribute values at instantiation

  • Calls @dataclass with order and frozen parameters set to True

  • Accepts an optional fields_sep parameter (default "-") to define the separator between fields (see the sketch after this list)

  • Only accepts int or str attribute types

  • If an attribute is a @sublevel-decorated class, it must be positioned last
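
A minimal sketch of the generated helpers, assuming fields_sep is accepted as a decorator argument and that from_path accepts the string form of a path segment:

 from oups.store import toplevel

 @toplevel(fields_sep=".")
 class CityYear:
     city: str
     year: str

 idx = CityYear("berlin", "2023")
 # __str__ joins attribute values with fields_sep.
 assert str(idx) == "berlin.2023"
 # from_path rebuilds an equal instance from that string.
 assert CityYear.from_path("paris.2024") == CityYear("paris", "2024")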

@sublevel Decorator

The @sublevel decorator:

  • Is an alias for @dataclass with order and frozen set to True

  • Only accepts int or str attribute types

  • If another deeper sub-level is defined, it must be positioned as the last attribute

Hierarchical Example

 from oups.store import toplevel, sublevel

 @sublevel
 class Sampling:
     frequency: str

 @toplevel
 class Measure:
     quantity: str
     city: str
     sampling: Sampling

 # Define different indexes for temperature in Berlin
 berlin_1D = Measure('temperature', 'berlin', Sampling('1D'))
 berlin_1W = Measure('temperature', 'berlin', Sampling('1W'))

 # When this indexer is connected to a Store, the directory structure will look like:
 # temperature-berlin/
 # ├── 1D/
 # │   ├── file_0000.parquet
 # │   └── file_0001.parquet
 # └── 1W/
 #     ├── file_0000.parquet
 #     └── file_0001.parquet

Simple Example

 from oups.store import toplevel

 @toplevel
 class CityDateIndex:
     city: str
     date: str

 # This creates a schema where datasets are organized as:
 # city-date/ (e.g., "berlin-2023.01.01/")

OrderedParquetDataset

OrderedParquetDataset is the core class for managing individual parquet datasets with strict ordering validation.

Key Features:

  • Ordered Storage: Data is stored in row groups ordered by a specified column

  • Incremental Updates: Efficiently merge new data with existing data

  • Row Group Management: Automatic splitting and merging of row groups

  • Metadata Tracking: Comprehensive metadata for each row group

  • Metadata Updates: Add, update, or remove custom key-value metadata

  • Duplicate Handling: Configurable duplicate detection and removal

  • Write Optimization: Configurable row group sizes and merge strategies

File Structure:

parent_directory/
├── my_dataset/                # Dataset directory
│   ├── file_0000.parquet      # Row group files
│   └── file_0001.parquet
├── my_dataset_opdmd           # Metadata file
└── my_dataset.lock            # Lock file

Example:

 from pathlib import Path
 import tempfile
 import pandas as pd
 from oups.store import OrderedParquetDataset

 with tempfile.TemporaryDirectory() as tmpdir:
     # Create or load a dataset
     dataset = OrderedParquetDataset(str(Path(tmpdir) / "dataset"), ordered_on="timestamp")

     # Write data
     df = pd.DataFrame({
         "timestamp": pd.date_range("2023-01-01", periods=3),
         "value": range(3),
     })
     dataset.write(df=df)

     # Read data back
     result = dataset.to_pandas()
     assert len(result) == 3
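
     # Inspect what was materialized on disk: per the layout above, the
     # parent directory should contain the dataset directory and its
     # "_opdmd" metadata file.
     for entry in sorted(Path(tmpdir).iterdir()):
         print(entry.name)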

Store

The Store class provides a collection interface for managing multiple OrderedParquetDataset instances organized according to an indexer schema.

Key Features:

  • Schema-based Organization: Uses indexer schemas for dataset discovery

  • Lazy Loading: Datasets are loaded on-demand

  • Collection Interface: Dictionary-like access to datasets

  • Cross-dataset Operations: Advanced querying across multiple datasets

  • Automatic Discovery: Finds existing datasets matching the schema

Example:

 from pathlib import Path
 import tempfile
 import pandas as pd
 from oups.store import Store
 from oups.store import toplevel

 @toplevel
 class CityYearIndex:
     city: str
     year: str

 with tempfile.TemporaryDirectory() as tmpdir:
     # Create store in a temporary directory
     store = Store(Path(tmpdir) / "data", CityYearIndex)
     # Access datasets via indexing (example):
     # berlin_2023 = store[CityYearIndex("berlin", "2023")]
     # Iterate over all datasets (example):
     # for key in store:
     #     dataset = store[key]
     #     print(f"Dataset {key} has {len(dataset)} row groups")

     # Basic write via Store[key]
     berlin_key = CityYearIndex("berlin", "2023")
     df = pd.DataFrame({
         "timestamp": pd.date_range("2023-01-01", periods=3, freq="D"),
         "temperature": [20, 21, 22],
         "humidity": [30, 31, 32],
     })
     store[berlin_key].write(df=df, ordered_on="timestamp")
     # Read back
     result_df = store[berlin_key].to_pandas()
     assert len(result_df) == 3

Advanced Features

Write Method (preferred)

The preferred way to write data is to call write() on an OrderedParquetDataset obtained via Store[key]. This provides advanced data writing capabilities:

Parameters:

  • row_group_target_size: Control row group sizes (int or pandas frequency string)

  • duplicates_on: Specify columns for duplicate detection

  • max_n_off_target_rgs: Control row group coalescing behavior

  • key_value_metadata: Store custom metadata (supports add/update/remove operations)

Example:

 from pathlib import Path
 import tempfile
 import pandas as pd
 from oups.store import Store, toplevel

 @toplevel
 class CityYearIndex:
     city: str
     year: str

 with tempfile.TemporaryDirectory() as tmpdir:
     store = Store(Path(tmpdir) / "data", CityYearIndex)
     berlin_key = CityYearIndex("berlin", "2023")
     df = pd.DataFrame({
         "timestamp": pd.date_range("2023-01-01", periods=2, freq="D"),
         "temperature": [20, 21],
     })
     # Write with time-based row groups and metadata
     store[berlin_key].write(
         df=df,
         ordered_on="timestamp",
         row_group_target_size="1D",
         duplicates_on=["timestamp"],
         key_value_metadata={
             "source": "weather_station",
             "version": "2.1",
             "processed_by": "stateful_loop",
         },
     )

     # Update existing metadata (add new, update existing, remove with None)
     new_df = pd.DataFrame({
         "timestamp": pd.date_range("2023-01-03", periods=1, freq="D"),
         "temperature": [22],
     })
     store[berlin_key].write(
         df=new_df,
         ordered_on="timestamp",
         key_value_metadata={
             "version": "2.2",
             "last_updated": "2023-12-01",
             "processed_by": None,
         },
     )
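
     # max_n_off_target_rgs is the one write() parameter not exercised
     # above; per the parameter list, it controls row group coalescing.
     # The value below is illustrative.
     more_df = pd.DataFrame({
         "timestamp": pd.date_range("2023-01-04", periods=1, freq="D"),
         "temperature": [23],
     })
     store[berlin_key].write(
         df=more_df,
         ordered_on="timestamp",
         row_group_target_size="1D",
         max_n_off_target_rgs=2,
     )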

Advanced: top-level write helper

For advanced or scripting use cases, the top-level helper oups.store.write provides an equivalent API that writes directly to a path or to an existing OrderedParquetDataset instance. Prefer store[key].write(...) for typical workflows; use the helper when you need to write without constructing a Store.

 from pathlib import Path
 import tempfile
 import pandas as pd
 from oups.store import write, OrderedParquetDataset

 with tempfile.TemporaryDirectory() as tmpdir:
     path = Path(tmpdir) / "dataset"
     df = pd.DataFrame({
         "timestamp": pd.date_range("2023-01-01", periods=2, freq="D"),
         "value": [1, 2],
     })
     write(
         path,
         ordered_on="timestamp",
         df=df,
         row_group_target_size="1D",
         duplicates_on=["timestamp"],
     )
     # Verify
     ds = OrderedParquetDataset(str(path), ordered_on="timestamp")
     assert len(ds.to_pandas()) == 2

Incremental Updates

 from pathlib import Path
 import tempfile
 import pandas as pd
 from oups.store import Store, toplevel

 @toplevel
 class CityYearIndex:
     city: str
     year: str

 with tempfile.TemporaryDirectory() as tmpdir:
     store = Store(Path(tmpdir) / "data", CityYearIndex)
     berlin_key = CityYearIndex("berlin", "2023")
     df = pd.DataFrame({
         "timestamp": pd.date_range("2023-02-01", periods=2, freq="D"),
         "temperature": [15, 16],
         "humidity": [40, 41],
     })
     store[berlin_key].write(df=df, ordered_on="timestamp")
     # Append newer rows with correct ordering
     new_df = pd.DataFrame({
         "timestamp": pd.date_range("2023-02-03", periods=2, freq="D"),
         "temperature": [17, 18],
         "humidity": [42, 43],
     })
     store[berlin_key].write(df=new_df, ordered_on="timestamp")
     result = store[berlin_key].to_pandas()
     assert len(result) == 4

iter_intersections

The iter_intersections() method enables efficient querying across multiple datasets with overlapping ranges:

Key Features:

  • Range Queries: Query specific ranges (time, numeric, etc.) across multiple datasets

  • Intersection Detection: Automatically finds overlapping row groups

  • Memory Efficient: Streams data without loading entire datasets

  • Synchronized Iteration: Iterates through multiple datasets in sync

  • Warm-up with Previous Rows (n_prev): Prepends up to n_prev rows preceding the queried range to the first yielded chunk of each dataset, if available. Accepts either a single int (applied to all datasets) or a list of ints matching the order of keys.

Example:

 from pathlib import Path
 import tempfile
 import pandas as pd
 from oups.store import Store, toplevel

 @toplevel
 class CityYearIndex:
     city: str
     year: str

 with tempfile.TemporaryDirectory() as tmpdir:
     store = Store(Path(tmpdir) / "data", CityYearIndex)
     keys = [CityYearIndex("berlin", "2023"), CityYearIndex("paris", "2023")]
     # Seed small datasets with overlapping dates
     df_berlin = pd.DataFrame({
         "timestamp": pd.date_range("2023-01-01", periods=2, freq="D"),
         "v": [1, 2],
     })
     df_paris = pd.DataFrame({
         "timestamp": pd.date_range("2023-01-02", periods=2, freq="D"),
         "v": [3, 4],
     })
     store[keys[0]].write(df=df_berlin, ordered_on="timestamp")
     store[keys[1]].write(df=df_paris, ordered_on="timestamp")

     found = list(store.iter_intersections(
         keys,
         start=pd.Timestamp("2023-01-01"),
         n_prev=[1, 0],
         end_excl=pd.Timestamp("2023-01-04"),
     ))
     # With overlapping data in the queried range, at least one
     # synchronized chunk is expected.
     assert len(found) >= 1

Best Practices

  1. Indexer Design: Design your indexer schema to match your data access patterns

  2. Ordered Column: Choose an appropriate column for ordering (typically timestamp)

  3. Row Group Size: Balance between query performance and storage efficiency

  4. Duplicate Handling: Use duplicates_on when data quality is a concern

  5. Metadata: Use key-value metadata to store important dataset information

Comparison with manual approach

Traditional manual parquet management:

 import pandas as pd

 # Manual path management (city, year, month and new_data defined elsewhere)
 path = f"/data/{city}/{year}/{month}/data.parquet"

 # Manual duplicate handling
 existing_df = pd.read_parquet(path)
 new_df = pd.concat([existing_df, new_data])
 new_df = new_df.drop_duplicates().sort_values("timestamp")
 new_df.to_parquet(path)

With oups Store:

 from oups.store import Store, toplevel

 @toplevel
 class DataIndex:
     city: str
     year: str
     month: str

 store = Store("/data", DataIndex)
 key = DataIndex("berlin", "2023", "01")

 # Automatic path management, duplicate handling, and ordering
 # (new_data defined elsewhere)
 store[key].write(
     df=new_data,
     ordered_on="timestamp",
     duplicates_on=["timestamp", "street"],
 )

Alternatives

Several alternatives exist for managing dataset collections:

Arctic (MongoDB-based)
  • Provides powerful time-series storage

  • Requires MongoDB infrastructure

  • More complex deployment and maintenance

PyStore (Dask-based)
  • Supports parallelized operations

  • Less flexible organizational schemas

  • Performance concerns in some scenarios

DuckDB or DataFusion
  • Excellent query performance

  • SQL-based querying

Direct Parquet + File Management
  • Maximum control over file structure

  • Requires implementing indexing, updates, and concurrency manually

  • This is how oups started

See Also