The Warehouse Sync API enables you to create and manage data ingestion pipelines with your cloud data warehouses.
https://api.mparticle.com/platform/v2/workspaces/{workspaceId}
To find your workspace ID, follow the instructions in Managing Workspaces.
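For example, with a hypothetical workspace ID of 1234, the base URI would be:
https://api.mparticle.com/platform/v2/workspaces/1234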
Use the Warehouse Sync API resources (endpoints) to work with connections, data models, and data pipelines in order to ingest data into mParticle from data warehouses.
mParticle recommends:
- Use the paused state on the pipeline resource to pause syncing.
- Use the incremental sync mode to track which rows need to be loaded, improving performance and reducing costs.
- Use fully qualified table names in your SQL queries. A query like SELECT * FROM table may not work; provide a fully qualified name like SELECT * FROM database.schema.name.
Complete the Warehouse Sync API tutorial with the provided Postman collection and environment.
A copy of the Warehouse Sync environment is included. You can download it again here.
Use these endpoints for managing connections to data sources.
For the workspace specified in the base URI, get all the data warehouse connections.
Request: GET {baseURI}/connections
Query parameter: {serviceProvider}
Optional. Allowed values: Snowflake, BigQuery, Redshift, Databricks
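For example, to return only Snowflake connections (assuming the provider name is passed as the parameter value):
GET {baseURI}/connections?serviceProvider=Snowflake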
Request body: none
Example response:
[
{
"id": "string",
"name": "string",
"state": "active",
"status": "healthy",
"errors": [
{
"message": "string"
}
],
"service_provider": "Redshift",
"config": {
"database": "string",
"host": "string",
"user": "string",
"aws_iam_role_arn": "string"
},
"created_on": "2023-10-24T19:59:30.828Z",
"created_by": "string",
"last_modified_on": "2023-10-24T19:59:30.828Z",
"last_modified_by": "string"
},
{
"id": "string",
"name": "string",
"state": "active",
"status": "healthy",
"errors": [
{
"message": "string"
}
],
"service_provider": "Snowflake",
"config": {
"account_identifier": "string",
"region": "string",
"warehouse": "string",
"database": "string",
"role": "string",
"user": "string",
"storage_integration": "mp_us1_123_123_s3",
"aws_iam_user_arn": "arn:aws:iam::123456:user/externalstages/abcdefg",
"aws_external_id": "GD1234=2_abcdefg=="
},
"created_on": "2023-10-24T19:59:30.828Z",
"created_by": "string",
"last_modified_on": "2023-10-24T19:59:30.828Z",
"last_modified_by": "string"
},
{
"id": "string",
"name": "string",
"state": "active",
"status": "healthy",
"errors": [
{
"message": "string"
}
],
"service_provider": "BigQuery",
"config": {
"region": "string",
"project_id": "string",
"dataset_id": "string",
"service_account_id": "string",
"service_account_key": "string"
},
"created_on": "2023-10-24T19:59:30.828Z",
"created_by": "string",
"last_modified_on": "2023-10-24T19:59:30.828Z",
"last_modified_by": "string"
}
]
Request: GET {baseURI}/connections/{connectionId}
Path parameter: {connectionId}
Required
Request body: none
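For example, to retrieve the connection with ID example-connection:
GET {baseURI}/connections/example-connection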
Example response:
{
"id": "example-connection",
"name": "example-connection-name",
"state": "active",
"status": "healthy",
"errors": [
{
"message": "example error message"
}
],
"service_provider": "Redshift",
"config": {
"database": "string",
"host": "string",
"user": "string",
"aws_iam_role_arn": "string"
},
"created_on": "2023-10-24T20:06:37.429Z",
"created_by": "string",
"last_modified_on": "2023-10-24T20:06:37.429Z",
"last_modified_by": "string"
}
Request: POST {baseURI}/connections
Parameters for Snowflake connections:
Name | Type | Required | Description |
---|---|---|---|
id | string | Required | Unique identifier in slug format. Valid characters include numbers, letters, _, and - |
name | string | Required | Name of the connection |
state | string | Required | Valid values: new , active , or inactive |
service_provider | string | Required | Valid value: Snowflake |
account_identifier | string | Required | Snowflake account locator where data will be retrieved. Refer to your warehouse documentation to determine your account id. Snowflake: here |
region | string | Required | Snowflake <cloud_region_id> or <cloud_region_id>.<cloud> region identifier where data will be retrieved. Refer to your warehouse documentation to determine your region. Snowflake: here |
warehouse | string | Required | Identifier for compute resource to utilize when syncing data |
database | string | Required | Identifier for the name of the database where data is stored in your warehouse |
user | string | Required | Username to log in to your warehouse as |
role | string | Required | Snowflake role to assume |
password | string | Required | Password for user |
storage_integration | string | Required | Snowflake storage integration name that was created in the quickstart |
aws_iam_user_arn | string | Optional | Snowflake storage integration AWS resource identifier that was created in the quickstart |
aws_external_id | string | Optional | Snowflake storage integration external identifier that was created in the quickstart |
Request body example:
{
"id": "example-snowflake-connection",
"name": "Example Snowflake Connection",
"state": "active",
"service_provider": "Snowflake",
"config": {
"account_identifier": "gd12345",
"region": "us-central1.gcp",
"warehouse": "compute_wh",
"database": "my_database",
"role": "mparticle_role",
"user": "mparticle_user",
"password": "mParticleSecurePassword",
"storage_integration": "mp_us1_123_123_s3"
}
}
Example response:
{
"id": "example-connection",
"name": "Example Snowflake Connection",
"state": "active",
"status": "healthy",
"service_provider": "Snowflake",
"config": {
"account_identifier": "gd12345",
"region": "us-central1.gcp",
"warehouse": "compute_wh",
"database": "my_database",
"role": "mparticle_role",
"user": "mparticle_user",
"password": "************",
"storage_integration": "mp_us1_123_123_s3",
"aws_iam_user_arn": "arn:aws:iam::123456:user/externalstages/abcdefg",
"aws_external_id": "GD1234=2_abcdefg=="
},
"created_on": "2023-02-03T23:53:08.413",
"created_by": "developer@mparticle.com",
"last_modified_on": null,
"last_modified_by": null
}
Parameters for Google BigQuery connections:
Name | Type | Description |
---|---|---|
id | string | Unique identifier in slug format. Valid characters include numbers, letters, _, and - |
name | string | Name of the connection |
state | string | Valid values: new , active , or inactive |
service_provider | string | Valid value: BigQuery |
region | string | Warehouse region name where data will be retrieved. Refer to your warehouse documentation to determine your region. BigQuery: here |
project_id | string | BigQuery project ID |
dataset_id | string | BigQuery dataset ID |
service_account_id | string | BigQuery service account ID that was created in the quickstart |
service_account_key | string | BigQuery service account key that was created in the quickstart |
Request body example:
{
"id": "example-bigquery-connection",
"name": "Example BigQuery Connection",
"state": "active",
"service_provider": "BigQuery",
"config": {
"region": "us-east1",
"project_id": "my-gcp-project",
"dataset_id": "my-dataset",
"service_account_id": "mparticle-account@my-gcp-project.iam.gserviceaccount.com",
"service_account_key": "{\"type\": \"service_account\", \"project_id\": \"my-gcp-project\", \"private_key_id\": \"1234abcd456789hjkl\", \"private_key\": \"-----BEGIN PRIVATE KEY-----\\ABC123456789/U\\ABC+12345+abcABC+\\n-----END PRIVATE KEY-----\", \"client_email\": \"mparticle-account@my-gcp-project.iam.gserviceaccount.com\", \"client_id\": \"123456789\", \"auth_uri\": \"https://accounts.google.com/o/oauth2/auth\", \"token_uri\": \"https://oauth2.googleapis.com/token\", \"auth_provider_x509_cert_url\": \"https://www.googleapis.com/oauth2/v1/certs\", \"client_x509_cert_url\": \"https://www.googleapis.com/robot/v1/metadata/x509/mparticle-account%40my-gcp-project.iam.gserviceaccount.com\" }"
}
}
Example response:
{
"id": "example-bigquery-connection",
"name": "Example BigQuery Connection",
"state": "active",
"status": "healthy",
"service_provider": "BigQuery",
"config": {
"region": "us-east1",
"project_id": "my-gcp-project",
"dataset_id": "my-dataset",
"service_account_id": "mparticle-account@my-gcp-project.iam.gserviceaccount.com",
"service_account_key": "************"
},
"created_on": "2023-02-03T23:53:08.413",
"created_by": "developer@mparticle.com",
"last_modified_on": null,
"last_modified_by": null
}
Parameters for Amazon Redshift connections:
Name | Type | Description |
---|---|---|
id | string | Unique identifier in slug format. Valid characters include numbers, letters, _, and - |
name | string | Name of the connection |
state | string | Valid values: new , active , or inactive |
service_provider | string | Valid value: Redshift |
database | string | The identifier for the database in Redshift you are creating a connection for |
user | string | The name for the user you created in Redshift for your connection |
password | string | The password for the user |
aws_iam_role_arn | string | The ARN for the role you created when configuring Redshift for your connection with mParticle. |
host | string | Your AWS Redshift host. You can find this in your Cluster properties from your AWS Redshift dashboard. |
port | string | Your AWS Redshift port. You can find this in your Cluster properties from your AWS Redshift dashboard. |
Request body example:
{
"id": "example-redshift-connection",
"name": "Redshift Connection",
"state": "active",
"service_provider": "Redshift",
"config": {
"database": "dev",
"host": "dwi-test.ab123yyxwwzz.us-east-1.redshift.amazonaws.com",
"user": "mParticle",
"password": "mParticleSecurePassword",
"aws_iam_role_arn": "arn:aws:iam::123456789:role/mParticle_role"
}
}
Example response:
{
"id": "example-redshift-connection",
"name": "Redshift Connection",
"state": "active",
"status": "healthy",
"service_provider": "Redshift",
"config": {
"database": "dev",
"host": "dwi-test.ab123yyxwwzz.us-east-1.redshift.amazonaws.com",
"port": "5439",
"user": "mParticle",
"password": "************",
"aws_iam_role_arn": "arn:aws:iam::123456789:role/mParticle_role"
},
"created_on": "2023-02-03T23:53:08.413",
"created_by": "developer@mparticle.com",
"last_modified_on": null,
"last_modified_by": null
}
Parameters for Databricks connections:
Name | Type | Required | Description |
---|---|---|---|
id | string | Required | Unique identifier in slug format. Valid characters include numbers, letters, _, and - |
name | string | Required | Name of the connection |
state | string | Required | Valid values: new , active , or inactive |
service_provider | string | Required | Valid value: Databricks |
provider | string | Required | The value of the Databricks organization name for the metastore you’re ingesting data from |
schema | string | Required | The name of your data schema |
Request body example:
{
"id": "example-databricks-connection",
"name": "Example Databricks Connection",
"state": "active",
"service_provider": "Databricks",
"config": {
"provider": "your-databricks-org-name",
"schema": "your-schema-name"
}
}
Example response:
{
"id": "example-databricks-connection",
"name": "Example Databricks Connection",
"state": "active",
"service_provider": "Databricks",
"config": {
"provider": "your-databricks-organization-name",
"schema": "your-schema-name"
},
"created_on": "2023-02-03T23:53:08.413",
"created_by": "developer@mparticle.com",
"last_modified_on": null,
"last_modified_by": null
}
Request: PUT {baseURI}/connections/{connectionId}
Parameters are the same as Create a connection
Request body example:
This example corrects a typo in the name of a connection:
{
"id": "example-snowflake-connection",
"name": "Example Snowflake Connection",
"state": "active",
"service_provider": "Snowflake",
"config": {
"account_identifier": "gd12345",
"region": "us-central1.gcp",
"warehouse": "compute_wh",
"database": "my_database",
"role": "mparticle_role",
"user": "mparticle_user",
"password": "mParticleSecurePassword",
"storage_integration": "mp_us1_123_123_s3"
}
}
Example Response:
{
"id": "example-connection",
"name": "Example Snowflake Connection",
"state": "active",
"status": "healthy",
"service_provider": "Snowflake",
"config": {
"source_account_id": "gd1234",
"region": "us-central1.gcp",
"warehouse": "compute_wh",
"database": "indicative",
"role": "mp_role",
"user": "mp_user",
"password": "************",
"storage_integration": "mp_us1_123_123_s3",
"aws_iam_user_arn": "arn:aws:iam::123456:user/externalstages/abcdefg",
"aws_external_id": "GD1234=2_abcdefg=="
},
"created_on": "2023-02-03T23:53:08.413",
"created_by": "developer@mparticle.com",
"last_modified_on": null,
"last_modified_by": null
}
Request: DELETE {baseURI}/connections/{connectionId}
Request body: None
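For example, to delete the Snowflake connection created earlier (a successful delete returns an empty 204 response):
DELETE {baseURI}/connections/example-snowflake-connection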
Use these endpoints for managing data models.
For more information about the SQL query defined in a data model, see Warehouse Sync SQL Reference.
Request: GET {baseURI}/data-models
Request body: none
Example response:
[
{
"id": "string",
"name": "example-data-model",
"state": "active",
"status": "valid",
"errors": [
{
"message": "string"
}
],
"type": "sql",
"config": {
"sql_query": "SELECT email AS email, COUNT(id) AS \"count_of_open_tickets\", LAST_UPDATED_DATE_TIME FROM mp.demo_service.tickets WHERE t.status = 'open'"
},
"created_on": "2023-10-24T21:05:19.281Z",
"created_by": "developer@example.com",
"last_modified_on": "2023-10-24T21:05:19.281Z",
"last_modified_by": "developer@example.com"
}
]
Request: GET {baseURI}/data-models/{modelId}
Request body: None
Example response:
{
"id": "example-data-model",
"name": "Example Data Model",
"state": "active",
"status": "valid",
"errors": [
{
"message": "string"
}
],
"type": "sql",
"config": {
"sql_query": "SELECT email AS email, COUNT(id) AS \"count_of_open_tickets\" FROM mp.demo_service.tickets WHERE t.status = 'open'"
},
"created_on": "2023-10-24T21:14:30.532Z",
"created_by": "developer@example.com",
"last_modified_on": "2023-10-24T21:14:30.532Z",
"last_modified_by": "developer@example.com"
}
Request: POST {baseURI}/data-models
Parameters:
Name | Type | Description |
---|---|---|
id | string | Unique identifier in slug format. Valid characters include numbers, letters, _, and - |
name | string | Name of the data model |
state | string | The state of the data model. Valid values: new , active , or inactive |
type | string | Required. Valid value: “sql” |
config | object | A JSON object containing the SQL statement defining the data model |
sql_query | string | A valid SQL query that selects all the columns from Snowflake for this data model. See SQL for a list of supported SQL commands |
Example request body:
{
"id": "example-data-model",
"name": "Example Data Model",
"state": "active",
"errors": [
{
"message": "string"
}
],
"type": "sql",
"config": {
"sql_query": "SELECT email AS email, COUNT(id) AS \"count_of_open_tickets\", LAST_UPDATED_DATE_TIME FROM mp.demo_service.tickets WHERE t.status = 'open'"
}
}
Example response:
{
"id": "string",
"name": "string",
"state": "active",
"status": "valid",
"errors": [
{
"message": "string"
}
],
"type": "sql",
"config": {
"sql_query": "string"
},
"created_on": "2023-10-24T21:23:20.833Z",
"created_by": "string",
"last_modified_on": "2023-10-24T21:23:20.833Z",
"last_modified_by": "string"
}
Request: PUT {baseURI}/data-models/{modelId}
Parameters are the same as Create a data model
Example request body:
{
"id": "example-data-model",
"name": "Example Data Model",
"state": "active",
"errors": [
{
"message": "string"
}
],
"type": "sql",
"config": {
"sql_query": "SELECT email AS email, COUNT(id) AS \"count_of_open_tickets\", LAST_UPDATED_DATE_TIME FROM mp.demo_service.tickets WHERE t.status = 'open'"
}
}
Response example:
{
"id": "string",
"name": "string",
"state": "active",
"status": "valid",
"errors": [
{
"message": "string"
}
],
"type": "sql",
"config": {
"sql_query": "string"
},
"created_on": "2023-10-24T21:23:20.833Z",
"created_by": "string",
"last_modified_on": "2023-10-24T21:23:20.833Z",
"last_modified_by": "string"
}
Request: DELETE {baseURI}/data-models/{modelId}
Request body: None
Use these endpoints for managing pipelines. Pipelines execute a data model against a connection on the specified schedule.
A pipeline's schedule configures how often the pipeline runs. A pipeline's sync mode configures which records are synchronized in each run.
The following schedule type values are supported:
- interval - Used for pipelines that repeat according to the pipeline's frequency
- on_demand - Used for pipelines that will be run with the trigger API
- once - The pipeline runs only once. Recommended for a single sync of a table that doesn't need to be repeated.
The following schedule frequency values are supported:
- hourly
- daily
- weekly
- monthly
The following sync_mode type values are supported:
- incremental - Only loads data that has changed between runs, tracked by the iterator_field
- full - Performs a full sync on each pipeline interval run. Not recommended, as it may lead to repeated data and events.
The following values create a pipeline that runs every hour starting February 1st, 2023:
{
"sync_mode": {
"type": "incremental",
"iterator_field": "updated_on",
"iterator_data_type": "datetime_tz"
},
"schedule": {
"type": "interval",
"frequency": "hourly",
"start": "2023-02-01T00:15:00Z"
}
}
The following values create a pipeline that runs 15 minutes after every hour, to account for a delay in data arriving in the source data model:
{
"sync_mode": {
"type": "incremental",
"iterator_field": "updated_on",
"iterator_data_type": "datetime_tz"
},
"schedule": {
"type": "interval",
"frequency": "hourly",
"start": "2023-02-01T00:00:00Z",
"delay": "15m"
}
}
The following values create a pipeline that runs every 24 hours during the month of February, starting at noon each day, and filters the data to include only that time period:
{
"sync_mode": {
"type": "incremental",
"iterator_field": "updated_on",
"iterator_data_type": "datetime_tz",
"from": "2023-02-01T12:00:00Z",
"until": "2023-03-01T:12:00:00Z"
},
"schedule": {
"type": "interval",
"frequency": "daily",
"start": "2023-02-01T12:00:00Z",
"end": "2023-03-01T:12:00:00Z"
}
}
The following values create a pipeline that runs every month starting February 5th, 2023, with a 5-day delay. The first run only synchronizes data going back to January 2020:
{
"sync_mode": {
"type": "incremental",
"iterator_field": "updated_on",
"iterator_data_type": "datetime_tz",
"from": "2020-01-01T00:00:00Z"
},
"schedule": {
"type": "interval",
"frequency": "monthly",
"start": "2023-02-01T00:00:00Z",
"delay": "5d"
}
}
The following values create a pipeline that can be triggered on demand. Each trigger creates a new pipeline run spanning from the previous successful pipeline run (or schedule_start_time) until the time the trigger is requested:
{
"sync_mode": {
"type": "incremental",
"iterator_field": "updated_on",
"iterator_data_type": "datetime_tz",
"from": "2020-01-01T00:00:00Z"
},
"schedule": {
"type": "on_demand"
}
}
The following table shows an example of how the pipeline’s intervals are set for each action that is performed.
Date | Action | Description |
---|---|---|
2023-02-01 15:13:22Z | Pipeline Created | The pipeline is initially idle |
2023-02-04 08:30:17Z | Trigger API Called | An interval will synchronize data between 2020-01-01 00:00:00Z and 2023-02-04 08:30:17Z |
2023-02-08 12:05:45Z | Trigger API Called | An interval will synchronize data between 2023-02-04 08:30:17Z and 2023-02-08 12:05:45Z |
The following values create a pipeline that runs immediately, synchronizing rows in the table timestamped within the year 2020:
{
"sync_mode": {
"type": "full",
"iterator_field": "updated_on",
"iterator_data_type": "datetime_tz",
"from": "2020-01-01T00:00:00Z",
"until": "2021-01-01T00:00:00Z"
},
"schedule": {
"type": "once"
}
}
If an error occurs, the status of the connection, data model, and/or pipeline is set accordingly, with an error message detailing what may be wrong. In a faulted state, no data will synchronize until the issue is resolved. You can update the affected resource with a PATCH or PUT request, which clears the fault so the pipeline runs on the next scheduled interval. Alternatively, you can issue a re-sync with the trigger API, which retries the pipeline.
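For example, a faulted pipeline with the ID example-pipeline could be retried by calling the trigger endpoint (described in detail later on this page):
POST {baseURI}/inputs/data-pipelines/trigger
{
  "pipeline_id": "example-pipeline"
}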
Request: GET {baseURI}/inputs/data-pipelines
Request body: none
Example response:
[
{
"id": "example-pipeline-1",
"name": "Example Pipeline 1",
"pipeline_type": "user_attributes",
"connection_id": "connection-id",
"field_transformation_id": "field-transformation-id",
"data_model_id": "data-model-id",
"partner_feed_id": 0,
"partner_feed_key": "feed-key",
"state": "active",
"status": "idle",
"errors": [
{
"message": "string"
}
],
"sync_mode": {
"type": "incremental",
"iterator_field": "updated_at",
"iterator_data_type": "timestamp",
"from": "2022-07-01T16:00:00Z",
"until": "2022-08-01T16:00:00Z"
},
"schedule": {
"type": "interval",
"frequency": "hourly",
"delay": "5m",
"start": "2022-07-01T16:00:00Z",
"end": "2022-08-01T16:00:00Z"
},
"environment": "development",
"created_on": "2023-10-25T15:21:55.899Z",
"created_by": "developer@example.com",
"last_modified_on": "2023-10-25T15:21:55.899Z",
"last_modified_by": "developer@example.com"
},
{
"id": "example-pipeline-2",
"name": "Example Pipeline 2",
"pipeline_type": "user_attributes",
"connection_id": "connection-id",
"field_transformation_id": "field-transformation-id",
"data_model_id": "data-model-id",
"partner_feed_id": 0,
"partner_feed_key": "feed-key",
"state": "active",
"status": "idle",
"errors": [
{
"message": "string"
}
],
"sync_mode": {
"type": "incremental",
"iterator_field": "updated_at",
"iterator_data_type": "timestamp",
"from": "2022-07-01T16:00:00Z",
"until": "2022-08-01T16:00:00Z"
},
"schedule": {
"type": "interval",
"frequency": "hourly",
"delay": "5m",
"start": "2022-07-01T16:00:00Z",
"end": "2022-08-01T16:00:00Z"
},
"environment": "development",
"created_on": "2023-10-25T15:21:55.899Z",
"created_by": "developer@example.com",
"last_modified_on": "2023-10-25T15:21:55.899Z",
"last_modified_by": "developer@example.com"
}
]
Request: GET {baseURI}/inputs/data-pipelines/{pipelineId}
Path parameter: {pipelineId}
Required
Request body: none
Example response:
{
"id": "example-pipeline-1",
"name": "Example Pipeline 1",
"pipeline_type": "user_attributes",
"connection_id": "connection-id",
"field_transformation_id": "field-transformation-id",
"data_model_id": "data-model-id",
"partner_feed_id": 0,
"partner_feed_key": "feed-key",
"state": "active",
"status": "idle",
"errors": [
{
"message": "string"
}
],
"sync_mode": {
"type": "incremental",
"iterator_field": "updated_at",
"iterator_data_type": "timestamp",
"from": "2022-07-01T16:00:00Z",
"until": "2022-08-01T16:00:00Z"
},
"schedule": {
"type": "interval",
"frequency": "hourly",
"delay": "5m",
"start": "2022-07-01T16:00:00Z",
"end": "2022-08-01T16:00:00Z"
},
"environment": "development",
"created_on": "2023-10-25T15:21:55.899Z",
"created_by": "developer@example.com",
"last_modified_on": "2023-10-25T15:21:55.899Z",
"last_modified_by": "developer@example.com"
}
Request: POST {baseURI}/inputs/data-pipelines
Parameters:
Name | Type | Required | Description |
---|---|---|---|
id | string | Required | Unique identifier in slug format. Valid characters include numbers, letters, _, and - |
name | string | Required | Name of the pipeline |
pipeline_type | string | Required | Valid values: user_attributes , events . If set to events , you must use a field transformation to map your source data to the destination fields in mParticle. |
connection_id | string | Required | The ID of the connection to use with this pipeline |
field_transformation_id | string | Required for event pipelines | The ID of the field transformation to use when mapping fields in your warehouse data to event fields in mParticle. Field transformations are not available for user data pipelines. |
data_model_id | string | Required | The ID of the data model to use with this pipeline |
partner_feed_id | integer | Optional | The ID of the feed that incoming data will route to. To learn how to create a feed using an API, see feeds API. ^1^ |
partner_feed_key | string | Optional | The Key of the feed that incoming data will route to. |
state | string | Required | The state of the pipeline. Valid values: new , active or paused |
sync_mode | object | Required | JSON object containing the sync mode settings |
type | string | Required | Valid values: incremental or full |
iterator_field | string | Required if incremental | A name of a column in the database. mParticle uses the iterator to track what data needs to be synced. |
iterator_data_type | string | Required if incremental | The data type for the iterator field. For example: timestamp |
from | string | Optional | Timestamp for first record to sync. Format: 2022-10-30T11:00:16Z |
until | string | Optional | Timestamp for last record to sync. Format: 2022-10-30T11:00:16Z |
schedule | object | Required | JSON object containing the pipeline scheduling settings |
type | string | Required | Valid values: interval , once , or on_demand |
frequency | string | Required if interval | Frequency to sync data. Valid values: hourly , daily , weekly , monthly |
delay | string | Required | The amount of time to delay a scheduled interval sync. Useful for situations where data may only be available in the source data set after a period of time. Format: 1d . A number, followed immediately by one of the following units: s for seconds, m for minutes, h for hours, d for days, w for weeks, and y for years. |
start | string | Required if interval | Timestamp of first scheduled interval sync. If sync_mode.from is equal to the provided start date, the pipeline will start on the next scheduled interval. Format: 2022-10-30T11:00:16Z |
end | string | Optional | Timestamp of last scheduled interval sync. Format: 2022-10-30T11:00:16Z |
environment | string | Required | mParticle environment to sync data as. Valid values: development , production |
data_plan_id | string | Optional | ID for the data plan to associate with the pipeline |
data_plan_version | string | Optional | Version for the data plan to associate with the pipeline |
When creating a new pipeline, you must decide whether it will ingest event data or user data: for event data, set pipeline_type to events and provide a field_transformation_id; for user data, set pipeline_type to user_attributes.
To learn more about field transformations and how to create them, see Field Transformations API.
Example request body:
Note: only include a field_transformation_id
for event data pipelines.
{
"id": "string",
"name": "string",
"pipeline_type": "events",
"connection_id": "string",
"field_transformation_id": "field-transformation-id",
"data_model_id": "string",
"partner_feed_id": 0,
"partner_feed_key": "string",
"state": "active",
"errors": [
{
"message": "string"
}
],
"sync_mode": {
"type": "incremental",
"iterator_field": "updated_at",
"iterator_data_type": "timestamp",
"from": "2022-07-01T16:00:00Z",
"until": "2022-08-01T16:00:00Z"
},
"schedule": {
"type": "interval",
"frequency": "hourly",
"delay": "5m",
"start": "2022-07-01T16:00:00Z",
"end": "2022-08-01T16:00:00Z"
},
"environment": "development",
"data_plan_id": "example-data-plan-id",
"data_plan_version": 2
}
Example response:
{
"id": "string",
"name": "string",
"pipeline_type": "events",
"connection_id": "string",
"field_transformation_id": "field-transformation-id",
"data_model_id": "string",
"partner_feed_id": 0,
"partner_feed_key": "string",
"state": "active",
"status": "idle",
"errors": [
{
"message": "string"
}
],
"sync_mode": {
"type": "incremental",
"iterator_field": "updated_at",
"iterator_data_type": "timestamp",
"from": "2022-07-01T16:00:00Z",
"until": "2022-08-01T16:00:00Z"
},
"schedule": {
"type": "interval",
"frequency": "hourly",
"delay": "5m",
"start": "2022-07-01T16:00:00Z",
"end": "2022-08-01T16:00:00Z"
},
"environment": "development",
"data_plan_id": "example-data-plan-id",
"data_plan_version": 2,
"created_on": "2023-10-25T16:02:28.008Z",
"created_by": "string",
"last_modified_on": "2023-10-25T16:02:28.008Z",
"last_modified_by": "string"
}
Request: PUT {baseURI}/inputs/data-pipelines/{pipelineId}
Parameters are the same as Create a pipeline
Example request body:
This example request changes the schedule interval from hourly to weekly:
{
"id": "string",
"name": "string",
"pipeline_type": "user_attributes",
"connection_id": "string",
"data_model_id": "string",
"partner_feed_id": 0,
"partner_feed_key": "string",
"state": "active",
"errors": [
{
"message": "string"
}
],
"sync_mode": {
"type": "incremental",
"iterator_field": "updated_at",
"iterator_data_type": "timestamp",
"from": "2022-07-01T16:00:00Z",
"until": "2022-08-01T16:00:00Z"
},
"schedule": {
"type": "interval",
"frequency": "hourly",
"delay": "5m",
"start": "2022-07-01T16:00:00Z",
"end": "2022-08-01T16:00:00Z"
},
"environment": "development",
"data_plan_id": "example-data-plan-id",
"data_plan_version": 2
}
Example response:
{
"id": "string",
"name": "string",
"pipeline_type": "user_attributes",
"connection_id": "string",
"data_model_id": "string",
"partner_feed_id": 0,
"partner_feed_key": "string",
"state": "active",
"status": "idle",
"errors": [
{
"message": "string"
}
],
"sync_mode": {
"type": "incremental",
"iterator_field": "updated_at",
"iterator_data_type": "timestamp",
"from": "2022-07-01T16:00:00Z",
"until": "2022-08-01T16:00:00Z"
},
"schedule": {
"type": "interval",
"frequency": "hourly",
"delay": "5m",
"start": "2022-07-01T16:00:00Z",
"end": "2022-08-01T16:00:00Z"
},
"environment": "development",
"data_plan_id": "example-data-plan-id",
"data_plan_version": 2,
"created_on": "2023-10-25T16:02:28.008Z",
"created_by": "string",
"last_modified_on": "2023-10-25T16:02:28.008Z",
"last_modified_by": "string"
}
Request: DELETE {baseURI}/inputs/data-pipelines/{pipelineId}
Request body: None
Request: POST {baseURI}/inputs/data-pipelines/trigger
Starts syncing data from a pipeline. The behavior varies depending on the pipeline's sync mode:
- incremental sync mode: syncs data from the last successful sync until the time the trigger was executed
- full sync mode: syncs all data in the pipeline
If a pipeline is faulted, you can use the trigger endpoint to re-attempt synchronization. If successful, this clears the faulted status.
Returns a 400 error if the pipeline is not idle.
The following table shows an example of how the pipeline’s intervals are set for each action that is performed.
Date | Action | Description |
---|---|---|
2023-02-01 15:13:22Z | Pipeline Created | The pipeline is initially idle |
2023-02-04 08:30:17Z | Trigger API Called | An interval will synchronize data between 2023-02-01 00:00:00Z and 2023-02-04 08:30:17Z |
2023-02-08 12:05:45Z | Trigger API Called | An interval will synchronize data between 2023-02-04 08:30:17Z and 2023-02-08 12:05:45Z |
Example request body:
{
"pipeline_id": "string"
}
Example response:
{
"pipeline_run_id": 13325,
"status": "attempt_requested"
}
Example error response:
{
"statusCode": 400,
"errors": [
{
"message": "Pipeline example-pipeline is already in progress."
}
]
}
Request: GET {baseURI}/inputs/data-pipelines/{pipelineId}/report
Retrieves the current status of a pipeline, including the latest completed run.
Request body: none
Response Parameters:
Name | Type | Description |
---|---|---|
status | string | The status of the pipeline. Valid values: “idle”, “running”, “faulted”, or “stopped” |
errors | array | A list of errors detailing why a pipeline may be “faulted” |
connection_status | string | The status of the pipeline’s connection. Valid values: “healthy” or “faulted” |
connection_errors | array | A list of errors detailing why a connection may be “faulted” |
data_model_status | string | The status of the pipeline’s data model. Valid values: “invalid” or “valid” |
data_model_errors | array | A list of errors detailing why a data model may be “invalid” |
latest_successful_pipeline_run_id | integer | The most recent successfully completed pipeline run’s id |
latest_pipeline_run | object | The most recent pipeline run, determined by its logical_date |
id | string | The id of this pipeline run |
pipeline_id | string | The pipeline this run is for |
type | string | The reason this run was created. Values: scheduled or manual |
status | string | Status for this run. Values: queued , running , success , failed , stopped , or retrying |
errors | array | A list of errors detailing why a run may have a “failed” status |
logical_date | string | Identifier aligned to the chosen schedule interval and frequency |
started_on | string | Timestamp of when this run began running |
ended_on | string | Timestamp of when this run finished |
range_start | string | Start of data retrieved from the data model aligned to the chosen delay |
range_end | string | End of data retrieved from the data model aligned to the chosen delay |
successful_records | string | Count of rows successfully processed |
failed_records | string | Count of rows that failed to extract or be processed into the mParticle format |
Example response:
{
"pipeline_id": "string",
"status": "idle",
"errors": [
{
"message": "string"
}
],
"connection_status": "healthy",
"connection_errors": [
{
"message": "string"
}
],
"data_model_status": "valid",
"data_model_errors": [
{
"message": "string"
}
],
"latest_successful_pipeline_run_id": 0,
"latest_pipeline_run": {
"id": 0,
"pipeline_id": "string",
"type": "scheduled",
"status": "success",
"errors": [
{
"message": "string"
}
],
"logical_date": "2023-10-25T18:11:57.321Z",
"started_on": "2023-10-25T18:11:57.321Z",
"ended_on": "2023-10-25T18:11:57.321Z",
"range_start": "2023-10-25T18:11:57.321Z",
"range_end": "2023-10-25T18:11:57.321Z",
"successful_records": 0,
"failed_records": 0
}
}
Request: GET {baseURI}/inputs/data-pipelines/{pipelineId}/runs?currentPage={currentPage}&pageSize={pageSize}&startDate={startDate}&endDate={endDate}
Parameters:
Name | Required | Description |
---|---|---|
pipelineId | Required | The ID of the pipeline to retrieve the status for |
currentPage | Optional | Page number to return |
pageSize | Optional | Count of items to return per page |
startDate | Optional | The earliest logical_date to look for run statuses |
endDate | Optional | The last logical_date to look for run statuses |
Request body: none
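For example, a request for the first page of up to 50 runs from a hypothetical pipeline named example-pipeline, limited to logical dates in October 2023, might look like this:
GET {baseURI}/inputs/data-pipelines/example-pipeline/runs?currentPage=1&pageSize=50&startDate=2023-10-01T00:00:00Z&endDate=2023-10-31T00:00:00Z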
Example response:
{
"items": [
{
"id": 251,
"pipeline_id": "example-pipeline",
"type": "scheduled",
"status": "success",
"errors": [
{
"message": "string"
}
],
"logical_date": "2023-10-25T18:11:57.321Z",
"started_on": "2023-10-25T18:11:57.321Z",
"ended_on": "2023-10-25T18:11:57.321Z",
"range_start": "2023-10-25T18:11:57.321Z",
"range_end": "2023-10-25T18:11:57.321Z",
"successful_records": 100,
"failed_records": 0
},
{
"id": 252,
"pipeline_id": "example-pipeline",
"type": "scheduled",
"status": "success",
"errors": [
{
"message": "string"
}
],
"logical_date": "2023-10-25T19:11:57.321Z",
"started_on": "2023-10-25T19:11:57.321Z",
"ended_on": "2023-10-25T19:11:57.321Z",
"range_start": "2023-10-25T19:11:57.321Z",
"range_end": "2023-10-25T19:11:57.321Z",
"successful_records": 100,
"failed_records": 0
}
],
"current_page": 1,
"page_size": 50,
"total_pages": 1,
"total_items": 1,
"has_previous_page": false,
"has_next_page": false
}
Request: GET {baseURI}/inputs/data-pipelines/{pipelineId}/runs/{pipelineRunId}
Request body: none
Example response:
{
"id": 251,
"pipeline_id": "example-pipeline",
"type": "scheduled",
"status": "success",
"errors": [
{
"message": "string"
}
],
"logical_date": "2023-10-25T18:11:57.321Z",
"started_on": "2023-10-25T18:11:57.321Z",
"ended_on": "2023-10-25T18:11:57.321Z",
"range_start": "2023-10-25T18:11:57.321Z",
"range_end": "2023-10-25T18:11:57.321Z",
"successful_records": 100,
"failed_records": 0
}
The Warehouse Sync API returns error codes or status codes in response to every request.
- 200: Successful request. The message varies depending on which resource you requested.
- 204: Empty response. Indicates a successful operation.
- 400: The resource could not be created because of an error in the request or the entity is in an unprocessable state. This may mean the SQL query contains invalid characters or the query is otherwise invalid. This could be the result of supplying a data plan ID for a plan that is not active.
- 404: The requested resource or object wasn't found. The message varies depending on which resource or object you requested. This could be the result of supplying a data plan ID for a data plan that does not exist in your workspace, or if the data plan does exist but you did not specify a data plan version.
- 505: The request failed to connect to the warehouse account. Check the username and password, and make sure the source account ID, region, warehouse, and database entries are correct.