Source code for EventStream.data.preprocessing.preprocessor
"""The base class for Polars friendly data pre-processors.
This file contains the abstract base class for polars pre-processors. It is just used to define the interface
expected by the data preprocessing pipeline. Subclasses (defined in other files in this module) contain actual
implementations of algorithms.
"""
from abc import ABC, abstractmethod
import polars as pl
[docs]
class Preprocessor(ABC):
"""The base class for Polars friendly data pre-processors.
This should be sub-classed by implementation classes for concrete implementations. Must define the schema
of the output column produced by the pre-processor, the fit method which extracts those parameters from
the raw data via a Polars expression, and the predict method which applies the pre-processing to a data
column expression using another column containing the model parameters for that data element.
"""
[docs]
@classmethod
@abstractmethod
def params_schema(cls) -> dict[str, pl.DataType]:
"""The schema of the output column produced by the pre-processor.
Must be implemented by a sub-class.
Returns:
dict[str, pl.DataType]:
The schema of the output column produced by the pre-processor, as a mapping from field names
to polars data types.
"""
raise NotImplementedError("Subclass must implement abstract method")
[docs]
@abstractmethod
def fit_from_polars(self, column: pl.Expr) -> pl.Expr:
"""Fit the pre-processing model over the data contained in `column`.
Performs the logic necessary to fit the pre-processing model over the data in the input column. As the
input column is a polars expression, it does not contain materialized data, but rather just references
a column operation that could be run to produce materialized data. The pre-processing logic must be
consistent with that assumption. Must be implemented by a sub-class. The logic used in this method
must be applicable for use in both a select and a groupby aggregation context.
Arguments:
column: The Polars expression for the column containing the raw data to be pre-processed.
Returns:
pl.Expr:
The Polars expression for a column that would materialize the resulting pre-processing model
parameters.
"""
raise NotImplementedError("Subclass must implement abstract method")
[docs]
@classmethod
@abstractmethod
def predict_from_polars(cls, column: pl.Expr, model_column: pl.Expr) -> pl.Expr:
"""Predicts for the data in `column` given the fit parameters in `model_column`.
Performs the logic necessary to "predict" as defined by the implementing subclass over the data in the
input column according to the parameters in the fit model column. As both input columns are polars
expressions, they do not contain materialized data, but rather just references column operations that
could be run to produce materialized data. The pre-processing logic must be consistent with that
assumption. Must be implemented by a sub-class. The logic used in this method must be applicable for
use in both a select and a groupby aggregation context.
Arguments:
column: The Polars expression for the column containing the raw data to be pre-processed.
model_column: The Polars expression for the column containing the pre-processing model parameters.
Returns:
pl.Expr:
The Polars expression for a column that would materialize the pre-processed outputs for the
input data given the pre-processing model parameters.
"""
raise NotImplementedError("Subclass must implement abstract method")