EventStream.data.data_embedding_layer module

class EventStream.data.data_embedding_layer.DataEmbeddingLayer(n_total_embeddings: int, out_dim: int, static_embedding_mode: StaticEmbeddingMode, categorical_embedding_dim: int | None = None, numerical_embedding_dim: int | None = None, split_by_measurement_indices: list[list[int | tuple[int, MeasIndexGroupOptions]]] | None = None, do_normalize_by_measurement_index: bool = False, static_weight: float = 0.5, dynamic_weight: float = 0.5, categorical_weight: float = 0.5, numerical_weight: float = 0.5)

Bases: Module

This class efficiently embeds a PytorchBatch into a fixed-size embedding.

This embeds the PytorchBatch’s dynamic and static indices into a fixed-size embedding via a PyTorch EmbeddingBag layer, weighted by the batch’s dynamic_values (respecting dynamic_values_mask). This layer assumes a padding index of 0, as that is how the PytorchDataset object is structured. It does not take into account the time component of the events; that should be embedded separately.
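The value-weighted, sum-aggregated lookup described above can be illustrated with a minimal pure-Python sketch. It is not the layer's implementation: the embedding table, the helper name weighted_bag_sum, and the choice to weight unobserved values by 1 (the joint-mode convention described below) are assumptions made for illustration. In PyTorch, the same idea corresponds to an EmbeddingBag in sum mode with per-sample weights.

```python
def weighted_bag_sum(table, indices, values, values_mask, padding_idx=0):
    """Sum table[i] over one event's indices, scaling each row by its value when observed.

    Hypothetical helper for illustration only; not part of EventStream.
    """
    dim = len(table[0])
    out = [0.0] * dim
    for idx, val, observed in zip(indices, values, values_mask):
        if idx == padding_idx:
            continue  # padded slots contribute nothing to the sum
        # Assumption: an unobserved value acts as a weight of 1 (joint-mode convention).
        weight = val if observed else 1.0
        for d in range(dim):
            out[d] += weight * table[idx][d]
    return out


# Hypothetical 4-row, 2-dimensional embedding table; row 0 is the padding row.
table = [[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]

event = weighted_bag_sum(
    table,
    indices=[1, 2, 0],
    values=[2.0, 0.0, 0.0],
    values_mask=[True, False, False],
)
print(event)  # [2.0, 1.0]
```

Index 1 is scaled by its observed value 2.0, index 2 has no observed value and enters with weight 1, and the padded slot is skipped.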

It has two possible embedding modes. In the joint embedding mode, categorical data and numerical values are embedded jointly through a unified feature map, which effectively equates to a constant-value imputation strategy with value 1 for missing numerical values. In the split embedding mode, categorical data and the numerical values that are present are embedded through separate feature maps, which equates to zero imputation (equivalent to mean imputation given normalization) together with indicator variables marking which variables are present. The split mode (roughly) follows the embedding strategy of Gorishniy et al.[1] for joint embedding of categorical and multivariate numerical features. In particular, given categorical indices and associated continuous values, it first produces a categorical embedding of the indices. It then uses a separate embedding layer to re-embed those categorical indices that have associated values observed, this time weighted by the associated numerical values, and outputs a weighted sum of the two embeddings. If the numerical and categorical embedding dimensions are distinct, both embeddings are projected into the output dimensionality through additional linear layers prior to the final summation.

The model uses the joint embedding mode if categorical and numerical embedding dimensions are not specified; otherwise, it uses the split embedding mode.
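The difference between the two modes comes down to which per-index weights feed the sum-aggregated embedding lookups. The sketch below computes those weights for a single event under the imputation strategies described above. The helper names joint_weights and split_weights are hypothetical, and the handling of padding (weight 0 at index 0) is an assumption based on the stated padding convention.

```python
def joint_weights(indices, values, values_mask):
    """Joint mode: one feature map; missing values act as a constant weight of 1."""
    return [
        0.0 if idx == 0 else (val if observed else 1.0)
        for idx, val, observed in zip(indices, values, values_mask)
    ]


def split_weights(indices, values, values_mask):
    """Split mode: indicator weights for the categorical map, zero-imputed values
    for the numerical map."""
    categorical = [0.0 if idx == 0 else 1.0 for idx in indices]
    numerical = [
        val if (observed and idx != 0) else 0.0
        for idx, val, observed in zip(indices, values, values_mask)
    ]
    return categorical, numerical


# One event with two measurements; only the first has an observed value.
indices, values, mask = [8, 10], [7.0, 0.0], [True, False]

print(joint_weights(indices, values, mask))  # [7.0, 1.0]
print(split_weights(indices, values, mask))  # ([1.0, 1.0], [7.0, 0.0])
```

In joint mode the unobserved measurement still contributes its embedding with weight 1. In split mode it contributes only through the categorical map, and its numerical weight is zero.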

Parameters:
n_total_embeddings: int

The total vocabulary size that needs to be embedded.

out_dim: int

The output dimension of the embedding layer.

static_embedding_mode: StaticEmbeddingMode

The way that static embeddings are combined with the dynamic embeddings.

categorical_embedding_dim: int | None = None

The dimension of the categorical embeddings. If None, no separate categorical embeddings are used.

numerical_embedding_dim: int | None = None

The dimension of the numerical embeddings. If None, no separate numerical embeddings are used.

split_by_measurement_indices: list[list[int | tuple[int, MeasIndexGroupOptions]]] | None = None

If not None, then the dynamic_indices are split into multiple groups, and each group is embedded separately. The split_by_measurement_indices argument is a list of lists of indices. Each inner list is a group of indices that will be embedded separately. Each index can be an integer, in which case it is the index of the measurement to be embedded, or it can be a tuple of the form (index, meas_index_group_mode), in which case index is the index of the measurement to be embedded, and meas_index_group_mode indicates whether the group includes only the categorical index of the measurement, only the numerical value of the measurement, or both its categorical index and its numerical values, as specified through the MeasIndexGroupOptions enum. Note that measurement index groups are assumed to only apply to the dynamic indices, not the static indices, as static indices are never generated and should be assumed to be causally linked to all elements of a given event. Furthermore, note that if specified, no measurement group except for the first can be empty. The first is allowed to be empty to account for settings where a model is built with a dependency graph with no FUNCTIONAL_TIME_DEPENDENT measures, as time is always assumed to be the first element of the dependency graph.

do_normalize_by_measurement_index: bool = False

If True, then the embeddings of each measurement are normalized by the number of measurements of that measurement_index in the batch.

static_weight: float = 0.5

The weight of the static embeddings. Only used if static_embedding_mode is not StaticEmbeddingMode.DROP.

dynamic_weight: float = 0.5

The weight of the dynamic embeddings. Only used if static_embedding_mode is not StaticEmbeddingMode.DROP.

categorical_weight: float = 0.5

The weight of the categorical embeddings. Only used if categorical_embedding_dim and numerical_embedding_dim are not None.

numerical_weight: float = 0.5

The weight of the numerical embeddings. Only used if categorical_embedding_dim and numerical_embedding_dim are not None.

Raises:
  • TypeError – If any of the arguments are of the wrong type.

  • ValueError – If any of the arguments are not valid.
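To make the accepted shapes of split_by_measurement_indices concrete, here is a minimal sketch of how such a specification could be validated and normalized into explicit (index, mode) pairs. GroupMode is a hypothetical stand-in for MeasIndexGroupOptions, normalize_groups is a hypothetical helper, and treating a bare integer as "categorical and numerical" is an assumption rather than documented behavior. The sketch also checks for empty groups up front, whereas the documented layer reports that condition from forward.

```python
from enum import Enum


class GroupMode(str, Enum):
    """Hypothetical stand-in for MeasIndexGroupOptions."""

    CATEGORICAL_AND_NUMERICAL = "categorical_and_numerical"
    CATEGORICAL_ONLY = "categorical_only"
    NUMERICAL_ONLY = "numerical_only"


def normalize_groups(groups):
    """Return each group as a list of (measurement_index, GroupMode) pairs."""
    if not isinstance(groups, list) or not all(isinstance(g, list) for g in groups):
        raise TypeError("`split_by_measurement_indices` must be a list of lists.")
    normalized = []
    for position, group in enumerate(groups):
        # Only the first group (reserved for time-dependent measures) may be empty.
        if position > 0 and not group:
            raise ValueError("Only the first measurement group may be empty.")
        entries = []
        for entry in group:
            if isinstance(entry, int):
                # Assumption: a bare index means both components are embedded.
                entries.append((entry, GroupMode.CATEGORICAL_AND_NUMERICAL))
            elif isinstance(entry, tuple) and len(entry) == 2 and isinstance(entry[0], int):
                # GroupMode(...) accepts either an enum member or its string value.
                entries.append((entry[0], GroupMode(entry[1])))
            else:
                raise TypeError(
                    "`split_by_measurement_indices` must be a list of lists of ints and/or tuples."
                )
        normalized.append(entries)
    return normalized


groups = normalize_groups(
    [[(4, GroupMode.CATEGORICAL_ONLY)], [5, (4, "categorical_and_numerical")]]
)
print(len(groups))  # 2
```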

Examples

>>> valid_layer = DataEmbeddingLayer(
...     n_total_embeddings=100,
...     out_dim=10,
...     static_embedding_mode=StaticEmbeddingMode.DROP,
... )
>>> valid_layer.embedding_mode
<EmbeddingMode.JOINT: 'joint'>
>>> valid_layer = DataEmbeddingLayer(
...     n_total_embeddings=100,
...     out_dim=10,
...     static_embedding_mode=StaticEmbeddingMode.DROP,
...     categorical_embedding_dim=5,
...     numerical_embedding_dim=5,
...     split_by_measurement_indices=None,
...     do_normalize_by_measurement_index=False,
...     categorical_weight=1 / 2,
...     numerical_weight=1 / 2,
... )
>>> valid_layer.embedding_mode
<EmbeddingMode.SPLIT_CATEGORICAL_NUMERICAL: 'split_categorical_numerical'>
>>> DataEmbeddingLayer(
...     n_total_embeddings=100,
...     out_dim="10",
...     static_embedding_mode=StaticEmbeddingMode.DROP,
... )
Traceback (most recent call last):
    ...
TypeError: `out_dim` must be an `int`.
>>> DataEmbeddingLayer(
...     n_total_embeddings=100,
...     out_dim=-10,
...     static_embedding_mode=StaticEmbeddingMode.DROP,
... )
Traceback (most recent call last):
    ...
ValueError: `out_dim` must be positive.
>>> DataEmbeddingLayer(
...     n_total_embeddings="100",
...     out_dim=10,
...     static_embedding_mode=StaticEmbeddingMode.DROP,
... )
Traceback (most recent call last):
    ...
TypeError: `n_total_embeddings` must be an `int`.
>>> DataEmbeddingLayer(
...     n_total_embeddings=-100,
...     out_dim=10,
...     static_embedding_mode=StaticEmbeddingMode.DROP,
... )
Traceback (most recent call last):
    ...
ValueError: `n_total_embeddings` must be positive.
>>> DataEmbeddingLayer(
...     n_total_embeddings=100,
...     out_dim=10,
...     static_embedding_mode=StaticEmbeddingMode.DROP,
...     categorical_embedding_dim=5,
...     numerical_embedding_dim=5,
...     split_by_measurement_indices=[4, (5, MeasIndexGroupOptions.CATEGORICAL_ONLY)],
... )
Traceback (most recent call last):
    ...
TypeError: `split_by_measurement_indices` must be a list of lists.
>>> DataEmbeddingLayer(
...     n_total_embeddings=100,
...     out_dim=10,
...     static_embedding_mode=StaticEmbeddingMode.DROP,
...     categorical_embedding_dim=5,
...     numerical_embedding_dim=5,
...     split_by_measurement_indices=[[4, [5, MeasIndexGroupOptions.CATEGORICAL_ONLY]]],
... )
Traceback (most recent call last):
    ...
TypeError: `split_by_measurement_indices` must be a list of lists of ints and/or tuples.

forward(batch: PytorchBatch) → Tensor

Returns the final embeddings of the values in the batch.

Parameters:
batch: PytorchBatch

The input batch to be embedded.

Returns:

The final embeddings. These will either be of shape (batch_size, sequence_length, out_dim) or (batch_size, sequence_length, num_measurement_buckets, out_dim) depending on whether the measurements are split or not.

Raises:
  • AssertionError – If indices.max() is greater than or equal to self.n_total_embeddings.

  • ValueError – If self.embedding_mode is not a valid EmbeddingMode, or if split_by_measurement_indices is not None and either there is an empty measurement group beyond the first or there is an invalid specified group mode.

Examples

>>> import torch
>>> # Here we construct a batch with batch size of 2, sequence length of 3, number of static data
>>> # elements of 3, and number of dynamic data elements of 2.
>>> batch = PytorchBatch(
...     event_mask=torch.BoolTensor([[True, True, True], [True, True, False]]),
...     static_indices=torch.LongTensor([[1, 2, 3], [4, 5, 6]]),
...     static_measurement_indices=torch.LongTensor([[1, 1, 2], [2, 2, 3]]),
...     dynamic_indices=torch.LongTensor([[[7, 8], [11, 10], [8, 7]], [[8, 7], [8, 10], [0, 0]]]),
...     dynamic_measurement_indices=torch.LongTensor(
...         [[[4, 4], [5, 5], [4, 4]], [[4, 4], [4, 5], [0, 0]]]
...     ),
...     dynamic_values=torch.FloatTensor(
...         [[[1, 2], [0, 0], [1.1, 2.1]], [[5, 6], [7, 0], [0, 0]]]
...     ),
...     dynamic_values_mask=torch.BoolTensor(
...         [
...             [[True, True], [False, False], [True, True]],
...             [[True, True], [True, False], [False, False]],
...         ]
...     ),
... )
>>> L = DataEmbeddingLayer(
...     n_total_embeddings=100,
...     out_dim=10,
...     static_embedding_mode=StaticEmbeddingMode.DROP,
...     categorical_embedding_dim=5,
...     numerical_embedding_dim=5,
...     split_by_measurement_indices=None,
...     do_normalize_by_measurement_index=False,
...     categorical_weight=1 / 2,
...     numerical_weight=1 / 2,
... )
>>> out = L(batch)
>>> out.shape # batch, seq_len, out_dim
torch.Size([2, 3, 10])
>>> L = DataEmbeddingLayer(
...     n_total_embeddings=100,
...     out_dim=10,
...     static_embedding_mode='sum_all',
...     categorical_embedding_dim=5,
...     numerical_embedding_dim=5,
...     split_by_measurement_indices=[
...         [(4, MeasIndexGroupOptions.CATEGORICAL_ONLY)],
...         [5, (4, 'categorical_and_numerical')],
...     ],
...     do_normalize_by_measurement_index=True,
...     static_weight=1/3,
...     dynamic_weight=2/3,
...     categorical_weight=1/4,
...     numerical_weight=3/4,
... )
>>> out = L(batch)
>>> out.shape # batch, seq_len, dependency graph length (split_by_measurement_indices), out_dim
torch.Size([2, 3, 2, 10])

static get_measurement_index_normalziation(measurement_indices: Tensor) → Tensor

Returns a normalization tensor for the measurements observed in the input, by row.

Parameters:
measurement_indices: Tensor

A tensor of shape (batch_size, num_measurements) that contains the indices of the measurements in each batch element. Zero indicates padded measurements and the returned mask will have a value of zero in those positions.

Returns:

A tensor of the same shape as the input where the value at position (i, j) is one divided by the number of times the measurement index at position (i, j) occurs in input row i, normalized such that each row sums to one. Put another way, this returns a tensor that assigns each unique measurement in a row an equal share of a total weight of 1, then splits that share evenly among all occurrences of that measurement in the row.

Examples

>>> import torch
>>> measurement_indices = torch.LongTensor([[1, 2, 5, 2, 2], [1, 3, 5, 3, 0]])
>>> DataEmbeddingLayer.get_measurement_index_normalziation(measurement_indices)
tensor([[0.3333, 0.1111, 0.3333, 0.1111, 0.1111],
        [0.3333, 0.1667, 0.3333, 0.1667, 0.0000]])
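The arithmetic behind this output can be reproduced with a short pure-Python sketch. It operates on nested lists rather than tensors, and the function name measurement_index_normalization is hypothetical; it illustrates the described weighting, not the library's implementation.

```python
from collections import Counter


def measurement_index_normalization(rows):
    """Per row: give each unique non-zero index an equal share of 1, then split
    that share evenly across the index's occurrences. Padding (0) gets 0."""
    out = []
    for row in rows:
        counts = Counter(idx for idx in row if idx != 0)
        n_unique = len(counts)
        out.append(
            [0.0 if idx == 0 else 1.0 / (n_unique * counts[idx]) for idx in row]
        )
    return out


weights = measurement_index_normalization([[1, 2, 5, 2, 2], [1, 3, 5, 3, 0]])
print([[round(w, 4) for w in row] for row in weights])
# [[0.3333, 0.1111, 0.3333, 0.1111, 0.1111], [0.3333, 0.1667, 0.3333, 0.1667, 0.0]]
```

In the first row there are three unique measurements, so each receives 1/3; measurement 2 occurs three times, so each occurrence receives 1/9.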

class EventStream.data.data_embedding_layer.EmbeddingMode(value)

Bases: StrEnum

The different ways that the data can be embedded.

JOINT = 'joint'

Embed all data jointly via a single embedding layer, weighting observed measurement embeddings by values when present.

SPLIT_CATEGORICAL_NUMERICAL = 'split_categorical_numerical'

Embed the categorical observations of measurements separately from their numerical values, and combine the two via a specifiable strategy.

class EventStream.data.data_embedding_layer.MeasIndexGroupOptions(value)

Bases: StrEnum

The different ways that the split_by_measurement_indices argument can be interpreted.

If measurements are split, then the final embedding can be seen as a combination of emb_cat(measurement_indices) and emb_num(measurement_indices, measurement_values), where emb_* are embedding layers with sum aggregations that take in indices to be embedded and possible values to use in the output sum. This enumeration controls how those two elements are combined for a given measurement feature.

CATEGORICAL_AND_NUMERICAL = 'categorical_and_numerical'

Embed both the categorical features and the numerical features of this measurement.

CATEGORICAL_ONLY = 'categorical_only'

Only embed the categorical component of this measurement (emb_cat(...)).

NUMERICAL_ONLY = 'numerical_only'

Only embed the numerical component of this measurement (emb_num(...)).
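The effect of the three options can be sketched as a switch over which of the two components enters the combined embedding for a measurement. In the sketch below, GroupMode is a hypothetical stand-in for this enumeration, cat_vec and num_vec stand for already-computed emb_cat(...) and emb_num(...) outputs, and the plain weighted sum with default weights of 0.5 is an assumption drawn from the constructor defaults; the real layer may combine or scale the components differently.

```python
from enum import Enum


class GroupMode(str, Enum):
    """Hypothetical stand-in for MeasIndexGroupOptions."""

    CATEGORICAL_AND_NUMERICAL = "categorical_and_numerical"
    CATEGORICAL_ONLY = "categorical_only"
    NUMERICAL_ONLY = "numerical_only"


def combine_components(cat_vec, num_vec, mode, categorical_weight=0.5, numerical_weight=0.5):
    """Weighted sum of the categorical and numerical embeddings selected by `mode`."""
    mode = GroupMode(mode)  # accepts an enum member or its string value
    use_cat = mode in (GroupMode.CATEGORICAL_ONLY, GroupMode.CATEGORICAL_AND_NUMERICAL)
    use_num = mode in (GroupMode.NUMERICAL_ONLY, GroupMode.CATEGORICAL_AND_NUMERICAL)
    return [
        (categorical_weight * c if use_cat else 0.0)
        + (numerical_weight * n if use_num else 0.0)
        for c, n in zip(cat_vec, num_vec)
    ]


cat_vec, num_vec = [1.0, 2.0], [4.0, 0.0]
print(combine_components(cat_vec, num_vec, "categorical_and_numerical"))  # [2.5, 1.0]
print(combine_components(cat_vec, num_vec, GroupMode.CATEGORICAL_ONLY))   # [0.5, 1.0]
print(combine_components(cat_vec, num_vec, "numerical_only"))             # [2.0, 0.0]
```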

class EventStream.data.data_embedding_layer.StaticEmbeddingMode(value)

Bases: StrEnum

The different ways that static embeddings can be combined with the dynamic embeddings.

DROP = 'drop'

Static embeddings are dropped, and only the dynamic embeddings are used.

SUM_ALL = 'sum_all'

Static embeddings are summed with the dynamic embeddings per event.
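As a rough illustration of the two options, the sketch below combines one subject's static embedding with its per-event dynamic embeddings. StaticMode is a hypothetical stand-in for this enumeration and combine_static is a hypothetical helper. The plain weighted sum using static_weight and dynamic_weight is an assumption based on the constructor's parameter descriptions, not a statement of how the layer computes it.

```python
from enum import Enum


class StaticMode(str, Enum):
    """Hypothetical stand-in for StaticEmbeddingMode."""

    DROP = "drop"
    SUM_ALL = "sum_all"


def combine_static(dynamic_events, static_vec, mode, static_weight=0.5, dynamic_weight=0.5):
    """Combine a subject's static embedding with each event's dynamic embedding."""
    mode = StaticMode(mode)  # accepts an enum member or its string value
    if mode is StaticMode.DROP:
        # Static data is ignored and the weights are unused.
        return [list(event) for event in dynamic_events]
    # SUM_ALL: add the (weighted) static embedding to every event.
    return [
        [dynamic_weight * d + static_weight * s for d, s in zip(event, static_vec)]
        for event in dynamic_events
    ]


dynamic_events = [[2.0, 4.0], [0.0, 2.0]]  # two events, embedding dimension 2
static_vec = [2.0, 0.0]

print(combine_static(dynamic_events, static_vec, "drop"))     # [[2.0, 4.0], [0.0, 2.0]]
print(combine_static(dynamic_events, static_vec, "sum_all"))  # [[2.0, 2.0], [1.0, 1.0]]
```

Because the enumeration is string-valued, passing 'sum_all' and passing the enum member are interchangeable in this sketch, which mirrors the string argument used in the forward example.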