Skip to content

Store API

The store module defines interfaces and implementations for offline feature storage.

Abstract Base Classes

mlforge.store.Store

Bases: ABC

Abstract base class for offline feature storage backends.

Defines the interface that all storage implementations must provide for persisting and retrieving materialized features with versioning.

v0.5.0: Added version parameter to read/write/exists methods and new list_versions/get_latest_version methods.

Source code in src/mlforge/store.py
class Store(ABC):
    """
    Abstract base class for offline feature storage backends.

    Defines the interface that all storage implementations must provide
    for persisting and retrieving materialized features with versioning.
    Every method below is declared with @abstractmethod, so a concrete
    backend has to implement the full set.

    v0.5.0: Added version parameter to read/write/exists methods and
    new list_versions/get_latest_version methods.
    """

    @abstractmethod
    def write(
        self,
        feature_name: str,
        result: results.ResultKind,
        feature_version: str,
    ) -> dict[str, Any]:
        """
        Persist a materialized feature to storage.

        Unlike the read-side methods, feature_version has no default here:
        a write always targets one explicit version.

        Args:
            feature_name: Unique identifier for the feature
            result: Result kind containing data to write
            feature_version: Semantic version string (e.g., "1.0.0")

        Returns:
            Metadata dictionary with path, row_count, schema
        """
        ...

    @abstractmethod
    def read(
        self,
        feature_name: str,
        feature_version: str | None = None,
    ) -> pl.DataFrame:
        """
        Retrieve a materialized feature from storage.

        Args:
            feature_name: Unique identifier for the feature
            feature_version: Specific version to read. If None, reads latest.

        Returns:
            Feature data as a DataFrame

        Raises:
            FileNotFoundError: If the feature/version has not been materialized
        """
        ...

    @abstractmethod
    def exists(
        self,
        feature_name: str,
        feature_version: str | None = None,
    ) -> bool:
        """
        Check whether a feature version has been materialized.

        Args:
            feature_name: Unique identifier for the feature
            feature_version: Specific version to check. If None, checks
                whether any version of the feature exists.

        Returns:
            True if feature exists in storage, False otherwise
        """
        ...

    @abstractmethod
    def list_versions(self, feature_name: str) -> list[str]:
        """
        List all versions of a feature.

        Args:
            feature_name: Unique identifier for the feature

        Returns:
            Sorted list of version strings (oldest to newest)
        """
        ...

    @abstractmethod
    def get_latest_version(self, feature_name: str) -> str | None:
        """
        Get the latest version of a feature.

        Args:
            feature_name: Unique identifier for the feature

        Returns:
            Latest version string, or None if no versions exist
        """
        ...

    @abstractmethod
    def path_for(
        self,
        feature_name: str,
        feature_version: str | None = None,
    ) -> Path | str:
        """
        Get the storage path for a feature version.

        The feature does not have to exist yet: the result is where the
        data is, or would be, stored.

        Args:
            feature_name: Unique identifier for the feature
            feature_version: Specific version. If None, uses latest.

        Returns:
            Path or str where the feature is or would be stored
        """
        ...

    @abstractmethod
    def metadata_path_for(
        self,
        feature_name: str,
        feature_version: str | None = None,
    ) -> Path | str:
        """
        Get the storage path for a feature version's metadata file.

        Args:
            feature_name: Unique identifier for the feature
            feature_version: Specific version. If None, uses latest.

        Returns:
            Path or str where the feature's .meta.json is or would be stored
        """
        ...

    @abstractmethod
    def write_metadata(
        self,
        feature_name: str,
        metadata: manifest.FeatureMetadata,
    ) -> None:
        """
        Write feature metadata to storage.

        No version argument is taken; metadata.version is used to
        determine the storage path.

        Args:
            feature_name: Unique identifier for the feature
            metadata: FeatureMetadata object to persist
        """
        ...

    @abstractmethod
    def read_metadata(
        self,
        feature_name: str,
        feature_version: str | None = None,
    ) -> manifest.FeatureMetadata | None:
        """
        Read feature metadata from storage.

        Args:
            feature_name: Unique identifier for the feature
            feature_version: Specific version. If None, reads latest.

        Returns:
            FeatureMetadata if exists, None otherwise
        """
        ...

    @abstractmethod
    def list_metadata(self) -> list[manifest.FeatureMetadata]:
        """
        List metadata for latest version of all features in the store.

        Older versions of a feature are not included in the result.

        Returns:
            List of FeatureMetadata for all features (latest versions only)
        """
        ...

exists abstractmethod

exists(
    feature_name: str, feature_version: str | None = None
) -> bool

Check whether a feature version has been materialized.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
feature_version str | None

Specific version to check. If None, checks any version.

None

Returns:

Type Description
bool

True if feature exists in storage, False otherwise

Source code in src/mlforge/store.py
@abstractmethod
def exists(
    self,
    feature_name: str,
    feature_version: str | None = None,
) -> bool:
    """
    Check whether a feature version has been materialized.

    Abstract: each storage backend supplies its own implementation.

    Args:
        feature_name: Unique identifier for the feature
        feature_version: Specific version to check. If None, checks
            whether any version of the feature exists.

    Returns:
        True if feature exists in storage, False otherwise
    """
    ...

get_latest_version abstractmethod

get_latest_version(feature_name: str) -> str | None

Get the latest version of a feature.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required

Returns:

Type Description
str | None

Latest version string, or None if no versions exist

Source code in src/mlforge/store.py
@abstractmethod
def get_latest_version(self, feature_name: str) -> str | None:
    """
    Get the latest version of a feature.

    Abstract: each storage backend supplies its own implementation.

    Args:
        feature_name: Unique identifier for the feature

    Returns:
        Latest version string, or None if no versions exist
    """
    ...

list_metadata abstractmethod

list_metadata() -> list[manifest.FeatureMetadata]

List metadata for latest version of all features in the store.

Returns:

Type Description
list[FeatureMetadata]

List of FeatureMetadata for all features (latest versions only)

Source code in src/mlforge/store.py
@abstractmethod
def list_metadata(self) -> list[manifest.FeatureMetadata]:
    """
    List metadata for latest version of all features in the store.

    Abstract: each storage backend supplies its own implementation.
    Older versions of a feature are not included in the result.

    Returns:
        List of FeatureMetadata for all features (latest versions only)
    """
    ...

list_versions abstractmethod

list_versions(feature_name: str) -> list[str]

List all versions of a feature.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required

Returns:

Type Description
list[str]

Sorted list of version strings (oldest to newest)

Source code in src/mlforge/store.py
@abstractmethod
def list_versions(self, feature_name: str) -> list[str]:
    """
    List all versions of a feature.

    Abstract: each storage backend supplies its own implementation.

    Args:
        feature_name: Unique identifier for the feature

    Returns:
        Sorted list of version strings (oldest to newest)
    """
    ...

