Compute
Define and call remote functions that run on Chalk Compute.
A Chalk Function is a Python callable that you deploy onto a
Scaling Group and register in the
Chalk Catalog. Decorate a function
with @chalkcompute.function and it becomes reachable from anywhere Chalk
runs — local scripts and notebooks, other functions, and (because of the
Catalog registration) the feature engine: Chalk SQL, resolvers, the query
server, and DataFrames via F.catalog_call(). Arguments and results are
serialized automatically; you write ordinary Python and Chalk handles
scheduling, scaling, and delivery.
On top of what a bare Scaling Group provides, functions add async / streaming invocation, nested calls, automatic retries, concurrency caps, rate limits, and request-queue-driven autoscaling. Reach for a function when you want any of those features, or when you want the callable to be visible to the rest of Chalk.
Functions support three return styles. One of them is streaming: an async generator
yields results incrementally, and the caller receives them as
they’re produced. See Streaming results
below for an end-to-end example.

Use the @chalkcompute.function decorator:
import chalkcompute

@chalkcompute.function
def add(x: int, y: int) -> int:
    return x + y

Deploy by running the script; the decorator registers the function with Chalk. Call
wait_ready() to block until the function is available, then use .remote(...)
to invoke the deployed function:

add.wait_ready()
result = add.remote(3, 5)
print(result)  # 8

add(3, 5) runs the Python body locally; use .remote(...) for a deployed call.
The decorator accepts a wide set of arguments that control the compute environment a function runs in — image, CPU/memory/GPU, secrets, volumes, batching, retries, rate limits, concurrency, autoscaling, and tracing. A realistic decorator usually combines several of them:
@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install sentence-transformers",
    ),
    cpu="2",
    memory="4Gi",
    gpu="nvidia-l4",
    secrets=[chalkcompute.Secret.from_env("HF_TOKEN")],
    max_batching_size=25,
    max_buffer_duration=2000,
    retries=3,
)
def embed(texts: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(texts).tolist()

This page walks through the higher-level concepts behind each argument in its
own section below: batching, retries, rate limits, concurrency, autoscaling,
volumes, and tracing. For the full per-argument reference, types, and
defaults, see @chalkcompute.function
in the API reference.
Inject secrets into a function using the secrets parameter. Secrets are
resolved at deploy time and made available as environment variables inside
the container:
import os
from urllib.parse import urlparse

import chalkcompute
from chalkcompute import Secret

@chalkcompute.function(
    secrets=[
        Secret.from_env("OPENAI_API_KEY"),
        Secret.from_integration("prod_postgres"),
    ],
)
def call_api(prompt: str) -> str:
    openai_api_key = os.environ["OPENAI_API_KEY"]
    database_url = os.environ["DATABASE_URL"]
    headers = {"Authorization": f"Bearer {openai_api_key}"}
    db_host = urlparse(database_url).hostname
    # Pass headers to your OpenAI-compatible client and database_url to
    # your Postgres client.
    return f"ready to call OpenAI for {prompt!r} and log to {db_host}"

See the Secrets section of the overview for
the full Secret API.
When your handler is more efficient processing multiple items at once (e.g.
GPU batch inference), enable batching with max_buffer_duration and/or
max_batching_size. Chalk will accumulate incoming items and invoke the
handler in chunks:
@chalkcompute.function(
    max_buffer_duration=1000,  # buffer up to 1000 ms
    max_batching_size=32,  # or up to 32 items
    gpu="nvidia-l4",
)
def batch_embed(texts: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(texts).tolist()

When batching is enabled, handler arguments are delivered as lists or a single pyarrow table. Defaults apply when either parameter is set:

| Parameter | Default (when batching enabled) |
|---|---|
| max_buffer_duration | 1000 ms |
| max_batching_size | 10 items |
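To make the buffering rule concrete, here is a minimal sketch of size-or-duration batching in plain Python. It does not use chalkcompute: the Batcher class and the now_ms argument are hypothetical names chosen for illustration, and unlike a real queue this toy version only checks the duration when a new item arrives.

```python
from typing import Callable, Optional


class Batcher:
    """Toy accumulator: flush on batch size or on age of the oldest buffered item."""

    def __init__(
        self,
        handler: Callable[[list], list],
        max_batching_size: int = 10,
        max_buffer_duration_ms: float = 1000,
    ) -> None:
        self.handler = handler
        self.max_batching_size = max_batching_size
        self.max_buffer_duration_ms = max_buffer_duration_ms
        self.buffer: list = []
        self.first_arrival_ms: Optional[float] = None

    def submit(self, item, now_ms: float) -> Optional[list]:
        """Buffer an item; return the handler output if this call triggered a flush."""
        if not self.buffer:
            self.first_arrival_ms = now_ms
        self.buffer.append(item)
        full = len(self.buffer) >= self.max_batching_size
        expired = now_ms - self.first_arrival_ms >= self.max_buffer_duration_ms
        return self.flush() if full or expired else None

    def flush(self) -> list:
        """Hand the whole buffer to the handler as one list and reset."""
        batch, self.buffer = self.buffer, []
        self.first_arrival_ms = None
        return self.handler(batch)
```

The handler receives a list, which mirrors how batched arguments are described above; the timestamps are passed in explicitly so the behavior is easy to test.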
Functions can automatically retry on failure. Pass a simple integer for
max-attempts with default exponential backoff, or use RetryPolicy for full
control:
import chalkcompute
from chalkcompute import RetryPolicy

# Simple: retry up to 3 times with exponential backoff
@chalkcompute.function(retries=3)
def flaky_api(query: str) -> str:
    ...

# Exponential backoff with custom parameters
@chalkcompute.function(
    retries=RetryPolicy.exponential(
        attempts=5,
        initial=0.5,
        max_wait=30.0,
    ),
)
def resilient_call(payload: str) -> str:
    ...

# Fixed delay, only retry on specific exceptions
@chalkcompute.function(
    retries=RetryPolicy.linear(
        attempts=3,
        delay=2.0,
        retry_on=(TimeoutError, ConnectionError),
    ),
)
def network_call(url: str) -> str:
    ...

With exponential backoff, wait times grow geometrically: initial, initial * multiplier,
initial * multiplier², …, capped at max_wait:
| Parameter | Type | Default | Description |
|---|---|---|---|
| attempts | int | 3 | Max retry attempts |
| initial | float | 1.0 | Base wait time in seconds |
| multiplier | float | 2.0 | Backoff multiplier |
| max_wait | float | 60.0 | Upper bound on wait time |
| jitter | bool | True | Add random jitter |
| retry_on | tuple[type, ...] | (Exception,) | Exception types that trigger retry |
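As a worked example of that formula, the sketch below computes the wait before each retry with jitter left out. backoff_schedule is a hypothetical helper, not part of chalkcompute, and it assumes one wait per retry attempt.

```python
def backoff_schedule(
    attempts: int = 3,
    initial: float = 1.0,
    multiplier: float = 2.0,
    max_wait: float = 60.0,
) -> list[float]:
    """Return the wait in seconds before each retry, ignoring jitter."""
    return [min(initial * multiplier**i, max_wait) for i in range(attempts)]


# Same parameters as resilient_call above (multiplier left at its default of 2.0).
print(backoff_schedule(attempts=5, initial=0.5, max_wait=30.0))
# [0.5, 1.0, 2.0, 4.0, 8.0]
```

With the defaults, the cap only matters from the seventh retry onward (64 s would exceed max_wait of 60 s).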
Fixed delay between retries:
| Parameter | Type | Default | Description |
|---|---|---|---|
| attempts | int | 3 | Max retry attempts |
| delay | float | 1.0 | Fixed wait in seconds |
| jitter | bool | False | Add random jitter |
| retry_on | tuple[type, ...] | (Exception,) | Exception types that trigger retry |
By default, all Exception subclasses trigger a retry. Pass a tuple of
exception classes to retry_on to restrict which failures are retried:
RetryPolicy.exponential(
    attempts=4,
    retry_on=(ConnectionError, TimeoutError, OSError),
)

Non-matching exceptions propagate immediately without consuming retry budget.
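The retry_on behavior can be illustrated with a small local retry loop. This is a sketch of the semantics described above, not Chalk's implementation; call_with_retries is a hypothetical helper, and it uses a fixed delay for brevity.

```python
import time


def call_with_retries(fn, attempts=3, delay=0.0, retry_on=(Exception,)):
    """Call fn(); retry up to `attempts` times on exceptions matching retry_on."""
    retries_used = 0
    while True:
        try:
            return fn()
        except retry_on:
            if retries_used >= attempts:
                raise  # retry budget exhausted: surface the last error
            retries_used += 1
            time.sleep(delay)
        # Exceptions not listed in retry_on are never caught here,
        # so they propagate on the first failure.
```

Because only the listed exception types are caught, a ValueError raised under retry_on=(TimeoutError,) reaches the caller after a single call.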
Cap the outbound request rate of a function using rate_limit. Pass an
integer for a simple “N per second” cap, or a RateLimitPolicy for full
control:
import chalkcompute
from chalkcompute import RateLimitPolicy

# Simple: up to 100 requests per second
@chalkcompute.function(rate_limit=100)
def call_provider(query: str) -> str:
    ...

# Full control: rate + time unit + shared-bucket key
@chalkcompute.function(
    rate_limit=RateLimitPolicy(rate=1000, per="minute", key="provider-api"),
)
def call_other_endpoint(query: str) -> str:
    ...

| Parameter | Type | Default | Description |
|---|---|---|---|
| rate | int | required | Maximum requests per unit of time (the unit is set by per) |
| per | str | "second" | Time unit: "second", "minute", or "hour" |
| key | str | function name | Shared-bucket identifier (see below) |
Two functions declaring the same key share a single rate-limit bucket. This
is useful when multiple functions call the same downstream service and the
service imposes a limit across all callers:
shared = RateLimitPolicy(rate=1000, per="second", key="provider-api")

@chalkcompute.function(rate_limit=shared)
def lookup(query: str) -> str: ...

@chalkcompute.function(rate_limit=shared)
def enrich(record: dict) -> dict: ...

# Calls to both functions count against the same 1000/s budget.

If key is omitted, each function gets its own bucket keyed by the function
name, so rate limits are per-function by default.
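One way to picture key-based sharing is a registry of token buckets indexed by key. The sketch below assumes a token-bucket algorithm, which this page does not confirm; TokenBucket and bucket_for are hypothetical names, and time is passed in explicitly to keep the example deterministic.

```python
class TokenBucket:
    """Minimal token bucket: holds up to `rate` tokens, refilled continuously."""

    def __init__(self, rate: int, per_seconds: float = 1.0) -> None:
        self.capacity = float(rate)
        self.refill_per_second = rate / per_seconds
        self.tokens = float(rate)
        self.last = 0.0

    def try_acquire(self, now: float) -> bool:
        """Take one token if available at time `now` (in seconds)."""
        elapsed = now - self.last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_second)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


buckets: dict[str, TokenBucket] = {}


def bucket_for(key: str, rate: int, per_seconds: float = 1.0) -> TokenBucket:
    """Return the bucket registered under key, creating it on first use."""
    if key not in buckets:
        buckets[key] = TokenBucket(rate, per_seconds)
    return buckets[key]
```

Two functions that ask for the same key get the same bucket object and therefore drain one budget, while a function with its own key (for example, its name) gets a separate one.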
Cap the in-flight count of a function using concurrency. This is distinct
from rate_limit — rate limits gate requests-over-time, concurrency gates how
many calls are executing simultaneously:
import chalkcompute
from chalkcompute import ConcurrencyPolicy

# Simple: at most 10 calls in flight at once
@chalkcompute.function(concurrency=10)
def gpu_infer(text: str) -> list[float]:
    ...

# Shared gate across multiple functions hitting the same resource
gpu_pool = ConcurrencyPolicy(max_concurrent=4, key="gpu-pool")

@chalkcompute.function(concurrency=gpu_pool, gpu="nvidia-l4")
def embed(text: str) -> list[float]: ...

@chalkcompute.function(concurrency=gpu_pool, gpu="nvidia-l4")
def rerank(query: str, docs: list[str]) -> list[int]: ...

# Only 4 calls across both functions run simultaneously.

| Parameter | Type | Default | Description |
|---|---|---|---|
| max_concurrent | int | required | Maximum in-flight handler invocations |
| key | str | function name | Shared-bucket identifier |
When `concurrency` is not specified, the function queue consumer applies a default of `max_concurrent=1` per function — calls to a single function are serialized. Set `concurrency=N` explicitly to allow parallel execution.
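The effect of a concurrency cap can be reproduced locally with asyncio.Semaphore. This is an analogy for the gate described above, not Chalk's dispatcher; run_with_cap and its fake handler are hypothetical.

```python
import asyncio


async def run_with_cap(n_calls: int, max_concurrent: int) -> int:
    """Run n_calls fake handlers under a semaphore; return the peak in-flight count."""
    gate = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def handler() -> None:
        nonlocal in_flight, peak
        async with gate:  # wait here until a slot is free
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for real work
            in_flight -= 1

    await asyncio.gather(*(handler() for _ in range(n_calls)))
    return peak


print(asyncio.run(run_with_cap(n_calls=20, max_concurrent=4)))  # 4
```

With max_concurrent=1, which matches the default described above, the peak is 1 and the calls run one after another.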
Both can be set independently and they interact:
| Policy | Controls | Example use case |
|---|---|---|
| rate_limit | Requests per unit time | “1000 calls/sec across all replicas” |
| concurrency | Parallel in-flight count | “4 GPU slots, don’t exceed memory” |
Functions accept several autoscaling hints that shape how replicas scale up and down in response to load:
@chalkcompute.function(
    min_replicas=1,
    max_replicas=8,
    scaling_interval_seconds=30,
    target_cpu_utilization_percentage=70,
    shutdown_delay_seconds=120,
    gpu="nvidia-l4",
)
def infer(text: str) -> str:
    ...

| Parameter | Description |
|---|---|
| min_replicas | Lower bound: replicas never scale below this. Use 0 for scale-to-zero. |
| max_replicas | Upper bound: replicas never scale above this. |
| scaling_interval_seconds | How often the autoscaler scrapes metrics and makes scaling decisions. Defaults to 60s on the server. |
| target_cpu_utilization_percentage | Target average CPU utilization (0–100). Autoscaler adds replicas when the average exceeds this, removes them when below. |
| target_queue_depth | Target pending-request queue depth per replica. When set, the autoscaler scales on the function’s own queue depth instead of (or in addition to) CPU. |
| shutdown_delay_seconds | Grace period on scale-down before replicas are forcibly killed. Lets in-flight requests finish cleanly. Defaults to 30s on the server. |
CPU utilization (target_cpu_utilization_percentage): good for compute-bound
functions such as model inference, data transformation, or anything where CPU is the
bottleneck.

Queue depth (target_queue_depth): good for I/O-bound or variable-latency
functions, or when you want to scale on demand rather than resource
saturation. Scales up when calls pile up regardless of CPU.

Both can be set together; the autoscaler takes the higher signal.
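To illustrate "takes the higher signal", the sketch below computes a replica count from each signal and keeps the larger one, clamped to the configured bounds. The proportional formulas are an assumption modeled on common horizontal autoscalers; this page does not specify Chalk's actual algorithm, and desired_replicas is a hypothetical helper.

```python
import math


def desired_replicas(
    current,
    min_replicas,
    max_replicas,
    avg_cpu_percent=None,
    target_cpu_percent=None,
    pending_calls=None,
    target_queue_depth=None,
) -> int:
    """Pick the larger of the CPU-based and queue-based replica counts."""
    candidates = []
    if avg_cpu_percent is not None and target_cpu_percent:
        # Assumed rule: scale proportionally to how far CPU is from its target.
        candidates.append(math.ceil(current * avg_cpu_percent / target_cpu_percent))
    if pending_calls is not None and target_queue_depth:
        # Assumed rule: enough replicas that each holds at most the target depth.
        candidates.append(math.ceil(pending_calls / target_queue_depth))
    wanted = max(candidates) if candidates else current
    return max(min_replicas, min(max_replicas, wanted))
```

For example, with 2 replicas at 35% CPU against a 70% target, CPU alone suggests 1 replica, but 50 pending calls against a target depth of 10 suggests 5, so the result is 5.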
Attach persistent storage to a function with the volumes parameter. Each
entry is a (name, mount_path) string tuple. Inside the function, the volume
is accessible as an ordinary filesystem path — read and write files normally:
import chalkcompute

@chalkcompute.function(
    volumes=[("model-cache", "/models")],
    gpu="nvidia-l4",
)
def predict(text: str) -> list[float]:
    import os

    from sentence_transformers import SentenceTransformer

    # Reuse weights saved on the volume by an earlier call, if present.
    cache_path = "/models/all-MiniLM-L6-v2"
    if os.path.exists(cache_path):
        model = SentenceTransformer(cache_path)
    else:
        model = SentenceTransformer("all-MiniLM-L6-v2")
        model.save(cache_path)
    return model.encode(text).tolist()

Volumes persist across invocations: data written by one call is available to subsequent calls. This is useful for caching large model weights, storing intermediate results, or sharing data between functions that mount the same volume.
You can invoke a function on every row of a DataFrame using
F.catalog_call() inside a .project() expression. This is useful for
batch-processing columnar data — Chalk handles parallelism, batching, and
delivery automatically.
For example, define a function that extracts named entities from text:
import chalkcompute

@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install spacy && python -m spacy download en_core_web_sm",
    ),
    cpu="2",
    memory="4Gi",
)
def extract_entities(memo: str) -> str:
    import json

    import spacy

    nlp = spacy.load("en_core_web_sm")
    doc = nlp(memo)
    return json.dumps([
        {"text": ent.text, "label": ent.label_}
        for ent in doc.ents
    ])

Then apply it across a parquet file of transaction memos:

from chalkcompute import DataFrame, F, _

results = (
    DataFrame.scan("transactions.parquet")
    .project({
        "transaction_id": _.transaction_id,
        "memo": _.memo,
        "entities": F.catalog_call("extract_entities", _.memo),
    })
)

Each row’s memo column is passed to extract_entities and the result is
returned as a new entities column. The original DataFrame is not mutated;
.project() returns only the columns you specify.
Tracing helps you understand where time is spent when a function runs. Use it to debug slow calls, follow nested function calls, and see which parts of your application are doing work.
After a traced call runs, view it from the function’s Scaling Group page under
Remote Call Traces.
Set tracing=True on a function:
import chalkcompute

@chalkcompute.function(
    cpu="1",
    memory="1Gi",
    tracing=True,
)
def score_transaction(transaction_id: str) -> float:
    return 0.42

Nested calls appear in the same trace when tracing is enabled on each function:

import chalkcompute

@chalkcompute.function(tracing=True)
def fetch_profile(user_id: str) -> dict:
    return {"user_id": user_id, "segment": "active"}

@chalkcompute.function(tracing=True)
def score_user(user_id: str) -> float:
    with chalkcompute.span("score_user.fetch_profile"):
        profile = fetch_profile.remote(user_id)
    return 0.9 if profile["segment"] == "active" else 0.1

with chalkcompute.span("demo.score_user"):
    score = score_user.remote("user_123")

In the trace you can see the outer function, the call to fetch_profile, and
the inner function’s execution together. Add manual spans like
score_user.fetch_profile to label important parts of your own code.
Use chalkcompute.span() to label meaningful application work:
with chalkcompute.span("checkout.score_transaction"):
    score = score_transaction.remote("txn_123")

For a whole caller function, use @chalkcompute.traced:

@chalkcompute.traced("batch.score_transactions")
def run_batch(transaction_ids: list[str]) -> list[float]:
    return [score_transaction.remote(txn_id) for txn_id in transaction_ids]

Manual spans are useful around model calls, retrieval steps, preprocessing, and
other code you want to separate in the trace. To see a remote function’s own
execution, enable tracing=True or a TracingPolicy on that function.
Use TracingPolicy when you only want to collect a percentage of calls:
@chalkcompute.function(
    tracing=chalkcompute.TracingPolicy(sample_rate=0.25),
)
def recommend(user_id: str) -> list[str]:
    ...

sample_rate must be between 0.0 and 1.0.

| Configuration | Behavior |
|---|---|
| tracing=True | Collect traces for every call. |
| tracing=TracingPolicy(sample_rate=0.25) | Collect traces for about 25% of calls. |
| tracing=TracingPolicy(sample_rate=0.0) | Trace this function only when its caller is already traced. |
| tracing=False | Turn tracing off for the function. |
| Omitted | Use the default behavior: only continue a caller’s trace if tracing support is already installed in the image. |
Once a call is traced, nested function calls stay in the same trace.
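The sampling rules in the table can be read as a small decision function. The sketch below is an interpretation of that table, not Chalk's code; should_trace is a hypothetical helper, and the rng parameter exists only so the behavior can be tested deterministically.

```python
import random


def should_trace(sample_rate: float, caller_is_traced: bool, rng=random.random) -> bool:
    """Decide whether a call is traced under a sampling policy."""
    if not 0.0 <= sample_rate <= 1.0:
        raise ValueError("sample_rate must be between 0.0 and 1.0")
    if caller_is_traced:
        return True  # nested calls stay in the caller's trace
    # rng() is in [0.0, 1.0), so 0.0 never starts a trace and 1.0 always does.
    return rng() < sample_rate
```

Under this reading, sample_rate=0.0 starts no new traces but still joins a caller's trace, which matches the third row of the table.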
tracing=True and TracingPolicy(...) automatically install tracing support
for functions deployed with @chalkcompute.function or RemoteFunction. If
you omit tracing, Chalk does not add those packages for you. The default can
still continue an existing trace if your image already includes tracing support,
for example because it installs chalkcompute or
chalk-remote-call-python[tracing].
If you do not see remote function spans:
Check that the function enables tracing=True or a TracingPolicy, or that its image already includes tracing support.
Wrap the call in chalkcompute.span() if you want the call to be part of
an explicit trace.

If your function is an async generator (uses yield), the caller receives each
value as soon as the worker produces it, without waiting for the generator to
finish.
This makes functions a good fit for:
Transcribe an audio file and yield each segment as it’s decoded:
import chalkcompute

@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install faster-whisper httpx",
    ),
    cpu=4,
    memory="8Gi",
)
async def transcribe(audio_uri: str):
    import httpx
    from faster_whisper import WhisperModel

    async with httpx.AsyncClient() as client:
        resp = await client.get(audio_uri)
        resp.raise_for_status()

    audio_path = "/tmp/audio"
    with open(audio_path, "wb") as f:
        f.write(resp.content)

    model = WhisperModel("base.en", compute_type="int8")
    segments, _info = model.transcribe(audio_path)
    for segment in segments:
        yield {
            "start": segment.start,
            "end": segment.end,
            "text": segment.text,
        }

The caller iterates over segments as they arrive:

transcribe.wait_ready()
async for segment in transcribe.remote("https://example.com/meeting-recording.wav"):
    print(f"[{segment['start']:.1f}s] {segment['text']}")

Output streams in real time:
[0.0s] Welcome to the quarterly review.
[3.2s] Let's start with the revenue numbers.
[6.1s] Q3 came in at 4.2 million, up 18 percent.
...
The caller receives the first segment as soon as it's decoded — it does not wait for the full file to finish processing. For a 30-minute recording, the first segment may arrive in seconds.
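The interleaving of producer and consumer can be seen with a purely local async generator. This sketch uses only asyncio and stands in for a deployed function; fake_segments and consume are hypothetical names.

```python
import asyncio


async def fake_segments(n: int, events: list):
    """Stand-in for a streaming function: yields one segment at a time."""
    for i in range(n):
        await asyncio.sleep(0.01)  # pretend to decode the next segment
        events.append(f"produced {i}")
        yield {"start": i * 3.0, "text": f"segment {i}"}


async def consume(n: int) -> list:
    """Iterate the stream and record the order of produce/receive events."""
    events: list = []
    async for segment in fake_segments(n, events):
        events.append(f"received {segment['text']}")
    return events


print(asyncio.run(consume(2)))
# ['produced 0', 'received segment 0', 'produced 1', 'received segment 1']
```

Each segment is received before the next one is produced, which is the same ordering a caller of a streaming function observes.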
A function can call other functions with .remote(...). Each inner call is
dispatched independently, so you can compose pipelines where each step runs on
its own infrastructure.
import chalkcompute

@chalkcompute.function
def translate(text: str, target_lang: str) -> str:
    from transformers import pipeline

    translator = pipeline("translation", model=f"Helsinki-NLP/opus-mt-en-{target_lang}")
    return translator(text)[0]["translation_text"]

@chalkcompute.function
async def transcribe_and_translate(audio_uri: str, target_lang: str):
    """Transcribe audio, then translate each segment."""
    translate.wait_ready()
    async for segment in transcribe.remote(audio_uri):
        translated = await translate.remote(segment["text"], target_lang)
        yield {
            **segment,
            "translated": translated,
        }

The caller sees a single stream of translated segments:

transcribe_and_translate.wait_ready()
async for seg in transcribe_and_translate.remote(
    "https://example.com/meeting.wav",
    target_lang="es",
):
    print(f"[{seg['start']:.1f}s] {seg['translated']}")

[0.0s] Bienvenidos a la revisión trimestral.
[3.2s] Comencemos con las cifras de ingresos.
[6.1s] El tercer trimestre llegó a 4,2 millones, un 18 por ciento más.
The outer function composes two inner calls — transcribe (async generator)
and translate (single-value). Each runs on its own compute configuration.
If a function raises an exception, the caller receives it as a RuntimeError with the
original exception type and message:
@chalkcompute.function
def divide(a: float, b: float) -> float:
    return a / b

divide.wait_ready()
try:
    divide.remote(1.0, 0.0)
except RuntimeError as e:
    print(e)  # ZeroDivisionError: division by zero

For async generators, an exception mid-stream terminates the iterator. Any segments already yielded are still available to the caller; only subsequent iteration raises:

@chalkcompute.function
async def fragile_gen():
    yield "ok-1"
    yield "ok-2"
    raise ValueError("something went wrong")

async for item in fragile_gen.remote():
    print(item)
# ok-1
# ok-2
# RuntimeError: ValueError: something went wrong

@chalkcompute.function is backed by Chalk Compute’s async execution layer.
At a high level, calling a function goes through three moving parts:
┌──────────┐ ┌──────────────────┐ ┌───────────┐
│ Caller │──call──▸│ Function Queue │──run───▸│ Worker │
│ │◂results─│ │◂results─│ (Python) │
└──────────┘ └──────────────────┘ └───────────┘
Callers and workers are loosely coupled — the caller never opens a connection directly to a worker. Workers can scale up, scale down, or get rescheduled mid-flight and in-flight calls are not lost. This is what enables streaming async generators, nested calls, and policy-enforced dispatch.
The queue enforces the policies declared on each function — rate limit, concurrency, and retries — before dispatching work to the worker.
| Policy | What it caps |
|---|---|
| rate_limit | Outbound requests per second / minute / hour, optionally shared across functions via a key. |
| concurrency | In-flight invocations at once, optionally shared across functions via a key. Defaults to 1 per function. |
| retries | Automatic retry of transient failures (UNAVAILABLE, DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, ABORTED) with exponential backoff and jitter. Other errors are terminal. |
Concurrency is acquired before dispatching, so a burst of queued calls backpressures on the semaphore rather than spawning unbounded tasks. Rate-limit tokens are consumed per attempt — each retry takes a fresh token.
Retries are safe for failures that happen before the function has started
streaming results — connection errors, DEADLINE_EXCEEDED on the initial
call, etc. If a function has already yielded chunks when it fails, those
chunks are visible to the caller and a retry would produce an ambiguous
stream. Retries therefore apply cleanly only to pre-stream failures today;
making mid-stream retries safe is a known follow-up.
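The pre-stream rule can be sketched with ordinary generators: retry only while nothing has been yielded, and otherwise let the error reach the caller. This is an illustration of the rule stated above under simplifying assumptions (synchronous generators, no backoff); stream_with_retries is a hypothetical helper, not Chalk's dispatcher.

```python
def stream_with_retries(make_stream, attempts=3):
    """Re-run make_stream() only if it fails before yielding anything."""
    retries_used = 0
    while True:
        yielded_any = False
        try:
            for chunk in make_stream():
                yielded_any = True
                yield chunk
            return  # stream finished cleanly
        except Exception:
            # A mid-stream failure is not retried: the caller has already
            # seen chunks, so replaying would produce an ambiguous stream.
            if yielded_any or retries_used >= attempts:
                raise
            retries_used += 1
```

A stream that fails on its first step is restarted transparently, while a stream that fails after yielding delivers its chunks and then raises.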