ComposeID

Use ComposeID to resolve unidentified user data stored in Snowflake to known user profiles without modifying your existing data infrastructure or having to permanently store your user data in mParticle.

ComposeID currently supports the profile isolation and best match identity strategies.

1 Set up your mParticle configuration

To use ComposeID with Snowflake, you must have an mParticle account that has been provisioned for the Warehouse Sync API. If you are not sure whether your account is provisioned for Warehouse Sync, contact your mParticle account representative.

1.1 Create API credentials

Once you’ve confirmed your account has access to the Warehouse Sync API, begin by creating a new set of API credentials:

  1. Log in to your mParticle account and click the user profile button at the bottom of the left-hand navigation.
  2. Click Settings and select the API Credentials tab.
  3. Click Add Credential.
  4. In the modal window that appears, enter a unique display name mParticle uses to refer to this particular set of credentials.
  5. Check the Platform box and select Admin from the Permissions dropdown menu.
  6. Click Save.

Your new API credentials are shown in a modal window. Save the values displayed for the following:

  • Organization ID
  • Account ID
  • Client ID
  • Client Secret

You also need to find and copy the value of your workspace ID by clicking Settings next to workspace in the left hand nav. Scroll to the workspace you want to use with IDSync, and copy the number displayed beneath the workspace name.

1.2 Create your Warehouse Sync API access token

Follow the instructions under Authentication in the Warehouse Sync documentation to create an API access token. Use the client ID and client secret generated for your new API credentials from the previous step. You will use your authentication token when calling the Warehouse Sync API later in this guide.

1.3 Create a Snowflake input feed

Use the mParticle Platform API to create a new feed for Snowflake.

  1. Using the API credentials created in step 1.1, fetch an OAuth bearer token by sending a POST request to mParticle’s SSO endpoint:

POST https://sso.auth.mparticle.com/oauth/token

Your request body must contain:

  • client_id - your client ID issued in step 1.1
  • client_secret - your client secret issued in step 1.1
  • audience - set to "https://api.mparticle.com"
  • grant_type - set to "client_credentials"

Example curl request

curl --request POST \
  --url https://sso.auth.mparticle.com/oauth/token \
  --header 'content-type: application/json' \
  --data '{"client_id":"YOUR_CLIENT_ID","client_secret":"YOUR_CLIENT_SECRET","audience":"https://api.mparticle.com","grant_type":"client_credentials"}'

Example raw HTTP request

POST /oauth/token HTTP/1.1
Host: sso.auth.mparticle.com
Content-Type: application/json

{
  "client_id": "your_client_id",
  "client_secret": "your_client_secret",
  "audience": "https://api.mparticle.com",
  "grant_type": "client_credentials"
}

A successful POST request receives the following JSON response, where "access_token" is the OAuth bearer token you can use when authenticating subsequent calls to the Platform API:

{
  "access_token": "YWIxMjdi883GHBBDnjsdKAJQxNjdjYUUJABbg6hdI.8V6HhxW-",
  "expires_in" : 28800,
  "token_type": "Bearer"
}
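For readers scripting this step instead of using curl, the same token request can be sketched in Python with only the standard library. The function names build_token_request and fetch_bearer_token are illustrative and not part of any mParticle SDK; the URL and body fields are the ones listed above.

```python
import json
import urllib.request

TOKEN_URL = "https://sso.auth.mparticle.com/oauth/token"


def build_token_request(client_id: str, client_secret: str) -> urllib.request.Request:
    """Build (but do not send) the POST request described in step 1."""
    body = {
        "client_id": client_id,
        "client_secret": client_secret,
        "audience": "https://api.mparticle.com",
        "grant_type": "client_credentials",
    }
    return urllib.request.Request(
        TOKEN_URL,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def fetch_bearer_token(client_id: str, client_secret: str) -> str:
    """Send the request and return the "access_token" field of the JSON response."""
    request = build_token_request(client_id, client_secret)
    with urllib.request.urlopen(request) as response:
        return json.load(response)["access_token"]
```

Calling fetch_bearer_token("YOUR_CLIENT_ID", "YOUR_CLIENT_SECRET") performs the network call. Building the request separately makes it easy to inspect the payload before sending it.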

  2. Submit a POST request to https://api.mparticle.com/v1/workspace/{workspace_id}/partnerfeeds?accountId={account_id}, where:
  • {workspace_id} is the mParticle workspace ID you saved in step 1.1
  • {account_id} is the mParticle account ID you saved in step 1.1

Set the following settings in your API request body:

Field | Type | Description
module_name | String | Must be set to "snowflake".
name | String | A unique name of your choosing for your new feed.
os | String | Optional. Defines which platform OS mParticle should associate with your feed. Can be left blank.
settings | Array | Optional. An array of JSON objects where you can define specific settings for your feed. Can be left blank.
is_active | Boolean | Toggles whether your feed is active. Set to true.

Example curl request

curl \
  -X POST \
  -H "Authorization: Bearer <YOUR_BEARER_TOKEN>" \
  -H "Content-Type: application/json" \
  -d "{ \
    \"module_name\": \"snowflake\", \
    \"name\": \"snowflake-idsync-feed\", \
    \"os\": \"unknown\", \
    \"settings\": [], \
    \"is_active\": true \
  }" \
  "https://api.mparticle.com/v1/workspace/<YOUR_WORKSPACE_ID>/partnerfeeds?accountId=<YOUR_ACCOUNT_ID>"

Example HTTP request

POST /v1/workspace/<YOUR_WORKSPACE_ID>/partnerfeeds?accountId=<YOUR_ACCOUNT_ID> HTTP/1.1
Host: api.mparticle.com
Content-Type: application/json
Authorization: Bearer <YOUR_BEARER_TOKEN>

{
  "module_name": "snowflake",
  "name": "YOUR_FEED_NAME",
  "is_active": true
}

The response contains your new feed’s server-to-server key and secret. Save these values to use when configuring your Warehouse Sync pipeline in a later step.
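The feed-creation call can be scripted in the same way. The sketch below assumes you already hold a bearer token from the previous request; build_feed_request and create_feed are hypothetical helper names. Because this guide does not show the exact field names of the response, the sketch returns the decoded JSON as-is rather than guessing at them.

```python
import json
import urllib.parse
import urllib.request

API_BASE = "https://api.mparticle.com/v1"


def build_feed_request(bearer_token: str, workspace_id, account_id, feed_name: str) -> urllib.request.Request:
    """Build the POST request that creates a Snowflake input feed."""
    query = urllib.parse.urlencode({"accountId": account_id})
    url = f"{API_BASE}/workspace/{workspace_id}/partnerfeeds?{query}"
    body = {
        "module_name": "snowflake",  # required value for Snowflake feeds
        "name": feed_name,           # any unique name of your choosing
        "is_active": True,
    }
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {bearer_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def create_feed(bearer_token: str, workspace_id, account_id, feed_name: str) -> dict:
    """Send the request and return the decoded JSON response unchanged."""
    request = build_feed_request(bearer_token, workspace_id, account_id, feed_name)
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

Keep the returned dictionary somewhere safe, since it carries the key and secret needed later.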

2 Configure Snowflake for mParticle IDSync

To grant mParticle IDSync access to your user data in Snowflake, you need to run a SQL statement creating the necessary roles and usage grants. When running this SQL statement, you need to provide the correct values for several fields according to your Snowflake account and database setup. The fields you must set are:

Field Name | Description
role_name | The name for a new role that mParticle uses when running IDSync on your Snowflake data. Use a value like “mparticle_idsync”.
compute_wh | The name of the warehouse in Snowflake containing your user data.
target_database | The name of the database in your Snowflake warehouse.
target_schema | The name of the schema in your Snowflake database that contains your table.
target_table | The name of the table in your Snowflake database.
user_name | The database-specific username that mParticle uses when running IDSync on your user data.
unique_secure_password | The Snowflake password mParticle uses when running IDSync on your user data.
pod | The geographic pod where your mParticle account is localized. Either US1, US2, EU1, or AU1. See Data Hosting Locations to find your pod.
pod_mp_aws_account_id | The mParticle AWS account associated with your geographic pod. For US1, use 338661164609. For US2, use 386705975570. For EU1, use 583371261087. For AU1, use 526464060896.
org_id | Your mParticle organization ID saved from step 1.1.
acct_id | Your mParticle account ID saved from step 1.1.

Log in to the Snowflake console and navigate to Worksheets. Make sure to select the worksheet context that contains the user data you want IDSync to resolve.

Run the following SQL statement using the correct values for your Snowflake and mParticle configurations described above:

// Replace the placeholders below with the correct values for your mParticle and Snowflake configurations
SET role_name = 'YOUR_ROLE_NAME';
SET compute_wh = 'YOUR_WAREHOUSE_NAME';
SET target_database = 'YOUR_DATABASE_NAME';
SET target_schema = 'YOUR_SCHEMA';
SET target_table = 'YOUR_TABLE';
SET user_name = 'SNOWFLAKE_DATABASE_USERNAME';
SET unique_secure_password = 'SNOWFLAKE_DATABASE_PASSWORD';
SET pod = 'POD_LOCATION (US1, US2, EU1, or AU1)';
// Replace AWS_ACCOUNT_ID below with the correct ID for your pod location
// US1 = '338661164609'
// US2 = '386705975570'
// AU1 = '526464060896'
// EU1 = '583371261087'

SET pod_mp_aws_account_id = 'AWS_ACCOUNT_ID';
SET org_id = 'YOUR_MPARTICLE_ORG_ID';
SET acct_id = 'YOUR_MPARTICLE_ACCOUNT_ID';
// end customer set variables
    
    
// Below are mParticle set variables. Do not make changes to the rest of the SQL statement
SET target_database_schema = concat($target_database,'.',$target_schema);
SET target_database_schema_table = concat($target_database,'.',$target_schema,'.',$target_table);
SET mp_schema = 'MPARTICLE';
SET full_mparticle_schema = concat($target_database,'.',$mp_schema);
SET storage_integration_name = concat('mp_',$pod,'_',$org_id,'_',$acct_id,'_s3');
SET storage_integration_aws_role_arn = concat('arn:aws:iam::',$pod_mp_aws_account_id,':role/ingest-pipeline-data-external-',$org_id,'-',$acct_id);
SET storage_integration_allowed_locations = concat('s3://',$pod,'-ingest-pipeline-data/',$org_id,'/',$acct_id);
// end mparticle set variables

USE ROLE ACCOUNTADMIN;

// Create a unique role for mParticle
CREATE ROLE IF NOT EXISTS identifier($role_name);

GRANT USAGE ON WAREHOUSE identifier($compute_wh) TO ROLE identifier($role_name);
GRANT USAGE ON DATABASE identifier($target_database) TO ROLE identifier($role_name);
GRANT USAGE ON SCHEMA identifier($target_database_schema) TO ROLE identifier($role_name);

// Create a schema for mparticle to store temporary data
CREATE SCHEMA IF NOT EXISTS identifier($full_mparticle_schema);

// Grant ownership to mparticle of the new schema to store temporary data
GRANT OWNERSHIP ON SCHEMA identifier($full_mparticle_schema) TO ROLE identifier($role_name);
GRANT OWNERSHIP ON ALL TABLES IN SCHEMA identifier($full_mparticle_schema) TO ROLE identifier($role_name);

// Grant privileges on any tables/views mP needs to access to write MPIDs
GRANT SELECT,INSERT,UPDATE ON TABLE identifier($target_database_schema_table) TO ROLE identifier($role_name);

// Recommend creating a unique user for mParticle
CREATE OR REPLACE USER identifier($user_name) PASSWORD = $unique_secure_password;
GRANT ROLE identifier($role_name) TO USER identifier($user_name);

CREATE STORAGE INTEGRATION IF NOT EXISTS identifier($storage_integration_name)
WITH TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = $storage_integration_aws_role_arn
STORAGE_AWS_OBJECT_ACL = 'bucket-owner-full-control'
STORAGE_ALLOWED_LOCATIONS = ($storage_integration_allowed_locations);

GRANT USAGE ON INTEGRATION identifier($storage_integration_name) TO ROLE identifier($role_name);

// Grab info from the describe integration call: STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID
DESCRIBE INTEGRATION identifier($storage_integration_name);

// ADD MPID column to your target table
ALTER TABLE identifier($target_database_schema_table) ADD COLUMN MPID number;
DESCRIBE TABLE identifier($target_database_schema_table);

After running the SQL statement, save the values returned for:

  • STORAGE_AWS_IAM_USER_ARN
  • STORAGE_AWS_EXTERNAL_ID

You will need these values when creating your Warehouse Sync pipeline in the final step.
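Before running the script, it can help to preview the three values it derives from your pod, organization ID, and account ID. The following Python sketch mirrors the concat() expressions and the pod-to-AWS-account mapping from the SQL statement above. The function name derive_integration_values is hypothetical, and the script itself remains the source of truth.

```python
# AWS account IDs per pod, copied from the table in this section.
POD_AWS_ACCOUNT_IDS = {
    "US1": "338661164609",
    "US2": "386705975570",
    "EU1": "583371261087",
    "AU1": "526464060896",
}


def derive_integration_values(pod: str, org_id: str, acct_id: str) -> dict:
    """Reproduce the derived variables the SQL script builds with concat()."""
    if pod not in POD_AWS_ACCOUNT_IDS:
        raise ValueError(f"Unknown pod {pod!r}; expected one of {sorted(POD_AWS_ACCOUNT_IDS)}")
    aws_account_id = POD_AWS_ACCOUNT_IDS[pod]
    return {
        "storage_integration_name": f"mp_{pod}_{org_id}_{acct_id}_s3",
        "storage_integration_aws_role_arn": (
            f"arn:aws:iam::{aws_account_id}:role/"
            f"ingest-pipeline-data-external-{org_id}-{acct_id}"
        ),
        "storage_integration_allowed_locations": (
            f"s3://{pod}-ingest-pipeline-data/{org_id}/{acct_id}"
        ),
    }


if __name__ == "__main__":
    # Placeholder IDs for illustration only.
    for name, value in derive_integration_values("US1", "123", "456").items():
        print(f"{name} = {value}")
```

The pod value is used exactly as given, matching the SQL script, which does not change its case.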

3 Set up Postman

Use the Postman collection for the Warehouse Sync API to configure your environment settings according to the instructions in Postman setup.

You will need to set the values for the following variables in Postman:

Postman environment variables

Postman Environment Variable | Description
ORG_ID | Your mParticle organization ID saved from step 1.1.
ACCOUNT_ID | Your mParticle account ID saved from step 1.1.
WORKSPACE_ID | Your mParticle workspace ID saved from step 1.1.
POD | The geographic pod where your mParticle account is localized. Either US1, US2, EU1, or AU1. See Data Hosting Locations to find your pod.
PLATFORM_API_CLIENT_ID | The client ID saved from step 1.1.
PLATFORM_API_CLIENT_SECRET | The client secret saved from step 1.1.
PARTNER_FEED_ID | The feed key you saved from step 1.3 when creating your Snowflake feed.
SNOWFLAKE_ROLE | Your Snowflake role from step 2.
SNOWFLAKE_WAREHOUSE | Your Snowflake warehouse name from step 2.
SNOWFLAKE_DATABASE | Your Snowflake database name from step 2.
SNOWFLAKE_USER | Your Snowflake service account user name from step 2.
SNOWFLAKE_PASSWORD | Your Snowflake service account password from step 2.
SNOWFLAKE_ACCT_ID | Your Snowflake account ID.
SNOWFLAKE_REGION | Your Snowflake region.
SNOWFLAKE_AWS_IAM_USER_ARN | The value returned for STORAGE_AWS_IAM_USER_ARN after running the SQL statement in step 2.
SNOWFLAKE_AWS_EXTERNAL_ID | The value returned for STORAGE_AWS_EXTERNAL_ID after running the SQL statement in step 2.

Postman collection variables

Postman Collection Variable | Description
INGEST_PIPE_LINE_NAME | A unique name to identify your Warehouse Sync pipeline in mParticle. For example, “IDSync Snowflake Pipeline”.
INGEST_PIPELINE_SLUG | A shortened version of your pipeline name. For example, “idsync-snowflake-pipeline”.
SQL_QUERY | The SQL query that specifies which columns in your Snowflake table you want to submit to IDSync for resolution. The criteria for this query are defined in the following section.

Defining your IDSync SQL query

mParticle IDSync will execute identification requests according to the data you select from your Snowflake database in a SQL query. The SQL query is defined as a Postman collection variable, and is included in the request body for the API call that executes a sync for your Warehouse Sync pipeline. Adhere to the following criteria when writing your SQL query:

  • Your SQL query must be expressed as a selection (using the SELECT SQL operator) from a single database table. mParticle writes the results of the identity resolution back into the same table. You may not use any JOIN or aggregate functions in your query, as this will prevent mParticle from writing the results of the identity resolution back to your database.
  • Each field returned by the projection of your SQL query must map exactly to a column that exists in your table in Snowflake. mParticle uses each underlying field to write resolved identities back into this table.
  • Field names returned by the projection of your SQL query must be included in the list of Reserved mParticle Identity types.
  • Your SQL query may not contain any quotes around Snowflake column, table, schema, or database names.
  • Your SQL query may not end with a semicolon.
  • Your SQL query must include the MPID column (added to your table by the SQL statement in step 2) in its projection.
  • You may add WHERE clauses to your query if you want to filter which rows are included in identity resolution requests.
  • We recommend including a filter to only run identity resolution requests for null MPID columns. For example, “WHERE MPID IS NULL”.
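Several of these criteria can be checked mechanically before you paste a query into Postman. The following Python sketch is a rough lint, not an mParticle tool: check_idsync_query is a hypothetical name, it uses simple pattern matching rather than a SQL parser, and it approximates the aggregate rule by looking only for GROUP BY. It does not verify that field names are reserved mParticle identity types or that they exist in your table.

```python
import re
from typing import List


def check_idsync_query(sql: str) -> List[str]:
    """Return a list of problems found; an empty list means no issue was detected."""
    problems = []
    text = sql.strip()

    if not re.match(r"select\b", text, re.IGNORECASE):
        problems.append("query must be a SELECT statement")
    if text.endswith(";"):
        problems.append("query must not end with a semicolon")
    if '"' in text:
        problems.append("column, table, schema, and database names must not be quoted")
    if re.search(r"\bjoin\b", text, re.IGNORECASE):
        problems.append("JOIN is not allowed")
    if re.search(r"\bgroup\s+by\b", text, re.IGNORECASE):
        problems.append("aggregation (GROUP BY) is not allowed")

    # Extract the projection between SELECT and FROM and look for MPID.
    match = re.match(r"select\s+(.*?)\s+from\b", text, re.IGNORECASE | re.DOTALL)
    fields = [f.strip().lower() for f in match.group(1).split(",")] if match else []
    if "mpid" not in fields:
        problems.append("projection must include the MPID column")

    return problems


if __name__ == "__main__":
    # Table and column names here are placeholders.
    query = "SELECT customer_id, email, MPID FROM mydb.public.users WHERE MPID IS NULL"
    print(check_idsync_query(query) or "no problems detected")
```

A clean result only means none of these specific checks failed, so still review the query against the full list above.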

4 Create your Warehouse Sync pipeline

After setting up Postman and creating the SQL query used to submit ID requests to IDSync, you can complete the configuration of your Warehouse Sync pipeline by following the instructions in Create your first Warehouse Sync pipeline.

After creating your pipeline:

  • IDSync maps any anonymous identities included in your sync to known MPIDs in your mParticle account when your pipeline runs.
  • mParticle automatically creates a new table named {YOUR_DATABASE}_MPID_MAPPING_TABLE (where {YOUR_DATABASE} is replaced with the name of your Snowflake database).
  • mParticle stores the mappings between the identities sent to IDSync and the MPIDs they resolved to in the MPID mapping table.

Example MPID Mapping Table

CUSTOMER_ID | EMAIL | IDFA | IDFV | GAID | AID | AMP_ID | DAS | MPID
123456 | email@example.com | null | null | null | null | null | null | 8910111213

5 Access the MPID mapping table

To join your new MPID mapping table with other tables in your Snowflake database:

  1. Create a custom UDF (User Defined Function) in Snowflake that generates a SQL statement you can run from your Snowflake Console.
  2. Grant your Snowflake role permission to run the custom UDF.
  3. Call the UDF from a SQL query in your Snowflake Console, specifying the other tables and columns you want to join to your MPID mapping table.
  4. Run the SQL query generated by the UDF.

5.1 Create the UDF

From your Snowflake Console, run the following SQL statement which creates a new UDF called create_mp_identity_order. When you call this UDF in the next step, it returns a SQL statement you can execute that will join the tables and columns you specify with your new MPID mappings.

create or replace function create_mp_identity_order(columns array, orig_table string, mapping_table string, identity_order array)
    returns string
    language python
    runtime_version = '3.10'
    handler = 'create_mp_identity_order_py'
    as
$$
from typing import List, Dict
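# Each entry in columns must look like {"column_name": "<your column>", "mp_identity_map": "<mParticle identity name>"}
# Default identity priority, used when identity_order is null or empty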
default_mp_identity_order = {
    "customer_id": 0,
    "email": 1,
    "idfa": 2, 
    "idfv": 3, 
    "gaid": 4, 
    "aid": 5, 
    "amp_id": 6, 
    "das": 7
}
def generate_inner_select(orig_table: str, mapping_table: str):
    return f"select\n\tot.*, mt.mpid as mpid\nfrom\n\t{orig_table} ot\nleft join\n\t{mapping_table} mt\n"
def generate_on_inner(identity_map: Dict):
    return f"on\n\t(ot.{identity_map['column_name']} = mt.{identity_map['mp_identity_map']})\n"
def generate_where_inner(old: list, identity_map: Dict):
    end_where = "mt.mpid is not null"
    if not old:
        return f"where\n\t{end_where}"
    where_prefix = [f"ot.{x} is null" for x in old]
    where_prefix = " and ".join(where_prefix)
    return f"where\n\t{where_prefix} and {end_where}" 
def generate_identity_order_map(identity_order: List):
    if not identity_order:
        return default_mp_identity_order
    return {x: i for i, x in enumerate(identity_order)}
def create_mp_identity_order_py(columns: List[Dict], orig_table: str, mapping_table: str, identity_order: List):
    mp_identity_order = generate_identity_order_map(identity_order)
    for identity in columns:
        if identity['mp_identity_map'] not in mp_identity_order:
            raise Exception(f"The mparticle identity for '{identity['mp_identity_map']}' from mapping {identity} does not exist")
    mp_identity_order_applied = sorted(columns, key=lambda d: mp_identity_order[d['mp_identity_map']])
    inner_select = generate_inner_select(orig_table, mapping_table)
    s = "with "
    count = 1
    old = []
    for identity in mp_identity_order_applied:
        s += f"cte_{count} as (\n"
        s += inner_select + generate_on_inner(identity) + generate_where_inner(old, identity)
        s += "),\n"
        old.append(identity['column_name'])
        count += 1
    s = s[:-2]
    s += "\n"
    select_cte = [f"select * from cte_{x}" for x in range(1, count)]
    select_cte = "\nunion all\n".join(select_cte)
    s += select_cte
    s += ";"
    return s
$$;

5.2 Grant permissions to run the UDF

To make sure your role in Snowflake can execute the new UDF, run the following statement in your console where my_role is the name of your user role in Snowflake:

GRANT USAGE ON FUNCTION create_mp_identity_order(array, string, string, array) TO ROLE my_role;

5.3 Run the UDF

From your Snowflake console, run:

select create_mp_identity_order(
[
    {'column_name': 'user_id', 'mp_identity_map': 'customer_id'}, 
    {'column_name': 'das', 'mp_identity_map': 'das'}, 
    {'column_name': 'vendor_id', 'mp_identity_map': 'idfv'}
], 'database.public.your_datawarehouse', 'database.mparticle.mpid_mapping_table', null::array);

  • create_mp_identity_order is the UDF you created in step 5.1.
  • database.public.your_datawarehouse is the fully qualified name (database, schema, and table) of the Snowflake table containing your user data.
  • database.mparticle.mpid_mapping_table is the new MPID mapping table created by mParticle.

Running this SQL query creates a separate SQL query specific to your configuration that you can then run to join the MPID mapping table with other tables and columns in your Snowflake database.

The above example assumes you are using the standard ID priority in your mParticle account and that the database table database.public.your_datawarehouse contains three columns called user_id, das, and vendor_id. The query joins these columns to the columns called customer_id, das, and idfv in the MPID mapping table.

If you are using a different ID strategy or a different ID priority, you can customize the identity ordering used by the UDF by passing an array as the final argument instead of null::array. The names in the array must exactly match the mp_identity_map values in your column mappings, because the UDF compares them as case-sensitive strings. For example, if you use the profile isolation strategy with a priority of customer ID, then mobile number, then email, pass ['customer_id', 'mobile_number', 'email']:

select create_mp_identity_order(
[
    {'column_name': 'user_id', 'mp_identity_map': 'customer_id'}, 
    {'column_name': 'phone_number', 'mp_identity_map': 'mobile_number'}, 
    {'column_name': 'email', 'mp_identity_map': 'email'}
], 'database.public.your_datawarehouse', 'database.mparticle.mpid_mapping_table', ['customer_id', 'mobile_number', 'email']);

You can customize the ordering however you want. For example, you can place email higher in the priority with:

select create_mp_identity_order(
[
    {'column_name': 'user_id', 'mp_identity_map': 'customer_id'}, 
    {'column_name': 'phone_number', 'mp_identity_map': 'mobile_number'}, 
    {'column_name': 'email', 'mp_identity_map': 'email'}
], 'database.public.your_datawarehouse', 'database.mparticle.mpid_mapping_table', ['customer_id', 'email', 'mobile_number']);

5.4 Run the SQL statement returned by the UDF

Running the UDF as described in step 5.3 generates a SQL query specific to your database configuration that you can run to join the MPID mapping table with your own table in Snowflake.
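To see the shape of the generated statement without a Snowflake session, the UDF's string-building logic can be reproduced locally. The sketch below is a condensed re-implementation of the handler from step 5.1 for preview purposes only: the name preview_join_sql is hypothetical, and it raises ValueError where the UDF raises a generic Exception.

```python
from typing import Dict, List, Optional

# Default identity priority, matching default_mp_identity_order in the UDF.
DEFAULT_ORDER = ["customer_id", "email", "idfa", "idfv", "gaid", "aid", "amp_id", "das"]


def preview_join_sql(
    columns: List[Dict[str, str]],
    orig_table: str,
    mapping_table: str,
    identity_order: Optional[List[str]] = None,
) -> str:
    """Build one CTE per mapped identity, in priority order, then UNION ALL them."""
    order = {name: i for i, name in enumerate(identity_order or DEFAULT_ORDER)}
    for col in columns:
        if col["mp_identity_map"] not in order:
            raise ValueError(f"Unknown mParticle identity: {col['mp_identity_map']}")
    ranked = sorted(columns, key=lambda c: order[c["mp_identity_map"]])

    ctes = []
    higher_priority = []  # columns already tried by earlier CTEs
    for n, col in enumerate(ranked, start=1):
        # Later CTEs only match rows where every higher-priority column is null.
        conditions = [f"ot.{c} is null" for c in higher_priority] + ["mt.mpid is not null"]
        ctes.append(
            f"cte_{n} as (\n"
            f"select\n\tot.*, mt.mpid as mpid\nfrom\n\t{orig_table} ot\n"
            f"left join\n\t{mapping_table} mt\n"
            f"on\n\t(ot.{col['column_name']} = mt.{col['mp_identity_map']})\n"
            f"where\n\t{' and '.join(conditions)})"
        )
        higher_priority.append(col["column_name"])

    unions = "\nunion all\n".join(f"select * from cte_{n}" for n in range(1, len(ranked) + 1))
    return "with " + ",\n".join(ctes) + "\n" + unions + ";"


if __name__ == "__main__":
    print(preview_join_sql(
        [
            {"column_name": "user_id", "mp_identity_map": "customer_id"},
            {"column_name": "das", "mp_identity_map": "das"},
            {"column_name": "vendor_id", "mp_identity_map": "idfv"},
        ],
        "database.public.your_datawarehouse",
        "database.mparticle.mpid_mapping_table",
    ))
```

With the first example from step 5.3, the output contains three CTEs ordered customer_id, idfv, das. Each later CTE only matches rows where the higher-priority columns are null, so a row is joined through its highest-priority available identity.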
