EventStream.data.dataset_base module

The base class for core dataset processing logic.

EventStream.data.dataset_base.INPUT_DF_T

This defines the type of the allowable input dataframes – e.g., databases, filepaths, dataframes, etc.

EventStream.data.dataset_base.DF_T

This defines the type of internal dataframes – e.g. polars DataFrames.

class EventStream.data.dataset_base.DatasetBase(config: DatasetConfig, subjects_df: DF_T | None = None, events_df: DF_T | None = None, dynamic_measurements_df: DF_T | None = None, input_schema: DatasetSchema | None = None, **kwargs)[source]

Bases: ABC, Generic[DF_T, INPUT_DF_T], SeedableMixin, SaveableMixin, TimeableMixin, TQDMableMixin

A unified base class for dataset objects using different processing libraries.

Parameters:
config: DatasetConfig

Configuration object for this dataset.

subjects_df: DF_T | None = None

The dataframe containing all static, subject-level data. If this is specified, events_df and dynamic_measurements_df should also be specified. Otherwise, this will be built from source via the extraction pipeline defined in input_schema.

events_df: DF_T | None = None

The dataframe containing all event timestamps, types, and subject IDs. If this is specified, subjects_df and dynamic_measurements_df should also be specified. Otherwise, this will be built from source via the extraction pipeline defined in input_schema.

dynamic_measurements_df: DF_T | None = None

The dataframe containing all time-varying measurement observations. If this is specified, subjects_df and events_df should also be specified. Otherwise, this will be built from source via the extraction pipeline defined in input_schema.

input_schema: DatasetSchema | None = None

The schema configuration object that defines the extraction pipeline for pulling raw data from source and producing the subjects_df, events_df, and dynamic_measurements_df input view.
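
The relationship between the three dataframe arguments and input_schema can be sketched as a small check. This is a minimal sketch, not the class's actual constructor logic: resolve_init_mode is a hypothetical helper, and raising ValueError for the inconsistent cases is an assumption made for illustration.

```python
def resolve_init_mode(subjects_df=None, events_df=None,
                      dynamic_measurements_df=None, input_schema=None):
    """Hypothetical helper: decide where a dataset's dataframes come from."""
    provided = [
        df is not None
        for df in (subjects_df, events_df, dynamic_measurements_df)
    ]
    if all(provided):
        # All three dataframes were passed directly.
        return "dataframes"
    if any(provided):
        # Assumed behavior: a partial set of dataframes is treated as an error.
        raise ValueError(
            "subjects_df, events_df, and dynamic_measurements_df "
            "should be specified together."
        )
    if input_schema is None:
        # Assumed behavior: with no dataframes, a schema is needed to build them.
        raise ValueError("Specify either all three dataframes or input_schema.")
    return "schema"
```

For example, resolve_init_mode(input_schema=my_schema) returns "schema", while passing only subjects_df raises.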

DF_SAVE_FORMAT : str = 'parquet'

The save format for internal dataframes in this dataset.

DYNAMIC_MEASUREMENTS_FN : str = 'dynamic_measurements_df'

The name for the dynamic_measurements_df save file.

EVENTS_FN : str = 'events_df'

The name for the events_df save file.

SUBJECTS_FN : str = 'subjects_df'

The name for the subjects_df save file.

abstract build_DL_cached_representation(subject_ids: list[int] | None = None, do_sort_outputs: bool = False) DF_T[source]

Produces the deep-learning format dataframe (the format described under cache_deep_learning_representation) for the passed subjects.

classmethod build_event_and_measurement_dfs(subject_id_col: str, schemas_by_df: dict[INPUT_DF_T, list[InputDFSchema]]) tuple[DF_T, DF_T][source]

Builds and returns events and measurements dataframes from the input schema map.

Parameters:
subject_id_col: str

The name of the column containing (input) subject IDs.

schemas_by_df: dict[INPUT_DF_T, list[InputDFSchema]]

A mapping from input dataframe to associated event/measurement schemas.

Returns:

Both the built events_df and dynamic_measurements_df.

classmethod build_subjects_dfs(schema: InputDFSchema) tuple[DF_T, dict[Hashable, int]][source]

Builds and returns the subjects dataframe from schema.

Parameters:
schema: InputDFSchema

The input schema defining the subjects dataframe. This will include a definition of the input dataframe, the subject ID column, the static measurements columns to load, etc.

Returns:

Both the built subjects_df as well as a dictionary from the raw subject ID column values to the inferred numeric subject IDs.

cache_deep_learning_representation(subjects_per_output_file: int | None = None, do_overwrite: bool = False)[source]

Writes a deep-learning friendly representation of the dataset to disk.

The deep learning format produced will have one row per subject, with the following columns:

  • subject_id: This column will be an unsigned integer type, and will have the ID of the subject for each row.

  • start_time: This column will be a datetime type, and will contain the start time of the subject’s record.

  • static_indices: This column is a ragged, sparse representation of the categorical static measurements observed for this subject. Each element of this column will itself be a list of unsigned integers corresponding to indices into the unified vocabulary for the static measurements observed for that subject.

  • static_measurement_indices: This column corresponds in shape to static_indices, but contains unsigned integer indices into the unified measurement vocabulary, defining to which measurement each observation corresponds. It is of the same shape and of a consistent order as static_indices.

  • time: This column is a ragged array of the time in minutes from the start time at which each event takes place. For a given row, the length of the array within this column corresponds to the number of events that subject has.

  • dynamic_indices: This column is a doubly ragged array containing the indices of the observed values within the unified vocabulary per event per subject. Each subject’s data for this column consists of an array of arrays, each containing only the indices observed at each event.

  • dynamic_measurement_indices: This column is a doubly ragged array containing the indices of the observed measurements per event per subject. Each subject’s data for this column consists of an array of arrays, each containing only the indices of measurements observed at each event. It is of the same shape and of a consistent order as dynamic_indices.

  • dynamic_values: This column is a doubly ragged array containing the numerical values of the observed measurements per event per subject. Each subject’s data for this column consists of an array of arrays, each containing only the values observed at each event. It is of the same shape and of a consistent order as dynamic_indices.
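
To make the ragged layout concrete, the sketch below shows one hypothetical subject row as a plain Python dict, plus a check of the shape relationships listed above. All values are invented for illustration, check_row_shapes is not part of the documented API, and the sketch assumes that dynamic_values holds one (possibly missing) numeric value per observation.

```python
from datetime import datetime

# One hypothetical subject with three events; every value here is made up.
row = {
    "subject_id": 7,
    "start_time": datetime(2020, 1, 1, 8, 0),
    "static_indices": [3, 12],
    "static_measurement_indices": [1, 2],
    "time": [0.0, 90.0, 1440.0],            # minutes since start_time
    "dynamic_indices": [[21], [21, 40], [55]],
    "dynamic_measurement_indices": [[4], [4, 5], [6]],
    "dynamic_values": [[None], [None, 0.7], [1.3]],  # assumed: values, None if absent
}


def check_row_shapes(row: dict) -> bool:
    """Return True if the ragged columns of one row agree in shape."""
    if len(row["static_indices"]) != len(row["static_measurement_indices"]):
        return False
    dynamic_cols = (
        "dynamic_indices",
        "dynamic_measurement_indices",
        "dynamic_values",
    )
    # One inner list per event, in every dynamic column.
    n_events = len(row["time"])
    if any(len(row[col]) != n_events for col in dynamic_cols):
        return False
    # Within each event, all dynamic columns have the same length.
    return all(
        len(idx) == len(meas) == len(vals)
        for idx, meas, vals in zip(*(row[col] for col in dynamic_cols))
    )
```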

Parameters:
subjects_per_output_file: int | None = None

How big to chunk the dataset down for writing to disk; larger values will make fewer chunks but increase the memory cost.

do_overwrite: bool = False

Whether or not to overwrite any existing file on disk.
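
The trade-off behind subjects_per_output_file can be illustrated with a minimal chunking sketch. chunk_subject_ids is a hypothetical helper, and treating None as "write everything in a single chunk" is an assumption, not documented behavior.

```python
def chunk_subject_ids(subject_ids, subjects_per_output_file=None):
    """Hypothetical helper: group subject IDs into per-file chunks."""
    ids = list(subject_ids)
    if subjects_per_output_file is None:
        # Assumption: no chunk size means a single output chunk.
        return [ids]
    if subjects_per_output_file <= 0:
        raise ValueError("subjects_per_output_file must be positive.")
    return [
        ids[start:start + subjects_per_output_file]
        for start in range(0, len(ids), subjects_per_output_file)
    ]
```

Larger chunk sizes yield fewer lists here, mirroring fewer files on disk at a higher per-file memory cost.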

cache_flat_representation(subjects_per_output_file: int | None = None, feature_inclusion_frequency: float | dict[str, float] | None = None, window_sizes: list[str] | None = None, include_only_measurements: set[str] | None = None, do_overwrite: bool = False, do_update: bool = True)[source]

Writes a flat (historically summarized) representation of the dataset to disk.

This function caches to disk a set of files useful for building flat representations of the dataset, suitable for, e.g., sklearn-style modeling for downstream tasks. It will produce the following files:

  • A new directory self.config.save_dir / "flat_reps" which contains the following:

      • A subdirectory raw which contains: (1) a json file with the configuration arguments and (2) a set of parquet files containing flat (e.g., wide) representations of summarized events per subject, broken out by split and subject chunk.

      • A set of subdirectories past/* which contain summarized views over the past * time period per subject per event, for all time periods in window_sizes, if any.
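
The directory layout described above can be sketched with pathlib. flat_rep_dirs is a hypothetical helper; it only derives the directory paths named in the list and says nothing about the file names written inside them.

```python
from pathlib import Path


def flat_rep_dirs(save_dir, window_sizes=None):
    """Hypothetical helper: map each flat-representation target to its directory."""
    base = Path(save_dir) / "flat_reps"
    dirs = {"raw": base / "raw"}
    # One past/<window> subdirectory per requested window size, if any.
    for window in window_sizes or []:
        dirs[f"past/{window}"] = base / "past" / window
    return dirs
```

With window_sizes=["7d", "30d"] (example strings only), this yields flat_reps/raw, flat_reps/past/7d, and flat_reps/past/30d under save_dir.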

Parameters:
subjects_per_output_file: int | None = None

The number of subjects that should be included in each output file. Lowering this number increases the number of files written, making the process of creating and leveraging these files slower but more memory efficient.

feature_inclusion_frequency: float | dict[str, float] | None = None

The base feature inclusion frequency that should be used to dictate what features can be included in the flat representation. It can either be a float, in which case it applies across all measurements, or None, in which case no filtering is applied, or a dictionary from measurement type to a float dictating a per-measurement-type inclusion cutoff.

window_sizes: list[str] | None = None

Beyond writing out a raw, per-event flattened representation, the dataset also has the capability to summarize these flattened representations over the historical windows specified in this argument. These are strings specifying time deltas, using this syntax: link. Each window size will be summarized to a separate directory, and will share the same subject file split as is used in the raw representation files.

include_only_measurements: set[str] | None = None

Measurement types can also be filtered out wholesale from both representations. If this set is not None, only these measurements will be included.

do_overwrite: bool = False

If True, this function will overwrite the data already stored in the target save directory.

do_update: bool = True

If True, this function will (a) ensure that the parameters are the same or are mappable to one another (critically, _it may_ default to an existing subject split if one has been used historically, overriding the specified subjects_per_output_file parameter!), then (b) attempt to write only those files that are not yet written to disk across the historical summarization targets.
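
The three accepted forms of feature_inclusion_frequency (float, dictionary, or None) can be resolved per measurement as in the sketch below. resolve_inclusion_cutoff is a hypothetical helper, and returning None (no filtering) for a measurement missing from the dictionary is an assumption.

```python
def resolve_inclusion_cutoff(measurement, feature_inclusion_frequency):
    """Hypothetical helper: return the cutoff for one measurement, or None."""
    if feature_inclusion_frequency is None:
        # No filtering is applied at all.
        return None
    if isinstance(feature_inclusion_frequency, dict):
        # Per-measurement-type cutoffs; assumed: absent keys mean no filtering.
        return feature_inclusion_frequency.get(measurement)
    # A single float applies across all measurements.
    return float(feature_inclusion_frequency)
```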

describe(do_print_measurement_summaries: bool = True, viz_config: Visualizer | None = None) list[Figure] | None[source]

Describes the dataset, both in language and in figures.

property dynamic_measurements_df : DF_T

Lazily loads and/or returns the measurements dataframe from the implicit filepath.

This will return the _dynamic_measurements_df attribute, if defined and not None; otherwise, it will attempt to load the dynamic measurements dataframe from the implicit filepath defined by config.save_dir and DYNAMIC_MEASUREMENTS_FN.

classmethod dynamic_measurements_fp(save_dir: Path) Path[source]

Returns the filepath for the dynamic_measurements_df given save_dir and class parameters.

property dynamic_numerical_columns

Returns all numerical metadata column key-column, value-column pairs.

property events_df : DF_T

Lazily loads and/or returns the events dataframe from the implicit filepath.

This will return the _events_df attribute, if defined and not None; otherwise, it will attempt to load the events dataframe from the implicit filepath defined by config.save_dir and EVENTS_FN.

classmethod events_fp(save_dir: Path) Path[source]

Returns the filepath for the events_df given save_dir and class parameters.
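
The lazy-loading pattern shared by the subjects_df, events_df, and dynamic_measurements_df properties can be sketched with a toy class. LazyEventsHolder is a stand-in, not the real DatasetBase; the reader argument is hypothetical, and composing the path as save_dir / "events_df.parquet" from EVENTS_FN and DF_SAVE_FORMAT is an assumption about how the class parameters are combined.

```python
from pathlib import Path


class LazyEventsHolder:
    """Toy stand-in illustrating a lazily loaded dataframe property."""

    DF_SAVE_FORMAT = "parquet"
    EVENTS_FN = "events_df"

    def __init__(self, save_dir, events_df=None, reader=None):
        self.save_dir = Path(save_dir)
        self._events_df = events_df
        self._reader = reader  # hypothetical callable: Path -> dataframe

    @classmethod
    def events_fp(cls, save_dir):
        # Assumed composition: <save_dir>/<EVENTS_FN>.<DF_SAVE_FORMAT>
        return Path(save_dir) / f"{cls.EVENTS_FN}.{cls.DF_SAVE_FORMAT}"

    @property
    def events_df(self):
        # Load from disk only on first access, then reuse the cached object.
        if self._events_df is None:
            self._events_df = self._reader(self.events_fp(self.save_dir))
        return self._events_df
```

If a dataframe is passed at construction time, the reader is never called; otherwise it is called once, on first access.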

fit_measurements()[source]

Fits all preprocessing parameters over the training dataset, according to self.config.

Raises:

ValueError – if fitting preprocessing parameters fails for a given measurement.

property has_static_measurements

Returns True if the dataset has any static measurements.

property held_out_dynamic_measurements_df : DF_T

Returns the held-out set split of dynamic_measurements_df.

property held_out_events_df : DF_T

Returns the held-out set split of events_df.

property held_out_subjects_df : DF_T

Returns the held-out set split of subjects_df.

classmethod load(load_dir: Path) DatasetBase[source]

Loads and returns a dataset from disk.

This function re-loads an instance of the calling class from disk. This function assumes that files are stored on disk in the following, distributed format:

  • The base configuration object is stored in the file 'config.json', in JSON format.

  • If the saved dataset has already been fit, then the pre-processed measurement configs with inferred parameters are stored in 'inferred_measurement_configs.json', in JSON format. Note that these configs may in turn store their own attributes in further files, such as their measurement_metadata dataframes, which are stored on disk in separate files to facilitate lazy loading.

  • The raw or fully pre-processed subjects, events, and measurements dataframes are stored in their respective filenames (SUBJECTS_FN, EVENTS_FN, DYNAMIC_MEASUREMENTS_FN).

  • Remaining attributes are stored in pickle format at 'E.pkl'.

Parameters:
load_dir: Path

The path to the directory on disk from which the dataset should be loaded.

Raises:

FileNotFoundError – If either the attributes file or config file do not exist.
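
A pre-flight check for the two files whose absence raises FileNotFoundError might look like the sketch below. The helper names are hypothetical; only the file names 'config.json' and 'E.pkl' come from the description above.

```python
from pathlib import Path

# File names taken from the on-disk format described above.
REQUIRED_FILES = ("config.json", "E.pkl")


def missing_required_files(load_dir):
    """Hypothetical helper: list required files absent from load_dir."""
    load_dir = Path(load_dir)
    return [name for name in REQUIRED_FILES if not (load_dir / name).is_file()]


def check_load_dir(load_dir):
    """Raise FileNotFoundError if the config or attributes file is missing."""
    missing = missing_required_files(load_dir)
    if missing:
        raise FileNotFoundError(f"{load_dir} is missing: {', '.join(missing)}")
```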

property measurement_configs

Errors if not fit; otherwise returns all fit, non-dropped measurement configs.

Raises:

ValueError – If the dataset is not fit.

property measurement_idxmaps

Accesses the fit vocabulary idxmap objects, per measurement column.

property measurement_vocabs

Accesses the fit vocabulary objects, per measurement column.

preprocess()[source]

Fits all pre-processing parameters over the train set, then transforms all observations.

This entails the following steps:

  1. First, filter out subjects that have too few events.

  2. Next, pre-compute the FUNCTIONAL_TIME_DEPENDENT temporality measurements and store their values in the events dataframe.

  3. Next, fit all pre-processing parameters over the observed measurements.

  4. Finally, transform all data via the fit pre-processing parameters.
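
Steps 3 and 4 follow the usual fit-on-train, transform-everything pattern. The sketch below illustrates that pattern with a generic standardizer on plain Python lists; it is not the library's preprocessing code, and both function names are hypothetical.

```python
from statistics import mean, pstdev


def fit_standardizer(train_values):
    """Fit (mean, std) over the training observations only."""
    mu = mean(train_values)
    sigma = pstdev(train_values)
    # Guard against a zero standard deviation for constant inputs.
    return mu, (sigma if sigma > 0 else 1.0)


def transform_values(values, params):
    """Apply previously fit parameters to any observations, train or not."""
    mu, sigma = params
    return [(v - mu) / sigma for v in values]
```

The parameters are computed once from the train split and then reused unchanged for tuning and held-out observations, which avoids leaking information from those splits into the fit.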

save(**kwargs)[source]

Saves the calling object to disk, in the directory self.config.save_dir.

This function stores to disk the internal parameters of the calling object, in the following format:

  • The base configuration object is stored in the file 'config.json', in JSON format.

  • If the saved dataset has already been fit, then the pre-processed measurement configs with inferred parameters are stored in 'inferred_measurement_configs.json', in JSON format. Note that these configs may in turn store their own attributes in further files, such as their measurement_metadata dataframes, which are stored on disk in separate files to facilitate lazy loading.

  • The raw or fully pre-processed subjects, events, and measurements dataframes are stored in their respective filenames (SUBJECTS_FN, EVENTS_FN, DYNAMIC_MEASUREMENTS_FN).

  • Remaining attributes are stored in pickle format at 'E.pkl'.

Parameters:
do_overwrite

Keyword only; if passed with a value evaluating to True, then the system will overwrite any files that exist, rather than erroring.

Raises:

FileExistsError – If any of the desired filepaths already exist and do_overwrite is False.
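
The do_overwrite behavior amounts to a guard over the target filepaths before anything is written. The following is a minimal sketch under that reading; check_save_targets is a hypothetical helper, not the method's actual implementation.

```python
from pathlib import Path


def check_save_targets(target_fps, do_overwrite=False):
    """Hypothetical guard: error on existing targets unless overwriting."""
    existing = [Path(fp) for fp in target_fps if Path(fp).exists()]
    if existing and not do_overwrite:
        names = ", ".join(str(fp) for fp in existing)
        raise FileExistsError(f"Refusing to overwrite existing files: {names}")
    # Returned so a caller can see which files would be replaced.
    return existing
```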

split(split_fracs: Sequence[float], split_names: Sequence[str] | None = None, mandatory_set_IDs: dict[str, set[int] | None] | None = None)[source]

Splits the underlying dataset into random sets by subject_id.

Parameters:
split_fracs: Sequence[float]

The fractional sizes of the desired splits. If it sums to < 1, the remainder will be tracked in an extra split at the end of the list. All split fractions must be positive floating point numbers less than 1.

split_names: Sequence[str] | None = None

If specified, assigns the passed names to each split. Must be of the same size as split_fracs (after it is expanded to sum to 1 if necessary). If unset and there are two splits, it defaults to ['train', 'held_out']. If there are three, it defaults to ['train', 'tuning', 'held_out']. If there are more than three, it defaults to ['split_0', 'split_1', ...]. The split names train, tuning, and held_out have special significance and are used elsewhere in the model, so if split_names does not include them, other functionality may not work down the line.

mandatory_set_IDs: dict[str, set[int] | None] | None = None

Maps split name to an optional set of subject IDs that make up that split. If a split name is included in mandatory_set_IDs, it should _not_ be included in split_fracs, as the size of the split is determined by the IDs in this object. Any IDs in this object will be excluded from _all_ other splits, and split_fracs will be taken over the remaining, unused IDs.

Raises:

ValueError – if split_fracs contains anything outside the range of (0, 1], sums to something > 1, or is not of the same length as split_names.
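
The splitting rules above (validation, remainder split, default names) can be sketched on a plain list of subject IDs. This is an illustration only: split_subject_ids is hypothetical, it omits mandatory_set_IDs, the seed parameter and rounding scheme are assumptions, and naming a single split 'split_0' is a choice made here because that case is not described above.

```python
import random


def split_subject_ids(subject_ids, split_fracs, split_names=None, seed=0):
    """Hypothetical sketch: randomly partition subject IDs by fraction."""
    fracs = list(split_fracs)
    if any(f <= 0 or f > 1 for f in fracs):
        raise ValueError("All split fractions must lie in (0, 1].")
    total = sum(fracs)
    if total > 1 + 1e-9:
        raise ValueError("Split fractions must not sum to more than 1.")
    if total < 1 - 1e-9:
        fracs.append(1 - total)  # remainder goes to an extra final split

    if split_names is None:
        if len(fracs) == 2:
            names = ["train", "held_out"]
        elif len(fracs) == 3:
            names = ["train", "tuning", "held_out"]
        else:
            names = [f"split_{i}" for i in range(len(fracs))]
    else:
        names = list(split_names)
        if len(names) != len(fracs):
            raise ValueError("split_names and split_fracs differ in length.")

    ids = list(subject_ids)
    random.Random(seed).shuffle(ids)

    splits, start, cumulative = {}, 0, 0.0
    for i, (name, frac) in enumerate(zip(names, fracs)):
        cumulative += frac
        # The last split takes whatever remains, so no ID is dropped.
        end = len(ids) if i == len(fracs) - 1 else round(cumulative * len(ids))
        splits[name] = set(ids[start:end])
        start = end
    return splits
```

Calling split_subject_ids(range(10), [0.8, 0.1]) produces three disjoint sets named train, tuning, and held_out, because the fractions sum to less than 1 and a remainder split is appended.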

property subjects_df : DF_T

Lazily loads and/or returns the subjects dataframe from the implicit filepath.

This will return the _subjects_df attribute, if defined and not None; otherwise, it will attempt to load the subjects dataframe from the implicit filepath defined by config.save_dir and SUBJECTS_FN.

classmethod subjects_fp(save_dir: Path) Path[source]

Returns the filepath for the subjects_df given save_dir and class parameters.

property time_dependent_numerical_columns

Returns all numerical metadata column key-column, value-column pairs.

property train_dynamic_measurements_df : DF_T

Returns the train set split of dynamic_measurements_df.

property train_events_df : DF_T

Returns the train set split of events_df.

property train_subjects_df : DF_T

Returns the train set split of subjects_df.

transform_measurements()[source]

Transforms the entire dataset given the fit preprocessing parameters.

Raises:

ValueError – If transforming fails for a given measurement.

property tuning_dynamic_measurements_df : DF_T

Returns the tuning set split of dynamic_measurements_df.

property tuning_events_df : DF_T

Returns the tuning set split of events_df.

property tuning_subjects_df : DF_T

Returns the tuning set split of subjects_df.

property unified_measurements_idxmap : dict[str, int]

Returns a unified idxmap of observed measurements.

property unified_measurements_vocab : list[str]

Returns a unified vocabulary of observed measurements.

property unified_vocabulary_flat : list[str]
property unified_vocabulary_idxmap : dict[str, dict[str, int]]

Provides a unified idxmap spanning all measurements’ vocabularies (concatenated via offsets).

property unified_vocabulary_offsets : dict[str, int]

Returns a set of offsets detailing at what position each measurement’s vocab starts.
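
Concatenating per-measurement vocabularies via offsets can be sketched as follows. The helper names, the example vocabularies, and the first_index parameter are hypothetical; in particular, whether the real offsets start at 0 or reserve low indices (e.g., for padding) is not stated here, so the starting index is left configurable.

```python
def vocabulary_offsets(vocabs, first_index=0):
    """Hypothetical helper: starting position of each measurement's vocab."""
    offsets, position = {}, first_index
    for measurement, vocab in vocabs.items():
        offsets[measurement] = position
        position += len(vocab)
    return offsets


def unified_idxmap(vocabs, first_index=0):
    """Map each measurement's vocabulary elements to unified indices."""
    offsets = vocabulary_offsets(vocabs, first_index)
    return {
        measurement: {
            element: offsets[measurement] + i for i, element in enumerate(vocab)
        }
        for measurement, vocab in vocabs.items()
    }
```

Because each measurement's indices are shifted by its offset, the same element name in two different measurements maps to two distinct unified indices.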

visualize(viz_config: Visualizer) list[Figure][source]

Visualizes the dataset, along several axes.

property vocabulary_config : VocabularyConfig

Returns the implied VocabularyConfig object corresponding to this (fit) dataset.

This property collates vocabulary information across all measurements into a format that is concise, but complete for downstream DL applications.