Streams

Chalk enables users to convert and aggregate streaming messages into Chalk features in realtime.

The first step towards building a streaming environment with Chalk is creating a streaming source from one of Chalk’s integrations. Chalk supports Kafka, Kinesis, and PubSub for streaming. Like other integration sources, source parameters and credentials may be specified in either the dashboard or the source instantiation in code. See more information on KafkaSource, KinesisSource, and PubSubSource in the API docs.

from chalk.streams import KafkaSource, KinesisSource, PubSubSource

kafka_topic = KafkaSource(name="transactions")
kinesis_topic = KinesisSource(name="transactions")
pubsub_topic = PubSubSource(name="transactions")

Now, let’s set up the Chalk features you’d like to materialize upon stream message ingestion.

from chalk.features import features, Features

@features(max_staleness="1d")
class Transaction:
    id: str
    amount: float

Streaming resolvers produce feature values that are persisted to the offline store. Similar to other resolvers, streaming resolvers can also persist computed feature values to the online store by setting etl_offline_to_online=True or max_staleness in the @features decorator.

Chalk supports two kinds of streaming resolvers: mapping and windowed. This page focuses on mapping resolvers, which create one Chalk feature instance per message. Windowed streaming is covered in the windowed streaming page.

This is an example of a mapping resolver that maps from a bytes message to an instance of Transaction.

from chalk.streams import stream

@stream(source=kafka_topic)
def stream_resolver(message: bytes) -> Features[Transaction.id, Transaction.amount]:
    # some_bytes_processing_step is a placeholder for your own decoding logic.
    id, amount = some_bytes_processing_step(message)
    return Transaction(id=id, amount=amount)
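
The helper some_bytes_processing_step in the resolver above is a placeholder. As a minimal sketch, assuming the messages are UTF-8 JSON objects with hypothetical "id" and "amount" keys, it could look like this (the payload shape is an assumption, not something Chalk prescribes):

```python
import json
from typing import Tuple


def some_bytes_processing_step(message: bytes) -> Tuple[str, float]:
    """Decode a UTF-8 JSON payload and extract the two fields the resolver needs.

    Assumes payloads shaped like {"id": "...", "amount": 12.5} (hypothetical).
    """
    payload = json.loads(message.decode("utf-8"))
    return str(payload["id"]), float(payload["amount"])


txn_id, amount = some_bytes_processing_step(b'{"id": "txn_1", "amount": 12.5}')
print(txn_id, amount)  # txn_1 12.5
```

Any decoding step works here (Avro, Protobuf, a custom binary layout), as long as it returns the values the resolver needs.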

In this resolver, we pass in the raw bytes from the stream. If you’re using JSON as the encoding for messages on your topic, you can optionally specify a Pydantic Model as a wrapper for messages on the topic. Chalk will validate the encoding against the model.

from chalk.features import features, Features
from chalk.streams import KafkaSource, stream
from pydantic import BaseModel

@features(max_staleness="1d")
class Transaction:
    id: str
    amount: float

kafka_topic = KafkaSource(name="transactions")

# Chalk validates each JSON message on the topic against this model.
class TransactionTopicMessage(BaseModel):
    id: str
    amount: float

@stream(source=kafka_topic)
def stream_resolver(
    message: TransactionTopicMessage,
) -> Features[Transaction.id, Transaction.amount]:
    return Transaction(id=message.id, amount=message.amount)

Late arrivals

Chalk lets you configure whether resolvers should accept late-arriving stream messages. By default, Chalk processes every late-arriving message in stream resolvers. However, you can tune this behavior with the late_arrival_deadline argument to your stream source:

from chalk.streams import KafkaSource

# By default, the late_arrival_deadline is set to "infinity".
source = KafkaSource(late_arrival_deadline="30m")

If a message is older than the late_arrival_deadline when it is consumed, its resolver will not run.
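
To make the rule concrete, here is a small sketch of the age check described above. It is illustrative only and is not Chalk's implementation: the function name should_run_resolver is hypothetical, and treating a message whose age equals the deadline exactly as on time is an assumption.

```python
from datetime import datetime, timedelta, timezone


def should_run_resolver(
    message_time: datetime,
    consumed_at: datetime,
    late_arrival_deadline: timedelta,
) -> bool:
    """Return True when the message is not older than the deadline at consumption time."""
    age = consumed_at - message_time
    return age <= late_arrival_deadline


deadline = timedelta(minutes=30)  # corresponds to late_arrival_deadline="30m"
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)

on_time = should_run_resolver(now - timedelta(minutes=5), now, deadline)
too_late = should_run_resolver(now - timedelta(hours=2), now, deadline)
print(on_time, too_late)  # True False
```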


Parsing

Sometimes, messages must be processed before resolver execution. Streaming resolvers can optionally take a parse function that preprocesses messages. Possible use cases include:

  • Ingesting bytes and preprocessing before inputting data into a BaseModel.
  • Optionally skipping resolver execution for certain messages.
  • Re-keying for windowed stream aggregations.

The parse function runs before the resolver, and can transform the message into a format that the stream resolver understands. If the parse function returns None, the resolver will be skipped.
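
The control flow can be sketched in plain Python. This is a toy model of the parse-then-resolve order and the skip-on-None rule, not Chalk's runtime; run_with_parse, parse_positive_amount, and to_cents are hypothetical names used only for illustration.

```python
from typing import Callable, Optional, TypeVar

Raw = TypeVar("Raw")
Parsed = TypeVar("Parsed")
Out = TypeVar("Out")


def run_with_parse(
    message: Raw,
    parse: Callable[[Raw], Optional[Parsed]],
    resolver: Callable[[Parsed], Out],
) -> Optional[Out]:
    """Run parse first; skip the resolver entirely when parse returns None."""
    parsed = parse(message)
    if parsed is None:
        return None
    return resolver(parsed)


def parse_positive_amount(raw: bytes) -> Optional[float]:
    """Keep only strictly positive amounts; everything else is skipped."""
    amount = float(raw.decode("utf-8"))
    return amount if amount > 0 else None


def to_cents(amount: float) -> int:
    return round(amount * 100)


print(run_with_parse(b"12.50", parse_positive_amount, to_cents))  # 1250
print(run_with_parse(b"-3", parse_positive_amount, to_cents))  # None
```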

A simple parse function can ingest bytes into a BaseModel, which will be used as input for the streaming resolver.

from pydantic import BaseModel

class TransactionMessage(BaseModel):
    id: str
    amount: float

def parse_bytes(data: bytes) -> TransactionMessage:
    id, amount = some_bytes_processing_step(data)
    return TransactionMessage(id=id, amount=amount)

@stream(source=source, parse=parse_bytes)
def stream_resolver(message: TransactionMessage) -> Features[Transaction.id, Transaction.amount]:
    return Transaction(id=message.id, amount=message.amount)

In the example below, resolve_clicks only processes UserEventMessages whose click_event property is set. Messages where click_event is None are skipped entirely.

from pydantic import BaseModel

# Child messages
class UserLoginEvent(BaseModel):
    ...

class UserClickEvent(BaseModel):
    ...

# Parent message contains one of the child message types
class UserEventMessage(BaseModel):
    login_event: UserLoginEvent | None = None
    click_event: UserClickEvent | None = None

def get_click_event(event: UserEventMessage) -> UserClickEvent | None:
    return event.click_event # can be None

@stream(source=str_source, parse=get_click_event)
def resolve_clicks(message: UserClickEvent) -> Features[...]:
    ...

Full example

In this example, each of the messages from our Kafka source will be converted to a StreamFeature instance. Our streaming message, embodied by KafkaMessage, contains a string representation of a naive datetime we would like to convert to a timezoned datetime.

Upon message arrival, the bytes are first parsed into KafkaMessage, then run through the parse function parse_message. The intermediate output ParsedMessage is fed to kafka_stream_resolver, which produces Chalk features. Because we have specified both max_staleness and etl_offline_to_online=True, StreamFeature is queryable in both online and offline contexts.

from datetime import datetime, timezone
from dateutil import parser
from pydantic import BaseModel
from chalk.features import features, Features
from chalk.streams import stream, KafkaSource


source = KafkaSource(name="...")

class KafkaMessage(BaseModel):
    id: str
    value: str
    naive_timestamp_str: str

class ParsedMessage(BaseModel):
    id: str
    value: str
    event_timestamp: datetime


@features(max_staleness="1d", etl_offline_to_online=True)
class StreamFeature:
    id: str
    value: str
    event_timestamp: datetime

def parse_message(kafka_message: KafkaMessage) -> ParsedMessage:
    parsed_timestamp: datetime = parser.parse(kafka_message.naive_timestamp_str)
    timezoned_timestamp = parsed_timestamp.replace(tzinfo=timezone.utc)
    return ParsedMessage(
        id=kafka_message.id,
        value=kafka_message.value,
        event_timestamp=timezoned_timestamp
    )

@stream(source=source, parse=parse_message)
def kafka_stream_resolver(
    message: ParsedMessage,
) -> Features[StreamFeature.id, StreamFeature.value, StreamFeature.event_timestamp]:
    # The return annotation lists every feature the resolver sets.
    return StreamFeature(id=message.id, value=message.value, event_timestamp=message.event_timestamp)
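
Note that replace(tzinfo=timezone.utc) attaches a UTC offset without shifting the clock time, so it is only correct when the naive timestamps are already in UTC. The sketch below shows the difference using the standard library's datetime.fromisoformat in place of dateutil; to_utc is a hypothetical helper, and the ISO 8601 input format is an assumption.

```python
from datetime import datetime, timezone


def to_utc(timestamp_str: str) -> datetime:
    """Return a timezone-aware UTC datetime from an ISO 8601 string."""
    parsed = datetime.fromisoformat(timestamp_str)
    if parsed.tzinfo is None:
        # Naive input: assume it is already in UTC and just attach the offset.
        return parsed.replace(tzinfo=timezone.utc)
    # Aware input: convert the instant to UTC instead of overwriting the offset.
    return parsed.astimezone(timezone.utc)


print(to_utc("2024-03-01T09:30:00").isoformat())        # 2024-03-01T09:30:00+00:00
print(to_utc("2024-03-01T09:30:00+02:00").isoformat())  # 2024-03-01T07:30:00+00:00
```

If the producer can emit timestamps with an offset, calling replace on them would silently change the instant, which is why the helper branches on tzinfo.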

Testing Stream Resolvers

Testing stream resolvers can be difficult because streaming involves multiple services that are hard to replicate locally.

If your streaming resolvers error in production, the errors will be logged and displayed on the dashboard for the resolver. But if you’re looking to unit test your stream resolvers, you have a few options.

One option is to feed dummy data into the resolver locally. Like regular resolvers, stream resolvers should be unit-testable as regular Python functions.

from pydantic import BaseModel
from chalk.features import features, Features
from chalk.streams import stream, KinesisSource

kinesis_source = KinesisSource(name="...")

class KinesisMessage(BaseModel):
    payment_id: str
    amount: int

@features(max_staleness="1d", etl_offline_to_online=True)
class KinesisPaymentFeatures:
    id: str
    amount: int

@stream(source=kinesis_source)
def kinesis_stream_resolver(
    message: KinesisMessage,
) -> Features[KinesisPaymentFeatures.id, KinesisPaymentFeatures.amount]:
    return KinesisPaymentFeatures(id=message.payment_id, amount=message.amount)


model_example = KinesisMessage(payment_id="1", amount=5)
result = kinesis_stream_resolver(model_example)
assert result.id == "1"

However, if you have a parse function, custom timestamps, or other advanced functionality you want to test, you may have to send messages to the streaming server. You can do this with the ChalkClient method test_streaming_resolver. It passes messages through your resolvers without persistence and returns a Polars DataFrame of the output data for inspection.

from pydantic import BaseModel
from chalk.features import features, Features
from chalk.streams import stream, KinesisSource
from chalk.client import ChalkClient

kinesis_source = KinesisSource(name="...")

class KinesisMessage(BaseModel):
    payment_id: str
    amount: int

@features(max_staleness="1d", etl_offline_to_online=True)
class KinesisPaymentFeatures:
    id: str
    amount: int

# Note the use of a parse function here.
def parse_bytes(b: bytes) -> KinesisMessage:
    return KinesisMessage.parse_raw(b)

@stream(source=kinesis_source, parse=parse_bytes)
def kinesis_stream_resolver_with_parse(
    message: KinesisMessage,
) -> Features[KinesisPaymentFeatures.id, KinesisPaymentFeatures.amount]:
    return KinesisPaymentFeatures(id=message.payment_id, amount=message.amount)

client = ChalkClient()
messages = [KinesisMessage(payment_id=str(i), amount=i * 10).json().encode('utf-8') for i in range(10)]
resp = client.test_streaming_resolver(
    resolver="kinesis_stream_resolver_with_parse",
    message_bodies=messages,
)
print(resp.features)
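
Before sending message bodies to the streaming server, it can help to check locally that each body survives a round trip through your parse function. The sketch below does this with the standard library only: PaymentMessage is a dataclass stand-in for the KinesisMessage model above, and encode is a hypothetical helper that mirrors the .json().encode('utf-8') call.

```python
import json
from dataclasses import asdict, dataclass
from typing import List


@dataclass
class PaymentMessage:  # stdlib stand-in for the KinesisMessage pydantic model
    payment_id: str
    amount: int


def encode(message: PaymentMessage) -> bytes:
    """Serialize a message to UTF-8 JSON bytes, as it would appear on the stream."""
    return json.dumps(asdict(message)).encode("utf-8")


def parse_bytes(body: bytes) -> PaymentMessage:
    """Decode JSON bytes back into a message object."""
    return PaymentMessage(**json.loads(body))


bodies: List[bytes] = [
    encode(PaymentMessage(payment_id=str(i), amount=i * 10)) for i in range(10)
]
parsed = [parse_bytes(body) for body in bodies]

# Every body should decode to the message it was built from.
assert parsed[3] == PaymentMessage(payment_id="3", amount=30)
```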

Online / Offline Storage

With mapping streaming resolvers, as with Python and SQL resolvers, Chalk persists values to the offline store if one is deployed in the environment. You can access these values with an offline query or through the SQL Interface. If you would like to query features resolved by a stream resolver online, the feature must have both etl_offline_to_online=True and max_staleness set in the feature definition. With these two settings, the Chalk streaming server also persists feature values to the online store, so that you can access recent feature values through the online query pathway.

Note: if you would like to resolve features in a feature class that also defines a materialized aggregation, apply max_staleness specifically to the features you want to query online. Otherwise, the aggregation will return the last observed value based on the max_staleness, rather than the actual computed aggregation value.