metadata_path_for abstractmethod

metadata_path_for(
    feature_name: str, feature_version: str | None = None
) -> Path | str

Get the storage path for a feature version's metadata file.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
feature_version str | None

Specific version. If None, uses latest.

None

Returns:

Type Description
Path | str

Path or str where the feature's .meta.json is or would be stored

Source code in src/mlforge/store.py
@abstractmethod
def metadata_path_for(
    self,
    feature_name: str,
    feature_version: str | None = None,
) -> Path | str:
    """
    Get the storage path for a feature version's metadata file.

    Abstract: each storage backend supplies its own implementation.

    Args:
        feature_name: Unique identifier for the feature
        feature_version: Specific version. If None, uses latest.

    Returns:
        Path or str where the feature's .meta.json is or would be stored
    """
    ...

path_for abstractmethod

path_for(
    feature_name: str, feature_version: str | None = None
) -> Path | str

Get the storage path for a feature version.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
feature_version str | None

Specific version. If None, uses latest.

None

Returns:

Type Description
Path | str

Path or str where the feature is or would be stored

Source code in src/mlforge/store.py
@abstractmethod
def path_for(
    self,
    feature_name: str,
    feature_version: str | None = None,
) -> Path | str:
    """
    Get the storage path for a feature version.

    Abstract: each storage backend supplies its own implementation.
    The feature does not have to exist yet: the result is where the
    data is, or would be, stored.

    Args:
        feature_name: Unique identifier for the feature
        feature_version: Specific version. If None, uses latest.

    Returns:
        Path or str where the feature is or would be stored
    """
    ...

read abstractmethod

read(
    feature_name: str, feature_version: str | None = None
) -> pl.DataFrame

Retrieve a materialized feature from storage.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
feature_version str | None

Specific version to read. If None, reads latest.

None

Returns:

Type Description
DataFrame

Feature data as a DataFrame

Raises:

Type Description
FileNotFoundError

If the feature/version has not been materialized

Source code in src/mlforge/store.py
@abstractmethod
def read(
    self,
    feature_name: str,
    feature_version: str | None = None,
) -> pl.DataFrame:
    """
    Retrieve a materialized feature from storage.

    Abstract: each storage backend supplies its own implementation.

    Args:
        feature_name: Unique identifier for the feature
        feature_version: Specific version to read. If None, reads latest.

    Returns:
        Feature data as a DataFrame

    Raises:
        FileNotFoundError: If the feature/version has not been materialized
    """
    ...

read_metadata abstractmethod

read_metadata(
    feature_name: str, feature_version: str | None = None
) -> manifest.FeatureMetadata | None

Read feature metadata from storage.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
feature_version str | None

Specific version. If None, reads latest.

None

Returns:

Type Description
FeatureMetadata | None

FeatureMetadata if exists, None otherwise

Source code in src/mlforge/store.py
@abstractmethod
def read_metadata(
    self,
    feature_name: str,
    feature_version: str | None = None,
) -> manifest.FeatureMetadata | None:
    """
    Read feature metadata from storage.

    Abstract: each storage backend supplies its own implementation.

    Args:
        feature_name: Unique identifier for the feature
        feature_version: Specific version. If None, reads latest.

    Returns:
        FeatureMetadata if exists, None otherwise
    """
    ...

write abstractmethod

write(
    feature_name: str,
    result: ResultKind,
    feature_version: str,
) -> dict[str, Any]

Persist a materialized feature to storage.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
result ResultKind

Result kind containing data to write

required
feature_version str

Semantic version string (e.g., "1.0.0")

required

Returns:

Type Description
dict[str, Any]

Metadata dictionary with path, row_count, schema

Source code in src/mlforge/store.py
@abstractmethod
def write(
    self,
    feature_name: str,
    result: results.ResultKind,
    feature_version: str,
) -> dict[str, Any]:
    """
    Persist a materialized feature to storage.

    Abstract: each storage backend supplies its own implementation.
    feature_version has no default here, so a write always targets one
    explicit version.

    Args:
        feature_name: Unique identifier for the feature
        result: Result kind containing data to write
        feature_version: Semantic version string (e.g., "1.0.0")

    Returns:
        Metadata dictionary with path, row_count, schema
    """
    ...

write_metadata abstractmethod

write_metadata(
    feature_name: str, metadata: FeatureMetadata
) -> None

Write feature metadata to storage.

Uses metadata.version to determine storage path.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
metadata FeatureMetadata

FeatureMetadata object to persist

required
Source code in src/mlforge/store.py
@abstractmethod
def write_metadata(
    self,
    feature_name: str,
    metadata: manifest.FeatureMetadata,
) -> None:
    """
    Write feature metadata to storage.

    Abstract: each storage backend supplies its own implementation.
    No version argument is taken; metadata.version is used to determine
    the storage path.

    Args:
        feature_name: Unique identifier for the feature
        metadata: FeatureMetadata object to persist
    """
    ...

Implementations

Local Storage

mlforge.store.LocalStore

Bases: Store

Local filesystem storage backend using Parquet format.

Stores features in versioned directories:

feature_store/
├── user_spend/
│   ├── 1.0.0/
│   │   ├── data.parquet
│   │   └── .meta.json
│   ├── 1.1.0/
│   │   └── ...
│   └── _latest.json

Attributes:

Name Type Description
path

Root directory for storing feature files

Example

store = LocalStore("./feature_store")
store.write("user_age", result, feature_version="1.0.0")
age_df = store.read("user_age")  # reads latest
age_df = store.read("user_age", feature_version="1.0.0")  # reads specific

