EventStream.data.config module

Various configuration classes for EventStream data objects.

class EventStream.data.config.DatasetConfig(measurement_configs: dict[str, ~EventStream.data.config.MeasurementConfig] = <factory>, min_events_per_subject: int | None = None, agg_by_time_scale: str | None = '1h', min_valid_column_observations: int | float | None = None, min_valid_vocab_element_observations: int | float | None = None, min_true_float_frequency: float | None = None, min_unique_numerical_observations: int | float | None = None, outlier_detector_config: dict[str, ~typing.Any] | None = None, center_and_scale: bool = True, save_dir: ~pathlib.Path | None = None)[source]

Bases: JSONableMixin

Configuration options for a Dataset class.

This is the core configuration object for Dataset objects. It contains configuration options for pre-processing a dataset already in the “Subject-Events-Measurements” data model or for interpreting an existing dataset. It configures details such as:

  1. Which measurements should be extracted and included in the raw dataset, via the measurement_configs arg.

  2. What filtering parameters should be applied to eliminate infrequently observed variables or columns.

  3. Whether and how numerical values should be re-cast as categorical or integral types.

  4. Configuration options for outlier detector or normalization models.

  5. Time aggregation controls.

  6. The output save directory.

These configuration options do not include options to extract the raw dataset from source. For options for raw dataset extraction, see DatasetSchema and InputDFSchema, and for options for the raw script builder, see configs/dataset_base.yml.

measurement_configs

The measurement configurations for this Dataset. Keys are measurement names, and values are MeasurementConfig objects detailing configuration parameters for that measurement. Measurement names (the dictionary keys) are also used as the source columns for that measurement's data. For DataModality.MULTIVARIATE_REGRESSION measurements, however, the name references the categorical regression target index column, and the config also contains a values column name pointing to the column with the associated numerical values. Columns not referenced in any config are not pre-processed. Measurement configs are checked for validity upon creation. Dictionary keys must match measurement config object names where those are specified; where they are not, the names are set to the associated dictionary keys.

Type:

dict[str, EventStream.data.config.MeasurementConfig]
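
The key/name rule described above can be illustrated with a minimal sketch. This is not the library's implementation: ToyMeasurementConfig and reconcile_names are hypothetical names introduced here, and only the name field is modeled. The error message is copied from the Examples section.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ToyMeasurementConfig:
    """Hypothetical stand-in for MeasurementConfig; only `name` is modeled."""

    name: Optional[str] = None


def reconcile_names(measurement_configs: Dict[str, ToyMeasurementConfig]) -> None:
    """Fill in missing names from the dict keys and reject mismatched ones."""
    for key, cfg in measurement_configs.items():
        if cfg.name is None:
            # An unspecified name is set to the associated dictionary key.
            cfg.name = key
        elif cfg.name != key:
            # A specified name must agree with the dictionary key.
            raise ValueError(
                f"Measurement config {key} has name {cfg.name} which differs from dict key!"
            )
```

Calling reconcile_names({"meas1": ToyMeasurementConfig()}) sets the name to "meas1", while passing a config named "invalid_name" under the key "meas1" raises ValueError.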

min_valid_column_observations

The minimum number of observations of a column, or the minimum proportion of possible events containing that column, required for the column to be included in the training set. If fewer observations are found, the entire column will be dropped. Can be either an integer count or a proportion (of possible events) in (0, 1). If None, no constraint is applied.

Type:

int | float | None
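
As a rough sketch of how a threshold that is either a count or a proportion might be applied, assuming that a value exactly at the threshold is kept: meets_min_observations is a hypothetical helper, not part of EventStream, and n_possible is assumed to be positive.

```python
from typing import Optional, Union


def meets_min_observations(
    n_observed: int,
    n_possible: int,
    threshold: Optional[Union[int, float]],
) -> bool:
    """Return True if a column (or vocabulary element) passes the threshold."""
    if threshold is None:
        # No constraint is applied.
        return True
    if isinstance(threshold, bool) or not isinstance(threshold, (int, float)):
        raise TypeError(f"threshold must be an int or float, got {type(threshold)}")
    if isinstance(threshold, float):
        # Floats are read as a proportion in (0, 1).
        if not 0 < threshold < 1:
            raise ValueError(f"proportion must be in (0, 1), got {threshold}")
        return n_observed / n_possible >= threshold
    # Integers are read as an absolute count.
    return n_observed >= threshold
```

With 10 possible events, a column observed 5 times passes both threshold=0.5 and threshold=5, while a column observed 4 times fails both.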

min_valid_vocab_element_observations

The minimum number or proportion of observations of a particular metadata vocabulary element required for the element to be included in the training set vocabulary. Elements observed fewer times than this will be dropped. Can be either an integer count or a proportion (of total vocabulary size) in (0, 1). If None, no constraint is applied.

Type:

int | float | None

min_true_float_frequency

The minimum proportion of true float values that must be observed in order for observations to be treated as true floating point numbers, not integers.

Type:

float | None
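
One plausible reading of this option is sketched below. It assumes that a "true float" is a value with a non-zero fractional part and that None disables the check. Both are assumptions made for illustration, and looks_like_true_float is a hypothetical helper.

```python
from typing import Optional, Sequence


def looks_like_true_float(
    values: Sequence[float],
    min_true_float_frequency: Optional[float],
) -> bool:
    """Return True if enough values are fractional to keep the column as float."""
    if min_true_float_frequency is None or not values:
        # Assumed behavior: with no threshold (or no data), leave the column as is.
        return True
    # Count values that cannot be represented exactly as an integer.
    n_fractional = sum(1 for v in values if not float(v).is_integer())
    return n_fractional / len(values) >= min_true_float_frequency
```

For example, [1.0, 2.0, 3.5, 4.0] has one fractional value in four, so with a threshold of 0.5 this sketch would treat the column as integer-valued.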

min_unique_numerical_observations

The minimum number of unique values a numerical column must have in the training set to be treated as a numerical type (rather than an implied categorical or ordinal type). Numerical columns with fewer unique values than this will be converted to categorical or ordinal types. Can be either an integer count or a proportion (of total numerical observations) in (0, 1). If None, no constraint is applied.

Type:

int | float | None

outlier_detector_config

Configuration options for outlier detection. If not None, must contain the key 'cls', which points to the class used for outlier detection. All other keys and values are keyword arguments to be passed to the specified class. The API of these objects is expected to mirror scikit-learn outlier detection model APIs. If None, numerical outlier values are not removed.

Type:

dict[str, Any] | None
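
The shape of such a config can be sketched as follows. This is an illustration under stated assumptions, not the library's code: ZScoreOutlierDetector and build_outlier_detector are hypothetical, and 'cls' is assumed here to hold the class object itself. The fit/predict methods follow the scikit-learn convention in which predict returns 1 for inliers and -1 for outliers.

```python
import statistics
from typing import Any, Dict, List, Optional, Sequence


class ZScoreOutlierDetector:
    """Hypothetical detector with a scikit-learn-like fit/predict interface."""

    def __init__(self, stddev_cutoff: float = 3.0):
        self.stddev_cutoff = stddev_cutoff

    def fit(self, values: Sequence[float]) -> "ZScoreOutlierDetector":
        self.mean_ = statistics.fmean(values)
        self.std_ = statistics.pstdev(values)
        return self

    def predict(self, values: Sequence[float]) -> List[int]:
        # 1 marks an inlier and -1 an outlier, as in scikit-learn.
        if self.std_ == 0:
            return [1 for _ in values]
        return [
            1 if abs(v - self.mean_) / self.std_ <= self.stddev_cutoff else -1
            for v in values
        ]


def build_outlier_detector(config: Optional[Dict[str, Any]]):
    """Instantiate config['cls'] with all remaining keys as keyword arguments."""
    if config is None:
        # No outlier detection is performed.
        return None
    if "cls" not in config:
        raise ValueError("outlier_detector_config must contain the key 'cls'")
    kwargs = {k: v for k, v in config.items() if k != "cls"}
    return config["cls"](**kwargs)
```

A config such as {"cls": ZScoreOutlierDetector, "stddev_cutoff": 1.5} would then build a detector with that cutoff.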

center_and_scale

Whether or not to center and scale numerical values.

Type:

bool

save_dir

The output save directory for this dataset. Will be converted to a pathlib.Path upon creation if it is not already one.

Type:

pathlib.Path | None
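
The conversion on creation is the usual dataclass post-initialization pattern. A minimal sketch, using a hypothetical ToyConfig class rather than DatasetConfig itself:

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Union


@dataclass
class ToyConfig:
    """Hypothetical config with only the save_dir field."""

    save_dir: Optional[Union[str, Path]] = None

    def __post_init__(self) -> None:
        # Normalize strings to Path objects; leave None and existing Paths alone.
        if self.save_dir is not None and not isinstance(self.save_dir, Path):
            self.save_dir = Path(self.save_dir)
```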

agg_by_time_scale

Aggregate events into temporal buckets at this frequency. Uses the string language described here: https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/api/polars.DataFrame.group_by_dynamic.html

Type:

str | None
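
To give a feel for what bucketing at a frequency such as '1h' means, here is a standard-library sketch that floors timestamps to the start of their bucket. It is not how the library aggregates: parse_time_scale and bucket_start are hypothetical helpers, only single-unit strings with the units s, m, h, d, and w are handled, and the alignment to a fixed origin is an assumption that may differ from Polars' windowing.

```python
import re
from datetime import datetime, timedelta

# Subset of duration units handled by this sketch.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days", "w": "weeks"}


def parse_time_scale(scale: str) -> timedelta:
    """Parse strings such as '1h' or '30m' into a timedelta."""
    match = re.fullmatch(r"(\d+)([smhdw])", scale)
    if match is None:
        raise ValueError(f"Unsupported time scale: {scale!r}")
    count, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(count)})


def bucket_start(
    ts: datetime, scale: str, origin: datetime = datetime(1970, 1, 1)
) -> datetime:
    """Floor a timestamp to the start of its bucket, counted from origin."""
    width = parse_time_scale(scale)
    return origin + ((ts - origin) // width) * width
```

Under these assumptions, an event at 10:47 falls in the bucket starting at 10:00 for '1h' and in the bucket starting at 10:30 for '30m'.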

Raises:
  • ValueError – If configuration parameters are invalid (e.g., a proportion parameter greater than 1).

  • TypeError – If configuration parameters are of invalid types.

Examples

>>> cfg = DatasetConfig(
...     measurement_configs={
...         "meas1": MeasurementConfig(
...             temporality=TemporalityType.DYNAMIC,
...             modality=DataModality.MULTI_LABEL_CLASSIFICATION,
...         ),
...     },
...     min_valid_column_observations=0.5,
...     save_dir="/path/to/save/dir",
... )
>>> cfg.save_dir
PosixPath('/path/to/save/dir')
>>> cfg.to_dict() 
{'measurement_configs':
    {'meas1':
        {'name': 'meas1',
         'temporality': <TemporalityType.DYNAMIC: 'dynamic'>,
         'modality': <DataModality.MULTI_LABEL_CLASSIFICATION: 'multi_label_classification'>,
         'observation_rate_over_cases': None,
         'observation_rate_per_case': None,
         'functor': None,
         'vocabulary': None,
         'values_column': None,
         '_measurement_metadata': None,
         'modifiers': None}},
    'min_events_per_subject': None,
    'agg_by_time_scale': '1h',
    'min_valid_column_observations': 0.5,
    'min_valid_vocab_element_observations': None,
    'min_true_float_frequency': None,
    'min_unique_numerical_observations': None,
    'outlier_detector_config': None,
    'center_and_scale': True,
    'save_dir': '/path/to/save/dir'}
>>> cfg2 = DatasetConfig.from_dict(cfg.to_dict())
>>> assert cfg == cfg2
>>> DatasetConfig(
...     measurement_configs={
...         "meas1": MeasurementConfig(
...             name="invalid_name",
...             temporality=TemporalityType.DYNAMIC,
...             modality=DataModality.MULTI_LABEL_CLASSIFICATION,
...         ),
...     },
... )
Traceback (most recent call last):
    ...
ValueError: Measurement config meas1 has name invalid_name which differs from dict key!
>>> DatasetConfig(
...     min_valid_column_observations="invalid type"
... )
Traceback (most recent call last):
    ...
TypeError: min_valid_column_observations must either be a fraction (float between 0 and 1) or count (int > 1). Got <class 'str'> of invalid type
>>> measurement_configs = {
...     "meas1": MeasurementConfig(
...         temporality=TemporalityType.DYNAMIC,
...         modality=DataModality.MULTI_LABEL_CLASSIFICATION,
...     ),
... }
>>> # Make one of the measurements invalid to show that validity is re-checked...
>>> measurement_configs["meas1"].temporality = None
>>> DatasetConfig(
...     measurement_configs=measurement_configs,
...     min_valid_column_observations=0.5,
...     save_dir="/path/to/save/dir",
... )
Traceback (most recent call last):
    ...
ValueError: Measurement config meas1 invalid!
agg_by_time_scale : str | None = '1h'
center_and_scale : bool = True
classmethod from_dict(as_dict: dict) DatasetConfig[source]

Build a configuration object from a plain dictionary representation.

Parameters:
as_dict: dict

The plain dictionary representation to be converted.

Returns: A DatasetConfig instance containing the same data as as_dict.

measurement_configs : dict[str, MeasurementConfig]
min_events_per_subject : int | None = None
min_true_float_frequency : float | None = None
min_unique_numerical_observations : int | float | None = None
min_valid_column_observations : int | float | None = None
min_valid_vocab_element_observations : int | float | None = None
outlier_detector_config : dict[str, Any] | None = None
save_dir : Path | None = None
to_dict() dict[source]

Represents this configuration object as a plain dictionary.

Returns:

A plain dictionary representation of self (nested through measurement configs as well).

class EventStream.data.config.DatasetSchema(static: dict[str, ~typing.Any] | ~EventStream.data.config.InputDFSchema | None = None, dynamic: list[~EventStream.data.config.InputDFSchema | dict[str, ~typing.Any]] = <factory>)[source]

Bases: JSONableMixin

Represents the schema of an input dataset, including static and dynamic data sources.

Contains the information necessary for extracting and pulling input dataset elements during a pre-processing pipeline. Inputs can be represented in either structured (typed) or plain (dictionary) form. There can only be one static schema currently, but arbitrarily many dynamic measurement schemas. During pre-processing the model will read all these dynamic input datasets and combine their outputs into the appropriate format. This can be written to or read from JSON files via the JSONableMixin base class methods.

static

The schema for the input dataset containing static (per-subject) information, in either object or dict form.

Type:

dict[str, Any] | EventStream.data.config.InputDFSchema | None

dynamic

A list of schemas for all dynamic input datasets, each in either object or dict form.

Type:

list[EventStream.data.config.InputDFSchema | dict[str, Any]]

Raises:

ValueError – If the static schema is None, if there is not a subject ID column specified in the static schema, if the passed “static” schema is not typed as a static schema, or if any dynamic schema is typed as a static schema.

Examples

>>> DatasetSchema(dynamic=[])
Traceback (most recent call last):
    ...
ValueError: Must specify a static schema!
>>> DatasetSchema(
...     static=dict(type="event", event_type="foo", input_df="/path/to/df.csv", ts_col="col"),
...     dynamic=[]
... )
Traceback (most recent call last):
    ...
ValueError: Must pass a static schema config for static.
>>> DatasetSchema(
...     static=dict(type="static", input_df="/path/to/df.csv", subject_id_col="col"),
...     dynamic=[dict(type="static", input_df="/path/to/df.csv", subject_id_col="col")]
... )
Traceback (most recent call last):
    ...
ValueError: Must pass dynamic schemas in self.dynamic!
>>> DS = DatasetSchema(
...     static=dict(type="static", input_df="/path/to/df.csv", subject_id_col="col"),
...     dynamic=[
...         dict(type="event", event_type="foo", input_df="/path/to/foo.csv", ts_col="col"),
...         dict(type="event", event_type="bar", input_df="/path/to/bar.csv", ts_col="col"),
...         dict(type="event", event_type="bar2", input_df="/path/to/bar.csv", ts_col="col2"),
...     ],
... )
>>> DS.dynamic_by_df 
{'/path/to/foo.csv': [InputDFSchema(input_df='/path/to/foo.csv', type='event', event_type='foo',
subject_id_col='col', ts_col='col')], '/path/to/bar.csv': [InputDFSchema(input_df='/path/to/bar.csv',
type='event', event_type='bar', subject_id_col='col', ts_col='col'),
InputDFSchema(input_df='/path/to/bar.csv', type='event', event_type='bar2', subject_id_col='col',
ts_col='col2')]}
dynamic : list[InputDFSchema | dict[str, Any]]
static : dict[str, Any] | InputDFSchema | None = None
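
The dynamic_by_df output in the examples above groups dynamic schemas by their input dataframe. The sketch below reproduces that grouping on plain dictionaries. group_by_input_df is a hypothetical helper for illustration, not the property's actual implementation.

```python
from collections import defaultdict
from typing import Any, Dict, List


def group_by_input_df(dynamic_schemas: List[Dict[str, Any]]) -> Dict[Any, List[Dict[str, Any]]]:
    """Group schema dicts by their 'input_df' value, preserving input order."""
    grouped = defaultdict(list)
    for schema in dynamic_schemas:
        grouped[schema["input_df"]].append(schema)
    return dict(grouped)
```

Applied to the three dynamic schemas from the example, this yields one entry for '/path/to/foo.csv' and two for '/path/to/bar.csv', so each source file only needs to be read once.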
class EventStream.data.config.InputDFSchema(input_df: ~typing.Any | None = None, type: ~EventStream.data.types.InputDFType | None = None, event_type: str | tuple[str, str, str] | None = None, subject_id_col: str | None = None, ts_col: str | ~collections.abc.Sequence[str] | None = None, start_ts_col: str | ~collections.abc.Sequence[str] | None = None, end_ts_col: str | ~collections.abc.Sequence[str] | None = None, ts_format: str | None = None, start_ts_format: str | None = None, end_ts_format: str | None = None, data_schema: tuple[list[str | ~collections.abc.Sequence[str]], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | tuple[str | ~collections.abc.Sequence[str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | dict[str | ~collections.abc.Sequence[str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | dict[str | ~collections.abc.Sequence[str], tuple[str, ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]]] | tuple[dict[str | ~collections.abc.Sequence[str], str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | list[tuple[list[str | ~collections.abc.Sequence[str]], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | tuple[str | ~collections.abc.Sequence[str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | dict[str | ~collections.abc.Sequence[str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | dict[str | ~collections.abc.Sequence[str], tuple[str, ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]]] | tuple[dict[str | ~collections.abc.Sequence[str], str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]]] | None = None, start_data_schema: 
tuple[list[str | ~collections.abc.Sequence[str]], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | tuple[str | ~collections.abc.Sequence[str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | dict[str | ~collections.abc.Sequence[str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | dict[str | ~collections.abc.Sequence[str], tuple[str, ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]]] | tuple[dict[str | ~collections.abc.Sequence[str], str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | list[tuple[list[str | ~collections.abc.Sequence[str]], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | tuple[str | ~collections.abc.Sequence[str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | dict[str | ~collections.abc.Sequence[str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | dict[str | ~collections.abc.Sequence[str], tuple[str, ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]]] | tuple[dict[str | ~collections.abc.Sequence[str], str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]]] | None = None, end_data_schema: tuple[list[str | ~collections.abc.Sequence[str]], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | tuple[str | ~collections.abc.Sequence[str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | dict[str | ~collections.abc.Sequence[str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | dict[str | ~collections.abc.Sequence[str], tuple[str, ~EventStream.data.types.InputDataType | 
tuple[~EventStream.data.types.InputDataType, str]]] | tuple[dict[str | ~collections.abc.Sequence[str], str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | list[tuple[list[str | ~collections.abc.Sequence[str]], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | tuple[str | ~collections.abc.Sequence[str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | dict[str | ~collections.abc.Sequence[str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]] | dict[str | ~collections.abc.Sequence[str], tuple[str, ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]]] | tuple[dict[str | ~collections.abc.Sequence[str], str], ~EventStream.data.types.InputDataType | tuple[~EventStream.data.types.InputDataType, str]]] | None = None, must_have: list[str | tuple[str, list[~typing.Any]]] = <factory>)[source]

Bases: JSONableMixin

The schema for one input DataFrame.

Dataclass that defines the schema for an input DataFrame. It verifies the provided attributes during the post-initialization stage, and raises exceptions if mandatory attributes are missing or if any inconsistencies are found. It stores sufficient data to extract subject IDs; produce event or range timestamps; extract, rename, and convert columns; and filter data.

input_df

DataFrame input. This can take on many types, including an actual dataframe, a query to a database, or a path to a dataframe stored on disk. Mandatory attribute.

Type:

Any | None

type

Type of the input data. Possible values are InputDFType.STATIC, InputDFType.EVENT, or InputDFType.RANGE. Mandatory attribute.

Type:

EventStream.data.types.InputDFType | None

event_type

The categorical event_type to assign to events sourced from this input dataframe. For events, this must be a single string. For ranges, it can be either a single string or a tuple of three strings giving the event type names for start == stop, start, and stop events, in that order (as in the ('bar_st_eq_end', 'bar_st', 'bar_end') example below). If the string starts with “COL:” then the remaining portion of the string will be interpreted as a column name in the input from which the event type should be read. Otherwise it will be interpreted as a literal event_type category name.

Type:

str | tuple[str, str, str] | None
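
The “COL:” convention can be sketched as below. resolve_event_type is a hypothetical helper that works on a single row represented as a dict, which is a simplification made for illustration.

```python
from typing import Any, Dict

COLUMN_PREFIX = "COL:"


def resolve_event_type(event_type: str, row: Dict[str, Any]) -> Any:
    """Return the event type for one row of input data."""
    if event_type.startswith(COLUMN_PREFIX):
        # The remainder names a column holding the per-row event type.
        return row[event_type[len(COLUMN_PREFIX):]]
    # Otherwise the string is a literal event type category name.
    return event_type
```

So resolve_event_type("COL:kind", {"kind": "admission"}) reads the value from the kind column, while resolve_event_type("bar", ...) returns "bar" unchanged.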

subject_id_col

The name of the column containing the subject ID.

Type:

str | None

ts_col

Column name containing timestamp for events.

Type:

str | collections.abc.Sequence[str] | None

start_ts_col

Column name containing start timestamp for ranges.

Type:

str | collections.abc.Sequence[str] | None

end_ts_col

Column name containing end timestamp for ranges.

Type:

str | collections.abc.Sequence[str] | None

ts_format

String format of the timestamp in ts_col.

Type:

str | None

start_ts_format

String format of the timestamp in start_ts_col.

Type:

str | None

end_ts_format

String format of the timestamp in end_ts_col.

Type:

str | None

data_schema

Schema of the input data.

Type:

tuple[list[str | collections.abc.Sequence[str]], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | tuple[str | collections.abc.Sequence[str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | dict[str | collections.abc.Sequence[str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | dict[str | collections.abc.Sequence[str], tuple[str, EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]]] | tuple[dict[str | collections.abc.Sequence[str], str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | list[tuple[list[str | collections.abc.Sequence[str]], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | tuple[str | collections.abc.Sequence[str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | dict[str | collections.abc.Sequence[str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | dict[str | collections.abc.Sequence[str], tuple[str, EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]]] | tuple[dict[str | collections.abc.Sequence[str], str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]]] | None

start_data_schema

Schema of the start data in a range. If unspecified for a range, will fall back on data_schema.

Type:

tuple[list[str | collections.abc.Sequence[str]], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | tuple[str | collections.abc.Sequence[str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | dict[str | collections.abc.Sequence[str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | dict[str | collections.abc.Sequence[str], tuple[str, EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]]] | tuple[dict[str | collections.abc.Sequence[str], str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | list[tuple[list[str | collections.abc.Sequence[str]], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | tuple[str | collections.abc.Sequence[str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | dict[str | collections.abc.Sequence[str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | dict[str | collections.abc.Sequence[str], tuple[str, EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]]] | tuple[dict[str | collections.abc.Sequence[str], str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]]] | None

end_data_schema

Schema of the end data in a range. If unspecified for a range, will fall back on data_schema.

Type:

tuple[list[str | collections.abc.Sequence[str]], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | tuple[str | collections.abc.Sequence[str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | dict[str | collections.abc.Sequence[str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | dict[str | collections.abc.Sequence[str], tuple[str, EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]]] | tuple[dict[str | collections.abc.Sequence[str], str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | list[tuple[list[str | collections.abc.Sequence[str]], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | tuple[str | collections.abc.Sequence[str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | dict[str | collections.abc.Sequence[str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]] | dict[str | collections.abc.Sequence[str], tuple[str, EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]]] | tuple[dict[str | collections.abc.Sequence[str], str], EventStream.data.types.InputDataType | tuple[EventStream.data.types.InputDataType, str]]] | None

must_have

List of mandatory columns or filters to apply. Each entry is either a column name, in which case the column simply must have a non-null value, or a (column name, list of options) pair, in which case the column must take on one of those values for the row to be included. The filter_on property exposes these as a mapping from column name to filter (True or the list of options).

Type:

list[str | tuple[str, list[Any]]]
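
The relationship between must_have and the filter_on mapping shown in the examples below can be sketched like this. build_filter_on is a hypothetical helper, not the library's code. The error message mirrors the one in the examples.

```python
from typing import Any, Dict, List, Union


def build_filter_on(must_have: List[Any]) -> Dict[str, Union[bool, List[Any]]]:
    """Convert a must_have list into a column -> filter mapping."""
    filter_on: Dict[str, Union[bool, List[Any]]] = {}
    for entry in must_have:
        if isinstance(entry, str):
            # A bare column name only requires a non-null value.
            filter_on[entry] = True
        elif (
            isinstance(entry, (list, tuple))
            and len(entry) == 2
            and isinstance(entry[0], str)
        ):
            # A (column, options) pair restricts the column to those values.
            column, options = entry
            filter_on[column] = list(options)
        else:
            raise ValueError(f"Malformed filter: {entry}")
    return filter_on
```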

Raises:
  • ValueError – If mandatory attributes (input_df, type) are not provided, or if inconsistencies are found in the attributes based on the input data type.

  • TypeError – If attributes are of the wrong type.

Examples

>>> S = InputDFSchema(
...     input_df="/path/to/df.csv",
...     type='static',
...     subject_id_col='subj_id',
...     must_have=['subj_id', ['foo', ['opt1', 'opt2']]],
... )
>>> S.filter_on
{'subj_id': True, 'foo': ['opt1', 'opt2']}
>>> S.is_static
True
>>> S = InputDFSchema(
...     input_df="/path/to_df.parquet",
...     type='event',
...     ts_col='col',
...     event_type='bar',
... )
>>> S.is_static
False
>>> S
InputDFSchema(input_df='/path/to_df.parquet', type='event', event_type='bar', ts_col='col')
>>> S = InputDFSchema(
...     input_df="/path/to_df.parquet",
...     type='range',
...     start_ts_col='start',
...     end_ts_col='end',
...     event_type=('bar_st_eq_end', 'bar_st', 'bar_end'),
... )
>>> S.is_static
False
>>> InputDFSchema()
Traceback (most recent call last):
    ...
ValueError: Missing mandatory parameter input_df!
>>> S = InputDFSchema(input_df="/path/to/df.csv")
Traceback (most recent call last):
    ...
ValueError: Missing mandatory parameter type!
>>> S = InputDFSchema(
...     input_df="/path/to/df.csv",
...     type='static',
... )
Traceback (most recent call last):
    ...
ValueError: Must set subject_id_col for static source!
>>> S = InputDFSchema(
...     input_df="/path/to/df.csv",
...     type='static',
...     subject_id_col='subj_id',
...     must_have=[34]
... )
Traceback (most recent call last):
    ...
ValueError: Malformed filter: 34
>>> S = InputDFSchema(
...     input_df="/path/to/df.parquet",
...     type=InputDFType.RANGE,
... )
Traceback (most recent call last):
    ...
ValueError: Missing mandatory range parameter event_type!
>>> S = InputDFSchema(
...     input_df="/path/to/df.csv",
...     type='static',
...     subject_id_col='subj_id',
...     event_type='foo'
... )
Traceback (most recent call last):
    ...
ValueError: Set invalid param event_type for static source!
>>> S = InputDFSchema(
...     input_df="/path/to_df.parquet",
...     type='event',
...     event_type='bar',
... )
Traceback (most recent call last):
    ...
ValueError: Missing mandatory event parameter ts_col!
>>> S = InputDFSchema(
...     input_df="/path/to_df.parquet",
...     type='event',
...     ts_col='bar',
... )
Traceback (most recent call last):
    ...
ValueError: Missing mandatory event parameter event_type!
>>> S = InputDFSchema(
...     input_df="/path/to_df.parquet",
...     type='event',
...     ts_col='bar',
...     event_type='foo',
...     subject_id_col='subj',
... )
Traceback (most recent call last):
    ...
ValueError: subject_id_col should be None for non-static types!
>>> S = InputDFSchema(
...     input_df="/path/to_df.parquet",
...     type='event',
...     ts_col='bar',
...     event_type=('foo', 'categorical'),
... )
Traceback (most recent call last):
    ...
TypeError: event_type must be a string for events. Got ('foo', 'categorical')
>>> S = InputDFSchema(
...     input_df="/path/to_df.parquet",
...     type='event',
...     ts_col='bar',
...     event_type='foo',
...     start_ts_col='start',
... )
Traceback (most recent call last):
    ...
ValueError: start_ts_col should be None for event schema: Got start
>>> S = InputDFSchema(
...     input_df="/path/to_df.parquet",
...     type='event',
...     ts_col='col',
...     event_type='bar',
...     data_schema=('foobar', 'categorical'),
... )
>>> S.is_static
False
>>> S 
InputDFSchema(input_df='/path/to_df.parquet',
              type='event',
              event_type='bar',
              ts_col='col',
              data_schema=[('foobar', 'categorical')])
>>> S.unified_schema
{'foobar': ('foobar', 'categorical')}
>>> S.columns_to_load
[('foobar', 'categorical'), ('col', <InputDataType.TIMESTAMP: 'timestamp'>)]
>>> S = InputDFSchema(
...     input_df="/path/to_df.parquet",
...     type='range',
...     start_ts_col='start',
...     end_ts_col='end',
...     event_type='bar',
...     start_data_schema=[
...         {'buz': 'float'},
...         {'baz': ['timestamp', '%Y-%m']}
...     ],
...     end_data_schema={'foobar': InputDataType.FLOAT},
... )
>>> for n, schema in zip(('EQ', 'ST', 'END'), S.unified_schema):
...     print(f"{n}:")
...     for k, v in sorted(schema.items()):
...         print(f"  {k}: {v}")
EQ:
  baz: ('baz', ['timestamp', '%Y-%m'])
  buz: ('buz', 'float')
  foobar: ('foobar', <InputDataType.FLOAT: 'float'>)
ST:
  baz: ('baz', ['timestamp', '%Y-%m'])
  buz: ('buz', 'float')
END:
  foobar: ('foobar', <InputDataType.FLOAT: 'float'>)
>>> S = InputDFSchema(
...     input_df="/path/to_df.parquet",
...     type='range',
...     start_ts_col='start',
...     end_ts_col='end',
...     ts_format='%Y-%m-%d',
...     event_type='bar',
...     start_data_schema={'foobar': ('foobar_renamed', ['timestamp', '%Y'])},
...     end_data_schema=[
...         ('buz', 'float'),
...         (['biz', 'whizz'], 'categorical'),
...     ],
... )
>>> for n, schema in zip(('EQ', 'ST', 'END'), S.unified_schema):
...     print(f"{n}:")
...     for k, v in sorted(schema.items()):
...         print(f"  {k}: {v}")
EQ:
  biz: ('biz', 'categorical')
  buz: ('buz', 'float')
  foobar: ('foobar_renamed', ['timestamp', '%Y'])
  whizz: ('whizz', 'categorical')
ST:
  foobar: ('foobar_renamed', ['timestamp', '%Y'])
END:
  biz: ('biz', 'categorical')
  buz: ('buz', 'float')
  whizz: ('whizz', 'categorical')
>>> list(sorted(S.columns_to_load)) 
[('biz', 'categorical'), ('buz', 'float'),
 ('end', (<InputDataType.TIMESTAMP: 'timestamp'>, '%Y-%m-%d')),
 ('foobar', ['timestamp', '%Y']),
 ('start', (<InputDataType.TIMESTAMP: 'timestamp'>, '%Y-%m-%d')),
 ('whizz', 'categorical')]
property columns_to_load : list[tuple[str, InputDataType]]

Computes the columns to be loaded based on the input data type and schema.

Returns:

A list of tuples of column names and desired types for the columns to be loaded from the input dataframe.

Raises:

ValueError – If any of the column definitions are invalid or repeated.

data_schema : tuple[list[str | Sequence[str]], InputDataType | tuple[InputDataType, str]] | tuple[str | Sequence[str], InputDataType | tuple[InputDataType, str]] | dict[str | Sequence[str], InputDataType | tuple[InputDataType, str]] | dict[str | Sequence[str], tuple[str, InputDataType | tuple[InputDataType, str]]] | tuple[dict[str | Sequence[str], str], InputDataType | tuple[InputDataType, str]] | list[tuple[list[str | Sequence[str]], InputDataType | tuple[InputDataType, str]] | tuple[str | Sequence[str], InputDataType | tuple[InputDataType, str]] | dict[str | Sequence[str], InputDataType | tuple[InputDataType, str]] | dict[str | Sequence[str], tuple[str, InputDataType | tuple[InputDataType, str]]] | tuple[dict[str | Sequence[str], str], InputDataType | tuple[InputDataType, str]]] | None = None
end_data_schema : tuple[list[str | Sequence[str]], InputDataType | tuple[InputDataType, str]] | tuple[str | Sequence[str], InputDataType | tuple[InputDataType, str]] | dict[str | Sequence[str], InputDataType | tuple[InputDataType, str]] | dict[str | Sequence[str], tuple[str, InputDataType | tuple[InputDataType, str]]] | tuple[dict[str | Sequence[str], str], InputDataType | tuple[InputDataType, str]] | list[tuple[list[str | Sequence[str]], InputDataType | tuple[InputDataType, str]] | tuple[str | Sequence[str], InputDataType | tuple[InputDataType, str]] | dict[str | Sequence[str], InputDataType | tuple[InputDataType, str]] | dict[str | Sequence[str], tuple[str, InputDataType | tuple[InputDataType, str]]] | tuple[dict[str | Sequence[str], str], InputDataType | tuple[InputDataType, str]]] | None = None
end_ts_col : str | Sequence[str] | None = None
end_ts_format : str | None = None
event_type : str | tuple[str, str, str] | None = None
input_df : Any | None = None
property is_static

Returns True if and only if the input data type is static.

must_have : list[str | tuple[str, list[Any]]]
start_data_schema : tuple[list[str | Sequence[str]], InputDataType | tuple[InputDataType, str]] | tuple[str | Sequence[str], InputDataType | tuple[InputDataType, str]] | dict[str | Sequence[str], InputDataType | tuple[InputDataType, str]] | dict[str | Sequence[str], tuple[str, InputDataType | tuple[InputDataType, str]]] | tuple[dict[str | Sequence[str], str], InputDataType | tuple[InputDataType, str]] | list[tuple[list[str | Sequence[str]], InputDataType | tuple[InputDataType, str]] | tuple[str | Sequence[str], InputDataType | tuple[InputDataType, str]] | dict[str | Sequence[str], InputDataType | tuple[InputDataType, str]] | dict[str | Sequence[str], tuple[str, InputDataType | tuple[InputDataType, str]]] | tuple[dict[str | Sequence[str], str], InputDataType | tuple[InputDataType, str]]] | None = None
start_ts_col : str | Sequence[str] | None = None
start_ts_format : str | None = None
subject_id_col : str | None = None
ts_col : str | Sequence[str] | None = None
ts_format : str | None = None
type : InputDFType | None = None
property unified_end_schema : dict[str, tuple[str, InputDataType]]
property unified_eq_schema : dict[str, tuple[str, InputDataType]]
property unified_event_schema : dict[str, tuple[str, InputDataType]]
property unified_schema : dict[str, tuple[str, InputDataType]]

Computes the unified schema based on the input data type and data schema.

Returns:

A unified schema mapping from output column names to input column names and types.

Raises:

ValueError – If the type attribute of the calling object is invalid.

property unified_start_schema : dict[str, tuple[str, InputDataType]]
class EventStream.data.config.MeasurementConfig(name: str | None = None, temporality: TemporalityType | None = None, modality: DataModality | None = None, observation_rate_over_cases: float | None = None, observation_rate_per_case: float | None = None, functor: TimeDependentFunctor | None = None, vocabulary: Vocabulary | None = None, values_column: str | None = None, _measurement_metadata: DataFrame | Series | str | Path | None = None, modifiers: list[str] | None = None)[source]

Bases: JSONableMixin

The Configuration class for a measurement in the Dataset.

A measurement is any observation in the dataset, be it static or dynamic, categorical or continuous. This class contains configuration options to define a measurement and dictate how it should be pre-processed, embedded, and generated in generative models.

name

Stores the name of this measurement; also the column in the appropriate internal dataframe (subjects_df, events_df, or dynamic_measurements_df) that will contain this measurement. All measurements will have this set.

The ‘column’ linkage has slightly different meanings depending on self.modality:

  • If modality == DataModality.UNIVARIATE_REGRESSION, then this column stores the values associated with this continuous-valued measure.

  • If modality == DataModality.MULTIVARIATE_REGRESSION, then this column stores the keys that dictate the dimensions for which the associated values_column has the values.

  • Otherwise, this column stores the categorical values of this measure.

Similarly, it has slightly different meanings depending on self.temporality:

  • If temporality == TemporalityType.STATIC, this is an existent column in the subjects_df dataframe.

  • If temporality == TemporalityType.DYNAMIC, this is an existent column in the dynamic_measurements_df dataframe.

  • Otherwise (when temporality == TemporalityType.FUNCTIONAL_TIME_DEPENDENT), this is the name that the newly created output column will take in the events_df dataframe.

Type:

str | None

modality

The modality of this measurement. If DataModality.UNIVARIATE_REGRESSION, then this measurement takes on single-variate continuous values. If DataModality.MULTIVARIATE_REGRESSION, then this measurement consists of key-value pairs of categorical covariate identifiers and continuous values. Keys are stored in the column reflected in self.name and values in self.values_column.

Type:

EventStream.data.types.DataModality | None
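
The key-value layout of a multivariate regression measurement can be pictured with plain Python rows. This is only a sketch: the column names lab_name and lab_value and the row contents are hypothetical, standing in for self.name and self.values_column.

```python
from collections import defaultdict

# Hypothetical column names: `name` holds the categorical keys,
# `values_column` holds the associated continuous values.
name, values_column = "lab_name", "lab_value"

# Hypothetical rows of a dynamic measurements table.
rows = [
    {"lab_name": "potassium", "lab_value": 4.1},
    {"lab_name": "sodium", "lab_value": 139.0},
    {"lab_name": "potassium", "lab_value": 3.8},
]

# Group the continuous values under their categorical key.
values_by_key = defaultdict(list)
for row in rows:
    values_by_key[row[name]].append(row[values_column])
```

Each distinct key acts as one dimension of the regression target, and its values are the numbers observed for that dimension.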

temporality

How this measure varies in time. If TemporalityType.STATIC, this is a static measurement. If TemporalityType.FUNCTIONAL_TIME_DEPENDENT, then this measurement is a time-dependent measure that varies with time and static data in an analytically computable manner (e.g., age). If TemporalityType.DYNAMIC, then this is a measurement that varies in time in a non-a-priori computable manner.

Type:

EventStream.data.types.TemporalityType | None
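
To illustrate what "analytically computable" means for a functional time-dependent measure, the sketch below derives age from a static date of birth and an event timestamp. It is not the AgeFunctor implementation; the function name and the choice of years as the unit are assumptions made for this example.

```python
from datetime import datetime


def age_in_years(date_of_birth: datetime, event_time: datetime) -> float:
    """Compute age from static data (date of birth) and the event time alone."""
    seconds_per_year = 365.25 * 24 * 60 * 60
    return (event_time - date_of_birth).total_seconds() / seconds_per_year


# No observation is needed at event time: the value follows from the
# static date of birth and the timestamp of each event.
dob = datetime(1980, 1, 1)
ages = [age_in_years(dob, t) for t in (datetime(2000, 1, 1), datetime(2010, 1, 1))]
```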

observation_rate_over_cases

The fraction of valid “instances” in which this measure is observed at all. For example, for a static measurement, this is the fraction of subjects for which this measure is observed to take on a non-null value at least once. For a dynamic measurement, this is the fraction of events for which this measure is observed to take on a non-null value at least once. This is set dynamically during pre-processing, and not specified at construction.

Type:

float | None

observation_rate_per_case

The number of times this measure is observed to take on a non-null value per possible valid “instance” where at least one measure is observed. For example, for a static measurement, this is the number of times this measure is observed per subject when this measure is observed at all. For a dynamic measurement, this is the number of times this measure is observed per event when this measure is observed at all. This is set dynamically during pre-processing, and not specified at construction.

Type:

float | None
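
The two observation rates can be illustrated on toy data. The helper below is a sketch of the definitions given for observation_rate_over_cases and observation_rate_per_case, not the library's pre-processing code; the function name and input layout are assumptions.

```python
def observation_rates(observations_per_instance: list[list]) -> tuple[float, float]:
    """Return (rate_over_cases, rate_per_case) for one measure.

    Each inner list holds the non-null observations of the measure within one
    "instance" (a subject for static measures, an event for dynamic ones).
    """
    n_instances = len(observations_per_instance)
    observed = [obs for obs in observations_per_instance if len(obs) > 0]
    if n_instances == 0 or not observed:
        return 0.0, 0.0
    # Fraction of instances in which the measure appears at all.
    rate_over_cases = len(observed) / n_instances
    # Mean number of observations among instances where it appears.
    rate_per_case = sum(len(obs) for obs in observed) / len(observed)
    return rate_over_cases, rate_per_case


# Four events; the measure appears in two of them, four times in total.
events = [["a"], [], ["a", "b", "c"], []]
over_cases, per_case = observation_rates(events)
```

Here the measure is observed in half of the events, and twice per event on average among the events where it appears.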

functor

If temporality == TemporalityType.FUNCTIONAL_TIME_DEPENDENT, then this will be set to the functor used to compute the value of a known-time-dependency measure. In this case, functor must be a subclass of data.time_dependent_functor.TimeDependentFunctor. If temporality is anything else, then this will be None.

Type:

EventStream.data.time_dependent_functor.TimeDependentFunctor | None

vocabulary

The vocabulary for this column, realized as a Vocabulary object. Begins with 'UNK'. Not set on modality==UNIVARIATE_REGRESSION measurements.

Type:

EventStream.data.vocabulary.Vocabulary | None

values_column

For modality==MULTIVARIATE_REGRESSION measurements, this will store the name of the column which will contain the numerical values corresponding to this measurement. Otherwise will be None.

Type:

str | None

measurement_metadata

Stores metadata about the numerical values corresponding to this measurement. This can take one of two forms, depending on the measurement modality. If modality==UNIVARIATE_REGRESSION, then this will be a pd.Series whose index will contain the set of possible column headers listed below. If modality==MULTIVARIATE_REGRESSION, then this will be a pd.DataFrame, whose index will contain the possible regression covariate identifier keys and whose columns will contain the set of possible columns listed below.

Metadata Columns:

  • drop_lower_bound: A lower bound such that values either below or at or below this level will be dropped (key presence will be retained for multivariate regression measures). Optional.

  • drop_lower_bound_inclusive: This must be set if drop_lower_bound is set. If this is true, then values will be dropped if they are $<=$ drop_lower_bound. If it is false, then values will be dropped if they are $<$ drop_lower_bound.

  • censor_lower_bound: A lower bound such that values either below or at or below this level, but above the level of drop_lower_bound, will be replaced with the value censor_lower_bound. Optional.

  • drop_upper_bound: An upper bound such that values either above or at or above this level will be dropped (key presence will be retained for multivariate regression measures). Optional.

  • drop_upper_bound_inclusive: This must be set if drop_upper_bound is set. If this is true, then values will be dropped if they are $>=$ drop_upper_bound. If it is false, then values will be dropped if they are $>$ drop_upper_bound.

  • censor_upper_bound: An upper bound such that values either above or at or above this level, but below the level of drop_upper_bound, will be replaced with the value censor_upper_bound. Optional.

  • value_type: To which kind of value (e.g., integer, categorical, float) this key corresponds. Must be an element of the enum NumericMetadataValueType. Optional. If not pre-specified, will be inferred from the data.

  • thresh_large: The learned upper bound for inlier values.

  • thresh_small: The learned lower bound for inlier values.

  • mean: The mean to which values will be standardized.

  • std: The standard deviation to which values will be standardized.
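
The interaction between the drop and censor bounds listed above can be sketched for a single value. This is an illustration of the documented semantics only, not the library's implementation; the function name and the use of None to signal a dropped value are assumptions.

```python
from typing import Optional


def apply_bounds(
    value: float,
    drop_lower_bound: Optional[float] = None,
    drop_lower_bound_inclusive: bool = False,
    censor_lower_bound: Optional[float] = None,
    drop_upper_bound: Optional[float] = None,
    drop_upper_bound_inclusive: bool = False,
    censor_upper_bound: Optional[float] = None,
) -> Optional[float]:
    """Return None if the value is dropped, else the (possibly censored) value."""
    # Drop checks come first: a dropped value is never censored.
    if drop_lower_bound is not None:
        if value < drop_lower_bound or (drop_lower_bound_inclusive and value == drop_lower_bound):
            return None
    if drop_upper_bound is not None:
        if value > drop_upper_bound or (drop_upper_bound_inclusive and value == drop_upper_bound):
            return None
    # Surviving values outside the censor bounds are replaced by the bound.
    if censor_lower_bound is not None and value < censor_lower_bound:
        return censor_lower_bound
    if censor_upper_bound is not None and value > censor_upper_bound:
        return censor_upper_bound
    return value
```

For instance, with drop_lower_bound=0.0 and censor_lower_bound=1.0, a value of -5.0 is dropped, a value of 0.5 becomes 1.0, and a value of 5.0 passes through unchanged.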

modifiers

Stores a list of additional column names that modify this measurement that should be tracked with this measurement record through the dataset.

Type:

list[str] | None

Raises:
  • ValueError – If the configuration is not self-consistent (e.g., a functor specified on a non-functional_time_dependent measure).

  • NotImplementedError – If the configuration relies on a measurement configuration that is not yet supported, such as numeric, static measurements.

Examples

>>> cfg = MeasurementConfig(
...     name='key',
...     modality='multi_label_classification',
...     temporality='dynamic',
...     vocabulary=Vocabulary(['foo', 'bar', 'baz'], [0.3, 0.4, 0.3]),
... )
>>> cfg.is_numeric
False
>>> cfg.is_dropped
False
>>> cfg = MeasurementConfig(
...     name='key',
...     modality='univariate_regression',
...     temporality='dynamic',
...     _measurement_metadata=pd.Series([1, 0.2], index=['censor_upper_bound', 'censor_lower_bound']),
... )
>>> cfg.is_numeric
True
>>> cfg.is_dropped
False
>>> cfg = MeasurementConfig(
...     name='key',
...     modality='multivariate_regression',
...     temporality='dynamic',
...     values_column='vals',
...     _measurement_metadata=pd.DataFrame(
...         {'censor_lower_bound': [1, 0.2, 0.1]},
...         index=pd.Index(['foo', 'bar', 'baz'], name='key'),
...     ),
...     vocabulary=Vocabulary(['foo', 'bar', 'baz'], [0.3, 0.4, 0.3]),
... )
>>> cfg.is_numeric
True
>>> cfg.is_dropped
False
>>> cfg = MeasurementConfig(
...     name='key',
...     modality='multi_label_classification',
...     temporality='dynamic',
...     modifiers=['foo', 'bar'],
... )
>>> cfg = MeasurementConfig(
...     name='key',
...     modality='multi_label_classification',
...     temporality='dynamic',
...     modifiers=[1, 2],
... )
Traceback (most recent call last):
    ...
ValueError: `self.modifiers` must be a list of strings; got element 1.
>>> MeasurementConfig()
Traceback (most recent call last):
    ...
ValueError: `self.temporality = None` Invalid! Must be in static, dynamic, functional_time_dependent
>>> MeasurementConfig(
...     temporality=TemporalityType.FUNCTIONAL_TIME_DEPENDENT,
...     functor=None,
... )
Traceback (most recent call last):
    ...
ValueError: functor must be set for functional_time_dependent measurements!
>>> MeasurementConfig(
...     temporality=TemporalityType.STATIC,
...     functor=AgeFunctor(dob_col="date_of_birth"),
... )
Traceback (most recent call last):
    ...
ValueError: functor should be None for static measurements! Got ...
>>> MeasurementConfig(
...     temporality=TemporalityType.DYNAMIC,
...     modality=DataModality.MULTIVARIATE_REGRESSION,
...     _measurement_metadata=pd.Series([1, 10], index=['censor_lower_bound', 'censor_upper_bound']),
...     values_column='vals',
... )
Traceback (most recent call last):
    ...
ValueError: If set, measurement_metadata must be a DataFrame on a multivariate_regression MeasurementConfig. Got <class 'pandas.core.series.Series'>
censor_lower_bound     1
censor_upper_bound    10
dtype: int64
FUNCTORS = {'AgeFunctor': <class 'EventStream.data.time_dependent_functor.AgeFunctor'>, 'TimeOfDayFunctor': <class 'EventStream.data.time_dependent_functor.TimeOfDayFunctor'>}
PREPROCESSING_METADATA_COLUMNS = {'mean': <class 'float'>, 'std': <class 'float'>, 'thresh_large': <class 'float'>, 'thresh_small': <class 'float'>, 'value_type': <class 'str'>}
add_empty_metadata()[source]

Adds an empty measurement_metadata dataframe or series.

add_missing_mandatory_metadata_cols()[source]
cache_measurement_metadata(base_dir: Path, fn: str)[source]
describe(line_width: int = 60, wrap_lines: bool = False, stream: TextIOBase | None = None) int | None[source]

Provides a plain-text description of the measurement.

Prints the following information about the MeasurementConfig object:

  1. The measurement’s name, temporality, modality, and observation frequency.

  2. What value types (e.g., integral, float, etc.) its values take on, if the measurement is a numerical modality whose values may take on distinct value types.

  3. Details about its internal self.vocabulary object, via Vocabulary.describe.

Parameters:
line_width: int = 60

The maximum width of each line in the description.

wrap_lines: bool = False

Whether to wrap lines that exceed the line_width.

stream: TextIOBase | None = None

The stream to write the description to. If None, the description is printed to stdout.

Returns:

The number of characters written to the stream if a stream was provided, otherwise None.

Raises:

ValueError – if the calling object is misconfigured.

Examples

>>> vocab = Vocabulary(
...     vocabulary=['apple', 'banana', 'pear', 'UNK'],
...     obs_frequencies=[3, 4, 1, 2],
... )
>>> cfg = MeasurementConfig(
...     name="MVR",
...     values_column='bar',
...     temporality='dynamic',
...     modality='multivariate_regression',
...     observation_rate_over_cases=0.6816,
...     observation_rate_per_case=1.32,
...     _measurement_metadata=pd.DataFrame(
...         {'value_type': ['float', 'categorical', 'categorical']},
...         index=pd.Index(['apple', 'pear', 'banana'], name='MVR'),
...     ),
...     vocabulary=vocab,
... )
>>> cfg.describe(line_width=100)
MVR: dynamic, multivariate_regression observed 68.2%, 1.3/case on average
Value Types:
  2 categorical
  1 float
Vocabulary:
  4 elements, 20.0% UNKs
  Frequencies: █▆▁
  Elements:
    (40.0%) banana
    (30.0%) apple
    (10.0%) pear
>>> cfg.modality = 'wrong'
>>> cfg.describe()
Traceback (most recent call last):
    ...
ValueError: Can't describe wrong measure MVR!
drop()[source]

Sets the modality to DROPPED and does associated post-processing to ensure validity.

Examples

>>> cfg = MeasurementConfig(
...     name='key',
...     modality='multivariate_regression',
...     temporality='dynamic',
...     values_column='vals',
...     _measurement_metadata=pd.DataFrame(
...         {'censor_lower_bound': [1, 0.2, 0.1]},
...         index=pd.Index(['foo', 'bar', 'baz'], name='key'),
...     ),
...     vocabulary=Vocabulary(['foo', 'bar', 'baz'], [0.3, 0.4, 0.3]),
... )
>>> cfg.drop()
>>> cfg.modality
<DataModality.DROPPED: 'dropped'>
>>> assert cfg._measurement_metadata is None
>>> assert cfg.vocabulary is None
>>> assert cfg.is_dropped
classmethod from_dict(as_dict: dict, base_dir: Path | None = None) MeasurementConfig[source]

Build a configuration object from a plain dictionary representation.

functor : TimeDependentFunctor | None = None
property is_dropped : bool
property is_numeric : bool
property measurement_metadata : DataFrame | Series | None
modality : DataModality | None = None
modifiers : list[str] | None = None
name : str | None = None
observation_rate_over_cases : float | None = None
observation_rate_per_case : float | None = None
temporality : TemporalityType | None = None
to_dict() dict[source]

Represents this configuration object as a plain dictionary.

uncache_measurement_metadata()[source]
values_column : str | None = None
vocabulary : Vocabulary | None = None
class EventStream.data.config.PytorchDatasetConfig(save_dir: Path = '???', max_seq_len: int = 256, min_seq_len: int = 2, seq_padding_side: SeqPaddingSide = SeqPaddingSide.RIGHT, subsequence_sampling_strategy: SubsequenceSamplingStrategy = SubsequenceSamplingStrategy.RANDOM, train_subset_size: int | float | str = 'FULL', train_subset_seed: int | None = None, tuning_subset_size: int | float | str = 'FULL', tuning_subset_seed: int | None = None, task_df_name: str | None = None, do_include_subsequence_indices: bool = False, do_include_subject_id: bool = False, do_include_start_time_min: bool = False, cache_for_epochs: int = 1)[source]

Bases: JSONableMixin

Configuration options for building a PyTorch dataset from a Dataset.

This is the main configuration object for a PytorchDataset. The PytorchDataset class specializes the representation of the data in a base Dataset class for sequential deep learning. This dataclass is also an acceptable Hydra Structured Config object with the name “pytorch_dataset_config”.

save_dir

Directory where the base dataset, including the deep learning representation outputs, is saved.

Type:

pathlib.Path

max_seq_len

Maximum sequence length the dataset should output in any individual item.

Type:

int

min_seq_len

Minimum sequence length required to include a subject in the dataset.

Type:

int

seq_padding_side

Whether to pad smaller sequences on the right or the left.

Type:

EventStream.data.config.SeqPaddingSide

subsequence_sampling_strategy

Strategy for sampling subsequences when an individual item’s total sequence length in the raw data exceeds the maximum allowed sequence length.

Type:

EventStream.data.config.SubsequenceSamplingStrategy

train_subset_size

If the training data should be subsampled randomly, this specifies the size of the training subset. If None or “FULL”, then the full training data is used.

Type:

int | float | str

train_subset_seed

If the training data should be subsampled randomly, this specifies the seed for that random subsampling.

Type:

int | None

tuning_subset_size

If the tuning data should be subsampled randomly, this specifies the size of the tuning subset. If None or “FULL”, then the full tuning data is used.

Type:

int | float | str

tuning_subset_seed

If the tuning data should be subsampled randomly, this specifies the seed for that random subsampling.

Type:

int | None

task_df_name

If the raw dataset should be limited to a task dataframe view, this specifies the name of the task dataframe, and indirectly the path on disk from where that task dataframe will be read (save_dir / "task_dfs" / f"{task_df_name}.parquet").

Type:

str | None
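
The path expression in the task_df_name description can be reproduced with pathlib. This is a sketch, not the class's own property: the helper name, the directory, and the task name "readmission" are hypothetical, and PurePosixPath is used only so that the result does not depend on the operating system.

```python
from pathlib import PurePosixPath
from typing import Optional


def task_df_path(save_dir: str, task_df_name: Optional[str]) -> Optional[PurePosixPath]:
    """Build save_dir / "task_dfs" / f"{task_df_name}.parquet", or None without a task."""
    if task_df_name is None:
        return None
    return PurePosixPath(save_dir) / "task_dfs" / f"{task_df_name}.parquet"


# Hypothetical save directory and task name.
path = task_df_path("/data/dataset", "readmission")
```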

do_include_subject_id

Whether or not to include the subject ID of the individual for this batch.

Type:

bool

do_include_subsequence_indices

Whether or not to include the start and end indices of the sampled subsequence for the individual from their full dataset for this batch. This is sometimes used during generative-based evaluation.

Type:

bool

do_include_start_time_min

Whether or not to include the start time of the individual’s sequence in minutes since the epoch (1/1/1970) in the output data. This is necessary during generation, and not used anywhere else currently.

Type:

bool

Raises:
  • ValueError – If ‘seq_padding_side’ is not a valid value; if ‘min_seq_len’ is not a non-negative integer; if ‘max_seq_len’ is not an integer greater than or equal to ‘min_seq_len’; if ‘train_subset_seed’ is not None when ‘train_subset_size’ is None or ‘FULL’; if ‘train_subset_size’ is negative when it is an integer; if ‘train_subset_size’ is not within (0, 1) when it is a float.

  • TypeError – If ‘train_subset_size’ is of unrecognized type.
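
The checks on train_subset_size can be sketched as a standalone function. This mirrors the documented behaviour and error messages but is not the class's own validation code; the function name is hypothetical, and rejecting zero for integers is an assumption based on the "must be positive" message.

```python
from typing import Union


def check_subset_size(size: Union[int, float, str, None]) -> None:
    """Raise if `size` is not an acceptable subset size; otherwise return None."""
    if size is None or size == "FULL":
        return  # Use the full split; nothing to validate.
    if isinstance(size, int):
        # Assumption: zero is rejected along with negative values.
        if size <= 0:
            raise ValueError(f"If integral, train_subset_size must be positive! Got {size}")
    elif isinstance(size, float):
        if not (0 < size < 1):
            raise ValueError(f"If float, train_subset_size must be in (0, 1)! Got {size}")
    else:
        # Any other type, including strings other than "FULL", is rejected.
        raise TypeError(f"train_subset_size is of unrecognized type {type(size)}.")
```

An integer is read as an absolute number of subjects and a float as a fraction of the split, which is why the two branches apply different range checks.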

Examples

>>> config = PytorchDatasetConfig(
...     save_dir='./dataset',
...     max_seq_len=256,
...     min_seq_len=2,
...     seq_padding_side=SeqPaddingSide.RIGHT,
...     subsequence_sampling_strategy=SubsequenceSamplingStrategy.RANDOM,
...     train_subset_size="FULL",
...     train_subset_seed=None,
...     task_df_name=None,
...     do_include_start_time_min=False
... )
>>> config_dict = config.to_dict()
>>> new_config = PytorchDatasetConfig.from_dict(config_dict)
>>> config == new_config
True
>>> config = PytorchDatasetConfig(train_subset_size=-1)
Traceback (most recent call last):
    ...
ValueError: If integral, train_subset_size must be positive! Got -1
>>> config = PytorchDatasetConfig(train_subset_size=1.2)
Traceback (most recent call last):
    ...
ValueError: If float, train_subset_size must be in (0, 1)! Got 1.2
>>> config = PytorchDatasetConfig(train_subset_size='200')
Traceback (most recent call last):
    ...
TypeError: train_subset_size is of unrecognized type <class 'str'>.
>>> import sys
>>> from loguru import logger
>>> logger.remove()
>>> _ = logger.add(sys.stdout, format="{message}")
>>> config = PytorchDatasetConfig(
...     save_dir='./dataset',
...     max_seq_len=256,
...     min_seq_len=2,
...     seq_padding_side='left',
...     subsequence_sampling_strategy=SubsequenceSamplingStrategy.RANDOM,
...     train_subset_size=100,
...     train_subset_seed=None,
...     task_df_name=None,
...     do_include_start_time_min=False
... )
train_subset_size is set, but train_subset_seed is not. Setting to...
>>> assert config.train_subset_seed is not None
property DL_reps_dir : Path
cache_for_epochs : int = 1
property cached_task_dir : Path | None
do_include_start_time_min : bool = False
do_include_subject_id : bool = False
do_include_subsequence_indices : bool = False
classmethod from_dict(as_dict: dict) PytorchDatasetConfig[source]

Creates a new instance of this class from a plain dictionary.

max_seq_len : int = 256
property measurement_config_fp : Path
property measurement_configs : dict[str, MeasurementConfig]
min_seq_len : int = 2
property raw_task_df_fp : Path | None
save_dir : Path = '???'
seq_padding_side : SeqPaddingSide = 'right'
subsequence_sampling_strategy : SubsequenceSamplingStrategy = 'random'
task_df_name : str | None = None
property task_info_fp : Path | None
property tensorized_cached_dir : Path
tensorized_cached_files(split: str) dict[str, Path][source]
to_dict() dict[source]

Represents this configuration object as a plain dictionary.

train_subset_seed : int | None = None
train_subset_size : int | float | str = 'FULL'
tuning_subset_seed : int | None = None
tuning_subset_size : int | float | str = 'FULL'
property vocabulary_config : VocabularyConfig
property vocabulary_config_fp : Path
class EventStream.data.config.SeqPaddingSide(value)[source]

Bases: StrEnum

Enumeration for the side of sequence padding during PyTorch Batch construction.

LEFT = 'left'

Pad on the left side (at the beginning of the sequence).

This is the default during generation.

RIGHT = 'right'

Pad on the right side (at the end of the sequence).

This is the default during normal training.
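
The difference between the two padding sides can be shown on a plain list. This is a minimal sketch, not the batch-construction code; the function name and the padding value 0 are assumptions.

```python
def pad_sequence(seq: list[int], max_len: int, side: str = "right", pad_value: int = 0) -> list[int]:
    """Pad `seq` up to `max_len` elements on the requested side."""
    n_pad = max_len - len(seq)
    if n_pad <= 0:
        return list(seq)  # Already long enough; nothing to pad.
    padding = [pad_value] * n_pad
    if side == "left":
        return padding + list(seq)  # Real events end the sequence.
    if side == "right":
        return list(seq) + padding  # Real events start the sequence.
    raise ValueError(f"side must be 'left' or 'right'; got {side!r}")


right_padded = pad_sequence([1, 2, 3], 5, side="right")
left_padded = pad_sequence([1, 2, 3], 5, side="left")
```

With left padding, the most recent real event always sits in the final position, which is convenient when new events are appended during generation.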

class EventStream.data.config.SubsequenceSamplingStrategy(value)[source]

Bases: StrEnum

Enumeration for subsequence sampling strategies.

When the maximum allowed sequence length for a PytorchDataset is shorter than the sequence length of a subject’s data, this enumeration dictates how we sample a subsequence to include.

FROM_START = 'from_start'

Sample subsequences of the maximum length from the start of the permitted window.

RANDOM = 'random'

Sample subsequences of the maximum length randomly within the permitted window.

This is the default during pre-training.

TO_END = 'to_end'

Sample subsequences of the maximum length up to the end of the permitted window.

This is the default during fine-tuning and with task dataframes.
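
The three strategies differ only in where the sampled window starts. The sketch below assumes that the permitted window is the subject's whole sequence and that a window is described by a start index and an exclusive end index; the function name is hypothetical, and this is not the dataset's actual sampling code.

```python
import random


def sample_window(seq_len: int, max_seq_len: int, strategy: str, rng: random.Random) -> tuple[int, int]:
    """Return (start, end) indices, end exclusive, of the subsequence to keep."""
    if seq_len <= max_seq_len:
        return 0, seq_len  # The sequence already fits; keep all of it.
    last_start = seq_len - max_seq_len
    if strategy == "from_start":
        start = 0
    elif strategy == "to_end":
        start = last_start
    elif strategy == "random":
        start = rng.randint(0, last_start)  # randint includes both endpoints.
    else:
        raise ValueError(f"Unknown strategy {strategy!r}")
    return start, start + max_seq_len


rng = random.Random(0)
first = sample_window(10, 4, "from_start", rng)
last = sample_window(10, 4, "to_end", rng)
rand_start, rand_end = sample_window(10, 4, "random", rng)
```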

class EventStream.data.config.VocabularyConfig(vocab_sizes_by_measurement: dict[str, int] | None = None, vocab_offsets_by_measurement: dict[str, int] | None = None, measurements_idxmap: dict[str, dict[Hashable, int]] | None = None, measurements_per_generative_mode: dict[DataModality, list[str]] | None = None, event_types_idxmap: dict[str, int] | None = None)[source]

Bases: JSONableMixin

Dataclass that describes the vocabulary of a dataset, for initializing model parameters.

This does not configure a vocabulary, but rather describes the vocabulary learned during dataset pre-processing for an entire dataset. This description includes: the sizes of all per-measurement vocabularies (measurements without a vocabulary, such as univariate regression measurements, are omitted because their vocabularies have size 1); the vocabulary offsets per measurement, which detail how the various vocabularies are joined together to form a unified vocabulary; the indices of each global measurement type; the generative modes used by each measurement; and the event type indices.

vocab_sizes_by_measurement

A dictionary mapping measurements to their respective vocabulary sizes.

Type:

dict[str, int] | None

vocab_offsets_by_measurement

A dictionary mapping measurements to their respective vocabulary offsets.

Type:

dict[str, int] | None
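
One way to picture how per-measurement vocabularies are joined is as consecutive blocks of a single index space. The sketch below is an assumption about the layout, not the library's pre-processing code: the function name, the ordering of measurements, and the starting offset are all hypothetical.

```python
def build_offsets(sizes_in_order: list[tuple[str, int]], first_offset: int) -> dict[str, int]:
    """Assign each measurement the index at which its vocabulary block begins."""
    offsets: dict[str, int] = {}
    running = first_offset
    for measurement, size in sizes_in_order:
        offsets[measurement] = running
        running += size  # The next block starts right after this one.
    return offsets


# Hypothetical sizes; "measurement3" has a size-1 vocabulary.
offsets = build_offsets(
    [("measurement1", 10), ("measurement2", 3), ("measurement3", 1)],
    first_offset=5,
)
```

Under this layout, an element's position in the unified vocabulary would be its measurement's offset plus its index within that measurement's own vocabulary.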

measurements_idxmap

A dictionary mapping measurements to their integer indices.

Type:

dict[str, dict[collections.abc.Hashable, int]] | None

measurements_per_generative_mode

A dictionary mapping data modality to a list of measurements.

Type:

dict[EventStream.data.types.DataModality, list[str]] | None

event_types_idxmap

A dictionary mapping event types to their respective indices.

Type:

dict[str, int] | None

event_types_idxmap : dict[str, int] | None = None
measurements_idxmap : dict[str, dict[Hashable, int]] | None = None
measurements_per_generative_mode : dict[DataModality, list[str]] | None = None
property total_vocab_size : int

Returns the total vocab size of the vocabulary described here.

The total vocabulary size is the sum of (1) all the individual measurement vocabularies’ sizes, (2) any offset the global vocabulary has from 0, to account for padding indices, and (3) the number of measurements that have length-1 vocabularies (which are not included in vocab_sizes_by_measurement), as reflected by elements in the vocab offsets dictionary that aren’t in the vocab sizes dictionary.

Examples

>>> config = VocabularyConfig(
...     vocab_sizes_by_measurement={"measurement1": 10, "measurement2": 3},
...     vocab_offsets_by_measurement={"measurement1": 5, "measurement2": 15, "measurement3": 18}
... )
>>> config.total_vocab_size
19
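
The arithmetic behind the value 19 in the example above can be sketched as a standalone function. This follows the three-part sum described for total_vocab_size; taking the smallest offset as the global offset from 0 is an assumption that happens to agree with the example, and the function is not the property's actual implementation.

```python
def total_vocab_size(vocab_sizes: dict[str, int], vocab_offsets: dict[str, int]) -> int:
    """Sum the vocabulary sizes, the global offset, and the size-1 vocabularies."""
    sum_of_sizes = sum(vocab_sizes.values())
    # Assumption: the global offset from 0 is the smallest per-measurement offset.
    global_offset = min(vocab_offsets.values())
    # Measurements with an offset but no recorded size contribute one element each.
    n_size_one = len(set(vocab_offsets) - set(vocab_sizes))
    return sum_of_sizes + global_offset + n_size_one


total = total_vocab_size(
    {"measurement1": 10, "measurement2": 3},
    {"measurement1": 5, "measurement2": 15, "measurement3": 18},
)
```

Here the sizes contribute 13, the offset contributes 5, and measurement3 contributes 1.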
vocab_offsets_by_measurement : dict[str, int] | None = None
vocab_sizes_by_measurement : dict[str, int] | None = None