Use this tutorial to configure your first Warehouse Sync pipeline with the mParticle Postman collection, and then use the data from your pipeline to create an mParticle Audience. Postman provides an approachable environment for both developers and non-developers to work with APIs.
This tutorial is not a complete guide to all Warehouse Sync features. For a complete API reference, see the Warehouse Sync API Reference.
Fork the mParticle Warehouse Sync Postman Collection to your workspace:
A copy of the Warehouse Sync environment is included. You can download it again here.
You need credentials to use the Platform API to create Warehouse Sync pipelines.
To create a Platform API credential:
Give the credential a name, check the Platform checkbox and select Admin from the Permissions dropdown menu. Click the green Save button.
Work with your warehouse administrator or IT team to ensure your warehouse is reachable and accessible by mParticle.
Run the following commands from your Snowflake instance:
USE ROLE ACCOUNTADMIN;
// mParticle recommends creating a unique role for warehouse sync
CREATE ROLE IF NOT EXISTS {{role_name}};
GRANT USAGE ON WAREHOUSE {{warehouse}} TO ROLE {{role_name}};
GRANT USAGE ON DATABASE {{database}} TO ROLE {{role_name}};
GRANT USAGE ON SCHEMA {{database}}.{{schema}} TO ROLE {{role_name}};
// Grant SELECT privileges on any tables/views mParticle needs to access
GRANT SELECT ON TABLE {{database}}.{{schema}}.{{table}} TO ROLE {{role_name}};
// mParticle recommends creating a unique user for mParticle
CREATE OR REPLACE USER {{user_name}} PASSWORD = '{{unique_secure_password}}';
GRANT ROLE {{role_name}} TO USER {{user_name}};
CREATE OR REPLACE STORAGE INTEGRATION {{storage_integration_name}}
  WITH TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::{{mp_pod_aws_account_id}}:role/ingest-pipeline-data-external-{{mp_org_id}}-{{mp_acct_id}}'
  STORAGE_AWS_OBJECT_ACL = 'bucket-owner-full-control'
  STORAGE_ALLOWED_LOCATIONS = ('s3://{{mp_pod}}-ingest-pipeline-data/{{mp_org_id}}/{{mp_acct_id}}');
GRANT USAGE ON INTEGRATION {{storage_integration_name}} TO ROLE {{role_name}};
Where:
role_name
: The ID of the role mParticle will assume while executing SQL commands on your Snowflake instance. mParticle recommends creating a unique role for warehouse sync.

warehouse
: The ID of the Snowflake virtual warehouse compute cluster where SQL commands will be executed.

database
: The ID of the database in your Snowflake instance from which you want to sync data.

schema
: The ID of the schema in your Snowflake instance containing the tables you want to sync data from.

table
: The ID of the table containing the data you want to sync. Grant SELECT privileges on any tables or views mParticle needs to access.

user_name
: The ID of the user mParticle will log in as while executing SQL commands on your Snowflake instance. mParticle recommends creating a unique user for warehouse sync.

unique_secure_password
: The password for the user mParticle will log in as. Use a unique, secure password.

storage_integration_name
: The ID of a Snowflake external storage integration allowing mParticle to unload data from your Snowflake instance to an S3 storage bucket.

mp_pod
: The mParticle region ID of your data hosting location: one of US1, US2, AU1, or EU1.

mp_pod_aws_account_id
: The mParticle provided ID for the data hosting location where your organization resides. Use the corresponding value for your mParticle instance:
  - 338661164609
  - 386705975570
  - 526464060896
  - 583371261087

mp_org_id
: The mParticle provided ID of the organization where this connection will be stored. You can find this from your API client setup in step 1.

mp_acct_id
: The mParticle provided ID of the account where this connection will be stored. You can find this from your API client setup in step 1.
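For example, with an illustrative role MP_SYNC_ROLE, warehouse ANALYTICS_WH, and table ANALYTICS.PUBLIC.USER_SCORES (all hypothetical names), the role and grants would look like this:

```sql
// Illustrative values only; substitute your own role, warehouse, database, schema, and table names
USE ROLE ACCOUNTADMIN;
CREATE ROLE IF NOT EXISTS MP_SYNC_ROLE;
GRANT USAGE ON WAREHOUSE ANALYTICS_WH TO ROLE MP_SYNC_ROLE;
GRANT USAGE ON DATABASE ANALYTICS TO ROLE MP_SYNC_ROLE;
GRANT USAGE ON SCHEMA ANALYTICS.PUBLIC TO ROLE MP_SYNC_ROLE;
GRANT SELECT ON TABLE ANALYTICS.PUBLIC.USER_SCORES TO ROLE MP_SYNC_ROLE;
```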
The service_account_key will be the contents of the generated JSON file. Save this value for your Postman setup.
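A Google Cloud service account key file generally has the following shape; all values shown here are placeholders, and you should paste the full contents of your actual file:

```json
{
  "type": "service_account",
  "project_id": "your-project-id",
  "private_key_id": "0123456789abcdef0123456789abcdef01234567",
  "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
  "client_email": "your-service-account@your-project-id.iam.gserviceaccount.com",
  "client_id": "123456789012345678901",
  "token_uri": "https://oauth2.googleapis.com/token"
}
```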
Navigate to your BigQuery instance from console.cloud.google.com.
For example, given a Dataset ID of mp-project.mp-dataset and a Data location of us-east4:

project_id
: The first portion of the Dataset ID (the portion before the .); mp-project in this example.

dataset_id
: The second portion of the Dataset ID (the portion immediately after the .); mp-dataset in this example.

region
: The Data location; us-east4 in this example.
-- Create a unique user for mParticle
CREATE USER {{user_name}} WITH PASSWORD '{{unique_secure_password}}';
-- Grant schema usage permissions to the new user
GRANT USAGE ON SCHEMA {{schema_name}} TO {{user_name}};
-- Grant SELECT privilege on any tables/views mParticle needs to access
GRANT SELECT ON TABLE {{schema_name}}.{{table_name}} TO {{user_name}};
Replace {{mp_pod_aws_account_id}} with one of the following values according to your mParticle instance's location:

- 338661164609
- 386705975570
- 526464060896
- 583371261087
{
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Resource": "arn:aws:iam::{{mp_pod_aws_account_id}}:role/ingest-pipeline-data-external-{{mp_org_id}}-{{mp_acct_id}}",
"Sid": ""
}
],
"Version": "2012-10-17"
}
Name the policy mparticle_redshift_assume_role_policy, and click Create policy. Name the role mparticle_redshift_role, and click Create role.

Your configuration will differ between Amazon Redshift and Amazon Redshift Serverless. To complete your configuration, follow the appropriate steps for your use case below.
Make sure to save the value of your new role’s ARN. You will need to use this when setting up Postman in the next section.
For Amazon Redshift, associate the IAM role mparticle_redshift_role with your cluster. For Amazon Redshift Serverless, associate the IAM role mparticle_redshift_role with your namespace.

Warehouse Sync uses the Databricks-to-Databricks Delta Sharing protocol to ingest data from Databricks into mParticle.
Complete the following steps to prepare your Databricks instance for Warehouse Sync.
Under Recipient name, enter mParticle_{YOUR-DATA-POD}, where {YOUR-DATA-POD} is either us1, us2, eu1, or au1, depending on the location of the data pod configured for your mParticle account. In Sharing identifier, enter one of the following identifiers, depending on the location of your mParticle account's data pod:
- aws:us-east-1:e92fd7c1-5d24-4113-b83d-07e0edbb787b
- aws:us-east-1:e92fd7c1-5d24-4113-b83d-07e0edbb787b
- aws:eu-central-1:2b8d9413-05fe-43ce-a570-3f6bc5fc3acf
- aws:ap-southeast-2:ac9a9fc4-22a2-40cc-a706-fef8a4cd554e
Within the Create a new share window, enter mparticle_{YOUR-MPARTICLE-ORG-ID}_{YOUR-MPARTICLE-ACCOUNT-ID} under Share name, where {YOUR-MPARTICLE-ORG-ID} and {YOUR-MPARTICLE-ACCOUNT-ID} are your mParticle Org and Account IDs.
Databricks Delta Sharing does not currently support the TIMESTAMP_NTZ data type.
Other data types that are not currently supported by the Databricks integration for Warehouse Sync (for both user and events data) include:
If you are ingesting events data through Warehouse Sync, the following data types are unsupported:
While multi-dimensional, or nested, arrays are unsupported, you can still ingest simple arrays with events data.
Once you have installed Postman, configure the collection environment settings and variables.
If you successfully forked the Warehouse Sync API collection, you’ll see it in the list of Environment configurations. You can rename it to something more meaningful by right-clicking on the … next to the name and choosing the Rename option.
Replace the placeholders (replace_me) with the correct values for your environment. You must update the values under the column labeled Current value.
- Replace PLATFORM_API_CLIENT_ID and PLATFORM_API_CLIENT_SECRET with your new Platform API credentials.
- Replace WORKSPACE_ID, ACCOUNT_ID, and ORG_ID with the corresponding values for your mParticle account.
- Replace POD with the regional pod your mParticle account is deployed on. Look at the URL in your browser where you are signed in to mParticle; the pod is one of US1, US2, EU1, or AU1.
- Enter the data warehouse usernames and passwords you saved from "Step 2. Data Warehouse Setup," according to the data warehouse you are using:
  - Replace SNOWFLAKE_PASSWORD and SNOWFLAKE_STORAGE_INTEGRATION with the values you saved in step 2. Refer to the Snowflake documentation to determine your account_identifier and region.
  - Replace BIG_QUERY_SERVICE_ACCOUNT_ID with the service account ID you used in BigQuery, and BIG_QUERY_SERVICE_ACCOUNT_KEY with the key from the generated JSON file in step 2.
  - Replace REDSHIFT_USER, REDSHIFT_PASSWORD, and REDSHIFT_AWS_IAM_ROLE_ARN with the values created in step 2.

Replace the replace_me placeholders with the values corresponding to your environment. Ensure you update the values in the Current value column.

- Replace INGEST_PIPELINE_SLUG and INGEST_PIPELINE_NAME with the slug and name you want to use to identify your new pipeline.
- Replace SQL_QUERY with the SQL query mParticle will use to retrieve data from your warehouse. SQL is a powerful language, and you can use advanced expressions to filter, aggregate, and join your data. Work with your database administrator if you need help crafting the right SQL query:
As part of the SQL query, you must specify how columns in the query will map to attributes on a user's profile. You do this by using column aliasing in SQL. For example, in the following query, the column cid in Snowflake is mapped to the mParticle attribute customer_id.
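A query along these lines performs that mapping (the table name demo_db.demo.users, the propensity_score column, and the propensity_to_buy alias are illustrative):

```sql
SELECT
    cid              AS customer_id,       -- maps the cid column to the customer_id attribute
    propensity_score AS propensity_to_buy   -- additional columns map to attributes the same way
FROM demo_db.demo.users;
```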
If you don’t provide an alias, mParticle will use the name of the column in your database. If an attribute of this name does not already exist on the user’s profile, mParticle will create a new attribute with this name.
Creating a Warehouse Sync pipeline requires completing the following steps:

1. Create a data feed.
2. Create a connection to your warehouse.
3. Create a data model.
4. Optionally, create a field transformation.
5. Create the pipeline.
After configuration, you can monitor the pipeline.
First, you must create a data feed by submitting a request to the Feeds API. mParticle uses this feed for rules and connections in your workspace. You must provide a value for module_name that corresponds with the data warehouse you are using.

Valid values for module_name are:

- Redshift
- BigQuery
- Snowflake
- Databricks

These values are case sensitive. For example, if you use snowflake instead of Snowflake, you will encounter errors later in your configuration.
Click the Body tab to see the information you will pass to the API in order to create the feed.

- module_name must be one of Snowflake, BigQuery, Redshift, or Databricks.
- Values surrounded by double braces (for example: {{WORKSPACE_ID}}) are taken from the variables you updated in previous steps of this tutorial.
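For example, if your warehouse is Snowflake, the module_name field in the pre-filled request body would be set like this (the other fields in the body are left as provided by the collection and its variables):

```json
{
  "module_name": "Snowflake"
}
```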
The next step is to create a connection between mParticle and your data warehouse.
Click the Body tab and replace each "replace_me" placeholder with the correct value for your specific warehouse.
The values in {{Sample Values}} are taken from the environment variables you updated in earlier steps. Make sure these values match the values for your organization's data warehouse. You may need to work with your database administrator to ensure you have the correct values.
Verify all values you changed, and click the blue Send button. mParticle returns a success message with details about the configuration you just created. If the request fails, mParticle returns an error message with additional information.
The next step is to create a data model. A data model is a SQL query that mParticle sends to your warehouse specifying exactly what columns, rows, and fields of data you want to ingest through your pipeline.
In the case of pipelines that ingest user profile data, the data model is also responsible for mapping ingested data fields to mParticle user attributes.
To create a data model using the Warehouse Sync API:
Select the Body tab and enter your data model.
The {{Sample Values}} are taken from the variables you updated in previous steps. For more details about using SQL to create a data model, with example queries and best practices, see the Warehouse Sync SQL Reference.
You can create a field transformation to specify exactly how fields in your database should map to fields in the mParticle JSON schema.
For detailed instructions on how to create a field transformation, read Event Data Mapping.
To use your field transformation, add the field_transformation_id to the request body of your API call when creating your pipeline in the next step.
The final step is to create the pipeline. You pass in the connection and data model configurations previously created along with your sync mode and scheduling settings.
In the Postman collection, select the Create Pipeline request. Select the Body tab, and update the sync_mode and schedule settings as follows (see the example after this list):
For sync_mode:

- Set type to either incremental or full.
- If you set type to incremental, set iterator_field to the name of the column in your SQL query that mParticle will monitor to track changes that need to be synced.
- If you set type to incremental, set iterator_data_type to the data type of your iterator. The supported types depend on your warehouse:
  - Snowflake: timestamp_ltz, timestamp_ntz, timestamp_tz, datetime, date, timestamp_unixtime_ms, timestamp_unixtime_s
  - BigQuery: datetime, date, timestamp, timestamp_unixtime_ms, timestamp_unixtime_s
  - Redshift: date, timestamp, timestamptz, timestamp_unixtime_ms, timestamp_unixtime_s
  - Databricks: date, timestamp, timestamp_unixtime_ms, timestamp_unixtime_s

For schedule:

- Set type to interval, once, or on_demand.
- If you set type to interval, set frequency to hourly, daily, weekly, or monthly.
- Set start to the date-time value you want recurring syncs to begin.
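For example, the sync_mode and schedule portion of the request body for an hourly incremental pipeline could look like the following sketch (the iterator column name and the start timestamp are placeholder values):

```json
{
  "sync_mode": {
    "type": "incremental",
    "iterator_field": "updated_at",
    "iterator_data_type": "timestamp_tz"
  },
  "schedule": {
    "type": "interval",
    "frequency": "hourly",
    "start": "2024-01-01T00:00:00Z"
  }
}
```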
The values in `{{Sample Values}}` will be taken from the variables you updated in previous steps. You can optionally update the `environment` variable. It is currently set to target your mParticle development environment, but you can change it to target your production environment.
- Optionally, set field_transformation_id to the ID of a custom field transformation to map data from your warehouse to specific fields in the mParticle JSON schema. To learn more about custom field transformations and how to create them, see the Field Transformations API.
- Optionally, include plan_id and plan_version and set these to the values of the data plan and version you want to use. You must use a data plan version that is active and exists in the mParticle workspace you are using.

Once a pipeline has been created, you can monitor its status using the additional requests provided in the Postman collection.
In the Postman collection, select the Get Pipeline Report request and send it. mParticle sends a detailed message with the pipeline's current status. After creating a pipeline, there is an approximately one-minute delay until the pipeline is created in the mParticle backend, so a Get Pipeline Report request submitted during this window results in a Not Found error. Try again after several minutes.
While a pipeline is ingesting data, you can monitor it in mParticle as you would any other input. From mParticle, go to Data Master > Live Stream to inspect the incoming data from your warehouse.

Once the data is ingested, the data points appear on the user's profile. Go to Activity > User Activity and look up a sample profile. If the attributes do not appear as expected, validate the mapping in the SQL query you provided earlier.
Now that the data has been loaded into mParticle, it’s time to put it to use by creating an audience using the newly ingested data and sending it to a downstream integration.
Create an audience that uses one of the ingested attributes as qualifying criteria:
Add an audience criteria that leverages one of the data points you ingested from your warehouse. In the example below, we only want to consider users who have a propensity-to-buy score that’s greater than 0.7.
In the final step, we will send users who qualified for this audience to Iterable for further targeting. If your organization doesn’t use Iterable, pick a different integration that your organization uses.
After the audience has been fully calculated, connect it to an output:
In the row for the audience you just created, click the Connect button in the Actions column.
Click Connect Output.
Select the Audience checkbox and click Configure.
Enter a configuration name, your Iterable API key, and the user ID that is used to identify users, and then click Save & Open in Connections.
Provide the ID of the list in Iterable that the user data should be loaded into, and click Add Connection.
Click the Send button to send the audience to Iterable.
Data starts flowing to Iterable.
Now you can open the audience in Iterable.