EventStream.data.dataset_polars module
The polars implementation of the Dataset class.
- EventStream.data.dataset_polars.INPUT_DF_T
The types of supported input dataframes, which include paths, pandas dataframes, polars dataframes, or queries.
- EventStream.data.dataset_polars.DF_T
The types of supported dataframes, which include polars lazyframes, dataframes, expressions, or series.
- class EventStream.data.dataset_polars.Dataset(config: DatasetConfig, subjects_df: DF_T | None = None, events_df: DF_T | None = None, dynamic_measurements_df: DF_T | None = None, input_schema: DatasetSchema | None = None, **kwargs)
Bases: DatasetBase[LazyFrame | DataFrame | Expr | Series, Path | DataFrame | DataFrame | Query]
The polars-specific implementation of the dataset.
- Parameters:
- config: DatasetConfig
Configuration object for this dataset.
- subjects_df: DF_T | None = None
The dataframe containing all static, subject-level data. If this is specified, events_df and dynamic_measurements_df should also be specified. Otherwise, this will be built from source via the extraction pipeline defined in input_schema.
- events_df: DF_T | None = None
The dataframe containing all event timestamps, types, and subject IDs. If this is specified, subjects_df and dynamic_measurements_df should also be specified. Otherwise, this will be built from source via the extraction pipeline defined in input_schema.
- dynamic_measurements_df: DF_T | None = None
The dataframe containing all time-varying measurement observations. If this is specified, subjects_df and events_df should also be specified. Otherwise, this will be built from source via the extraction pipeline defined in input_schema.
- input_schema: DatasetSchema | None = None
The schema configuration object that defines the extraction pipeline for pulling raw data from source and producing the subjects_df, events_df, and dynamic_measurements_df input views.
- METADATA_SCHEMA = {'censor_lower_bound': Float64, 'censor_upper_bound': Float64, 'drop_lower_bound': Float64, 'drop_lower_bound_inclusive': Boolean, 'drop_upper_bound': Float64, 'drop_upper_bound_inclusive': Boolean, 'normalizer': <function Dataset.<lambda>>, 'outlier_model': <function Dataset.<lambda>>, 'value_type': Categorical}
The Polars schema of the numerical measurement metadata dataframes that track fit parameters.
- PREPROCESSORS : dict[str, Preprocessor] = {'standard_scaler': <class 'EventStream.data.preprocessing.standard_scaler.StandardScaler'>, 'stddev_cutoff': <class 'EventStream.data.preprocessing.stddev_cutoff.StddevCutoffOutlierDetector'>}
A dictionary mapping names to the valid preprocessors that can be used by this dataset class.
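PREPROCESSORS is a name-to-class registry. A minimal sketch of how such a registry is commonly consulted follows; the empty stand-in classes and the get_preprocessor_cls helper are hypothetical and only mirror the two keys listed above, not the real EventStream classes or their lookup code.

```python
class StandardScaler:
    """Hypothetical stand-in for EventStream's StandardScaler."""


class StddevCutoffOutlierDetector:
    """Hypothetical stand-in for EventStream's StddevCutoffOutlierDetector."""


# Registry keyed by the string names a configuration might refer to.
PREPROCESSORS = {
    "standard_scaler": StandardScaler,
    "stddev_cutoff": StddevCutoffOutlierDetector,
}


def get_preprocessor_cls(name: str) -> type:
    """Hypothetical helper: look up a preprocessor class by its registry name."""
    try:
        return PREPROCESSORS[name]
    except KeyError:
        valid = ", ".join(sorted(PREPROCESSORS))
        raise ValueError(f"Unknown preprocessor {name!r}; valid options: {valid}") from None
```

Storing classes rather than instances lets a fresh, unfitted object be created for each use; whether the Dataset instantiates them per measurement is not stated here.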
- STREAMING = True
Execute any lazy query in streaming mode.
- WRITE_USE_PYARROW = False
Whether to write Parquet files with the C++ (PyArrow) Parquet implementation instead of the Rust Parquet implementation.
- build_DL_cached_representation(subject_ids: list[int] | None = None, do_sort_outputs: bool = False) → LazyFrame | DataFrame | Expr | Series
Produces the deep learning format dataframe described previously for the passed subjects:
- static drop_or_censor(col: Expr, drop_lower_bound: Expr | None = None, drop_lower_bound_inclusive: Expr | None = None, drop_upper_bound: Expr | None = None, drop_upper_bound_inclusive: Expr | None = None, censor_lower_bound: Expr | None = None, censor_upper_bound: Expr | None = None, **ignored_kwargs) → Expr
Either drops (returns np.NaN) or censors (returns the censor value) the value val, based on the bounds in row.
TODO(mmd): could move this code to an outlier model in Preprocessing and have it be one that is pre-set in metadata.
- Parameters:
- val
The value to drop, censor, or return unchanged.
- drop_lower_bound: Expr | None = None
A lower bound such that if val is below this level (or at or below it, when drop_lower_bound_inclusive is True), np.NaN will be returned. If None or np.NaN, no bound will be applied.
- drop_lower_bound_inclusive: Expr | None = None
If True, returns np.NaN if val <= row['drop_lower_bound']. Else, returns np.NaN if val < row['drop_lower_bound'].
- drop_upper_bound: Expr | None = None
An upper bound such that if val is above this level (or at or above it, when drop_upper_bound_inclusive is True), np.NaN will be returned. If None or np.NaN, no bound will be applied.
- drop_upper_bound_inclusive: Expr | None = None
If True, returns np.NaN if val >= row['drop_upper_bound']. Else, returns np.NaN if val > row['drop_upper_bound'].
- censor_lower_bound: Expr | None = None
A lower bound such that if val is below this level but above drop_lower_bound, censor_lower_bound will be returned. If None or np.NaN, no bound will be applied.
- censor_upper_bound: Expr | None = None
An upper bound such that if val is above this level but below drop_upper_bound, censor_upper_bound will be returned. If None or np.NaN, no bound will be applied.
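The bound semantics of drop_or_censor can be made concrete with a scalar analogue. The real method operates on polars expressions; the plain-Python function below is a hypothetical sketch that applies the same documented rules to a single float. It assumes that drop checks take precedence over censoring and that a missing inclusivity flag means a strict comparison.

```python
import math
from typing import Optional


def _bound_is_set(bound: Optional[float]) -> bool:
    # Per the parameter docs, a bound of None or NaN means "no bound".
    return bound is not None and not math.isnan(bound)


def drop_or_censor_scalar(
    val: float,
    drop_lower_bound: Optional[float] = None,
    drop_lower_bound_inclusive: Optional[bool] = None,
    drop_upper_bound: Optional[float] = None,
    drop_upper_bound_inclusive: Optional[bool] = None,
    censor_lower_bound: Optional[float] = None,
    censor_upper_bound: Optional[float] = None,
) -> float:
    """Hypothetical scalar analogue of Dataset.drop_or_censor."""
    # Drop (return NaN) values outside the drop bounds first.
    if _bound_is_set(drop_lower_bound):
        if drop_lower_bound_inclusive:
            too_low = val <= drop_lower_bound
        else:
            too_low = val < drop_lower_bound
        if too_low:
            return math.nan
    if _bound_is_set(drop_upper_bound):
        if drop_upper_bound_inclusive:
            too_high = val >= drop_upper_bound
        else:
            too_high = val > drop_upper_bound
        if too_high:
            return math.nan
    # Values that survived the drop checks are clipped to the censor bounds.
    if _bound_is_set(censor_lower_bound) and val < censor_lower_bound:
        return censor_lower_bound
    if _bound_is_set(censor_upper_bound) and val > censor_upper_bound:
        return censor_upper_bound
    return val
```

Over a polars column, the same rules would typically be chained with pl.when(...).then(...).otherwise(...) expressions rather than evaluated value by value.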
- get_metadata_schema(config: MeasurementConfig) → dict[str, DataType]
- static get_smallest_valid_uint_type(num: int | float | Expr) → DataType
Returns the smallest valid unsigned integral type for an ID variable with num unique options.
- Parameters:
- num: int | float | Expr
The number of unique options the ID variable can take.
- Raises:
ValueError – If there is no unsigned int type big enough to express the passed number of ID variables.
Examples
>>> import polars as pl
>>> Dataset.get_smallest_valid_uint_type(num=1)
UInt8
>>> Dataset.get_smallest_valid_uint_type(num=2**8-1)
UInt16
>>> Dataset.get_smallest_valid_uint_type(num=2**16-1)
UInt32
>>> Dataset.get_smallest_valid_uint_type(num=2**32-1)
UInt64
>>> Dataset.get_smallest_valid_uint_type(num=2**64-1)
Traceback (most recent call last):
    ...
ValueError: Value is too large to be expressed as an int!
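The documented examples pin down where the type boundaries fall: a count equal to a type's maximum value (for example 2**8-1) already moves to the next larger type. One rule consistent with those examples is a strict less-than comparison against each type's maximum, sketched below in plain Python. This is an inference from the examples, not the library's implementation; the hypothetical smallest_uint_type_name returns type names as strings rather than polars dtypes and does not handle Expr inputs.

```python
# (name, maximum representable value), smallest type first.
_UINT_TYPES = [
    ("UInt8", 2**8 - 1),
    ("UInt16", 2**16 - 1),
    ("UInt32", 2**32 - 1),
    ("UInt64", 2**64 - 1),
]


def smallest_uint_type_name(num: float) -> str:
    """Hypothetical sketch matching the documented examples of get_smallest_valid_uint_type."""
    for name, max_value in _UINT_TYPES:
        # Strict "<" reproduces the documented outputs, e.g. 2**8 - 1 -> UInt16.
        if num < max_value:
            return name
    raise ValueError("Value is too large to be expressed as an int!")
```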
- class EventStream.data.dataset_polars.Query(connection_uri: str, query: str | Path | list[str | Path], partition_on: str | None = None, partition_num: int | None = None, protocol: str = 'binary')
Bases: object
A structure for input dataframes that are defined by database queries.
- Parameters:
- connection_uri: str
The connection URI for the database, in the connectorx format.
- query: str | Path | list[str | Path]
The query to be run over the database. It can be specified as a query string, a path to a plain-text file on disk containing the query, or a list of such strings and paths.
- partition_on: str | None = None
The column on which to partition the query, if it should be partitioned. See the polars documentation for more details.
- partition_num: int | None = None
The number of partitions into which to divide the query, if it should be partitioned. See the polars documentation for more details.
- protocol: str = 'binary'
The connectorx backend protocol.
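The query parameter accepts three shapes. A minimal sketch of normalizing them into a list of SQL strings follows. The helper name resolve_queries is hypothetical, and it assumes a plain str is always literal query text while only pathlib.Path values are read from disk, since the documentation does not say how the real class tells a query string from a file path.

```python
from pathlib import Path
from typing import List, Union

QueryLike = Union[str, Path]


def _resolve_one(q: QueryLike) -> str:
    if isinstance(q, Path):
        # Assumption: a Path points to a plain-text file holding one query.
        return q.read_text().strip()
    if isinstance(q, str):
        # Assumption: a str is the query text itself.
        return q
    raise TypeError(f"Unsupported query type: {type(q).__name__}")


def resolve_queries(query: Union[QueryLike, List[QueryLike]]) -> List[str]:
    """Hypothetical helper: normalize Query.query into a list of SQL strings."""
    if isinstance(query, list):
        return [_resolve_one(q) for q in query]
    return [_resolve_one(query)]
```

The resulting strings could then be passed to polars.read_database_uri, which accepts the connection URI along with partition_on, partition_num, and protocol arguments for the connectorx engine; whether Query forwards its fields exactly this way is not documented here.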