Use ComposeID to resolve unidentified user data stored in Snowflake to known user profiles without modifying your existing data infrastructure or having to permanently store your user data in mParticle.
ComposeID currently supports the profile isolation and best match identity strategies.
To use ComposeID with Snowflake, you must have an mParticle account that has been provisioned for the Warehouse Sync API. If you are not sure whether your account is provisioned to use Warehouse Sync, contact your mParticle account representative.
Once you’ve confirmed your account has access to the Warehouse Sync API, begin by creating a new set of API credentials:
Your new API credentials are shown in a modal window. Save the values displayed for the following:
You also need your workspace ID. To find it, click Settings next to the workspace name in the left-hand navigation, scroll to the workspace you want to use with IDSync, and copy the number displayed beneath the workspace name.
Follow the instructions under Authentication in the Warehouse Sync documentation to create an API access token. Use the client ID and client secret generated for your new API credentials in the previous step. You will use this access token when calling the Warehouse Sync API later in this guide.
Use the mParticle Platform API to create a new feed for Snowflake.
First, request a Platform API access token by sending a POST request to https://sso.auth.mparticle.com/oauth/token. Your request body must contain:
- client_id: your client ID issued in step 1.1
- client_secret: your client secret issued in step 1.1
- audience: set to "https://api.mparticle.com"
- grant_type: set to "client_credentials"
Example curl request
curl --request POST \
--url https://sso.auth.mparticle.com/oauth/token \
--header 'content-type: application/json' \
--data '{"client_id":"YOUR_CLIENT_ID","client_secret":"YOUR_CLIENT_SECRET","audience":"https://api.mparticle.com","grant_type":"client_credentials"}'
Example raw HTTP request
POST /oauth/token HTTP/1.1
Host: sso.auth.mparticle.com
Content-Type: application/json
{
"client_id": "your_client_id",
"client_secret": "your_client_secret",
"audience": "https://api.mparticle.com",
"grant_type": "client_credentials"
}
A successful POST request receives the following JSON response, where "access_token" is the OAuth bearer token you can use to authenticate subsequent calls to the Platform API:
{
"access_token": "YWIxMjdi883GHBBDnjsdKAJQxNjdjYUUJABbg6hdI.8V6HhxW-",
"expires_in" : 28800,
"token_type": "Bearer"
}
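If you prefer to script this step, the following Python sketch builds the same token request using only the standard library. It also turns the JSON response into an Authorization header value. This is a minimal sketch, not an official client. The helper names build_token_request and parse_token_response are hypothetical. The request is only sent if you uncomment the urlopen call.

```python
import json
import urllib.request

# Endpoint and audience taken from the example request above.
TOKEN_URL = "https://sso.auth.mparticle.com/oauth/token"
AUDIENCE = "https://api.mparticle.com"


def build_token_request(client_id: str, client_secret: str) -> urllib.request.Request:
    """Build (but do not send) the client-credentials token request."""
    body = {
        "client_id": client_id,
        "client_secret": client_secret,
        "audience": AUDIENCE,
        "grant_type": "client_credentials",
    }
    return urllib.request.Request(
        TOKEN_URL,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def parse_token_response(raw: str) -> str:
    """Return an Authorization header value built from the JSON token response."""
    payload = json.loads(raw)
    if payload.get("token_type") != "Bearer":
        raise ValueError(f"unexpected token_type: {payload.get('token_type')!r}")
    return f"Bearer {payload['access_token']}"


if __name__ == "__main__":
    req = build_token_request("YOUR_CLIENT_ID", "YOUR_CLIENT_SECRET")
    # Uncomment to actually send the request:
    # with urllib.request.urlopen(req) as resp:
    #     print(parse_token_response(resp.read().decode("utf-8")))
    print(req.get_method(), req.full_url)
```

The token expires after the number of seconds given in "expires_in", so a long-running script would need to request a new one.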
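Then create the feed by sending a POST request to https://api.mparticle.com/v1/workspace/{workspace_id}/partnerfeeds?accountId={account_id}, where:
- {workspace_id} is your mParticle workspace ID you found in step 1.1
- {account_id} is your mParticle account ID you found in step 1.1
Include your access token in an Authorization: Bearer header, and set the following fields in your API request body: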
Field | Type | Description |
---|---|---|
module_name | String | Must be set to "snowflake". |
name | String | A unique name of your choosing for your new feed. |
os | String | Optional. Defines the platform OS mParticle associates with your feed. Can be left blank. |
settings | Array | Optional. An array of JSON objects defining specific settings for your feed. Can be left blank. |
is_active | Boolean | Toggles whether your feed is active. Set to true. |
Example curl request
curl \
-X POST \
-H "Authorization: Bearer <YOUR_BEARER_TOKEN>" \
-H "Content-Type: application/json" \
-d "{ \
\"module_name\": \"snowflake\", \
\"name\": \"snowflake-idsync-feed\", \
\"os\": \"unknown\", \
\"settings\": [], \
\"is_active\": true \
}" \
"https://api.mparticle.com/v1/workspace/<YOUR_WORKSPACE_ID>/partnerfeeds?accountId=<YOUR_ACCOUNT_ID>"
Example HTTP request
POST /v1/workspace/<YOUR_WORKSPACE_ID>/partnerfeeds?accountId=<YOUR_ACCOUNT_ID> HTTP/1.1
Host: api.mparticle.com
Content-Type: application/json
Authorization: Bearer <YOUR_BEARER_TOKEN>
{
"module_name": "snowflake",
"name": "YOUR_FEED_NAME",
"is_active": true
}
The response contains your new feed's server-to-server key and secret. Save these values to use when configuring your Warehouse Sync pipeline in a later step.
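The feed-creation call can be scripted the same way. The Python sketch below only builds the request from the curl example, with the same URL, headers and body fields. build_create_feed_request is a hypothetical helper name. This guide does not show the response field names, so sending the request and reading the key and secret from the response are left to you.

```python
import json
import urllib.parse
import urllib.request

# Base URL taken from the example curl request above.
PLATFORM_API = "https://api.mparticle.com/v1"


def build_create_feed_request(bearer_token: str, workspace_id: str, account_id: str,
                              feed_name: str) -> urllib.request.Request:
    """Build (but do not send) the POST request that creates a Snowflake partner feed."""
    url = (f"{PLATFORM_API}/workspace/{urllib.parse.quote(str(workspace_id))}/partnerfeeds"
           f"?{urllib.parse.urlencode({'accountId': account_id})}")
    body = {
        "module_name": "snowflake",  # must be "snowflake"
        "name": feed_name,           # any unique name
        "os": "unknown",             # optional
        "settings": [],              # optional
        "is_active": True,
    }
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {bearer_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


if __name__ == "__main__":
    req = build_create_feed_request("YOUR_BEARER_TOKEN", "1234", "5678", "snowflake-idsync-feed")
    print(req.get_method(), req.full_url)
```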
To grant mParticle IDSync access to your user data in Snowflake, you need to run a SQL statement creating the necessary roles and usage grants. When running this SQL statement, you need to provide the correct values for several fields according to your Snowflake account and database setup. The fields you must set are:
Field Name | Description |
---|---|
role_name | The name of a new role that mParticle uses when running IDSync on your Snowflake data. Use a value like "mparticle_idsync". |
compute_wh | The name of the Snowflake virtual warehouse used to run queries against your user data. |
target_database | The name of the Snowflake database containing your user data. |
target_schema | The name of the schema in target_database that contains your target table. |
target_table | The name of the table containing your user data. |
user_name | The database-specific username that mParticle uses when running IDSync on your user data. |
unique_secure_password | The Snowflake password mParticle uses when running IDSync on your user data. |
pod | The geographic pod where your mParticle account is localized: US1, US2, EU1, or AU1. See Data Hosting Locations to find your pod. |
pod_mp_aws_account_id | The mParticle AWS account associated with your geographic pod. For US1, use 338661164609. For US2, use 386705975570. For EU1, use 583371261087. For AU1, use 526464060896. |
org_id | Your mParticle organization ID saved from step 1.1. |
acct_id | Your mParticle account ID saved from step 1.1. |
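The values of pod and pod_mp_aws_account_id must agree, so a small lookup can catch a mismatched pair before you run the script. This Python sketch encodes the pod table above. The dictionary and function names are hypothetical.

```python
# Pod -> mParticle AWS account ID, copied from the pod_mp_aws_account_id row above.
POD_AWS_ACCOUNT_IDS = {
    "US1": "338661164609",
    "US2": "386705975570",
    "EU1": "583371261087",
    "AU1": "526464060896",
}


def pod_aws_account_id(pod: str) -> str:
    """Return the pod_mp_aws_account_id value for a pod, rejecting unknown pods."""
    key = pod.strip().upper()
    if key not in POD_AWS_ACCOUNT_IDS:
        raise ValueError(f"unknown pod {pod!r}; expected one of {sorted(POD_AWS_ACCOUNT_IDS)}")
    return POD_AWS_ACCOUNT_IDS[key]


if __name__ == "__main__":
    for pod in ("US1", "US2", "EU1", "AU1"):
        print(pod, pod_aws_account_id(pod))
```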
Log in to the Snowflake console and navigate to Worksheets. Make sure to select the correct worksheet context containing the user data you will be ingesting into mParticle.
Run the following SQL statement using the correct values for your Snowflake and mParticle configurations described above:
// Replace the placeholders below with the correct values for your mParticle and Snowflake configurations
SET role_name = 'YOUR_ROLE_NAME';
SET compute_wh = 'YOUR_WAREHOUSE_NAME';
SET target_database = 'YOUR_DATABASE_NAME';
SET target_schema = 'YOUR_SCHEMA';
SET target_table = 'YOUR_TABLE';
SET user_name = 'SNOWFLAKE_DATABASE_USERNAME';
SET unique_secure_password = 'SNOWFLAKE_DATABASE_PASSWORD';
SET pod = 'POD_LOCATION (US1, US2, EU1, or AU1)';
// Replace AWS_ACCOUNT_ID below with the correct ID for your pod location
// US1 = '338661164609'
// US2 = '386705975570'
// AU1 = '526464060896'
// EU1 = '583371261087'
SET pod_mp_aws_account_id = 'AWS_ACCOUNT_ID';
SET org_id = 'YOUR_MPARTICLE_ORG_ID';
SET acct_id = 'YOUR_MPARTICLE_ACCOUNT_ID';
// end customer set variables
// Below are mParticle set variables. Do not make changes to the rest of the SQL statement
SET target_database_schema = concat($target_database,'.',$target_schema);
SET target_database_schema_table = concat($target_database,'.',$target_schema,'.',$target_table);
SET mp_schema = 'MPARTICLE';
SET full_mparticle_schema = concat($target_database,'.',$mp_schema);
SET storage_integration_name = concat('mp_',$pod,'_',$org_id,'_',$acct_id,'_s3');
SET storage_integration_aws_role_arn = concat('arn:aws:iam::',$pod_mp_aws_account_id,':role/ingest-pipeline-data-external-',$org_id,'-',$acct_id);
SET storage_integration_allowed_locations = concat('s3://',$pod,'-ingest-pipeline-data/',$org_id,'/',$acct_id);
// end mparticle set variables
USE ROLE ACCOUNTADMIN;
// Create a unique role for mParticle
CREATE ROLE IF NOT EXISTS identifier($role_name);
GRANT USAGE ON WAREHOUSE identifier($compute_wh) TO ROLE identifier($role_name);
GRANT USAGE ON DATABASE identifier($target_database) TO ROLE identifier($role_name);
GRANT USAGE ON SCHEMA identifier($target_database_schema) TO ROLE identifier($role_name);
// Create a schema for mparticle to store temporary data
CREATE SCHEMA IF NOT EXISTS identifier($full_mparticle_schema);
// Grant ownership to mparticle of the new schema to store temporary data
GRANT OWNERSHIP ON SCHEMA identifier($full_mparticle_schema) TO ROLE identifier($role_name);
GRANT OWNERSHIP ON ALL TABLES IN SCHEMA identifier($full_mparticle_schema) TO ROLE identifier($role_name);
// Grant privileges on any tables/views mP needs to access to write MPIDs
GRANT SELECT,INSERT,UPDATE ON TABLE identifier($target_database_schema_table) TO ROLE identifier($role_name);
// Recommend creating a unique user for mParticle
CREATE OR REPLACE USER identifier($user_name) PASSWORD = $unique_secure_password;
GRANT ROLE identifier($role_name) TO USER identifier($user_name);
CREATE STORAGE INTEGRATION IF NOT EXISTS identifier($storage_integration_name)
WITH TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = $storage_integration_aws_role_arn
STORAGE_AWS_OBJECT_ACL = 'bucket-owner-full-control'
STORAGE_ALLOWED_LOCATIONS = ($storage_integration_allowed_locations);
GRANT USAGE ON INTEGRATION identifier($storage_integration_name) TO ROLE identifier($role_name);
// Grab info from the describe integration call: STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID
DESCRIBE INTEGRATION identifier($storage_integration_name);
// ADD MPID column to your target table
ALTER TABLE identifier($target_database_schema_table) ADD COLUMN MPID number;
DESCRIBE TABLE identifier($target_database_schema_table);
After running the SQL statement, save the following values returned by the DESCRIBE INTEGRATION command:
- STORAGE_AWS_IAM_USER_ARN
- STORAGE_AWS_EXTERNAL_ID
You will need these values when creating your Warehouse Sync pipeline in the final step.
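Before running the script, you may want to preview the names it derives from your inputs. The following Python sketch reproduces the concat() expressions in the mParticle-set section of the SQL statement. It does not connect to Snowflake. The function name and sample values are hypothetical, and the SQL script remains the source of truth.

```python
def mparticle_set_variables(target_database: str, pod: str, org_id: str, acct_id: str,
                            pod_mp_aws_account_id: str) -> dict:
    """Return the values the SQL script's concat() expressions would produce."""
    mp_schema = "MPARTICLE"
    return {
        # concat($target_database,'.',$mp_schema)
        "full_mparticle_schema": f"{target_database}.{mp_schema}",
        # concat('mp_',$pod,'_',$org_id,'_',$acct_id,'_s3')
        "storage_integration_name": f"mp_{pod}_{org_id}_{acct_id}_s3",
        # concat('arn:aws:iam::',$pod_mp_aws_account_id,':role/ingest-pipeline-data-external-',$org_id,'-',$acct_id)
        "storage_integration_aws_role_arn": (
            f"arn:aws:iam::{pod_mp_aws_account_id}:role/"
            f"ingest-pipeline-data-external-{org_id}-{acct_id}"
        ),
        # concat('s3://',$pod,'-ingest-pipeline-data/',$org_id,'/',$acct_id)
        "storage_integration_allowed_locations": (
            f"s3://{pod}-ingest-pipeline-data/{org_id}/{acct_id}"
        ),
    }


if __name__ == "__main__":
    values = mparticle_set_variables("MY_DB", "US1", "111", "222", "338661164609")
    for name, value in values.items():
        print(f"{name} = {value}")
```

For example, with target_database 'MY_DB', pod 'US1', org_id '111' and acct_id '222', the storage integration name is mp_US1_111_222_s3.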
Use the Postman collection for the Warehouse Sync API to configure your environment settings according to the instructions in Postman setup.
You will need to set the values for the following variables in Postman:
Postman Environment Variable | Description |
---|---|
ORG_ID | Your mParticle organization ID saved from step 1.1. |
ACCOUNT_ID | Your mParticle account ID saved from step 1.1. |
WORKSPACE_ID | Your mParticle workspace ID saved from step 1.1. |
POD | The geographic pod where your mParticle account is localized: US1, US2, EU1, or AU1. See Data Hosting Locations to find your pod. |
PLATFORM_API_CLIENT_ID | The client ID saved from step 1.1. |
PLATFORM_API_CLIENT_SECRET | The client secret saved from step 1.1. |
PARTNER_FEED_ID | The feed key you saved from step 1.3 when creating your Snowflake feed. |
SNOWFLAKE_ROLE | Your Snowflake role name (role_name) from step 2. |
SNOWFLAKE_WAREHOUSE | Your Snowflake warehouse name (compute_wh) from step 2. |
SNOWFLAKE_DATABASE | Your Snowflake database name (target_database) from step 2. |
SNOWFLAKE_USER | Your Snowflake service account username (user_name) from step 2. |
SNOWFLAKE_PASSWORD | Your Snowflake service account password (unique_secure_password) from step 2. |
SNOWFLAKE_ACCT_ID | Your Snowflake account ID. |
SNOWFLAKE_REGION | Your Snowflake region. |
SNOWFLAKE_AWS_IAM_USER_ARN | The value returned for STORAGE_AWS_IAM_USER_ARN after running the SQL statement in step 2. |
SNOWFLAKE_AWS_EXTERNAL_ID | The value returned for STORAGE_AWS_EXTERNAL_ID after running the SQL statement in step 2. |
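If you export your Postman environment to a JSON file, a quick script can confirm that none of the variables above are missing or empty. This Python sketch assumes Postman's environment export layout: a top-level "values" list of objects with "key", "value" and "enabled" fields. Check that against your own export. The function name is hypothetical.

```python
import json

# Environment variables listed in the table above.
REQUIRED_ENV_VARS = [
    "ORG_ID", "ACCOUNT_ID", "WORKSPACE_ID", "POD",
    "PLATFORM_API_CLIENT_ID", "PLATFORM_API_CLIENT_SECRET", "PARTNER_FEED_ID",
    "SNOWFLAKE_ROLE", "SNOWFLAKE_WAREHOUSE", "SNOWFLAKE_DATABASE",
    "SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD", "SNOWFLAKE_ACCT_ID", "SNOWFLAKE_REGION",
    "SNOWFLAKE_AWS_IAM_USER_ARN", "SNOWFLAKE_AWS_EXTERNAL_ID",
]


def missing_env_vars(environment_json: str) -> list:
    """Return required variables that are absent, disabled, or empty.

    Assumes the export layout {"values": [{"key": ..., "value": ..., "enabled": ...}]}.
    """
    env = json.loads(environment_json)
    present = set()
    for entry in env.get("values", []):
        value = entry.get("value")
        # Count a variable only if it is enabled and has a non-blank string value
        if entry.get("enabled", True) and isinstance(value, str) and value.strip():
            present.add(entry["key"])
    return [name for name in REQUIRED_ENV_VARS if name not in present]


if __name__ == "__main__":
    import sys
    with open(sys.argv[1], encoding="utf-8") as f:
        print(missing_env_vars(f.read()) or "all required variables are set")
```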
You also need to set the following Postman collection variables:
Postman Collection Variable | Description |
---|---|
INGEST_PIPE_LINE_NAME | A unique name to identify your Warehouse Sync pipeline in mParticle. For example, "IDSync Snowflake Pipeline". |
INGEST_PIPELINE_SLUG | A shortened version of your pipeline name. For example, "idsync-snowflake-pipeline". |
SQL_QUERY | The SQL query that specifies which columns in your Snowflake database to submit to IDSync for resolution. The query must meet the criteria defined in the following section. |
mParticle IDSync executes identification requests based on the data you select from your Snowflake database in a SQL query. The SQL query is defined as a Postman collection variable and is included in the request body of the API call that executes a sync for your Warehouse Sync pipeline. Follow these criteria when writing your SQL query:
- Do not use JOIN clauses or aggregate (AGG) functions in your query, because they prevent mParticle from writing the results of the identity resolution back to your database.
- Add WHERE clauses to your query if you want to filter which rows are included in identity resolution requests.
After setting up Postman and creating the SQL query used to submit ID requests to IDSync, complete the configuration of your Warehouse Sync pipeline by following the instructions in Create your first Warehouse Sync pipeline.
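For a rough pre-flight check of the JOIN and aggregate-function restriction described above, the following Python sketch scans a SQL_QUERY value for JOIN keywords and a few common aggregate function calls. It uses regular expressions and is not a SQL parser. It can therefore miss some constructs or flag matches inside string literals. The function name and the list of aggregate functions checked are assumptions.

```python
import re

# Heuristic patterns: the JOIN keyword, and calls to a few common aggregate functions.
_JOIN = re.compile(r"\bjoin\b", re.IGNORECASE)
_AGGREGATE = re.compile(r"\b(count|sum|avg|min|max|array_agg|listagg)\s*\(", re.IGNORECASE)


def check_sql_query(sql: str) -> list:
    """Return a list of problems found in a SQL_QUERY value (empty means none found)."""
    problems = []
    if _JOIN.search(sql):
        problems.append("query uses JOIN")
    if _AGGREGATE.search(sql):
        problems.append("query uses an aggregate function")
    return problems


if __name__ == "__main__":
    query = "SELECT user_id, email FROM my_db.public.users WHERE email IS NOT NULL"
    print(check_sql_query(query) or "no JOIN or aggregate functions found")
```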
After you create your pipeline, mParticle creates a new MPID mapping table in your Snowflake database named {YOUR_DATABASE}_MPID_MAPPING_TABLE (where {YOUR_DATABASE} is replaced with the name of your Snowflake database). Each row maps a user's identities to the resolved MPID, for example:
CUSTOMER_ID | EMAIL | IDFA | IDFV | GAID | AID | AMP_ID | DAS | MPID |
---|---|---|---|---|---|---|---|---|
123456 | email@example.com | null | null | null | null | null | null | 8910111213 |
… | … | … | … | … | … | … | … | … |
To join your new MPID mapping table with other tables in your Snowflake database:
From your Snowflake console, run the following SQL statement, which creates a new UDF called create_mp_identity_order. When you call this UDF in the next step, it returns a SQL statement you can execute to join the tables and columns you specify with your new MPID mappings.
create or replace function create_mp_identity_order(columns array, orig_table string, mapping_table string, identity_order array)
returns string
language python
runtime_version = '3.10'
handler = 'create_mp_identity_order_py'
as
$$
from typing import Dict, List
# Each entry in `columns` maps one of your columns to an mParticle identity:
# {"column_name": "your_column", "mp_identity_map": "mparticle_identity_name"}
# Default identity priority, used when identity_order is null
default_mp_identity_order = {
    "customer_id": 0,
    "email": 1,
    "idfa": 2,
    "idfv": 3,
    "gaid": 4,
    "aid": 5,
    "amp_id": 6,
    "das": 7
}
def generate_inner_select(orig_table: str, mapping_table: str):
    # Left join your table (ot) to the MPID mapping table (mt)
    return f"select\n\tot.*, mt.mpid as mpid\nfrom\n\t{orig_table} ot\nleft join\n\t{mapping_table} mt\n"
def generate_on_inner(identity_map: Dict):
    # Join condition for a single identity column
    return f"on\n\t(ot.{identity_map['column_name']} = mt.{identity_map['mp_identity_map']})\n"
def generate_where_inner(old: List, identity_map: Dict):
    # Keep matched rows whose higher-priority identity columns (in `old`) are all null
    end_where = "mt.mpid is not null"
    if not old:
        return f"where\n\t{end_where}"
    where_prefix = [f"ot.{x} is null" for x in old]
    where_prefix = " and ".join(where_prefix)
    return f"where\n\t{where_prefix} and {end_where}"
def generate_identity_order_map(identity_order: List):
    # Map each identity name to its priority rank; fall back to the default order
    if not identity_order:
        return default_mp_identity_order
    return {x: i for i, x in enumerate(identity_order)}
def create_mp_identity_order_py(columns: List[Dict], orig_table: str, mapping_table: str, identity_order: List):
    mp_identity_order = generate_identity_order_map(identity_order)
    # Reject mappings to identities that are not in the priority order
    for identity in columns:
        if identity['mp_identity_map'] not in mp_identity_order:
            raise Exception(f"The mparticle identity for '{identity['mp_identity_map']}' from mapping {identity} does not exist")
    mp_identity_order_applied = sorted(columns, key=lambda d: mp_identity_order[d['mp_identity_map']])
    inner_select = generate_inner_select(orig_table, mapping_table)
    # Build one CTE per identity column, in priority order
    s = "with "
    count = 1
    old = []
    for identity in mp_identity_order_applied:
        s += f"cte_{count} as (\n"
        s += inner_select + generate_on_inner(identity) + generate_where_inner(old, identity)
        s += "),\n"
        old.append(identity['column_name'])
        count += 1
    # Drop the trailing ",\n" after the last CTE, then union all CTE results
    s = s[:-2]
    s += "\n"
    select_cte = [f"select * from cte_{x}" for x in range(1, count)]
    select_cte = "\nunion all\n".join(select_cte)
    s += select_cte
    s += ";"
    return s
$$;
To make sure your role in Snowflake can execute the new UDF, run the following statement in your console, where my_role is the name of your user role in Snowflake:
GRANT USAGE ON FUNCTION create_mp_identity_order(ARRAY, STRING, STRING, ARRAY) TO ROLE my_role;
From your Snowflake console, call the UDF with your own column mappings and table names. For example:
select create_mp_identity_order(
[
{'column_name': 'user_id', 'mp_identity_map': 'customer_id'},
{'column_name': 'das', 'mp_identity_map': 'das'},
{'column_name': 'vendor_id', 'mp_identity_map': 'idfv'}
], 'database.public.your_datawarehouse', 'database.mparticle.mpid_mapping_table', null::array);
where:
- create_mp_identity_order is the UDF created in step 5.1.
- The first argument is an array of objects, each mapping a column_name in your table to the corresponding mParticle identity (mp_identity_map).
- database.public.your_datawarehouse is the fully qualified name (database.schema.table) of your table in Snowflake.
- database.mparticle.mpid_mapping_table is the new MPID mapping table created by mParticle.
Running this SQL query returns a separate SQL query specific to your configuration. You can then run that query to join the MPID mapping table with other tables and columns in your Snowflake database.
The above example assumes you are using the standard ID priority in your mParticle account (passed as null::array, so the UDF uses its default order). It also assumes that the table database.public.your_datawarehouse contains three columns called user_id, das, and vendor_id. The query joins these columns to the columns called customer_id, das, and idfv in the MPID mapping table.
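The query generated by the UDF unions one CTE per mapped column. In the UDF code, each CTE keeps a row only when the current column matches the mapping table and all higher-priority mapped columns are null. In effect, each row is matched on its highest-priority non-null column alone. The Python sketch below simulates that behavior on hypothetical in-memory rows. assign_mpids and the sample data are illustrative only and are not part of the mParticle tooling.

```python
from typing import Dict, List


def assign_mpids(rows: List[Dict], mapping: List[Dict], columns: List[Dict],
                 priority: List[str]) -> List[Dict]:
    """Simulate the UDF-generated query on in-memory rows (illustration only)."""
    rank = {name: i for i, name in enumerate(priority)}
    # Same ordering the UDF applies to the column mappings
    ordered = sorted(columns, key=lambda c: rank[c["mp_identity_map"]])
    result = []
    for row in rows:
        # The highest-priority mapped column that has a value in this row
        first = next((c for c in ordered if row.get(c["column_name"]) is not None), None)
        if first is None:
            continue  # no identities at all: the row appears in no CTE
        value = row[first["column_name"]]
        # One output row per matching mapping-table row, as with the SQL join
        for m in mapping:
            if m.get(first["mp_identity_map"]) == value and m.get("mpid") is not None:
                result.append({**row, "mpid": m["mpid"]})
    return result


if __name__ == "__main__":
    columns = [
        {"column_name": "user_id", "mp_identity_map": "customer_id"},
        {"column_name": "das", "mp_identity_map": "das"},
        {"column_name": "vendor_id", "mp_identity_map": "idfv"},
    ]
    default_priority = ["customer_id", "email", "idfa", "idfv", "gaid", "aid", "amp_id", "das"]
    mapping = [
        {"customer_id": "123456", "idfv": None, "das": None, "mpid": 8910111213},
        {"customer_id": None, "idfv": "v-1", "das": None, "mpid": 42},
    ]
    rows = [
        {"user_id": "123456", "das": None, "vendor_id": "v-1"},  # matched on customer_id
        {"user_id": None, "das": None, "vendor_id": "v-1"},      # matched on idfv
        {"user_id": "999", "das": None, "vendor_id": "v-1"},     # customer_id unmatched: dropped
    ]
    for r in assign_mpids(rows, mapping, columns, default_priority):
        print(r)
```

The third sample row's vendor_id value appears in the mapping table, yet the row is not matched. Its user_id is non-null, so it is only ever compared on customer_id.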
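If you are using a different ID strategy and a different ID priority, you can customize the identity ordering used by the UDF. Pass an ordered array of identity names as the last argument instead of null::array. Each name must exactly match an mp_identity_map value you use, because the UDF looks the mappings up in this array and raises an error for any identity it cannot find. For example, if you use the profile isolation strategy with an ID priority of customer ID, then mobile number, then email, pass ['customer_id', 'mobile_number', 'email']: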
select create_mp_identity_order(
[
{'column_name': 'user_id', 'mp_identity_map': 'customer_id'},
{'column_name': 'phone_number', 'mp_identity_map': 'mobile_number'},
{'column_name': 'email', 'mp_identity_map': 'email'}
], 'database.public.your_datawarehouse', 'database.mparticle.mpid_mapping_table', ['customer_id', 'mobile_number', 'email']);
You can customize the ordering however you want. For example, you can place email higher in the priority than mobile_number with:
select create_mp_identity_order(
[
{'column_name': 'user_id', 'mp_identity_map': 'customer_id'},
{'column_name': 'phone_number', 'mp_identity_map': 'mobile_number'},
{'column_name': 'email', 'mp_identity_map': 'email'}
], 'database.public.your_datawarehouse', 'database.mparticle.mpid_mapping_table', ['customer_id', 'email', 'mobile_number']);
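To preview how an identity_order array reorders your column mappings before you call the UDF, the following Python sketch mirrors the UDF's ranking logic: generate_identity_order_map plus the sort in create_mp_identity_order_py. order_columns is a hypothetical helper, and it returns column names rather than SQL.

```python
from typing import Dict, List, Optional

# The UDF's default priority, used when identity_order is null
DEFAULT_ORDER = ["customer_id", "email", "idfa", "idfv", "gaid", "aid", "amp_id", "das"]


def order_columns(columns: List[Dict], identity_order: Optional[List[str]] = None) -> List[str]:
    """Return column names in the order the UDF will build its CTEs."""
    order = identity_order or DEFAULT_ORDER
    rank = {name: i for i, name in enumerate(order)}
    unknown = [c["mp_identity_map"] for c in columns if c["mp_identity_map"] not in rank]
    if unknown:
        # The UDF raises an error in this case as well
        raise ValueError(f"identities not in the priority order: {unknown}")
    return [c["column_name"] for c in sorted(columns, key=lambda c: rank[c["mp_identity_map"]])]


if __name__ == "__main__":
    mappings = [
        {"column_name": "user_id", "mp_identity_map": "customer_id"},
        {"column_name": "phone_number", "mp_identity_map": "mobile_number"},
        {"column_name": "email", "mp_identity_map": "email"},
    ]
    print(order_columns(mappings, ["customer_id", "mobile_number", "email"]))
    print(order_columns(mappings, ["customer_id", "email", "mobile_number"]))
```

Calling order_columns on these three mappings without an identity_order raises an error, as the UDF would. mobile_number is not in the UDF's default order, which is why the profile isolation examples pass an explicit array.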
Running the UDF as described in step 5.3 generates a SQL query specific to your database configuration that you can run to join the MPID mapping table with your own table in Snowflake.