Build a complete ML pipeline from features to inference
This tutorial walks through a complete machine learning workflow in Chalk: defining features, connecting to Snowflake data sources, creating point-in-time correct training datasets, training a model, registering it, and deploying it for inference.
We’ll build an event engagement model that predicts whether a user is likely to engage with a notification.
First, define feature classes that represent the entities in your domain. Feature classes use the @features decorator and define typed attributes for each feature.
from datetime import datetime
from chalk import windowed, Windowed
from chalk.features import features, DataFrame, _
import chalk.functions as F
@features
class User:
    id: int

    # Account features
    email: str
    signup_date: datetime
    account_type: str  # one of: "free", "basic", "premium"

    # Windowed activity features - automatically creates versions for each window
    total_logins: Windowed[int] = windowed(
        "1d",
        "7d",
        "30d",
        expression=_.events[
            _.type == "login",
            _.ts > _.chalk_window,
            _.ts <= _.chalk_now
        ].count(),
        default=0
    )

    first_login_unix_seconds: Windowed[int | None] = windowed(
        "1d",
        "7d",
        "30d",
        "all",
        expression=_.events[
            _.ts_unix_seconds,
            _.type == "login",
            _.ts > _.chalk_window,
            _.ts <= _.chalk_now
        ].min(),
        default=None
    )

    # Non-windowed activity feature
    days_since_last_login: int | None = (
        (F.unix_seconds(_.chalk_now) - _.first_login_unix_seconds["all"]) / 86400
    )

    # Billing features
    monthly_spend: float

    # Computed features
    account_age_days: int
    engagement_score: float

    # Has-many relationship to events
    events: "DataFrame[Event]"

    # Timestamp for feature time
    ts: datetime
@features
class Event:
    id: int
    user_id: User.id
    user: User
    type: str
    ts: datetime
    ts_unix_seconds: int = F.unix_seconds(_.ts)

    event_clicked: "EventClicked | None"
    event_was_clicked: bool = F.is_not_null(_.event_clicked.ts)

    # Model prediction
    interaction_probability: float


@features
class EventClicked:
    id: Event.id
    ts: datetime

The windowed() function creates multiple versions of a feature, one for each time window. You can reference specific windows using bracket notation:
User.total_logins["1d"] - logins in the last day
User.total_logins["7d"] - logins in the last 7 days
User.total_logins["30d"] - logins in the last 30 days
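For example, once these features are deployed, individual windows can be requested by name in a query. This is a minimal sketch assuming a running Chalk environment; the user id here is arbitrary:

from chalk.client import ChalkClient
from src.features import User

client = ChalkClient()

# Request two specific windows of the same windowed feature.
result = client.query(
    input={User.id: 1},
    output=[
        User.total_logins["1d"],
        User.total_logins["7d"],
    ],
)
print(result.get_feature_value(User.total_logins["7d"]))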
Connect Chalk to your Snowflake data warehouse. First, configure the integration in the Chalk dashboard, then reference it in your code.

In the Chalk dashboard, navigate to Settings > Data Sources and add a Snowflake integration with your credentials (account, warehouse, database, schema, role).
from chalk.sql import SnowflakeSource
# Reference the integration configured in the dashboard
snowflake = SnowflakeSource(name="sf")
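If you would rather not depend on dashboard configuration in every environment, SnowflakeSource can also be given connection parameters directly. The keyword arguments below are an assumption about the integration's parameter names rather than a verified signature; check Chalk's Snowflake integration docs, and keep credentials in environment variables:

import os
from chalk.sql import SnowflakeSource

# Assumed parameter names -- verify against the Chalk Snowflake integration docs.
snowflake = SnowflakeSource(
    name="sf",
    account_identifier=os.environ["SNOWFLAKE_ACCOUNT_IDENTIFIER"],
    warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
    db=os.environ["SNOWFLAKE_DATABASE"],
    schema=os.environ["SNOWFLAKE_SCHEMA"],
    role=os.environ["SNOWFLAKE_ROLE"],
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
)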
Create SQL files that map your Snowflake tables to Chalk features.

-- type: offline
-- resolves: User
-- source: sf
SELECT
id,
email,
signup_date,
account_type,
monthly_spend
FROM users

-- type: offline
-- resolves: Event
-- source: sf
SELECT
id,
user_id,
ts,
type
FROM events

-- type: offline
-- resolves: EventClicked
-- source: sf
SELECT
id,
ts
FROM events_clicked

Create Python resolvers to compute some simple derived features:
from datetime import datetime
from chalk import online, Now
from src.features import User
@online
def compute_account_age(
    signup_date: User.signup_date,
    now: Now
) -> User.account_age_days:
    return (now - signup_date).days


@online
def compute_engagement_score(
    logins: User.total_logins["30d"],  # Reference 30-day window
    days_inactive: User.days_since_last_login,
) -> User.engagement_score:
    # Simple engagement formula using 30-day windowed features
    activity_score = min(logins / 30, 1.0) * 0.6
    recency_score = max(0, 1 - (days_inactive / 30)) * 0.4
    score = (activity_score + recency_score) * 100
    return score

Use offline_query to create a training dataset with point-in-time correctness. This ensures that feature values reflect what was known at each historical point, preventing data leakage. For example, for an event that occurred six months ago, total_logins["7d"] counts logins in the seven days before that event's timestamp, not the seven days before you run the query.
from datetime import datetime, timedelta
from chalk.client import ChalkClient
from src.features import Event
client = ChalkClient()
# Create the dataset with point-in-time correctness
# Use bracket notation to select specific windows for windowed features
dataset = client.offline_query(
    input_sql="""
        SELECT
            id as "event.id",
            ts as "__ts__"
        FROM "sf.prod.events"
        WHERE type='recommendation';
    """,
    output=[
        Event.id,
        Event.user.id,
        Event.user.account_type,
        Event.user.total_logins["1d"],   # 1-day window
        Event.user.total_logins["7d"],   # 7-day window
        Event.user.total_logins["30d"],  # 30-day window
        Event.user.days_since_last_login,
        Event.user.monthly_spend,
        Event.user.account_age_days,
        Event.user.engagement_score,
        Event.event_was_clicked,
    ],
    dataset_name="event_recommendation_dataset",
    run_asynchronously=True,  # For large datasets
)
# Retrieve the data as a pandas DataFrame
df = dataset.get_data_as_pandas()
print(f"Dataset shape: {df.shape}")
print(df.head())

With your point-in-time correct dataset, train a model. Here’s an example using scikit-learn:
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, precision_score, recall_score
# Prepare features and labels
# Windowed feature columns include the window in the column name
feature_columns = [
    'event.user.total_logins__30d__',  # 30-day login count
    'event.user.days_since_last_login',
    'event.user.monthly_spend',
    'event.user.account_age_days',
    'event.user.engagement_score',
]
X = df[feature_columns]
y = df['event.event_was_clicked']
# Split into train/test
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
# Train the model
model = GradientBoostingClassifier(
    n_estimators=100,
    max_depth=5,
    learning_rate=0.1,
    random_state=42
)
model.fit(X_train, y_train)
# Evaluate
y_pred_proba = model.predict_proba(X_test)[:, 1]
y_pred = model.predict(X_test)
metrics = {
    "auc_roc": roc_auc_score(y_test, y_pred_proba),
    "precision": precision_score(y_test, y_pred),
    "recall": recall_score(y_test, y_pred),
}
print(f"Model metrics: {metrics}")Register your trained model in the Chalk Model Registry. This versions your model and makes it available for deployment.
from datetime import datetime
from chalk.client import ChalkClient
from src.features import Event
client = ChalkClient()
# Register the trained model version
client.register_model_version(
    name="InteractionModel",
    aliases=["production", "v1.0"],
    model=model,  # Pass the sklearn model directly
    metadata={
        "framework": "sklearn",
        "algorithm": "GradientBoostingClassifier",
        "training_date": datetime.now().isoformat(),
        "training_dataset": "event_recommendation_dataset",
        "feature_columns": feature_columns,
        "metrics": metrics,
    },
    input_features=[
        Event.user.total_logins["30d"],
        Event.user.days_since_last_login,
        Event.user.monthly_spend,
        Event.user.account_age_days,
        Event.user.engagement_score,
    ],
    output_features=[
        Event.interaction_probability,
    ],
)

print("Model registered successfully!")

Create a model resolver that connects your registered model to your feature class for real-time inference.
from chalk.ml import ModelReference, make_model_resolver
from src.features import Event
# Reference the registered model
interaction_model = ModelReference.from_alias(
    name="InteractionModel",
    alias="production",
)

# Or reference by specific version:
# interaction_model = ModelReference.from_version(
#     name="InteractionModel",
#     version=1,
# )
# Create a resolver that runs inference
# Use bracket notation to specify which window to use for each windowed feature
interaction_resolver = make_model_resolver(
    name="get_interaction_prediction",
    model=interaction_model,
    inputs=[
        Event.user.total_logins["30d"],
        Event.user.days_since_last_login,
        Event.user.monthly_spend,
        Event.user.account_age_days,
        Event.user.engagement_score,
    ],
    output=[Event.interaction_probability],
)

Deploy your Chalk project to make the model available:
chalk apply

Now you can query interaction predictions in real time through the Chalk API.
To get good performance, you'll want to add an online data source for your features, such as a Postgres database or a streaming source; a minimal sketch is shown below.
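This sketch assumes a PostgreSQL integration named "pg" has been configured in the Chalk dashboard and holds online copies of the same tables; the source name and table layout are assumptions, so adapt them to your setup.

from chalk.sql import PostgreSQLSource

# Assumes a "pg" PostgreSQL integration configured in the Chalk dashboard.
pg = PostgreSQLSource(name="pg")

You can then add online SQL file resolvers alongside the offline ones, for example:

-- type: online
-- resolves: User
-- source: pg
SELECT
id,
email,
signup_date,
account_type,
monthly_spend
FROM users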
from chalk.client import ChalkClient
from src.features import Event
client = ChalkClient()
# Get interaction probability for a single event
# You can query multiple windows of the same feature
result = client.query(
    input={Event.id: 12345},
    output=[
        Event.interaction_probability,
        Event.user.total_logins["1d"],
        Event.user.total_logins["7d"],
        Event.user.total_logins["30d"],
    ],
)
print(f"Interaction probability: {result.get_feature_value(Event.interaction_probability)}")
print(f"Logins (1d): {result.get_feature_value(Event.user.total_logins['1d'])}")
print(f"Logins (7d): {result.get_feature_value(Event.user.total_logins['7d'])}")
print(f"Logins (30d): {result.get_feature_value(Event.user.total_logins['30d'])}")For batch inference, use query_bulk:
event_ids = [1, 2, 3, 4, 5]
results = client.query_bulk(
    input={Event.id: event_ids},
    output=[Event.id, Event.interaction_probability, Event.user.total_logins["30d"]],
)
predictions_df = results.get_data_as_pandas()
print(predictions_df)

For more details on specific topics, see the rest of the Chalk documentation.