Source code in src/mlforge/store.py
class LocalStore(Store):
    """
    Local filesystem storage backend using Parquet format.

    Stores features in versioned directories:
        feature_store/
        ├── user_spend/
        │   ├── 1.0.0/
        │   │   ├── data.parquet
        │   │   └── .meta.json
        │   ├── 1.1.0/
        │   │   └── ...
        │   └── _latest.json

    Attributes:
        path: Root directory for storing feature files

    Example:
        store = LocalStore("./feature_store")
        store.write("user_age", result, feature_version="1.0.0")
        age_df = store.read("user_age")  # reads latest
        age_df = store.read("user_age", feature_version="1.0.0")  # reads specific
    """

    def __init__(self, path: str | Path = "./feature_store"):
        """
        Initialize local storage backend.

        The root directory (and any missing parents) is created on
        construction if it does not already exist.

        Args:
            path: Directory path for feature storage. Defaults to "./feature_store".
        """
        self.path = Path(path)
        self.path.mkdir(parents=True, exist_ok=True)

    @override
    def path_for(
        self,
        feature_name: str,
        feature_version: str | None = None,
    ) -> Path:
        """
        Get file path for a feature version.

        Args:
            feature_name: Unique identifier for the feature
            feature_version: Specific version. If None, uses latest.

        Returns:
            Path to the feature's parquet file. When no version can be
            resolved, the path a hypothetical "1.0.0" would use.
        """
        resolved = version.resolve_version(
            self.path, feature_name, feature_version
        )
        # No versions exist yet - fall back to a hypothetical 1.0.0
        target = "1.0.0" if resolved is None else resolved
        return version.versioned_data_path(self.path, feature_name, target)

    @override
    def write(
        self,
        feature_name: str,
        result: results.ResultKind,
        feature_version: str,
    ) -> dict[str, Any]:
        """
        Write feature data to versioned parquet file.

        Besides the data file, this updates the feature's _latest.json
        pointer to feature_version and ensures the feature directory has
        a .gitignore.

        Args:
            feature_name: Unique identifier for the feature
            result: Engine result containing feature data and metadata
            feature_version: Semantic version string (e.g., "1.0.0")

        Returns:
            Metadata dictionary with path, row count, and schema
        """
        path = version.versioned_data_path(
            self.path, feature_name, feature_version
        )
        path.parent.mkdir(parents=True, exist_ok=True)

        result.write_parquet(path)

        # Update _latest.json pointer
        version.write_latest_pointer(self.path, feature_name, feature_version)

        # Create .gitignore in feature directory (if not present)
        version.write_feature_gitignore(self.path, feature_name)

        return {
            "path": str(path),
            "row_count": result.row_count(),
            "schema": result.schema(),
        }

    @override
    def read(
        self,
        feature_name: str,
        feature_version: str | None = None,
    ) -> pl.DataFrame:
        """
        Read feature data from versioned parquet file.

        Args:
            feature_name: Unique identifier for the feature
            feature_version: Specific version to read. If None, reads latest.

        Returns:
            Feature data as a DataFrame

        Raises:
            FileNotFoundError: If no version can be resolved, or the
                resolved version has no data file on disk
        """
        resolved = version.resolve_version(
            self.path, feature_name, feature_version
        )

        if resolved is None:
            raise FileNotFoundError(
                f"Feature '{feature_name}' not found. Run 'mlforge build' first."
            )

        path = version.versioned_data_path(self.path, feature_name, resolved)

        if not path.exists():
            raise FileNotFoundError(
                f"Feature '{feature_name}' version '{resolved}' not found."
            )

        return pl.read_parquet(path)

    @override
    def exists(
        self,
        feature_name: str,
        feature_version: str | None = None,
    ) -> bool:
        """
        Check if feature version exists.

        Args:
            feature_name: Unique identifier for the feature
            feature_version: Specific version to check. If None, checks any version.

        Returns:
            True if the feature/version exists, False otherwise
        """
        if feature_version is None:
            # Any listed version counts as "exists"
            return bool(self.list_versions(feature_name))

        path = version.versioned_data_path(
            self.path, feature_name, feature_version
        )
        return path.exists()

    @override
    def list_versions(self, feature_name: str) -> list[str]:
        """
        List all versions of a feature.

        Args:
            feature_name: Unique identifier for the feature

        Returns:
            Sorted list of version strings (oldest to newest)
        """
        return version.list_versions(self.path, feature_name)

    @override
    def get_latest_version(self, feature_name: str) -> str | None:
        """
        Get the latest version of a feature.

        Args:
            feature_name: Unique identifier for the feature

        Returns:
            Latest version string, or None if no versions exist
        """
        return version.get_latest_version(self.path, feature_name)

    @override
    def metadata_path_for(
        self,
        feature_name: str,
        feature_version: str | None = None,
    ) -> Path:
        """
        Get file path for a feature version's metadata.

        Args:
            feature_name: Unique identifier for the feature
            feature_version: Specific version. If None, uses latest.

        Returns:
            Path to the feature's .meta.json file. When no version can be
            resolved, the path a hypothetical "1.0.0" would use.
        """
        resolved = version.resolve_version(
            self.path, feature_name, feature_version
        )
        # No versions exist yet - fall back to a hypothetical 1.0.0
        target = "1.0.0" if resolved is None else resolved
        return version.versioned_metadata_path(self.path, feature_name, target)

    @override
    def write_metadata(
        self,
        feature_name: str,
        metadata: manifest.FeatureMetadata,
    ) -> None:
        """
        Write feature metadata to versioned JSON file.

        Uses metadata.version to determine storage path.

        Args:
            feature_name: Unique identifier for the feature
            metadata: FeatureMetadata object to persist
        """
        path = version.versioned_metadata_path(
            self.path, feature_name, metadata.version
        )
        path.parent.mkdir(parents=True, exist_ok=True)
        manifest.write_metadata_file(path, metadata)

    @override
    def read_metadata(
        self,
        feature_name: str,
        feature_version: str | None = None,
    ) -> manifest.FeatureMetadata | None:
        """
        Read feature metadata from versioned JSON file.

        Args:
            feature_name: Unique identifier for the feature
            feature_version: Specific version. If None, reads latest.

        Returns:
            FeatureMetadata if exists and valid, None otherwise
        """
        resolved = version.resolve_version(
            self.path, feature_name, feature_version
        )

        if resolved is None:
            return None

        path = version.versioned_metadata_path(
            self.path, feature_name, resolved
        )
        return manifest.read_metadata_file(path)

    @override
    def list_metadata(self) -> list[manifest.FeatureMetadata]:
        """
        List metadata for latest version of all features.

        Scans for feature directories and reads their latest metadata.
        Directories are visited in sorted order so the result is stable
        across runs and platforms.

        Returns:
            List of FeatureMetadata for all features (latest versions only)
        """
        metadata_list: list[manifest.FeatureMetadata] = []

        # iterdir() yields entries in arbitrary order; sort for a
        # deterministic listing.
        for feature_dir in sorted(self.path.iterdir()):
            # Skip plain files and "_"-prefixed directories
            if not feature_dir.is_dir() or feature_dir.name.startswith("_"):
                continue

            latest = self.get_latest_version(feature_dir.name)
            if not latest:
                continue

            meta = self.read_metadata(feature_dir.name, latest)
            if meta:
                metadata_list.append(meta)

        return metadata_list

__init__

__init__(path: str | Path = './feature_store')

Initialize local storage backend.

Parameters:

Name Type Description Default
path str | Path

Directory path for feature storage. Defaults to "./feature_store".

'./feature_store'
Source code in src/mlforge/store.py
def __init__(self, path: str | Path = "./feature_store"):
    """
    Set up a local store rooted at the given directory.

    The directory, including any missing parents, is created if it is
    not already there.

    Args:
        path: Directory path for feature storage. Defaults to "./feature_store".
    """
    root = Path(path)
    self.path = root
    root.mkdir(parents=True, exist_ok=True)

