This tutorial walks through a complete machine learning workflow in Chalk: defining features, connecting to Snowflake data sources, creating point-in-time correct training datasets, training a model, registering it, and deploying it for inference.

We’ll build an event engagement model that predicts whether a user is likely to engage with a notification.


Step 1: Define Your Features

First, define feature classes that represent the entities in your domain. Feature classes use the @features decorator and define typed attributes for each feature.

src/features.py
from datetime import datetime

from chalk import windowed, Windowed
from chalk.features import features, DataFrame, _
import chalk.functions as F


@features
class User:
    id: int

    # Account features
    email: str
    signup_date: datetime
    account_type: str  # one of: "free", "basic", "premium"

    # Windowed activity features - automatically creates versions for each window
    total_logins: Windowed[int] = windowed(
        "1d",
        "7d",
        "30d",
        expression=_.events[
            _.type == "login",
            _.ts > _.chalk_window,
            _.ts <= _.chalk_now
        ].count(),
        default=0
    )
    # Unix timestamp of the most recent login in each window
    last_login_unix_seconds: Windowed[int | None] = windowed(
        "1d",
        "7d",
        "30d",
        "all",
        expression=_.events[
            _.ts_unix_seconds,
            _.type == "login",
            _.ts > _.chalk_window,
            _.ts <= _.chalk_now
        ].max(),
        default=None
    )

    # Non-windowed activity feature, derived from the most recent login
    days_since_last_login: float | None = (
        (F.unix_seconds(_.chalk_now) - _.last_login_unix_seconds["all"]) / 86400
    )

    # Billing features
    monthly_spend: float

    # Computed features
    account_age_days: int
    engagement_score: float

    # Has-many relationship to events
    events: "DataFrame[Event]"

    # Timestamp for feature time
    ts: datetime


@features
class Event:
    id: int
    user_id: User.id
    user: User
    type: str
    ts: datetime
    ts_unix_seconds: int = F.unix_seconds(_.ts)
    event_clicked: "EventClicked | None"
    event_was_clicked: bool = F.is_not_null(_.event_clicked.ts)

    # Model prediction
    interaction_probability: float


@features
class EventClicked:
    id: Event.id
    ts: datetime

The windowed() function creates multiple versions of a feature, one for each time window. You can reference specific windows using bracket notation:

  • User.total_logins["1d"] - logins in the last day
  • User.total_logins["7d"] - logins in the last 7 days
  • User.total_logins["30d"] - logins in the last 30 days

Step 2: Configure Snowflake Data Sources

Connect Chalk to your Snowflake data warehouse. First, configure the integration in the Chalk dashboard, then reference it in your code.

Dashboard Configuration

In the Chalk dashboard, navigate to Settings > Data Sources and add a Snowflake integration with your credentials (account, warehouse, database, schema, role).

Define Sources in Code

src/datasources.py
from chalk.sql import SnowflakeSource

# Reference the integration configured in the dashboard
snowflake = SnowflakeSource(name="sf")
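
The name argument must match the name of the Snowflake integration you configured in the dashboard; Chalk pulls the connection credentials from that integration at runtime, so they never need to appear in code.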

Create SQL File Resolvers

Create SQL files that map your Snowflake tables to Chalk features. Each resolver lives in its own .chalk.sql file; the file names below are just examples.

src/user.chalk.sql
-- type: offline
-- resolves: User
-- source: sf

SELECT
    id,
    email,
    signup_date,
    account_type,
    monthly_spend
FROM users

src/event.chalk.sql
-- type: offline
-- resolves: Event
-- source: sf

SELECT
    id,
    user_id,
    ts,
    type
FROM events

src/event_clicked.chalk.sql
-- type: offline
-- resolves: EventClicked
-- source: sf

SELECT
    id,
    ts
FROM events_clicked

Create Python Resolvers

Create Python resolvers to compute some simple derived features:

src/resolvers/computed.py
from datetime import datetime

from chalk import online, Now
from src.features import User


@online
def compute_account_age(
    signup_date: User.signup_date,
    now: Now
) -> User.account_age_days:
    return (now - signup_date).days


@online
def compute_engagement_score(
    logins: User.total_logins["30d"],      # Reference 30-day window
    days_inactive: User.days_since_last_login,
) -> User.engagement_score:
    # Simple engagement formula using 30-day windowed features
    activity_score = min(logins / 30, 1.0) * 0.6

    # Users with no recorded login get no recency credit
    if days_inactive is None:
        recency_score = 0.0
    else:
        recency_score = max(0, 1 - (days_inactive / 30)) * 0.4

    score = (activity_score + recency_score) * 100
    return score

Step 3: Create a Point-in-Time Correct Training Dataset

Use offline_query to create a training dataset with point-in-time correctness. This ensures that feature values reflect what was known at each historical point, preventing data leakage.

from datetime import datetime, timedelta
from chalk.client import ChalkClient
from src.features import Event

client = ChalkClient()

# Create the dataset with point-in-time correctness
# Use bracket notation to select specific windows for windowed features
dataset = client.offline_query(
    input_sql="""
    SELECT 
        id as "event.id", 
        ts  as "__ts__"
    FROM "sf.prod.events"
    WHERE type='recommendation';
    """,
    output=[
        Event.id,
        Event.user.id,
        Event.user.account_type,
        Event.user.total_logins["1d"],      # 1-day window
        Event.user.total_logins["7d"],      # 7-day window
        Event.user.total_logins["30d"],     # 30-day window
        Event.user.days_since_last_login,
        Event.user.monthly_spend,
        Event.user.account_age_days,
        Event.user.engagement_score,
        Event.event_was_clicked
    ],
    dataset_name="event_recommendation_dataset",
    run_asynchronously=True,  # For large datasets
)

# Retrieve the data as a pandas DataFrame
df = dataset.get_data_as_pandas()
print(f"Dataset shape: {df.shape}")
print(df.head())
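
Before training, it's worth a quick sanity check on the label balance and missing values; a minimal sketch with plain pandas:

# Check how balanced the click label is and where nulls show up
print(df["event.event_was_clicked"].value_counts(normalize=True))
print(df.isna().sum().sort_values(ascending=False).head(10))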

Step 4: Train Your Model

With your point-in-time correct dataset, train a model. Here’s an example using scikit-learn:

import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, precision_score, recall_score

# Prepare features and labels
# Windowed feature columns include the window in the column name
feature_columns = [
    'event.user.total_logins__30d__',        # 30-day login count
    'event.user.days_since_last_login',
    'event.user.monthly_spend',
    'event.user.account_age_days',
    'event.user.engagement_score',
]

X = df[feature_columns]
y = df['event.event_was_clicked']

# Split into train/test
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Train the model
model = GradientBoostingClassifier(
    n_estimators=100,
    max_depth=5,
    learning_rate=0.1,
    random_state=42
)
model.fit(X_train, y_train)

# Evaluate
y_pred_proba = model.predict_proba(X_test)[:, 1]
y_pred = model.predict(X_test)

metrics = {
    "auc_roc": roc_auc_score(y_test, y_pred_proba),
    "precision": precision_score(y_test, y_pred),
    "recall": recall_score(y_test, y_pred),
}
print(f"Model metrics: {metrics}")

Step 5: Register the Model

Register your trained model in the Chalk Model Registry. This versions your model and makes it available for deployment.

from datetime import datetime

from chalk.client import ChalkClient
from src.features import Event

client = ChalkClient()

# Register the trained model version
client.register_model_version(
    name="InteractionModel",
    aliases=["production", "v1.0"],
    model=model,  # Pass the sklearn model directly
    metadata={
        "framework": "sklearn",
        "algorithm": "GradientBoostingClassifier",
        "training_date": datetime.now().isoformat(),
        "training_dataset": "event_recommendation_dataset",
        "feature_columns": feature_columns,
        "metrics": metrics,
    },
    input_features=[
        Event.user.total_logins["30d"],
        Event.user.days_since_last_login,
        Event.user.monthly_spend,
        Event.user.account_age_days,
        Event.user.engagement_score,
    ],
    output_features=[
        Event.interaction_probability
    ],
)


print("Model registered successfully!")

Step 6: Deploy the Model for Inference

Create a model resolver that connects your registered model to your feature class for real-time inference.

src/resolvers/model_inference.py
from chalk.ml import ModelReference, make_model_resolver
from src.features import Event

# Reference the registered model
interaction_model = ModelReference.from_alias(
    name="InteractionModel",
    alias="production",
)

# Or reference by specific version:
# interaction_model = ModelReference.from_version(
#     name="InteractionModel",
#     version=1,
# )

# Create a resolver that runs inference
# Use bracket notation to specify which window to use for each windowed feature
interaction_resolver = make_model_resolver(
    name="get_interaction_prediction",
    model=interaction_model,
    inputs=[
        Event.user.total_logins["30d"],
        Event.user.days_since_last_login,
        Event.user.monthly_spend,
        Event.user.account_age_days,
        Event.user.engagement_score,
    ],
    output=[Event.interaction_probability],
)

Deploy your Chalk project to make the model available:

chalk apply
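
If you want to validate the change before promoting it, Chalk also supports deploying to a branch (for example, chalk apply --branch my-branch) and querying against that branch; run chalk apply --help for the options available in your CLI version.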

Step 7: Query Predictions

To get good performance, you'll want to add an online source for your features, such as Postgres or a stream; a sketch of what that looks like appears at the end of this step.

With the model deployed, you can query interaction predictions in real time through the Chalk API:

from chalk.client import ChalkClient
from src.features import Event

client = ChalkClient()

# Get interaction probability for a single event
# You can query multiple windows of the same feature
result = client.query(
    input={Event.id: 12345},
    output=[
        Event.interaction_probability,
        Event.user.total_logins["1d"],
        Event.user.total_logins["7d"],
        Event.user.total_logins["30d"],
    ],
)

print(f"Interaction probability: {result.get_feature_value(Event.interaction_probability)}")
print(f"Logins (1d): {result.get_feature_value(Event.user.total_logins['1d'])}")
print(f"Logins (7d): {result.get_feature_value(Event.user.total_logins['7d'])}")
print(f"Logins (30d): {result.get_feature_value(Event.user.total_logins['30d'])}")

Batch Predictions

For batch inference, use query_bulk:

event_ids = [1, 2, 3, 4, 5]

results = client.query_bulk(
    input={Event.id: event_ids},
    output=[Event.id, Event.interaction_probability, Event.user.total_logins["30d"]],
)

predictions_df = results.get_data_as_pandas()
print(predictions_df)
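
As noted at the start of this step, real-time queries are fastest when features resolve from an online source rather than the offline warehouse. Below is a rough sketch of an online SQL file resolver, assuming a Postgres integration named pg has been configured in the dashboard and mirrors the users table (check Chalk's SQL resolver docs for the exact parameter-binding syntax):

src/user_online.chalk.sql
-- type: online
-- resolves: User
-- source: pg

SELECT
    id,
    email,
    signup_date,
    account_type,
    monthly_spend
FROM users
WHERE id = ${user.id}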

Next Steps

For more details on specific topics: