Overview

The Function Queue is Chalk Compute’s async execution layer. It decouples callers from workers so function invocations — including ones that stream results incrementally — don’t require the caller to hold a live connection to any specific worker process.

You interact with the queue implicitly every time you call a @chalkcompute.function. This page explains what that buys you — streaming generator output, nested function calls, and policy-enforced dispatch — and shows the patterns that make the most of it.


Architecture

At a high level, calling a Chalk function goes through three moving parts:

┌──────────┐         ┌──────────────────┐         ┌───────────┐
│  Caller  │──call──▸│  Function Queue  │──run───▸│  Worker   │
│          │◂results─│                  │◂results─│ (Python)  │
└──────────┘         └──────────────────┘         └───────────┘
  • The caller (local script, notebook, another function) submits an invocation and receives results over a single bidirectional stream.
  • The Function Queue durably accepts the work, assigns each call an ID, enforces configured policies (concurrency, rate limit, retry), and relays results back to the caller.
  • The worker is the Python process that actually runs your function code.

Callers and workers are loosely coupled — the caller never opens a connection directly to a worker. Workers can scale up, scale down, or get rescheduled mid-flight, and in-flight calls aren't lost.
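The decoupling can be pictured with a toy relay in plain Python (this is an illustration of the shape, not Chalk internals): the caller and the worker each talk only to queues, never to each other, so the worker could be replaced mid-stream without the caller noticing.

```python
import asyncio

async def worker(inbox: asyncio.Queue, results: asyncio.Queue):
    # The worker pulls an invocation off the queue; it never sees the caller.
    call = await inbox.get()
    for chunk in range(3):
        await results.put(f"{call['id']}:chunk{chunk}")
    await results.put(None)  # end-of-stream marker

async def caller():
    inbox, results = asyncio.Queue(), asyncio.Queue()
    asyncio.create_task(worker(inbox, results))
    # Submit an invocation and read results back over the queue.
    await inbox.put({"id": "call-1", "fn": "transcribe"})
    out = []
    while (chunk := await results.get()) is not None:
        out.append(chunk)
    return out

print(asyncio.run(caller()))
# ['call-1:chunk0', 'call-1:chunk1', 'call-1:chunk2']
```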


Streaming results with async generators

The queue’s most distinctive capability is streaming: if your function is an async generator (uses yield), the caller receives each value as soon as the worker produces it, without waiting for the generator to finish.

This makes the queue a good fit for:

  • Streaming LLM token output.
  • Incremental ETL where downstream consumers start processing early.
  • Long-running jobs that emit progress events.
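The streaming contract is the same one plain Python async generators give you: the consumer's loop body runs for each value before the generator produces the next one. A minimal sketch, with no Chalk APIs involved:

```python
import asyncio

async def produce(log):
    # Stand-in for an async-generator function; records when each value is yielded.
    for i in range(2):
        log.append(f"produced {i}")
        yield i

async def consume():
    log = []
    async for i in produce(log):
        log.append(f"consumed {i}")  # runs before the generator resumes
    return log

print(asyncio.run(consume()))
# ['produced 0', 'consumed 0', 'produced 1', 'consumed 1']
```

The interleaved order shows the consumer never waits for the generator to finish; the queue preserves this property across the network.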

Example: audio transcription

Transcribe an audio file and yield each segment as it’s decoded:

import chalkcompute

@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install faster-whisper httpx",
    ),
    cpu=4,
    memory="8Gi",
)
async def transcribe(audio_uri: str):
    import httpx
    from faster_whisper import WhisperModel

    async with httpx.AsyncClient() as client:
        resp = await client.get(audio_uri)
        resp.raise_for_status()

    audio_path = "/tmp/audio"
    with open(audio_path, "wb") as f:
        f.write(resp.content)

    model = WhisperModel("base.en", compute_type="int8")
    segments, _info = model.transcribe(audio_path)

    for segment in segments:
        yield {
            "start": segment.start,
            "end": segment.end,
            "text": segment.text,
        }

The caller iterates over segments as they arrive:

transcribe.wait_ready()

for segment in transcribe("https://example.com/meeting-recording.wav"):
    print(f"[{segment['start']:.1f}s] {segment['text']}")

Output streams in real time:

[0.0s]  Welcome to the quarterly review.
[3.2s]  Let's start with the revenue numbers.
[6.1s]  Q3 came in at 4.2 million, up 18 percent.
...

The caller receives the first segment as soon as it's decoded — it does not wait for the full file to finish processing. For a 30-minute recording, the first segment may arrive in seconds.


Nested function calls

A function running on the queue can call other functions the same way a local caller would. The inner call rides the queue too — there’s no direct connection between worker processes. This lets you compose pipelines where each step runs on its own infrastructure.

import chalkcompute

@chalkcompute.function
def translate(text: str, target_lang: str) -> str:
    from transformers import pipeline
    translator = pipeline("translation", model=f"Helsinki-NLP/opus-mt-en-{target_lang}")
    return translator(text)[0]["translation_text"]


@chalkcompute.function
async def transcribe_and_translate(audio_uri: str, target_lang: str):
    """Transcribe audio, then translate each segment."""
    transcribe.wait_ready()
    translate.wait_ready()

    async for segment in transcribe(audio_uri):
        translated = translate(segment["text"], target_lang)
        yield {
            **segment,
            "translated": translated,
        }

The caller sees a single stream of translated segments:

transcribe_and_translate.wait_ready()

for seg in transcribe_and_translate(
    "https://example.com/meeting.wav",
    target_lang="es",
):
    print(f"[{seg['start']:.1f}s] {seg['translated']}")

Output:
[0.0s]  Bienvenidos a la revisión trimestral.
[3.2s]  Comencemos con las cifras de ingresos.
[6.1s]  El tercer trimestre llegó a 4,2 millones, un 18 por ciento más.

The outer function composes two inner calls — transcribe (async generator) and translate (single-value). Each runs on its own compute configuration; the queue routes between them.
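The shape of that composition — an async-generator stage feeding a per-item single-value stage — can be sketched in plain Python with stubs standing in for the two Chalk functions (the stubs and their return values are illustrative, not real model output):

```python
import asyncio

async def transcribe_stub(audio_uri):
    # Stand-in for the transcribe async generator.
    for seg in ({"start": 0.0, "text": "hello"}, {"start": 3.2, "text": "world"}):
        yield seg

def translate_stub(text, target_lang):
    # Stand-in for the single-value translate function.
    return f"{target_lang}:{text}"

async def pipeline(audio_uri, target_lang):
    # Mirrors transcribe_and_translate: stream in, transform, stream out.
    out = []
    async for seg in transcribe_stub(audio_uri):
        out.append({**seg, "translated": translate_stub(seg["text"], target_lang)})
    return out

print(asyncio.run(pipeline("audio.wav", "es")))
```

In the real version each stage runs on its own worker with its own image, CPU, and memory; only the call shape is the same.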


Policy enforcement

The queue enforces the policies declared on each function — rate limit, concurrency, and retries — before dispatching work to the worker. All three are configured on @chalkcompute.function; see Functions → Rate limiting, Concurrency limits, and Retry policies for the full API.

  Policy         What it caps
  rate_limit     Outbound requests per second / minute / hour, optionally shared across functions via a key.
  concurrency    In-flight invocations at once, optionally shared across functions via a key. Defaults to 1 per function.
  retry_policy   Automatic retry of transient failures (UNAVAILABLE, DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, ABORTED) with exponential backoff and jitter. Other errors are terminal.

Concurrency is acquired before dispatching, so a burst of queued calls backpressures on the semaphore rather than spawning unbounded tasks. Rate-limit tokens are consumed per attempt — each retry takes a fresh token.

Retry safety

Retries are safe for failures that happen before the function has started streaming results — connection errors, DEADLINE_EXCEEDED on the initial call, etc. If a function has already yielded chunks when it fails, those chunks are visible to the caller and a retry would produce an ambiguous stream. Retries therefore apply cleanly only to pre-stream failures today; making mid-stream retries safe is a known follow-up.
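The "retry only before the first chunk" rule can be sketched as a wrapper around an async generator; this is a plain-Python illustration of the policy, with a made-up `flaky_stream` whose first attempt fails before yielding:

```python
import asyncio

attempts = 0

async def flaky_stream():
    global attempts
    attempts += 1
    if attempts == 1:
        raise ConnectionError("UNAVAILABLE")  # fails before any yield
    for i in range(2):
        yield i

async def run_with_retry(make_stream, max_attempts=3):
    for attempt in range(max_attempts):
        yielded = False
        try:
            async for chunk in make_stream():
                yielded = True
                yield chunk
            return
        except ConnectionError:
            if yielded or attempt == max_attempts - 1:
                raise  # mid-stream failure or attempts exhausted: don't retry
            # pre-stream failure: safe to transparently retry

async def main():
    return [c async for c in run_with_retry(flaky_stream)]

print(asyncio.run(main()))
# [0, 1] — the pre-stream failure was retried transparently
```

Once `yielded` is true, a retry would replay chunks the caller has already seen, which is exactly the ambiguity the current policy avoids.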


Error handling

Exceptions raised by a function propagate to the caller as a RuntimeError with the original type and message. For async generators, any chunks already yielded remain visible to the caller; the exception is raised on the next iteration step. See Functions → Error handling for examples.
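The "raised on the next iteration step" behavior matches plain async-generator semantics, which this sketch illustrates (ordinary Python, no Chalk APIs):

```python
import asyncio

async def partial_stream():
    yield "chunk-0"
    yield "chunk-1"
    raise ValueError("boom")  # surfaces on the caller's next iteration step

async def main():
    received = []
    try:
        async for chunk in partial_stream():
            received.append(chunk)
    except ValueError:
        received.append("error after 2 chunks")
    return received

print(asyncio.run(main()))
# ['chunk-0', 'chunk-1', 'error after 2 chunks']
```

The two chunks yielded before the failure are already in the caller's hands; only the final iteration step raises.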