# Datasource Management API
source: https://docs.chalk.ai/docs/datasource-api

## Programmatically manage data sources using the Python SDK

The Chalk Python SDK provides a programmatic API for managing data sources, allowing you to
create, list, update, test, and delete integrations without using the dashboard UI.

All data source operations are available through client.api.datasources on a ChalkClient instance.
The API communicates with the Chalk API server over gRPC and reuses the same authentication as
the query client.

Data sources created through this API are environment-level configuration — they persist across
deployments and appear in the dashboard, just like sources added through the UI. This is an
administrative API intended for infrastructure-as-code scripts, CI/CD pipelines, or notebooks,
not for use inside resolver code.

```
from chalk.client import ChalkClient, IntegrationKind

client = ChalkClient()
```

### Listing data sources

List all integrations configured in the current environment.

```
sources = client.api.datasources.list()
for s in sources:
    print(f"{s.kind}  {s.name}  id={s.id}")
```

Returns a list of Datasource objects with id, name, kind, environment_id,
created_at, and updated_at fields.

### Creating a data source

Use client.api.datasources.create() to add a new integration. The kind parameter
accepts either an IntegrationKind enum or a string. Data source names must only contain
letters, numbers, and underscores.

The created source will immediately appear in the dashboard and be available for use after
clicking Redeploy in the dashboard.

### BigQuery

```
import base64

with open("path/to/service-account-key.json") as f:
    sa_key_b64 = base64.b64encode(f.read().encode()).decode()

created = client.api.datasources.create(
    kind=IntegrationKind.BIGQUERY,
    name="my_bigquery_source",
    config={
        "BQ_PROJECT": "my-gcp-project",
        "BQ_DATASET": "my_dataset",
        "BQ_CREDENTIALS_BASE64": sa_key_b64,
    },
)
```

### Snowflake

```
created = client.api.datasources.create(
    kind=IntegrationKind.SNOWFLAKE,
    name="my_snowflake_source",
    config={
        "SNOWFLAKE_USER": "admin",
        "SNOWFLAKE_PASSWORD": "secret",
        "SNOWFLAKE_ACCOUNT": "xy12345.us-east-1",
        "SNOWFLAKE_DATABASE": "PROD_DB",
        "SNOWFLAKE_WAREHOUSE": "COMPUTE_WH",
        "SNOWFLAKE_SCHEMA": "PUBLIC",
        "SNOWFLAKE_ROLE": "ANALYST_ROLE",
    },
)
```

### PostgreSQL

```
created = client.api.datasources.create(
    kind=IntegrationKind.POSTGRESQL,
    name="my_postgres_source",
    config={
        "PGHOST": "db.example.com",
        "PGPORT": "5432",
        "PGDATABASE": "mydb",
        "PGUSER": "admin",
        "PGPASSWORD": "secret",
    },
)
```

### Linking existing secrets

Config values can be either literal strings or references to secrets synced from an external
cloud secret manager (e.g., AWS Secrets Manager or GCP Secret Manager). Use LinkedSecretRef
to point at a secret by its ID instead of inlining the value.

```
from chalk.client import ChalkClient, IntegrationKind, LinkedSecretRef

client = ChalkClient()

created = client.api.datasources.create(
    kind=IntegrationKind.SNOWFLAKE,
    name="my_snowflake_source",
    config={
        "SNOWFLAKE_USER": "admin",
        "SNOWFLAKE_PRIVATE_KEY_B64": LinkedSecretRef("snowflake-prod-key"),
        "SNOWFLAKE_ACCOUNT": "xy12345.us-east-1",
        "SNOWFLAKE_DATABASE": "PROD_DB",
        "SNOWFLAKE_WAREHOUSE": "COMPUTE_WH",
        "SNOWFLAKE_SCHEMA": "PUBLIC",
        "SNOWFLAKE_ROLE": "ANALYST_ROLE",
    },
)
```

LinkedSecretRef works with update() as well — pass it for any config key you want to
reference from your cloud secret manager rather than providing a literal value.

### Getting, testing, updating, and deleting data sources

Fetch a data source by ID, test its connectivity, update its configuration, or remove it.

```
# Get a data source by ID
source = client.api.datasources.get(id=created.id)

# Test connectivity
result = client.api.datasources.test(id=created.id)
print(f"Success: {result.success}, Message: {result.message}")

# Update config or name (only provided keys are changed)
updated = client.api.datasources.update(
    id=created.id,
    config={"BQ_DATASET": "new_dataset"},
    name="my_updated_source",
)

# Delete
client.api.datasources.delete(id=created.id)
```

### Supported integration kinds

The IntegrationKind enum supports the following data source types:

| Kind                         | Value                |
| ---------------------------- | -------------------- |
| `IntegrationKind.ATHENA`     | AWS Athena           |
| `IntegrationKind.BIGQUERY`   | Google BigQuery      |
| `IntegrationKind.CLICKHOUSE` | ClickHouse           |
| `IntegrationKind.DATABRICKS` | Databricks           |
| `IntegrationKind.DYNAMODB`   | Amazon DynamoDB      |
| `IntegrationKind.KAFKA`      | Apache Kafka         |
| `IntegrationKind.KINESIS`    | Amazon Kinesis       |
| `IntegrationKind.MSSQL`      | Microsoft SQL Server |
| `IntegrationKind.MYSQL`      | MySQL                |
| `IntegrationKind.POSTGRESQL` | PostgreSQL           |
| `IntegrationKind.PUBSUB`     | Google Pub/Sub       |
| `IntegrationKind.REDSHIFT`   | Amazon Redshift      |
| `IntegrationKind.SNOWFLAKE`  | Snowflake            |
| `IntegrationKind.SPANNER`    | Google Spanner       |
| `IntegrationKind.TRINO`      | Trino                |





