Overview

Chalk Functions let you define Python functions that run remotely on Chalk Compute. Decorate a function with @chalkcompute.function, and it becomes callable from any Python process — locally, from a notebook, or from another Chalk function. Arguments and results are serialized automatically; you write ordinary Python and Chalk handles scheduling, scaling, and delivery.

Functions support three return styles:

  • Single value — return a result and the caller receives it.
  • Async generator — yield results incrementally and the caller receives them as they’re produced. See Function Queue → Streaming results for an end-to-end example.
  • Nested calls — a function can call other functions, composing pipelines without manual orchestration. See Function Queue → Nested function calls for a pipeline example.
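
For illustration, here is a minimal sketch of the latter two styles (illustrative only; the full streaming and nesting semantics are covered on the Function Queue page):

import chalkcompute

@chalkcompute.function
async def stream_chunks(text: str):
    # Async generator: yield partial results as they become ready
    for chunk in text.split("."):
        yield chunk

@chalkcompute.function
def square(x: int) -> int:
    return x * x

@chalkcompute.function
def sum_of_squares(x: int, y: int) -> int:
    # Nested call: one Chalk function calling another
    return square(x) + square(y)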

Defining a function

Use the @chalkcompute.function decorator:

import chalkcompute

@chalkcompute.function
def add(x: int, y: int) -> int:
    return x + y

Deploy by running the script — the decorator registers the function with Chalk. Call wait_ready() to block until the function is available:

add.wait_ready()
result = add(3, 5)
print(result)  # 8

Resource configuration

Functions accept resource hints that control the compute environment:

@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install sentence-transformers",
    ),
    cpu=2,
    memory="4Gi",
    gpu="nvidia-l4",
    secrets=[chalkcompute.Secret("HF_TOKEN")],
    max_batching_size=25,
    max_buffer_duration=2000,
    retries=3,
)
def embed(texts: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(texts).tolist()

Parameter | Description
name | Custom function name. Defaults to the function’s __name__.
image | Container image to deploy in. Defaults to Image.debian_slim().
cpu | CPU resource request (e.g. 2, "500m").
memory | Memory resource request (e.g. "4Gi", "512Mi").
gpu | GPU resource request (e.g. "nvidia-l4", "nvidia-a100"). Format is "<count>:<type>" or just "<type>" for a single GPU.
env | Environment variables to pass to the function.
secrets | List of Secret references to inject into the function’s environment.
volumes | List of (Volume(name), mount_path) tuples for persistent storage.
min_replicas | Minimum number of replicas.
max_replicas | Maximum number of replicas.
shutdown_delay_seconds | Graceful termination period in seconds before replicas are forcibly killed during scale-down. Defaults to 30s on the server.
scaling_interval_seconds | How often (in seconds) the autoscaler scrapes metrics to make scaling decisions. Defaults to 60s on the server.
target_cpu_utilization_percentage | Target CPU utilization percentage (0–100) that drives autoscaling.
target_queue_depth | Target pending-request queue depth per replica. When set, the autoscaler scales on the function’s own queue depth.
serialization_format | Wire format for arguments and results. Currently only "pyarrow" is supported.
options | Arbitrary key-value options forwarded to the Chalk platform.
max_buffer_duration | Maximum time in milliseconds to buffer incoming items before invoking the handler. Defaults to 1000 ms when batching is enabled.
max_batching_size | Maximum number of items to accumulate before invoking the handler. Defaults to 10 when batching is enabled.
retries | Retry policy for handler invocations. Pass an int for simple max-attempts with default exponential backoff, or a RetryPolicy for full control.
rate_limit | Outbound request rate cap. Pass an int for a simple “N per second” cap, or a RateLimitPolicy for full control over rate, per, and a shared-bucket key.
concurrency | In-flight call cap. Pass an int for a simple max_concurrent cap, or a ConcurrencyPolicy for full control including a shared-bucket key.
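
For instance, a sketch combining name and env with basic resource requests (here env is assumed to accept a plain dict of variable names to values):

import chalkcompute

@chalkcompute.function(
    name="score-text",           # overrides the default __name__-based name
    env={"LOG_LEVEL": "debug"},  # assumed: env takes a dict of environment variables
    cpu=1,
    memory="512Mi",
)
def score(text: str) -> float:
    import os
    print("log level:", os.environ.get("LOG_LEVEL"))  # set via the env parameter above
    return float(len(text))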

Secrets

Inject secrets into a function using the secrets parameter. Secrets are resolved at deploy time and made available as environment variables inside the container:

import chalkcompute

@chalkcompute.function(
    secrets=[
        chalkcompute.Secret.from_env("OPENAI_API_KEY"),
        chalkcompute.Secret.from_integration("prod_postgres"),
        chalkcompute.Secret.from_local_env("MY_LOCAL_TOKEN"),
    ],
)
def call_api(prompt: str) -> str:
    import os
    import httpx
    key = os.environ["OPENAI_API_KEY"]
    # ...

See the Secrets section of the overview for the full Secret API.


Batching

When your handler is more efficient at processing multiple items at once (e.g. GPU batch inference), enable batching with max_buffer_duration and/or max_batching_size. Chalk will accumulate incoming items and invoke the handler in chunks:

@chalkcompute.function(
    max_buffer_duration=1000,  # buffer up to 1000 ms
    max_batching_size=32,      # or up to 32 items
    gpu="nvidia-l4",
)
def batch_embed(texts: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(texts).tolist()

When batching is enabled, handler arguments are delivered as lists or a single pyarrow table. Defaults apply when either parameter is set:

Parameter | Default (when batching enabled)
max_buffer_duration | 1000 ms
max_batching_size | 10 items

Retry policies

Functions can automatically retry on failure. Pass a simple integer for max-attempts with default exponential backoff, or use RetryPolicy for full control:

import chalkcompute
from chalkcompute import RetryPolicy

# Simple: retry up to 3 times with exponential backoff
@chalkcompute.function(retries=3)
def flaky_api(query: str) -> str:
    ...

# Exponential backoff with custom parameters
@chalkcompute.function(
    retries=RetryPolicy.exponential(
        attempts=5,
        initial=0.5,
        max_wait=30.0,
    ),
)
def resilient_call(payload: str) -> str:
    ...

# Fixed delay, only retry on specific exceptions
@chalkcompute.function(
    retries=RetryPolicy.linear(
        attempts=3,
        delay=2.0,
        retry_on=(TimeoutError, ConnectionError),
    ),
)
def network_call(url: str) -> str:
    ...

RetryPolicy.exponential()

Exponential backoff — wait times grow geometrically: initial, initial * multiplier, initial * multiplier², …, capped at max_wait:

Parameter | Type | Default | Description
attempts | int | 3 | Max retry attempts
initial | float | 1.0 | Base wait time in seconds
multiplier | float | 2.0 | Backoff multiplier
max_wait | float | 60.0 | Upper bound on wait time
jitter | bool | True | Add random jitter
retry_on | tuple[type, ...] | (Exception,) | Exception types that trigger retry
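
With these defaults, successive waits are roughly 1 s, 2 s, 4 s, 8 s, … (plus jitter), never exceeding 60 s. The resilient_call example above (initial=0.5, max_wait=30.0) would therefore wait roughly 0.5 s, 1 s, 2 s, and 4 s between its five attempts.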

RetryPolicy.linear()

Fixed delay between retries:

Parameter | Type | Default | Description
attempts | int | 3 | Max retry attempts
delay | float | 1.0 | Fixed wait in seconds
jitter | bool | False | Add random jitter
retry_on | tuple[type, ...] | (Exception,) | Exception types that trigger retry

Exception filtering

By default, all Exception subclasses trigger a retry. Pass a tuple of exception classes to retry_on to restrict which failures are retried:

RetryPolicy.exponential(
    attempts=4,
    retry_on=(ConnectionError, TimeoutError, OSError),
)

Non-matching exceptions propagate immediately without consuming retry budget.


Rate limiting

Cap the outbound request rate of a function using rate_limit. Pass an integer for a simple “N per second” cap, or a RateLimitPolicy for full control:

import chalkcompute
from chalkcompute import RateLimitPolicy

# Simple: up to 100 requests per second
@chalkcompute.function(rate_limit=100)
def call_provider(query: str) -> str:
    ...

# Full control: rate + time unit + shared-bucket key
@chalkcompute.function(
    rate_limit=RateLimitPolicy(rate=1000, per="minute", key="provider-api"),
)
def call_other_endpoint(query: str) -> str:
    ...

RateLimitPolicy

Parameter | Type | Default | Description
rate | int | required | Maximum requests per time unit defined by per
per | str | "second" | Time unit: "second", "minute", or "hour"
key | str | function name | Shared-bucket identifier (see below)

Sharing a bucket across functions

Two functions declaring the same key share a single rate-limit bucket. This is useful when multiple functions call the same downstream service and the service imposes a limit across all callers:

shared = RateLimitPolicy(rate=1000, per="second", key="provider-api")

@chalkcompute.function(rate_limit=shared)
def lookup(query: str) -> str: ...

@chalkcompute.function(rate_limit=shared)
def enrich(record: dict) -> dict: ...
# Calls to both functions count against the same 1000/s budget.

If key is omitted, each function gets its own bucket keyed by the function name, so rate limits are per-function by default.


Concurrency limits

Cap the number of in-flight calls to a function using concurrency. This is distinct from rate_limit — rate limits gate requests-over-time, concurrency gates how many calls are executing simultaneously:

import chalkcompute
from chalkcompute import ConcurrencyPolicy

# Simple: at most 10 calls in flight at once
@chalkcompute.function(concurrency=10)
def gpu_infer(text: str) -> list[float]:
    ...

# Shared gate across multiple functions hitting the same resource
gpu_pool = ConcurrencyPolicy(max_concurrent=4, key="gpu-pool")

@chalkcompute.function(concurrency=gpu_pool, gpu="nvidia-l4")
def embed(text: str) -> list[float]: ...

@chalkcompute.function(concurrency=gpu_pool, gpu="nvidia-l4")
def rerank(query: str, docs: list[str]) -> list[int]: ...
# Only 4 calls across both functions run simultaneously.

ConcurrencyPolicy

Parameter | Type | Default | Description
max_concurrent | int | required | Maximum in-flight handler invocations
key | str | function name | Shared-bucket identifier

When `concurrency` is not specified, the function queue consumer applies a default of `max_concurrent=1` per function — calls to a single function are serialized. Set `concurrency=N` explicitly to allow parallel execution.

Rate limit vs concurrency

Both can be set independently on the same function; each governs a different dimension of load:

Policy | Controls | Example use case
rate_limit | Requests per unit time | “1000 calls/sec across all replicas”
concurrency | Parallel in-flight count | “4 GPU slots, don’t exceed memory”
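
Both caps can be declared on the same function, for example (a sketch using the int shorthand for each):

import chalkcompute

@chalkcompute.function(
    rate_limit=100,   # at most 100 calls per second
    concurrency=8,    # at most 8 calls executing at any moment
)
def call_downstream(payload: str) -> str:
    ...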

Autoscaling

Functions accept a set of autoscaling hints that shape how replicas scale up and down in response to load:

@chalkcompute.function(
    min_replicas=1,
    max_replicas=8,
    scaling_interval_seconds=30,
    target_cpu_utilization_percentage=70,
    shutdown_delay_seconds=120,
    gpu="nvidia-l4",
)
def infer(text: str) -> str:
    ...

Parameter | Description
min_replicas | Lower bound — replicas never scale below this. Use 0 for scale-to-zero.
max_replicas | Upper bound — replicas never scale above this.
scaling_interval_seconds | How often the autoscaler scrapes metrics and makes scaling decisions. Defaults to 60s on the server.
target_cpu_utilization_percentage | Target average CPU utilization (0–100). Autoscaler adds replicas when the average exceeds this, removes them when below.
target_queue_depth | Target pending-request queue depth per replica. When set, the autoscaler scales on the function’s own queue depth instead of (or in addition to) CPU.
shutdown_delay_seconds | Grace period on scale-down before replicas are forcibly killed. Lets in-flight requests finish cleanly. Defaults to 30s on the server.

CPU-driven vs queue-driven scaling

  • CPU (target_cpu_utilization_percentage): good for compute-bound functions — model inference, data transformation, anything where CPU is the bottleneck.
  • Queue depth (target_queue_depth): good for I/O-bound or variable-latency functions, or when you want to scale on demand rather than resource saturation. Scales up when calls pile up regardless of CPU.

Both can be set together; the autoscaler takes the higher signal.
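
For example, a queue-driven configuration might look like this sketch (the function name is hypothetical):

import chalkcompute

@chalkcompute.function(
    min_replicas=0,          # scale to zero when idle
    max_replicas=16,
    target_queue_depth=5,    # add replicas when more than ~5 calls are pending per replica
)
def transcribe(audio_url: str) -> str:
    ...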


Volumes

Attach persistent storage to a function with the volumes parameter. Each entry is a (Volume(...), mount_path) tuple. Inside the function, the volume is accessible as an ordinary filesystem path — read and write files normally:

import chalkcompute
from chalkcompute import Volume

@chalkcompute.function(
    volumes=[(Volume("model-cache"), "/models")],
    gpu="nvidia-l4",
)
def predict(text: str) -> list[float]:
    import os
    import json
    from sentence_transformers import SentenceTransformer

    cache_path = "/models/all-MiniLM-L6-v2"
    if os.path.exists(cache_path):
        model = SentenceTransformer(cache_path)
    else:
        model = SentenceTransformer("all-MiniLM-L6-v2")
        model.save(cache_path)

    return model.encode(text).tolist()

Volumes persist across invocations — data written by one call is available to subsequent calls. This is useful for caching large model weights, storing intermediate results, or sharing data between functions that mount the same volume.
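
For example, two functions that mount the same volume can exchange files through it (a sketch; the volume and path names are hypothetical):

import chalkcompute
from chalkcompute import Volume

shared = (Volume("shared-artifacts"), "/artifacts")

@chalkcompute.function(volumes=[shared])
def write_artifact(name: str, data: str) -> str:
    # Write a file to the shared volume
    path = f"/artifacts/{name}"
    with open(path, "w") as f:
        f.write(data)
    return path

@chalkcompute.function(volumes=[shared])
def read_artifact(name: str) -> str:
    # Read a file written by write_artifact via the same volume
    with open(f"/artifacts/{name}") as f:
        return f.read()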


Calling from a DataFrame

You can invoke a function on every row of a DataFrame using F.catalog_call() inside a .project() expression. This is useful for batch-processing columnar data — Chalk handles parallelism, batching, and delivery automatically.

For example, define a function that extracts named entities from text:

import chalkcompute

@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install spacy && python -m spacy download en_core_web_sm",
    ),
    cpu=2,
    memory="4Gi",
)
def extract_entities(memo: str) -> str:
    import json
    import spacy
    nlp = spacy.load("en_core_web_sm")
    doc = nlp(memo)
    return json.dumps([
        {"text": ent.text, "label": ent.label_}
        for ent in doc.ents
    ])

Then apply it across a parquet file of transaction memos:

from chalkcompute import DataFrame, F, _

results = (
    DataFrame.scan("transactions.parquet")
    .project({
        "transaction_id": _.transaction_id,
        "memo": _.memo,
        "entities": F.catalog_call("extract_entities", _.memo),
    })
)

Each row’s memo column is passed to extract_entities and the result is returned as a new entities column. The original DataFrame is not mutated — .project() returns only the columns you specify.


Error handling

If a function raises an exception, the caller receives it as a RuntimeError with the original exception type and message:

@chalkcompute.function
def divide(a: float, b: float) -> float:
    return a / b

divide.wait_ready()

try:
    divide(1.0, 0.0)
except RuntimeError as e:
    print(e)  # ZeroDivisionError: division by zero

For async generators, an exception mid-stream terminates the iterator. Any items already yielded are still available to the caller — only subsequent iteration raises:

@chalkcompute.function
async def fragile_gen():
    yield "ok-1"
    yield "ok-2"
    raise ValueError("something went wrong")

for item in fragile_gen():
    print(item)
# ok-1
# ok-2
# RuntimeError: ValueError: something went wrong

How it works

@chalkcompute.function is backed by the Function Queue, Chalk Compute’s async execution layer. Calls are accepted, assigned an ID, and dispatched to a worker without the caller holding a direct connection to any specific worker process — this is what enables streaming async generators, nested calls, and policy-enforced dispatch. See the Function Queue page for architecture and end-to-end examples.