Compute
Define and call remote functions that run on Chalk Compute.
Chalk Functions let you define Python functions that run remotely on Chalk Compute.
Decorate a function with @chalkcompute.function, and it becomes callable from any
Python process — locally, from a notebook, or from another Chalk function. Arguments
and results are serialized automatically; you write ordinary Python and Chalk handles
scheduling, scaling, and delivery.
Functions support three return styles: plain return values, generator functions, and
async generators. Generator-style functions yield results incrementally and the caller
receives them as they’re produced. See Function Queue → Streaming results for an
end-to-end example.

Use the @chalkcompute.function decorator:

```python
import chalkcompute
@chalkcompute.function
def add(x: int, y: int) -> int:
    return x + y
```

Deploy by running the script — the decorator registers the function with Chalk. Call
wait_ready() to block until the function is available:

```python
add.wait_ready()
result = add(3, 5)
print(result) # 8
```

Functions accept resource hints that control the compute environment:

```python
@chalkcompute.function(
image=chalkcompute.Image.base("python:3.12-slim").run_commands(
"pip install sentence-transformers",
),
cpu=2,
memory="4Gi",
gpu="nvidia-l4",
secrets=[chalkcompute.Secret("HF_TOKEN")],
max_batching_size=25,
max_buffer_duration=2000,
retries=3,
)
def embed(texts: list[str]) -> list[list[float]]:
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(texts).tolist()
```

| Parameter | Description |
|---|---|
| name | Custom function name. Defaults to the function’s __name__. |
| image | Container image to deploy in. Defaults to Image.debian_slim(). |
| cpu | CPU resource request (e.g. 2, "500m"). |
| memory | Memory resource request (e.g. "4Gi", "512Mi"). |
| gpu | GPU resource request (e.g. "nvidia-l4", "nvidia-a100"). Format is "<count>:<type>" or just "<type>" for a single GPU. |
| env | Environment variables to pass to the function. |
| secrets | List of Secret references to inject into the function’s environment. |
| volumes | List of (Volume(name), mount_path) tuples for persistent storage. |
| min_replicas | Minimum number of replicas. |
| max_replicas | Maximum number of replicas. |
| shutdown_delay_seconds | Graceful termination period in seconds before replicas are forcibly killed during scale-down. Defaults to 30s on the server. |
| scaling_interval_seconds | How often (in seconds) the autoscaler scrapes metrics to make scaling decisions. Defaults to 60s on the server. |
| target_cpu_utilization_percentage | Target CPU utilization percentage (0–100) that drives autoscaling. |
| target_queue_depth | Target pending-request queue depth per replica. When set, the autoscaler scales on the function’s own queue depth. |
| serialization_format | Wire format for arguments and results. Currently only "pyarrow" is supported. |
| options | Arbitrary key-value options forwarded to the Chalk platform. |
| max_buffer_duration | Maximum time in milliseconds to buffer incoming items before invoking the handler. Defaults to 1000 ms when batching is enabled. |
| max_batching_size | Maximum number of items to accumulate before invoking the handler. Defaults to 10 when batching is enabled. |
| retries | Retry policy for handler invocations. Pass an int for simple max-attempts with default exponential backoff, or a RetryPolicy for full control. |
| rate_limit | Outbound request rate cap. Pass an int for a simple “N per second” cap, or a RateLimitPolicy for full control over rate, per, and a shared-bucket key. |
| concurrency | In-flight call cap. Pass an int for a simple max_concurrent cap, or a ConcurrencyPolicy for full control including a shared-bucket key. |
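The name and env parameters above appear only in this table, so here is a minimal
sketch combining them with the resource hints shown earlier. The dict-of-strings form
for env is an assumption — this page does not show env in code:

```python
import chalkcompute

@chalkcompute.function(
    name="normalize-addresses",   # registered name; defaults to the function's __name__
    cpu="500m",
    memory="512Mi",
    env={"LOG_LEVEL": "debug"},   # assumed dict-of-strings form for env
    min_replicas=0,               # scale to zero when idle
    max_replicas=4,
)
def normalize(address: str) -> str:
    # Collapse whitespace and uppercase the address.
    return " ".join(address.split()).upper()
```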
Inject secrets into a function using the secrets parameter. Secrets are
resolved at deploy time and made available as environment variables inside
the container:
```python
import chalkcompute
@chalkcompute.function(
secrets=[
chalkcompute.Secret.from_env("OPENAI_API_KEY"),
chalkcompute.Secret.from_integration("prod_postgres"),
chalkcompute.Secret.from_local_env("MY_LOCAL_TOKEN"),
],
)
def call_api(prompt: str) -> str:
import os
import httpx
key = os.environ["OPENAI_API_KEY"]
    # ...
```

See the Secrets section of the overview for
the full Secret API.
When your handler is more efficient processing multiple items at once (e.g.
GPU batch inference), enable batching with max_buffer_duration and/or
max_batching_size. Chalk will accumulate incoming items and invoke the
handler in chunks:
```python
@chalkcompute.function(
max_buffer_duration=1000, # buffer up to 1000 ms
max_batching_size=32, # or up to 32 items
gpu="nvidia-l4",
)
def batch_embed(texts: list[str]) -> list[list[float]]:
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(texts).tolist()
```

When batching is enabled, handler arguments are delivered as lists or a single pyarrow
table. Defaults apply when either parameter is set:
| Parameter | Default (when batching enabled) |
|---|---|
| max_buffer_duration | 1000 ms |
| max_batching_size | 10 items |
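As a sketch of this defaulting behavior, a function that sets only max_batching_size
still gets batching, with the buffer window falling back to 1000 ms (the handler below
is illustrative):

```python
import chalkcompute

@chalkcompute.function(max_batching_size=64)  # max_buffer_duration defaults to 1000 ms
def score(rows: list[dict]) -> list[float]:
    # Receives up to 64 accumulated items per invocation.
    return [float(len(r)) for r in rows]
```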
Functions can automatically retry on failure. Pass a simple integer for
max-attempts with default exponential backoff, or use RetryPolicy for full
control:
```python
import chalkcompute
from chalkcompute import RetryPolicy
# Simple: retry up to 3 times with exponential backoff
@chalkcompute.function(retries=3)
def flaky_api(query: str) -> str:
...
# Exponential backoff with custom parameters
@chalkcompute.function(
retries=RetryPolicy.exponential(
attempts=5,
initial=0.5,
max_wait=30.0,
),
)
def resilient_call(payload: str) -> str:
...
# Fixed delay, only retry on specific exceptions
@chalkcompute.function(
retries=RetryPolicy.linear(
attempts=3,
delay=2.0,
retry_on=(TimeoutError, ConnectionError),
),
)
def network_call(url: str) -> str:
    ...
```

Exponential backoff — wait times grow geometrically: initial, initial * multiplier,
initial * multiplier², …, capped at max_wait:
| Parameter | Type | Default | Description |
|---|---|---|---|
| attempts | int | 3 | Max retry attempts |
| initial | float | 1.0 | Base wait time in seconds |
| multiplier | float | 2.0 | Backoff multiplier |
| max_wait | float | 60.0 | Upper bound on wait time |
| jitter | bool | True | Add random jitter |
| retry_on | tuple[type, ...] | (Exception,) | Exception types that trigger retry |
Fixed delay between retries:
| Parameter | Type | Default | Description |
|---|---|---|---|
| attempts | int | 3 | Max retry attempts |
| delay | float | 1.0 | Fixed wait in seconds |
| jitter | bool | False | Add random jitter |
| retry_on | tuple[type, ...] | (Exception,) | Exception types that trigger retry |
By default, all Exception subclasses trigger a retry. Pass a tuple of
exception classes to retry_on to restrict which failures are retried:
```python
RetryPolicy.exponential(
attempts=4,
retry_on=(ConnectionError, TimeoutError, OSError),
)
```

Non-matching exceptions propagate immediately without consuming retry budget.
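As an illustrative sketch (building on the error-handling behavior described later on
this page, where handler exceptions surface to callers as RuntimeError), a ValueError
raised by a handler whose policy retries only ConnectionError and TimeoutError
propagates on the first attempt:

```python
import chalkcompute
from chalkcompute import RetryPolicy

@chalkcompute.function(
    retries=RetryPolicy.exponential(
        attempts=4,
        retry_on=(ConnectionError, TimeoutError),
    ),
)
def parse_record(raw: str) -> dict:
    if not raw:
        raise ValueError("empty record")  # not in retry_on: no retries are attempted
    return {"raw": raw}

parse_record.wait_ready()
try:
    parse_record("")
except RuntimeError as e:
    print(e)  # ValueError: empty record, raised after a single attempt
```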
Cap the outbound request rate of a function using rate_limit. Pass an
integer for a simple “N per second” cap, or a RateLimitPolicy for full
control:
```python
import chalkcompute
from chalkcompute import RateLimitPolicy
# Simple: up to 100 requests per second
@chalkcompute.function(rate_limit=100)
def call_provider(query: str) -> str:
...
# Full control: rate + time unit + shared-bucket key
@chalkcompute.function(
rate_limit=RateLimitPolicy(rate=1000, per="minute", key="provider-api"),
)
def call_other_endpoint(query: str) -> str:
    ...
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| rate | int | required | Maximum requests per `per` unit |
| per | str | "second" | Time unit: "second", "minute", or "hour" |
| key | str | function name | Shared-bucket identifier (see below) |
Two functions declaring the same key share a single rate-limit bucket. This
is useful when multiple functions call the same downstream service and the
service imposes a limit across all callers:
```python
shared = RateLimitPolicy(rate=1000, per="second", key="provider-api")
@chalkcompute.function(rate_limit=shared)
def lookup(query: str) -> str: ...
@chalkcompute.function(rate_limit=shared)
def enrich(record: dict) -> dict: ...
# Calls to both functions count against the same 1000/s budget.
```

If key is omitted, each function gets its own bucket keyed by the function
name, so rate limits are per-function by default.
Cap the in-flight count of a function using concurrency. This is distinct
from rate_limit — rate limits gate requests-over-time, concurrency gates how
many calls are executing simultaneously:
```python
import chalkcompute
from chalkcompute import ConcurrencyPolicy
# Simple: at most 10 calls in flight at once
@chalkcompute.function(concurrency=10)
def gpu_infer(text: str) -> list[float]:
...
# Shared gate across multiple functions hitting the same resource
gpu_pool = ConcurrencyPolicy(max_concurrent=4, key="gpu-pool")
@chalkcompute.function(concurrency=gpu_pool, gpu="nvidia-l4")
def embed(text: str) -> list[float]: ...
@chalkcompute.function(concurrency=gpu_pool, gpu="nvidia-l4")
def rerank(query: str, docs: list[str]) -> list[int]: ...
# Only 4 calls across both functions run simultaneously.
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| max_concurrent | int | required | Maximum in-flight handler invocations |
| key | str | function name | Shared-bucket identifier |
When `concurrency` is not specified, the function queue consumer applies a default of `max_concurrent=1` per function — calls to a single function are serialized. Set `concurrency=N` explicitly to allow parallel execution.
Both can be set independently and they interact:
| Policy | Controls | Example use case |
|---|---|---|
| rate_limit | Requests per unit time | “1000 calls/sec across all replicas” |
| concurrency | Parallel in-flight count | “4 GPU slots, don’t exceed memory” |
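Both caps can be declared on the same function. A sketch combining them, with
illustrative values:

```python
import chalkcompute
from chalkcompute import RateLimitPolicy

@chalkcompute.function(
    rate_limit=RateLimitPolicy(rate=500, per="minute", key="provider-api"),  # at most 500 calls/min in the shared bucket
    concurrency=8,  # at most 8 calls executing at any moment
)
def call_downstream(payload: dict) -> dict:
    ...
```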
Functions accept four autoscaling hints that shape how replicas scale up and down in response to load:
```python
@chalkcompute.function(
min_replicas=1,
max_replicas=8,
scaling_interval_seconds=30,
target_cpu_utilization_percentage=70,
shutdown_delay_seconds=120,
gpu="nvidia-l4",
)
def infer(text: str) -> str:
    ...
```

| Parameter | Description |
|---|---|
| min_replicas | Lower bound — replicas never scale below this. Use 0 for scale-to-zero. |
| max_replicas | Upper bound — replicas never scale above this. |
| scaling_interval_seconds | How often the autoscaler scrapes metrics and makes scaling decisions. Defaults to 60s on the server. |
| target_cpu_utilization_percentage | Target average CPU utilization (0–100). Autoscaler adds replicas when the average exceeds this, removes them when below. |
| target_queue_depth | Target pending-request queue depth per replica. When set, the autoscaler scales on the function’s own queue depth instead of (or in addition to) CPU. |
| shutdown_delay_seconds | Grace period on scale-down before replicas are forcibly killed. Lets in-flight requests finish cleanly. Defaults to 30s on the server. |
CPU-based scaling (target_cpu_utilization_percentage): good for compute-bound
functions — model inference, data transformation, anything where CPU is the
bottleneck.

Queue-depth scaling (target_queue_depth): good for I/O-bound or variable-latency
functions, or when you want to scale on demand rather than resource saturation.
Scales up when calls pile up regardless of CPU.

Both can be set together; the autoscaler takes the higher signal.
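A minimal sketch of queue-depth-driven scaling with scale-to-zero, using illustrative
values:

```python
import chalkcompute

@chalkcompute.function(
    min_replicas=0,               # scale to zero when there is no work
    max_replicas=16,
    target_queue_depth=20,        # aim for roughly 20 pending calls per replica
    scaling_interval_seconds=30,  # re-evaluate every 30 seconds
)
def enrich_profile(user_id: str) -> dict:
    ...
```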
Attach persistent storage to a function with the volumes parameter. Each
entry is a (Volume(...), mount_path) tuple. Inside the function, the volume
is accessible as an ordinary filesystem path — read and write files normally:
```python
import chalkcompute
from chalkcompute import Volume
@chalkcompute.function(
volumes=[(Volume("model-cache"), "/models")],
gpu="nvidia-l4",
)
def predict(text: str) -> list[float]:
import os
import json
from sentence_transformers import SentenceTransformer
cache_path = "/models/all-MiniLM-L6-v2"
if os.path.exists(cache_path):
model = SentenceTransformer(cache_path)
else:
model = SentenceTransformer("all-MiniLM-L6-v2")
model.save(cache_path)
    return model.encode(text).tolist()
```

Volumes persist across invocations — data written by one call is available to
subsequent calls. This is useful for caching large model weights, storing intermediate
results, or sharing data between functions that mount the same volume.
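For example, two functions that mount the same volume can pass data through it. A
minimal sketch, assuming a volume named "shared-scratch" (the volume name and file
layout are illustrative):

```python
import chalkcompute
from chalkcompute import Volume

scratch = (Volume("shared-scratch"), "/scratch")

@chalkcompute.function(volumes=[scratch])
def write_report(name: str, body: str) -> str:
    # Write a file to the shared volume and return its path.
    path = f"/scratch/{name}.txt"
    with open(path, "w") as f:
        f.write(body)
    return path

@chalkcompute.function(volumes=[scratch])
def read_report(name: str) -> str:
    # Sees files written by write_report because both mount the same volume.
    with open(f"/scratch/{name}.txt") as f:
        return f.read()
```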
You can invoke a function on every row of a DataFrame using
F.catalog_call() inside a .project() expression. This is useful for
batch-processing columnar data — Chalk handles parallelism, batching, and
delivery automatically.
For example, define a function that extracts named entities from text:
```python
import chalkcompute
@chalkcompute.function(
image=chalkcompute.Image.base("python:3.12-slim").run_commands(
"pip install spacy && python -m spacy download en_core_web_sm",
),
cpu=2,
memory="4Gi",
)
def extract_entities(memo: str) -> str:
import json
import spacy
nlp = spacy.load("en_core_web_sm")
doc = nlp(memo)
return json.dumps([
{"text": ent.text, "label": ent.label_}
for ent in doc.ents
    ])
```

Then apply it across a parquet file of transaction memos:

```python
from chalkcompute import DataFrame, F, _
results = (
DataFrame.scan("transactions.parquet")
.project({
"transaction_id": _.transaction_id,
"memo": _.memo,
"entities": F.catalog_call("extract_entities", _.memo),
})
)
```

Each row’s memo column is passed to extract_entities and the result is
returned as a new entities column. The original DataFrame is not mutated —
.project() returns only the columns you specify.
If a function raises an exception, the caller receives it as a RuntimeError with the
original exception type and message:
```python
@chalkcompute.function
def divide(a: float, b: float) -> float:
return a / b
divide.wait_ready()
try:
divide(1.0, 0.0)
except RuntimeError as e:
    print(e) # ZeroDivisionError: division by zero
```

For async generators, an exception mid-stream terminates the iterator. Any segments
already yielded are still available to the caller — only subsequent iteration raises:

```python
@chalkcompute.function
async def fragile_gen():
yield "ok-1"
yield "ok-2"
raise ValueError("something went wrong")
for item in fragile_gen():
print(item)
# ok-1
# ok-2
# RuntimeError: ValueError: something went wrong
```

@chalkcompute.function is backed by the
Function Queue, Chalk Compute’s async
execution layer. Calls are accepted, assigned an ID, and dispatched to a
worker without the caller holding a direct connection to any specific worker
process — this is what enables streaming async generators, nested calls, and
policy-enforced dispatch. See the
Function Queue page for architecture and
end-to-end examples.