EventStream.data.types module
A collection of objects and enumerations for better type support in data applications.
- class EventStream.data.types.DataModality(value)
Bases: StrEnum
The modality of a data element.
A measurement’s modality dictates how it is pre-processed and embedded and, where applicable, how it is generated.
- DROPPED = 'dropped'
This column was dropped because it occurred too infrequently to be useful.
- MULTIVARIATE_REGRESSION = 'multivariate_regression'
A column which can occur zero or more times per event, with different labels and values.
At present, all multivariate regression measures are assumed to be partially observed. Element keys will be generated via multi-label, binary classification. Values will be generated via probabilistic regression.
- MULTI_LABEL_CLASSIFICATION = 'multi_label_classification'
This data modality can occur zero or more times with different labels.
It will never have an associated data value measured (see MULTIVARIATE_REGRESSION). Elements will be generated via multi-label, binary classification.
- SINGLE_LABEL_CLASSIFICATION = 'single_label_classification'
This data modality must take on a single label in every instance where it is observed.
It will never have an associated data value measured. Elements will be generated by first predicting whether or not the event will be observed at all, then performing single-label, multi-class classification of which label will be observed.
- UNIVARIATE_REGRESSION = 'univariate_regression'
This column is a continuous-valued, one-dimensional numerical measure which is partially observed.
The model first predicts whether or not this measurement will be observed, then what value it would take on.
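As a rough illustration of how the modality determines generation, the sketch below mirrors the documented values in a plain `(str, Enum)` class. It then maps each modality to the generation steps described above. This is a sketch under stated assumptions: the real `DataModality` derives from the library's own `StrEnum` base, and `generation_steps` is a hypothetical helper, not part of EventStream.

```python
from enum import Enum


class DataModality(str, Enum):
    """Hypothetical mirror of the documented DataModality values."""

    DROPPED = "dropped"
    MULTIVARIATE_REGRESSION = "multivariate_regression"
    MULTI_LABEL_CLASSIFICATION = "multi_label_classification"
    SINGLE_LABEL_CLASSIFICATION = "single_label_classification"
    UNIVARIATE_REGRESSION = "univariate_regression"


def generation_steps(modality: str) -> list[str]:
    """Hypothetical helper: summarize how each modality is generated, per the docs."""
    modality = DataModality(modality)  # accepts either a member or its string value
    if modality is DataModality.DROPPED:
        return []  # dropped columns are never generated
    if modality is DataModality.MULTI_LABEL_CLASSIFICATION:
        return ["multi-label binary classification of keys"]
    if modality is DataModality.MULTIVARIATE_REGRESSION:
        return [
            "multi-label binary classification of keys",
            "probabilistic regression of values",
        ]
    if modality is DataModality.SINGLE_LABEL_CLASSIFICATION:
        return ["predict whether observed", "single-label multi-class classification"]
    # UNIVARIATE_REGRESSION: observation first, then the value.
    return ["predict whether observed", "predict value"]


# Members compare equal to their string values, as with any str-based enum.
assert DataModality.DROPPED == "dropped"
print(generation_steps("univariate_regression"))  # ['predict whether observed', 'predict value']
```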
- class EventStream.data.types.InputDFType(value)
Bases: StrEnum
The kinds of input dataframes that can be used to construct a dataset.
- EVENT = 'event'
A dataframe containing event-level data about a subject.
Each row contains a timestamp, associated measurements, and a subject ID. Timestamps may be duplicated in these input dataframes, but will be deduplicated in the resulting dataset.
- RANGE = 'range'
A dataframe containing range-level data about a subject.
Each row contains a start timestamp, an end timestamp, associated measurements, and a subject ID. RANGE dataframes are converted into three event-level dataframes: start events, end events, and equal events (where start time = end time). Timestamps may be duplicated in these input dataframes, but will be deduplicated in the resulting dataset.
- STATIC = 'static'
A dataframe such that each row contains static (non-time-varying) data for each subject.
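The RANGE conversion can be pictured with the plain-Python sketch below, which works on lists of dicts rather than dataframes. The function name, the row layout, and the `timestamp` output key are assumptions made for illustration. EventStream's actual conversion operates on dataframes and may differ in details such as column naming.

```python
from datetime import datetime


def split_range_rows(rows: list[dict]) -> tuple[list[dict], list[dict], list[dict]]:
    """Hypothetical sketch: split RANGE rows into start, end, and equal event rows.

    Each input row needs "start" and "end" keys; all other keys (e.g. subject_id
    and measurements) are copied onto every event derived from that row.
    """
    starts, ends, equals = [], [], []
    for row in rows:
        payload = {k: v for k, v in row.items() if k not in ("start", "end")}
        if row["start"] == row["end"]:
            # Zero-length range: emit a single "equal" event.
            equals.append({**payload, "timestamp": row["start"]})
        else:
            # Proper range: emit one event at each endpoint.
            starts.append({**payload, "timestamp": row["start"]})
            ends.append({**payload, "timestamp": row["end"]})
    return starts, ends, equals


rows = [
    {"subject_id": 1, "start": datetime(2020, 1, 1, 8), "end": datetime(2020, 1, 3, 12), "drug": "A"},
    {"subject_id": 2, "start": datetime(2020, 2, 1, 9), "end": datetime(2020, 2, 1, 9), "drug": "B"},
]
starts, ends, equals = split_range_rows(rows)
print(len(starts), len(ends), len(equals))  # 1 1 1
```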
- class EventStream.data.types.InputDataType(value)
Bases: StrEnum
The kinds of data that can be contained in an input dataframe column.
- BOOLEAN = 'boolean'
A boolean variable.
- CATEGORICAL = 'categorical'
A categorical variable.
- FLOAT = 'float'
A floating-point variable.
- TIMESTAMP = 'timestamp'
A timestamp variable.
This may also be associated with a separate string for timestamp format, if the timestamp is originally presented as a string.
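To show how a TIMESTAMP column might be paired with its format string, here is a minimal sketch using the standard library's `datetime.strptime`. The `parse_timestamp` helper and the dict-based column spec are hypothetical and are not EventStream's configuration schema.

```python
from datetime import datetime
from typing import Optional, Union


def parse_timestamp(raw: Union[str, datetime], ts_format: Optional[str] = None) -> datetime:
    """Hypothetical helper for a TIMESTAMP column.

    Values that are already datetimes pass through unchanged; string values
    require the column's accompanying format string.
    """
    if isinstance(raw, datetime):
        return raw
    if ts_format is None:
        raise ValueError("A string timestamp needs an accompanying format string.")
    return datetime.strptime(raw, ts_format)


# A hypothetical column spec pairing the declared type with its format string.
column_spec = {"name": "admit_time", "type": "timestamp", "format": "%Y-%m-%d %H:%M"}
print(parse_timestamp("2021-03-04 05:06", column_spec["format"]))  # 2021-03-04 05:06:00
```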
- class EventStream.data.types.NumericDataModalitySubtype(value)
Bases: StrEnum
Numeric value types.
These are used to characterize both entire measures (e.g., ‘age’ takes on integer values) and sub-measures (e.g., within the measure of “vital signs”, observations for the key “heart rate” take on float values).
- CATEGORICAL_FLOAT = 'categorical_float'
This formerly floating-point measure/sub-measure has been converted to take on categorical values.
Options can be found in the global vocabulary, with the syntax f"{key_col}__EQ_{orig_val}".
- CATEGORICAL_INTEGER = 'categorical_integer'
This formerly integer measure/sub-measure has been converted to take on categorical values.
Options can be found in the global vocabulary, with the syntax f"{key_col}__EQ_{orig_val}".
- DROPPED = 'dropped'
The values of this measure (or sub-measure) were dropped.
- FLOAT = 'float'
This measure (or sub-measure) takes on floating-point values.
- INTEGER = 'integer'
This measure (or sub-measure) takes on integer values.
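The vocabulary syntax for CATEGORICAL_INTEGER and CATEGORICAL_FLOAT can be illustrated with a tiny sketch. `categorical_vocab` is a hypothetical helper. Ordering entries by first appearance is an assumption and need not match the library's vocabulary ordering. Only the f-string pattern comes from the documentation above.

```python
def categorical_vocab(key_col: str, values) -> list[str]:
    """Hypothetical helper: vocabulary entries for a numeric column converted to
    categorical values, using the documented f"{key_col}__EQ_{orig_val}" syntax.

    Duplicates are removed; order follows first appearance.
    """
    return list(dict.fromkeys(f"{key_col}__EQ_{v}" for v in values))


print(categorical_vocab("age", [30, 41, 30]))  # ['age__EQ_30', 'age__EQ_41']
print(categorical_vocab("lab_ph", [7.4, 7.35]))  # ['lab_ph__EQ_7.4', 'lab_ph__EQ_7.35']
```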
- class EventStream.data.types.PytorchBatch(event_mask: BoolTensor | None = None, time_delta: FloatTensor | None = None, time: FloatTensor | None = None, static_indices: LongTensor | None = None, static_measurement_indices: LongTensor | None = None, dynamic_indices: LongTensor | None = None, dynamic_measurement_indices: LongTensor | None = None, dynamic_values: FloatTensor | None = None, dynamic_values_mask: BoolTensor | None = None, start_time: FloatTensor | None = None, start_idx: LongTensor | None = None, end_idx: LongTensor | None = None, subject_id: LongTensor | None = None, stream_labels: dict[str, FloatTensor | LongTensor] | None = None)
Bases: object
A dataclass representing a batch of event flow data for a PyTorch model.
This class defines the data-output interface for deep learning models built on Event Flow GPT datasets. It stores the underlying batch data in a set of tensors and also exposes helpful methods and properties that simplify interacting with that data.
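To make the sparse tensor layout concrete, the sketch below does two things. It scatters `dynamic_indices`, `dynamic_values`, and `dynamic_values_mask` into dense per-event tensors over the vocabulary, and it recovers `time` from `time_delta` with a shifted cumulative sum. Both helpers are hypothetical and are not methods of `PytorchBatch`. The sketch rests on three assumptions: vocabulary index 0 is padding (as the `convert_to_DL_DF` example for this class suggests, since 0 entries are dropped there), each index appears at most once per event (duplicates would have their values summed), and `time` is measured from the first event of an untruncated sequence.

```python
import torch


def densify_dynamic(
    dynamic_indices: torch.Tensor,
    dynamic_values: torch.Tensor,
    dynamic_values_mask: torch.Tensor,
    vocab_size: int,
) -> tuple[torch.Tensor, torch.Tensor]:
    """Hypothetical helper: scatter sparse per-event elements into dense tensors.

    Returns (observed, values), both shaped (batch_size, sequence_length, vocab_size):
    observed is 1.0 where a vocabulary element occurs in an event, and values holds
    its recorded numeric value (0.0 if none was recorded).
    """
    batch_size, seq_len, _ = dynamic_indices.shape
    shape = (batch_size, seq_len, vocab_size)
    # Count how often each vocabulary index appears in each event.
    counts = torch.zeros(shape).scatter_add_(
        2, dynamic_indices, torch.ones_like(dynamic_values)
    )
    # Zero out unobserved values before scattering them.
    values = torch.zeros(shape).scatter_add_(
        2, dynamic_indices, dynamic_values * dynamic_values_mask.float()
    )
    # Assumption: index 0 is padding, so discard whatever accumulated there.
    counts[..., 0] = 0.0
    values[..., 0] = 0.0
    return (counts > 0).float(), values


def time_from_deltas(time_delta: torch.Tensor) -> torch.Tensor:
    """Hypothetical helper: time of each event since the first, via a shifted cumsum."""
    elapsed = time_delta.cumsum(dim=1)
    # Event 0 is at time 0; event i is at the sum of the first i deltas.
    return torch.cat([torch.zeros_like(elapsed[:, :1]), elapsed[:, :-1]], dim=1)


dynamic_indices = torch.tensor([[[0, 1], [1, 2], [2, 3]]])
dynamic_values = torch.tensor([[[0.0, 1.0], [1.0, 2.0], [0.0, 0.0]]])
dynamic_values_mask = torch.tensor([[[False, True], [True, True], [False, False]]])
observed, values = densify_dynamic(
    dynamic_indices, dynamic_values, dynamic_values_mask, vocab_size=4
)
print(values[0])
print(time_from_deltas(torch.tensor([[1.0, 2.0, 3.0]])))
```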
- event_mask
A boolean tensor of shape (batch_size, sequence_length) indicating which events in the batch are valid (i.e., which are not padding).
Type: torch.BoolTensor | None
- time_delta
A float tensor of shape (batch_size, sequence_length) indicating the time delta in minutes between each event and the subsequent event in that subject’s sequence in the batch.
Type: torch.FloatTensor | None
- time
A float tensor of shape (batch_size, sequence_length) indicating, for each event in the batch, the time in minutes since the start of the subject’s sequence. This is often left unset, as it is generally redundant with time_delta. However, it is used in generation, when the batch is truncated for efficient caching so that the raw time point can’t be recovered from the time delta.
Type: torch.FloatTensor | None
- static_indices
A long tensor of shape (batch_size, n_static_data_elements) indicating the indices of the static data elements observed for each subject in the batch. These are unordered, meaning that the position of a given element along the second dimension of this tensor is not necessarily meaningful. This is because the static data elements are sparsely encoded, so the indices are sufficient to recover the original data even in unordered form. Here, by “indices” we mean integer values giving the position of the associated categorical vocabulary element for each observation; e.g., if a static measurement records that the subject’s eye color is brown, and the categorical element eye_color/BROWN is at position 32 in the unified vocabulary, then the index for that observation would be 32.
Type: torch.LongTensor | None
- static_measurement_indices
A long tensor of shape (batch_size, n_static_data_elements) indicating which measurements the indices in static_indices correspond to. E.g., if there is a static data element corresponding to race, then the value in static_measurement_indices at the associated position would be an integer index corresponding to the race measurement overall, whereas the index at the identical position in static_indices would be an integer index corresponding to the specific race observed for the subject (e.g., “White”, “Black”, etc.).
Type: torch.LongTensor | None
- dynamic_indices
A long tensor of shape (batch_size, sequence_length, n_data_elements) indicating the indices of the dynamic data elements observed for each subject in the batch. These are unordered in the last dimension, meaning that the position of a given element along the third dimension of this tensor is not necessarily meaningful. This is because the dynamic data elements are sparsely encoded, so the indices and values are sufficient to recover the original data even in unordered form.
Type: torch.LongTensor | None
- dynamic_measurement_indices
A long tensor of shape (batch_size, sequence_length, n_data_elements) indicating which measurements the indices in dynamic_indices correspond to, similar to the static_measurement_indices attribute.
Type: torch.LongTensor | None
- dynamic_values
A float tensor of shape (batch_size, sequence_length, n_data_elements) indicating the numeric values associated with each dynamic data element in the dynamic_indices tensor. If no value was recorded for a given dynamic data element, the value in this tensor will be zero.
Type: torch.FloatTensor | None
- dynamic_values_mask
A boolean tensor of shape (batch_size, sequence_length, n_data_elements) indicating which values in the dynamic_values tensor were actually observed.
Type: torch.BoolTensor | None
- start_time
A float tensor of shape (batch_size,) indicating the start time, in minutes since the epoch, of each subject’s sequence in the batch. This is often unset, as it is only used in generation, when we may need to know the actual time of day of any generated event.
Type: torch.FloatTensor | None
- start_idx
A long tensor of shape (batch_size,) indicating the start index of the sampled sub-sequence for each subject in the batch, relative to their raw data.
Type: torch.LongTensor | None
- end_idx
A long tensor of shape (batch_size,) indicating the end index of the sampled sub-sequence for each subject in the batch, relative to their raw data.
Type: torch.LongTensor | None
- subject_id
A long tensor of shape (batch_size,) indicating the subject ID of each member of the batch.
Type: torch.LongTensor | None
- stream_labels
A dictionary mapping task names to label tensors of shape (batch_size,) that provide labels for the associated tasks for the sequences in the batch. It is only used during fine-tuning or zero-shot evaluation runs.
Type: dict[str, torch.FloatTensor | torch.LongTensor] | None
- property batch_size : int
Returns the batch size of this batch.
Assumes the batch has not been sliced from its initial configuration.
- convert_to_DL_DF() → DataFrame
Converts the batch data into a sparse DataFrame representation.
Examples
>>> import polars as pl
>>> import torch
>>> batch = PytorchBatch(
...     event_mask=torch.tensor([
...         [True, True, True],
...         [True, True, False],
...         [True, False, False],
...         [False, False, False]
...     ]),
...     time_delta=torch.tensor([
...         [1.0, 2.0, 3.0],
...         [1.0, 5.0, 0.0],
...         [2.3, 0.0, 0.0],
...         [0.0, 0.0, 0.0],
...     ]),
...     static_indices=torch.tensor([[0, 1], [1, 2], [1, 3], [0, 5]]),
...     static_measurement_indices=torch.tensor([[0, 1], [1, 1], [1, 1], [0, 2]]),
...     dynamic_indices=torch.tensor([
...         [[0, 1], [1, 2], [2, 3]],
...         [[0, 1], [1, 5], [0, 0]],
...         [[0, 2], [0, 0], [0, 0]],
...         [[0, 0], [0, 0], [0, 0]],
...     ]),
...     dynamic_measurement_indices=torch.tensor([
...         [[0, 1], [1, 2], [2, 3]],
...         [[0, 1], [1, 2], [0, 0]],
...         [[0, 2], [0, 0], [0, 0]],
...         [[0, 0], [0, 0], [0, 0]],
...     ]),
...     dynamic_values=torch.tensor([
...         [[0.0, 1.0], [1.0, 2.0], [0.0, 0.0]],
...         [[0.0, 1.0], [1.0, 0.0], [0.0, 0.0]],
...         [[0.0, 1.0], [0.0, 0.0], [0.0, 0.0]],
...         [[0.0, 0.0], [0.0, 0.0], [0.0, 0.0]],
...     ]),
...     dynamic_values_mask=torch.tensor([
...         [[False, True], [True, True], [False, False]],
...         [[False, True], [True, False], [False, False]],
...         [[False, True], [False, False], [False, False]],
...         [[False, False], [False, False], [False, False]],
...     ]),
...     start_time=torch.tensor([0.0, 10.0, 3.0, 2.2]),
...     stream_labels={"a": torch.tensor([0, 1, 0, 1]), "b": torch.tensor([1, 2, 4, 3])},
...     time=None,
... )
>>> pl.Config.set_tbl_width_chars(80)
<class 'polars.config.Config'>
>>> batch.convert_to_DL_DF()
shape: (4, 7)
┌───────────┬───────────┬──────────┬──────────┬──────────┬──────────┬──────────┐
│ time_delt ┆ static_in ┆ static_m ┆ dynamic_ ┆ dynamic_ ┆ dynamic_ ┆ start_ti │
│ a         ┆ dices     ┆ easureme ┆ indices  ┆ measurem ┆ values   ┆ me       │
│ ---       ┆ ---       ┆ nt_indic ┆ ---      ┆ ent_indi ┆ ---      ┆ ---      │
│ list[f64] ┆ list[f64] ┆ es       ┆ list[lis ┆ ces      ┆ list[lis ┆ f64      │
│           ┆           ┆ ---      ┆ t[f64]]  ┆ ---      ┆ t[f64]]  ┆          │
│           ┆           ┆ list[f64 ┆          ┆ list[lis ┆          ┆          │
│           ┆           ┆ ]        ┆          ┆ t[f64]]  ┆          ┆          │
╞═══════════╪═══════════╪══════════╪══════════╪══════════╪══════════╪══════════╡
│ [1.0,     ┆ [1.0]     ┆ [1.0]    ┆ [[1.0],  ┆ [[1.0],  ┆ [[1.0],  ┆ 0.0      │
│ 2.0, 3.0] ┆           ┆          ┆ [1.0,    ┆ [1.0,    ┆ [1.0,    ┆          │
│           ┆           ┆          ┆ 2.0],    ┆ 2.0],    ┆ 2.0],    ┆          │
│           ┆           ┆          ┆ [2.0,    ┆ [2.0,    ┆ [null,   ┆          │
│           ┆           ┆          ┆ 3.0]]    ┆ 3.0]]    ┆ null]…   ┆          │
│ [1.0,     ┆ [1.0,     ┆ [1.0,    ┆ [[1.0],  ┆ [[1.0],  ┆ [[1.0],  ┆ 10.0     │
│ 5.0]      ┆ 2.0]      ┆ 1.0]     ┆ [1.0,    ┆ [1.0,    ┆ [1.0,    ┆          │
│           ┆           ┆          ┆ 5.0]]    ┆ 2.0]]    ┆ null]]   ┆          │
│ [2.3]     ┆ [1.0,     ┆ [1.0,    ┆ [[2.0]]  ┆ [[2.0]]  ┆ [[1.0]]  ┆ 3.0      │
│           ┆ 3.0]      ┆ 1.0]     ┆          ┆          ┆          ┆          │
│ []        ┆ [5.0]     ┆ [2.0]    ┆ []       ┆ []       ┆ []       ┆ 2.2      │
└───────────┴───────────┴──────────┴──────────┴──────────┴──────────┴──────────┘
- property device : device
Returns the device storing the tensors in this batch.
Assumes all elements of the batch are on the same device.
- get(item: str, default: Any) → Any
A dictionary-like get method for this batch, keyed by attribute name.
- last_sequence_element_unsqueezed() → PytorchBatch
Filters the batch down to just the last event, while retaining the same number of dimensions.
- property n_data_elements : int
Returns the maximum number of dynamic data elements of the events in this batch.
Assumes the batch has not been sliced from its initial configuration.
- property n_static_data_elements : int
Returns the maximum number of static data elements of the subjects in this batch.
Assumes the batch has not been sliced from its initial configuration.
- repeat_batch_elements(expand_size: int) → PytorchBatch
Repeats each batch element expand_size times, in order. Used for generation.
Returns: A new PytorchBatch object with each batch element’s data repeated expand_size times.
Examples
>>> import torch
>>> batch = PytorchBatch(
...     event_mask=torch.tensor([[True, True, True], [True, True, False]]),
...     time_delta=torch.tensor([[1.0, 2.0, 3.0], [1.0, 5.0, 0.0]]),
...     static_indices=torch.tensor([[0, 1], [1, 2]]),
...     static_measurement_indices=torch.tensor([[0, 1], [1, 1]]),
...     dynamic_indices=torch.tensor([[[0, 1], [1, 2], [2, 3]], [[0, 1], [1, 5], [0, 0]]]),
...     dynamic_measurement_indices=torch.tensor(
...         [[[0, 1], [1, 2], [2, 3]], [[0, 1], [1, 2], [0, 0]]]
...     ),
...     dynamic_values=torch.tensor(
...         [[[0.0, 1.0], [1.0, 2.0], [0, 0]], [[0.0, 1.0], [1.0, 0.0], [0, 0]]]
...     ),
...     dynamic_values_mask=torch.tensor([
...         [[False, True], [True, True], [False, False]],
...         [[False, True], [True, False], [False, False]]
...     ]),
...     start_time=torch.tensor([0.0, 10.0]),
...     stream_labels={"a": torch.tensor([0, 1]), "b": torch.tensor([1, 2])},
...     time=None,
... )
>>> repeated_batch = batch.repeat_batch_elements(2)
>>> for k, v in repeated_batch.items():
...     print(k)
...     print(v)
event_mask
tensor([[ True,  True,  True],
        [ True,  True,  True],
        [ True,  True, False],
        [ True,  True, False]])
time_delta
tensor([[1., 2., 3.],
        [1., 2., 3.],
        [1., 5., 0.],
        [1., 5., 0.]])
time
None
static_indices
tensor([[0, 1],
        [0, 1],
        [1, 2],
        [1, 2]])
static_measurement_indices
tensor([[0, 1],
        [0, 1],
        [1, 1],
        [1, 1]])
dynamic_indices
tensor([[[0, 1], [1, 2], [2, 3]],
        [[0, 1], [1, 2], [2, 3]],
        [[0, 1], [1, 5], [0, 0]],
        [[0, 1], [1, 5], [0, 0]]])
dynamic_measurement_indices
tensor([[[0, 1], [1, 2], [2, 3]],
        [[0, 1], [1, 2], [2, 3]],
        [[0, 1], [1, 2], [0, 0]],
        [[0, 1], [1, 2], [0, 0]]])
dynamic_values
tensor([[[0., 1.], [1., 2.], [0., 0.]],
        [[0., 1.], [1., 2.], [0., 0.]],
        [[0., 1.], [1., 0.], [0., 0.]],
        [[0., 1.], [1., 0.], [0., 0.]]])
dynamic_values_mask
tensor([[[False,  True], [ True,  True], [False, False]],
        [[False,  True], [ True,  True], [False, False]],
        [[False,  True], [ True, False], [False, False]],
        [[False,  True], [ True, False], [False, False]]])
start_time
tensor([ 0.,  0., 10., 10.])
start_idx
None
end_idx
None
subject_id
None
stream_labels
{'a': tensor([0, 0, 1, 1]), 'b': tensor([1, 1, 2, 2])}
- property sequence_length : int
Returns the maximum sequence length of the sequences in this batch.
Assumes the batch has not been sliced from its initial configuration.
- split_repeated_batch(n_splits: int) → list[PytorchBatch]
Splits a batch into a list of batches by distributing its batch elements into groups.
This is the inverse of PytorchBatch.repeat_batch_elements. It is used to take a generated batch that has been expanded and split it into separate list elements, each containing an independent generation for every batch element in the original batch.
Returns: A list of length n_splits of PytorchBatch objects, such that list element i contains the batch elements at indices i, i + n_splits, i + 2 * n_splits, and so on (a stride of n_splits starting at i).
Raises: ValueError – if n_splits is not a positive integer divisor of self.batch_size.
Examples
>>> import torch
>>> batch = PytorchBatch(
...     event_mask=torch.tensor([
...         [True, True, True],
...         [True, True, False],
...         [True, False, False],
...         [False, False, False]
...     ]),
...     time_delta=torch.tensor([
...         [1.0, 2.0, 3.0],
...         [1.0, 5.0, 0.0],
...         [2.3, 0.0, 0.0],
...         [0.0, 0.0, 0.0],
...     ]),
...     static_indices=torch.tensor([[0, 1], [1, 2], [1, 3], [0, 5]]),
...     static_measurement_indices=torch.tensor([[0, 1], [1, 1], [1, 1], [0, 2]]),
...     dynamic_indices=torch.tensor([
...         [[0, 1], [1, 2], [2, 3]],
...         [[0, 1], [1, 5], [0, 0]],
...         [[0, 2], [0, 0], [0, 0]],
...         [[0, 0], [0, 0], [0, 0]],
...     ]),
...     dynamic_measurement_indices=torch.tensor([
...         [[0, 1], [1, 2], [2, 3]],
...         [[0, 1], [1, 2], [0, 0]],
...         [[0, 2], [0, 0], [0, 0]],
...         [[0, 0], [0, 0], [0, 0]],
...     ]),
...     dynamic_values=torch.tensor([
...         [[0.0, 1.0], [1.0, 2.0], [0.0, 0.0]],
...         [[0.0, 1.0], [1.0, 0.0], [0.0, 0.0]],
...         [[0.0, 1.0], [0.0, 0.0], [0.0, 0.0]],
...         [[0.0, 0.0], [0.0, 0.0], [0.0, 0.0]],
...     ]),
...     dynamic_values_mask=torch.tensor([
...         [[False, True], [True, True], [False, False]],
...         [[False, True], [True, False], [False, False]],
...         [[False, True], [False, False], [False, False]],
...         [[False, False], [False, False], [False, False]],
...     ]),
...     start_time=torch.tensor([0.0, 10.0, 3.0, 2.2]),
...     stream_labels={"a": torch.tensor([0, 1, 0, 1]), "b": torch.tensor([1, 2, 4, 3])},
...     time=None,
... )
>>> batch.split_repeated_batch(3)
Traceback (most recent call last):
    ...
ValueError: n_splits (3) must be a positive integer divisor of batch_size (4)
>>> for i, T in enumerate(batch.split_repeated_batch(2)):
...     print(f"Returned batch {i}:")
...     for k, v in T.items():
...         print(k)
...         print(v)
Returned batch 0:
event_mask
tensor([[ True,  True,  True],
        [ True, False, False]])
time_delta
tensor([[1.0000, 2.0000, 3.0000],
        [2.3000, 0.0000, 0.0000]])
time
None
static_indices
tensor([[0, 1],
        [1, 3]])
static_measurement_indices
tensor([[0, 1],
        [1, 1]])
dynamic_indices
tensor([[[0, 1], [1, 2], [2, 3]],
        [[0, 2], [0, 0], [0, 0]]])
dynamic_measurement_indices
tensor([[[0, 1], [1, 2], [2, 3]],
        [[0, 2], [0, 0], [0, 0]]])
dynamic_values
tensor([[[0., 1.], [1., 2.], [0., 0.]],
        [[0., 1.], [0., 0.], [0., 0.]]])
dynamic_values_mask
tensor([[[False,  True], [ True,  True], [False, False]],
        [[False,  True], [False, False], [False, False]]])
start_time
tensor([0., 3.])
start_idx
None
end_idx
None
subject_id
None
stream_labels
{'a': tensor([0, 0]), 'b': tensor([1, 4])}
Returned batch 1:
event_mask
tensor([[ True,  True, False],
        [False, False, False]])
time_delta
tensor([[1., 5., 0.],
        [0., 0., 0.]])
time
None
static_indices
tensor([[1, 2],
        [0, 5]])
static_measurement_indices
tensor([[1, 1],
        [0, 2]])
dynamic_indices
tensor([[[0, 1], [1, 5], [0, 0]],
        [[0, 0], [0, 0], [0, 0]]])
dynamic_measurement_indices
tensor([[[0, 1], [1, 2], [0, 0]],
        [[0, 0], [0, 0], [0, 0]]])
dynamic_values
tensor([[[0., 1.], [1., 0.], [0., 0.]],
        [[0., 0.], [0., 0.], [0., 0.]]])
dynamic_values_mask
tensor([[[False,  True], [ True, False], [False, False]],
        [[False, False], [False, False], [False, False]]])
start_time
tensor([10.0000,  2.2000])
start_idx
None
end_idx
None
subject_id
None
stream_labels
{'a': tensor([1, 1]), 'b': tensor([2, 3])}
>>> repeat_batch = batch.repeat_batch_elements(5)
>>> split_batches = repeat_batch.split_repeated_batch(5)
>>> for i, v in enumerate(split_batches):
...     assert v == batch, f"Batch {i} ({v}) not equal to original batch {batch}!"
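The index bookkeeping behind repeat_batch_elements and split_repeated_batch can be checked with plain lists. The sketch below assumes the behavior shown in the examples for these methods: repetition places the copies of each element consecutively, and split i takes every n_splits-th element starting at i. The helper names are hypothetical. On tensors, the same patterns correspond to `torch.repeat_interleave(x, expand_size, dim=0)` and the slice `x[i::n_splits]`, though this is not necessarily how EventStream implements them.

```python
def repeat_indices(batch_size: int, expand_size: int) -> list[int]:
    """Hypothetical helper: source index of each row after repeating every
    element expand_size times, in order."""
    return [b for b in range(batch_size) for _ in range(expand_size)]


def split_indices(batch_size: int, n_splits: int) -> list[list[int]]:
    """Hypothetical helper: row indices in each split (stride n_splits from i)."""
    if n_splits <= 0 or batch_size % n_splits != 0:
        raise ValueError(
            f"n_splits ({n_splits}) must be a positive integer divisor of batch_size ({batch_size})"
        )
    return [list(range(i, batch_size, n_splits)) for i in range(n_splits)]


print(split_indices(4, 2))  # [[0, 2], [1, 3]]

# Round trip: repeating 4 elements 5 times and then splitting into 5 groups
# recovers the original order [0, 1, 2, 3] in every group.
repeated = repeat_indices(4, 5)
groups = [[repeated[j] for j in group] for group in split_indices(len(repeated), 5)]
assert all(g == [0, 1, 2, 3] for g in groups)
```

Under these assumptions, the strided split is what makes the round trip work: a contiguous chunking of the repeated batch would instead yield groups such as [0, 0, 0, 0].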
- class EventStream.data.types.TemporalityType(value)
Bases: StrEnum
The ways a measurement can vary in time.
- DYNAMIC = 'dynamic'
This measure is dynamic with respect to time in a general manner.
It may be recorded many times per event, and can take on either categorical or partially observed regression data modalities.
- FUNCTIONAL_TIME_DEPENDENT = 'functional_time_dependent'
This measure varies predictably with respect to time and the static measures of a subject.
The “observations” of this measure will be computed on the basis of that functional form and added to the observed events. Currently only supported with categorical or fully observed regression variables.
- STATIC = 'static'
This measure is static per subject.
Currently only supported with classification data modalities.
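As an illustration of a functional time-dependent measure, the sketch below computes a subject’s age at each event from a static date of birth and the event timestamps. Age is used here only as a hypothetical example of such a functional form. The helper name and the 365.25-day year are assumptions, not EventStream’s implementation.

```python
from datetime import datetime


def age_at_events(date_of_birth: datetime, event_times: list[datetime]) -> list[float]:
    """Hypothetical functional form: age in years at each event timestamp."""
    days_per_year = 365.25  # assumption: average Julian year
    return [(t - date_of_birth).days / days_per_year for t in event_times]


dob = datetime(2000, 1, 1)  # a static, per-subject measure
events = [datetime(2004, 1, 1), datetime(2020, 1, 1)]  # observed event timestamps
print(age_at_events(dob, events))  # [4.0, 20.0]
```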
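- EventStream.data.types.de_pad(L: list[int], *other_L) → list[int] | tuple[list[int]]
Filters all passed lists down to only those indices at which the first argument is non-zero.
Parameters:
- L – The list whose non-zero entries determine which indices are kept.
- *other_L – Additional lists, filtered at the same indices as L.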
Examples
>>> de_pad([1, 3, 0, 4, 0, 0], [10, 0, 5, 8, 1, 0])
([1, 3, 4], [10, 0, 8])
>>> de_pad([1, 3, 0, 4, 0, 0])
[1, 3, 4]
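The documented behavior can be reproduced in a few lines of plain Python. The sketch below reproduces both doctest results. It is an illustrative re-implementation under the hypothetical name `de_pad_sketch`, not the library’s source.

```python
def de_pad_sketch(L: list[int], *other_L: list):
    """Hypothetical re-implementation: keep only indices where L is non-zero.

    Returns a single list when called with one argument, else a tuple of lists.
    """
    keep = [i for i, v in enumerate(L) if v != 0]
    filtered = [L[i] for i in keep]
    if not other_L:
        return filtered
    # Filter every additional list at the same positions as L.
    return (filtered, *([lst[i] for i in keep] for lst in other_L))


print(de_pad_sketch([1, 3, 0, 4, 0, 0], [10, 0, 5, 8, 1, 0]))  # ([1, 3, 4], [10, 0, 8])
print(de_pad_sketch([1, 3, 0, 4, 0, 0]))  # [1, 3, 4]
```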