Skip to content

Retrieval API

The retrieval module provides functions for retrieving features for both training (offline) and inference (online) use cases.

Functions

Offline Retrieval (Training)

mlforge.retrieval.get_training_data

get_training_data(
    features: list[FeatureSpec],
    entity_df: DataFrame,
    store: str | Path | Store = "./feature_store",
    entities: list[EntityKeyTransform] | None = None,
    timestamp: str | None = None,
) -> pl.DataFrame

Retrieve features and join to an entity DataFrame.

Parameters:

Name Type Description Default
features list[FeatureSpec]

Feature specifications. Can be:

- "feature_name" — uses latest version
- ("feature_name", "1.0.0") — uses specific version

required
entity_df DataFrame

DataFrame with entity keys to join on

required
store str | Path | Store

Path to feature store or Store instance

'./feature_store'
entities list[EntityKeyTransform] | None

Entity key transforms to apply to entity_df before joining

None
timestamp str | None

Column in entity_df to use for point-in-time joins. If provided, features with timestamps will be asof-joined.

None

Returns:

Type Description
DataFrame

entity_df with feature columns joined

Example

from mlforge import get_training_data
from transactions.entities import with_user_id

transactions = pl.read_parquet("data/transactions.parquet")

Point-in-time correct training data with mixed versions

training_df = get_training_data(
    features=[
        "user_spend_mean_30d",              # latest version
        ("merchant_features", "1.0.0"),     # pinned version
    ],
    entity_df=transactions,
    entities=[with_user_id],
    timestamp="trans_date_trans_time",
)

Source code in src/mlforge/retrieval.py
def get_training_data(
    features: list[FeatureSpec],
    entity_df: pl.DataFrame,
    store: str | Path | store_.Store = "./feature_store",
    entities: list[utils.EntityKeyTransform] | None = None,
    timestamp: str | None = None,
) -> pl.DataFrame:
    """
    Retrieve features and join to an entity DataFrame.

    Args:
        features: Feature specifications. Can be:
            - "feature_name" - uses latest version
            - ("feature_name", "1.0.0") - uses specific version
        entity_df: DataFrame with entity keys to join on
        store: Path to feature store or Store instance
        entities: Entity key transforms to apply to entity_df before joining
        timestamp: Column in entity_df to use for point-in-time joins.
                   If provided, features with timestamps will be asof-joined.

    Returns:
        entity_df with feature columns joined

    Raises:
        ValueError: If a requested feature (or version) is missing from the
            store, or if a feature shares no join columns with entity_df.

    Example:
        from mlforge import get_training_data
        from transactions.entities import with_user_id

        transactions = pl.read_parquet("data/transactions.parquet")

        # Point-in-time correct training data with mixed versions
        training_df = get_training_data(
            features=[
                "user_spend_mean_30d",              # latest version
                ("merchant_features", "1.0.0"),    # pinned version
            ],
            entity_df=transactions,
            entities=[with_user_id],
            timestamp="trans_date_trans_time",
        )
    """
    # Accept a path-like and lazily wrap it in a LocalStore.
    if isinstance(store, (str, Path)):
        store = store_.LocalStore(path=store)

    result = _apply_entity_transforms(entity_df, entities)

    for feature_spec in features:
        # Parse feature specification: a bare name means "latest version",
        # a (name, version) tuple pins a specific version.
        if isinstance(feature_spec, tuple):
            feature_name, feature_version = feature_spec
        else:
            feature_name = feature_spec
            feature_version = None  # Use latest

        if not store.exists(feature_name, feature_version):
            version_str = (
                f" version '{feature_version}'" if feature_version else ""
            )
            raise ValueError(
                f"Feature '{feature_name}'{version_str} not found. Run `mlforge build` first."
            )

        feature_df = store.read(feature_name, feature_version)

        # Order-preserving intersection of columns. A set-based intersection
        # (`set(a) & set(b)`) yields a nondeterministic key order across runs,
        # which makes join behavior and error output non-reproducible.
        feature_cols = set(feature_df.columns)
        join_keys = [c for c in result.columns if c in feature_cols]

        # Remove timestamp columns from join keys—they're handled separately
        if timestamp:
            join_keys = [k for k in join_keys if k != timestamp]

        if not join_keys:
            raise ValueError(
                f"No common columns to join '{feature_name}'. "
                f"entity_df has: {result.columns}, feature has: {feature_df.columns}"
            )

        # Determine join strategy: asof (point-in-time) join only when both
        # sides carry a timestamp; otherwise a plain left join.
        feature_timestamp = _get_feature_timestamp(feature_df)

        if timestamp and feature_timestamp:
            # Point-in-time join
            result = _asof_join(
                left=result,
                right=feature_df,
                on_keys=join_keys,
                left_timestamp=timestamp,
                right_timestamp=feature_timestamp,
            )
        else:
            # Standard join
            result = result.join(feature_df, on=join_keys, how="left")

    return result

Online Retrieval (Inference)

mlforge.retrieval.get_online_features

get_online_features(
    features: list[str],
    entity_df: DataFrame,
    store: OnlineStore,
    entities: list[EntityKeyTransform] | None = None,
) -> pl.DataFrame

Retrieve features from an online store for inference.

Unlike get_training_data(), this function:

- Always returns latest values (no point-in-time joins)
- Does not support versioning (online stores hold latest only)
- Uses direct key lookups instead of DataFrame joins

Parameters:

Name Type Description Default
features list[str]

List of feature names to retrieve

required
entity_df DataFrame

DataFrame with entity keys (e.g., inference requests)

required
store OnlineStore

Online store instance (e.g., RedisStore)

required
entities list[EntityKeyTransform] | None

Optional entity key transforms to apply before lookup

None

Returns:

Type Description
DataFrame

entity_df with feature columns joined (None for missing entities)

Example

from mlforge import get_online_features, RedisStore
from myproject.entities import with_user_id

store = RedisStore(host="localhost")
request_df = pl.DataFrame({"user_id": ["user_123", "user_456"]})

features_df = get_online_features(
    features=["user_spend"],
    entity_df=request_df,
    entities=[with_user_id],
    store=store,
)

Source code in src/mlforge/retrieval.py
def get_online_features(
    features: list[str],
    entity_df: pl.DataFrame,
    store: online_.OnlineStore,
    entities: list[utils.EntityKeyTransform] | None = None,
) -> pl.DataFrame:
    """
    Retrieve features from an online store for inference.

    Unlike get_training_data(), this function:
    - Always returns latest values (no point-in-time joins)
    - Does not support versioning (online stores hold latest only)
    - Uses direct key lookups instead of DataFrame joins

    Args:
        features: List of feature names to retrieve
        entity_df: DataFrame with entity keys (e.g., inference requests)
        store: Online store instance (e.g., RedisStore)
        entities: Optional entity key transforms to apply before lookup

    Returns:
        entity_df with feature columns joined (None for missing entities)

    Example:
        from mlforge import get_online_features, RedisStore
        from myproject.entities import with_user_id

        store = RedisStore(host="localhost")
        request_df = pl.DataFrame({"user_id": ["user_123", "user_456"]})

        features_df = get_online_features(
            features=["user_spend"],
            entity_df=request_df,
            entities=[with_user_id],
            store=store,
        )
    """
    # Derive entity keys first, then attach each requested feature in turn.
    joined = _apply_entity_transforms(entity_df, entities)
    for feature_name in features:
        joined = _join_online_feature(joined, feature_name, store, entities)
    return joined