Chalk DataFrame Reference

This reference documents the Chalk DataFrame API for offline feature computation and data processing.

Lightweight DataFrame wrapper around Chalk's execution engine.

The DataFrame class constructs query plans backed by libchalk and can materialize them into Arrow tables. Operations build a lazy query plan that executes only when you call run or to_arrow.

Column expressions can be written with _ (underscore) attribute syntax or using F functions — see the F function reference for the full list.

Logical representation of tabular data for query operations.

DataFrame provides a lazy evaluation model where operations build up a query plan that executes only when materialized via run or to_arrow. Each operation returns a new DataFrame, leaving the original unchanged.
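The lazy, copy-on-operation model described above can be sketched as a toy in plain Python (illustrative only; not Chalk's actual engine):

```python
class ToyPlan:
    """Toy lazy query plan: operations append steps; nothing runs until run()."""

    def __init__(self, steps=()):
        self.steps = tuple(steps)

    def filter(self, predicate):
        # Returns a NEW plan; the original is left unchanged.
        return ToyPlan(self.steps + (predicate,))

    def run(self, rows):
        # Execution happens only here, when the plan is materialized.
        for predicate in self.steps:
            rows = [row for row in rows if predicate(row)]
        return rows

plan = ToyPlan()
filtered = plan.filter(lambda r: r["x"] > 1)
assert plan.steps == ()  # original plan untouched
print(filtered.run([{"x": 1}, {"x": 2}]))  # → [{'x': 2}]
```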

Most users should use class methods like from_dict, from_arrow, or scan to create DataFrames rather than calling the constructor directly.

Column expressions use _ (underscore) or F function syntax.

Examples

from chalkdf import DataFrame
from chalk.features import _, F
df = DataFrame.from_dict({"x": [1, 2, 3], "price": [10.0, 20.0, 30.0]})
# Underscore syntax
doubled = df.with_columns({"x2": _.x * 2})
# F function syntax
capped = df.with_columns({"price": F.least(_.price, 25.0)})
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": ["a", "b", "c"]})
filtered = df.filter(_.x > 1)
result = filtered.run()
Attributes

Return a list of the column names on this DataFrame.

column_dtypes
list[pyarrow.DataType]

Return a list of the column data types on this DataFrame.

Return the schema of this DataFrame.

Return the number of columns on this DataFrame.

Functions

Create a DataFrame from a dictionary, Arrow table, or query plan.

For most use cases, prefer using class methods like from_dict, from_arrow, or scan instead of calling this constructor directly.

Parameters
root:
ChalkTable | MaterializedTable | dict

Data source for the DataFrame. Can be:

  • dict: Dictionary mapping column names to lists of values
  • PyArrow Table or RecordBatch: In-memory Arrow data
  • ChalkTable: Query plan (advanced usage)
tables:
dict[str, MaterializedTable] | None
= None

Optional mapping of additional table names to Arrow data. Used internally for query execution with multiple tables.

from chalkdf import DataFrame
# Simple dictionary input
df = DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
# Or use the explicit class method (recommended)
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})

Return the number of rows if this DataFrame has already been materialized.

A TypeError is raised for non-materialized frames; this matches Python's default behavior while avoiding implicit execution of the plan.
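The contract can be sketched with a minimal stand-in class (illustrative only; chalkdf's internals differ):

```python
class MaterializedCheck:
    """Sketch of len() semantics: defined only once data is materialized."""

    def __init__(self, table=None):
        self._table = table  # e.g. a list of rows once materialized

    def __len__(self):
        if self._table is None:
            # Matches Python's default behavior for objects without a usable
            # length, and avoids implicitly executing the plan.
            raise TypeError("DataFrame is not materialized; call run() first")
        return len(self._table)
```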

Serialize this DataFrame into a LogicalExprNode proto.

Return a LazyFramePlaceholder when lazy recording is enabled.

Compose a LazyFramePlaceholder on top of this DataFrame.

Create a schema-only placeholder DataFrame for a named table.

The returned DataFrame contains no data; it is a logical reference that must be supplied with actual Arrow data at execution time via the tables argument of run or to_arrow. This is useful when you want to build a reusable query plan against a well-known schema and inject different data at runtime.

Parameters

Table identifier used as the key in the tables mapping at execution time.

Arrow schema describing the table's columns and types.

Optional list of column names indicating that the data is pre-sorted by those columns (used for optimization hints).

Returns
type:
import pyarrow as pa
from chalkdf import DataFrame
schema = pa.schema([("user_id", pa.int64()), ("score", pa.float64())])
df = DataFrame.named_table("users", schema)
# Build a query plan
from chalk.features import _
result_plan = df.filter(_.score > 0.5)
# Inject real data at execution time
import pyarrow as pa
data = pa.table({"user_id": [1, 2, 3], "score": [0.3, 0.8, 0.6]})
result = result_plan.run(tables={"users": data})

Construct a DataFrame from an in-memory Arrow object.

Parameters
data:
MaterializedTable

PyArrow Table or RecordBatch to convert into a DataFrame.

Returns
import pyarrow as pa
from chalkdf import DataFrame
table = pa.table({"x": [1, 2, 3], "y": ["a", "b", "c"]})
df = DataFrame.from_arrow(table)

Construct a DataFrame from a Python dictionary.

Parameters

Dictionary mapping column names to lists of values.

Returns
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": ["a", "b", "c"]})

Create a DataFrame from a Python async generator function.

This method allows you to create a DataFrame by streaming data from a custom Python async generator. The generator can yield data as PyArrow RecordBatches, pydicts, or pylists, and the method will handle conversion and schema alignment automatically. If the UDF yields an invalid batch, no further batches will be processed.

Parameters
udf:
Callable[[], typing.AsyncGenerator[pyarrow.RecordBatch | dict | list, None]]

An async generator function that yields data batches. Each yielded value can be a pyarrow.RecordBatch, a dictionary (will be converted using pyarrow.RecordBatch.from_pydict), or a list (will be converted using pyarrow.RecordBatch.from_pylist). The generator should yield None or complete iteration to signal completion.

The expected PyArrow schema for the output data. If yielded batches have columns in a different order, they will be automatically reordered to match this schema.

Maximum time in seconds to wait for the output handler to accept each batch. Prevents deadlocks when the consumer is blocked. Default is 300 seconds (5 minutes). Set to None to disable timeout (not recommended).

Returns
type:
Raises
error:
asyncio.TimeoutError

If sending a batch to the output handler exceeds the timeout.

import pyarrow as pa
from chalkdf import DataFrame
async def generate_data():
    for i in range(3):
        yield {"x": [i * 10, i * 10 + 1], "y": [i, i]}
schema = pa.schema([("x", pa.int64()), ("y", pa.int64())])
df = DataFrame.from_python_udf(generate_data, schema)
result = df.run()
# Example with PyArrow RecordBatches
async def generate_batches():
    batch1 = pa.RecordBatch.from_pydict({"a": [1, 2], "b": [3, 4]})
    batch2 = pa.RecordBatch.from_pydict({"a": [5, 6], "b": [7, 8]})
    yield batch1
    yield batch2
schema = pa.schema([("a", pa.int64()), ("b", pa.int64())])
df = DataFrame.from_python_udf(generate_batches, schema)
# Example with custom timeout
df = DataFrame.from_python_udf(generate_data, schema, output_timeout=60.0)

Scan files and return a DataFrame.

Currently supports CSV (with headers), Parquet, and Delta.

Parameters
input_uris:
typing.Sequence[str | Path] | str | Path

File path/URI or list of paths/URIs to scan. Supports local paths and file:// URIs.

name:
typing.Optional[str]
= None

Optional name to assign to the table being scanned.

Schema of the data. Required for CSV files, optional for Parquet.

mode:
'auto' | 'hive' | 'delta'
= 'auto'

Scan inference mode:

  • "auto": infer file type from URI/path suffix (CSV/Parquet).
  • "hive": expand Hive/glob paths without Delta inference fallback.
  • "delta": treat the input as a Delta table root (requires exactly one URI).
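In "auto" mode the file type comes from the path or URI suffix; a rough sketch of that inference (a hypothetical helper, not chalkdf's actual code):

```python
from pathlib import Path

def infer_file_type(uri: str) -> str:
    """Guess the scan file type from a path/URI suffix, as 'auto' mode does."""
    suffix = Path(uri).suffix.lower()
    if suffix == ".csv":
        return "csv"
    if suffix == ".parquet":
        return "parquet"
    raise ValueError(f"cannot infer file type for {uri!r}; pass an explicit mode")

print(infer_file_type("data/users.csv"))         # → csv
print(infer_file_type("s3://bucket/t.parquet"))  # → parquet
```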
Returns
type:
from chalkdf import DataFrame
# Scan Parquet files
df = DataFrame.scan(["data/sales_2024.parquet"], name="sales_data")
# Scan CSV with explicit schema
import pyarrow as pa
schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
df = DataFrame.scan(["data/users.csv"], schema=schema)

Load data from an AWS Glue Iceberg table.

Parameters

Fully qualified database.table name.

schema:
typing.Mapping[str, pyarrow.DataType]

Mapping of column names to Arrow types.

Number of rows per batch.

aws_catalog_account_id:
typing.Optional[str]
= None

AWS account hosting the Glue catalog.

aws_catalog_region:
typing.Optional[str]
= None

Region of the Glue catalog.

aws_role_arn:
typing.Optional[str]
= None

IAM role to assume for access.

filter_predicate:
typing.Optional[Expr]
= None

Optional filter applied during scan.

parquet_scan_range_column:
typing.Optional[str]
= None

Column used for range-based reads.

custom_partitions:
typing.Optional[dict[str, tuple[typing.Literal['date_trunc(day)'], str]]]
= None

Additional partition definitions.

partition_column:
typing.Optional[str]
= None

Column name representing partitions.

Returns
type:

Create a DataFrame from a Chalk SQL catalog table.

Parameters

Name of the table in the catalog.

catalog:
ChalkSqlCatalog

ChalkSqlCatalog instance containing the table.

Returns
type:
from chalkdf import DataFrame
from libchalk.chalksql import ChalkSqlCatalog
catalog = ChalkSqlCatalog()
df = DataFrame.from_catalog_table("users", catalog=catalog)

Create a DataFrame from the result of executing a SQL query (DuckDB dialect).

Pass DataFrames or Arrow tables as keyword arguments to make them available as named tables inside the query. If no keyword arguments are provided, from_sql will attempt to auto-register any DataFrames found in the calling scope.

Parameters

SQL query string (DuckDB dialect).

tables:
CompatibleFrameType
= {}
Returns
type:
from chalkdf import DataFrame
orders = DataFrame.from_dict({"order_id": [1, 2, 3], "amount": [10.0, 20.0, 5.0]})
result = DataFrame.from_sql(
    "SELECT order_id, amount FROM orders WHERE amount > 8",
    orders=orders,
)

Join two DataFrames with SQL:

users = DataFrame.from_dict({"id": [1, 2], "name": ["Alice", "Bob"]})
purchases = DataFrame.from_dict({"user_id": [1, 1, 2], "item": ["a", "b", "c"]})
result = DataFrame.from_sql(
    "SELECT u.name, p.item FROM users u JOIN purchases p ON u.id = p.user_id",
    users=users,
    purchases=purchases,
)

Create a DataFrame from the result of querying a SQL data source.

Parameters
source:
BaseSQLSource

SQL source to query (e.g., PostgreSQL, Snowflake, BigQuery).

SQL query to execute against the data source.

Output schema of the query result. The datasource's driver converts the native query result to this schema.

Returns
import pyarrow as pa
from chalkdf import DataFrame
from chalk.sql import PostgreSQLSource
source = PostgreSQLSource(...)
schema = pa.schema([("user_id", pa.int64()), ("name", pa.string())])
df = DataFrame.from_datasource(source, "SELECT * FROM users", schema)

Create a DataFrame by executing a SQL UNLOAD query and scanning the resulting parquet files.

The SQL query should be a data warehouse UNLOAD/EXPORT command (e.g., BigQuery EXPORT DATA, Snowflake COPY INTO) that writes parquet files to output_uri_prefix. The files are then read by a deferred TableScan node.

Parameters

The SQL UNLOAD query to execute against the data warehouse.

pool:
typing.Any

A libchalk.sql.ConnectionPool for the data warehouse.

URI prefix where the UNLOAD query writes its parquet output (e.g., gs://bucket/path/ or s3://bucket/prefix/).

Arrow schema of the parquet files produced by the UNLOAD query.

Returns
type:
import pyarrow as pa
from chalkdf import DataFrame
from libchalk.sql import ConnectionPool
from libchalk.sql.snowflake import make_snowflake_connection_factory
factory = make_snowflake_connection_factory(uri="snowflake://user:pass@acct/db")
pool = ConnectionPool(factory, max_pool_size=1)
schema = pa.schema([("user_id", pa.int64()), ("name", pa.string())])
df = DataFrame.scan_from_sql(
    "COPY INTO @stage/out/ FROM (SELECT user_id, name FROM users)",
    pool=pool,
    output_uri_prefix="s3://my-bucket/unload-output/",
    schema=schema,
)

Create a DataFrame by pulling messages from a streaming source.

This method connects to a Kafka, Kinesis, or PubSub source and pulls up to n messages, returning them as a DataFrame.

Parameters
source:
typing.Any

A streaming source configuration. Can be one of:

  • KafkaSource: Kafka topic configuration
  • KinesisSource: Kinesis stream configuration
  • PubSubSource: Google PubSub subscription configuration

Maximum number of messages to pull from the stream.

Timeout in milliseconds for pulling messages. Default is 15000ms.

If False (default), returns a DataFrame with a single "value" column containing the raw message bytes. If True, returns a DataFrame with columns for topic, partition, offset, timestamp, key, and value.

Returns
type:
from chalkdf import DataFrame
from chalk.streams import KafkaSource
source = KafkaSource(
    name="my_kafka",
    bootstrap_server="localhost:9092",
    topic="my_topic",
)
# Pull 100 messages, just the raw bytes
df = DataFrame.from_stream_source(source, n=100)
# Pull with full metadata
df = DataFrame.from_stream_source(source, n=100, include_metadata=True)

Compile the current plan if necessary.

Configuration is resolved from multiple sources in priority order:

  1. Explicit config parameter (highest priority)
  2. Active compilation_config context manager
  3. Global defaults from set_compilation_defaults
  4. Environment variables (e.g., CHALK_USE_VELOX_PARQUET_READER)
  5. Built-in fallback defaults

If the configuration differs from the one used for the previous compilation, the plan is automatically recompiled.
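The five-level fallback can be sketched in plain Python (names are illustrative, not chalkdf's internals):

```python
import os

def resolve_option(explicit, context, global_default, env_var, fallback):
    """Return the first configured value, walking the priority order top-down."""
    for value in (explicit, context, global_default):
        if value is not None:
            return value
    env_value = os.environ.get(env_var)  # e.g. CHALK_USE_VELOX_PARQUET_READER
    if env_value is not None:
        return env_value.lower() in ("1", "true", "yes")
    return fallback

# An explicit config wins over every other source:
print(resolve_option(explicit=False, context=True, global_default=True,
                     env_var="CHALK_EXAMPLE_FLAG", fallback=True))  # → False
```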

Parameters

Explicit compilation configuration (highest priority).

Force recompilation even if a plan exists.

Returns
type:
CompiledPlan
from chalkdf import DataFrame
from chalkdf.config import CompilationConfig
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
compiled = df.compile(config=CompilationConfig(use_online_hash_join=True))
print(compiled.explain_logical())

Return a string representation of the logical query plan.

Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
print(filtered.explain_logical())

Return a string representation of the physical execution plan.

Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
print(filtered.explain_physical())

Computes plan JSON for debugging the structure of the computation.

Expose the underlying ChalkTable plan.

Return the mapping of materialized tables for this DataFrame.

Add or replace columns while keeping all existing columns.

Unlike project, which returns only the columns you specify, with_columns keeps every existing column and either adds new ones or replaces columns whose names match.

Accepts multiple forms:

  • A dict mapping column names to expressions
  • Positional tuples of (name, expression)
  • Bare positional expressions that carry a name via .alias(<name>)
Returns
type:
from chalkdf import DataFrame
from chalk.features import _, F
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Add a new column using underscore syntax
df2 = df.with_columns({"z": _.x + _.y})
# Add a column using an F function
df3 = df.with_columns({"z_capped": F.least(_.x + _.y, 8)})
# Add a column using .alias()
df4 = df.with_columns((_.x * 2).alias("x_doubled"))
# Both df2, df3, df4 still contain x and y in addition to the new column

Add a monotonically increasing unique identifier column.

Parameters

Name of the new ID column.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [10, 20, 30]})
df_with_id = df.with_unique_id("row_id")

Filter rows based on a boolean expression.

Parameters
expr:
Expr | Underscore

Boolean expression to filter rows. Only rows where the expression evaluates to True are kept.

Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
filtered = df.filter(_.x > 2)

Return a subset of rows starting at a specific position.

Parameters

Zero-based index where the slice begins.

length: = None

Number of rows to include. If None, includes all remaining rows.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3, 4, 5]})
# Get rows 1-3 (indices 1, 2, 3)
sliced = df.slice(1, 3)

Return a column expression for the named column.

df.col("name") is equivalent to _.name but validates that "name" exists in the DataFrame's schema at call time and is therefore useful when the column name is a runtime string variable rather than a literal attribute access.

Parameters

Name of an existing column in this DataFrame.

Returns
type:
Underscore
Raises
error:

If column is not present in the DataFrame's schema.

from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Reference a column by name to build expressions
col_x = df.col("x")
df_filtered = df.filter(col_x > 1)
# Useful when the column name comes from a variable
target = "y"
df2 = df.with_columns({"doubled": df.col(target) * 2})

Return a column expression for the named column.

Alias for col.

Parameters

Name of an existing column in this DataFrame.

Returns
type:
Underscore
Raises
error:

If column is not present in the DataFrame's schema.

from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Compute a sum from two columns referenced by name
df2 = df.with_columns({"sum": df.col("x") + df.col("y")})

Combine this DataFrame with one or more others by stacking rows.

All DataFrames must have the same schema (different column order is allowed; the output will have the same column order as self). Duplicates are retained. Row order is not preserved.

Returns
type:
Raises
error:

If no other DataFrames are provided, or if schemas don't match.

df1 = DataFrame({"x": [1, 2], "y": [10, 20]})
df2 = DataFrame({"x": [3, 4], "y": [30, 40]})
df3 = DataFrame({"x": [5], "y": [50]})
result = df1.union_all(df2, df3)
# result contains all 5 rows from df1, df2, and df3, in any order

Combine this DataFrame with another by stacking rows.

Convenience method for unioning with a single DataFrame. Equivalent to union_all(other).

Both DataFrames must have the same schema (different column order is allowed; the output will have the same column order as self). Duplicates are retained. Row order is not preserved.

See Also

union_all : Union with multiple DataFrames at once.

Parameters

DataFrame to union with this DataFrame.

Returns
type:
Raises
error:

If schemas don't match.

df1 = DataFrame({"x": [1, 2], "y": [10, 20]})
df2 = DataFrame({"x": [3, 4], "y": [30, 40]})
result = df1.union(df2)
# result contains all 4 rows from df1 and df2, in any order

Project to an exact set of output columns using expressions.

Unlike with_columns, which keeps all existing columns and only adds or replaces the ones you name, project returns only the columns you specify. Columns not listed in columns are dropped.

Use project when you want to reshape or rename the schema entirely; use with_columns when you only want to augment it.

Parameters
columns:
typing.Mapping[str, Expr | Underscore]

Mapping of output column names to expressions that define them. Every key becomes a column in the output; every value is an expression (_ expression or Expr) evaluated against the current schema.

Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8, 9]})
# Keep only "sum" and "x"; z is dropped
projected = df.project({"sum": _.x + _.y, "x": _.x})

Select existing columns by name.

Parameters
columns:
str | Underscore
= ()
strict: = True

If True, raise an error if any column doesn't exist. If False, silently ignore missing columns.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8, 9]})
selected = df.select("x", "y")

Drop specified columns from the DataFrame.

Parameters
columns:
str | Underscore
= ()
strict: = True

If True (default), raise a ValueError if any named column does not exist. If False, silently skip missing columns.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8, 9]})
df_dropped = df.drop("z")           # raises if "z" missing
df_dropped = df.drop("z", "missing_col", strict=False)  # silently skips

Explode a list or array column into multiple rows.

Each element in the list becomes a separate row, with other column values duplicated.

Parameters
column:
str | Underscore

Name of the list/array column to explode.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"id": [1, 2], "items": [[10, 20], [30]]})
exploded = df.explode("items")
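The row-duplication semantics can be seen in a plain-Python sketch (illustrative, not the engine's implementation):

```python
def explode_rows(rows, column):
    """Emit one output row per element of rows[i][column], copying other values."""
    out = []
    for row in rows:
        for item in row[column]:
            new_row = dict(row)
            new_row[column] = item
            out.append(new_row)
    return out

rows = [{"id": 1, "items": [10, 20]}, {"id": 2, "items": [30]}]
print(explode_rows(rows, "items"))
# → [{'id': 1, 'items': 10}, {'id': 1, 'items': 20}, {'id': 2, 'items': 30}]
```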

Join this DataFrame with another.

Parameters
on:
dict[str | Underscore, str | Underscore] | typing.Sequence[str | Underscore] | None
= None

Join keys. Can be specified in multiple ways:

  • A sequence of column names (same names on both sides): on=["col1", "col2"]
  • A mapping of left->right column names: on={"left_col": "right_col"}
  • If None, must specify left_on and right_on separately.
left_on:
typing.Sequence[str | Underscore] | None
= None

Column names for left DataFrame join keys. Only used when on is None. Must be paired with right_on.

right_on:
typing.Sequence[str | Underscore] | None
= None

Column names for right DataFrame join keys. Only used when on is None. Must be paired with left_on.

how:
JoinType
= 'inner'

Join type. Supported values:

  • "inner": Keep only rows that match in both DataFrames (default)
  • "left": Keep all rows from left DataFrame
  • "right": Keep all rows from right DataFrame
  • "outer" or "full": Keep all rows from both DataFrames
  • "semi": Return rows from left that have matches in right (no right columns)
  • "anti": Return rows from left that have no matches in right
  • "cross": Cartesian product (the on argument must not be provided)

Optional suffix applied to right-hand columns when names collide. For example, if both DataFrames have a column "value" and right_suffix="_right", the result will have "value" and "value_right".

Returns
type:
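The key-matching and right_suffix collision behavior can be sketched for the inner case in plain Python (illustrative only; not how the engine executes joins):

```python
def inner_join(left, right, on, right_suffix="_right"):
    """Inner join two lists of row dicts on a shared key column.

    Right-side columns whose names collide with left-side columns
    (other than the join key) get right_suffix appended.
    """
    index = {}
    for row in right:
        index.setdefault(row[on], []).append(row)
    out = []
    for lrow in left:
        for rrow in index.get(lrow[on], []):
            merged = dict(lrow)
            for name, value in rrow.items():
                if name == on:
                    continue  # join key is kept once, from the left side
                merged[name + right_suffix if name in lrow else name] = value
            out.append(merged)
    return out

left = [{"id": 1, "value": 10}, {"id": 2, "value": 20}]
right = [{"id": 1, "value": 99}]
print(inner_join(left, right, on="id"))
# → [{'id': 1, 'value': 10, 'value_right': 99}]
```

Unmatched left rows (here id=2) are dropped, as with how="inner".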

Perform an as-of join with another DataFrame.

An as-of join is similar to a left join, but instead of matching on equality, it matches on the nearest key from the right DataFrame. This is commonly used for time-series data where you want to join with the most recent observation.

Important: Both DataFrames must be sorted by the on (or left_on/right_on) column before calling this method. Use .order_by(on) to sort if needed.

Parameters

Right-hand DataFrame to join with.

on:
str | Underscore | None
= None

Column name to use as the as-of join key (must be sorted). This column is used for both left and right DataFrames. The join finds the nearest match according to the strategy. Either on or both left_on and right_on must be specified.

left_on:
str | Underscore | None
= None

Column name in left DataFrame for the as-of join key. Only used when on is None. Must be paired with right_on.

right_on:
str | Underscore | None
= None

Column name in right DataFrame for the as-of join key. Can be used with on (to specify a different right column name) or with left_on (when on is None).

by:
dict[str | Underscore, str | Underscore] | typing.Sequence[str | Underscore] | None
= None

Additional exact-match columns (optional). These columns must match exactly before performing the as-of match on the on column. Can be specified as:

  • A sequence of column names (same names on both sides): by=["col1", "col2"]
  • A mapping of left->right column names: by={"left_col": "right_col"}
  • If None, can specify left_by and right_by separately.
left_by:
typing.Sequence[str | Underscore] | None
= None

Column names in left DataFrame for exact-match conditions. Only used when by is None. Must be paired with right_by.

right_by:
typing.Sequence[str | Underscore] | None
= None

Column names in right DataFrame for exact-match conditions. Only used when by is None. Must be paired with left_by.

strategy:
AsOfJoinStrategy | typing.Literal['forward', 'backward']
= 'backward'

Join strategy controlling which match to select:

  • "backward" (default): Match with the most recent past value
  • "forward": Match with the nearest future value

You can also pass AsOfJoinStrategy.BACKWARD or AsOfJoinStrategy.FORWARD.

Suffix to add to overlapping column names from the right DataFrame.

Whether to coalesce the join keys (default True).

Returns
type:
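The backward/forward matching rule can be sketched with bisect over the (required) sorted right-hand keys (illustrative, not the engine's implementation):

```python
import bisect

def asof_index(left_key, sorted_right_keys, strategy="backward"):
    """Index of the right-hand match for left_key, or None if no match.

    "backward" takes the most recent key <= left_key;
    "forward" takes the nearest key >= left_key.
    """
    if strategy == "backward":
        i = bisect.bisect_right(sorted_right_keys, left_key) - 1
        return i if i >= 0 else None
    i = bisect.bisect_left(sorted_right_keys, left_key)
    return i if i < len(sorted_right_keys) else None

times = [10, 20, 30]
print(asof_index(25, times))             # → 1 (matches 20, most recent past)
print(asof_index(25, times, "forward"))  # → 2 (matches 30, nearest future)
print(asof_index(5, times))              # → None (nothing at or before 5)
```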

Compute window (analytic) expressions partitioned by by and ordered by order_by.

Window operations evaluate each WindowExpr over a partition of rows (defined by by) sorted within that partition (by order_by). The result columns are appended to the existing schema; original columns are preserved.

Overlap between by and order_by columns is not allowed.

Parameters
by:
typing.Sequence[str | Underscore]

Column names that define the partition boundaries. Rows with the same combination of values in these columns form one partition.

order_by:
typing.Sequence[str | Underscore | tuple[str | Underscore, str]]

Column names (or (name, direction) tuples) that define the sort order within each partition. Direction can be "asc" (default) or "desc".

expressions:
WindowExpr
= ()
Returns
type:
from chalkdf import DataFrame
from libchalk.chalktable import WindowExpr
df = DataFrame.from_dict({
    "idx": [1, 1, 2, 2],
    "v":   [10, 20, 30, 40],
})
# Partition by "idx", sort by "v" ascending, shift "v" by -1 into "v_shifted"
result = df.window(["idx"], ["v"], WindowExpr.shift("v", "v_shifted", -1))
# result schema: idx, v, v_shifted
# v_shifted contains the *next* value of v within each idx partition
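The shift semantics used in the example above (negative periods pull in the next value within a partition) can be checked against a plain-Python sketch:

```python
def shift(values, periods):
    """Shift a list by `periods`; vacated positions become None.

    Negative periods pull later values earlier, so shift(values, -1)
    puts each row's *next* value at the current position.
    """
    n = len(values)
    if periods >= 0:
        k = min(periods, n)
        return [None] * k + values[: n - k]
    k = min(-periods, n)
    return values[k:] + [None] * k

print(shift([10, 20], -1))  # → [20, None]
print(shift([10, 20], 1))   # → [None, 10]
```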

Create a GroupBy object for chained aggregation operations.

This method returns a GroupBy object that can be used to apply aggregation expressions via the .agg() method. This provides an alternative syntax to df.agg(by, *aggregations).

Returns
type:
GroupBy
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"group": ["A", "A", "B"], "value": [1, 2, 3]})
grouped = df.group_by("group").agg(_.value.sum().alias("total"))

Multiple grouping columns:

df2 = DataFrame.from_dict({"g1": ["A", "A", "B"], "g2": ["X", "Y", "X"], "val": [1, 2, 3]})
result = df2.group_by("g1", "g2").agg(_.val.sum().alias("sum"))

Using underscore expressions:

result = df.group_by(_.group).agg(_.value.mean().alias("avg"))
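The sum aggregation in the examples above behaves like this plain-Python sketch (illustrative only):

```python
def group_sum(rows, by, value_col, out_name="total"):
    """Group rows by `by` and sum `value_col`, one output row per group."""
    totals = {}
    for row in rows:
        totals[row[by]] = totals.get(row[by], 0) + row[value_col]
    return [{by: key, out_name: total} for key, total in totals.items()]

rows = [{"group": "A", "value": 1}, {"group": "A", "value": 2}, {"group": "B", "value": 3}]
print(group_sum(rows, "group", "value"))
# → [{'group': 'A', 'total': 3}, {'group': 'B', 'total': 3}]
```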

Group by columns and apply aggregation expressions.

Parameters
by:
typing.Sequence[str | Underscore] | str | Underscore

Column name(s) to group by. Can be a single column name/expression or a sequence of column names/expressions.

aggregations:
AggExpr | Underscore
= ()
Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"group": ["A", "A", "B"], "value": [1, 2, 3]})
agg_df = df.agg(["group"], _.value.sum().alias("total"))
# Or with a single column:
agg_df = df.agg("group", _.value.sum().alias("total"))

Remove duplicate rows based on the specified partition columns.

For each unique combination of values in columns, exactly one row is emitted. Which row is kept within a partition is not guaranteed; the engine may choose any row. If you need a deterministic choice, sort the DataFrame first with order_by before calling distinct_on.

Returns
type:
Raises
error:

If no columns are provided.

from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 1, 2], "y": [10, 20, 30]})
unique = df.distinct_on("x")  # one row per unique x value
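To make the kept row deterministic, sort first as suggested above; the resulting "first row per key" behavior looks like this in plain Python (a sketch, not the engine):

```python
def first_row_per_key(sorted_rows, key):
    """Keep the first row seen for each distinct key value."""
    kept = {}
    for row in sorted_rows:
        kept.setdefault(row[key], row)
    return list(kept.values())

rows = [{"x": 1, "y": 20}, {"x": 1, "y": 10}, {"x": 2, "y": 30}]
rows.sort(key=lambda r: (r["x"], r["y"]))  # analogue of order_by
print(first_row_per_key(rows, "x"))
# → [{'x': 1, 'y': 10}, {'x': 2, 'y': 30}]
```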

Sort the DataFrame by one or more columns.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [3, 1, 2], "y": [30, 10, 20]})
# Sort by x ascending
sorted_df = df.order_by("x")
# Sort by x descending, then y ascending
sorted_df = df.order_by(("x", "desc"), "y")

Build a lazy write plan without executing it.

Returns a new DataFrame whose query plan ends with a TableWrite operator. No files are written until you call run or to_arrow on the returned DataFrame.

For immediate execution use write instead.

Parameters

Directory to write output files.

Optional explicit file name.

file_format: = 'parquet'

Output format (default parquet).

serde_parameters:
typing.Mapping[str, str] | None
= None

Optional SerDe options for text formats.

Optional compression codec.

Ensure writers emit files even if no rows were produced.

Optional connector id override.

Returns
type:

Execute the DataFrame plan and write the output files immediately.

This is the eager counterpart to write_lazy: it builds the write plan and runs it in one step.

By default (return_table_write_result=False) the method returns None after the write completes. Pass return_table_write_result=True to receive the raw TableWrite result DataFrame instead.

Parameters

Directory to write output files.

Optional explicit file name.

file_format: = 'parquet'

Output format (default parquet).

serde_parameters:
typing.Mapping[str, str] | None
= None

Optional SerDe options for text formats.

Optional compression codec.

Ensure writers emit files even if no rows were produced.

Optional connector id override.

If True, return the raw TableWrite result DataFrame. If False (default), return None.

Returns
type:

Write the DataFrame as Parquet files using an auto-configured connector.

Convenience method that simplifies writing Parquet files compared to the more general write. The connector is selected automatically based on the URI scheme.

By default (return_table_write_result=False) the method returns None after the write completes. Pass return_table_write_result=True to receive the raw TableWrite result DataFrame instead.

Parameters

URI prefix where Parquet files will be written. Supports local (file://), S3 (s3://), and GCS (gs://) URIs.

skip_planning_time_validation: = False

Whether to skip validation at planning time.

return_table_write_result: = False

If True, return the raw TableWrite result DataFrame. If False (default), return None.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
df.write_parquet("file:///tmp/output/")  # returns None
result = df.write_parquet("gs://my-bucket/output/", return_table_write_result=True)

Write the DataFrame as Parquet files and load into a data warehouse.

Combines Parquet writing with a subsequent warehouse load step (e.g., a Snowflake COPY INTO).

Parameters

URI prefix where Parquet files will be written.

destination:
DataWarehouseDestination

Target data warehouse destination (database, schema, table).

loader:
DataWarehouseLoader

A DataWarehouseLoader implementation that performs the load.

Whether to skip validation at planning time (default: False).

If True, return the raw TableWrite result (default False).

Returns
type:

Rename columns in the DataFrame.

Parameters
new_names:
dict[str | Underscore, str] | dict[str, str]

Dictionary mapping old column names to new column names. Both keys and values can be either strings or underscore column references (e.g., _.col_name).

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
renamed = df.rename({"x": "id", "y": "value"})
# Can also use underscore syntax for keys
renamed = df.rename({_.x: "id", _.y: "value"})

Execute the query plan and return the result as a PyArrow Table.

Parameters
tables:
typing.Mapping[str, MaterializedTable]
= _empty_table_dict

Optional mapping of table names to materialized Arrow data for execution.

Returns
type:
pyarrow.Table
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
arrow_table = filtered.to_arrow()

Execute the query plan and return a materialized DataFrame.

Parameters
tables:
typing.Mapping[str, MaterializedTable]
= _empty_table_dict

Optional mapping of table names to materialized Arrow data for execution.

Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
materialized = filtered.run()

Class methods for constructing new DataFrame instances from various sources.

  • From Python dicts or Arrow tables in memory
  • From Parquet, CSV, or Delta files on disk or cloud storage
  • From AWS Glue Iceberg tables
  • From SQL queries (DuckDB dialect) or external SQL datasources
  • From streaming sources (Kafka, Kinesis, PubSub)
  • From Chalk SQL catalog tables
  • Schema-only placeholders via named_table (data injected at run time)

Create a schema-only placeholder DataFrame for a named table.

The returned DataFrame contains no data; it is a logical reference that must be supplied with actual Arrow data at execution time via the tables argument of run or to_arrow. This is useful when you want to build a reusable query plan against a well-known schema and inject different data at runtime.

Parameters

Table identifier used as the key in the tables mapping at execution time.

Arrow schema describing the table's columns and types.

Optional list of column names indicating that the data is pre-sorted by those columns (used for optimization hints).

Returns
type:
import pyarrow as pa
from chalkdf import DataFrame
schema = pa.schema([("user_id", pa.int64()), ("score", pa.float64())])
df = DataFrame.named_table("users", schema)
# Build a query plan
from chalk.features import _
result_plan = df.filter(_.score > 0.5)
# Inject real data at execution time
data = pa.table({"user_id": [1, 2, 3], "score": [0.3, 0.8, 0.6]})
result = result_plan.run(tables={"users": data})

Construct a DataFrame from a Python dictionary.

Parameters

Dictionary mapping column names to lists of values.

Returns
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": ["a", "b", "c"]})

Construct a DataFrame from an in-memory Arrow object.

Parameters
data:
MaterializedTable

PyArrow Table or RecordBatch to convert into a DataFrame.

Returns
import pyarrow as pa
from chalkdf import DataFrame
table = pa.table({"x": [1, 2, 3], "y": ["a", "b", "c"]})
df = DataFrame.from_arrow(table)

Scan files and return a DataFrame.

Currently supports CSV (with headers), Parquet, and Delta.

Parameters
input_uris:
typing.Sequence[str | Path] | str | Path

File path/URI or list of paths/URIs to scan. Supports local paths and file:// URIs.

name:
typing.Optional[str]
= None

Optional name to assign to the table being scanned.

Schema of the data. Required for CSV files, optional for Parquet.

mode:
'auto' | 'hive' | 'delta'
= 'auto'

Scan inference mode:

  • "auto": infer file type from URI/path suffix (CSV/Parquet).
  • "hive": expand Hive/glob paths without Delta inference fallback.
  • "delta": treat the input as a Delta table root (requires exactly one URI).
Returns
type:
from chalkdf import DataFrame
# Scan Parquet files
df = DataFrame.scan(["data/sales_2024.parquet"], name="sales_data")
# Scan CSV with explicit schema
import pyarrow as pa
schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
df = DataFrame.scan(["data/users.csv"], schema=schema)

Load data from an AWS Glue Iceberg table.

Parameters

Fully qualified database.table name.

schema:
typing.Mapping[str, pyarrow.DataType]

Mapping of column names to Arrow types.

Number of rows per batch.

aws_catalog_account_id:
typing.Optional[str]
= None

AWS account hosting the Glue catalog.

aws_catalog_region:
typing.Optional[str]
= None

Region of the Glue catalog.

aws_role_arn:
typing.Optional[str]
= None

IAM role to assume for access.

filter_predicate:
typing.Optional[Expr]
= None

Optional filter applied during scan.

parquet_scan_range_column:
typing.Optional[str]
= None

Column used for range-based reads.

custom_partitions:
typing.Optional[dict[str, tuple[typing.Literal['date_trunc(day)'], str]]]
= None

Additional partition definitions.

partition_column:
typing.Optional[str]
= None

Column name representing partitions.

Returns
type:

Create a DataFrame from the result of executing a SQL query (DuckDB dialect).

Pass DataFrames or Arrow tables as keyword arguments to make them available as named tables inside the query. If no keyword arguments are provided, from_sql will attempt to auto-register any DataFrames found in the calling scope.

Parameters

SQL query string (DuckDB dialect).

tables:
CompatibleFrameType
= {}
Returns
type:
from chalkdf import DataFrame
orders = DataFrame.from_dict({"order_id": [1, 2, 3], "amount": [10.0, 20.0, 5.0]})
result = DataFrame.from_sql(
    "SELECT order_id, amount FROM orders WHERE amount > 8",
    orders=orders,
)

Join two DataFrames with SQL:

users = DataFrame.from_dict({"id": [1, 2], "name": ["Alice", "Bob"]})
purchases = DataFrame.from_dict({"user_id": [1, 1, 2], "item": ["a", "b", "c"]})
result = DataFrame.from_sql(
    "SELECT u.name, p.item FROM users u JOIN purchases p ON u.id = p.user_id",
    users=users,
    purchases=purchases,
)

Create a DataFrame by pulling messages from a streaming source.

This method connects to a Kafka, Kinesis, or PubSub source and pulls up to n messages, returning them as a DataFrame.

Parameters
source:
typing.Any

A streaming source configuration. Can be one of:

  • KafkaSource: Kafka topic configuration
  • KinesisSource: Kinesis stream configuration
  • PubSubSource: Google PubSub subscription configuration

Maximum number of messages to pull from the stream.

Timeout in milliseconds for pulling messages. Default is 15000ms.

If False (default), returns a DataFrame with a single "value" column containing the raw message bytes. If True, returns a DataFrame with columns for topic, partition, offset, timestamp, key, and value.

Returns
type:
from chalkdf import DataFrame
from chalk.streams import KafkaSource
source = KafkaSource(
    name="my_kafka",
    bootstrap_server="localhost:9092",
    topic="my_topic",
)
# Pull 100 messages, just the raw bytes
df = DataFrame.from_stream_source(source, n=100)
# Pull with full metadata
df = DataFrame.from_stream_source(source, n=100, include_metadata=True)

Create a DataFrame from a Chalk SQL catalog table.

Parameters

Name of the table in the catalog.

catalog:
ChalkSqlCatalog

ChalkSqlCatalog instance containing the table.

Returns
type:
from chalkdf import DataFrame
from libchalk.chalksql import ChalkSqlCatalog
catalog = ChalkSqlCatalog()
df = DataFrame.from_catalog_table("users", catalog=catalog)

Create a DataFrame from the result of querying a SQL data source.

Parameters
source:
BaseSQLSource

SQL source to query (e.g., PostgreSQL, Snowflake, BigQuery).

SQL query to execute against the data source.

Output schema of the query result. The datasource's driver converts the native query result to this schema.

Returns
import pyarrow as pa
from chalkdf import DataFrame
from chalk.sql import PostgreSQLSource
source = PostgreSQLSource(...)
schema = pa.schema([("user_id", pa.int64()), ("name", pa.string())])
df = DataFrame.from_datasource(source, "SELECT * FROM users", schema)

Methods for selecting, transforming, and manipulating columns.

Column expressions accept _ (underscore) syntax or F functions — for example _.price * _.qty or F.coalesce(_.value, 0). See the F function reference for available functions.

  • select / drop — pick or remove columns by name
  • with_columns — add or replace columns while keeping existing ones
  • project — replace all columns with a new set of expressions
  • col / column — reference a column by runtime name string
  • rename — rename one or more columns
  • explode — expand a list column into one row per element
  • with_unique_id — append a monotonically increasing ID column

Return a column expression for the named column.

df.col("name") is equivalent to _.name but validates that "name" exists in the DataFrame's schema at call time and is therefore useful when the column name is a runtime string variable rather than a literal attribute access.

Parameters

Name of an existing column in this DataFrame.

Returns
type:
Underscore
Raises
error:

If column is not present in the DataFrame's schema.

from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Reference a column by name to build expressions
col_x = df.col("x")
df_filtered = df.filter(col_x > 1)
# Useful when the column name comes from a variable
target = "y"
df2 = df.with_columns({"doubled": df.col(target) * 2})

Return a column expression for the named column.

Alias for col.

Parameters

Name of an existing column in this DataFrame.

Returns
type:
Underscore
Raises
error:

If column is not present in the DataFrame's schema.

from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Compute a sum from two columns referenced by name
df2 = df.with_columns({"sum": df.col("x") + df.col("y")})

Select existing columns by name.

Parameters
columns:
str | Underscore
= ()
strict: = True

If True, raise an error if any column doesn't exist. If False, silently ignore missing columns.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8, 9]})
selected = df.select("x", "y")

Drop specified columns from the DataFrame.

Parameters
columns:
str | Underscore
= ()
strict: = True

If True (default), raise a ValueError if any named column does not exist. If False, silently skip missing columns.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8, 9]})
df_dropped = df.drop("z")           # raises if "z" missing
df_dropped = df.drop("z", "missing_col", strict=False)  # silently skips

Add or replace columns while keeping all existing columns.

Unlike project, which returns only the columns you specify, with_columns keeps every existing column and either adds new ones or replaces columns whose names match.

Accepts multiple forms:

  • A dict mapping column names to expressions
  • Positional tuples of (name, expression)
  • Bare positional expressions that carry a name via .alias(<name>)
Returns
type:
from chalkdf import DataFrame
from chalk.features import _, F
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Add a new column using underscore syntax
df2 = df.with_columns({"z": _.x + _.y})
# Add a column using an F function
df3 = df.with_columns({"z_capped": F.least(_.x + _.y, 8)})
# Add a column using .alias()
df4 = df.with_columns((_.x * 2).alias("x_doubled"))
# Both df2, df3, df4 still contain x and y in addition to the new column

Project to an exact set of output columns using expressions.

Unlike with_columns, which keeps all existing columns and only adds or replaces the ones you name, project returns only the columns you specify. Columns not listed in columns are dropped.

Use project when you want to reshape or rename the schema entirely; use with_columns when you only want to augment it.

Parameters
columns:
typing.Mapping[str, Expr | Underscore]

Mapping of output column names to expressions that define them. Every key becomes a column in the output; every value is an expression (_ expression or Expr) evaluated against the current schema.

Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8, 9]})
# Keep only "sum" and "x"; z is dropped
projected = df.project({"sum": _.x + _.y, "x": _.x})

Rename columns in the DataFrame.

Parameters
new_names:
dict[str | Underscore, str] | dict[str, str]

Dictionary mapping old column names to new column names. Both keys and values can be either strings or underscore column references (e.g., _.col_name).

Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
renamed = df.rename({"x": "id", "y": "value"})
# Can also use underscore syntax for keys
renamed = df.rename({_.x: "id", _.y: "value"})

Add a monotonically increasing unique identifier column.

Parameters

Name of the new ID column.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [10, 20, 30]})
df_with_id = df.with_unique_id("row_id")

Explode a list or array column into multiple rows.

Each element in the list becomes a separate row, with other column values duplicated.

Parameters
column:
str | Underscore

Name of the list/array column to explode.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"id": [1, 2], "items": [[10, 20], [30]]})
exploded = df.explode("items")

Methods for filtering, ordering, and deduplicating rows.

  • filter — keep rows matching a boolean expression
  • order_by — sort rows by one or more columns
  • slice — select a positional range of rows
  • distinct_on — deduplicate by a set of key columns

Filter rows based on a boolean expression.

Parameters
expr:
Expr | Underscore

Boolean expression to filter rows. Only rows where the expression evaluates to True are kept.

Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
filtered = df.filter(_.x > 2)

Sort the DataFrame by one or more columns.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [3, 1, 2], "y": [30, 10, 20]})
# Sort by x ascending
sorted_df = df.order_by("x")
# Sort by x descending, then y ascending
sorted_df = df.order_by(("x", "desc"), "y")

Return a subset of rows starting at a specific position.

Parameters

Zero-based index where the slice begins.

length: = None

Number of rows to include. If None, includes all remaining rows.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3, 4, 5]})
# Get rows 1-3 (indices 1, 2, 3)
sliced = df.slice(1, 3)

Remove duplicate rows based on the specified partition columns.

For each unique combination of values in columns, exactly one row is emitted. Which row is kept within a partition is not guaranteed — the engine may choose any row. If you need a deterministic choice, sort the DataFrame first with order_by before calling distinct_on.

Returns
type:
Raises
error:

If no columns are provided.

from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 1, 2], "y": [10, 20, 30]})
unique = df.distinct_on("x")  # one row per unique x value
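
The sort-then-deduplicate recipe above can be sketched in plain Python. This is illustrative stdlib code, not the chalkdf API; it only shows why sorting first makes the kept row deterministic:

```python
# Plain-Python sketch (not chalkdf code) of the order_by + distinct_on recipe:
# sort first, then keep the first row seen per key, so the surviving row
# within each partition is deterministic.
rows = [{"x": 1, "y": 10}, {"x": 1, "y": 20}, {"x": 2, "y": 30}]

def distinct_on_first(rows, key):
    """Keep the first row encountered for each distinct value of `key`."""
    seen, out = set(), []
    for row in rows:
        if row[key] not in seen:
            seen.add(row[key])
            out.append(row)
    return out

# Sorting by y descending first means "keep the row with the largest y per x".
ordered = sorted(rows, key=lambda r: r["y"], reverse=True)
unique = distinct_on_first(ordered, "x")
```

In chalkdf this corresponds roughly to chaining the documented methods: df.order_by(("y", "desc")).distinct_on("x").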

Methods for combining rows from multiple DataFrames.

Both DataFrames must share the same schema (column order may differ). Duplicates are retained; row order is not guaranteed.

Combine this DataFrame with another by stacking rows.

Convenience method for unioning with a single DataFrame. Equivalent to union_all(other).

Both DataFrames must have the same schema (different column order is allowed; the output will have the same column order as self). Duplicates are retained. Row order is not preserved.

See Also

union_all : Union with multiple DataFrames at once.

Parameters

DataFrame to union with this DataFrame.

Returns
type:
Raises
error:

If schemas don't match.

from chalkdf import DataFrame
df1 = DataFrame.from_dict({"x": [1, 2], "y": [10, 20]})
df2 = DataFrame.from_dict({"x": [3, 4], "y": [30, 40]})
result = df1.union(df2)
# result contains all 4 rows from df1 and df2, in any order

Combine this DataFrame with one or more others by stacking rows.

All DataFrames must have the same schema (different column order is allowed; the output will have the same column order as self). Duplicates are retained. Row order is not preserved.

Returns
type:
Raises
error:

If no other DataFrames are provided, or if schemas don't match.

from chalkdf import DataFrame
df1 = DataFrame.from_dict({"x": [1, 2], "y": [10, 20]})
df2 = DataFrame.from_dict({"x": [3, 4], "y": [30, 40]})
df3 = DataFrame.from_dict({"x": [5], "y": [50]})
result = df1.union_all(df2, df3)
# result contains all 5 rows from df1, df2, and df3, in any order

Methods for combining two DataFrames based on matching keys.

  • join — standard equality joins (inner, left, right, outer, semi, anti, cross)
  • join_asof — temporal / nearest-key join for time-series data

Join this DataFrame with another.

Parameters
on:
dict[str | Underscore, str | Underscore] | typing.Sequence[str | Underscore] | None
= None

Join keys. Can be specified in multiple ways:

  • A sequence of column names (same names on both sides): on=["col1", "col2"]
  • A mapping of left->right column names: on={"left_col": "right_col"}
  • If None, must specify left_on and right_on separately.
left_on:
typing.Sequence[str | Underscore] | None
= None

Column names for left DataFrame join keys. Only used when on is None. Must be paired with right_on.

right_on:
typing.Sequence[str | Underscore] | None
= None

Column names for right DataFrame join keys. Only used when on is None. Must be paired with left_on.

how:
JoinType
= 'inner'

Join type. Supported values:

  • "inner": Keep only rows that match in both DataFrames (default)
  • "left": Keep all rows from left DataFrame
  • "right": Keep all rows from right DataFrame
  • "outer" or "full": Keep all rows from both DataFrames
  • "semi": Return rows from left that have matches in right (no right columns)
  • "anti": Return rows from left that have no matches in right
  • "cross": Cartesian product (do not pass in on)

Optional suffix applied to right-hand columns when names collide. For example, if both DataFrames have a column "value" and right_suffix="_right", the result will have "value" and "value_right".

Returns
type:
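
Since this entry has no inline example, here is a plain-Python sketch of the "inner" and "left" semantics described above. This is illustrative stdlib code, not the chalkdf API:

```python
# Plain-Python sketch (not chalkdf code) of "inner" vs. "left" join semantics:
# rows match on equal keys; "left" also keeps unmatched left rows, filling
# right-hand columns with None.
left = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}, {"id": 3, "name": "Eve"}]
right = [{"id": 1, "city": "NYC"}, {"id": 2, "city": "SF"}]

def join(left_rows, right_rows, key, how="inner"):
    index = {}
    right_cols = set()
    for r in right_rows:
        index.setdefault(r[key], []).append(r)
        right_cols.update(k for k in r if k != key)
    out = []
    for l in left_rows:
        matches = index.get(l[key], [])
        for r in matches:
            out.append({**l, **{k: r[k] for k in right_cols}})
        if not matches and how == "left":
            out.append({**l, **{k: None for k in right_cols}})
    return out

inner = join(left, right, "id")                    # only ids 1 and 2 match
left_joined = join(left, right, "id", how="left")  # Eve kept with city=None
```

With chalkdf this corresponds roughly to left.join(right, on=["id"], how="left"), using the on and how parameters documented above.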

Perform an as-of join with another DataFrame.

An as-of join is similar to a left join, but instead of matching on equality, it matches on the nearest key from the right DataFrame. This is commonly used for time-series data where you want to join with the most recent observation.

Important: Both DataFrames must be sorted by the on (or left_on/right_on) column before calling this method. Use .order_by(on) to sort if needed.

Parameters

Right-hand DataFrame to join with.

on:
str | Underscore | None
= None

Column name to use as the as-of join key (must be sorted). This column is used for both left and right DataFrames. The join finds the nearest match according to the strategy. Either on or both left_on and right_on must be specified.

left_on:
str | Underscore | None
= None

Column name in left DataFrame for the as-of join key. Only used when on is None. Must be paired with right_on.

right_on:
str | Underscore | None
= None

Column name in right DataFrame for the as-of join key. Can be used with on (to specify a different right column name) or with left_on (when on is None).

by:
dict[str | Underscore, str | Underscore] | typing.Sequence[str | Underscore] | None
= None

Additional exact-match columns (optional). These columns must match exactly before performing the as-of match on the on column. Can be specified as:

  • A sequence of column names (same names on both sides): by=["col1", "col2"]
  • A mapping of left->right column names: by={"left_col": "right_col"}
  • If None, can specify left_by and right_by separately.
left_by:
typing.Sequence[str | Underscore] | None
= None

Column names in left DataFrame for exact-match conditions. Only used when by is None. Must be paired with right_by.

right_by:
typing.Sequence[str | Underscore] | None
= None

Column names in right DataFrame for exact-match conditions. Only used when by is None. Must be paired with left_by.

strategy:
AsOfJoinStrategy | typing.Literal['forward', 'backward']
= 'backward'

Join strategy controlling which match to select:

  • "backward" (default): Match with the most recent past value
  • "forward": Match with the nearest future value

You can also pass AsOfJoinStrategy.BACKWARD or AsOfJoinStrategy.FORWARD.

Suffix to add to overlapping column names from the right DataFrame.

Whether to coalesce the join keys (default True).

Returns
type:
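
The "backward" strategy described above can be sketched in plain Python with a binary search. This is illustrative stdlib code, not the chalkdf API:

```python
# Plain-Python sketch (not chalkdf code) of the "backward" as-of strategy:
# for each left timestamp, match the most recent right row at or before it.
import bisect

right_ts = [10, 20, 30]        # sorted, as join_asof requires
right_vals = ["a", "b", "c"]

def asof_backward(t):
    """Return the right value whose timestamp is the latest one <= t."""
    i = bisect.bisect_right(right_ts, t) - 1
    return right_vals[i] if i >= 0 else None

matches = [asof_backward(t) for t in [5, 10, 25, 99]]
# t=5 has no past value; t=10 matches exactly; t=25 falls back to t=20
```

A "forward" strategy would instead use bisect_left and take the nearest timestamp greater than or equal to t.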

Methods for computing group summaries and window (analytic) expressions.

  • group_by — returns a GroupBy object for chained aggregations
  • agg — group by columns and apply aggregation expressions directly
  • window — compute analytic (window) expressions partitioned by key columns

Create a GroupBy object for chained aggregation operations.

This method returns a GroupBy object that can be used to apply aggregation expressions via the .agg() method. This provides an alternative syntax to df.agg(by, *aggregations).

Returns
type:
GroupBy
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"group": ["A", "A", "B"], "value": [1, 2, 3]})
grouped = df.group_by("group").agg(_.value.sum().alias("total"))

Multiple grouping columns:

df2 = DataFrame.from_dict({"g1": ["A", "A", "B"], "g2": ["X", "Y", "X"], "val": [1, 2, 3]})
result = df2.group_by("g1", "g2").agg(_.val.sum().alias("sum"))

Using underscore expressions:

result = df.group_by(_.group).agg(_.value.mean().alias("avg"))

Group by columns and apply aggregation expressions.

Parameters
by:
typing.Sequence[str | Underscore] | str | Underscore

Column name(s) to group by. Can be a single column name/expression or a sequence of column names/expressions.

aggregations:
AggExpr | Underscore
= ()
Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"group": ["A", "A", "B"], "value": [1, 2, 3]})
agg_df = df.agg(["group"], _.value.sum().alias("total"))
# Or with a single column:
agg_df = df.agg("group", _.value.sum().alias("total"))

Compute window (analytic) expressions partitioned by by and ordered by order_by.

Window operations evaluate each WindowExpr over a partition of rows (defined by by) sorted within that partition (by order_by). The result columns are appended to the existing schema; original columns are preserved.

Overlap between by and order_by columns is not allowed.

Parameters
by:
typing.Sequence[str | Underscore]

Column names that define the partition boundaries. Rows with the same combination of values in these columns form one partition.

order_by:
typing.Sequence[str | Underscore | tuple[str | Underscore, str]]

Column names (or (name, direction) tuples) that define the sort order within each partition. Direction can be "asc" (default) or "desc".

expressions:
WindowExpr
= ()
Returns
type:
from chalkdf import DataFrame
from libchalk.chalktable import WindowExpr
df = DataFrame.from_dict({
    "idx": [1, 1, 2, 2],
    "v":   [10, 20, 30, 40],
})
# Partition by "idx", sort by "v" ascending, shift "v" by -1 into "v_shifted"
result = df.window(["idx"], ["v"], WindowExpr.shift("v", "v_shifted", -1))
# result schema: idx, v, v_shifted
# v_shifted contains the *next* value of v within each idx partition

Methods for executing query plans and inspecting DataFrame structure.

  • run — execute and return a materialized DataFrame
  • to_arrow — execute and return a pyarrow.Table
  • write / write_parquet — execute and persist output files
  • explain_logical / explain_physical — inspect the query plan
  • get_plan / get_tables — access internal plan and table state

Execute the query plan and return a materialized DataFrame.

Parameters
tables:
typing.Mapping[str, MaterializedTable]
= _empty_table_dict

Optional mapping of table names to materialized Arrow data for execution.

Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
materialized = filtered.run()

Execute the query plan and return the result as a PyArrow Table.

Parameters
tables:
typing.Mapping[str, MaterializedTable]
= _empty_table_dict

Optional mapping of table names to materialized Arrow data for execution.

Returns
type:
pyarrow.Table
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
arrow_table = filtered.to_arrow()

Execute the DataFrame plan and write the output files immediately.

This is the eager counterpart to write_lazy: it builds the write plan and runs it in one step.

By default (return_table_write_result=False) the method returns None after the write completes. Pass return_table_write_result=True to receive the raw TableWrite result DataFrame instead.

Parameters

Directory to write output files.

Optional explicit file name.

file_format: = 'parquet'

Output format (default parquet).

serde_parameters:
typing.Mapping[str, str] | None
= None

Optional SerDe options for text formats.

Optional compression codec.

Ensure writers emit files even if no rows were produced.

Optional connector id override.

If True, return the raw TableWrite result DataFrame. If False (default), return None.

Returns
type:

Write the DataFrame as Parquet files using an auto-configured connector.

Convenience method that simplifies writing Parquet files compared to the more general write. The connector is selected automatically based on the URI scheme.

By default (return_table_write_result=False) the method returns None after the write completes. Pass return_table_write_result=True to receive the raw TableWrite result DataFrame instead.

Parameters

URI prefix where Parquet files will be written. Supports local (file://), S3 (s3://), and GCS (gs://) URIs.

skip_planning_time_validation: = False

Whether to skip validation at planning time.

return_table_write_result: = False

If True, return the raw TableWrite result DataFrame. If False (default), return None.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
df.write_parquet("file:///tmp/output/")  # returns None
result = df.write_parquet("gs://my-bucket/output/", return_table_write_result=True)

Return a string representation of the logical query plan.

Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
print(filtered.explain_logical())

Return a string representation of the physical execution plan.

Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
print(filtered.explain_physical())

Expose the underlying ChalkTable plan.

Return the mapping of materialized tables for this DataFrame.
