Skip to content

Online Store API

The online module provides abstract base classes and implementations for real-time feature serving.

Classes

OnlineStore (Abstract Base Class)

mlforge.online.OnlineStore

Bases: ABC

Abstract base class for online feature storage backends.

Online stores provide low-latency read/write access to feature values for real-time ML inference. Unlike offline stores (which store full feature history), online stores typically only hold the latest values.

Key design:

- Simple key-value model: entity keys -> feature values
- JSON serialization for human-readable debugging
- Batch operations for efficient bulk access

Source code in src/mlforge/online.py
class OnlineStore(ABC):
    """
    Abstract base class for online feature storage backends.

    Online stores provide low-latency read/write access to feature values
    for real-time ML inference. Unlike offline stores (which store full
    feature history), online stores typically only hold the latest values.

    Key design:
        - Simple key-value model: entity keys -> feature values
        - JSON serialization for human-readable debugging
        - Batch operations for efficient bulk access
    """

    @abstractmethod
    def write(
        self,
        feature_name: str,
        entity_keys: dict[str, str],
        values: dict[str, Any],
    ) -> None:
        """
        Write feature values for a single entity.

        Args:
            feature_name: Name of the feature
            entity_keys: Entity key columns and values (e.g., {"user_id": "123"})
            values: Feature column values (e.g., {"amount__sum__7d": 1500.0})
        """
        ...

    @abstractmethod
    def write_batch(
        self,
        feature_name: str,
        records: list[dict[str, Any]],
        entity_key_columns: list[str],
    ) -> int:
        """
        Write feature values for multiple entities.

        Args:
            feature_name: Name of the feature
            records: List of records, each containing entity keys and feature values
            entity_key_columns: Column names that form the entity key

        Returns:
            Number of records written
        """
        ...

    @abstractmethod
    def read(
        self,
        feature_name: str,
        entity_keys: dict[str, str],
    ) -> dict[str, Any] | None:
        """
        Read feature values for a single entity.

        Args:
            feature_name: Name of the feature
            entity_keys: Entity key columns and values

        Returns:
            Feature values dict, or None if not found
        """
        ...

    @abstractmethod
    def read_batch(
        self,
        feature_name: str,
        entity_keys: list[dict[str, str]],
    ) -> list[dict[str, Any] | None]:
        """
        Read feature values for multiple entities.

        Args:
            feature_name: Name of the feature
            entity_keys: List of entity key dicts

        Returns:
            List of feature value dicts (None for missing entities)
        """
        ...

    @abstractmethod
    def delete(
        self,
        feature_name: str,
        entity_keys: dict[str, str],
    ) -> bool:
        """
        Delete feature values for a single entity.

        Args:
            feature_name: Name of the feature
            entity_keys: Entity key columns and values

        Returns:
            True if deleted, False if not found
        """
        ...

    @abstractmethod
    def exists(
        self,
        feature_name: str,
        entity_keys: dict[str, str],
    ) -> bool:
        """
        Check if feature values exist for an entity.

        Args:
            feature_name: Name of the feature
            entity_keys: Entity key columns and values

        Returns:
            True if exists, False otherwise
        """
        ...

delete abstractmethod

delete(
    feature_name: str, entity_keys: dict[str, str]
) -> bool

Delete feature values for a single entity.

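Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `feature_name` | `str` | Name of the feature | *required* |
| `entity_keys` | `dict[str, str]` | Entity key columns and values | *required* |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if deleted, False if not found |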

Source code in src/mlforge/online.py
@abstractmethod
def delete(self, feature_name: str, entity_keys: dict[str, str]) -> bool:
    """
    Remove the stored feature values of one entity.

    Args:
        feature_name: Name of the feature
        entity_keys: Entity key columns and values

    Returns:
        True when something was removed, False when nothing was stored
    """

exists abstractmethod

exists(
    feature_name: str, entity_keys: dict[str, str]
) -> bool

Check if feature values exist for an entity.

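Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `feature_name` | `str` | Name of the feature | *required* |
| `entity_keys` | `dict[str, str]` | Entity key columns and values | *required* |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if exists, False otherwise |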

Source code in src/mlforge/online.py
@abstractmethod
def exists(self, feature_name: str, entity_keys: dict[str, str]) -> bool:
    """
    Report whether feature values are stored for one entity.

    Args:
        feature_name: Name of the feature
        entity_keys: Entity key columns and values

    Returns:
        True when values are stored for the entity, False otherwise
    """

read abstractmethod

read(
    feature_name: str, entity_keys: dict[str, str]
) -> dict[str, Any] | None

Read feature values for a single entity.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `feature_name` | `str` | Name of the feature | *required* |
| `entity_keys` | `dict[str, str]` | Entity key columns and values | *required* |

Returns:

| Type | Description |
|------|-------------|
| `dict[str, Any] \| None` | Feature values dict, or None if not found |

Source code in src/mlforge/online.py
@abstractmethod
def read(
    self,
    feature_name: str,
    entity_keys: dict[str, str],
) -> dict[str, Any] | None:
    """
    Read feature values for a single entity.

    Args:
        feature_name: Name of the feature
        entity_keys: Entity key columns and values

    Returns:
        Feature values dict, or None if not found
    """
    ...

read_batch abstractmethod

read_batch(
    feature_name: str, entity_keys: list[dict[str, str]]
) -> list[dict[str, Any] | None]

Read feature values for multiple entities.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `feature_name` | `str` | Name of the feature | *required* |
| `entity_keys` | `list[dict[str, str]]` | List of entity key dicts | *required* |

Returns:

| Type | Description |
|------|-------------|
| `list[dict[str, Any] \| None]` | List of feature value dicts (None for missing entities) |

Source code in src/mlforge/online.py
@abstractmethod
def read_batch(
    self,
    feature_name: str,
    entity_keys: list[dict[str, str]],
) -> list[dict[str, Any] | None]:
    """
    Read feature values for multiple entities.

    Args:
        feature_name: Name of the feature
        entity_keys: List of entity key dicts

    Returns:
        List of feature value dicts (None for missing entities)
    """
    ...

write abstractmethod

write(
    feature_name: str,
    entity_keys: dict[str, str],
    values: dict[str, Any],
) -> None

Write feature values for a single entity.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `feature_name` | `str` | Name of the feature | *required* |
| `entity_keys` | `dict[str, str]` | Entity key columns and values (e.g., `{"user_id": "123"}`) | *required* |
| `values` | `dict[str, Any]` | Feature column values (e.g., `{"amount__sum__7d": 1500.0}`) | *required* |

Source code in src/mlforge/online.py
@abstractmethod
def write(self, feature_name: str, entity_keys: dict[str, str], values: dict[str, Any]) -> None:
    """
    Persist the feature values of one entity.

    Args:
        feature_name: Name of the feature
        entity_keys: Entity key columns and values (e.g., {"user_id": "123"})
        values: Feature column values (e.g., {"amount__sum__7d": 1500.0})
    """

write_batch abstractmethod

write_batch(
    feature_name: str,
    records: list[dict[str, Any]],
    entity_key_columns: list[str],
) -> int

Write feature values for multiple entities.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `feature_name` | `str` | Name of the feature | *required* |
| `records` | `list[dict[str, Any]]` | List of records, each containing entity keys and feature values | *required* |
| `entity_key_columns` | `list[str]` | Column names that form the entity key | *required* |

Returns:

| Type | Description |
|------|-------------|
| `int` | Number of records written |

Source code in src/mlforge/online.py
@abstractmethod
def write_batch(
    self, feature_name: str, records: list[dict[str, Any]], entity_key_columns: list[str]
) -> int:
    """
    Persist the feature values of many entities in one call.

    Args:
        feature_name: Name of the feature
        records: Records that each hold both entity keys and feature values
        entity_key_columns: Names of the columns that make up the entity key

    Returns:
        How many records were written
    """

RedisStore

mlforge.online.RedisStore

Bases: OnlineStore

Redis-backed online feature store.

Stores feature values as JSON in Redis with optional TTL. Uses Redis pipelines for efficient batch operations.

Key format: `{prefix}:{feature_name}:{entity_key_hash}` (the prefix defaults to `mlforge`).

Value format: JSON-serialized feature values.

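Attributes:

| Name | Type | Description |
|------|------|-------------|
| `host` | `str` | Redis server hostname |
| `port` | `int` | Redis server port |
| `db` | `int` | Redis database number |
| `password` | `str \| None` | Redis password (optional) |
| `ttl` | `int \| None` | Time-to-live in seconds (optional, None = no expiry) |
| `prefix` | `str` | Key prefix (default: "mlforge") |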

Example:

```python
store = RedisStore(host="localhost", port=6379, ttl=3600)

# Write single entity
store.write(
    feature_name="user_spend",
    entity_keys={"user_id": "user_123"},
    values={"amount__sum__7d": 1500.0},
)

# Read single entity
features = store.read(
    feature_name="user_spend",
    entity_keys={"user_id": "user_123"},
)
# Returns: {"amount__sum__7d": 1500.0}
```

Source code in src/mlforge/online.py
class RedisStore(OnlineStore):
    """
    Redis-backed online feature store.

    Stores feature values as JSON in Redis with optional TTL.
    Uses Redis pipelines for efficient batch operations.

    Key format: {prefix}:{feature_name}:{entity_key_hash}
        (prefix is configurable and defaults to "mlforge")
    Value format: JSON serialized feature values

    Attributes:
        host: Redis server hostname
        port: Redis server port
        db: Redis database number
        password: Redis password (optional)
        ttl: Time-to-live in seconds (optional; None or 0 = no expiry)
        prefix: Key prefix (default: "mlforge")

    Example:
        store = RedisStore(host="localhost", port=6379, ttl=3600)

        # Write single entity
        store.write(
            feature_name="user_spend",
            entity_keys={"user_id": "user_123"},
            values={"amount__sum__7d": 1500.0},
        )

        # Read single entity
        features = store.read(
            feature_name="user_spend",
            entity_keys={"user_id": "user_123"},
        )
        # Returns: {"amount__sum__7d": 1500.0}
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        password: str | None = None,
        ttl: int | None = None,
        prefix: str = "mlforge",
    ) -> None:
        """
        Initialize Redis online store.

        Args:
            host: Redis server hostname. Defaults to "localhost".
            port: Redis server port. Defaults to 6379.
            db: Redis database number. Defaults to 0.
            password: Redis password. Defaults to None.
            ttl: Time-to-live in seconds. Defaults to None (no expiry).
                A ttl of 0 is also treated as "no expiry".
            prefix: Key prefix for all keys. Defaults to "mlforge".

        Raises:
            ValueError: If ttl is a negative number of seconds
            ImportError: If redis package is not installed
        """
        # Fail fast: a negative TTL would otherwise be handed to SETEX on
        # every single write. Non-numeric values (e.g. a timedelta) are
        # passed through untouched.
        if isinstance(ttl, (int, float)) and ttl < 0:
            raise ValueError(f"ttl must be >= 0 seconds or None, got {ttl!r}")

        try:
            import redis
        except ImportError as e:
            raise ImportError(
                "Redis package not installed. "
                "Install with: pip install mlforge[redis]"
            ) from e

        self.host = host
        self.port = port
        self.db = db
        self.password = password
        self.ttl = ttl
        self.prefix = prefix

        # decode_responses=True makes GET return str, which json.loads accepts.
        self._client = redis.Redis(
            host=host,
            port=port,
            db=db,
            password=password,
            decode_responses=True,
        )

    def _build_key(self, feature_name: str, entity_keys: dict[str, str]) -> str:
        """Build the Redis key "{prefix}:{feature_name}:{entity_hash}" for one entity."""
        entity_hash = _compute_entity_hash(entity_keys)
        return f"{self.prefix}:{feature_name}:{entity_hash}"

    def _set_value(self, target: Any, key: str, value_json: str) -> None:
        """Store one JSON payload via target (client or pipeline), applying the TTL."""
        # A falsy ttl (None or 0) means "no expiry": use plain SET so that 0
        # is never passed to SETEX as an expiry time.
        if self.ttl:
            target.setex(key, self.ttl, value_json)
        else:
            target.set(key, value_json)

    @override
    def write(
        self,
        feature_name: str,
        entity_keys: dict[str, str],
        values: dict[str, Any],
    ) -> None:
        """
        Write feature values for a single entity.

        Args:
            feature_name: Name of the feature
            entity_keys: Entity key columns and values
            values: Feature column values to store (must be JSON-serializable)
        """
        key = self._build_key(feature_name, entity_keys)
        self._set_value(self._client, key, json.dumps(values))

    @override
    def write_batch(
        self,
        feature_name: str,
        records: list[dict[str, Any]],
        entity_key_columns: list[str],
    ) -> int:
        """
        Write feature values for multiple entities using Redis pipeline.

        Args:
            feature_name: Name of the feature
            records: List of records with entity keys and feature values
            entity_key_columns: Column names that form the entity key

        Returns:
            Number of records written

        Raises:
            KeyError: If a record lacks one of the entity key columns
        """
        if not records:
            return 0

        # Set for O(1) membership when separating key columns from values.
        key_columns = set(entity_key_columns)

        # The context manager resets the pipeline even if serialization fails.
        with self._client.pipeline() as pipe:
            for record in records:
                # Entity key values are stringified before hashing.
                entity_keys = {col: str(record[col]) for col in entity_key_columns}
                # Every non-key column is stored as a feature value.
                values = {k: v for k, v in record.items() if k not in key_columns}

                key = self._build_key(feature_name, entity_keys)
                self._set_value(pipe, key, json.dumps(values))

            pipe.execute()

        return len(records)

    @override
    def read(
        self,
        feature_name: str,
        entity_keys: dict[str, str],
    ) -> dict[str, Any] | None:
        """
        Read feature values for a single entity.

        Args:
            feature_name: Name of the feature
            entity_keys: Entity key columns and values

        Returns:
            Feature values dict, or None if not found
        """
        key = self._build_key(feature_name, entity_keys)
        value = cast(str | None, self._client.get(key))

        if value is None:
            return None

        return json.loads(value)

    @override
    def read_batch(
        self,
        feature_name: str,
        entity_keys: list[dict[str, str]],
    ) -> list[dict[str, Any] | None]:
        """
        Read feature values for multiple entities using Redis pipeline.

        Args:
            feature_name: Name of the feature
            entity_keys: List of entity key dicts

        Returns:
            List of feature value dicts (None for missing entities), in the
            same order as entity_keys
        """
        if not entity_keys:
            return []

        with self._client.pipeline() as pipe:
            for keys in entity_keys:
                pipe.get(self._build_key(feature_name, keys))
            results = pipe.execute()

        return [
            json.loads(value) if value is not None else None
            for value in results
        ]

    @override
    def delete(
        self,
        feature_name: str,
        entity_keys: dict[str, str],
    ) -> bool:
        """
        Delete feature values for a single entity.

        Args:
            feature_name: Name of the feature
            entity_keys: Entity key columns and values

        Returns:
            True if deleted, False if not found
        """
        key = self._build_key(feature_name, entity_keys)
        deleted = cast(int, self._client.delete(key))
        return deleted > 0

    @override
    def exists(
        self,
        feature_name: str,
        entity_keys: dict[str, str],
    ) -> bool:
        """
        Check if feature values exist for an entity.

        Args:
            feature_name: Name of the feature
            entity_keys: Entity key columns and values

        Returns:
            True if exists, False otherwise
        """
        key = self._build_key(feature_name, entity_keys)
        count = cast(int, self._client.exists(key))
        return count > 0

    def ping(self) -> bool:
        """
        Check Redis connection.

        Returns:
            True if connected, False otherwise
        """
        # Deliberately best-effort: any failure is reported as "not connected".
        try:
            return bool(self._client.ping())
        except Exception:
            return False

__init__

__init__(
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    password: str | None = None,
    ttl: int | None = None,
    prefix: str = "mlforge",
) -> None

Initialize Redis online store.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `host` | `str` | Redis server hostname. Defaults to "localhost". | `'localhost'` |
| `port` | `int` | Redis server port. Defaults to 6379. | `6379` |
| `db` | `int` | Redis database number. Defaults to 0. | `0` |
| `password` | `str \| None` | Redis password. Defaults to None. | `None` |
| `ttl` | `int \| None` | Time-to-live in seconds. Defaults to None (no expiry). | `None` |
| `prefix` | `str` | Key prefix for all keys. Defaults to "mlforge". | `'mlforge'` |

Raises:

| Type | Description |
|------|-------------|
| `ImportError` | If redis package is not installed |

Source code in src/mlforge/online.py
def __init__(
    self,
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    password: str | None = None,
    ttl: int | None = None,
    prefix: str = "mlforge",
) -> None:
    """
    Initialize Redis online store.

    Args:
        host: Redis server hostname. Defaults to "localhost".
        port: Redis server port. Defaults to 6379.
        db: Redis database number. Defaults to 0.
        password: Redis password. Defaults to None.
        ttl: Time-to-live in seconds. Defaults to None (no expiry).
        prefix: Key prefix for all keys. Defaults to "mlforge".

    Raises:
        ImportError: If redis package is not installed
    """
    try:
        import redis
    except ImportError as e:
        raise ImportError(
            "Redis package not installed. "
            "Install with: pip install mlforge[redis]"
        ) from e

    self.host = host
    self.port = port
    self.db = db
    self.password = password
    self.ttl = ttl
    self.prefix = prefix

    self._client = redis.Redis(
        host=host,
        port=port,
        db=db,
        password=password,
        decode_responses=True,
    )

delete

delete(
    feature_name: str, entity_keys: dict[str, str]
) -> bool

Delete feature values for a single entity.

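Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `feature_name` | `str` | Name of the feature | *required* |
| `entity_keys` | `dict[str, str]` | Entity key columns and values | *required* |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if deleted, False if not found |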

Source code in src/mlforge/online.py
@override
def delete(
    self,
    feature_name: str,
    entity_keys: dict[str, str],
) -> bool:
    """
    Delete feature values for a single entity.

    Args:
        feature_name: Name of the feature
        entity_keys: Entity key columns and values

    Returns:
        True if deleted, False if not found
    """
    key = self._build_key(feature_name, entity_keys)
    deleted = cast(int, self._client.delete(key))
    return deleted > 0

exists

exists(
    feature_name: str, entity_keys: dict[str, str]
) -> bool

Check if feature values exist for an entity.

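Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `feature_name` | `str` | Name of the feature | *required* |
| `entity_keys` | `dict[str, str]` | Entity key columns and values | *required* |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if exists, False otherwise |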

Source code in src/mlforge/online.py
@override
def exists(
    self,
    feature_name: str,
    entity_keys: dict[str, str],
) -> bool:
    """
    Check if feature values exist for an entity.

    Args:
        feature_name: Name of the feature
        entity_keys: Entity key columns and values

    Returns:
        True if exists, False otherwise
    """
    key = self._build_key(feature_name, entity_keys)
    count = cast(int, self._client.exists(key))
    return count > 0

ping

ping() -> bool

Check Redis connection.

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if connected, False otherwise |

Source code in src/mlforge/online.py
def ping(self) -> bool:
    """
    Probe the Redis connection.

    Any error raised while pinging is swallowed and reported as a failed
    probe rather than propagated.

    Returns:
        True if connected, False otherwise
    """
    try:
        reply = self._client.ping()
        return bool(reply)
    except Exception:
        return False

read

read(
    feature_name: str, entity_keys: dict[str, str]
) -> dict[str, Any] | None

Read feature values for a single entity.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `feature_name` | `str` | Name of the feature | *required* |
| `entity_keys` | `dict[str, str]` | Entity key columns and values | *required* |

Returns:

| Type | Description |
|------|-------------|
| `dict[str, Any] \| None` | Feature values dict, or None if not found |

Source code in src/mlforge/online.py
@override
def read(
    self,
    feature_name: str,
    entity_keys: dict[str, str],
) -> dict[str, Any] | None:
    """
    Read feature values for a single entity.

    Args:
        feature_name: Name of the feature
        entity_keys: Entity key columns and values

    Returns:
        Feature values dict, or None if not found
    """
    key = self._build_key(feature_name, entity_keys)
    value = cast(str | None, self._client.get(key))

    if value is None:
        return None

    return json.loads(value)

read_batch

read_batch(
    feature_name: str, entity_keys: list[dict[str, str]]
) -> list[dict[str, Any] | None]

Read feature values for multiple entities using Redis pipeline.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `feature_name` | `str` | Name of the feature | *required* |
| `entity_keys` | `list[dict[str, str]]` | List of entity key dicts | *required* |

Returns:

| Type | Description |
|------|-------------|
| `list[dict[str, Any] \| None]` | List of feature value dicts (None for missing entities) |

Source code in src/mlforge/online.py
@override
def read_batch(
    self,
    feature_name: str,
    entity_keys: list[dict[str, str]],
) -> list[dict[str, Any] | None]:
    """
    Read feature values for multiple entities using Redis pipeline.

    Args:
        feature_name: Name of the feature
        entity_keys: List of entity key dicts

    Returns:
        List of feature value dicts (None for missing entities)
    """
    if not entity_keys:
        return []

    pipe = self._client.pipeline()

    for keys in entity_keys:
        key = self._build_key(feature_name, keys)
        pipe.get(key)

    results = pipe.execute()

    return [
        json.loads(value) if value is not None else None
        for value in results
    ]

write

write(
    feature_name: str,
    entity_keys: dict[str, str],
    values: dict[str, Any],
) -> None

Write feature values for a single entity.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `feature_name` | `str` | Name of the feature | *required* |
| `entity_keys` | `dict[str, str]` | Entity key columns and values | *required* |
| `values` | `dict[str, Any]` | Feature column values to store | *required* |

Source code in src/mlforge/online.py
@override
def write(
    self,
    feature_name: str,
    entity_keys: dict[str, str],
    values: dict[str, Any],
) -> None:
    """
    Write feature values for a single entity.

    Args:
        feature_name: Name of the feature
        entity_keys: Entity key columns and values
        values: Feature column values to store
    """
    key = self._build_key(feature_name, entity_keys)
    value_json = json.dumps(values)

    if self.ttl:
        self._client.setex(key, self.ttl, value_json)
    else:
        self._client.set(key, value_json)

write_batch

write_batch(
    feature_name: str,
    records: list[dict[str, Any]],
    entity_key_columns: list[str],
) -> int

Write feature values for multiple entities using Redis pipeline.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `feature_name` | `str` | Name of the feature | *required* |
| `records` | `list[dict[str, Any]]` | List of records with entity keys and feature values | *required* |
| `entity_key_columns` | `list[str]` | Column names that form the entity key | *required* |

Returns:

| Type | Description |
|------|-------------|
| `int` | Number of records written |

Source code in src/mlforge/online.py
@override
def write_batch(
    self,
    feature_name: str,
    records: list[dict[str, Any]],
    entity_key_columns: list[str],
) -> int:
    """
    Write feature values for multiple entities using Redis pipeline.

    Args:
        feature_name: Name of the feature
        records: List of records with entity keys and feature values
        entity_key_columns: Column names that form the entity key

    Returns:
        Number of records written
    """
    if not records:
        return 0

    pipe = self._client.pipeline()

    for record in records:
        # Extract entity keys
        entity_keys = {col: str(record[col]) for col in entity_key_columns}

        # Extract feature values (all columns except entity keys)
        values = {
            k: v for k, v in record.items() if k not in entity_key_columns
        }

        key = self._build_key(feature_name, entity_keys)
        value_json = json.dumps(values)

        if self.ttl:
            pipe.setex(key, self.ttl, value_json)
        else:
            pipe.set(key, value_json)

    pipe.execute()
    return len(records)