exists

exists(
    feature_name: str, feature_version: str | None = None
) -> bool

Check if feature version exists.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
feature_version str | None

Specific version to check. If None, checks any version.

None

Returns:

Type Description
bool

True if the feature/version exists, False otherwise

Source code in src/mlforge/store.py
@override
def exists(
    self,
    feature_name: str,
    feature_version: str | None = None,
) -> bool:
    """
    Check if feature version exists.

    Args:
        feature_name: Unique identifier for the feature
        feature_version: Specific version to check. If None, checks any version.

    Returns:
        True if the feature/version exists, False otherwise
    """
    if feature_version is None:
        # Check if any version exists
        return len(self.list_versions(feature_name)) > 0

    path = version.versioned_data_path(
        self.path, feature_name, feature_version
    )
    return path.exists()

get_latest_version

get_latest_version(feature_name: str) -> str | None

Get the latest version of a feature.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required

Returns:

Type Description
str | None

Latest version string, or None if no versions exist

Source code in src/mlforge/store.py
@override
def get_latest_version(self, feature_name: str) -> str | None:
    """
    Look up the newest stored version of a feature.

    Delegates to version.get_latest_version with this store's root path.

    Args:
        feature_name: Unique identifier for the feature

    Returns:
        Latest version string, or None if no versions exist
    """
    store_root = self.path
    latest = version.get_latest_version(store_root, feature_name)
    return latest

list_metadata

list_metadata() -> list[manifest.FeatureMetadata]

List metadata for latest version of all features.

Scans for feature directories and reads their latest metadata.

Returns:

Type Description
list[FeatureMetadata]

List of FeatureMetadata for all features (latest versions only)

Source code in src/mlforge/store.py
@override
def list_metadata(self) -> list[manifest.FeatureMetadata]:
    """
    List metadata for latest version of all features.

    Scans for feature directories and reads their latest metadata.

    Returns:
        List of FeatureMetadata for all features (latest versions only)
    """
    metadata_list: list[manifest.FeatureMetadata] = []

    # Scan for feature directories (contain version subdirectories)
    for feature_dir in self.path.iterdir():
        if not feature_dir.is_dir() or feature_dir.name.startswith("_"):
            continue

        latest = self.get_latest_version(feature_dir.name)
        if latest:
            meta = self.read_metadata(feature_dir.name, latest)
            if meta:
                metadata_list.append(meta)

    return metadata_list

list_versions

list_versions(feature_name: str) -> list[str]

List all versions of a feature.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required

Returns:

Type Description
list[str]

Sorted list of version strings (oldest to newest)

Source code in src/mlforge/store.py
@override
def list_versions(self, feature_name: str) -> list[str]:
    """
    Enumerate the stored versions of a feature.

    Delegates to version.list_versions with this store's root path.

    Args:
        feature_name: Unique identifier for the feature

    Returns:
        Sorted list of version strings (oldest to newest)
    """
    store_root = self.path
    found = version.list_versions(store_root, feature_name)
    return found

metadata_path_for

metadata_path_for(
    feature_name: str, feature_version: str | None = None
) -> Path

Get file path for a feature version's metadata.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
feature_version str | None

Specific version. If None, uses latest.

None

Returns:

Type Description
Path

Path to the feature's .meta.json file

Source code in src/mlforge/store.py
@override
def metadata_path_for(
    self,
    feature_name: str,
    feature_version: str | None = None,
) -> Path:
    """
    Locate the metadata file for a feature version.

    Args:
        feature_name: Unique identifier for the feature
        feature_version: Specific version. If None, uses latest.

    Returns:
        Path to the feature's .meta.json file
    """
    resolved = version.resolve_version(
        self.path, feature_name, feature_version
    )
    # No versions exist yet: use "1.0.0" as the target instead.
    target_version = "1.0.0" if resolved is None else resolved
    return version.versioned_metadata_path(
        self.path, feature_name, target_version
    )

path_for

path_for(
    feature_name: str, feature_version: str | None = None
) -> Path

Get file path for a feature version.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
feature_version str | None

Specific version. If None, uses latest.

None

Returns:

Type Description
Path

Path to the feature's parquet file

Source code in src/mlforge/store.py
@override
def path_for(
    self,
    feature_name: str,
    feature_version: str | None = None,
) -> Path:
    """
    Locate the parquet data file for a feature version.

    Args:
        feature_name: Unique identifier for the feature
        feature_version: Specific version. If None, uses latest.

    Returns:
        Path to the feature's parquet file
    """
    resolved = version.resolve_version(
        self.path, feature_name, feature_version
    )
    # No versions exist yet: return the path for a hypothetical 1.0.0.
    target_version = "1.0.0" if resolved is None else resolved
    return version.versioned_data_path(
        self.path, feature_name, target_version
    )

read

read(
    feature_name: str, feature_version: str | None = None
) -> pl.DataFrame

Read feature data from versioned parquet file.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
feature_version str | None

Specific version to read. If None, reads latest.

None

Returns:

Type Description
DataFrame

Feature data as a DataFrame

Raises:

Type Description
FileNotFoundError

If the feature/version doesn't exist

Source code in src/mlforge/store.py
@override
def read(
    self,
    feature_name: str,
    feature_version: str | None = None,
) -> pl.DataFrame:
    """
    Load a feature's data from its versioned parquet file.

    Args:
        feature_name: Unique identifier for the feature
        feature_version: Specific version to read. If None, reads latest.

    Returns:
        Feature data as a DataFrame

    Raises:
        FileNotFoundError: If no version can be resolved, or the
            resolved version has no data file on disk
    """
    resolved = version.resolve_version(
        self.path, feature_name, feature_version
    )
    if resolved is None:
        raise FileNotFoundError(
            f"Feature '{feature_name}' not found. Run 'mlforge build' first."
        )

    parquet_file = version.versioned_data_path(
        self.path, feature_name, resolved
    )
    if parquet_file.exists():
        return pl.read_parquet(parquet_file)

    # A version was resolved but its data file is missing.
    raise FileNotFoundError(
        f"Feature '{feature_name}' version '{resolved}' not found."
    )

read_metadata

read_metadata(
    feature_name: str, feature_version: str | None = None
) -> manifest.FeatureMetadata | None

Read feature metadata from versioned JSON file.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
feature_version str | None

Specific version. If None, reads latest.

None

Returns:

Type Description
FeatureMetadata | None

FeatureMetadata if exists and valid, None otherwise

