This reference documents the Chalk DataFrame API for offline feature computation and data processing.
Lightweight DataFrame wrapper around Chalk's execution engine.
The DataFrame class constructs query plans backed by libchalk and
can materialize them into Arrow tables. Operations build a lazy query plan
that executes only when you call run or to_arrow.
Column expressions can be written with _ (underscore) attribute syntax
or using F functions — see the
F function reference for the
full list.
Logical representation of tabular data for query operations.
DataFrame provides a lazy evaluation model where operations build up a query
plan that executes only when materialized via run or
to_arrow. Each operation returns a new DataFrame, leaving the
original unchanged.
Most users should use class methods like from_dict,
from_arrow, or scan to create DataFrames rather than
calling the constructor directly.
Column expressions use _ (underscore) or F function syntax.
from chalkdf import DataFrame
from chalk.features import _, F
df = DataFrame.from_dict({"x": [1, 2, 3], "price": [10.0, 20.0, 30.0]})
# Underscore syntax
doubled = df.with_columns({"x2": _.x * 2})
# F function syntax
capped = df.with_columns({"price": F.least(_.price, 25.0)})
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": ["a", "b", "c"]})
filtered = df.filter(_.x > 1)
result = filtered.run()
Create a DataFrame from a dictionary, Arrow table, or query plan.
For most use cases, prefer using class methods like from_dict,
from_arrow, or scan instead of calling this constructor directly.
from chalkdf import DataFrame
# Simple dictionary input
df = DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
# Or use the explicit class method (recommended)
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
Return the number of rows if this DataFrame has already been materialized.
Raising TypeError for non-materialized frames matches Python's default
behavior while avoiding implicitly executing the plan.
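The contract can be sketched with a minimal stand-in class (names hypothetical; this is an illustration of the documented behavior, not the chalkdf implementation):

```python
class LazyFrame:
    """Toy stand-in illustrating the __len__ contract described above."""

    def __init__(self, rows=None):
        self._rows = rows  # None means the plan has not been executed

    def run(self):
        # Pretend execution materializes three rows.
        return LazyFrame(rows=[{"x": 1}, {"x": 2}, {"x": 3}])

    def __len__(self):
        if self._rows is None:
            # Mirrors Python's default: raise TypeError rather than
            # silently executing the plan.
            raise TypeError("DataFrame is not materialized; call run() first")
        return len(self._rows)
```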
Create a schema-only placeholder DataFrame for a named table.
The returned DataFrame contains no data; it is a logical reference
that must be supplied with actual Arrow data at execution time via the
tables argument of run or to_arrow. This is
useful when you want to build a reusable query plan against a
well-known schema and inject different data at runtime.
import pyarrow as pa
from chalkdf import DataFrame
schema = pa.schema([("user_id", pa.int64()), ("score", pa.float64())])
df = DataFrame.named_table("users", schema)
# Build a query plan
from chalk.features import _
result_plan = df.filter(_.score > 0.5)
# Inject real data at execution time
import pyarrow as pa
data = pa.table({"user_id": [1, 2, 3], "score": [0.3, 0.8, 0.6]})
result = result_plan.run(tables={"users": data})
Create a DataFrame from a Python async generator function.
This method allows you to create a DataFrame by streaming data from a custom Python async generator. The generator can yield data as PyArrow RecordBatches, pydicts, or pylists, and the method will handle conversion and schema alignment automatically. If the UDF yields an invalid batch, no further batches will be processed.
An async generator function that yields data batches. Each yielded value
can be a pyarrow.RecordBatch, a dictionary (will be converted using
pyarrow.RecordBatch.from_pydict), or a list (will be converted using
pyarrow.RecordBatch.from_pylist). The generator should yield None
or complete iteration to signal completion.
The expected PyArrow schema for the output data. If yielded batches have columns in a different order, they will be automatically reordered to match this schema.
Maximum time in seconds to wait for the output handler to accept each
batch. Prevents deadlocks when the consumer is blocked. Default is 300
seconds (5 minutes). Set to None to disable timeout (not recommended).
If sending a batch to the output handler exceeds the timeout.
import pyarrow as pa
from chalkdf import DataFrame
async def generate_data():
    for i in range(3):
        yield {"x": [i * 10, i * 10 + 1], "y": [i, i]}
schema = pa.schema([("x", pa.int64()), ("y", pa.int64())])
df = DataFrame.from_python_udf(generate_data, schema)
result = df.run()
# Example with PyArrow RecordBatches
async def generate_batches():
    batch1 = pa.RecordBatch.from_pydict({"a": [1, 2], "b": [3, 4]})
    batch2 = pa.RecordBatch.from_pydict({"a": [5, 6], "b": [7, 8]})
    yield batch1
    yield batch2
schema = pa.schema([("a", pa.int64()), ("b", pa.int64())])
df = DataFrame.from_python_udf(generate_batches, schema)
# Example with custom timeout
df = DataFrame.from_python_udf(generate_data, schema, output_timeout=60.0)
Scan files and return a DataFrame.
Currently supports CSV (with headers), Parquet, and Delta.
from chalkdf import DataFrame
# Scan Parquet files
df = DataFrame.scan(["data/sales_2024.parquet"], name="sales_data")
# Scan CSV with explicit schema
import pyarrow as pa
schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
df = DataFrame.scan(["data/users.csv"], schema=schema)
Create a DataFrame from the result of executing a SQL query (DuckDB dialect).
Pass DataFrames or Arrow tables as keyword arguments to make them
available as named tables inside the query. If no keyword arguments
are provided, from_sql will attempt to auto-register any
DataFrames found in the calling scope.
from chalkdf import DataFrame
orders = DataFrame.from_dict({"order_id": [1, 2, 3], "amount": [10.0, 20.0, 5.0]})
result = DataFrame.from_sql(
    "SELECT order_id, amount FROM orders WHERE amount > 8",
    orders=orders,
)
Join two DataFrames with SQL:
users = DataFrame.from_dict({"id": [1, 2], "name": ["Alice", "Bob"]})
purchases = DataFrame.from_dict({"user_id": [1, 1, 2], "item": ["a", "b", "c"]})
result = DataFrame.from_sql(
    "SELECT u.name, p.item FROM users u JOIN purchases p ON u.id = p.user_id",
    users=users,
    purchases=purchases,
)
import pyarrow as pa
from chalkdf import DataFrame
from chalk.sql import PostgreSQLSource
source = PostgreSQLSource(...)
schema = pa.schema([("user_id", pa.int64()), ("name", pa.string())])
df = DataFrame.from_datasource(source, "SELECT * FROM users", schema)
Create a DataFrame by executing a SQL UNLOAD query and scanning the resulting parquet files.
The SQL query should be a data warehouse UNLOAD/EXPORT command (e.g., BigQuery
EXPORT DATA, Snowflake COPY INTO) that writes parquet files to
output_uri_prefix. The files are then read by a deferred TableScan node.
import pyarrow as pa
from chalkdf import DataFrame
from libchalk.sql import ConnectionPool
from libchalk.sql.snowflake import make_snowflake_connection_factory
factory = make_snowflake_connection_factory(uri="snowflake://user:pass@acct/db")
pool = ConnectionPool(factory, max_pool_size=1)
schema = pa.schema([("user_id", pa.int64()), ("name", pa.string())])
df = DataFrame.scan_from_sql(
    "COPY INTO @stage/out/ FROM (SELECT user_id, name FROM users)",
    pool=pool,
    output_uri_prefix="s3://my-bucket/unload-output/",
    schema=schema,
)
Create a DataFrame by pulling messages from a streaming source.
This method connects to a Kafka, Kinesis, or PubSub source and pulls up to n
messages, returning them as a DataFrame.
A streaming source configuration. Can be one of:
KafkaSource: Kafka topic configuration
KinesisSource: Kinesis stream configuration
PubSubSource: Google PubSub subscription configuration
from chalkdf import DataFrame
from chalk.streams import KafkaSource
source = KafkaSource(
    name="my_kafka",
    bootstrap_server="localhost:9092",
    topic="my_topic",
)
# Pull 100 messages, just the raw bytes
df = DataFrame.from_stream_source(source, n=100)
# Pull with full metadata
df = DataFrame.from_stream_source(source, n=100, include_metadata=True)
Compile the current plan if necessary.
Configuration is resolved from multiple sources in priority order:
config parameter (highest priority)
compilation_config context manager
set_compilation_defaults
environment variables (e.g., CHALK_USE_VELOX_PARQUET_READER)
If a different configuration is provided than the previous compilation, the plan will be automatically recompiled.
from chalkdf import DataFrame
from chalkdf.config import CompilationConfig
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
compiled = df.compile(config=CompilationConfig(use_online_hash_join=True))
print(compiled.explain_logical())
Add or replace columns while keeping all existing columns.
Unlike project, which returns only the columns you specify,
with_columns keeps every existing column and either adds new ones
or replaces columns whose names match.
Accepts multiple forms:
a dict mapping column names to expressions
(name, expression) tuples
expressions with .alias(<name>)
from chalkdf import DataFrame
from chalk.features import _, F
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Add a new column using underscore syntax
df2 = df.with_columns({"z": _.x + _.y})
# Add a column using an F function
df3 = df.with_columns({"z_capped": F.least(_.x + _.y, 8)})
# Add a column using .alias()
df4 = df.with_columns((_.x * 2).alias("x_doubled"))
# Both df2, df3, df4 still contain x and y in addition to the new column
Return a column expression for the named column.
df.col("name") is equivalent to _.name but validates that
"name" exists in the DataFrame's schema at call time and is
therefore useful when the column name is a runtime string variable
rather than a literal attribute access.
If column is not present in the DataFrame's schema.
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Reference a column by name to build expressions
col_x = df.col("x")
df_filtered = df.filter(col_x > 1)
# Useful when the column name comes from a variable
target = "y"
df2 = df.with_columns({"doubled": df.col(target) * 2})
Return a column expression for the named column.
Alias for col.
If column is not present in the DataFrame's schema.
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Compute a sum from two columns referenced by name
df2 = df.with_columns({"sum": df.col("x") + df.col("y")})
Combine this DataFrame with one or more others by stacking rows.
All DataFrames must have the same schema (different column order is
allowed - the output will have the same column order as self).
Duplicates are retained. Row order is not preserved.
If no other DataFrames are provided, or if schemas don't match.
df1 = DataFrame({"x": [1, 2], "y": [10, 20]})
df2 = DataFrame({"x": [3, 4], "y": [30, 40]})
df3 = DataFrame({"x": [5], "y": [50]})
result = df1.union_all(df2, df3)
# result contains all 5 rows from df1, df2, and df3, in any order
Combine this DataFrame with another by stacking rows.
Convenience method for unioning with a single DataFrame.
Equivalent to union_all(other).
Both DataFrames must have the same schema (different column order is
allowed - the output will have the same column order as self).
Duplicates are retained. Row order is not preserved.
union_all : Union with multiple DataFrames at once.
If schemas don't match.
df1 = DataFrame({"x": [1, 2], "y": [10, 20]})
df2 = DataFrame({"x": [3, 4], "y": [30, 40]})
result = df1.union(df2)
# result contains all 4 rows from df1 and df2, in any order
Project to an exact set of output columns using expressions.
Unlike with_columns, which keeps all existing columns and only
adds or replaces the ones you name, project returns only the
columns you specify. Columns not listed in columns are dropped.
Use project when you want to reshape or rename the schema
entirely; use with_columns when you only want to augment it.
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8, 9]})
# Keep only "sum" and "x"; z is dropped
projected = df.project({"sum": _.x + _.y, "x": _.x})
Join this DataFrame with another.
Join keys. Can be specified in multiple ways:
on=["col1", "col2"]
on={"left_col": "right_col"}
left_on and right_on separately.
Join type. Supported values:
"inner": Keep only rows that match in both DataFrames (default)
"left": Keep all rows from left DataFrame
"right": Keep all rows from right DataFrame
"outer" or "full": Keep all rows from both DataFrames
"semi": Return rows from left that have matches in right (no right columns)
"anti": Return rows from left that have no matches in right
"cross": Cartesian product (do not pass in on)
Perform an as-of join with another DataFrame.
An as-of join is similar to a left join, but instead of matching on equality, it matches on the nearest key from the right DataFrame. This is commonly used for time-series data where you want to join with the most recent observation.
Important: Both DataFrames must be sorted by the on (or left_on/right_on)
column before calling this method. Use .order_by(on) to sort if needed.
Column name to use as the as-of join key (must be sorted).
This column is used for both left and right DataFrames.
The join finds the nearest match according to the strategy.
Either on or both left_on and right_on must be specified.
Column name in left DataFrame for the as-of join key. Only used when on
is None. Must be paired with right_on.
Column name in right DataFrame for the as-of join key. Can be used with on
(to specify a different right column name) or with left_on (when on is None).
Additional exact-match columns (optional). These columns must match exactly
before performing the as-of match on the on column. Can be specified as:
by=["col1", "col2"]
by={"left_col": "right_col"}
left_by and right_by separately.
Column names in left DataFrame for exact-match conditions. Only used when
by is None. Must be paired with right_by.
Column names in right DataFrame for exact-match conditions. Only used when
by is None. Must be paired with left_by.
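The backward (most-recent-observation) matching described above can be illustrated in plain Python — a sketch of the semantics on sorted inputs, not the chalkdf implementation:

```python
import bisect

def asof_join_backward(left, right, on):
    """For each left row, attach the right row with the largest `on`
    value that is <= the left row's value. Both inputs must already
    be sorted by `on`, mirroring the requirement documented above."""
    right_keys = [r[on] for r in right]
    joined = []
    for row in left:
        i = bisect.bisect_right(right_keys, row[on]) - 1
        match = right[i] if i >= 0 else {}
        joined.append({**match, **row})
    return joined

trades = [{"t": 2, "qty": 5}, {"t": 7, "qty": 3}]
quotes = [{"t": 1, "px": 10.0}, {"t": 5, "px": 11.0}]
result = asof_join_backward(trades, quotes, on="t")
# the trade at t=2 picks the quote at t=1; the trade at t=7 picks t=5
```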
Compute window (analytic) expressions partitioned by by and ordered by order_by.
Window operations evaluate each WindowExpr over a partition of
rows (defined by by) sorted within that partition (by order_by).
The result columns are appended to the existing schema; original columns
are preserved.
Overlap between by and order_by columns is not allowed.
Column names that define the partition boundaries. Rows with the same combination of values in these columns form one partition.
Column names (or (name, direction) tuples) that define the sort
order within each partition. Direction can be "asc" (default)
or "desc".
from chalkdf import DataFrame
from libchalk.chalktable import WindowExpr
df = DataFrame.from_dict({
    "idx": [1, 1, 2, 2],
    "v": [10, 20, 30, 40],
})
# Partition by "idx", sort by "v" ascending, shift "v" by -1 into "v_shifted"
result = df.window(["idx"], ["v"], WindowExpr.shift("v", "v_shifted", -1))
# result schema: idx, v, v_shifted
# v_shifted contains the *next* value of v within each idx partition
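As a cross-check on those shift semantics, here is a plain-Python sketch (helper name hypothetical) of partitioning by key, sorting within each partition, and shifting by -1:

```python
from collections import defaultdict

def shift_column(rows, by, order, value, periods):
    """Shift `value` within each `by` partition after sorting by `order`.
    periods=-1 pulls the next row's value into the new column."""
    parts = defaultdict(list)
    for row in rows:
        parts[row[by]].append(row)
    out = []
    for part in parts.values():
        part.sort(key=lambda r: r[order])
        vals = [r[value] for r in part]
        for i, row in enumerate(part):
            j = i - periods  # periods=-1 -> next row's value
            shifted = vals[j] if 0 <= j < len(vals) else None
            out.append({**row, f"{value}_shifted": shifted})
    return out

rows = [{"idx": 1, "v": 10}, {"idx": 1, "v": 20},
        {"idx": 2, "v": 30}, {"idx": 2, "v": 40}]
shifted = shift_column(rows, by="idx", order="v", value="v", periods=-1)
# each row carries the next v within its idx partition, None at the end
```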
Create a GroupBy object for chained aggregation operations.
This method returns a GroupBy object that can be used to apply
aggregation expressions via the .agg() method. This provides
an alternative syntax to df.agg(by, *aggregations).
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"group": ["A", "A", "B"], "value": [1, 2, 3]})
grouped = df.group_by("group").agg(_.value.sum().alias("total"))
Multiple grouping columns:
df2 = DataFrame.from_dict({"g1": ["A", "A", "B"], "g2": ["X", "Y", "X"], "val": [1, 2, 3]})
result = df2.group_by("g1", "g2").agg(_.val.sum().alias("sum"))
Using underscore expressions:
result = df.group_by(_.group).agg(_.value.mean().alias("avg"))
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"group": ["A", "A", "B"], "value": [1, 2, 3]})
agg_df = df.agg(["group"], _.value.sum().alias("total"))
# Or with a single column:
agg_df = df.agg("group", _.value.sum().alias("total"))
Remove duplicate rows based on the specified partition columns.
For each unique combination of values in columns, exactly one
row is emitted. Which row is kept within a partition is not
guaranteed — the engine may choose any row. If you need a
deterministic choice, sort the DataFrame first with order_by
before calling distinct_on.
If no columns are provided.
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 1, 2], "y": [10, 20, 30]})
unique = df.distinct_on("x") # one row per unique x value
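The "sort first for a deterministic pick" recipe above can be sketched in plain Python — a semantic illustration (function name hypothetical), not the engine's algorithm:

```python
def distinct_on_first(rows, key, order=None, reverse=False):
    """Keep one row per distinct `key`. Sorting first by `order`
    makes the kept row deterministic (the first after sorting)."""
    if order is not None:
        rows = sorted(rows, key=lambda r: r[order], reverse=reverse)
    seen, kept = set(), []
    for row in rows:
        if row[key] not in seen:
            seen.add(row[key])
            kept.append(row)
    return kept

rows = [{"x": 1, "y": 10}, {"x": 1, "y": 20}, {"x": 2, "y": 30}]
# Sort by y descending, then deduplicate on x: x=1 keeps y=20
deduped = distinct_on_first(rows, key="x", order="y", reverse=True)
```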
Build a lazy write plan without executing it.
Returns a new DataFrame whose query plan ends with a
TableWrite operator. No files are written until you call
run or to_arrow on the returned DataFrame.
For immediate execution use write instead.
Execute the DataFrame plan and write the output files immediately.
This is the eager counterpart to write_lazy: it builds the
write plan and runs it in one step.
By default (return_table_write_result=False) the method returns
None after the write completes. Pass
return_table_write_result=True to receive the raw
TableWrite result DataFrame instead.
Write the DataFrame as Parquet files using an auto-configured connector.
Convenience method that simplifies writing Parquet files compared to
the more general write. The connector is selected
automatically based on the URI scheme.
By default (return_table_write_result=False) the method returns
None after the write completes. Pass
return_table_write_result=True to receive the raw
TableWrite result DataFrame instead.
skip_planning_time_validation
Whether to skip validation at planning time (default: False).
return_table_write_result
If True, return the raw TableWrite result DataFrame.
If False (default), return None.
URI prefix where Parquet files will be written.
Supports local (file://), S3 (s3://), and GCS (gs://) URIs.
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
df.write_parquet("file:///tmp/output/") # returns None
result = df.write_parquet("gs://my-bucket/output/", return_table_write_result=True)
Class methods for constructing new DataFrame instances from various sources.
named_table (data injected at run time)
Create a schema-only placeholder DataFrame for a named table.
The returned DataFrame contains no data; it is a logical reference
that must be supplied with actual Arrow data at execution time via the
tables argument of run or to_arrow. This is
useful when you want to build a reusable query plan against a
well-known schema and inject different data at runtime.
import pyarrow as pa
from chalkdf import DataFrame
schema = pa.schema([("user_id", pa.int64()), ("score", pa.float64())])
df = DataFrame.named_table("users", schema)
# Build a query plan
from chalk.features import _
result_plan = df.filter(_.score > 0.5)
# Inject real data at execution time
import pyarrow as pa
data = pa.table({"user_id": [1, 2, 3], "score": [0.3, 0.8, 0.6]})
result = result_plan.run(tables={"users": data})
Scan files and return a DataFrame.
Currently supports CSV (with headers), Parquet, and Delta.
from chalkdf import DataFrame
# Scan Parquet files
df = DataFrame.scan(["data/sales_2024.parquet"], name="sales_data")
# Scan CSV with explicit schema
import pyarrow as pa
schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
df = DataFrame.scan(["data/users.csv"], schema=schema)
Create a DataFrame from the result of executing a SQL query (DuckDB dialect).
Pass DataFrames or Arrow tables as keyword arguments to make them
available as named tables inside the query. If no keyword arguments
are provided, from_sql will attempt to auto-register any
DataFrames found in the calling scope.
from chalkdf import DataFrame
orders = DataFrame.from_dict({"order_id": [1, 2, 3], "amount": [10.0, 20.0, 5.0]})
result = DataFrame.from_sql(
    "SELECT order_id, amount FROM orders WHERE amount > 8",
    orders=orders,
)
Join two DataFrames with SQL:
users = DataFrame.from_dict({"id": [1, 2], "name": ["Alice", "Bob"]})
purchases = DataFrame.from_dict({"user_id": [1, 1, 2], "item": ["a", "b", "c"]})
result = DataFrame.from_sql(
    "SELECT u.name, p.item FROM users u JOIN purchases p ON u.id = p.user_id",
    users=users,
    purchases=purchases,
)
Create a DataFrame by pulling messages from a streaming source.
This method connects to a Kafka, Kinesis, or PubSub source and pulls up to n
messages, returning them as a DataFrame.
A streaming source configuration. Can be one of:
KafkaSource: Kafka topic configuration
KinesisSource: Kinesis stream configuration
PubSubSource: Google PubSub subscription configuration
from chalkdf import DataFrame
from chalk.streams import KafkaSource
source = KafkaSource(
    name="my_kafka",
    bootstrap_server="localhost:9092",
    topic="my_topic",
)
# Pull 100 messages, just the raw bytes
df = DataFrame.from_stream_source(source, n=100)
# Pull with full metadata
df = DataFrame.from_stream_source(source, n=100, include_metadata=True)
import pyarrow as pa
from chalkdf import DataFrame
from chalk.sql import PostgreSQLSource
source = PostgreSQLSource(...)
schema = pa.schema([("user_id", pa.int64()), ("name", pa.string())])
df = DataFrame.from_datasource(source, "SELECT * FROM users", schema)
Methods for selecting, transforming, and manipulating columns.
Column expressions accept _ (underscore) syntax or F functions — for
example _.price * _.qty or F.coalesce(_.value, 0). See the
F function reference for
available functions.
select / drop — pick or remove columns by name
with_columns — add or replace columns while keeping existing ones
project — replace all columns with a new set of expressions
col / column — reference a column by runtime name string
rename — rename one or more columns
explode — expand a list column into one row per element
with_unique_id — append a monotonically increasing ID column
Return a column expression for the named column.
df.col("name") is equivalent to _.name but validates that
"name" exists in the DataFrame's schema at call time and is
therefore useful when the column name is a runtime string variable
rather than a literal attribute access.
If column is not present in the DataFrame's schema.
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Reference a column by name to build expressions
col_x = df.col("x")
df_filtered = df.filter(col_x > 1)
# Useful when the column name comes from a variable
target = "y"
df2 = df.with_columns({"doubled": df.col(target) * 2})
Return a column expression for the named column.
Alias for col.
If column is not present in the DataFrame's schema.
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Compute a sum from two columns referenced by name
df2 = df.with_columns({"sum": df.col("x") + df.col("y")})
Add or replace columns while keeping all existing columns.
Unlike project, which returns only the columns you specify,
with_columns keeps every existing column and either adds new ones
or replaces columns whose names match.
Accepts multiple forms:
a dict mapping column names to expressions
(name, expression) tuples
expressions with .alias(<name>)
from chalkdf import DataFrame
from chalk.features import _, F
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Add a new column using underscore syntax
df2 = df.with_columns({"z": _.x + _.y})
# Add a column using an F function
df3 = df.with_columns({"z_capped": F.least(_.x + _.y, 8)})
# Add a column using .alias()
df4 = df.with_columns((_.x * 2).alias("x_doubled"))
# Both df2, df3, df4 still contain x and y in addition to the new column
Project to an exact set of output columns using expressions.
Unlike with_columns, which keeps all existing columns and only
adds or replaces the ones you name, project returns only the
columns you specify. Columns not listed in columns are dropped.
Use project when you want to reshape or rename the schema
entirely; use with_columns when you only want to augment it.
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8, 9]})
# Keep only "sum" and "x"; z is dropped
projected = df.project({"sum": _.x + _.y, "x": _.x})
Methods for filtering, ordering, and deduplicating rows.
filter — keep rows matching a boolean expression
order_by — sort rows by one or more columns
slice — select a positional range of rows
distinct_on — deduplicate by a set of key columns
Remove duplicate rows based on the specified partition columns.
For each unique combination of values in columns, exactly one
row is emitted. Which row is kept within a partition is not
guaranteed — the engine may choose any row. If you need a
deterministic choice, sort the DataFrame first with order_by
before calling distinct_on.
If no columns are provided.
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 1, 2], "y": [10, 20, 30]})
unique = df.distinct_on("x") # one row per unique x value
Methods for combining rows from multiple DataFrames.
Both DataFrames must share the same schema (column order may differ). Duplicates are retained; row order is not guaranteed.
Combine this DataFrame with another by stacking rows.
Convenience method for unioning with a single DataFrame.
Equivalent to union_all(other).
Both DataFrames must have the same schema (different column order is
allowed - the output will have the same column order as self).
Duplicates are retained. Row order is not preserved.
union_all : Union with multiple DataFrames at once.
If schemas don't match.
df1 = DataFrame({"x": [1, 2], "y": [10, 20]})
df2 = DataFrame({"x": [3, 4], "y": [30, 40]})
result = df1.union(df2)
# result contains all 4 rows from df1 and df2, in any order
Combine this DataFrame with one or more others by stacking rows.
All DataFrames must have the same schema (different column order is
allowed - the output will have the same column order as self).
Duplicates are retained. Row order is not preserved.
If no other DataFrames are provided, or if schemas don't match.
df1 = DataFrame({"x": [1, 2], "y": [10, 20]})
df2 = DataFrame({"x": [3, 4], "y": [30, 40]})
df3 = DataFrame({"x": [5], "y": [50]})
result = df1.union_all(df2, df3)
# result contains all 5 rows from df1, df2, and df3, in any order
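The schema and column-order rules above can be illustrated with a plain-Python sketch over column-oriented dicts (a semantic illustration, not the engine's implementation):

```python
def union_all_columns(first, *others):
    """Stack column-oriented tables row-wise. Other tables may list
    their columns in any order; the output follows `first`'s column
    order, and duplicate rows are retained."""
    if not others:
        raise ValueError("union_all requires at least one other table")
    cols = list(first)
    out = {c: list(first[c]) for c in cols}
    for table in others:
        if set(table) != set(cols):
            raise ValueError("schemas don't match")
        for c in cols:
            out[c].extend(table[c])
    return out

left = {"x": [1, 2], "y": [10, 20]}
right = {"y": [30], "x": [3]}  # different column order is fine
combined = union_all_columns(left, right)
```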
Methods for combining two DataFrames based on matching keys.
join — standard equality joins (inner, left, right, outer, semi, anti, cross)
join_asof — temporal / nearest-key join for time-series data
Join this DataFrame with another.
Join keys. Can be specified in multiple ways:
on=["col1", "col2"]
on={"left_col": "right_col"}
left_on and right_on separately.
Join type. Supported values:
"inner": Keep only rows that match in both DataFrames (default)
"left": Keep all rows from left DataFrame
"right": Keep all rows from right DataFrame
"outer" or "full": Keep all rows from both DataFrames
"semi": Return rows from left that have matches in right (no right columns)
"anti": Return rows from left that have no matches in right
"cross": Cartesian product (do not pass in on)
Perform an as-of join with another DataFrame.
An as-of join is similar to a left join, but instead of matching on equality, it matches on the nearest key from the right DataFrame. This is commonly used for time-series data where you want to join with the most recent observation.
Important: Both DataFrames must be sorted by the on (or left_on/right_on)
column before calling this method. Use .order_by(on) to sort if needed.
Column name to use as the as-of join key (must be sorted).
This column is used for both left and right DataFrames.
The join finds the nearest match according to the strategy.
Either on or both left_on and right_on must be specified.
Column name in left DataFrame for the as-of join key. Only used when on
is None. Must be paired with right_on.
Column name in right DataFrame for the as-of join key. Can be used with on
(to specify a different right column name) or with left_on (when on is None).
Additional exact-match columns (optional). These columns must match exactly
before performing the as-of match on the on column. Can be specified as:
by=["col1", "col2"]
by={"left_col": "right_col"}
left_by and right_by separately.
Column names in left DataFrame for exact-match conditions. Only used when
by is None. Must be paired with right_by.
Column names in right DataFrame for exact-match conditions. Only used when
by is None. Must be paired with left_by.
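The semi and anti join semantics from the join-type list can be illustrated in plain Python — a sketch of the row-filtering behavior (function names hypothetical), not the engine's hash join:

```python
def semi_join(left, right, on):
    """Rows from left with at least one match in right; no right columns."""
    keys = {r[on] for r in right}
    return [row for row in left if row[on] in keys]

def anti_join(left, right, on):
    """Rows from left with no match in right."""
    keys = {r[on] for r in right}
    return [row for row in left if row[on] not in keys]

users = [{"id": 1}, {"id": 2}, {"id": 3}]
purchases = [{"id": 1}, {"id": 1}, {"id": 3}]  # keyed by purchaser id
buyers = semi_join(users, purchases, on="id")      # users 1 and 3
non_buyers = anti_join(users, purchases, on="id")  # user 2
```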
Methods for computing group summaries and window (analytic) expressions.
group_by — returns a GroupBy object for chained aggregations
agg — group by columns and apply aggregation expressions directly
window — compute analytic (window) expressions partitioned by key columns
Create a GroupBy object for chained aggregation operations.
This method returns a GroupBy object that can be used to apply
aggregation expressions via the .agg() method. This provides
an alternative syntax to df.agg(by, *aggregations).
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"group": ["A", "A", "B"], "value": [1, 2, 3]})
grouped = df.group_by("group").agg(_.value.sum().alias("total"))
Multiple grouping columns:
df2 = DataFrame.from_dict({"g1": ["A", "A", "B"], "g2": ["X", "Y", "X"], "val": [1, 2, 3]})
result = df2.group_by("g1", "g2").agg(_.val.sum().alias("sum"))
Using underscore expressions:
result = df.group_by(_.group).agg(_.value.mean().alias("avg"))
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"group": ["A", "A", "B"], "value": [1, 2, 3]})
agg_df = df.agg(["group"], _.value.sum().alias("total"))
# Or with a single column:
agg_df = df.agg("group", _.value.sum().alias("total"))
Compute window (analytic) expressions partitioned by by and ordered by order_by.
Window operations evaluate each WindowExpr over a partition of
rows (defined by by) sorted within that partition (by order_by).
The result columns are appended to the existing schema; original columns
are preserved.
Overlap between by and order_by columns is not allowed.
Column names that define the partition boundaries. Rows with the same combination of values in these columns form one partition.
Column names (or (name, direction) tuples) that define the sort
order within each partition. Direction can be "asc" (default)
or "desc".
from chalkdf import DataFrame
from libchalk.chalktable import WindowExpr
df = DataFrame.from_dict({
    "idx": [1, 1, 2, 2],
    "v": [10, 20, 30, 40],
})
# Partition by "idx", sort by "v" ascending, shift "v" by -1 into "v_shifted"
result = df.window(["idx"], ["v"], WindowExpr.shift("v", "v_shifted", -1))
# result schema: idx, v, v_shifted
# v_shifted contains the *next* value of v within each idx partition
Methods for executing query plans and inspecting DataFrame structure.
run — execute and return a materialized DataFrame
to_arrow — execute and return a pyarrow.Table
write / write_parquet — execute and persist output files
explain_logical / explain_physical — inspect the query plan
get_plan / get_tables — access internal plan and table state
Execute the DataFrame plan and write the output files immediately.
This is the eager counterpart to write_lazy: it builds the
write plan and runs it in one step.
By default (return_table_write_result=False) the method returns
None after the write completes. Pass
return_table_write_result=True to receive the raw
TableWrite result DataFrame instead.
Write the DataFrame as Parquet files using an auto-configured connector.
Convenience method that simplifies writing Parquet files compared to
the more general write. The connector is selected
automatically based on the URI scheme.
By default (return_table_write_result=False) the method returns
None after the write completes. Pass
return_table_write_result=True to receive the raw
TableWrite result DataFrame instead.
skip_planning_time_validation
Whether to skip validation at planning time (default: False).
return_table_write_result
If True, return the raw TableWrite result DataFrame.
If False (default), return None.
URI prefix where Parquet files will be written.
Supports local (file://), S3 (s3://), and GCS (gs://) URIs.
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
df.write_parquet("file:///tmp/output/") # returns None
result = df.write_parquet("gs://my-bucket/output/", return_table_write_result=True)