# SQL Integration
source: https://docs.chalk.ai/docs/sql

## Integrate with SQL-like sources.

Chalk can ingest data using a SQL interface from any of our supported SQL data source integrations. The full list of
supported SQL data sources can be found in our API reference.

### Basic queries

Chalk supports running SQL from files or from strings. When you run queries that invoke your SQL resolver, Chalk
automatically pushes your input parameters into the WHERE clause of your query. For more details, see our section on
push-down filters.

The examples on this page use our PostgreSQL data source, but can be generalized to any of our other
SQL data sources.

### SQL file resolvers

SQL file resolvers are Chalk's recommended method for writing SQL resolvers.

```
-- type: online
-- resolves: user
-- source: PG
select id, email, full_name from users
```

Here, we define an online resolver that returns some features for the User feature class
from the users table in the PostgreSQL source PG. The comments are parsed as YAML
and supply the metadata that Chalk uses to configure the resolver. SQL file resolvers
can return multiple rows for aggregation operations, offline queries, and more.

It's also possible to use SELECT * in a SQL file resolver, but be careful!

```
-- type: online
-- resolves: user
-- source: PG
select * from users
```

Implicitly, this query tries to align every scalar feature from the User feature class to a column name in
users. If a feature has no matching column in the table, because it is named differently or is absent, you'll get a "missing columns" error.
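The alignment check can be pictured with a short sketch. The helper `missing_columns` and the sample names are hypothetical and not part of Chalk's API; the sketch only illustrates why a feature without a same-named column leads to an error.

```python
def missing_columns(feature_names, table_columns):
    """Return the scalar features that have no same-named column in the table."""
    available = set(table_columns)
    # Keep the feature order so the reported list is stable.
    return [name for name in feature_names if name not in available]


# Hypothetical User features and users-table columns.
features = ["id", "email", "full_name"]
columns = ["id", "email", "name", "created_at"]

missing = missing_columns(features, columns)
if missing:
    print(f"missing columns: {missing}")
```

In this made-up case, selecting the columns explicitly and aliasing name to full_name would avoid the error.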

To programmatically generate SQL file resolvers, check out Generated SQL file
resolvers.

### Parameterized feature inputs

Like other resolvers, SQL file resolvers can take features as input. In this example, we want our resolver to require
User.id as input:

```
-- type: online
-- resolves: user
-- source: PG
select id, email, full_name
from users
where id = ${user.id}
```

Use ${} with snake case to reference the desired feature.

Use ${now} in your query as a special argument representing the time of the query. For more details, see our
time documentation.
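As a rough illustration of the snake-case convention, the sketch below builds a placeholder from a feature class name and a feature name. The helpers `to_snake_case` and `placeholder` are hypothetical, and the sketch assumes the namespace is simply the class name converted to snake case; it does not handle acronyms.

```python
import re


def to_snake_case(class_name):
    """Convert a CamelCase class name to snake_case (naive; no acronym handling)."""
    # Insert an underscore before every capital letter except the first one.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", class_name).lower()


def placeholder(class_name, feature_name):
    """Build the ${namespace.feature} reference used in a SQL file resolver."""
    return "${" + to_snake_case(class_name) + "." + feature_name + "}"


user_ref = placeholder("User", "id")             # "${user.id}"
email_user_ref = placeholder("EmailUser", "id")  # "${email_user.id}"
```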

### SQL strings

You can run SQL queries either as Python strings or from .sql files.

When the name of the column matches the name of the feature
with non-alphanumeric characters removed, the mapping from
column to feature is automatic.

```
pg = PostgreSQLSource(name='PG')

@online
def get_user(uid: User.id) -> Features[User.full_name, User.email]:
    return (
        pg.query_string("select full_name, email from users where id=1")
        .first()
    )
```

get_user's return type indicates that it expects features named full_name and email, which are returned as
columns from the SQL query.

If the column names don't align exactly, you can
pass the fields parameter to map columns of
the query to features.

```
pg = PostgreSQLSource(name='PG')

@online
def get_user(uid: User.id) -> Features[User.full_name, User.email]:
    return (
        pg.query_string(
            "select name, email from users where id=1",
            fields=dict(name=User.full_name),
        )
        .first()
    )
```

Here, the email column of the query automatically aligns with the
expected User.email feature, but the name column of the query
is explicitly mapped to the User.full_name feature.
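The two mapping rules (automatic matching and explicit fields) can be sketched in plain Python. The helpers `normalize` and `map_columns` are hypothetical, and treating the comparison as case-insensitive is an assumption made for this sketch rather than documented Chalk behavior.

```python
import re


def normalize(name):
    """Lowercase a name and drop every non-alphanumeric character."""
    return re.sub(r"[^0-9a-z]", "", name.lower())


def map_columns(columns, feature_names, fields=None):
    """Map each query column to a feature name; explicit `fields` entries win."""
    fields = fields or {}
    by_normalized = {normalize(f): f for f in feature_names}
    mapping = {}
    for column in columns:
        if column in fields:
            # Explicit mapping supplied by the caller.
            mapping[column] = fields[column]
        elif normalize(column) in by_normalized:
            # Automatic mapping: names match once punctuation is removed.
            mapping[column] = by_normalized[normalize(column)]
    return mapping


mapping = map_columns(
    columns=["name", "email"],
    feature_names=["full_name", "email"],
    fields={"name": "full_name"},
)
```

Columns that match neither rule are left out of the mapping, which is where a real resolver would report a problem.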

### Parameterizing string queries

You can parameterize queries to pass variables.
Parameterize names with a colon,
and pass a dictionary from parameter name to parameter value:

```
pg.query_string(
    query="select * from users where user_id=:user_id",
    args=dict(user_id="uid123"),
)
```

Use a backslash to escape any literal : characters you need to use in your query:

```
pg.query_string(
    # Raw string, so the backslash is passed through unchanged
    query=r"select * from users where user_id=:user_id and name='\:colon'",
    args=dict(user_id="uid123"),
)
```
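Colon-prefixed names follow the same convention as named bind parameters in many database drivers. The sketch below uses Python's built-in sqlite3 module as a stand-in (not Chalk, and with a made-up table) to show the value being supplied separately from the SQL text. The backslash escape for literal colons is specific to Chalk and is not demonstrated here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table users (user_id text, name text)")
conn.executemany(
    "insert into users values (?, ?)",
    [("uid123", "Ada"), ("uid456", "Grace")],
)

# :user_id is a named bind parameter; its value travels separately,
# so it is never spliced into the SQL string.
rows = conn.execute(
    "select user_id, name from users where user_id = :user_id",
    {"user_id": "uid123"},
).fetchall()
conn.close()
```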

### SQL string files

Instead of passing a string directly into your Python code, you can load the SQL content from a file
using the query_sql_file function.

For example, here is a query.sql file containing the same query from above:

```
select * from users where user_id=:user_id
```

You can reference this file in a Python resolver, either
using the absolute path from the root of your project or
relative to the resolver's file.

For example, if the snippet below lived in the same directory
as query.sql, we could refer to it as follows:

```
pg = PostgreSQLSource(name='PG')

@online
def get_user(uid: User.id) -> Features[User.full_name, User.email]:
    return (
        pg.query_sql_file(
            path="query.sql",
            args=dict(user_id=uid)
        )
        .first()
    )
```

Auto-mapping of column name to feature name also applies for
the query_sql_file method. Parameterization also works the same way.

### Push-down filtering

Chalk automatically adds query inputs to the WHERE clause of your SQL
queries. We recommend relying on our push-down filtering logic rather than
parameterizing your queries by hand.

The following example will show a transaction feature and how we use push-down
filtering to retrieve transactions belonging to a specific category.

Here's the feature class:

```
pg = PostgreSQLSource(name='PG')

@features
class Transaction:
    id: int
    category: str
    amount: float
```

And the SQL file resolver:

```
-- type: online
-- resolves: transaction
-- source: PG
SELECT
    id, merchant_category AS category, amount
FROM
    txn_table;
```

Finally, here is our query for grocery transactions:

```
client = ChalkClient()
client.query(
    input={
        Transaction.id: [1, 2, 3, 4],
        Transaction.category: "Groceries",
    },
    output=[
        Transaction.id,
        Transaction.amount,
    ],
)
```

When this query is run, Chalk adds each input parameter to the WHERE clause,
effectively running this query against your database:

```
SELECT
    id, merchant_category AS category, amount
FROM
    txn_table
+ WHERE
+     id IN (1, 2, 3, 4)
+     AND merchant_category = 'Groceries';
```

If your database column names differ from your feature names, the column name must be aliased in the SELECT clause of
your query, as shown above with the merchant_category column.
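A minimal sketch of the idea behind push-down filtering follows. The function `push_down` is hypothetical and far simpler than whatever Chalk does internally; it assumes the alias map (feature name to column name) is already known and uses `?` placeholders for the values.

```python
def push_down(select_sql, inputs, aliases):
    """Append a WHERE clause built from query inputs.

    `aliases` maps a feature name to the underlying column when they differ.
    Values are returned separately as bind parameters.
    """
    clauses, params = [], []
    for feature, value in inputs.items():
        column = aliases.get(feature, feature)
        if isinstance(value, (list, tuple)):
            # Multiple values become an IN (...) filter.
            placeholders = ", ".join("?" for _ in value)
            clauses.append(f"{column} IN ({placeholders})")
            params.extend(value)
        else:
            clauses.append(f"{column} = ?")
            params.append(value)
    if not clauses:
        return select_sql, params
    return f"{select_sql} WHERE {' AND '.join(clauses)}", params


sql, params = push_down(
    "SELECT id, merchant_category AS category, amount FROM txn_table",
    inputs={"id": [1, 2, 3, 4], "category": "Groceries"},
    aliases={"category": "merchant_category"},
)
```

The filter is applied to merchant_category rather than category because the alias in the SELECT clause tells the rewriter which column backs the feature.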

### Configuration

### Supported SQL file resolver comment keys

The following are supported keys that may be included in .chalk.sql file comments.

- resolves: Describes the namespace to which the outputs belong. In the above example, user.email and user.full_name are the outputs.
- source: Describes the database by name, as in the above example, or by type if there is only one database of that type. Thus, if you have one PostgreSQL source, you can also write source: postgresql.
- type: The type of resolver. If not specified, online is the default.
- incremental: Parameters for incremental queries. For more information, see the section on incremental queries below.
  - mode: The incrementalization mode, which decides how to ingest new data. Defaults to "row".
  - lookback_period: The length of the window from the last observed row that Chalk will re-ingest, e.g. 1h. Defaults to 0.
  - incremental_column: The timestamp column in the underlying table to use as the basis for incrementalization. Must be supplied in row and group modes.
  - incremental_timestamp: The timestamp used for timedelta calculations. Defaults to feature_time.
- count: Returns one. Equivalent to the common query finalizer .one().
- timeout: The maximum time to wait before timing out the query. See Duration for more details.
- tags: The user tags associated with this resolver. For online and offline resolvers.
- environment: The environments associated with this resolver.
- cron: The schedule for a cron run, e.g. 1h.
- max_staleness: The max staleness for the resolver, e.g. 24h.
- owner: The owner of the resolver.
- fields: An optional mapping from SQL column to Chalk feature. For example, with SELECT name AS arbitrary_column_name, we can map arbitrary_column_name to a Chalk feature belonging to the namespace described by the resolves field with the mapping arbitrary_column_name: chalk_feature_name.
- unique_on: A list of features that must be unique for each row of the output. This enables unique optimizations in the resolver execution. Only applicable to resolvers that return a DataFrame.
- partitioned_by: A list of features that correspond to partition keys in the data source. This field indicates that the resolver executes its query against a data storage system that is partitioned by a particular set of columns. This is most common with data-warehouse sources like Snowflake, BigQuery, Databricks, or Iceberg.
- total: Whether this resolver returns all ids of a given namespace. To have this annotation, the resolver must take no arguments and return a DataFrame.

### Proper comment formatting

All comments must be inserted before the body of the SQL query. Each comment line is parsed as
either a YAML-formatted line describing the resolver or a docstring. Below, the last comment
will appear as a docstring since it is not in key: value format.

```
-- type: online
-- resolves: user
-- source: PG
-- This comment is not in yaml format, so it will be parsed as a docstring
select * from users
```
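The split between metadata lines and docstring lines can be sketched as follows. The function `split_header` is hypothetical and deliberately simplified: it only recognizes flat key: value comments with a regular expression, whereas a real implementation would hand the comment block to a YAML parser to support lists and nested dictionaries.

```python
import re

# A comment counts as metadata when it looks like "key: value".
KEY_VALUE = re.compile(r"^([A-Za-z_]+):\s*(.*)$")


def split_header(sql_text):
    """Separate leading `--` comments into metadata, a docstring, and the query body."""
    metadata, docstring, body = {}, [], []
    in_header = True
    for line in sql_text.strip().splitlines():
        stripped = line.strip()
        if in_header and stripped.startswith("--"):
            comment = stripped[2:].strip()
            match = KEY_VALUE.match(comment)
            if match:
                metadata[match.group(1)] = match.group(2)
            else:
                docstring.append(comment)
        else:
            # The first non-comment line starts the query body.
            in_header = False
            body.append(line)
    return metadata, " ".join(docstring), "\n".join(body)


metadata, doc, query = split_header("""
-- type: online
-- resolves: user
-- source: PG
-- This comment is not in yaml format, so it will be parsed as a docstring
select * from users
""")
```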

For field values that can be lists or dictionaries,
such as tags or incremental settings,
we can either enumerate the values inline or with an extra indentation.
Remember, if your values include a colon, you must use quotes around your value in order for
the line to have valid YAML format.

Both of the following formats are valid and equivalent.

```
-- type: online
-- resolves: user
-- source: PG
-- tags: ["single:1", "double:2"]
select * from users
```

```
-- type: online
-- resolves: user
-- source: PG
-- tags:
--    - single:1
--    - double:2
select * from users
```

### Streaming SQL file resolvers

Chalk supports streaming with SQL file resolvers:

```
-- source: kafka
-- resolves: store_features
-- type: streaming
select store_id as id, sum(amount) as purchases
from topic
group by 1
```

### SQL linting configuration

If you are using SQLFluff or another SQL linter,
you may need to configure it to accept the ${...} variable pattern.
For SQLFluff, set the templater to placeholder and add the following to your config file.

```
# Allows sqlfluff to correctly parse
# ${arbitrary_placeholder_name}

[sqlfluff:templater:placeholder]
param_regex =\$\{[^}]*\}
1 = 'arbitrary_placeholder_name'
```
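To see what the param_regex pattern above matches, it can be tried with Python's re module. This is only a quick check of the regular expression itself, not of SQLFluff's templater; the sample query is made up.

```python
import re

# Same pattern as the param_regex setting in the config above.
param_regex = re.compile(r"\$\{[^}]*\}")

sql = "select id, email from users where id = ${user.id} and created_at < ${now}"

# Every ${...} placeholder is matched as a whole token.
placeholders = param_regex.findall(sql)
```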

### Generated SQL file resolvers

You can programmatically generate SQL resolvers with make_sql_file_resolver. This
function is useful if you have many SQL tables and want to automate management of their resolvers.

```
from chalk import make_sql_file_resolver
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource(name='PG')

definitions = [
    {
        "resolver_name": "get_user_features",
        "entity_name": "User",
        "table": "users",
        "pkey_column": "id",
        "features": ["feature1", "feature2"],
    },
    # ...
]

for definition in definitions:
    # Comma-separated list of feature columns to select
    targets = ", ".join(definition["features"])

    make_sql_file_resolver(
        name=definition["resolver_name"],
        sql=f"select {definition['pkey_column']}, {targets} from {definition['table']}",
        source=pg,  # "PG" is also acceptable
        resolves=definition["entity_name"],
    )
```

make_sql_file_resolver adds this resolver to your registry as if it were a SQL file
resolver, but without creating the .chalk.sql file.

All SQL file resolvers require source and resolves. These values can be provided as SQL comments within your sql
value or as parameters. If comments and parameters are both provided, parameters will take precedence. This function
call is equivalent to the previous example:

```
make_sql_file_resolver(
    name=definition["resolver_name"],
    sql=f"""
        -- source: PG
        -- resolves: {definition['entity_name']}
        select {definition['pkey_column']}, {targets} from {definition['table']}
    """,
)
```

### Incremental queries

### Overview

The first time that a resolver with an incremental
query is executed, Chalk ingests all data from the source.
On subsequent runs of the resolver, Chalk only looks for
new rows in the table to ingest.
Using incremental queries limits the amount of data that
Chalk needs to ingest, lowering latency for updates
and reducing costs.

Incremental queries are useful for ingesting immutable tables
or queries, like event tables or logs.
This type of data is frequently found in the offline
context, as it represents logs of real-time events.

Incremental queries use the incremental_column parameter to
page over the underlying table.

### Incremental queries with SQL file resolvers

Imagine you have a login events table, where you keep track of login attempts to your website. You can ingest this table
with a SQL file resolver as follows:

```
-- type: offline
-- resolves: login
-- source: PG
-- incremental:
--   mode: row
--   lookback_period: 60m
--   incremental_column: attempted_at
select attempted_at, status, user_id from logins
```

Configuration for incremental resolvers can be passed in the incremental dictionary of the file's comments. For more
details, see the configuration reference above.

### Incremental queries with SQL strings

To use incremental mode with SQL string resolvers, use .incremental along with the incremental_column parameter.

```
pg = PostgreSQLSource(name='PG')

@offline
def fn() -> DataFrame[Login.attempted_at, Login.user_id, Login.status]:
    return (
        pg.query_string("select attempted_at, status, user_id from logins")
          .incremental(incremental_column="attempted_at")
    )
```

### Handling late-arriving messages

If your underlying data source has "late arriving records", you may need to use the lookback_period argument to
incremental. When lookback_period is specified, Chalk subtracts the lookback_period from the
"maximum observed timestamp" that it uses as a lower-bound for ingestion.

Concretely, if your resolver body looks like this:

```
(
    db.query_string("SELECT * FROM payments")
    .incremental(incremental_column="updated_at", lookback_period="30m")
)
```

then Chalk will rewrite your SQL query to:

```
SELECT * FROM payments
WHERE updated_at >= (<previous_max_updated_at> - <lookback_period>)
```

This means that rows that arrive up to 30 minutes late will be properly ingested. The trade-off
is that Chalk will re-ingest some redundant data.
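The lower-bound arithmetic can be sketched with the standard library. The function name `ingestion_lower_bound` and the sample rows are hypothetical; the sketch only shows how subtracting the lookback period lets a late-arriving row pass the filter.

```python
from datetime import datetime, timedelta, timezone


def ingestion_lower_bound(previous_max, lookback_period):
    """Lower bound for the next run: previous max timestamp minus the lookback."""
    return previous_max - lookback_period


previous_max = datetime(2022, 11, 7, 12, 0, tzinfo=timezone.utc)
bound = ingestion_lower_bound(previous_max, timedelta(minutes=30))

# Hypothetical rows as (id, updated_at). Row 2 arrived late: its timestamp
# is older than previous_max but still inside the 30-minute lookback window.
rows = [
    (1, datetime(2022, 11, 7, 11, 0, tzinfo=timezone.utc)),
    (2, datetime(2022, 11, 7, 11, 45, tzinfo=timezone.utc)),
    (3, datetime(2022, 11, 7, 12, 10, tzinfo=timezone.utc)),
]
selected = [row_id for row_id, updated_at in rows if updated_at >= bound]
```

Without the lookback, row 2 would be skipped because its timestamp precedes the previous maximum.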

### Managing incremental resolvers

Each incremental resolver tracks the timestamp of its latest ingested row in an internal property called
chalk_incremental_timestamp. This property can be referenced within your SQL query when using parameter as your
incrementalization mode.

The Chalk CLI provides several commands for managing your incremental resolvers:

- chalk incremental status shows the status and latest timestamp of the given resolver.
- chalk incremental set allows you to directly modify the internal timestamps the resolver
uses to track its state.
- chalk incremental drop clears the resolver's tracking state so that it will restart data
ingestion on its next run.

### Incrementalization modes

The default incrementalization mode for .incremental is mode='row'. Three modes are supported:

- row: Chalk ingests features from all rows whose incremental_column is newer than the previously observed max timestamp.
- group: Chalk ingests features from all groups that aggregate at least one row added or changed since the previously observed max timestamp.
- parameter: Chalk passes the chalk_incremental_timestamp value (including lookback_period) to your query, and leaves your query unchanged.

### "Group" incremental mode

Group mode incremental ingestion is appropriate when you are aggregating rows in a table in order to compute
features.

For example, if you are running the following query:

```
SELECT
    business_id,
    SUM(amount) as sum_payments_amount,
    COUNT(*) as count_payments,
    max(updated_at) as ts
FROM
    payments
GROUP BY
    1
```

to ingest features for this feature class:

```
@features
class Business:
    id: int
    sum_payments_amount: float
    count_payments: int
    ts: FeatureTime
```

then you can specify the following resolver:

```
@offline(...)
def resolve_aggregate_payment_features() -> DataFrame[Business]:
    query = """
        SELECT
            business_id,
            SUM(amount) as sum_payments_amount,
            COUNT(*) as count_payments,
            max(updated_at) as ts
        FROM
            payments
        GROUP BY
            1
    """

    return db.query_string(query, fields={"business_id": Business.id}) \
                .incremental(incremental_column="updated_at", mode="group")
```

and Chalk will automatically rewrite your query into this form:

```
SELECT
    business_id,
    SUM(amount) as sum_payments_amount,
    COUNT(*) as count_payments,
    max(updated_at) as ts
FROM payments
WHERE business_id IN (
    SELECT DISTINCT(business_id) FROM payments
    WHERE updated_at >= :chalk_incremental_timestamp
)
GROUP BY 1
```

This means that if you have a payments table like this:

```
| id | business_id | amount | updated_at               |
| 1  | 1           | 10     | 2022-11-01T00:00:00.000Z |
| 2  | 1           | 5      | 2022-11-15T00:00:00.000Z |
| 3  | 2           | 7      | 2022-11-15T00:00:00.000Z |
| 4  | 3           | 17     | 2022-10-01T00:00:00.000Z |
```

and your query had previously run on 2022-11-07, then Chalk would return the following aggregate values:

```
| business_id | sum_payments_amount | count_payments | ts                       |
| 1           | 15                  | 2              | 2022-11-15T00:00:00.000Z |
| 2           | 7                   | 1              | 2022-11-15T00:00:00.000Z |
```

Businesses 1 and 2 are both present because each has at least one payment after 2022-11-07.
Business 3 is excluded, since it has no payments after 2022-11-07.
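The rewritten query can be tried against the example table using Python's built-in sqlite3 module. This is a stand-in for a real data source, not Chalk itself. The sketch stores timestamps as ISO-8601 text, which compares correctly as strings because every value uses the same format, and adds an ORDER BY so the row order is deterministic.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table payments "
    "(id integer, business_id integer, amount integer, updated_at text)"
)
conn.executemany(
    "insert into payments values (?, ?, ?, ?)",
    [
        (1, 1, 10, "2022-11-01T00:00:00.000Z"),
        (2, 1, 5, "2022-11-15T00:00:00.000Z"),
        (3, 2, 7, "2022-11-15T00:00:00.000Z"),
        (4, 3, 17, "2022-10-01T00:00:00.000Z"),
    ],
)

# Group-mode rewrite: only groups with a row at or after the timestamp
# are selected, but each selected group is aggregated over all its rows.
rewritten = """
SELECT business_id, SUM(amount), COUNT(*), MAX(updated_at)
FROM payments
WHERE business_id IN (
    SELECT DISTINCT business_id FROM payments
    WHERE updated_at >= :chalk_incremental_timestamp
)
GROUP BY 1
ORDER BY 1
"""
rows = conn.execute(
    rewritten, {"chalk_incremental_timestamp": "2022-11-07T00:00:00.000Z"}
).fetchall()
conn.close()
```

Business 1's sum still includes the payment from 2022-11-01, because the filter selects whole groups rather than individual rows.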

### "Parameter" incremental mode

In parameter incremental mode, Chalk leaves your query untouched. Chalk will simply pass the max incremental timestamp
to your query as a bind parameter named chalk_incremental_timestamp.

Concretely, if you write:

```
@offline(...)
def parameter_incremental_mode_resolver() -> DataFrame[...]:
    return (
        db.query_string("SELECT * FROM payments WHERE updated_at >= :chalk_incremental_timestamp")
           .incremental(mode="parameter")
    )
```

Then Chalk will execute your query verbatim, and will keep track of the appropriate value for chalk_incremental_timestamp
between executions of your resolver.

### Incremental interaction with FeatureTime

When Chalk executes an incremental query, it has to update the "max timestamp" value that it will use as the
lower bound for the next query. By default, Chalk sets this value to the time at the start of the query.

If your resolver returns a FeatureTime feature, Chalk will update the "max timestamp" value
to the "max" FeatureTime value that is returned instead. This allows you to control the incremental
behavior more precisely.
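The choice between the two values can be sketched as a small function. The name `next_max_timestamp` is hypothetical, and falling back to the query start time when no rows are returned is an assumption made for this sketch.

```python
from datetime import datetime, timezone


def next_max_timestamp(query_started_at, feature_times):
    """Pick the lower bound that the next incremental run will use."""
    if feature_times:
        # The resolver returned FeatureTime values: use the latest one.
        return max(feature_times)
    # Assumption: with no FeatureTime values, use the query start time.
    return query_started_at


started = datetime(2022, 11, 20, 9, 0, tzinfo=timezone.utc)
returned = [
    datetime(2022, 11, 15, 0, 0, tzinfo=timezone.utc),
    datetime(2022, 11, 18, 6, 30, tzinfo=timezone.utc),
]

with_feature_time = next_max_timestamp(started, returned)
without_feature_time = next_max_timestamp(started, [])
```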

### Tagged SQL sources

Chalk supports applying tags to SQL sources. This allows you to define a single resolver that routes traffic
to different backing databases depending on the tags supplied at query time. This is useful for limiting
the blast radius of traffic from different use cases.

First, define a SQL source group:

```
from chalk.sql import SQLSourceGroup, PostgreSQLSource

sql_group = SQLSourceGroup(
    name='primary_group',
    default=PostgreSQLSource(name="default_replica"),
    tagged_sources={
        'risk': PostgreSQLSource(name='risk_replica'),
        'analytics': PostgreSQLSource(name='analytics_replica'),
    }
)
```

Then, define a resolver that uses the group:

```
@online
def users() -> DataFrame[User]:
    return sql_group.query_string("select id, name from users").all()
```

Then, when you submit queries, the query tags will control which data source is used to execute the
query:

```
client = ChalkClient()

# This query uses the risk data source
client.query(input={User.id: 1}, output=[User.name], tags=['risk'])

# This query uses the analytics data source
client.query(input={User.id: 1}, output=[User.name], tags=['analytics'])

# This query uses the default data source
client.query(input={User.id: 1}, output=[User.name])
```
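The routing rule can be pictured as a dictionary lookup with a fallback. The function `pick_source` is hypothetical, sources are represented by their names only, and using the first matching tag when several match is an assumption rather than documented behavior.

```python
def pick_source(tagged_sources, default, tags=()):
    """Return the source for the first query tag that has one, else the default."""
    for tag in tags:
        if tag in tagged_sources:
            return tagged_sources[tag]
    return default


# Sources are represented by name only in this sketch.
tagged = {"risk": "risk_replica", "analytics": "analytics_replica"}

risk_source = pick_source(tagged, "default_replica", tags=["risk"])
untagged_source = pick_source(tagged, "default_replica")
unknown_tag_source = pick_source(tagged, "default_replica", tags=["marketing"])
```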

### Additional query options

### SQLAlchemy

Chalk supports SQLAlchemy:

```
pg = PostgreSQLSource(name='PG')

@online
def get_user(uid: User.id) -> Features[User.email, User.full_name]:
    return (
        pg.query(User(email=UserSQL.email, full_name=UserSQL.name))
        .filter_by(id=uid)
        .first()
    )
```

In the .query(...) call, you map the target columns of the
SQL statement to the feature namespace.
Here, we assign User.email to UserSQL.email and
User.full_name to UserSQL.name.

### Incremental queries with SQLAlchemy

To create an incremental SQLAlchemy query, use .incremental. Chalk will page over the underlying table using the
column mapped to FeatureTime.

```
pg = PostgreSQLSource(name='PG')

@offline
def fn() -> DataFrame[Login.ts, Login.attempted_at, Login.user_id, Login.status]:
    return pg.query(
        Login(
            ts=LoginHistorySQL.created_at,
            attempted_at=LoginHistorySQL.attempted_at,
            user_id=LoginHistorySQL.user_id,
            status=LoginHistorySQL.status,
        )
    ).incremental()
```





