Build a complete ML pipeline from features to inference
This tutorial walks through a complete machine learning workflow in Chalk: defining features, connecting to Snowflake data sources, creating point-in-time correct training datasets, training a model, registering it, and deploying it for inference.
We’ll build an event engagement prediction model that predicts whether a user is likely to engage with a notification.
First, define feature classes that represent the entities in your domain. Feature classes use the @features decorator and define typed attributes for each feature.
from datetime import datetime
from chalk import windowed, Windowed
from chalk.features import features, DataFrame, _
import chalk.functions as F
@features
class User:
    # A user account together with its login activity, billing, and derived
    # engagement features.
    id: int

    # Account features
    email: str
    signup_date: datetime
    account_type: str  # one of: "free", "basic", "premium"

    # Windowed activity features - automatically creates versions for each window
    total_logins: Windowed[int] = windowed(
        "1d",
        "7d",
        "30d",
        expression=_.events[
            _.type == "login",
            _.ts > _.chalk_window,
            _.ts <= _.chalk_now,
        ].count(),
        default=0,
    )

    # Earliest login timestamp (Unix seconds) inside each window; None when
    # the window contains no login events.
    first_login_unix_seconds: Windowed[int | None] = windowed(
        "1d",
        "7d",
        "30d",
        "all",
        expression=_.events[
            _.ts_unix_seconds,
            _.type == "login",
            _.ts > _.chalk_window,
            _.ts <= _.chalk_now,
        ].min(),
        default=None,
    )

    # Most recent login timestamp (Unix seconds) inside each window; None when
    # the window contains no login events. Uses max() so that recency features
    # measure the distance to the *latest* login rather than the first one.
    last_login_unix_seconds: Windowed[int | None] = windowed(
        "1d",
        "7d",
        "30d",
        "all",
        expression=_.events[
            _.ts_unix_seconds,
            _.type == "login",
            _.ts > _.chalk_window,
            _.ts <= _.chalk_now,
        ].max(),
        default=None,
    )

    # Non-windowed activity feature: seconds elapsed since the latest login,
    # converted to days (86400 seconds per day).
    # NOTE(review): "/" is true division while the feature is typed int --
    # confirm how Chalk casts the result.
    days_since_last_login: int | None = (
        (F.unix_seconds(_.chalk_now) - _.last_login_unix_seconds["all"]) / 86400
    )

    # Billing features
    monthly_spend: float

    # Computed features
    account_age_days: int
    engagement_score: float

    # Has-many relationship to events
    events: "DataFrame[Event]"

    # Timestamp for feature time
    ts: datetime
@features
class Event:
    # A single user event (e.g. a login), linked to the user it belongs to and
    # to an optional click record.
    id: int

    # Typed as User.id so the event can be joined to its user.
    user_id: User.id
    user: User

    # Event category; the User windowed features filter on type == "login".
    type: str
    ts: datetime

    # Event timestamp expressed as Unix seconds, derived from ts.
    ts_unix_seconds: int = F.unix_seconds(_.ts)

    # Optional click record for this event; None when no click exists.
    event_clicked: "EventClicked | None"

    # True when the click record has a timestamp.
    event_was_clicked: bool = F.is_not_null(_.event_clicked.ts)
@features
class EventClicked:
id: Event.id
ts: datetime
The windowed() function creates multiple versions of a feature, one for each time window. You can reference specific windows using bracket notation:
- User.total_logins["1d"] - logins in the last day
- User.total_logins["7d"] - logins in the last 7 days
- User.total_logins["30d"] - logins in the last 30 days
Connect Chalk to your Snowflake data warehouse. First, configure the integration in the Chalk dashboard, then reference it in your code.
In the Chalk dashboard, navigate to Integrations > Data Sources and add a Snowflake integration with your credentials (account, warehouse, database, schema, role).
from chalk.sql import SnowflakeSource
# Reference the integration configured in the dashboard
snowflake = SnowflakeSource(name="sf")
Create SQL files that map your Snowflake tables to Chalk features.
-- type: offline
-- resolves: User
-- source: sf
SELECT
id,
email,
signup_date,
account_type,
monthly_spend
FROM users
-- type: offline
-- resolves: Event
-- source: sf
SELECT
id,
user_id,
ts,
type
FROM events
-- type: offline
-- resolves: EventClicked
-- source: sf
SELECT
id,
ts
FROM events_clicked
Create Python resolvers to compute some simple derived features:
from datetime import datetime
from chalk import online, Now
from src.features import User
@online
def compute_account_age(
    signup_date: User.signup_date,
    now: Now,
) -> User.account_age_days:
    """Return the number of whole days elapsed between signup and ``now``."""
    elapsed = now - signup_date
    return elapsed.days