Source code in src/mlforge/store.py
@override
def read_metadata(
    self,
    feature_name: str,
    feature_version: str | None = None,
) -> manifest.FeatureMetadata | None:
    """
    Load a feature version's metadata from its JSON file.

    Args:
        feature_name: Unique identifier for the feature
        feature_version: Specific version. If None, reads latest.

    Returns:
        FeatureMetadata if exists and valid, None otherwise
    """
    resolved = version.resolve_version(
        self.path, feature_name, feature_version
    )
    if resolved is not None:
        meta_file = version.versioned_metadata_path(
            self.path, feature_name, resolved
        )
        return manifest.read_metadata_file(meta_file)

    # Nothing resolved, so there is no metadata to read.
    return None

write

write(
    feature_name: str,
    result: ResultKind,
    feature_version: str,
) -> dict[str, Any]

Write feature data to versioned parquet file.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
result ResultKind

Engine result containing feature data and metadata

required
feature_version str

Semantic version string (e.g., "1.0.0")

required

Returns:

Type Description
dict[str, Any]

Metadata dictionary with path, row count, and schema

Source code in src/mlforge/store.py
@override
def write(
    self,
    feature_name: str,
    result: results.ResultKind,
    feature_version: str,
) -> dict[str, Any]:
    """
    Persist a feature result as a versioned parquet file.

    Besides writing the data, this refreshes the feature's latest-version
    pointer and asks the version module to write the feature directory's
    .gitignore.

    Args:
        feature_name: Unique identifier for the feature
        result: Engine result containing feature data and metadata
        feature_version: Semantic version string (e.g., "1.0.0")

    Returns:
        Dictionary with the keys "path" (string form of the data file
        path), "row_count" and "schema"
    """
    data_file = version.versioned_data_path(
        self.path, feature_name, feature_version
    )

    # The version directory may not exist yet on first write
    data_file.parent.mkdir(parents=True, exist_ok=True)
    result.write_parquet(data_file)

    # Point _latest.json at the version that was just written
    version.write_latest_pointer(self.path, feature_name, feature_version)

    # Create .gitignore in the feature directory (if not present)
    version.write_feature_gitignore(self.path, feature_name)

    summary: dict[str, Any] = {"path": str(data_file)}
    summary["row_count"] = result.row_count()
    summary["schema"] = result.schema()
    return summary

write_metadata

write_metadata(
    feature_name: str, metadata: FeatureMetadata
) -> None

Write feature metadata to versioned JSON file.

Uses metadata.version to determine storage path.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
metadata FeatureMetadata

FeatureMetadata object to persist

required
Source code in src/mlforge/store.py
@override
def write_metadata(
    self,
    feature_name: str,
    metadata: manifest.FeatureMetadata,
) -> None:
    """
    Persist feature metadata next to its versioned data.

    The destination is derived from metadata.version, so the metadata
    object itself decides which version directory it lands in.

    Args:
        feature_name: Unique identifier for the feature
        metadata: FeatureMetadata object to persist
    """
    metadata_file = version.versioned_metadata_path(
        self.path, feature_name, metadata.version
    )

    # Make sure the version directory exists before writing into it
    metadata_file.parent.mkdir(parents=True, exist_ok=True)

    manifest.write_metadata_file(metadata_file, metadata)

Cloud Storage

mlforge.store.S3Store

Bases: Store

Amazon S3 storage backend using Parquet format.

Stores features in versioned directories within an S3 bucket:

s3://bucket/prefix/
├── user_spend/
│   ├── 1.0.0/
│   │   ├── data.parquet
│   │   └── .meta.json
│   ├── 1.1.0/
│   │   └── ...
│   └── _latest.json

Uses AWS credentials from environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION).

Attributes:

Name Type Description
bucket

S3 bucket name for storing features

prefix

Optional path prefix within the bucket

region

AWS region (optional)

Example

store = S3Store(bucket="mlforge-features", prefix="prod/features")
store.write("user_age", result, feature_version="1.0.0")
age_df = store.read("user_age")  # reads latest

Source code in src/mlforge/store.py
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
class S3Store(Store):
    """
    Amazon S3 storage backend using Parquet format.

    Stores features in versioned directories within an S3 bucket:
        s3://bucket/prefix/
        ├── user_spend/
        │   ├── 1.0.0/
        │   │   ├── data.parquet
        │   │   └── .meta.json
        │   ├── 1.1.0/
        │   │   └── ...
        │   └── _latest.json

    Uses AWS credentials from environment variables
    (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION).
    An explicitly supplied ``region`` is forwarded to the S3 client.

    Attributes:
        bucket: S3 bucket name for storing features
        prefix: Optional path prefix within the bucket
        region: AWS region (optional)

    Example:
        store = S3Store(bucket="mlforge-features", prefix="prod/features")
        store.write("user_age", result, feature_version="1.0.0")
        age_df = store.read("user_age")  # reads latest
    """

    def __init__(
        self, bucket: str, prefix: str = "", region: str | None = None
    ) -> None:
        """
        Initialize S3 storage backend.

        Args:
            bucket: S3 bucket name for feature storage
            prefix: Path prefix within bucket. Defaults to empty string.
                Leading and trailing slashes are stripped.
            region: AWS region. When given, it is passed to the S3 client.
                Defaults to None (uses AWS_DEFAULT_REGION).

        Raises:
            ValueError: If bucket doesn't exist or is not accessible
        """
        self.bucket = bucket
        self.prefix = prefix.strip("/")
        self.region = region

        if region is None:
            # Uses AWS env vars automatically
            self._s3 = s3fs.S3FileSystem()
        else:
            # Honour the caller's region instead of silently ignoring it
            self._s3 = s3fs.S3FileSystem(
                client_kwargs={"region_name": region}
            )

        # Fail fast so later reads/writes do not hit a missing bucket
        if not self._s3.exists(self.bucket):
            raise ValueError(
                f"Bucket '{self.bucket}' does not exist or is not accessible. "
                f"Ensure the bucket is created and credentials have appropriate permissions."
            )

    def _base_path(self) -> str:
        """Get base S3 path (bucket/prefix)."""
        if self.prefix:
            return f"s3://{self.bucket}/{self.prefix}"
        return f"s3://{self.bucket}"

    def _versioned_data_path(
        self, feature_name: str, feature_version: str
    ) -> str:
        """Get S3 path for versioned feature data."""
        return (
            f"{self._base_path()}/{feature_name}/{feature_version}/data.parquet"
        )

    def _versioned_metadata_path(
        self, feature_name: str, feature_version: str
    ) -> str:
        """Get S3 path for versioned feature metadata."""
        return (
            f"{self._base_path()}/{feature_name}/{feature_version}/.meta.json"
        )

    def _latest_pointer_path(self, feature_name: str) -> str:
        """Get S3 path for _latest.json pointer."""
        return f"{self._base_path()}/{feature_name}/_latest.json"

    def _feature_dir_path(self, feature_name: str) -> str:
        """Get S3 path for feature directory."""
        return f"{self._base_path()}/{feature_name}"

    def _write_latest_pointer(
        self, feature_name: str, feature_version: str
    ) -> None:
        """Write _latest.json pointer to S3."""
        path = self._latest_pointer_path(feature_name)
        with self._s3.open(path, "w") as f:
            json.dump({"version": feature_version}, f, indent=2)

    def _read_latest_pointer(self, feature_name: str) -> str | None:
        """
        Read _latest.json pointer from S3.

        Returns:
            The "version" value stored in the pointer, or None when the
            pointer is missing, is not valid JSON, is not a JSON object,
            or has no "version" key.
        """
        path = self._latest_pointer_path(feature_name)
        if not self._s3.exists(path):
            return None

        try:
            with self._s3.open(path, "r") as f:
                data = json.load(f)
        except (json.JSONDecodeError, KeyError):
            return None

        # Valid JSON that is not an object (e.g. a list or a string) has no
        # .get(); treat it like a corrupt pointer instead of crashing.
        if not isinstance(data, dict):
            return None

        return data.get("version")

    @override
    def list_versions(self, feature_name: str) -> list[str]:
        """
        List all versions of a feature in S3.

        Args:
            feature_name: Unique identifier for the feature

        Returns:
            Sorted list of version strings (oldest to newest)
        """
        feature_dir = self._feature_dir_path(feature_name)

        # Remove s3:// prefix for ls
        feature_dir_key = feature_dir.replace("s3://", "")

        try:
            # List entries in the feature directory
            items = self._s3.ls(feature_dir_key, detail=False)
        except FileNotFoundError:
            return []

        # The last path segment is the candidate version; anything that is
        # not a valid version (such as _latest.json) is filtered out.
        names = (item.split("/")[-1] for item in items)
        versions = [name for name in names if version.is_valid_version(name)]

        return version.sort_versions(versions)

    @override
    def get_latest_version(self, feature_name: str) -> str | None:
        """
        Get the latest version of a feature from S3.

        Args:
            feature_name: Unique identifier for the feature

        Returns:
            Latest version string, or None if no versions exist
        """
        return self._read_latest_pointer(feature_name)

    def _resolve_version(
        self, feature_name: str, feature_version: str | None
    ) -> str | None:
        """Resolve version to latest if None."""
        if feature_version is not None:
            return feature_version
        return self.get_latest_version(feature_name)

    @override
    def path_for(
        self,
        feature_name: str,
        feature_version: str | None = None,
    ) -> str:
        """
        Get S3 URI for a feature version.

        Args:
            feature_name: Unique identifier for the feature
            feature_version: Specific version. If None, uses latest.

        Returns:
            S3 URI where the feature is or would be stored
        """
        resolved = self._resolve_version(feature_name, feature_version)

        if resolved is None:
            # No versions exist yet
            return self._versioned_data_path(feature_name, "1.0.0")

        return self._versioned_data_path(feature_name, resolved)

    @override
    def write(
        self,
        feature_name: str,
        result: results.ResultKind,
        feature_version: str,
    ) -> dict[str, Any]:
        """
        Write feature data to versioned S3 parquet file.

        Args:
            feature_name: Unique identifier for the feature
            result: Engine result containing feature data and metadata
            feature_version: Semantic version string (e.g., "1.0.0")

        Returns:
            Metadata dictionary with S3 URI, row count, and schema
        """
        path = self._versioned_data_path(feature_name, feature_version)
        result.write_parquet(path)

        # Update _latest.json pointer
        self._write_latest_pointer(feature_name, feature_version)

        return {
            "path": path,
            "row_count": result.row_count(),
            "schema": result.schema(),
        }

    @override
    def read(
        self,
        feature_name: str,
        feature_version: str | None = None,
    ) -> pl.DataFrame:
        """
        Read feature data from versioned S3 parquet file.

        Args:
            feature_name: Unique identifier for the feature
            feature_version: Specific version to read. If None, reads latest.

        Returns:
            Feature data as a DataFrame

        Raises:
            FileNotFoundError: If the feature/version doesn't exist
        """
        resolved = self._resolve_version(feature_name, feature_version)

        if resolved is None:
            raise FileNotFoundError(
                f"Feature '{feature_name}' not found. Run 'mlforge build' first."
            )

        path = self._versioned_data_path(feature_name, resolved)

        if not self._s3.exists(path):
            raise FileNotFoundError(
                f"Feature '{feature_name}' version '{resolved}' not found."
            )

        return pl.read_parquet(path)

    @override
    def exists(
        self,
        feature_name: str,
        feature_version: str | None = None,
    ) -> bool:
        """
        Check if feature version exists in S3.

        Args:
            feature_name: Unique identifier for the feature
            feature_version: Specific version to check. If None, checks any version.

        Returns:
            True if the feature/version exists, False otherwise
        """
        if feature_version is None:
            # Check if any version exists
            return bool(self.list_versions(feature_name))

        path = self._versioned_data_path(feature_name, feature_version)
        return self._s3.exists(path)

    @override
    def metadata_path_for(
        self,
        feature_name: str,
        feature_version: str | None = None,
    ) -> str:
        """
        Get S3 URI for a feature version's metadata.

        Args:
            feature_name: Unique identifier for the feature
            feature_version: Specific version. If None, uses latest.

        Returns:
            S3 URI where the feature's metadata is or would be stored
        """
        resolved = self._resolve_version(feature_name, feature_version)

        if resolved is None:
            # No versions exist yet
            return self._versioned_metadata_path(feature_name, "1.0.0")

        return self._versioned_metadata_path(feature_name, resolved)

    @override
    def write_metadata(
        self,
        feature_name: str,
        metadata: manifest.FeatureMetadata,
    ) -> None:
        """
        Write feature metadata to versioned S3 JSON file.

        Uses metadata.version to determine storage path.

        Args:
            feature_name: Unique identifier for the feature
            metadata: FeatureMetadata object to persist
        """
        path = self._versioned_metadata_path(feature_name, metadata.version)
        with self._s3.open(path, "w") as f:
            json.dump(metadata.to_dict(), f, indent=2)

    @override
    def read_metadata(
        self,
        feature_name: str,
        feature_version: str | None = None,
    ) -> manifest.FeatureMetadata | None:
        """
        Read feature metadata from versioned S3 JSON file.

        Args:
            feature_name: Unique identifier for the feature
            feature_version: Specific version. If None, reads latest.

        Returns:
            FeatureMetadata if exists and valid, None otherwise
        """
        resolved = self._resolve_version(feature_name, feature_version)

        if resolved is None:
            return None

        path = self._versioned_metadata_path(feature_name, resolved)

        if not self._s3.exists(path):
            return None

        try:
            with self._s3.open(path, "r") as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            logger.warning(f"Invalid JSON in {path}: {e}")
            return None

        try:
            return manifest.FeatureMetadata.from_dict(data)
        except KeyError as e:
            logger.warning(f"Schema mismatch in {path}: missing key {e}")
            return None

    @override
    def list_metadata(self) -> list[manifest.FeatureMetadata]:
        """
        List metadata for latest version of all features in S3.

        Scans for feature directories and reads their latest metadata.

        Returns:
            List of FeatureMetadata for all features (latest versions only)
        """
        metadata_list: list[manifest.FeatureMetadata] = []

        # List all entries at the base path
        base_key = self._base_path().replace("s3://", "")

        try:
            items = self._s3.ls(base_key, detail=False)
        except FileNotFoundError:
            return []

        for item in items:
            feature_name = item.split("/")[-1]

            # Skip entries whose name starts with an underscore
            if feature_name.startswith("_"):
                continue

            # Entries without a readable latest pointer or metadata are
            # skipped rather than reported as errors.
            latest = self.get_latest_version(feature_name)
            if latest:
                meta = self.read_metadata(feature_name, latest)
                if meta:
                    metadata_list.append(meta)

        return metadata_list

__init__

__init__(
    bucket: str, prefix: str = "", region: str | None = None
) -> None

Initialize S3 storage backend.

Parameters:

Name Type Description Default
bucket str

S3 bucket name for feature storage

required
prefix str

Path prefix within bucket. Defaults to empty string.

''
region str | None

AWS region. Defaults to None (uses AWS_DEFAULT_REGION).

None

Raises:

Type Description
ValueError

If bucket doesn't exist or is not accessible

Source code in src/mlforge/store.py
def __init__(
    self, bucket: str, prefix: str = "", region: str | None = None
) -> None:
    """
    Initialize S3 storage backend.

    Args:
        bucket: S3 bucket name for feature storage
        prefix: Path prefix within bucket. Defaults to empty string.
            Leading and trailing slashes are stripped.
        region: AWS region. When given, it is passed to the S3 client.
            Defaults to None (uses AWS_DEFAULT_REGION).

    Raises:
        ValueError: If bucket doesn't exist or is not accessible
    """
    self.bucket = bucket
    self.prefix = prefix.strip("/")
    self.region = region

    if region is None:
        # Uses AWS env vars automatically
        self._s3 = s3fs.S3FileSystem()
    else:
        # Honour the caller's region instead of silently ignoring it
        self._s3 = s3fs.S3FileSystem(client_kwargs={"region_name": region})

    # Fail fast so later reads/writes do not hit a missing bucket
    if not self._s3.exists(self.bucket):
        raise ValueError(
            f"Bucket '{self.bucket}' does not exist or is not accessible. "
            f"Ensure the bucket is created and credentials have appropriate permissions."
        )

exists

exists(
    feature_name: str, feature_version: str | None = None
) -> bool

Check if feature version exists in S3.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
feature_version str | None

Specific version to check. If None, checks any version.

None

Returns:

Type Description
bool

True if the feature/version exists, False otherwise

Source code in src/mlforge/store.py
@override
def exists(
    self,
    feature_name: str,
    feature_version: str | None = None,
) -> bool:
    """
    Check if feature version exists in S3.

    Args:
        feature_name: Unique identifier for the feature
        feature_version: Specific version to check. If None, checks any version.

    Returns:
        True if the feature/version exists, False otherwise
    """
    if feature_version is None:
        # Check if any version exists
        return len(self.list_versions(feature_name)) > 0

    path = self._versioned_data_path(feature_name, feature_version)
    return self._s3.exists(path)

get_latest_version

get_latest_version(feature_name: str) -> str | None

Get the latest version of a feature from S3.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required

Returns:

Type Description
str | None

Latest version string, or None if no versions exist

Source code in src/mlforge/store.py
@override
def get_latest_version(self, feature_name: str) -> str | None:
    """
    Get the latest version of a feature from S3.

    Args:
        feature_name: Unique identifier for the feature

    Returns:
        Latest version string, or None if no versions exist
    """
    return self._read_latest_pointer(feature_name)

list_metadata

list_metadata() -> list[manifest.FeatureMetadata]

List metadata for latest version of all features in S3.

Scans for feature directories and reads their latest metadata.

Returns:

Type Description
list[FeatureMetadata]

List of FeatureMetadata for all features (latest versions only)

Source code in src/mlforge/store.py
@override
def list_metadata(self) -> list[manifest.FeatureMetadata]:
    """
    Collect the latest-version metadata of every feature under the base path.

    Each entry listed at the base path is treated as a feature name.
    Entries starting with an underscore, entries without a latest version,
    and entries whose metadata cannot be read are left out.

    Returns:
        List of FeatureMetadata for all features (latest versions only)
    """
    # ls is called with the key form of the path, without the s3:// scheme
    root_key = self._base_path().replace("s3://", "")

    try:
        entries = self._s3.ls(root_key, detail=False)
    except FileNotFoundError:
        return []

    collected: list[manifest.FeatureMetadata] = []
    for entry in entries:
        candidate = entry.split("/")[-1]

        # Underscore-prefixed names are not features
        if candidate.startswith("_"):
            continue

        newest = self.get_latest_version(candidate)
        if not newest:
            continue

        meta = self.read_metadata(candidate, newest)
        if meta:
            collected.append(meta)

    return collected

list_versions

list_versions(feature_name: str) -> list[str]

List all versions of a feature in S3.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required

Returns:

Type Description
list[str]

Sorted list of version strings (oldest to newest)

Source code in src/mlforge/store.py
@override
def list_versions(self, feature_name: str) -> list[str]:
    """
    Enumerate the versions stored for a feature in S3.

    Args:
        feature_name: Unique identifier for the feature

    Returns:
        Sorted list of version strings (oldest to newest); empty when the
        feature directory cannot be listed
    """
    # ls is called with the key form of the path, without the s3:// scheme
    listing_key = self._feature_dir_path(feature_name).replace("s3://", "")

    try:
        entries = self._s3.ls(listing_key, detail=False)
    except FileNotFoundError:
        return []

    # The last path segment of each entry is the candidate version name;
    # only names accepted by version.is_valid_version are kept.
    candidates = (entry.split("/")[-1] for entry in entries)
    found = [name for name in candidates if version.is_valid_version(name)]

    return version.sort_versions(found)

metadata_path_for

metadata_path_for(
    feature_name: str, feature_version: str | None = None
) -> str

Get S3 URI for a feature version's metadata.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
feature_version str | None

Specific version. If None, uses latest.

None

Returns:

Type Description
str

