Feature Engine
Define features as aggregations of data over sliding time ranges.
Given a has-many relationship between two feature classes, Chalk can compute aggregations over the joined data for all rows, filtered by additional field predicates, or filtered by specific windows of time.
Define general aggregation features with aggregation functions in a Chalk expression, and use the windowed function for aggregations over fixed lookback windows.
For example, use windowed features to count the number of login attempts made by a user over the past 10 minutes, or to track the largest purchase amount a cardholder has made in the past 30 days.
Consider Materialized Windowed Aggregations to seamlessly pre-aggregate data for high-throughput or low-latency use cases.
Chalk supports a number of aggregation functions out of the box. These builtins are highly performant: they are optimized and run natively in C++ and Rust. Aggregations automatically skip null (`None`) values.
The following table lists the supported aggregation functions:
| Function | Notes |
|---|---|
| `sum` | Supports vector and scalar feature aggregations. |
| `min` | |
| `max` | |
| `min_by_n` | Returns a list of the n values for the minimum `by` values. |
| `max_by_n` | Returns a list of the n values for the maximum `by` values. |
| `mean` | Feature type must be `Vector`, `float`, or `float \| None`. `None` values are skipped, meaning they are not included in the mean calculation. |
| `count` | |
| `std` | Standard deviation. Requires at least 2 values. |
| `var` | Variance. Same requirements as `std`. |
| `approx_count_distinct` | An approximation of the cardinality of non-null data. |
| `approx_top_k` | An approximation of the most common values in a field. Takes a required `k` parameter, such as `approx_top_k(k=5)` for the five approximately most common values. Up to `k` values are returned, sometimes fewer. |
| `approx_percentile` | An approximation of the value that a given fraction of your data falls below. Accepts a `quantile` parameter between 0 and 1, such as `approx_percentile(quantile=0.5)`. |
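The null-skipping behavior can be illustrated in plain Python. This is a sketch of the semantics only, not Chalk's native C++/Rust implementation; the helper name is hypothetical:

```python
# Sketch of null-skipping aggregation semantics (illustrative, not Chalk's implementation).
def mean_skip_none(values):
    """Average the non-None values, mirroring how Chalk aggregations skip nulls."""
    present = [v for v in values if v is not None]
    if not present:
        return None  # nothing to aggregate
    return sum(present) / len(present)

print(mean_skip_none([1.0, None, 3.0]))  # None is excluded: (1.0 + 3.0) / 2 = 2.0
```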
For example, say you want to compute aggregations over a document's revisions. You could define the features, filters, and aggregations as follows:

```python
from chalk.features import features, DataFrame, _
from datetime import datetime

@features
class Document:
    id: int
    revisions: DataFrame["Revision"]
    # these are aggregations over the revisions DataFrame
    num_revisions: int = _.revisions[_.id].count()
    max_revision_size: float = _.revisions[_.size_bytes].max()
    # you can also filter the rows being aggregated
    earliest_large_revision_ts: datetime = _.revisions[
        _.size_bytes > 100_000_000, _.timestamp
    ].min()
    # some aggregations return a list
    most_common_sizes: list[float] = _.revisions[_.size_bytes].approx_top_k(k=5)

@features
class Revision:
    id: int
    # this is a foreign key join between Document and Revision
    document_id: Document.id
    size_bytes: float
    timestamp: datetime
```
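The filtered `min` aggregation above can be mimicked in plain Python to show its semantics (a sketch; the sample data is invented for illustration):

```python
from datetime import datetime

# Each revision as a simple dict; in Chalk these would be Revision feature instances.
revisions = [
    {"size_bytes": 50_000_000, "timestamp": datetime(2024, 1, 1)},
    {"size_bytes": 200_000_000, "timestamp": datetime(2024, 2, 1)},
    {"size_bytes": 300_000_000, "timestamp": datetime(2024, 3, 1)},
]

# Equivalent of _.revisions[_.size_bytes > 100_000_000, _.timestamp].min():
# filter rows by the predicate, project the timestamp, then take the minimum.
earliest_large = min(
    r["timestamp"] for r in revisions if r["size_bytes"] > 100_000_000
)
print(earliest_large)  # 2024-02-01: the earliest revision over 100 MB
```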
In an aggregation, you can filter and sort by datetime features. In a single feature definition, Chalk lets you configure a Windowed Aggregation, which computes the same aggregation over multiple time windows.

A windowed feature takes a series of window durations and an aggregation expression describing how to compute the feature outputs. Within the expression, you can reference the timestamp feature on the feature class over which you are aggregating and compare it to `_.chalk_window` and `_.chalk_now` to filter against the current window's time range.

The `_.chalk_now` keyword references the query time, which defaults to the current time but can be overridden when a different input time is specified at query time. This allows you to query an aggregation for any window duration at any point in time. The `_.chalk_window` keyword refers to the start timestamp of each configured lookback window relative to `_.chalk_now`.
Here is an example of a windowed feature representing the number of failed logins in the last 10 minutes, 30 minutes, and 1 day:
```python
from chalk import Windowed, windowed
from chalk.features import features, DataFrame, _
from datetime import datetime

@features
class LoginAttempt:
    id: int
    user: "User.id"
    status: str
    at: datetime

@features
class User:
    id: int
    login_attempts: DataFrame[LoginAttempt]
    num_failed_logins: Windowed[int] = windowed(
        "10m", "30m", "1d",
        max_staleness="10m",
        expression=_.login_attempts[
            _.status == "failed",
            _.at > _.chalk_window,
            _.at < _.chalk_now,
        ].count(),
        default=0,
    )
    most_common_statuses: Windowed[list[str]] = windowed(
        "1h",
        max_staleness="15m",
        expression=_.login_attempts[
            _.status,
            _.at > _.chalk_window,
            _.at < _.chalk_now,
        ].approx_top_k(k=5),
    )
```

Windowed features support much of the same functionality as normal features. They are most often used alongside `max_staleness` and `etl_offline_to_online`.
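The window filter in the expression above (`_.at > _.chalk_window, _.at < _.chalk_now`) can be mimicked in plain Python. This is a sketch of the semantics with invented sample data; Chalk evaluates the real expression natively:

```python
from datetime import datetime, timedelta

# Login attempts as (status, timestamp) pairs; in Chalk these are LoginAttempt rows.
now = datetime(2024, 6, 1, 12, 0)  # stands in for _.chalk_now
attempts = [
    ("failed", now - timedelta(minutes=5)),
    ("failed", now - timedelta(minutes=25)),
    ("success", now - timedelta(minutes=8)),
    ("failed", now - timedelta(hours=5)),
]

def failed_logins(window: timedelta) -> int:
    """Count failed attempts with chalk_window < at < chalk_now semantics."""
    start = now - window  # stands in for _.chalk_window
    return sum(1 for status, at in attempts if status == "failed" and start < at < now)

print(failed_logins(timedelta(minutes=10)))  # 1
print(failed_logins(timedelta(minutes=30)))  # 2
print(failed_logins(timedelta(days=1)))      # 3
```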
To aggregate across all data for all time, Chalk offers `"all"` and `"infinity"` as keywords for window durations. By default, the `"all"` window looks back 100 years.
```python
from chalk.features import features, DataFrame, _
from chalk.streams import windowed, Windowed

@features
class TennisPlayer:
    id: int
    games: DataFrame[Game]
    lifetime_wins: Windowed[int] = windowed(
        "all",
        expression=_.games[_.winner_id == _.id].count(),
        default=0,
    )
```

A windowed feature can be referenced in a query or resolver in Python using the window as defined in the feature definition, but Chalk converts every windowed feature to a fully qualified name (FQN) that encodes the window duration in seconds. When referencing the feature in a resolver, you can use the Python syntax. When referencing it in a query, the Chalk Python SDK accepts the Python syntax, but the CLI requires the FQN syntax. If you do not specify a window when referencing a windowed feature, Chalk returns all windows.
Given the feature definition below, the following table shows how to reference each window, in either Python syntax or by FQN.
```python
from chalk.features import features
from chalk.streams import windowed, Windowed

@features
class User:
    id: int
    ...
    num_failed_logins: Windowed[int] = windowed(
        "10m",
        "1h30m",
        "1d",
        expression=...,
    )
```

| Feature Definition Window | Python Syntax | Fully Qualified Name |
|---|---|---|
| `"10m"` | `User.num_failed_logins["10m"]` | `User.num_failed_logins__600__` |
| `"1h30m"` | `User.num_failed_logins["1h30m"]` | `User.num_failed_logins__5400__` |
| `"1d"` | `User.num_failed_logins["1d"]` | `User.num_failed_logins__86400__` |
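The seconds in the FQN are simply the parsed window duration. A minimal duration parser (an illustrative sketch, not Chalk's implementation; both helper names are hypothetical) shows how the names above are derived:

```python
import re

# Seconds per unit for the duration shorthand used in window definitions.
UNITS = {"d": 86400, "h": 3600, "m": 60, "s": 1}

def window_seconds(duration: str) -> int:
    """Parse a duration like '1h30m' into total seconds."""
    return sum(int(n) * UNITS[u] for n, u in re.findall(r"(\d+)([dhms])", duration))

def windowed_fqn(feature: str, duration: str) -> str:
    """Build the fully qualified name for one window of a windowed feature."""
    return f"{feature}__{window_seconds(duration)}__"

print(windowed_fqn("User.num_failed_logins", "1h30m"))  # User.num_failed_logins__5400__
```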
Windowed features can be inputs to resolvers:

```python
@online
def account_under_attack(
    failed_logins_30m: User.num_failed_logins["30m"],
    failed_logins_1d: User.num_failed_logins["1d"],
) -> ...:
    return failed_logins_30m > 10 or failed_logins_1d > 100
```

Windowed features can also be referenced by other windowed features in the same feature class using expressions and the `_.chalk_window` operator. For example, we can compute the average transaction amount over different time windows:
```python
@features
class Transaction:
    id: int
    user_id: "User.id"
    amount: float
    at: datetime

@features
class User:
    id: int
    transactions: DataFrame[Transaction]
    sum_transactions: Windowed[float] = windowed(
        "30d", "60d", "90d",
        expression=_.transactions[
            _.amount,
            _.at > _.chalk_window,
            _.at < _.chalk_now,
        ].sum(),
    )
    count_transactions: Windowed[float] = windowed(
        "30d", "60d", "90d",
        expression=_.transactions[
            _.at > _.chalk_window,
            _.at < _.chalk_now,
        ].count(),
    )
    avg_transaction_amount: Windowed[float] = windowed(
        "30d", "60d", "90d",
        expression=(
            _.sum_transactions[_.chalk_window] /
            _.count_transactions[_.chalk_window]
        ),
    )
```

Windowed features are typically computed using either raw data or pre-aggregated data. For larger datasets, some systems may pre-aggregate batch data to optimize performance at the cost of real-time accuracy.
Chalk supports both modes of operation through the materialization parameter of windowed features. See Materialized Windowed Aggregations for guidance on when to use them.
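The pre-aggregation trade-off can be sketched in plain Python. This illustrates the general bucketing technique, not Chalk's materialization machinery; the bucket duration and all helper names are assumptions for the example. Events are rolled up into fixed time buckets, and a window query sums bucket totals, which is fast but can include slightly more than the exact window at the edges:

```python
from collections import defaultdict
from datetime import datetime, timedelta

def bucket_of(at: datetime) -> datetime:
    """Truncate a timestamp to the start of its daily bucket (assumed bucket size)."""
    return datetime(at.year, at.month, at.day)

# Maintain running totals per bucket instead of storing every raw event.
totals: dict[datetime, float] = defaultdict(float)

def record(at: datetime, amount: float) -> None:
    totals[bucket_of(at)] += amount

def window_sum(now: datetime, window: timedelta) -> float:
    """Approximate windowed sum by combining whole buckets in the lookback range."""
    start = bucket_of(now - window)
    return sum(v for b, v in totals.items() if start <= b <= bucket_of(now))

record(datetime(2024, 6, 1, 9), 10.0)
record(datetime(2024, 6, 1, 17), 5.0)
record(datetime(2024, 6, 3, 12), 20.0)
# Whole-bucket sum includes the 9:00 event even though it falls just outside
# the exact 2-day window ending at 13:00 -- the accuracy cost of pre-aggregation.
print(window_sum(datetime(2024, 6, 3, 13), timedelta(days=2)))  # 35.0
```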