Store Architecture
The oups.store module provides the core functionality for managing collections of ordered parquet datasets. It consists of three main components working together to provide efficient storage, indexing, and querying of time-series data.
Overview
The store architecture is designed around three key components:
Indexer: Provides a schema-based indexing system for organizing datasets
OrderedParquetDataset: Manages individual parquet datasets with ordering validation
Store: Provides a collection interface for multiple indexed datasets
Types and Parameters
Ordered column (``ordered_on``): accepts a single column name (str) or a multi-index column name (tuple[str]).
Row group target size: accepts an int (target number of rows) or a pandas frequency string (e.g., "1D") for time-based grouping.
Key-value metadata: an arbitrary dict[str, str] stored alongside parquet files; used for provenance and processing parameters.
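The sketch below ties these types together. It is a minimal illustration, assuming the OrderedParquetDataset and write() APIs demonstrated later on this page; the dataset name "params_demo" and the metadata values are arbitrary.
from pathlib import Path
import tempfile

import pandas as pd

from oups.store import OrderedParquetDataset

with tempfile.TemporaryDirectory() as tmpdir:
    ds = OrderedParquetDataset(str(Path(tmpdir) / "params_demo"), ordered_on="timestamp")
    df = pd.DataFrame({
        "timestamp": pd.date_range("2023-01-01", periods=4, freq="6h"),
        "value": range(4),
    })
    ds.write(
        df=df,
        row_group_target_size="1D",                  # pandas frequency string: time-based row groups
        key_value_metadata={"source": "docs_demo"},  # str-to-str provenance metadata
    )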
Main Components
Indexer
The indexer system allows you to define hierarchical schemas for organizing your datasets using dataclasses decorated with @toplevel and optionally @sublevel. This provides a structured way to organize related datasets in common directories.
Motivation
Datasets are gathered in a parent directory as a collection. Each dataset materializes as parquet files in a child directory whose name is derived from a user-defined index. By formalizing this index through dataclasses, index management (user scope) is decoupled from path management (oups scope).
Decorators
oups provides two class decorators for defining indexing logic:
- @toplevel is compulsory, and defines the naming logic of the first directory level
- @sublevel is optional, and can be used as many times as needed for sub-directories
@toplevel Decorator
The @toplevel decorator:
- Generates paths from attribute values (__str__ and to_path methods; see the sketch after this list)
- Generates class instances from paths (from_path classmethod)
- Validates attribute values at instantiation
- Calls @dataclass with the order and frozen parameters set to True
- Accepts an optional fields_sep parameter (default "-") to define the field separator
- Only accepts int or str attribute types
- If an attribute is a @sublevel-decorated class, it must be positioned last
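A minimal sketch of these behaviors, using a hypothetical SensorIndex class; the rendered path fragment assumes the default fields_sep of "-".
from oups.store import toplevel

@toplevel
class SensorIndex:
    site: str
    sensor: str

idx = SensorIndex("berlin", "temp")
# With the default fields_sep, the index renders as "berlin-temp".
print(str(idx))
# order=True and frozen=True make instances comparable, hashable,
# and therefore usable as dictionary keys.
assert SensorIndex("berlin", "a") < SensorIndex("berlin", "b")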
@sublevel Decorator
The @sublevel decorator:
- Is an alias for @dataclass with order and frozen set to True
- Only accepts int or str attribute types
- If another, deeper sub-level is defined, it must be positioned as the last attribute
Hierarchical Example
from oups.store import toplevel, sublevel

@sublevel
class Sampling:
    frequency: str

@toplevel
class Measure:
    quantity: str
    city: str
    sampling: Sampling

# Define different indexes for temperature in Berlin
berlin_1D = Measure('temperature', 'berlin', Sampling('1D'))
berlin_1W = Measure('temperature', 'berlin', Sampling('1W'))
# When this indexer is connected to a Store, the directory structure will look like:
# temperature-berlin/
# ├── 1D/
# │   ├── file_0000.parquet
# │   └── file_0001.parquet
# └── 1W/
#     ├── file_0000.parquet
#     └── file_0001.parquet
Simple Example
from oups.store import toplevel

@toplevel
class CityDateIndex:
    city: str
    date: str
# This creates a schema where datasets are organized as:
# city-date/ (e.g., "berlin-2023.01.01/")
OrderedParquetDataset
OrderedParquetDataset is the core class for managing individual parquet datasets with strict ordering validation.
Key Features:
Ordered Storage: Data is stored in row groups ordered by a specified column
Incremental Updates: Efficiently merge new data with existing data
Row Group Management: Automatic splitting and merging of row groups
Metadata Tracking: Comprehensive metadata for each row group
Metadata Updates: Add, update, or remove custom key-value metadata
Duplicate Handling: Configurable duplicate detection and removal
Write Optimization: Configurable row group sizes and merge strategies
File Structure:
parent_directory/
├── my_dataset/                 # Dataset directory
│   ├── file_0000.parquet       # Row group files
│   └── file_0001.parquet
├── my_dataset_opdmd            # Metadata file
└── my_dataset.lock             # Lock file
Example:
from pathlib import Path
import tempfile

import pandas as pd

from oups.store import OrderedParquetDataset

with tempfile.TemporaryDirectory() as tmpdir:
    # Create or load a dataset
    dataset = OrderedParquetDataset(str(Path(tmpdir) / "dataset"), ordered_on="timestamp")
    # Write data
    df = pd.DataFrame({
        "timestamp": pd.date_range("2023-01-01", periods=3),
        "value": range(3),
    })
    dataset.write(df=df)
    # Read data back
    result = dataset.to_pandas()
    assert len(result) == 3
Store
The Store class provides a collection interface for managing multiple OrderedParquetDataset instances organized according to an indexer schema.
Key Features:
Schema-based Organization: Uses indexer schemas for dataset discovery
Lazy Loading: Datasets are loaded on-demand
Collection Interface: Dictionary-like access to datasets
Cross-dataset Operations: Advanced querying across multiple datasets
Automatic Discovery: Finds existing datasets matching the schema
Example:
from pathlib import Path
import tempfile

import pandas as pd

from oups.store import Store, toplevel

@toplevel
class CityYearIndex:
    city: str
    year: str

with tempfile.TemporaryDirectory() as tmpdir:
    # Create store in a temporary directory
    store = Store(Path(tmpdir) / "data", CityYearIndex)
    # Basic write via Store[key]
    berlin_key = CityYearIndex("berlin", "2023")
    df = pd.DataFrame({
        "timestamp": pd.date_range("2023-01-01", periods=3, freq="D"),
        "temperature": [20, 21, 22],
        "humidity": [30, 31, 32],
    })
    store[berlin_key].write(df=df, ordered_on="timestamp")
    # Read back
    result_df = store[berlin_key].to_pandas()
    assert len(result_df) == 3
    # Iterate over all datasets discovered by the store
    for key in store:
        dataset = store[key]
        print(f"Dataset {key} has {len(dataset)} row groups")
Advanced Features
Write Method (preferred)
The preferred way to write data is to call write() on an OrderedParquetDataset obtained via Store[key]. This provides advanced data writing capabilities:
Parameters:
row_group_target_size: Control row group sizes (int or pandas frequency string)
duplicates_on: Specify columns for duplicate detection
max_n_off_target_rgs: Control row group coalescing behavior
key_value_metadata: Store custom metadata (supports add/update/remove operations)
Example:
from pathlib import Path
import tempfile

import pandas as pd

from oups.store import Store, toplevel

@toplevel
class CityYearIndex:
    city: str
    year: str

with tempfile.TemporaryDirectory() as tmpdir:
    store = Store(Path(tmpdir) / "data", CityYearIndex)
    berlin_key = CityYearIndex("berlin", "2023")
    df = pd.DataFrame({
        "timestamp": pd.date_range("2023-01-01", periods=2, freq="D"),
        "temperature": [20, 21],
    })
    # Write with time-based row groups and metadata
    store[berlin_key].write(
        df=df,
        ordered_on="timestamp",
        row_group_target_size="1D",
        duplicates_on=["timestamp"],
        key_value_metadata={
            "source": "weather_station",
            "version": "2.1",
            "processed_by": "stateful_loop",
        },
    )
    # Update existing metadata (add new, update existing, remove with None)
    new_df = pd.DataFrame({
        "timestamp": pd.date_range("2023-01-03", periods=1, freq="D"),
        "temperature": [22],
    })
    store[berlin_key].write(
        df=new_df,
        ordered_on="timestamp",
        key_value_metadata={
            "version": "2.2",
            "last_updated": "2023-12-01",
            "processed_by": None,
        },
    )
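The max_n_off_target_rgs parameter is not exercised above. Below is a hedged sketch, assuming the parameter bounds how many off-target (undersized) row groups may accumulate before a coalescing merge is triggered; the threshold of 2 is an arbitrary illustration.
from pathlib import Path
import tempfile

import pandas as pd

from oups.store import Store, toplevel

@toplevel
class CityYearIndex:
    city: str
    year: str

with tempfile.TemporaryDirectory() as tmpdir:
    store = Store(Path(tmpdir) / "data", CityYearIndex)
    key = CityYearIndex("berlin", "2023")
    # Several small batches produce row groups below the daily target size.
    for day in ("2023-01-01", "2023-01-02", "2023-01-03"):
        df = pd.DataFrame({
            "timestamp": pd.date_range(day, periods=2, freq="h"),
            "temperature": [20, 21],
        })
        store[key].write(
            df=df,
            ordered_on="timestamp",
            row_group_target_size="1D",
            max_n_off_target_rgs=2,  # assumption: coalesce once more than 2 off-target row groups accumulate
        )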
Advanced: top-level write helper
For advanced or scripting use cases, the top-level helper oups.store.write provides an equivalent API that writes directly to a path or to an existing OrderedParquetDataset instance. Prefer store[key].write(...) for typical workflows; use the helper when you need to write without constructing a Store.
from pathlib import Path
import tempfile

import pandas as pd

from oups.store import write, OrderedParquetDataset

with tempfile.TemporaryDirectory() as tmpdir:
    path = Path(tmpdir) / "dataset"
    df = pd.DataFrame({
        "timestamp": pd.date_range("2023-01-01", periods=2, freq="D"),
        "value": [1, 2],
    })
    write(
        path,
        ordered_on="timestamp",
        df=df,
        row_group_target_size="1D",
        duplicates_on=["timestamp"],
    )
    # Verify
    ds = OrderedParquetDataset(str(path), ordered_on="timestamp")
    assert len(ds.to_pandas()) == 2
Incremental Updates
from pathlib import Path
import tempfile

import pandas as pd

from oups.store import Store, toplevel

@toplevel
class CityYearIndex:
    city: str
    year: str

with tempfile.TemporaryDirectory() as tmpdir:
    store = Store(Path(tmpdir) / "data", CityYearIndex)
    berlin_key = CityYearIndex("berlin", "2023")
    df = pd.DataFrame({
        "timestamp": pd.date_range("2023-02-01", periods=2, freq="D"),
        "temperature": [15, 16],
        "humidity": [40, 41],
    })
    store[berlin_key].write(df=df, ordered_on="timestamp")
    # Append newer rows with correct ordering
    new_df = pd.DataFrame({
        "timestamp": pd.date_range("2023-02-03", periods=2, freq="D"),
        "temperature": [17, 18],
        "humidity": [42, 43],
    })
    store[berlin_key].write(df=new_df, ordered_on="timestamp")
    result = store[berlin_key].to_pandas()
    assert len(result) == 4
iter_intersections
The iter_intersections() method enables efficient querying across multiple datasets with overlapping ranges:
Key Features:
Range Queries: Query specific ranges (time, numeric, etc.) across multiple datasets
Intersection Detection: Automatically finds overlapping row groups
Memory Efficient: Streams data without loading entire datasets
Synchronized Iteration: Iterates through multiple datasets in sync
Warm-up with Previous Rows (n_prev): Prepend up to n_prev previous rows to the first yielded chunk for each dataset, if available. Accepts either a single int (applied to all datasets) or a list of int values matching the order of the keys.
Example:
from pathlib import Path
import tempfile

import pandas as pd

from oups.store import Store, toplevel

@toplevel
class CityYearIndex:
    city: str
    year: str

with tempfile.TemporaryDirectory() as tmpdir:
    store = Store(Path(tmpdir) / "data", CityYearIndex)
    keys = [CityYearIndex("berlin", "2023"), CityYearIndex("paris", "2023")]
    # Seed small datasets with overlapping dates
    df_berlin = pd.DataFrame({
        "timestamp": pd.date_range("2023-01-01", periods=2, freq="D"),
        "v": [1, 2],
    })
    df_paris = pd.DataFrame({
        "timestamp": pd.date_range("2023-01-02", periods=2, freq="D"),
        "v": [3, 4],
    })
    store[keys[0]].write(df=df_berlin, ordered_on="timestamp")
    store[keys[1]].write(df=df_paris, ordered_on="timestamp")
    found = list(store.iter_intersections(
        keys,
        start=pd.Timestamp("2023-01-01"),
        n_prev=[1, 0],
        end_excl=pd.Timestamp("2023-01-04"),
    ))
    assert isinstance(found, list)
Best Practices
Indexer Design: Design your indexer schema to match your data access patterns
Ordered Column: Choose an appropriate column for ordering (typically timestamp)
Row Group Size: Balance between query performance and storage efficiency (see the sketch after this list)
Duplicate Handling: Use duplicates_on when data quality is a concern
Metadata: Use key-value metadata to store important dataset information
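For row group sizing, an int target suits uniformly sampled data, while a frequency string aligns row groups with calendar periods. A minimal sketch of the int form, assuming the OrderedParquetDataset API shown above; the target of 24 rows is an arbitrary illustration.
from pathlib import Path
import tempfile

import pandas as pd

from oups.store import OrderedParquetDataset

with tempfile.TemporaryDirectory() as tmpdir:
    ds = OrderedParquetDataset(str(Path(tmpdir) / "rg_demo"), ordered_on="timestamp")
    df = pd.DataFrame({
        "timestamp": pd.date_range("2023-01-01", periods=48, freq="h"),
        "value": range(48),
    })
    # Integer target: aim for roughly 24 rows per row group.
    ds.write(df=df, row_group_target_size=24)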
Comparison with manual approach
Traditional manual parquet management:
# Manual path management
path = f"/data/{city}/{year}/{month}/data.parquet"
# Manual duplicate handling
existing_df = pd.read_parquet(path)
new_df = pd.concat([existing_df, new_data])
new_df = new_df.drop_duplicates().sort_values('timestamp')
new_df.to_parquet(path)
With oups Store:
@toplevel
class DataIndex:
    city: str
    year: str
    month: str

store = Store("/data", DataIndex)
key = DataIndex("berlin", "2023", "01")
# Automatic path management, duplicate handling, and ordering
store[key].write(
    df=new_data,
    ordered_on='timestamp',
    duplicates_on=['timestamp', 'street'],
)
Alternatives
Several alternatives exist for managing dataset collections:
- Arctic (MongoDB-based)
  - Provides powerful time-series storage
  - Requires MongoDB infrastructure
  - More complex deployment and maintenance
- PyStore (Dask-based)
  - Supports parallelized operations
  - Less flexible organizational schemas
  - Performance concerns in some scenarios
- DuckDB or DataFusion
  - Excellent query performance
  - SQL-based querying
- Direct Parquet + File Management
  - Maximum control over file structure
  - Requires implementing indexing, updates, and concurrency manually
  - This is how oups started
See Also
API Reference - Complete API reference
Quickstart - End-to-end example across stateful_loop, stateful_ops, store