Overview

A Chalk Function is a Python callable that you deploy onto a Scaling Group and register in the Chalk Catalog. Decorate a function with @chalkcompute.function and it becomes reachable from anywhere Chalk runs — local scripts and notebooks, other functions, and (because of the Catalog registration) the feature engine: Chalk SQL, resolvers, the query server, and DataFrames via F.catalog_call(). Arguments and results are serialized automatically; you write ordinary Python and Chalk handles scheduling, scaling, and delivery.

On top of what a bare Scaling Group provides, functions add async / streaming invocation, nested calls, automatic retries, concurrency caps, rate limits, and request-queue-driven autoscaling. Reach for a function when you want any of those features, or when you want the callable to be visible to the rest of Chalk.

Functions support three usage patterns:

  • Single value: return a result and the caller receives it.
  • Async generator: yield results incrementally and the caller receives them as they’re produced. See Streaming results below for an end-to-end example.
  • Nested calls: a function can call other functions, composing pipelines without manual orchestration. See Nested function calls below for a pipeline example.

Defining a function

Use the @chalkcompute.function decorator:

import chalkcompute

@chalkcompute.function
def add(x: int, y: int) -> int:
    return x + y

Deploy by running the script — the decorator registers the function with Chalk. Call wait_ready() to block until the function is available, then use .remote(...) to invoke the deployed function:

add.wait_ready()
result = add.remote(3, 5)
print(result)  # 8

add(3, 5) runs the Python body locally; use .remote(...) for a deployed call.


Resource configuration

The decorator accepts a wide set of arguments that control the compute environment a function runs in — image, CPU/memory/GPU, secrets, volumes, batching, retries, rate limits, concurrency, autoscaling, and tracing. A realistic decorator usually combines several of them:

@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install sentence-transformers",
    ),
    cpu="2",
    memory="4Gi",
    gpu="nvidia-l4",
    secrets=[chalkcompute.Secret.from_env("HF_TOKEN")],
    max_batching_size=25,
    max_buffer_duration=2000,
    retries=3,
)
def embed(texts: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(texts).tolist()

This page walks through the higher-level concepts behind each argument in its own section below — batching, retries, rate limits, concurrency, autoscaling, volumes, and tracing. For the full per-argument reference, types, and defaults, see @chalkcompute.function in the API reference.


Secrets

Inject secrets into a function using the secrets parameter. Secrets are resolved at deploy time and made available as environment variables inside the container:

import os
from urllib.parse import urlparse

import chalkcompute
from chalkcompute import Secret

@chalkcompute.function(
    secrets=[
        Secret.from_env("OPENAI_API_KEY"),
        Secret.from_integration("prod_postgres"),
    ],
)
def call_api(prompt: str) -> str:
    openai_api_key = os.environ["OPENAI_API_KEY"]
    database_url = os.environ["DATABASE_URL"]

    headers = {"Authorization": f"Bearer {openai_api_key}"}
    db_host = urlparse(database_url).hostname
    # Pass headers to your OpenAI-compatible client and database_url to
    # your Postgres client.
    return f"ready to call OpenAI for {prompt!r} and log to {db_host}"

See the Secrets section of the overview for the full Secret API.


Batching

When your handler is more efficient processing multiple items at once (e.g. GPU batch inference), enable batching with max_buffer_duration and/or max_batching_size. Chalk will accumulate incoming items and invoke the handler in chunks:

@chalkcompute.function(
    max_buffer_duration=1000,  # buffer up to 1000 ms
    max_batching_size=32,      # or up to 32 items
    gpu="nvidia-l4",
)
def batch_embed(texts: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(texts).tolist()

When batching is enabled, handler arguments are delivered as lists or as a single pyarrow table. If you set only one of the two parameters, the other takes its default:

| Parameter | Default (when batching enabled) |
| --- | --- |
| max_buffer_duration | 1000 ms |
| max_batching_size | 10 items |
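
To make the two thresholds concrete, here is a minimal sketch of size-or-time buffering in plain Python. It is not Chalk's implementation: the helper name plan_batches and the (arrival_ms, item) input shape are invented for illustration, and it assumes a buffer is flushed either when it reaches max_batching_size items or when max_buffer_duration milliseconds have passed since its first item arrived.

```python
from typing import Any


def plan_batches(
    arrivals: list[tuple[int, Any]],
    max_buffer_duration: int = 1000,  # ms, default from the table above
    max_batching_size: int = 10,      # items, default from the table above
) -> list[list[Any]]:
    """Group (arrival_ms, item) pairs into batches (hypothetical helper).

    Assumption: a batch closes when it is full, or when an item arrives
    after the buffer window that started with the batch's first item.
    """
    batches: list[list[Any]] = []
    current: list[Any] = []
    window_start = 0
    for arrival_ms, item in sorted(arrivals, key=lambda pair: pair[0]):
        # Time-based flush: the window for the open batch has expired.
        if current and arrival_ms - window_start >= max_buffer_duration:
            batches.append(current)
            current = []
        if not current:
            window_start = arrival_ms
        current.append(item)
        # Size-based flush: the batch is full.
        if len(current) == max_batching_size:
            batches.append(current)
            current = []
    if current:
        batches.append(current)
    return batches


# Five items arrive quickly, a sixth arrives about 1.5 s later.
arrivals = [(0, "a"), (10, "b"), (20, "c"), (30, "d"), (40, "e"), (1500, "f")]
print(plan_batches(arrivals, max_buffer_duration=1000, max_batching_size=3))
# [['a', 'b', 'c'], ['d', 'e'], ['f']]
```

The first batch closes on size, the second on elapsed time, and the last holds whatever remains.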

Retry policies

Functions can automatically retry on failure. Pass a simple integer for max-attempts with default exponential backoff, or use RetryPolicy for full control:

import chalkcompute
from chalkcompute import RetryPolicy

# Simple: retry up to 3 times with exponential backoff
@chalkcompute.function(retries=3)
def flaky_api(query: str) -> str:
    ...

# Exponential backoff with custom parameters
@chalkcompute.function(
    retries=RetryPolicy.exponential(
        attempts=5,
        initial=0.5,
        max_wait=30.0,
    ),
)
def resilient_call(payload: str) -> str:
    ...

# Fixed delay, only retry on specific exceptions
@chalkcompute.function(
    retries=RetryPolicy.linear(
        attempts=3,
        delay=2.0,
        retry_on=(TimeoutError, ConnectionError),
    ),
)
def network_call(url: str) -> str:
    ...

RetryPolicy.exponential()

Exponential backoff — wait times grow geometrically: initial, initial * multiplier, initial * multiplier², …, capped at max_wait:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| attempts | int | 3 | Max retry attempts |
| initial | float | 1.0 | Base wait time in seconds |
| multiplier | float | 2.0 | Backoff multiplier |
| max_wait | float | 60.0 | Upper bound on wait time |
| jitter | bool | True | Add random jitter |
| retry_on | tuple[type, ...] | (Exception,) | Exception types that trigger retry |
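
The schedule described above can be worked out by hand. The sketch below computes the un-jittered wait before each retry from the formula min(initial * multiplier**n, max_wait). The helper backoff_waits is hypothetical, it ignores jitter, and it assumes one wait per retry attempt.

```python
def backoff_waits(
    attempts: int = 3,
    initial: float = 1.0,
    multiplier: float = 2.0,
    max_wait: float = 60.0,
) -> list[float]:
    """Return the un-jittered wait before each retry (illustrative helper)."""
    return [min(initial * multiplier**n, max_wait) for n in range(attempts)]


print(backoff_waits(attempts=5, initial=0.5, max_wait=30.0))
# [0.5, 1.0, 2.0, 4.0, 8.0]
print(backoff_waits(attempts=8, initial=1.0, max_wait=60.0))
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

In the second call the seventh and eighth waits would be 64 and 128 seconds, so max_wait caps both at 60.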

RetryPolicy.linear()

Fixed delay between retries:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| attempts | int | 3 | Max retry attempts |
| delay | float | 1.0 | Fixed wait in seconds |
| jitter | bool | False | Add random jitter |
| retry_on | tuple[type, ...] | (Exception,) | Exception types that trigger retry |

Exception filtering

By default, all Exception subclasses trigger a retry. Pass a tuple of exception classes to retry_on to restrict which failures are retried:

RetryPolicy.exponential(
    attempts=4,
    retry_on=(ConnectionError, TimeoutError, OSError),
)

Non-matching exceptions propagate immediately without consuming retry budget.
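
The filtering rule can be illustrated with a small retry loop in plain Python. This is a sketch of the behavior described above, not the RetryPolicy implementation; call_with_retries, flaky, and broken are hypothetical names, and the loop uses a fixed delay for brevity.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    attempts: int = 3,
    delay: float = 0.0,
    retry_on: tuple[type[BaseException], ...] = (Exception,),
) -> T:
    """Call fn, retrying up to `attempts` times on matching exceptions."""
    retries_left = attempts
    while True:
        try:
            return fn()
        except retry_on:
            # Matching failure: spend one retry, or give up if none remain.
            if retries_left == 0:
                raise
            retries_left -= 1
            time.sleep(delay)
        # Any other exception type is not caught and propagates at once.


calls = {"count": 0}
broken_calls = {"count": 0}


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient")
    return "ok"


def broken() -> str:
    broken_calls["count"] += 1
    raise ValueError("bad input")


print(call_with_retries(flaky, retry_on=(ConnectionError,)))  # ok
try:
    call_with_retries(broken, retry_on=(ConnectionError,))
except ValueError as exc:
    print(f"not retried: {exc}")  # not retried: bad input
```

flaky succeeds on its third call after two retried ConnectionErrors, while broken runs exactly once because ValueError is not listed in retry_on.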


Rate limiting

Cap the outbound request rate of a function using rate_limit. Pass an integer for a simple “N per second” cap, or a RateLimitPolicy for full control:

import chalkcompute
from chalkcompute import RateLimitPolicy

# Simple: up to 100 requests per second
@chalkcompute.function(rate_limit=100)
def call_provider(query: str) -> str:
    ...

# Full control: rate + time unit + shared-bucket key
@chalkcompute.function(
    rate_limit=RateLimitPolicy(rate=1000, per="minute", key="provider-api"),
)
def call_other_endpoint(query: str) -> str:
    ...

RateLimitPolicy

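| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| rate | int | required | Maximum requests per unit of time (the unit is set by per) |
| per | str | "second" | Time unit: "second", "minute", or "hour" |
| key | str | function name | Shared-bucket identifier (see below) |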

Sharing a bucket across functions

Two functions declaring the same key share a single rate-limit bucket. This is useful when multiple functions call the same downstream service and the service imposes a limit across all callers:

shared = RateLimitPolicy(rate=1000, per="second", key="provider-api")

@chalkcompute.function(rate_limit=shared)
def lookup(query: str) -> str: ...

@chalkcompute.function(rate_limit=shared)
def enrich(record: dict) -> dict: ...
# Calls to both functions count against the same 1000/s budget.

If key is omitted, each function gets its own bucket keyed by the function name, so rate limits are per-function by default.
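
To show what sharing a bucket means, the following sketch models the limit as a token bucket looked up by key. The token-bucket algorithm, the TokenBucket class, and the bucket_for registry are assumptions made for illustration; this page does not specify how Chalk implements rate limiting internally.

```python
class TokenBucket:
    """Minimal token bucket (illustrative; not Chalk's implementation)."""

    def __init__(self, rate: int, per_seconds: float) -> None:
        self.capacity = float(rate)
        self.refill_per_second = rate / per_seconds
        self.tokens = float(rate)
        self.updated_at = 0.0

    def try_acquire(self, now: float) -> bool:
        """Take one token if available; `now` is a timestamp in seconds."""
        elapsed = now - self.updated_at
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_second)
        self.updated_at = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


# Hypothetical registry: policies with the same key resolve to one bucket.
_buckets: dict[str, TokenBucket] = {}


def bucket_for(key: str, rate: int, per_seconds: float) -> TokenBucket:
    return _buckets.setdefault(key, TokenBucket(rate, per_seconds))


lookup_bucket = bucket_for("provider-api", rate=3, per_seconds=1.0)
enrich_bucket = bucket_for("provider-api", rate=3, per_seconds=1.0)

# Both names point at the same bucket, so they draw from one budget.
results = [
    lookup_bucket.try_acquire(now=0.0),
    enrich_bucket.try_acquire(now=0.0),
    lookup_bucket.try_acquire(now=0.0),
    enrich_bucket.try_acquire(now=0.0),  # budget of 3 is spent
    enrich_bucket.try_acquire(now=1.0),  # refilled after one second
]
print(results)  # [True, True, True, False, True]
```

A different key would create a separate bucket, which mirrors the per-function default.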


Concurrency limits

Cap the in-flight count of a function using concurrency. This is distinct from rate_limit — rate limits gate requests-over-time, concurrency gates how many calls are executing simultaneously:

import chalkcompute
from chalkcompute import ConcurrencyPolicy

# Simple: at most 10 calls in flight at once
@chalkcompute.function(concurrency=10)
def gpu_infer(text: str) -> list[float]:
    ...

# Shared gate across multiple functions hitting the same resource
gpu_pool = ConcurrencyPolicy(max_concurrent=4, key="gpu-pool")

@chalkcompute.function(concurrency=gpu_pool, gpu="nvidia-l4")
def embed(text: str) -> list[float]: ...

@chalkcompute.function(concurrency=gpu_pool, gpu="nvidia-l4")
def rerank(query: str, docs: list[str]) -> list[int]: ...
# Only 4 calls across both functions run simultaneously.

ConcurrencyPolicy

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| max_concurrent | int | required | Maximum in-flight handler invocations |
| key | str | function name | Shared-bucket identifier |

When `concurrency` is not specified, the function queue consumer applies a default of `max_concurrent=1` per function — calls to a single function are serialized. Set `concurrency=N` explicitly to allow parallel execution.
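
The effect of a concurrency cap can be seen with a plain asyncio semaphore. The sketch below is a local analogy rather than the queue consumer's code; run_with_limit and its fake handler are hypothetical. It records the peak number of handlers running at once.

```python
import asyncio


async def run_with_limit(max_concurrent: int, n_calls: int) -> int:
    """Run n_calls fake handlers and return the peak in-flight count."""
    gate = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def handler() -> None:
        nonlocal in_flight, peak
        async with gate:  # wait for a free slot before running
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for real work
            in_flight -= 1

    await asyncio.gather(*(handler() for _ in range(n_calls)))
    return peak


print(asyncio.run(run_with_limit(max_concurrent=1, n_calls=5)))   # 1
print(asyncio.run(run_with_limit(max_concurrent=4, n_calls=10)))  # 4
```

With a limit of 1 the calls run one after another, which is the serialized behavior described for the default.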

Rate limit vs concurrency

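Both can be set on the same function. Each applies independently, so a call must clear both before it is dispatched:

| Policy | Controls | Example use case |
| --- | --- | --- |
| rate_limit | Requests per unit time | “1000 calls/sec across all replicas” |
| concurrency | Parallel in-flight count | “4 GPU slots, don’t exceed memory” |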

Autoscaling

Functions accept six autoscaling hints that shape how replicas scale up and down in response to load. The example sets five of them; target_queue_depth is covered in the table that follows:

@chalkcompute.function(
    min_replicas=1,
    max_replicas=8,
    scaling_interval_seconds=30,
    target_cpu_utilization_percentage=70,
    shutdown_delay_seconds=120,
    gpu="nvidia-l4",
)
def infer(text: str) -> str:
    ...

| Parameter | Description |
| --- | --- |
| min_replicas | Lower bound: replicas never scale below this. Use 0 for scale-to-zero. |
| max_replicas | Upper bound: replicas never scale above this. |
| scaling_interval_seconds | How often the autoscaler scrapes metrics and makes scaling decisions. Defaults to 60s on the server. |
| target_cpu_utilization_percentage | Target average CPU utilization (0–100). The autoscaler adds replicas when the average exceeds this and removes them when it falls below. |
| target_queue_depth | Target pending-request queue depth per replica. When set, the autoscaler scales on the function’s own queue depth instead of (or in addition to) CPU. |
| shutdown_delay_seconds | Grace period on scale-down before replicas are forcibly killed. Lets in-flight requests finish cleanly. Defaults to 30s on the server. |

CPU-driven vs queue-driven scaling

  • CPU (target_cpu_utilization_percentage): good for compute-bound functions — model inference, data transformation, anything where CPU is the bottleneck.
  • Queue depth (target_queue_depth): good for I/O-bound or variable-latency functions, or when you want to scale on demand rather than resource saturation. Scales up when calls pile up regardless of CPU.

Both can be set together; the autoscaler follows whichever signal calls for more replicas.
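
As a rough mental model of how the two signals combine, the sketch below computes a desired replica count with a proportional rule similar to the one used by common horizontal autoscalers. The formula and the desired_replicas helper are assumptions for illustration; Chalk's autoscaler may weigh these inputs differently.

```python
import math
from typing import Optional


def desired_replicas(
    current_replicas: int,
    min_replicas: int,
    max_replicas: int,
    cpu_utilization: Optional[float] = None,   # observed average, 0-100
    target_cpu: Optional[float] = None,        # target_cpu_utilization_percentage
    queue_depth: Optional[int] = None,         # total pending requests
    target_queue_depth: Optional[int] = None,  # target pending requests per replica
) -> int:
    """Proportional scaling sketch; the formula is an assumption."""
    candidates = [min_replicas]
    if cpu_utilization is not None and target_cpu:
        # Scale the current replica count by observed/target utilization.
        candidates.append(math.ceil(current_replicas * cpu_utilization / target_cpu))
    if queue_depth is not None and target_queue_depth:
        # Enough replicas that each one holds at most the target depth.
        candidates.append(math.ceil(queue_depth / target_queue_depth))
    # Take the higher signal, then clamp to the configured bounds.
    return max(min_replicas, min(max(candidates), max_replicas))


# CPU is calm (35% against a 70% target) but 50 calls are queued.
print(desired_replicas(2, 1, 8, cpu_utilization=35, target_cpu=70,
                       queue_depth=50, target_queue_depth=10))  # 5
```

Here the CPU signal alone would ask for one replica, while the queue signal asks for five, so the higher value wins.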


Volumes

Attach persistent storage to a function with the volumes parameter. Each entry is a (name, mount_path) string tuple. Inside the function, the volume is accessible as an ordinary filesystem path — read and write files normally:

import chalkcompute

@chalkcompute.function(
    volumes=[("model-cache", "/models")],
    gpu="nvidia-l4",
)
def predict(text: str) -> list[float]:
    import os
    from sentence_transformers import SentenceTransformer

    cache_path = "/models/all-MiniLM-L6-v2"
    if os.path.exists(cache_path):
        model = SentenceTransformer(cache_path)
    else:
        model = SentenceTransformer("all-MiniLM-L6-v2")
        model.save(cache_path)

    return model.encode(text).tolist()

Volumes persist across invocations — data written by one call is available to subsequent calls. This is useful for caching large model weights, storing intermediate results, or sharing data between functions that mount the same volume.


Calling from a DataFrame

You can invoke a function on every row of a DataFrame using F.catalog_call() inside a .project() expression. This is useful for batch-processing columnar data — Chalk handles parallelism, batching, and delivery automatically.

For example, define a function that extracts named entities from text:

import chalkcompute

@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install spacy && python -m spacy download en_core_web_sm",
    ),
    cpu="2",
    memory="4Gi",
)
def extract_entities(memo: str) -> str:
    import json
    import spacy
    nlp = spacy.load("en_core_web_sm")
    doc = nlp(memo)
    return json.dumps([
        {"text": ent.text, "label": ent.label_}
        for ent in doc.ents
    ])

Then apply it across a parquet file of transaction memos:

from chalkcompute import DataFrame, F, _

results = (
    DataFrame.scan("transactions.parquet")
    .project({
        "transaction_id": _.transaction_id,
        "memo": _.memo,
        "entities": F.catalog_call("extract_entities", _.memo),
    })
)

Each row’s memo column is passed to extract_entities and the result is returned as a new entities column. The original DataFrame is not mutated — .project() returns only the columns you specify.
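
Because extract_entities returns a JSON string, downstream code has to decode each cell of the entities column before using it. The sketch below shows one way to do that in plain Python; the sample cell value and the labels_by_type helper are hypothetical.

```python
import json

# Hypothetical value of one cell in the entities column.
entities_cell = '[{"text": "Acme Corp", "label": "ORG"}, {"text": "Seattle", "label": "GPE"}]'


def labels_by_type(cell: str) -> dict[str, list[str]]:
    """Group entity texts by their label."""
    grouped: dict[str, list[str]] = {}
    for entity in json.loads(cell):
        grouped.setdefault(entity["label"], []).append(entity["text"])
    return grouped


print(labels_by_type(entities_cell))
# {'ORG': ['Acme Corp'], 'GPE': ['Seattle']}
```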


Tracing

Tracing helps you understand where time is spent when a function runs. Use it to debug slow calls, follow nested function calls, and see which parts of your application are doing work.

After a traced call runs, view it from the function’s Scaling Group page under Remote Call Traces.

Enabling tracing

Set tracing=True on a function:

import chalkcompute

@chalkcompute.function(
    cpu="1",
    memory="1Gi",
    tracing=True,
)
def score_transaction(transaction_id: str) -> float:
    return 0.42

Nested calls appear in the same trace when tracing is enabled on each function:

import chalkcompute

@chalkcompute.function(tracing=True)
def fetch_profile(user_id: str) -> dict:
    return {"user_id": user_id, "segment": "active"}


@chalkcompute.function(tracing=True)
def score_user(user_id: str) -> float:
    with chalkcompute.span("score_user.fetch_profile"):
        profile = fetch_profile.remote(user_id)

    return 0.9 if profile["segment"] == "active" else 0.1


with chalkcompute.span("demo.score_user"):
    score = score_user.remote("user_123")

In the trace you can see the outer function, the call to fetch_profile, and the inner function’s execution together. Add manual spans like score_user.fetch_profile to label important parts of your own code.

Manual spans

Use chalkcompute.span() to label meaningful application work:

with chalkcompute.span("checkout.score_transaction"):
    score = score_transaction.remote("txn_123")

For a whole caller function, use @chalkcompute.traced:

@chalkcompute.traced("batch.score_transactions")
def run_batch(transaction_ids: list[str]) -> list[float]:
    return [score_transaction.remote(txn_id) for txn_id in transaction_ids]

Manual spans are useful around model calls, retrieval steps, preprocessing, and other code you want to separate in the trace. To see a remote function’s own execution, enable tracing=True or a TracingPolicy on that function.

Tracing policies

Use TracingPolicy when you only want to collect a percentage of calls:

@chalkcompute.function(
    tracing=chalkcompute.TracingPolicy(sample_rate=0.25),
)
def recommend(user_id: str) -> list[str]:
    ...

sample_rate must be between 0.0 and 1.0.

| Configuration | Behavior |
| --- | --- |
| tracing=True | Collect traces for every call. |
| tracing=TracingPolicy(sample_rate=0.25) | Collect traces for about 25% of calls. |
| tracing=TracingPolicy(sample_rate=0.0) | Trace this function only when its caller is already traced. |
| tracing=False | Turn tracing off for the function. |
| Omitted | Use the default behavior: only continue a caller’s trace if tracing support is already installed in the image. |

Once a call is traced, nested function calls stay in the same trace.

tracing=True and TracingPolicy(...) automatically install tracing support for functions deployed with @chalkcompute.function or RemoteFunction. If you omit tracing, Chalk does not add those packages for you. The default can still continue an existing trace if your image already includes tracing support, for example because it installs chalkcompute or chalk-remote-call-python[tracing].
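
The sampling rules for TracingPolicy can be summarized as a small decision function. This is a sketch of the behavior described in this section, not Chalk's code: should_trace is a hypothetical helper, it covers only the TracingPolicy case (not tracing=False), and it assumes sampling is a per-call random draw.

```python
import random


def should_trace(sample_rate: float, caller_traced: bool, rng: random.Random) -> bool:
    """Decide whether one call is traced (illustrative sampling sketch)."""
    if not 0.0 <= sample_rate <= 1.0:
        raise ValueError("sample_rate must be between 0.0 and 1.0")
    if caller_traced:
        # Once a call is traced, nested calls stay in the same trace.
        return True
    # Otherwise start a new trace for roughly sample_rate of calls.
    return rng.random() < sample_rate


rng = random.Random(0)  # seeded so the demo is repeatable
traced = sum(should_trace(0.25, caller_traced=False, rng=rng) for _ in range(10_000))
print(f"traced about {traced / 10_000:.0%} of calls")
```

With sample_rate=0.0 the random draw never succeeds, so the function is traced only when the caller already is.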

Troubleshooting

If you do not see remote function spans:

  • Redeploy the function after changing tracing.
  • Enable tracing on both outer and inner functions to see both functions’ execution in a nested trace.
  • Wrap local tests in chalkcompute.span() if you want the call to be part of an explicit trace.
  • Make sure you are looking at the called function’s Scaling Group page and a time range that includes the call.

Streaming results with async generators

If your function is an async generator (uses yield), the caller receives each value as soon as the worker produces it, without waiting for the generator to finish.

This makes functions a good fit for:

  • Streaming LLM token output.
  • Incremental ETL where downstream consumers start processing early.
  • Long-running jobs that emit progress events.

Example: audio transcription

Transcribe an audio file and yield each segment as it’s decoded:

import chalkcompute

@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install faster-whisper httpx",
    ),
    cpu="4",
    memory="8Gi",
)
async def transcribe(audio_uri: str):
    import httpx
    from faster_whisper import WhisperModel

    async with httpx.AsyncClient() as client:
        resp = await client.get(audio_uri)
        resp.raise_for_status()

    audio_path = "/tmp/audio"
    with open(audio_path, "wb") as f:
        f.write(resp.content)

    model = WhisperModel("base.en", compute_type="int8")
    segments, _info = model.transcribe(audio_path)

    for segment in segments:
        yield {
            "start": segment.start,
            "end": segment.end,
            "text": segment.text,
        }

The caller iterates over segments as they arrive:

transcribe.wait_ready()

# async for must run inside an async context, such as an async def or a notebook cell.
async for segment in transcribe.remote("https://example.com/meeting-recording.wav"):
    print(f"[{segment['start']:.1f}s] {segment['text']}")

Output streams in real time:

[0.0s]  Welcome to the quarterly review.
[3.2s]  Let's start with the revenue numbers.
[6.1s]  Q3 came in at 4.2 million, up 18 percent.
...

The caller receives the first segment as soon as it's decoded — it does not wait for the full file to finish processing. For a 30-minute recording, the first segment may arrive in seconds.
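
The interleaving of producer and consumer can be reproduced locally with a plain asyncio generator. The sketch below does not involve Chalk at all; produce and consume are stand-ins, and the events log only exists to show that each item is received before the next one is produced.

```python
import asyncio

events: list[str] = []


async def produce(n: int):
    """Stand-in for a streaming function: yields n items, one at a time."""
    for i in range(n):
        await asyncio.sleep(0.01)  # stand-in for decoding one segment
        events.append(f"produced {i}")
        yield i
    events.append("producer finished")


async def consume() -> None:
    async for item in produce(3):
        events.append(f"received {item}")


asyncio.run(consume())
print(events)
# ['produced 0', 'received 0', 'produced 1', 'received 1',
#  'produced 2', 'received 2', 'producer finished']
```

The first item is received long before the producer finishes, which is the same property a remote streaming call gives the caller.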


Nested function calls

A function can call other functions with .remote(...). Each inner call is dispatched independently, so you can compose pipelines where each step runs on its own infrastructure.

import chalkcompute

@chalkcompute.function
def translate(text: str, target_lang: str) -> str:
    from transformers import pipeline
    translator = pipeline("translation", model=f"Helsinki-NLP/opus-mt-en-{target_lang}")
    return translator(text)[0]["translation_text"]


@chalkcompute.function
async def transcribe_and_translate(audio_uri: str, target_lang: str):
    """Transcribe audio, then translate each segment."""
    translate.wait_ready()

    async for segment in transcribe.remote(audio_uri):
        translated = await translate.remote(segment["text"], target_lang)
        yield {
            **segment,
            "translated": translated,
        }

The caller sees a single stream of translated segments:

transcribe_and_translate.wait_ready()

async for seg in transcribe_and_translate.remote(
    "https://example.com/meeting.wav",
    target_lang="es",
):
    print(f"[{seg['start']:.1f}s] {seg['translated']}")

Output:

[0.0s]  Bienvenidos a la revisión trimestral.
[3.2s]  Comencemos con las cifras de ingresos.
[6.1s]  El tercer trimestre llegó a 4,2 millones, un 18 por ciento más.

The outer function composes two inner calls — transcribe (async generator) and translate (single-value). Each runs on its own compute configuration.


Error handling

If a function raises an exception, the caller receives it as a RuntimeError with the original exception type and message:

@chalkcompute.function
def divide(a: float, b: float) -> float:
    return a / b

divide.wait_ready()

try:
    divide.remote(1.0, 0.0)
except RuntimeError as e:
    print(e)  # ZeroDivisionError: division by zero

For async generators, an exception mid-stream terminates the iterator. Items already yielded have been delivered to the caller; only the next iteration step raises:

@chalkcompute.function
async def fragile_gen():
    yield "ok-1"
    yield "ok-2"
    raise ValueError("something went wrong")

async for item in fragile_gen.remote():
    print(item)
# ok-1
# ok-2
# RuntimeError: ValueError: something went wrong
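
A caller that wants to keep partial results can wrap the loop in try/except. The sketch below uses a local async generator as a stand-in for the remote stream, so it runs without a deployment; fragile_stream and collect_partial are hypothetical names.

```python
import asyncio
from typing import Optional


async def fragile_stream():
    """Local stand-in for a remote stream that fails after two items."""
    yield "ok-1"
    yield "ok-2"
    raise RuntimeError("ValueError: something went wrong")


async def collect_partial() -> tuple[list[str], Optional[str]]:
    """Keep every item received before the failure, plus the error text."""
    received: list[str] = []
    error: Optional[str] = None
    try:
        async for item in fragile_stream():
            received.append(item)
    except RuntimeError as exc:
        error = str(exc)
    return received, error


print(asyncio.run(collect_partial()))
# (['ok-1', 'ok-2'], 'ValueError: something went wrong')
```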

How it works

@chalkcompute.function is backed by Chalk Compute’s async execution layer. At a high level, calling a function goes through three moving parts:

┌──────────┐         ┌──────────────────┐         ┌───────────┐
│  Caller  │──call──▸│  Function Queue  │──run───▸│  Worker   │
│          │◂results─│                  │◂results─│ (Python)  │
└──────────┘         └──────────────────┘         └───────────┘

  • The caller (local script, notebook, another function) submits an invocation and receives results over a single bidirectional stream.
  • The Function Queue durably accepts the work, assigns each call an ID, enforces configured policies (concurrency, rate limit, retry), and relays results back to the caller.
  • The worker is the Python process that actually runs your function code.

Callers and workers are loosely coupled: the caller never opens a connection directly to a worker. Workers can scale up, scale down, or be rescheduled while calls are in flight, and those calls are not lost. This decoupling is what enables streaming async generators, nested calls, and policy-enforced dispatch.

Policy enforcement

The queue enforces the policies declared on each function — rate limit, concurrency, and retries — before dispatching work to the worker.

| Policy | What it controls |
| --- | --- |
| rate_limit | Outbound requests per second / minute / hour, optionally shared across functions via a key. |
| concurrency | In-flight invocations at once, optionally shared across functions via a key. Defaults to 1 per function. |
| retries | Automatic retry of transient failures (UNAVAILABLE, DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, ABORTED) with exponential backoff and jitter. Other errors are terminal. |

Concurrency is acquired before dispatching, so a burst of queued calls backpressures on the semaphore rather than spawning unbounded tasks. Rate-limit tokens are consumed per attempt — each retry takes a fresh token.

Retry safety

Retries are safe for failures that happen before the function has started streaming results — connection errors, DEADLINE_EXCEEDED on the initial call, etc. If a function has already yielded chunks when it fails, those chunks are visible to the caller and a retry would produce an ambiguous stream. Retries therefore apply cleanly only to pre-stream failures today; making mid-stream retries safe is a known follow-up.
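
The pre-stream rule can be expressed as a small wrapper that retries only while nothing has been delivered. The sketch below uses synchronous generators for brevity and treats ConnectionError as the transient failure; stream_with_retry and the two demo streams are hypothetical and do not reflect the queue's actual code.

```python
from typing import Callable, Iterator


def stream_with_retry(
    start_stream: Callable[[], Iterator[str]],
    attempts: int = 3,
) -> Iterator[str]:
    """Re-run a failed stream only if no chunk has reached the caller."""
    retries_left = attempts
    while True:
        delivered = 0
        try:
            for chunk in start_stream():
                delivered += 1
                yield chunk
            return
        except ConnectionError:
            # After the first chunk, a re-run would replay data the caller
            # has already seen, so the error is surfaced instead.
            if delivered > 0 or retries_left == 0:
                raise
            retries_left -= 1


state = {"calls": 0}


def fails_before_streaming() -> Iterator[str]:
    state["calls"] += 1
    if state["calls"] == 1:
        raise ConnectionError("worker unavailable")
    yield "chunk-1"
    yield "chunk-2"


def fails_mid_stream() -> Iterator[str]:
    yield "chunk-1"
    raise ConnectionError("worker lost")


print(list(stream_with_retry(fails_before_streaming)))  # ['chunk-1', 'chunk-2']

seen: list[str] = []
try:
    for chunk in stream_with_retry(fails_mid_stream):
        seen.append(chunk)
except ConnectionError as exc:
    seen.append(f"error: {exc}")
print(seen)  # ['chunk-1', 'error: worker lost']
```

The first stream is retried transparently because it failed before producing anything; the second surfaces its error because one chunk had already been delivered.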