Compute
Asynchronous remote function execution with streaming results.
The Function Queue is Chalk Compute’s async execution layer. It decouples callers from workers so function invocations — including ones that stream results incrementally — don’t require the caller to hold a live connection to any specific worker process.
You interact with the queue implicitly every time you call a function decorated with @chalkcompute.function. This page explains what that buys you — streaming generator output, nested function calls, and policy-enforced dispatch — and shows the patterns that make the most of it.
At a high level, calling a Chalk function goes through three moving parts:
┌──────────┐         ┌──────────────────┐         ┌───────────┐
│  Caller  │──call──▸│  Function Queue  │──run───▸│  Worker   │
│          │◂results─│                  │◂results─│ (Python)  │
└──────────┘         └──────────────────┘         └───────────┘
Callers and workers are loosely coupled — the caller never opens a connection directly to a worker. Workers can scale up, scale down, or get rescheduled mid-flight, and in-flight calls aren’t lost.
The queue’s most distinctive capability is streaming: if your function is
an async generator (uses yield), the caller receives each value as soon as
the worker produces it, without waiting for the generator to finish.
This makes the queue a good fit for long-running work that produces results incrementally. For example, transcribe an audio file and yield each segment as it’s decoded:
import chalkcompute

@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install faster-whisper httpx",
    ),
    cpu=4,
    memory="8Gi",
)
async def transcribe(audio_uri: str):
    import httpx
    from faster_whisper import WhisperModel

    # Download the audio to local disk on the worker.
    async with httpx.AsyncClient() as client:
        resp = await client.get(audio_uri)
        resp.raise_for_status()
    audio_path = "/tmp/audio"
    with open(audio_path, "wb") as f:
        f.write(resp.content)

    # Decode incrementally; each yielded segment streams to the caller
    # as soon as it's produced.
    model = WhisperModel("base.en", compute_type="int8")
    segments, _info = model.transcribe(audio_path)
    for segment in segments:
        yield {
            "start": segment.start,
            "end": segment.end,
            "text": segment.text,
        }

The caller iterates over segments as they arrive:
transcribe.wait_ready()

for segment in transcribe("https://example.com/meeting-recording.wav"):
    print(f"[{segment['start']:.1f}s] {segment['text']}")

Output streams in real time:
[0.0s] Welcome to the quarterly review.
[3.2s] Let's start with the revenue numbers.
[6.1s] Q3 came in at 4.2 million, up 18 percent.
...
The caller receives the first segment as soon as it's decoded — it does not wait for the full file to finish processing. For a 30-minute recording, the first segment may arrive in seconds.
A function running on the queue can call other functions the same way a local caller would. The inner call rides the queue too — there’s no direct connection between worker processes. This lets you compose pipelines where each step runs on its own infrastructure.
import chalkcompute

@chalkcompute.function
def translate(text: str, target_lang: str) -> str:
    from transformers import pipeline

    translator = pipeline("translation", model=f"Helsinki-NLP/opus-mt-en-{target_lang}")
    return translator(text)[0]["translation_text"]

@chalkcompute.function
async def transcribe_and_translate(audio_uri: str, target_lang: str):
    """Transcribe audio, then translate each segment."""
    translate.wait_ready()
    async for segment in transcribe(audio_uri):
        translated = translate(segment["text"], target_lang)
        yield {
            **segment,
            "translated": translated,
        }

The caller sees a single stream of translated segments:
transcribe_and_translate.wait_ready()

for seg in transcribe_and_translate(
    "https://example.com/meeting.wav",
    target_lang="es",
):
    print(f"[{seg['start']:.1f}s] {seg['translated']}")

[0.0s] Bienvenidos a la revisión trimestral.
[3.2s] Comencemos con las cifras de ingresos.
[6.1s] El tercer trimestre llegó a 4,2 millones, un 18 por ciento más.
The outer function composes two inner calls — transcribe (async generator)
and translate (single-value). Each runs on its own compute configuration;
the queue routes between them.
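For instance, translate could declare its own image and resources using the same parameters shown on transcribe above; the specific image and sizing below are illustrative, not required:

@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        # transformers needs torch and sentencepiece for the opus-mt models
        "pip install transformers torch sentencepiece",
    ),
    cpu=2,
    memory="4Gi",
)
def translate(text: str, target_lang: str) -> str:
    from transformers import pipeline

    translator = pipeline("translation", model=f"Helsinki-NLP/opus-mt-en-{target_lang}")
    return translator(text)[0]["translation_text"]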
The queue enforces the policies declared on each function — rate limit,
concurrency, and retries — before dispatching work to the worker. All three
are configured on @chalkcompute.function; see
Functions → Rate limiting,
Concurrency limits, and
Retry policies for the full API.
| Policy | What it caps |
|---|---|
| rate_limit | Outbound requests per second / minute / hour, optionally shared across functions via a key. |
| concurrency | In-flight invocations at once, optionally shared across functions via a key. Defaults to 1 per function. |
| retry_policy | Automatic retry of transient failures (UNAVAILABLE, DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, ABORTED) with exponential backoff and jitter. Other errors are terminal. |
Concurrency is acquired before dispatching, so a burst of queued calls backpressures on the semaphore rather than spawning unbounded tasks. Rate-limit tokens are consumed per attempt — each retry takes a fresh token.
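As a sketch, a single function can declare all three policies at once. The parameter names below come from the table above; the value shapes (RateLimit, RetryPolicy, the shared key) are illustrative assumptions rather than the confirmed API:

import chalkcompute

@chalkcompute.function(
    # Assumed value shapes; only the parameter names are documented above.
    rate_limit=chalkcompute.RateLimit(per_minute=600, key="vendor-api"),
    concurrency=8,
    retry_policy=chalkcompute.RetryPolicy(max_attempts=4),
)
def fetch_quote(symbol: str) -> dict:
    import httpx

    resp = httpx.get(f"https://api.example.com/quotes/{symbol}")
    resp.raise_for_status()
    return resp.json()

Because rate-limit tokens are consumed per attempt, a call that exhausts max_attempts=4 can spend up to four tokens against the shared vendor-api key.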
Retries are safe for failures that happen before the function has started
streaming results — connection errors, DEADLINE_EXCEEDED on the initial
call, etc. If a function has already yielded chunks when it fails, those
chunks are visible to the caller and a retry would produce an ambiguous
stream. Retries therefore apply cleanly only to pre-stream failures today;
making mid-stream retries safe is a known follow-up.
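Until then, you can make long streams resumable at the application level. In the sketch below, transcribe_from is a hypothetical variant of transcribe that skips audio before start_at; neither is a queue feature. The caller simply restarts from the last segment it saw:

# Hypothetical resume pattern: transcribe_from and start_at are
# application-level constructs you would build yourself.
def consume_with_resume(audio_uri: str, max_restarts: int = 2):
    last_end = 0.0
    for attempt in range(max_restarts + 1):
        try:
            for segment in transcribe_from(audio_uri, start_at=last_end):
                last_end = segment["end"]
                yield segment
            return  # stream finished cleanly
        except RuntimeError:
            if attempt == max_restarts:
                raise
            # Otherwise restart from the last segment we saw;
            # segments already yielded downstream are unaffected.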
Exceptions raised by a function propagate to the caller as a RuntimeError
with the original type and message. For async generators, any chunks already
yielded remain visible to the caller; the exception is raised on the next
iteration step. See
Functions → Error handling for
examples.
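For example, a caller can keep the segments that arrived before a failure and handle the propagated error when it surfaces on the next iteration step:

received = []
try:
    for segment in transcribe("https://example.com/meeting-recording.wav"):
        received.append(segment)  # chunks yielded before the failure stay visible
except RuntimeError as exc:
    # Carries the original exception type and message from the worker.
    print(f"stream failed after {len(received)} segments: {exc}")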