# Functions
source: https://docs.chalk.ai/docs/compute/functions

## Define and call remote functions that run on Chalk Compute.

### Overview

A Chalk Function is a Python callable that you deploy onto a
Scaling Group and register in the
Chalk Catalog. Decorate a function
with @chalkcompute.function and it becomes reachable from anywhere Chalk
runs — local scripts and notebooks, other functions, and (because of the
Catalog registration) the feature engine: Chalk SQL, resolvers, the query
server, and DataFrames via F.catalog_call(). Arguments and results are
serialized automatically; you write ordinary Python and Chalk handles
scheduling, scaling, and delivery.

On top of what a bare Scaling Group provides, functions add async / streaming
invocation, nested calls, automatic retries, concurrency caps, rate limits,
and request-queue-driven autoscaling. Reach for a function when you want any
of those features, or when you want the callable to be visible to the rest
of Chalk.

Functions support three calling patterns:

- Single value — return a result and the caller receives it.
- Async generator — yield results incrementally and the caller receives them as
they're produced. See Streaming results
below for an end-to-end example.
- Nested calls — a function can call other functions, composing pipelines without
manual orchestration. See Nested function calls below
for a pipeline example.

### Defining a function

Use the @chalkcompute.function decorator:

```
import chalkcompute

@chalkcompute.function
def add(x: int, y: int) -> int:
    return x + y
```

Deploy by running the script — the decorator registers the function with Chalk. Call
wait_ready() to block until the function is available, then use .remote(...)
to invoke the deployed function:

```
add.wait_ready()
result = add.remote(3, 5)
print(result)  # 8
```

add(3, 5) runs the Python body locally; use .remote(...) for a deployed call.

### Resource configuration

The decorator accepts a wide set of arguments that control the compute
environment a function runs in — image, CPU/memory/GPU, secrets, volumes,
batching, retries, rate limits, concurrency, autoscaling, and tracing.
A realistic decorator usually combines several of them:

```
@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install sentence-transformers",
    ),
    cpu="2",
    memory="4Gi",
    gpu="nvidia-l4",
    secrets=[chalkcompute.Secret.from_env("HF_TOKEN")],
    max_batching_size=25,
    max_buffer_duration=2000,
    retries=3,
)
def embed(texts: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(texts).tolist()
```

Each group of arguments has its own section below: secrets, batching, retries,
rate limits, concurrency, autoscaling, volumes, and tracing. For the full
per-argument reference, types, and defaults, see @chalkcompute.function
in the API reference.

### Secrets

Inject secrets into a function using the secrets parameter. Secrets are
resolved at deploy time and made available as environment variables inside
the container:

```
import os
from urllib.parse import urlparse

import chalkcompute
from chalkcompute import Secret

@chalkcompute.function(
    secrets=[
        Secret.from_env("OPENAI_API_KEY"),
        Secret.from_integration("prod_postgres"),
    ],
)
def call_api(prompt: str) -> str:
    openai_api_key = os.environ["OPENAI_API_KEY"]
    database_url = os.environ["DATABASE_URL"]

    headers = {"Authorization": f"Bearer {openai_api_key}"}
    db_host = urlparse(database_url).hostname
    # Pass headers to your OpenAI-compatible client and database_url to
    # your Postgres client.
    return f"ready to call OpenAI for {prompt!r} and log to {db_host}"
```

See the Secrets section of the overview for
the full Secret API.

### Batching

When your handler processes multiple items more efficiently in a single call
(e.g. GPU batch inference), enable batching with max_buffer_duration and/or
max_batching_size. Chalk accumulates incoming items and invokes the handler
once per chunk:

```
@chalkcompute.function(
    max_buffer_duration=1000,  # buffer up to 1000 ms
    max_batching_size=32,      # or up to 32 items
    gpu="nvidia-l4",
)
def batch_embed(texts: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(texts).tolist()
```

When batching is enabled, handler arguments are delivered as lists or as a single
PyArrow table. Setting either parameter enables batching, and whichever parameter
you leave unset takes its default:

| Parameter             | Default (when batching enabled) |
| --------------------- | ------------------------------- |
| `max_buffer_duration` | `1000` ms                       |
| `max_batching_size`   | `10` items                      |
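To make the interplay of the two limits concrete, here is a minimal local simulation of how size-or-time buffering could group a stream of arrivals. It assumes the buffer window starts when the first item of a batch arrives and that a batch flushes as soon as it reaches max_batching_size; group_into_batches is a hypothetical helper for illustration, not part of chalkcompute.

```python
def group_into_batches(
    arrivals: list[tuple[int, str]],
    max_batching_size: int = 10,
    max_buffer_duration: int = 1000,
) -> list[list[str]]:
    """Group (arrival_ms, item) pairs, sorted by time, into handler batches.

    Hypothetical model: a batch flushes when it holds max_batching_size items,
    or when an item arrives max_buffer_duration ms or more after the batch opened.
    """
    batches: list[list[str]] = []
    current: list[str] = []
    opened_at = 0
    for arrived_ms, item in arrivals:
        # Time-based flush: the open batch has waited out its buffer window.
        if current and arrived_ms - opened_at >= max_buffer_duration:
            batches.append(current)
            current = []
        if not current:
            opened_at = arrived_ms  # the window starts with the batch's first item
        current.append(item)
        # Size-based flush: the batch is full.
        if len(current) == max_batching_size:
            batches.append(current)
            current = []
    if current:
        batches.append(current)  # leftover items flush when their window ends
    return batches


arrivals = [(0, "a"), (10, "b"), (20, "c"), (1500, "d"), (1510, "e")]
print(group_into_batches(arrivals, max_batching_size=2, max_buffer_duration=1000))
# [['a', 'b'], ['c'], ['d', 'e']]
```

The simulation only checks the window when the next item arrives, so it reproduces the groups a timer-driven flush would produce, not the exact flush times ("c" would really be flushed at about 1020 ms).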

### Retry policies

Functions can automatically retry on failure. Pass a simple integer for
max-attempts with default exponential backoff, or use RetryPolicy for full
control:

```
import chalkcompute
from chalkcompute import RetryPolicy

# Simple: retry up to 3 times with exponential backoff
@chalkcompute.function(retries=3)
def flaky_api(query: str) -> str:
    ...

# Exponential backoff with custom parameters
@chalkcompute.function(
    retries=RetryPolicy.exponential(
        attempts=5,
        initial=0.5,
        max_wait=30.0,
    ),
)
def resilient_call(payload: str) -> str:
    ...

# Fixed delay, only retry on specific exceptions
@chalkcompute.function(
    retries=RetryPolicy.linear(
        attempts=3,
        delay=2.0,
        retry_on=(TimeoutError, ConnectionError),
    ),
)
def network_call(url: str) -> str:
    ...
```

### RetryPolicy.exponential()

Exponential backoff — wait times grow geometrically: initial, initial * multiplier,
initial * multiplier², ..., capped at max_wait:

| Parameter    | Type               | Default        | Description                        |
| ------------ | ------------------ | -------------- | ---------------------------------- |
| `attempts`   | `int`              | `3`            | Max retry attempts                 |
| `initial`    | `float`            | `1.0`          | Base wait time in seconds          |
| `multiplier` | `float`            | `2.0`          | Backoff multiplier                 |
| `max_wait`   | `float`            | `60.0`         | Upper bound on wait time           |
| `jitter`     | `bool`             | `True`         | Add random jitter                  |
| `retry_on`   | `tuple[type, ...]` | `(Exception,)` | Exception types that trigger retry |
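The table's defaults translate into a concrete schedule. The sketch below computes the wait before each retry from the formula above. It leaves out jitter (this page does not say how jitter is applied) and assumes attempts counts retries; exponential_waits is a hypothetical helper, not part of chalkcompute.

```python
def exponential_waits(
    attempts: int = 3,
    initial: float = 1.0,
    multiplier: float = 2.0,
    max_wait: float = 60.0,
) -> list[float]:
    """Wait in seconds before each retry, ignoring jitter.

    Retry n (0-based) waits initial * multiplier**n, capped at max_wait.
    """
    return [min(initial * multiplier**n, max_wait) for n in range(attempts)]


print(exponential_waits())  # [1.0, 2.0, 4.0]
# The resilient_call settings: attempts=5, initial=0.5, max_wait=30.0
print(exponential_waits(attempts=5, initial=0.5, max_wait=30.0))  # [0.5, 1.0, 2.0, 4.0, 8.0]
# With enough attempts, max_wait caps the growth.
print(exponential_waits(attempts=8))  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```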

### RetryPolicy.linear()

Fixed delay between retries:

| Parameter  | Type               | Default        | Description                        |
| ---------- | ------------------ | -------------- | ---------------------------------- |
| `attempts` | `int`              | `3`            | Max retry attempts                 |
| `delay`    | `float`            | `1.0`          | Fixed wait in seconds              |
| `jitter`   | `bool`             | `False`        | Add random jitter                  |
| `retry_on` | `tuple[type, ...]` | `(Exception,)` | Exception types that trigger retry |

### Exception filtering

By default, all Exception subclasses trigger a retry. Pass a tuple of
exception classes to retry_on to restrict which failures are retried:

```
RetryPolicy.exponential(
    attempts=4,
    retry_on=(ConnectionError, TimeoutError, OSError),
)
```

Non-matching exceptions propagate immediately without consuming retry budget.
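The following local sketch mirrors the retry_on semantics described above, using a fixed delay as in RetryPolicy.linear(). It is a hypothetical helper (call_with_retries is not part of chalkcompute), and it assumes attempts means "retries after the first call".

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    attempts: int = 3,
    delay: float = 0.0,
    retry_on: tuple[type[BaseException], ...] = (Exception,),
) -> T:
    """Hypothetical local helper: one initial call plus up to `attempts` retries."""
    for attempt in range(attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # retry budget exhausted: surface the last error
            time.sleep(delay)  # fixed delay between attempts
        # Exceptions outside retry_on are never caught, so they propagate at once.
    raise AssertionError("unreachable")


calls = 0

def flaky() -> str:
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("transient")
    return "ok"

print(call_with_retries(flaky, retry_on=(ConnectionError, TimeoutError)), calls)  # ok 3

calls = 0

def bad_payload() -> str:
    global calls
    calls += 1
    raise ValueError("bad payload")

try:
    call_with_retries(bad_payload, retry_on=(ConnectionError, TimeoutError))
except ValueError:
    print("ValueError after", calls, "call")  # ValueError after 1 call
```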

### Rate limiting

Cap the outbound request rate of a function using rate_limit. Pass an
integer for a simple "N per second" cap, or a RateLimitPolicy for full
control:

```
import chalkcompute
from chalkcompute import RateLimitPolicy

# Simple: up to 100 requests per second
@chalkcompute.function(rate_limit=100)
def call_provider(query: str) -> str:
    ...

# Full control: rate + time unit + shared-bucket key
@chalkcompute.function(
    rate_limit=RateLimitPolicy(rate=1000, per="minute", key="provider-api"),
)
def call_other_endpoint(query: str) -> str:
    ...
```

### RateLimitPolicy

| Parameter | Type  | Default       | Description                                    |
| --------- | ----- | ------------- | ---------------------------------------------- |
| `rate`    | `int` | *required*    | Maximum requests per `per` unit                |
| `per`     | `str` | `"second"`    | Time unit: `"second"`, `"minute"`, or `"hour"` |
| `key`     | `str` | function name | Shared-bucket identifier (see below)           |

### Sharing a bucket across functions

Two functions declaring the same key share a single rate-limit bucket. This
is useful when multiple functions call the same downstream service and the
service imposes a limit across all callers:

```
shared = RateLimitPolicy(rate=1000, per="second", key="provider-api")

@chalkcompute.function(rate_limit=shared)
def lookup(query: str) -> str: ...

@chalkcompute.function(rate_limit=shared)
def enrich(record: dict) -> dict: ...
# Calls to both functions count against the same 1000/s budget.
```

If key is omitted, each function gets its own bucket keyed by the function
name, so rate limits are per-function by default.
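This page says rate-limit tokens are consumed per attempt but does not describe the bucket algorithm. The sketch below assumes a classic token bucket to show what a shared key means: equal keys resolve to one bucket object, so both functions draw from one budget. TokenBucket and bucket_for are hypothetical names; per="hour" is used so refill is negligible during the demo.

```python
import time


class TokenBucket:
    """Token bucket holding up to `rate` tokens, refilled at rate / per_seconds."""

    def __init__(self, rate: int, per_seconds: float) -> None:
        self.capacity = float(rate)
        self.tokens = float(rate)
        self.refill_per_second = rate / per_seconds
        self.updated = time.monotonic()

    def try_acquire(self) -> bool:
        now = time.monotonic()
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_second)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0  # one token per call attempt
            return True
        return False


_buckets: dict[str, TokenBucket] = {}


def bucket_for(key: str, rate: int, per_seconds: float) -> TokenBucket:
    """Return the bucket for `key`, creating it on first use; equal keys share it."""
    if key not in _buckets:
        _buckets[key] = TokenBucket(rate, per_seconds)
    return _buckets[key]


# Roughly RateLimitPolicy(rate=5, per="hour", key="provider-api") on two functions.
lookup_bucket = bucket_for("provider-api", rate=5, per_seconds=3600)
enrich_bucket = bucket_for("provider-api", rate=5, per_seconds=3600)
# Ten alternating calls from the two functions share the 5-token budget.
granted = sum(bucket.try_acquire() for bucket in [lookup_bucket, enrich_bucket] * 5)
print(lookup_bucket is enrich_bucket, granted)  # True 5
```

Without a key, each function would get a bucket keyed by its own name, which is why limits are per-function by default.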

### Concurrency limits

Cap the in-flight count of a function using concurrency. This is distinct
from rate_limit — rate limits gate requests-over-time, concurrency gates how
many calls are executing simultaneously:

```
import chalkcompute
from chalkcompute import ConcurrencyPolicy

# Simple: at most 10 calls in flight at once
@chalkcompute.function(concurrency=10)
def gpu_infer(text: str) -> list[float]:
    ...

# Shared gate across multiple functions hitting the same resource
gpu_pool = ConcurrencyPolicy(max_concurrent=4, key="gpu-pool")

@chalkcompute.function(concurrency=gpu_pool, gpu="nvidia-l4")
def embed(text: str) -> list[float]: ...

@chalkcompute.function(concurrency=gpu_pool, gpu="nvidia-l4")
def rerank(query: str, docs: list[str]) -> list[int]: ...
# Only 4 calls across both functions run simultaneously.
```

### ConcurrencyPolicy

| Parameter        | Type  | Default       | Description                           |
| ---------------- | ----- | ------------- | ------------------------------------- |
| `max_concurrent` | `int` | *required*    | Maximum in-flight handler invocations |
| `key`            | `str` | function name | Shared-bucket identifier              |

When concurrency is not specified, the function queue consumer applies a
default of max_concurrent=1 per function — calls to a single function are
serialized. Set concurrency=N explicitly to allow parallel execution.
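The queue acquires a concurrency slot before dispatching each call. A minimal local sketch of a shared gate, with an asyncio.Semaphore standing in for one ConcurrencyPolicy key, shows why 20 calls split across two functions never exceed four in flight. run_gated_calls is a hypothetical name, and the sleep stands in for handler execution.

```python
import asyncio


async def run_gated_calls(max_concurrent: int = 4) -> tuple[list[str], int]:
    # One semaphore shared by both "functions", like one ConcurrencyPolicy key.
    gpu_pool = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def gated(name: str, i: int) -> str:
        nonlocal in_flight, peak
        async with gpu_pool:  # a slot is acquired before the work starts
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for handler execution
            in_flight -= 1
        return f"{name}-{i}"

    calls = [gated("embed", i) for i in range(10)]
    calls += [gated("rerank", i) for i in range(10)]
    results = await asyncio.gather(*calls)
    return list(results), peak


results, peak = asyncio.run(run_gated_calls())
print(len(results), peak)  # 20 4
# With the default of one slot, calls are serialized.
print(asyncio.run(run_gated_calls(max_concurrent=1))[1])  # 1
```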

### Rate limit vs concurrency

Both can be set independently and they interact:

| Policy        | Controls                 | Example use case                     |
| ------------- | ------------------------ | ------------------------------------ |
| `rate_limit`  | Requests per unit time   | "1000 calls/sec across all replicas" |
| `concurrency` | Parallel in-flight count | "4 GPU slots, don't exceed memory"   |

### Autoscaling

Functions accept six autoscaling parameters that shape how replicas scale up and
down in response to load:

```
@chalkcompute.function(
    min_replicas=1,
    max_replicas=8,
    scaling_interval_seconds=30,
    target_cpu_utilization_percentage=70,
    shutdown_delay_seconds=120,
    gpu="nvidia-l4",
)
def infer(text: str) -> str:
    ...
```

| Parameter                           | Description                                                                                                                                           |
| ----------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------- |
| `min_replicas`                      | Lower bound — replicas never scale below this. Use `0` for scale-to-zero.                                                                             |
| `max_replicas`                      | Upper bound — replicas never scale above this.                                                                                                        |
| `scaling_interval_seconds`          | How often the autoscaler scrapes metrics and makes scaling decisions. Defaults to 60s on the server.                                                  |
| `target_cpu_utilization_percentage` | Target average CPU utilization (0–100). Autoscaler adds replicas when the average exceeds this, removes them when below.                              |
| `target_queue_depth`                | Target pending-request queue depth per replica. When set, the autoscaler scales on the function's own queue depth instead of (or in addition to) CPU. |
| `shutdown_delay_seconds`            | Grace period on scale-down before replicas are forcibly killed. Lets in-flight requests finish cleanly. Defaults to 30s on the server.                |

### CPU-driven vs queue-driven scaling

- CPU (target_cpu_utilization_percentage): good for compute-bound
functions — model inference, data transformation, anything where CPU is the
bottleneck.
- Queue depth (target_queue_depth): good for I/O-bound or variable-latency
functions, or when you want to scale on demand rather than resource
saturation. Scales up when calls pile up regardless of CPU.

Both can be set together; the autoscaler takes the higher signal.
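This page does not give the scaling formula. The sketch below assumes a proportional target-tracking rule in the style of the Kubernetes Horizontal Pod Autoscaler, treats queue_depth as total pending calls across replicas, takes the higher of the two signals as described above, and clamps the result to the replica bounds. desired_replicas is a hypothetical helper, not the Chalk autoscaler.

```python
import math
from typing import Optional


def desired_replicas(
    current: int,
    min_replicas: int,
    max_replicas: int,
    cpu_utilization: Optional[float] = None,
    target_cpu_utilization_percentage: Optional[float] = None,
    queue_depth: Optional[int] = None,
    target_queue_depth: Optional[int] = None,
) -> int:
    candidates: list[int] = []
    if cpu_utilization is not None and target_cpu_utilization_percentage:
        # Assumed proportional rule: scale replicas by observed / target CPU.
        candidates.append(math.ceil(current * cpu_utilization / target_cpu_utilization_percentage))
    if queue_depth is not None and target_queue_depth:
        # Enough replicas that each holds about target_queue_depth pending calls.
        candidates.append(math.ceil(queue_depth / target_queue_depth))
    desired = max(candidates) if candidates else current  # the higher signal wins
    return max(min_replicas, min(max_replicas, desired))


# CPU alone suggests 3 replicas, the queue suggests 4: the higher signal wins.
print(desired_replicas(2, 1, 8, cpu_utilization=105, target_cpu_utilization_percentage=70,
                       queue_depth=40, target_queue_depth=10))  # 4
# A large backlog is capped by max_replicas.
print(desired_replicas(2, 1, 8, queue_depth=200, target_queue_depth=10))  # 8
# Idle with min_replicas=0 scales to zero.
print(desired_replicas(4, 0, 8, cpu_utilization=0, target_cpu_utilization_percentage=70,
                       queue_depth=0, target_queue_depth=10))  # 0
```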

### Volumes

Attach persistent storage to a function with the volumes parameter. Each
entry is a (name, mount_path) string tuple. Inside the function, the volume
is accessible as an ordinary filesystem path — read and write files normally:

```
import chalkcompute

@chalkcompute.function(
    volumes=[("model-cache", "/models")],
    gpu="nvidia-l4",
)
def predict(text: str) -> list[float]:
    import os
    from sentence_transformers import SentenceTransformer

    cache_path = "/models/all-MiniLM-L6-v2"
    if os.path.exists(cache_path):
        model = SentenceTransformer(cache_path)
    else:
        model = SentenceTransformer("all-MiniLM-L6-v2")
        model.save(cache_path)

    return model.encode(text).tolist()
```

Volumes persist across invocations — data written by one call is available to
subsequent calls. This is useful for caching large model weights, storing
intermediate results, or sharing data between functions that mount the same
volume.

### Calling from a DataFrame

You can invoke a function on every row of a DataFrame using
F.catalog_call() inside a .project() expression. This is useful for
batch-processing columnar data — Chalk handles parallelism, batching, and
delivery automatically.

For example, define a function that extracts named entities from text:

```
import chalkcompute

@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install spacy && python -m spacy download en_core_web_sm",
    ),
    cpu="2",
    memory="4Gi",
)
def extract_entities(memo: str) -> str:
    import json
    import spacy
    nlp = spacy.load("en_core_web_sm")
    doc = nlp(memo)
    return json.dumps([
        {"text": ent.text, "label": ent.label_}
        for ent in doc.ents
    ])
```

Then apply it across a parquet file of transaction memos:

```
from chalkcompute import DataFrame, F, _

results = (
    DataFrame.scan("transactions.parquet")
    .project({
        "transaction_id": _.transaction_id,
        "memo": _.memo,
        "entities": F.catalog_call("extract_entities", _.memo),
    })
)
```

Each row's memo column is passed to extract_entities and the result is
returned as a new entities column. The original DataFrame is not mutated —
.project() returns only the columns you specify.

### Tracing

Tracing helps you understand where time is spent when a function runs. Use it
to debug slow calls, follow nested function calls, and see which parts of your
application are doing work.

After a traced call runs, view it from the function's Scaling Group page under
Remote Call Traces.

### Enabling tracing

Set tracing=True on a function:

```
import chalkcompute

@chalkcompute.function(
    cpu="1",
    memory="1Gi",
    tracing=True,
)
def score_transaction(transaction_id: str) -> float:
    return 0.42
```

Nested calls appear in the same trace when tracing is enabled on each function:

```
import chalkcompute

@chalkcompute.function(tracing=True)
def fetch_profile(user_id: str) -> dict:
    return {"user_id": user_id, "segment": "active"}


@chalkcompute.function(tracing=True)
def score_user(user_id: str) -> float:
    with chalkcompute.span("score_user.fetch_profile"):
        profile = fetch_profile.remote(user_id)

    return 0.9 if profile["segment"] == "active" else 0.1


with chalkcompute.span("demo.score_user"):
    score = score_user.remote("user_123")
```

In the trace you can see the outer function, the call to fetch_profile, and
the inner function's execution together. Add manual spans like
score_user.fetch_profile to label important parts of your own code.

### Manual spans

Use chalkcompute.span() to label meaningful application work:

```
with chalkcompute.span("checkout.score_transaction"):
    score = score_transaction.remote("txn_123")
```

For a whole caller function, use @chalkcompute.traced:

```
@chalkcompute.traced("batch.score_transactions")
def run_batch(transaction_ids: list[str]) -> list[float]:
    return [score_transaction.remote(txn_id) for txn_id in transaction_ids]
```

Manual spans are useful around model calls, retrieval steps, preprocessing, and
other code you want to separate in the trace. To see a remote function's own
execution, enable tracing=True or a TracingPolicy on that function.

### Tracing policies

Use TracingPolicy when you only want to collect a percentage of calls:

```
@chalkcompute.function(
    tracing=chalkcompute.TracingPolicy(sample_rate=0.25),
)
def recommend(user_id: str) -> list[str]:
    ...
```

sample_rate must be between 0.0 and 1.0.

| Configuration                             | Behavior                                                                                                       |
| ----------------------------------------- | -------------------------------------------------------------------------------------------------------------- |
| `tracing=True`                            | Collect traces for every call.                                                                                 |
| `tracing=TracingPolicy(sample_rate=0.25)` | Collect traces for about 25% of calls.                                                                         |
| `tracing=TracingPolicy(sample_rate=0.0)`  | Trace this function only when its caller is already traced.                                                    |
| `tracing=False`                           | Turn tracing off for the function.                                                                             |
| Omitted                                   | Use the default behavior: only continue a caller's trace if tracing support is already installed in the image. |

Once a call is traced, nested function calls stay in the same trace.
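The sampling rows in the table can be read as a per-call decision. The sketch below is one plausible reading, not Chalk's implementation: a call joins its caller's trace if the caller is traced, and otherwise starts a new trace with probability sample_rate. should_trace is hypothetical, and it does not model tracing=False or the omitted default.

```python
import random
from typing import Optional


def should_trace(parent_traced: bool, sample_rate: float,
                 rng: Optional[random.Random] = None) -> bool:
    """Hypothetical per-call decision for TracingPolicy(sample_rate=...)."""
    if not 0.0 <= sample_rate <= 1.0:
        raise ValueError("sample_rate must be between 0.0 and 1.0")
    if parent_traced:
        return True  # a traced caller keeps nested calls in its trace
    rng = rng or random.Random()
    return rng.random() < sample_rate  # start a new trace for ~sample_rate of calls


rng = random.Random(0)
sampled = sum(should_trace(False, 0.25, rng) for _ in range(10_000))
print(sampled / 10_000)  # roughly 0.25
# sample_rate=0.0: traced only when the caller already is.
print(should_trace(False, 0.0), should_trace(True, 0.0))  # False True
```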

tracing=True and TracingPolicy(...) automatically install tracing support
for functions deployed with @chalkcompute.function or RemoteFunction. If
you omit tracing, Chalk does not add those packages for you. The default can
still continue an existing trace if your image already includes tracing support,
for example because it installs chalkcompute or
chalk-remote-call-python[tracing].

### Troubleshooting

If you do not see remote function spans:

- Redeploy the function after changing tracing.
- Enable tracing on both outer and inner functions to see both functions'
execution in a nested trace.
- Wrap local tests in chalkcompute.span() if you want the call to be part of
an explicit trace.
- Make sure you are looking at the called function's Scaling Group page and a
time range that includes the call.

### Streaming results with async generators

If your function is an async generator (an async def that uses yield), the caller
receives each value as soon as the worker produces it, without waiting for the
generator to finish.

This makes functions a good fit for:

- Streaming LLM token output.
- Incremental ETL where downstream consumers start processing early.
- Long-running jobs that emit progress events.

### Example: audio transcription

Transcribe an audio file and yield each segment as it's decoded:

```
import chalkcompute

@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install faster-whisper httpx",
    ),
    cpu="4",
    memory="8Gi",
)
async def transcribe(audio_uri: str):
    import httpx
    from faster_whisper import WhisperModel

    async with httpx.AsyncClient() as client:
        resp = await client.get(audio_uri)
        resp.raise_for_status()

    audio_path = "/tmp/audio"
    with open(audio_path, "wb") as f:
        f.write(resp.content)

    model = WhisperModel("base.en", compute_type="int8")
    segments, _info = model.transcribe(audio_path)

    for segment in segments:
        yield {
            "start": segment.start,
            "end": segment.end,
            "text": segment.text,
        }
```

The caller iterates over segments as they arrive:

```
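# Top-level `async for` works in notebooks and the asyncio REPL. In a script,
# move this loop into an async function and run it with asyncio.run().
transcribe.wait_ready()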

async for segment in transcribe.remote("https://example.com/meeting-recording.wav"):
    print(f"[{segment['start']:.1f}s] {segment['text']}")
```

Output streams in real time:

```
[0.0s]  Welcome to the quarterly review.
[3.2s]  Let's start with the revenue numbers.
[6.1s]  Q3 came in at 4.2 million, up 18 percent.
...
```

The caller receives the first segment as soon as it's decoded — it does not
wait for the full file to finish processing. For a 30-minute recording, the
first segment may arrive in seconds.

### Nested function calls

A function can call other functions with .remote(...). Each inner call is
dispatched independently, so you can compose pipelines where each step runs on
its own infrastructure.

```
import chalkcompute

@chalkcompute.function
def translate(text: str, target_lang: str) -> str:
    from transformers import pipeline
    translator = pipeline("translation", model=f"Helsinki-NLP/opus-mt-en-{target_lang}")
    return translator(text)[0]["translation_text"]


@chalkcompute.function
async def transcribe_and_translate(audio_uri: str, target_lang: str):
    """Transcribe audio, then translate each segment."""
    translate.wait_ready()

    # transcribe is the async generator from the audio transcription example.
    async for segment in transcribe.remote(audio_uri):
        translated = await translate.remote(segment["text"], target_lang)
        yield {
            **segment,
            "translated": translated,
        }
```

The caller sees a single stream of translated segments:

```
transcribe_and_translate.wait_ready()

async for seg in transcribe_and_translate.remote(
    "https://example.com/meeting.wav",
    target_lang="es",
):
    print(f"[{seg['start']:.1f}s] {seg['translated']}")
```

```
[0.0s]  Bienvenidos a la revisión trimestral.
[3.2s]  Comencemos con las cifras de ingresos.
[6.1s]  El tercer trimestre llegó a 4,2 millones, un 18 por ciento más.
```

The outer function composes two inner calls — transcribe (async generator)
and translate (single-value). Each runs on its own compute configuration.

### Error handling

If a function raises an exception, the caller receives it as a RuntimeError with the
original exception type and message:

```
import chalkcompute

@chalkcompute.function
def divide(a: float, b: float) -> float:
    return a / b

divide.wait_ready()

try:
    divide.remote(1.0, 0.0)
except RuntimeError as e:
    print(e)  # ZeroDivisionError: division by zero
```

For async generators, an exception mid-stream terminates the iterator. Items
already yielded are still delivered to the caller; only the next iteration raises:

```
import chalkcompute

@chalkcompute.function
async def fragile_gen():
    yield "ok-1"
    yield "ok-2"
    raise ValueError("something went wrong")

fragile_gen.wait_ready()

async for item in fragile_gen.remote():
    print(item)
# ok-1
# ok-2
# ...then the loop raises RuntimeError: ValueError: something went wrong
```

### How it works

@chalkcompute.function is backed by Chalk Compute's async execution layer.
At a high level, calling a function goes through three moving parts:

```
┌──────────┐         ┌──────────────────┐         ┌───────────┐
│  Caller  │──call──▸│  Function Queue  │──run───▸│  Worker   │
│          │◂results─│                  │◂results─│ (Python)  │
└──────────┘         └──────────────────┘         └───────────┘
```

- The caller (local script, notebook, another function) submits an
invocation and receives results over a single bidirectional stream.
- The Function Queue durably accepts the work, assigns each call an ID,
enforces configured policies (concurrency, rate limit, retry), and relays
results back to the caller.
- The worker is the Python process that actually runs your function code.

Callers and workers are loosely coupled: the caller never opens a connection
directly to a worker. Workers can scale up, scale down, or be rescheduled
mid-flight without losing in-flight calls. This decoupling is what enables
streaming async generators, nested calls, and policy-enforced dispatch.
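A toy, in-process model of the three parts can make the decoupling concrete. Here an asyncio.Queue stands in for the Function Queue, each call gets an ID and a future, and workers only ever talk to the queue. Durability, policies, and streaming are left out, and run_toy_queue is a hypothetical name; this is an illustration of the shape, not how Chalk implements it.

```python
import asyncio
import itertools
from typing import Any, Callable


async def run_toy_queue(fn: Callable[..., Any], calls: list[tuple],
                        num_workers: int = 2) -> dict[int, Any]:
    queue: asyncio.Queue = asyncio.Queue()  # stands in for the Function Queue
    pending: dict[int, asyncio.Future] = {}  # call ID -> future the caller awaits
    call_ids = itertools.count(1)
    loop = asyncio.get_running_loop()

    async def worker() -> None:
        # Workers pull work from the queue; they never see the caller directly.
        while True:
            call_id, args = await queue.get()
            try:
                pending[call_id].set_result(fn(*args))
            except Exception as exc:
                pending[call_id].set_exception(exc)
            finally:
                queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(num_workers)]

    # Caller side: submit each call under a new ID, then collect results by ID.
    for args in calls:
        call_id = next(call_ids)
        pending[call_id] = loop.create_future()
        await queue.put((call_id, args))
    results = {call_id: await fut for call_id, fut in pending.items()}

    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


print(asyncio.run(run_toy_queue(lambda x, y: x + y, [(3, 5), (1, 2), (10, 20)])))
# {1: 8, 2: 3, 3: 30}
```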

### Policy enforcement

The queue enforces the policies declared on each function — rate limit,
concurrency, and retries — before dispatching work to the worker.

| Policy                               | What it caps                                                                                                                                                                |
| ------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| [`rate_limit`](#rate-limiting)       | Outbound requests per second / minute / hour, optionally shared across functions via a `key`.                                                                               |
| [`concurrency`](#concurrency-limits) | In-flight invocations at once, optionally shared across functions via a `key`. Defaults to `1` per function.                                                                |
| [`retries`](#retry-policies)         | Automatic retry of transient failures (`UNAVAILABLE`, `DEADLINE_EXCEEDED`, `RESOURCE_EXHAUSTED`, `ABORTED`) with exponential backoff and jitter. Other errors are terminal. |

Concurrency is acquired before dispatching, so a burst of queued calls
backpressures on the semaphore rather than spawning unbounded tasks.
Rate-limit tokens are consumed per attempt — each retry takes a fresh token.

### Retry safety

Retries are safe for failures that happen before the function has started
streaming results — connection errors, DEADLINE_EXCEEDED on the initial
call, etc. If a function has already yielded chunks when it fails, those
chunks are visible to the caller and a retry would produce an ambiguous
stream. Retries therefore apply cleanly only to pre-stream failures today;
making mid-stream retries safe is a known follow-up.
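The ambiguity is easy to reproduce locally. In this hypothetical sketch (plain Python, not Chalk code), a stream fails after two chunks have already reached the caller, and a naive consumer retries by restarting the stream. The caller then sees the first two chunks twice, which is why retries are only clean before the first chunk is delivered.

```python
from typing import Callable, Iterator


def make_stream() -> Callable[[], Iterator[str]]:
    runs = 0

    def stream() -> Iterator[str]:
        nonlocal runs
        runs += 1
        yield "chunk-1"
        yield "chunk-2"
        if runs == 1:
            # The first run fails after two chunks have reached the caller.
            raise ConnectionError("worker lost mid-stream")
        yield "chunk-3"

    return stream


def consume_with_naive_retry(stream_fn: Callable[[], Iterator[str]],
                             attempts: int = 1) -> list[str]:
    received: list[str] = []
    for attempt in range(attempts + 1):
        try:
            for chunk in stream_fn():
                received.append(chunk)  # chunks are visible as soon as they arrive
            return received
        except ConnectionError:
            if attempt == attempts:
                raise
    raise AssertionError("unreachable")


print(consume_with_naive_retry(make_stream()))
# ['chunk-1', 'chunk-2', 'chunk-1', 'chunk-2', 'chunk-3']
```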