@online
def compute_engagement_score(
logins: User.total_logins["30d"], # Reference 30-day window
days_inactive: User.days_since_last_login,
) -> User.engagement_score:
# Simple engagement formula using 30-day windowed features
activity_score = min(logins / 30, 1.0) * 0.6
recency_score = max(0, 1 - (days_inactive / 30)) * 0.4
score = (activity_score + recency_score) * 100
return scoreUse offline_query to create a training dataset with point-in-time correctness. This ensures that feature values reflect what was known at each historical point, preventing data leakage.
from datetime import datetime, timedelta
from chalk.client import ChalkClient
from src.features import Event
client = ChalkClient()
# Create the dataset with point-in-time correctness
# Use bracket notation to select specific windows for windowed features
dataset = client.offline_query(
input_sql="""
SELECT
id as "event.id",
ts as "__ts__"
FROM "sf.prod.events"
WHERE type='recommendation';
""",
output=[
Event.id,
Event.user.id,
Event.user.account_type,
Event.user.total_logins["1d"], # 1-day window
Event.user.total_logins["7d"], # 7-day window
Event.user.total_logins["30d"], # 30-day window
Event.user.days_since_last_login,
Event.user.monthly_spend,
Event.user.account_age_days,
Event.user.engagement_score,
Event.event_was_clicked
],
dataset_name="event_recommendation_dataset",
run_asynchronously=True, # For large datasets
)
# Retrieve the data as a pandas DataFrame
df = dataset.get_data_as_pandas()
print(f"Dataset shape: {df.shape}")
print(df.head())
With your point-in-time correct dataset, train a model and save it as a pickle. The pickle will be uploaded to a volume and loaded by the handler at container startup.
import joblib
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import precision_score, recall_score, roc_auc_score
from sklearn.model_selection import train_test_split
# Column names from the offline_query dataset in Step 3
feature_columns = [
'event.user.total_logins__30d__',
'event.user.days_since_last_login',
'event.user.monthly_spend',
'event.user.account_age_days',
'event.user.engagement_score',
]
# Use the dataset from Step 3
X = df[feature_columns]
y = df["event.event_was_clicked"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y,
)
model = GradientBoostingClassifier(
n_estimators=100,
max_depth=5,
learning_rate=0.1,
random_state=42,
)
model.fit(X_train, y_train)
y_pred_proba = model.predict_proba(X_test)[:, 1]
y_pred = model.predict(X_test)
metrics = {
"auc_roc": float(roc_auc_score(y_test, y_pred_proba)),
"precision": float(precision_score(y_test, y_pred)),
"recall": float(recall_score(y_test, y_pred)),
}
print(f"Model metrics: {metrics}")
joblib.dump(model, "model.pkl")
Next, create a handler that loads the pickle on startup and runs inference. The handler receives input features as PyArrow arrays and returns a PyArrow array of predictions.
import joblib
import numpy as np
import pyarrow as pa
input_features = [
"login_count_30d",
"days_since_last_login",
"monthly_spend",
"account_age_days",
"engagement_score",
]
model = None
def on_startup():
global model
model = joblib.load("/app/artifacts/model.pkl")
def handler(event: dict[str, pa.Array], context: dict) -> pa.Array:
X = np.column_stack([event[name].to_numpy() for name in input_features])
probabilities = model.predict_proba(X)[:, 1]
return pa.array(probabilities, type=pa.float64())The handler's `on_startup` function runs once when the container starts, loading the pickle so the model is ready before any inference requests arrive.
Build a container image with the handler, register the model version, and upload the trained pickle to a volume.
import os
import pyarrow as pa
from chalkcompute import Image
from chalk.client import ChalkClient
from chalk.client.model_image import chalk_handler_volume_name, upload_model_to_volume
input_features = [
"login_count_30d",
"days_since_last_login",
"monthly_spend",
"account_age_days",
"engagement_score",
]
DIR = os.path.dirname(__file__)
client = ChalkClient()
image = (
Image.debian_slim("3.11")
.pip_install([
"chalk-remote-call-python",
"pyarrow",
"scikit-learn>=1.4",
"numpy>=1.26",
"joblib",
])
.add_local_file(os.path.join(DIR, "src/handler.py"), "/app/handler.py", strategy="copy")
.env({"PYTHONPATH": "/app"})
.workdir("/app")
.entrypoint([
"chalk-remote-call",
"--handler",
"handler.handler",
"--on-startup",
"handler.on_startup",
"--port",
"8080",
])
)
result = client.register_model_version(
name="interaction-model",
input_schema={name: pa.float64() for name in input_features},
output_schema={"interaction_probability": pa.float64()},
model_image=image,
)
print(f"Registered model: {result.model_name} v{result.model_version}")
volume_name = chalk_handler_volume_name("interaction-model", result.model_version)
upload_model_to_volume(
volume_name=volume_name,
model_filename="model.pkl",
model_file_path=os.path.join(DIR, "model.pkl"),
chalk_client=client,
)
print(f"Uploaded model pickle to volume: {volume_name}")
Deploy the registered model version to a scaling group so it can serve real-time predictions.
from chalk.client import ChalkClient
from chalk.scalinggroup import AutoScalingSpec, ScalingGroupResourceRequest
client = ChalkClient()
# Use the version returned from registration, or look it up
model_version = 1
client.deploy_model_version_to_scaling_group(
name="interaction-model-sg",
model_name="interaction-model",
model_version=model_version,
handler="handler.handler",
scaling=AutoScalingSpec(
min_replicas=1,
max_replicas=2,
target_cpu_utilization_percentage=70,
),
resources=ScalingGroupResourceRequest(cpu="1000m", memory="1Gi"),
)
print(f"Deployed interaction-model v{model_version} to scaling group interaction-model-sg")
Add an interaction_probability feature to your Event class that calls your deployed model using F.catalog_call. The first argument is model.<scaling-group-name>, and the remaining arguments are the input features in the same order as your input_schema.
from chalk.features import features, _
import chalk.functions as F
@features
class Event:
# ... existing features ...
interaction_probability: float = F.catalog_call(
"model.interaction-model-sg",
_.user.total_logins["30d"],
_.user.days_since_last_login,
_.user.monthly_spend,
_.user.account_age_days,
_.user.engagement_score,
)
Deploy to make the feature available:
chalk apply
Now you can query predictions through the Chalk API:
To get good performance, you'll want to add an online source for your features (such as a Postgres database or a stream).
from chalk.client import ChalkClient
from src.features import Event
client = ChalkClient()
result = client.query(
input={Event.id: 12345},
output=[Event.interaction_probability],
)
print(f"Interaction probability: {result.get_feature_value(Event.interaction_probability)}")
For more details on specific topics: