EventStream.data.time_dependent_functor module
Defines the interface for specifying functional time dependent measurements.
EventStream.data.types.DataModality.FUNCTIONAL_TIME_DEPENDENT measurements are specified by an analytical
function that depends only on the time of the event and per-subject static data. This module defines the
interface for specifying such functions, through the abstract base class TimeDependentFunctor. The
AgeFunctor and TimeOfDayFunctor classes are examples of such functions.
- class EventStream.data.time_dependent_functor.AgeFunctor(dob_col: str)
Bases: TimeDependentFunctor
Functor that returns the age of the subject when the event occurred.
Note that because years are not a fixed unit of time, this measurement is returned as a number of fixed-length years, where a fixed-length year is 365.25 days long.
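As a plain-Python illustration of this convention, the age is the elapsed time between the date of birth and the event, divided by 365.25 days. This is a standard-library sketch rather than the module's Polars implementation, and the helper name age_in_fixed_years is hypothetical:

```python
from datetime import datetime

# A "fixed-length year" as described above: 365.25 days, expressed in seconds.
SECONDS_PER_FIXED_YEAR = 365.25 * 24 * 60 * 60


def age_in_fixed_years(dob: datetime, timestamp: datetime) -> float:
    """Hypothetical helper: elapsed time from dob to timestamp in 365.25-day years."""
    return (timestamp - dob).total_seconds() / SECONDS_PER_FIXED_YEAR


# 1990-01-01 to 2020-01-01 spans 10957 days, slightly less than
# 30 * 365.25 = 10957.5 days, so the age comes out just under 30.
print(age_in_fixed_years(datetime(1990, 1, 1), datetime(2020, 1, 1)))  # ~29.9986
```

This reproduces the values in the example for this class below, which shows the same slightly-under and slightly-over effect for other birth dates.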
- OUTPUT_MODALITY
DataModality.UNIVARIATE_REGRESSION
- dob_col
Column name containing the subject’s date of birth.
Example
>>> import polars as pl
>>> from datetime import datetime
>>> functor = AgeFunctor(dob_col="birth_date")
>>> df = pl.DataFrame({
...     "birth_date": [datetime(1990, 1, 1), datetime(1995, 1, 1), datetime(2000, 1, 1)],
...     "timestamp": [datetime(2020, 1, 1), datetime(2021, 1, 1), datetime(2022, 1, 1)],
... })
>>> age_expr = functor.pl_expr().alias("age")
>>> print(df.select(age_expr).get_column("age").to_list())
[29.998631074606433, 26.001368925393567, 22.001368925393567]
- OUTPUT_MODALITY : DataModality = 'univariate_regression'
This functor outputs a univariate regression measurement.
- pl_expr() → pl.Expression
Defines the Polars Expression of the functor.
This function must return a polars expression that computes the value of the functor when evaluated on a polars.DataFrame containing the static data and a timestamp column. This function must be overridden in subclasses.
- Raises:
NotImplementedError – If this method is not overridden.
- update_from_prior_timepoint(prior_indices: LongTensor, prior_values: FloatTensor, new_delta: FloatTensor, new_time: FloatTensor, vocab: Vocabulary | None, measurement_metadata: Series | None) → tuple[LongTensor, FloatTensor]
Returns the pre-processed age for the subject at a new timepoint.
This method is used during generation to compute the subject’s age (appropriately pre-processed) at new timepoints (that have been stochastically generated by the model). The function infers the outlier detection and normalization parameters through the measurement_metadata argument.
- Parameters:
- prior_indices: LongTensor
Indices associated with the prior timepoint.
- prior_values: FloatTensor
The subject’s age (fully pre-processed) as of the last observed prior timepoint.
- new_delta: FloatTensor
Time elapsed between the prior timepoint and the new timepoint, in minutes.
- new_time: FloatTensor
Raw time in minutes since 01/01/1970. This is not used in this functor.
- vocab: Vocabulary | None
Vocabulary config of a dataset. This is not used in this functor.
- measurement_metadata: Series | None
Metadata for the age measurement as determined in pre-processing.
- Returns:
The static index of the univariate age measurement and the new age of the subject at the new timepoint.
Examples
>>> import torch
>>> import pandas as pd
>>> prior_indices = torch.LongTensor([1, 1, 1])
>>> prior_ages = torch.LongTensor([20, 30, 40])
>>> age_mean = 30
>>> age_std = 10
>>> thresh_large = 100
>>> thresh_small = 0
>>> prior_values = (prior_ages - age_mean) / age_std
>>> new_delta = torch.FloatTensor([1, 10, 2]) * (60*24*365.25)
>>> measurement_metadata = pd.Series({
...     "mean": age_mean,
...     "std": age_std,
...     "thresh_large": thresh_large,
...     "thresh_small": thresh_small,
... })
>>> functor = AgeFunctor(dob_col="birth_date")
>>> new_indices, new_ages = functor.update_from_prior_timepoint(
...     prior_indices=prior_indices,
...     prior_values=prior_values,
...     new_delta=new_delta,
...     new_time=None,
...     vocab=None,
...     measurement_metadata=measurement_metadata,
... )
>>> print(new_indices)
tensor([1, 1, 1])
>>> print(new_ages * age_std + age_mean)
tensor([21., 40., 42.])
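The age update described above amounts to undoing the normalization, advancing the age by the elapsed time, and normalizing again. The sketch below shows that arithmetic on plain Python lists instead of tensors. The function name update_age is hypothetical, and treating ages outside [thresh_small, thresh_large] as missing (NaN) is an assumption: the text only says the outlier parameters are read from measurement_metadata. Following the example, the indices are assumed to pass through unchanged, so only the values are computed here.

```python
import math

# Minutes in one fixed-length (365.25-day) year.
MINUTES_PER_FIXED_YEAR = 365.25 * 24 * 60


def update_age(prior_values, new_delta, mean, std, thresh_small, thresh_large):
    """Hypothetical sketch of the normalized-age update.

    prior_values: normalized ages, (age - mean) / std, at the prior timepoint.
    new_delta: elapsed time until the new timepoint, in minutes.
    """
    new_values = []
    for z, delta in zip(prior_values, new_delta):
        age = z * std + mean                  # undo normalization -> raw age in years
        age += delta / MINUTES_PER_FIXED_YEAR  # advance by the elapsed time
        # Assumed outlier handling: out-of-range ages become missing.
        if age < thresh_small or age > thresh_large:
            new_values.append(math.nan)
        else:
            new_values.append((age - mean) / std)  # re-normalize
    return new_values


# Same inputs as the doctest above: ages 20, 30, 40 advanced by 1, 10, 2 years.
prior = [(a - 30) / 10 for a in (20, 30, 40)]
deltas = [y * MINUTES_PER_FIXED_YEAR for y in (1, 10, 2)]
new = update_age(prior, deltas, mean=30, std=10, thresh_small=0, thresh_large=100)
print([round(v * 10 + 30, 6) for v in new])  # [21.0, 40.0, 42.0]
```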
- class EventStream.data.time_dependent_functor.TimeDependentFunctor(**fn_params)
Bases: ABC
Abstract base class for specifying functional time dependent measurements.
A functional time dependent measurement is specified by an analytical function that depends only on the time of the event and a subject’s static data. It must be specified in functional form so that we can appropriately produce these measurements dynamically during generation. These functions must be computable in two ways:
- Via a polars expression that can be evaluated on a polars.DataFrame containing the static data and a timestamp column.
- Via a torch function that takes as input the prior timepoint’s indices and values, the time delta until the new event and the new event’s time, and the vocabulary config and measurement metadata of a dataset, and returns the new indices and values of the output measurement.
In addition, for serializability, such functions must be convertible to and from plain dictionaries, which must store the name of their class. This is an abstract base class, and subclasses must override the pl_expr and update_from_prior_timepoint methods to be valid.
- OUTPUT_MODALITY
The DataModality of the output of the function.
- OUTPUT_MODALITY : DataModality = 'dropped'
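The contract described for this class (two equivalent ways to compute the value, plus dictionary round-tripping that records the class name) can be sketched in plain Python. Everything in this sketch is hypothetical: FunctorSketch, value_at, update, to_dict, from_dict, and the name-to-class registry stand in for the real polars/torch methods and for whatever serialization helpers the package actually provides.

```python
from abc import ABC, abstractmethod
from typing import Any


class FunctorSketch(ABC):
    """Hypothetical stand-in for TimeDependentFunctor (plain Python, no polars/torch)."""

    # Maps class names to classes so a stored dictionary can be turned back into an object.
    _registry: dict = {}

    def __init__(self, **fn_params: Any):
        self.fn_params = fn_params

    def __init_subclass__(cls, **kwargs: Any):
        super().__init_subclass__(**kwargs)
        FunctorSketch._registry[cls.__name__] = cls

    @abstractmethod
    def value_at(self, static_data: dict, timestamp: float) -> Any:
        """Plays the role of pl_expr: the value for one event from static data and time."""

    @abstractmethod
    def update(self, prior_value: Any, new_delta: float, new_time: float) -> Any:
        """Plays the role of update_from_prior_timepoint during generation."""

    def to_dict(self) -> dict:
        # Record the class name alongside the parameters, as the text requires.
        return {"class": type(self).__name__, "params": dict(self.fn_params)}

    @staticmethod
    def from_dict(d: dict) -> "FunctorSketch":
        # Look up the concrete class by its stored name and rebuild it.
        return FunctorSketch._registry[d["class"]](**d["params"])


class MinutesSinceStart(FunctorSketch):
    """Toy functor: minutes elapsed since a per-subject start time (static data)."""

    def __init__(self, start_col: str):
        super().__init__(start_col=start_col)
        self.start_col = start_col

    def value_at(self, static_data, timestamp):
        # Direct computation from static data and the event time.
        return timestamp - static_data[self.start_col]

    def update(self, prior_value, new_delta, new_time):
        # Incremental computation from the prior value and the time delta.
        return prior_value + new_delta


functor = MinutesSinceStart(start_col="start")
restored = FunctorSketch.from_dict(functor.to_dict())
print(type(restored).__name__, restored.start_col)  # MinutesSinceStart start
```

The design point the toy subclass illustrates is that both paths must agree: advancing the prior value by new_delta gives the same result as recomputing from the static data at new_time.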
- abstract pl_expr() → pl.Expression
Defines the Polars Expression of the functor.
This function must return a polars expression that computes the value of the functor when evaluated on a polars.DataFrame containing the static data and a timestamp column. This function must be overridden in subclasses.
- Raises:
NotImplementedError – If this method is not overridden.
- abstract update_from_prior_timepoint(prior_indices: LongTensor, prior_values: FloatTensor, new_delta: FloatTensor, new_time: FloatTensor, vocab: Vocabulary | None, measurement_metadata: Series | None) → tuple[LongTensor, FloatTensor]
Returns the pre-processed output for this measurement at a new timepoint.
This method is used during generation to compute the output of the functor at a new timepoint (that has been stochastically generated by the model) given historical data and the timepoint of the new event. This method must be overridden in subclasses.
- Parameters:
- prior_indices: LongTensor
Prior timepoint indices.
- prior_values: FloatTensor
Prior timepoint values.
- new_delta: FloatTensor
Time elapsed until the new event, in minutes.
- new_time: FloatTensor
Raw time of the new event, in minutes since 01/01/1970.
- vocab: Vocabulary | None
Vocabulary config of a dataset.
- measurement_metadata: Series | None
Metadata for the functional time dependent measurement as determined in pre-processing.
- Returns:
Tuple of the new indices and values of the output measurement.
- Raises:
NotImplementedError – If this method is not overridden.
- class EventStream.data.time_dependent_functor.TimeOfDayFunctor(**fn_params)
Bases: TimeDependentFunctor
Functor that returns the time of day at which the event occurred, binned into 4 categories.
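The doctest examples for this class establish only a few points of the binning: 00:00 maps to EARLY_AM, 06:00 to AM, 12:00 and 18:00 to PM, and 21:00 and 23:59 to LATE_PM. The standard-library sketch below uses cut points at 6, 12, and 21 hours; those exact boundaries, and the helper name time_of_day, are assumptions consistent with the examples, not taken from the source.

```python
from datetime import datetime

# Assumed (upper-bound hour, label) pairs; any hour >= 21 falls through to LATE_PM.
CUTS = [(6, "EARLY_AM"), (12, "AM"), (21, "PM")]


def time_of_day(ts: datetime) -> str:
    """Hypothetical helper: bin an event timestamp into one of 4 categories."""
    for upper, label in CUTS:
        if ts.hour < upper:
            return label
    return "LATE_PM"


print([time_of_day(datetime(2020, 1, 1, h)) for h in (0, 6, 12, 18, 23)])
# ['EARLY_AM', 'AM', 'PM', 'PM', 'LATE_PM']
```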
- OUTPUT_MODALITY
DataModality.SINGLE_LABEL_CLASSIFICATION
Example
>>> import polars as pl
>>> from datetime import datetime
>>> functor = TimeOfDayFunctor()
>>> df = pl.DataFrame({
...     "timestamp": [datetime(2020, 1, 1, 0, 0, 0), datetime(2020, 1, 1, 6, 0, 0),
...                   datetime(2020, 1, 1, 12, 0, 0), datetime(2020, 1, 1, 18, 0, 0),
...                   datetime(2020, 1, 1, 23, 59, 59)],
... })
>>> time_of_day_expr = functor.pl_expr().alias("time_of_day")
>>> print(df.select(time_of_day_expr).get_column("time_of_day").to_list())
['EARLY_AM', 'AM', 'PM', 'PM', 'LATE_PM']
- OUTPUT_MODALITY : DataModality = 'single_label_classification'
This functor outputs a single-label classification measurement.
- pl_expr() → pl.Expression
Defines the Polars Expression of the functor.
This function must return a polars expression that computes the value of the functor when evaluated on a polars.DataFrame containing the static data and a timestamp column. This function must be overridden in subclasses.
- Raises:
NotImplementedError – If this method is not overridden.
- update_from_prior_timepoint(prior_indices: LongTensor, prior_values: FloatTensor, new_delta: FloatTensor, new_time: FloatTensor, vocab: Vocabulary | None, measurement_metadata: Series | None) → tuple[LongTensor, FloatTensor]
Returns the pre-processed time of day for the subject at a new timepoint.
This method is used during generation to compute the event’s time of day, realized as an integer index, at new timepoints (that have been stochastically generated by the model). The function infers the vocabulary information from the vocab argument.
- Parameters:
- prior_indices: LongTensor
Indices associated with the prior timepoint in the global vocabulary. This is not used in this functor.
- prior_values: FloatTensor
An empty tensor (as this is a categorical measurement). This is not used in this functor.
- new_delta: FloatTensor
Delta time in minutes. This is not used in this functor.
- new_time: FloatTensor
Raw time in minutes since 01/01/1970.
- vocab: Vocabulary | None
Vocabulary config of a dataset.
- measurement_metadata: Series | None
None, as this is a categorical measurement. This is not used in this functor.
- Returns:
Tuple of the vocabulary indices of the time-of-day category at the new timepoint, and a tensor of nan values.
Examples
>>> from datetime import datetime
>>> from .vocabulary import Vocabulary
>>> import torch
>>> functor = TimeOfDayFunctor()
>>> vocab = Vocabulary(["UNK", "EARLY_AM", "AM", "PM", "LATE_PM"], [0, 4, 3, 2, 1])
>>> new_time = torch.tensor([
...     datetime(2020, 1, 1, 0, 0, 0).timestamp() / 60,
...     datetime(2020, 1, 1, 6, 0, 0).timestamp() / 60,
...     datetime(2020, 1, 1, 12, 0, 0).timestamp() / 60,
...     datetime(2020, 1, 1, 21, 0, 0).timestamp() / 60,
... ])
>>> new_indices, new_values = functor.update_from_prior_timepoint(
...     prior_indices=None,
...     prior_values=torch.Tensor([1, 1, 1, 1]),
...     new_delta=None,
...     new_time=new_time,
...     vocab=vocab,
...     measurement_metadata=None,
... )
>>> print(new_indices)
tensor([1, 2, 3, 4])
>>> print(new_values)
tensor([nan, nan, nan, nan])
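Realizing the time of day as an integer index means going from minutes since the epoch to an hour of day, then to a category, then to that category's position in the vocabulary. The sketch below does this for a single float. It assumes new_time counts minutes since 1970-01-01 00:00 UTC, reuses the assumed 6/12/21-hour cut points, and replaces the real Vocabulary with a hypothetical plain list whose positions serve as indices. It builds its inputs from timezone-aware UTC datetimes, because Python's datetime.timestamp() interprets a naive datetime (as in the doctest above) in the local timezone.

```python
from datetime import datetime, timezone

MINUTES_PER_DAY = 24 * 60
# Hypothetical vocabulary ordering mirroring the doctest above: index = position.
VOCAB = ["UNK", "EARLY_AM", "AM", "PM", "LATE_PM"]


def time_of_day_index(new_time_minutes: float) -> int:
    """Hypothetical helper: map minutes since the UTC epoch to a vocabulary index."""
    # Minutes into the current UTC day, converted to a whole hour.
    hour = (new_time_minutes % MINUTES_PER_DAY) // 60
    if hour < 6:
        label = "EARLY_AM"
    elif hour < 12:
        label = "AM"
    elif hour < 21:  # assumed PM / LATE_PM boundary
        label = "PM"
    else:
        label = "LATE_PM"
    return VOCAB.index(label)


times = [datetime(2020, 1, 1, h, tzinfo=timezone.utc).timestamp() / 60 for h in (0, 6, 12, 21)]
print([time_of_day_index(t) for t in times])  # [1, 2, 3, 4]
```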