EventStream.data.dataset_polars module

The polars implementation of the Dataset class.

EventStream.data.dataset_polars.INPUT_DF_T

The types of supported input dataframes, which include paths, pandas dataframes, polars dataframes, and queries.

EventStream.data.dataset_polars.DF_T

The types of supported dataframes, which include polars lazyframes, dataframes, expressions, or series.

class EventStream.data.dataset_polars.Dataset(config: DatasetConfig, subjects_df: DF_T | None = None, events_df: DF_T | None = None, dynamic_measurements_df: DF_T | None = None, input_schema: DatasetSchema | None = None, **kwargs)

Bases: DatasetBase[LazyFrame | DataFrame | Expr | Series, Path | DataFrame | DataFrame | Query]

The polars-specific implementation of the dataset.

Parameters:
config: DatasetConfig

Configuration object for this dataset.

subjects_df: DF_T | None = None

The dataframe containing all static, subject-level data. If this is specified, events_df and dynamic_measurements_df should also be specified. Otherwise, this will be built from source via the extraction pipeline defined in input_schema.

events_df: DF_T | None = None

The dataframe containing all event timestamps, types, and subject IDs. If this is specified, subjects_df and dynamic_measurements_df should also be specified. Otherwise, this will be built from source via the extraction pipeline defined in input_schema.

dynamic_measurements_df: DF_T | None = None

The dataframe containing all time-varying measurement observations. If this is specified, subjects_df and events_df should also be specified. Otherwise, this will be built from source via the extraction pipeline defined in input_schema.

input_schema: DatasetSchema | None = None

The schema configuration object that defines the extraction pipeline for pulling raw data from source and producing the subjects_df, events_df, and dynamic_measurements_df input views.
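The three dataframe parameters above are documented as an all-or-nothing group: either all of them are passed, or none are and the data is built via input_schema. A minimal sketch of that rule follows. The helper check_all_or_none is hypothetical and not part of EventStream; how the library itself enforces this is not shown in this reference.

```python
def check_all_or_none(subjects_df=None, events_df=None, dynamic_measurements_df=None):
    """Return True if all three frames were passed, False if none were.

    Hypothetical helper (not part of EventStream). Raises ValueError when
    only some of the three frames are supplied.
    """
    passed = {
        "subjects_df": subjects_df is not None,
        "events_df": events_df is not None,
        "dynamic_measurements_df": dynamic_measurements_df is not None,
    }
    if all(passed.values()):
        return True
    if not any(passed.values()):
        # Assumption: in this case the caller would build the frames
        # from source using the pipeline defined in input_schema.
        return False
    missing = [name for name, ok in passed.items() if not ok]
    raise ValueError(f"Also specify: {', '.join(missing)}")
```

Calling the helper with only subjects_df set raises a ValueError naming events_df and dynamic_measurements_df.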

property ESDS_schema : schema

METADATA_SCHEMA = {'censor_lower_bound': Float64, 'censor_upper_bound': Float64, 'drop_lower_bound': Float64, 'drop_lower_bound_inclusive': Boolean, 'drop_upper_bound': Float64, 'drop_upper_bound_inclusive': Boolean, 'mean': Float64, 'std': Float64, 'thresh_high': Float64, 'thresh_low': Float64, 'value_type': Categorical}

The Polars schema of the numerical measurement metadata dataframes which track fit parameters.

STREAMING = True

Execute any lazy query in streaming mode.

WRITE_USE_PYARROW = False

If True, write parquet files with the C++ (pyarrow) implementation; if False, use the Rust (polars-native) implementation.

build_DL_cached_representation(subject_ids: list[int] | None = None, do_sort_outputs: bool = False) → LazyFrame | DataFrame | Expr | Series

Produces the deep learning format dataframe described previously for the passed subjects.

build_ESDS_representation(subject_ids: list[int] | None = None, do_sort_outputs: bool = False) → DataFrame

static drop_or_censor(col: Expr, drop_lower_bound: Expr | None = None, drop_lower_bound_inclusive: Expr | None = None, drop_upper_bound: Expr | None = None, drop_upper_bound_inclusive: Expr | None = None, censor_lower_bound: Expr | None = None, censor_upper_bound: Expr | None = None, **ignored_kwargs) → Expr

Either drops (returns float('nan')) or censors (returns the censor value) each value val of col, based on the passed bounds.

TODO(mmd): could move this code to an outlier model in Preprocessing and have it be one that is pre-set in metadata.

Parameters:
col: Expr

The column expression whose values (referred to as val below) are dropped, censored, or returned unchanged.

drop_lower_bound: Expr | None = None

A lower bound such that if val is either below or at or below this level, float('nan') will be returned. If None or float('nan'), no bound will be applied.

drop_lower_bound_inclusive: Expr | None = None

If True, returns float('nan') if val <= drop_lower_bound. Else, returns float('nan') if val < drop_lower_bound.

drop_upper_bound: Expr | None = None

An upper bound such that if val is either above or at or above this level, float('nan') will be returned. If None or float('nan'), no bound will be applied.

drop_upper_bound_inclusive: Expr | None = None

If True, returns float('nan') if val >= drop_upper_bound. Else, returns float('nan') if val > drop_upper_bound.

censor_lower_bound: Expr | None = None

A lower bound such that if val is below this level but above drop_lower_bound, censor_lower_bound will be returned. If None or float('nan'), no bound will be applied.

censor_upper_bound: Expr | None = None

An upper bound such that if val is above this level but below drop_upper_bound, censor_upper_bound will be returned. If None or float('nan'), no bound will be applied.
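The bound semantics described in the parameters above can be illustrated on a single scalar. This is a plain-Python sketch of the documented rules only, not the library's implementation, which operates on polars expressions. The names drop_or_censor_scalar and _is_set are hypothetical, and the order of checks (drop bounds before censor bounds) is an assumption drawn from the parameter descriptions.

```python
import math


def _is_set(bound):
    # Per the docs, a bound of None or NaN means "no bound applied".
    return bound is not None and not math.isnan(bound)


def drop_or_censor_scalar(
    val,
    drop_lower_bound=None,
    drop_lower_bound_inclusive=False,
    drop_upper_bound=None,
    drop_upper_bound_inclusive=False,
    censor_lower_bound=None,
    censor_upper_bound=None,
):
    """Scalar illustration (hypothetical helper) of the documented rules."""
    # Drop: values beyond a drop bound become NaN. With an inclusive bound,
    # a value exactly at the bound is dropped too.
    if _is_set(drop_lower_bound):
        at_bound = drop_lower_bound_inclusive and val == drop_lower_bound
        if val < drop_lower_bound or at_bound:
            return float("nan")
    if _is_set(drop_upper_bound):
        at_bound = drop_upper_bound_inclusive and val == drop_upper_bound
        if val > drop_upper_bound or at_bound:
            return float("nan")
    # Censor: values beyond a censor bound (but not dropped) are replaced
    # by that bound.
    if _is_set(censor_lower_bound) and val < censor_lower_bound:
        return censor_lower_bound
    if _is_set(censor_upper_bound) and val > censor_upper_bound:
        return censor_upper_bound
    return val
```

For example, with drop_upper_bound=100 and censor_upper_bound=10, a value of 50 is censored to 10, while a value of 150 is dropped.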

get_metadata_schema(config: MeasurementConfig) → dict[str, DataType]

static get_smallest_valid_uint_type(num: int | float | Expr) → DataType

Returns the smallest valid unsigned integral type for an ID variable with num unique options.

Parameters:
num: int | float | Expr

The number of IDs that must be uniquely expressed.

Raises:

ValueError – If there is no unsigned int type big enough to express the passed number of ID variables.

Examples

>>> import polars as pl
>>> Dataset.get_smallest_valid_uint_type(num=1)
UInt8
>>> Dataset.get_smallest_valid_uint_type(num=2**8-1)
UInt16
>>> Dataset.get_smallest_valid_uint_type(num=2**16-1)
UInt32
>>> Dataset.get_smallest_valid_uint_type(num=2**32-1)
UInt64
>>> Dataset.get_smallest_valid_uint_type(num=2**64-1)
Traceback (most recent call last):
    ...
ValueError: Value is too large to be expressed as an int!
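
The examples above show the returned type stepping up once num reaches 2**k - 1 rather than 2**k. A plain-Python sketch consistent with those outputs is given below. The function smallest_uint_name is hypothetical and returns type names as strings instead of polars dtypes, and its thresholds are an assumption inferred from the examples, not taken from the library source.

```python
def smallest_uint_name(num):
    """Return the name of the smallest unsigned type for `num` IDs.

    Hypothetical stand-in for illustration; returns strings, not dtypes.
    """
    # Assumption: each threshold sits at 2**k - 1, chosen to reproduce
    # the documented example outputs.
    if num >= 2**64 - 1:
        raise ValueError("Value is too large to be expressed as an int!")
    if num >= 2**32 - 1:
        return "UInt64"
    if num >= 2**16 - 1:
        return "UInt32"
    if num >= 2**8 - 1:
        return "UInt16"
    return "UInt8"
```

Under this assumption, num=254 still maps to UInt8, while num=255 maps to UInt16, matching the second example above.
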
class EventStream.data.dataset_polars.Query(connection_uri: str, query: str | Path | list[str | Path], partition_on: str | None = None, partition_num: int | None = None, protocol: str = 'binary')

Bases: object

A structure for database query based input dataframes.

Parameters:
connection_uri: str

The connection URI for the database. This is in the connectorx format.

query: str | Path | list[str | Path]

The query to be run over the database. It can be specified either as a direct string, a path to a file on disk containing the query in txt format, or a list of said options.

partition_on: str | None = None

If the query should be partitioned, on what column should it be partitioned? See the polars documentation for more details.

partition_num: int | None = None

If the query should be partitioned, into how many partitions should it be divided? See the polars documentation for more details.

protocol: str = 'binary'

The connectorx backend protocol.

connection_uri : str
partition_num : int | None = None
partition_on : str | None = None
protocol : str = 'binary'
query : str | Path | list[str | Path]
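
The query field accepts a string, a path to a text file, or a list mixing both. The sketch below shows one way such a value could be normalized into a list of query strings. QuerySketch and resolve_query_texts are hypothetical names used for illustration only, the connection URI in the usage note is a placeholder, and the real Query class may resolve paths differently.

```python
from dataclasses import dataclass
from pathlib import Path


@dataclass
class QuerySketch:
    """Hypothetical mirror of the documented Query fields."""

    connection_uri: str
    query: "str | Path | list[str | Path]"
    partition_on: "str | None" = None
    partition_num: "int | None" = None
    protocol: str = "binary"


def resolve_query_texts(query):
    """Return the query or queries as a list of strings.

    Assumption: a Path is read from disk as text, and a plain string is
    taken to be the query itself.
    """
    items = query if isinstance(query, list) else [query]
    texts = []
    for item in items:
        if isinstance(item, Path):
            texts.append(item.read_text())
        else:
            texts.append(item)
    return texts
```

For instance, resolve_query_texts(QuerySketch("postgresql://user:pw@host:5432/db", "SELECT 1").query) returns a one-element list holding the query string.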