Use this tutorial to configure your first Warehouse Sync pipeline with the mParticle Postman collection and activate the data in a downstream tool. Postman is a friendly environment for developers and non-developers alike to work with APIs.
This tutorial is not a complete guide to all of the Warehouse Sync features and APIs. For full reference documentation, see the Warehouse Sync API Reference.
Fork the mParticle Warehouse Sync Postman Collection to your workspace:
A copy of the Warehouse Sync environment is included. You can download it again here.
Create a new Feed in mParticle that will receive the incoming data, and create API credentials that can be used in Postman.
Copy the Server to Server Key for the Feed as you’ll need it in a future step.
You need credentials to use the Platform API to create Warehouse Sync Pipelines.
To create a Platform API credential:
After signing in to the mParticle app as a user with the Admin role, click the user icon in the bottom left corner.
Give the credential a name, check the Platform checkbox, and select Admin from the dropdown. Click the green Save button to save the new credential.
Work with your Snowflake administrator or IT team to ensure Snowflake is reachable and accessible by the mParticle app.
USE ROLE ACCOUNTADMIN;
// Create a unique role for mParticle
CREATE ROLE IF NOT EXISTS {{role_name}};
GRANT USAGE ON WAREHOUSE {{compute_wh}} TO ROLE {{role_name}};
GRANT USAGE ON DATABASE {{database}} TO ROLE {{role_name}};
GRANT USAGE ON SCHEMA {{database}}.{{schema}} TO ROLE {{role_name}};
// Grant SELECT privilege on any tables/views mP needs to access
GRANT SELECT ON TABLE {{database}}.{{schema}}.{{table}} TO ROLE {{role_name}};
// Recommend creating a unique user for mParticle
CREATE OR REPLACE USER {{user_name}} PASSWORD = '{{unique_secure_password}}';
GRANT ROLE {{role_name}} TO USER {{user_name}};
// Note: Don't change the name of the storage integration: plug in your mP POD, Org ID,
// and Acct ID into the name, and be sure to leave the other characters as they are
// ("mp_", underscores, and "_s3").
//
// Use the value for your mParticle instance for {pod_mp_aws_account_id}:
// US1 = 338661164609
// US2 = 386705975570
// AU1 = 526464060896
// EU1 = 583371261087
CREATE OR REPLACE STORAGE INTEGRATION mp_{{pod}}_{{org_id}}_{{acct_id}}_s3
WITH TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::{{pod_mp_aws_account_id}}:role/ingest-pipeline-data-external-{{org_id}}-{{acct_id}}'
STORAGE_AWS_OBJECT_ACL = 'bucket-owner-full-control'
STORAGE_ALLOWED_LOCATIONS = ('s3://{{pod}}-ingest-pipeline-data/{{org_id}}/{{acct_id}}');
GRANT USAGE ON INTEGRATION mp_{{pod}}_{{org_id}}_{{acct_id}}_s3 TO ROLE {{role_name}};
DESCRIBE INTEGRATION mp_{{pod}}_{{org_id}}_{{acct_id}}_s3;
The DESCRIBE command returns a number of different values. Copy the values for STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID, as you will need to enter them in the next section.
Once you have installed Postman, configure the collection environment.
Once the environment is successfully forked, you'll see it in the list of Environment configurations. You can rename it to something more meaningful by clicking the … menu next to the name and choosing Rename.
Replace the placeholder value “REPLACE_ME” for several environment variables with values corresponding to your environment. Ensure you update the values in the CURRENT VALUE column.
Replace WORKSPACE_ID, ACCOUNT_ID, and ORG_ID with the corresponding values.
Click the Warehouse Sync Early Access collection to expand it. Then click the Variables tab.
Replace placeholder values (the ones indicated with “REPLACE_ME”) for several variables with the values corresponding to your environment. Ensure you update the values in the CURRENT VALUE column.
Replace SQL_QUERY with the Snowflake query mParticle should use to retrieve the data from Snowflake. SQL is a powerful language, and you can use advanced expressions to filter, aggregate, or join your data. Work with your database administrator if you need help crafting the right SQL query.
As part of the SQL query, you must specify how columns in the query map to attributes on a user's profile. You do this by using column aliasing in SQL. For example, in the following query, the column cid in Snowflake is mapped to the mParticle attribute customer_id.
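A minimal sketch of such a query is shown below. The {{database}}.{{schema}}.{{table}} placeholders follow the same convention as the Snowflake setup script above, and the cid, propensity_score, and updated_at columns are hypothetical stand-ins for your own schema. A timestamp column such as updated_at comes up again later, when you configure the data model's load timestamp fields.

// Hypothetical example query: cid is aliased to the customer_id attribute,
// propensity_score is aliased to propensity_to_buy, and updated_at keeps its
// Snowflake column name (so it would map to an attribute named updated_at).
SELECT
  cid AS customer_id,
  propensity_score AS propensity_to_buy,
  updated_at
FROM {{database}}.{{schema}}.{{table}}
// Optional filter: only include rows updated in the last 30 days
WHERE updated_at >= DATEADD(day, -30, CURRENT_TIMESTAMP());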
If you don’t provide an alias, mParticle will use the name of the column in Snowflake. If an attribute of this name does not already exist on the user’s profile, mParticle will create a new attribute with this name.
Optional: update the value of the SCHEDULE_INTERVAL parameter. The current value, once, means a pipeline will run once and only once after it is created. You can also choose monthly, weekly, daily, or hourly.
Creating a warehouse sync pipeline takes three steps: create a connection, create a data model, and create the pipeline itself.
The first step is to create a connection. mParticle uses this information in order to establish a connection with your Snowflake database.
In Postman, ensure the environment drop-down is set to the Environment configuration you imported earlier.
Click the Body tab to see the information you will pass to the API in order to create the connection.
The values in {{Sample Values}} are taken from the environment variables you updated in earlier steps. Ensure the values for source_account_id, region, warehouse, and database reflect the correct values for your organization's Snowflake data warehouse. You may need to work with your Snowflake DBA to confirm them.
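For reference, the prefilled body is a JSON document roughly shaped like the sketch below. Treat this as an illustrative sketch only: the request in the collection is authoritative, and the fields not called out above (id, name, state, type, user, role, password, and storage_integration), along with the {{...}} variable names, are assumptions here.

{
  "id": "example-snowflake-connection",
  "name": "Example Snowflake Connection",
  "state": "active",
  "type": "snowflake",
  "source_account_id": "{{SNOWFLAKE_ACCOUNT_ID}}",
  "region": "{{SNOWFLAKE_REGION}}",
  "warehouse": "{{SNOWFLAKE_WAREHOUSE}}",
  "database": "{{SNOWFLAKE_DATABASE}}",
  "user": "{{SNOWFLAKE_USER_NAME}}",
  "role": "{{SNOWFLAKE_ROLE_NAME}}",
  "password": "{{SNOWFLAKE_PASSWORD}}",
  "storage_integration": "mp_{{POD}}_{{ORG_ID}}_{{ACCT_ID}}_s3"
}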
Once you are confident all values are correct, click the blue Send button to issue the API call to create the Connection configuration. If everything is correct, mParticle will return a success message with details about the configuration you just created. If it was not successful, you will get an error message with additional information pointing to what the issue might be.
The second step in the pipeline creation process is creating a data model. mParticle uses this information in order to determine what data should be extracted from your Snowflake database and how it will map to mParticle’s data model.
Click the Body tab to see the information you will pass to mParticle in order to create the data model.
The values in {{Sample Values}} will be taken from the variables you updated in previous steps. Update the values for load_timestamp_field_type, load_timestamp_field_name, and load_timestamp_field_time_zone to match the timestamp field you provided in your SQL query in an earlier step. Allowed values for load_timestamp_field_type are timestamp_ltz, timestamp_ntz, timestamp_tz, datetime, date, timestamp_unixtime_ms, and timestamp_unixtime_s.
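For reference, the prefilled data model body is roughly shaped like the sketch below. As with the connection, this is only an illustrative sketch: the collection's request is authoritative, and the id, name, state, type, and sql_query fields, as well as the example timestamp values (which assume the hypothetical updated_at column from the earlier query), are assumptions here.

{
  "id": "example-data-model",
  "name": "Example Data Model",
  "state": "active",
  "type": "sql",
  "sql_query": "{{SQL_QUERY}}",
  "load_timestamp_field_type": "timestamp_ltz",
  "load_timestamp_field_name": "updated_at",
  "load_timestamp_field_time_zone": "{{LOAD_TIMESTAMP_FIELD_TIME_ZONE}}"
}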
Once you are confident all values are correct, click the blue Send button to issue the API request that creates the data model. If everything is correct, mParticle returns a success message with details about the data model you just created. If the request was not successful, mParticle returns an error message with additional information about what the issue might be.
The final step in the process is creating the pipeline itself. You will pass in the Connection and Data Model configurations previously created and provide a few additional details.
Select the Create Pipeline request. Click the Body tab to see the information you will pass to mParticle in order to create the Pipeline configuration.
The values in {{Sample Values}} will be taken from the variables you updated in previous steps. No values are required to be updated in this step. You can optionally update the environment variable; it is currently set to target your mParticle development environment, but you can change it to target your production environment.
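For reference, the prefilled pipeline body is roughly shaped like the sketch below. As before, treat this as a sketch: the collection's request is authoritative, and field names such as connection_id, data_model_id, and partner_feed_key (which would carry the Server to Server Key you copied when creating the Feed) are assumptions here.

{
  "id": "example-pipeline",
  "name": "Example Warehouse Sync Pipeline",
  "state": "active",
  "connection_id": "example-snowflake-connection",
  "data_model_id": "example-data-model",
  "partner_feed_key": "{{FEED_KEY}}",
  "schedule_interval": "{{SCHEDULE_INTERVAL}}",
  "environment": "development"
}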
Once you are confident all values are correct, click the blue Send button to issue the API request that creates the Pipeline configuration. If everything is correct, mParticle returns a success message with details about the configuration you just created. If it was not successful, mParticle returns an error message with additional information about what the issue might be. If you kept the default value for schedule_interval (once), the pipeline kicks off immediately after creation and runs only once. You can also set it to hourly, daily, or weekly.
Once a pipeline has been created, you can monitor its status using the additional requests provided in the Postman collection.
Send the Get Pipeline Status request. mParticle sends a detailed message with the pipeline's current status. Note that after you create a pipeline, there is a delay of about one minute before it exists in the mParticle backend, so an immediate Get Pipeline Status call may result in a Not Found error. If you see this, try again in a minute.
While a Pipeline is ingesting data, monitor it in mParticle, just like you do with other inputs. From mParticle, go to Data Master > Live Stream to inspect the incoming data from Snowflake.
Once the data is ingested, you will see the data points appear on the user's profile. Go to Activity > User Activity and look up a sample profile. If the attributes do not show up the way you expected, validate the mapping/aliasing in the SQL query you provided earlier.
Now that the data has been loaded into mParticle, it’s time to activate it.
For this section of the tutorial, we will create an audience that uses the loaded data and send it to a downstream integration.
Create an audience that uses one of the ingested attributes as a qualifying criterion:
Add an audience criteria that leverages one of the data points you ingested from Snowflake. In the example below, we only want to consider users who have a propensity-to-buy score that’s greater than 0.7.
In the final step, we will send users who qualified for this audience to Iterable for further targeting. If your organization doesn’t use Iterable, pick a different integration that your organization uses.
After the audience has been fully calculated, connect it to an output:
In the row for the audience you just created, click the Connect button in the Actions column.
Click Connect Output.
Select the Audience checkbox and click Configure.
Enter a configuration name, your Iterable API key, and the user ID that is used to identify users, and then click Save & Open in Connections.
Provide the ID of the list in Iterable that the user data should be loaded into, and click Add Connection.
Click the Send button to send the audience to Iterable.
Data starts flowing to Iterable.
Now you can open the audience in Iterable and take the next step.