S3 URI where the feature's metadata is or would be stored

Source code in src/mlforge/store.py
@override
def metadata_path_for(
    self,
    feature_name: str,
    feature_version: str | None = None,
) -> str:
    """
    Get S3 URI for a feature version's metadata.

    Args:
        feature_name: Unique identifier for the feature
        feature_version: Specific version. If None, uses latest.

    Returns:
        S3 URI where the feature's metadata is or would be stored
    """
    resolved = self._resolve_version(feature_name, feature_version)

    if resolved is None:
        return self._versioned_metadata_path(feature_name, "1.0.0")

    return self._versioned_metadata_path(feature_name, resolved)

path_for

path_for(
    feature_name: str, feature_version: str | None = None
) -> str

Get S3 URI for a feature version.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
feature_version str | None

Specific version. If None, uses latest.

None

Returns:

Type Description
str

S3 URI where the feature is or would be stored

Source code in src/mlforge/store.py
@override
def path_for(
    self,
    feature_name: str,
    feature_version: str | None = None,
) -> str:
    """
    Get S3 URI for a feature version.

    Args:
        feature_name: Unique identifier for the feature
        feature_version: Specific version. If None, uses latest.

    Returns:
        S3 URI where the feature is or would be stored
    """
    resolved = self._resolve_version(feature_name, feature_version)

    if resolved is None:
        # No versions exist yet
        return self._versioned_data_path(feature_name, "1.0.0")

    return self._versioned_data_path(feature_name, resolved)

read

read(
    feature_name: str, feature_version: str | None = None
) -> pl.DataFrame

Read feature data from versioned S3 parquet file.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
feature_version str | None

Specific version to read. If None, reads latest.

None

Returns:

Type Description
DataFrame

Feature data as a DataFrame

Raises:

Type Description
FileNotFoundError

If the feature/version doesn't exist

Source code in src/mlforge/store.py
@override
def read(
    self,
    feature_name: str,
    feature_version: str | None = None,
) -> pl.DataFrame:
    """
    Load a feature version's parquet data from S3.

    Args:
        feature_name: Unique identifier for the feature
        feature_version: Specific version to read. If None, reads latest.

    Returns:
        Feature data as a DataFrame

    Raises:
        FileNotFoundError: If no version can be resolved for the feature,
            or the resolved version's data file is not in S3
    """
    resolved = self._resolve_version(feature_name, feature_version)

    if resolved is None:
        raise FileNotFoundError(
            f"Feature '{feature_name}' not found. Run 'mlforge build' first."
        )

    data_path = self._versioned_data_path(feature_name, resolved)

    # Check up front so a missing file surfaces as FileNotFoundError
    if self._s3.exists(data_path):
        return pl.read_parquet(data_path)

    raise FileNotFoundError(
        f"Feature '{feature_name}' version '{resolved}' not found."
    )

read_metadata

read_metadata(
    feature_name: str, feature_version: str | None = None
) -> manifest.FeatureMetadata | None

Read feature metadata from versioned S3 JSON file.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
feature_version str | None

Specific version. If None, reads latest.

None

Returns:

Type Description
FeatureMetadata | None

FeatureMetadata if exists and valid, None otherwise

Source code in src/mlforge/store.py
@override
def read_metadata(
    self,
    feature_name: str,
    feature_version: str | None = None,
) -> manifest.FeatureMetadata | None:
    """
    Load and parse a feature version's metadata JSON from S3.

    Args:
        feature_name: Unique identifier for the feature
        feature_version: Specific version. If None, reads latest.

    Returns:
        FeatureMetadata if exists and valid, None otherwise. Invalid JSON
        and missing keys are logged as warnings rather than raised.
    """
    resolved = self._resolve_version(feature_name, feature_version)
    if resolved is None:
        return None

    path = self._versioned_metadata_path(feature_name, resolved)
    if not self._s3.exists(path):
        return None

    # Stage 1: decode the JSON document
    try:
        with self._s3.open(path, "r") as handle:
            raw = json.load(handle)
    except json.JSONDecodeError as e:
        logger.warning(f"Invalid JSON in {path}: {e}")
        return None

    # Stage 2: map the decoded document onto FeatureMetadata
    try:
        parsed = manifest.FeatureMetadata.from_dict(raw)
    except KeyError as e:
        logger.warning(f"Schema mismatch in {path}: missing key {e}")
        return None

    return parsed

write

write(
    feature_name: str,
    result: ResultKind,
    feature_version: str,
) -> dict[str, Any]

Write feature data to versioned S3 parquet file.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
result ResultKind

Engine result containing feature data and metadata

required
feature_version str

Semantic version string (e.g., "1.0.0")

required

Returns:

Type Description
dict[str, Any]

Metadata dictionary with S3 URI, row count, and schema

Source code in src/mlforge/store.py
@override
def write(
    self,
    feature_name: str,
    result: results.ResultKind,
    feature_version: str,
) -> dict[str, Any]:
    """
    Persist a feature result as a versioned parquet file in S3.

    After the data is written, the feature's _latest.json pointer is
    updated to the version that was just written.

    Args:
        feature_name: Unique identifier for the feature
        result: Engine result containing feature data and metadata
        feature_version: Semantic version string (e.g., "1.0.0")

    Returns:
        Dictionary with the keys "path" (S3 URI of the data file),
        "row_count" and "schema"
    """
    destination = self._versioned_data_path(feature_name, feature_version)
    result.write_parquet(destination)

    # Move the latest pointer only after the data write has returned
    self._write_latest_pointer(feature_name, feature_version)

    summary: dict[str, Any] = {"path": destination}
    summary["row_count"] = result.row_count()
    summary["schema"] = result.schema()
    return summary

write_metadata

write_metadata(
    feature_name: str, metadata: FeatureMetadata
) -> None

Write feature metadata to versioned S3 JSON file.

Uses metadata.version to determine storage path.

Parameters:

Name Type Description Default
feature_name str

Unique identifier for the feature

required
metadata FeatureMetadata

FeatureMetadata object to persist

required
Source code in src/mlforge/store.py
@override
def write_metadata(
    self,
    feature_name: str,
    metadata: manifest.FeatureMetadata,
) -> None:
    """
    Persist feature metadata as JSON next to its versioned data in S3.

    The destination is derived from metadata.version, so the metadata
    object itself decides which version directory it lands in.

    Args:
        feature_name: Unique identifier for the feature
        metadata: FeatureMetadata object to persist
    """
    destination = self._versioned_metadata_path(
        feature_name, metadata.version
    )

    with self._s3.open(destination, "w") as handle:
        payload = metadata.to_dict()
        json.dump(payload, handle, indent=2)

Type Aliases

mlforge.store.OfflineStoreKind

OfflineStoreKind = LocalStore | S3Store