Feature Engine
Programmatically manage data sources using the Python SDK
The Chalk Python SDK provides a programmatic API for managing data sources, allowing you to create, list, update, test, and delete integrations without using the dashboard UI.
All data source operations are available through client.api.datasources on a ChalkClient instance.
The API communicates with the Chalk API server over gRPC and reuses the same authentication as
the query client.
Data sources created through this API are environment-level configuration — they persist across deployments and appear in the dashboard, just like sources added through the UI. This is an administrative API intended for infrastructure-as-code scripts, CI/CD pipelines, or notebooks, not for use inside resolver code.
```python
from chalk.client import ChalkClient, IntegrationKind

client = ChalkClient()
```

List all integrations configured in the current environment.

```python
sources = client.api.datasources.list()
for s in sources:
    print(f"{s.kind} {s.name} id={s.id}")
```

Returns a list of Datasource objects with id, name, kind, environment_id, created_at, and updated_at fields.
Use client.api.datasources.create() to add a new integration. The kind parameter accepts either an IntegrationKind enum or a string. Data source names may contain only letters, numbers, and underscores.
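The naming rule can be checked locally before making an API call. A small sketch (the helper is ours; it simply mirrors the letters/numbers/underscores rule stated above):

```python
import re

# Matches names made up of only letters, numbers, and underscores
_NAME_RE = re.compile(r"^[A-Za-z0-9_]+$")

def is_valid_datasource_name(name: str) -> bool:
    """True if the name satisfies the data source naming rule."""
    return bool(_NAME_RE.match(name))
```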
The created source appears in the dashboard immediately and becomes available for use after you click Redeploy in the dashboard.
BigQuery:

```python
import base64

# The service account key is sent base64-encoded in the config payload
with open("path/to/service-account-key.json") as f:
    sa_key_b64 = base64.b64encode(f.read().encode()).decode()

created = client.api.datasources.create(
    kind=IntegrationKind.BIGQUERY,
    name="my_bigquery_source",
    config={
        "BQ_PROJECT": "my-gcp-project",
        "BQ_DATASET": "my_dataset",
        "BQ_CREDENTIALS_BASE64": sa_key_b64,
    },
)
```

Snowflake:

```python
created = client.api.datasources.create(
    kind=IntegrationKind.SNOWFLAKE,
    name="my_snowflake_source",
    config={
        "SNOWFLAKE_USER": "admin",
        "SNOWFLAKE_PASSWORD": "secret",
        "SNOWFLAKE_ACCOUNT": "xy12345.us-east-1",
        "SNOWFLAKE_DATABASE": "PROD_DB",
        "SNOWFLAKE_WAREHOUSE": "COMPUTE_WH",
        "SNOWFLAKE_SCHEMA": "PUBLIC",
        "SNOWFLAKE_ROLE": "ANALYST_ROLE",
    },
)
```

PostgreSQL:

```python
created = client.api.datasources.create(
    kind=IntegrationKind.POSTGRESQL,
    name="my_postgres_source",
    config={
        "PGHOST": "db.example.com",
        "PGPORT": "5432",
        "PGDATABASE": "mydb",
        "PGUSER": "admin",
        "PGPASSWORD": "secret",
    },
)
```

Fetch a data source by ID, test its connectivity, update its configuration, or remove it.
```python
# Get a data source by ID
source = client.api.datasources.get(id=created.id)

# Test connectivity
result = client.api.datasources.test(id=created.id)
print(f"Success: {result.success}, Message: {result.message}")

# Update config or name (only provided keys are changed)
updated = client.api.datasources.update(
    id=created.id,
    config={"BQ_DATASET": "new_dataset"},
    name="my_updated_source",
)

# Delete
client.api.datasources.delete(id=created.id)
```

The IntegrationKind enum supports the following data source types:
| Kind | Data source |
|---|---|
| IntegrationKind.ATHENA | AWS Athena |
| IntegrationKind.BIGQUERY | Google BigQuery |
| IntegrationKind.CLICKHOUSE | ClickHouse |
| IntegrationKind.DATABRICKS | Databricks |
| IntegrationKind.DYNAMODB | Amazon DynamoDB |
| IntegrationKind.KAFKA | Apache Kafka |
| IntegrationKind.KINESIS | Amazon Kinesis |
| IntegrationKind.MSSQL | Microsoft SQL Server |
| IntegrationKind.MYSQL | MySQL |
| IntegrationKind.POSTGRESQL | PostgreSQL |
| IntegrationKind.PUBSUB | Google Pub/Sub |
| IntegrationKind.REDSHIFT | Amazon Redshift |
| IntegrationKind.SNOWFLAKE | Snowflake |
| IntegrationKind.SPANNER | Google Spanner |
| IntegrationKind.TRINO | Trino |