Snowflake Connector

Forbind Snowflake til Brevo via Tajo for at synkronisere kundesegmenter fra dit data warehouse, berige kontaktprofiler med analysedata og drive personaliserede marketingkampagner med warehouse-baserede indsigter.

Oversigt

EgenskabVærdi
PlatformSnowflake
KategoriData warehouse (brugerdefineret)
OpsætningskompleksitetMellem
Officiel integrationNej
Synkroniserede dataKunder, segmenter, analyser, hændelser
AutentifikationsmetodeNøglepar / OAuth 2.0

Funktioner

  • Reverse ETL - Skub kundesegmenter fra Snowflake til Brevo-kontaktlister
  • Målgruppe-synkronisering - Synkronisér warehouse-beregnede målgrupper til målrettede kampagner
  • Analyseberigelse - Berig Brevo-kontakter med beregnede metrikker (LTV, RFM-scores)
  • SQL-baserede forespørgsler - Brug Snowflake SQL REST API til at udføre forespørgsler programmatisk
  • Planlagt synkronisering - Kør automatiserede datapipelines på konfigurerbare intervaller
  • Flerudsagns-understøttelse - Udfør komplekse datatransformationer i enkelte API-kald

Forudsætninger

Før du begynder, skal du sikre dig, at du har:

  1. En Snowflake-konto med ACCOUNTADMIN- eller SYSADMIN-rollen
  2. En Brevo-konto med API-adgang
  3. En Tajo-konto med konnektorrettigheder
  4. Et dedikeret Snowflake-warehouse til integrationsforespørgsler
  5. En netværkspolitik, der tillader Tajos IP-adresser

Autentifikation

Nøglepar-autentifikation (anbefalet)

Terminal window
# Generate RSA key pair
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user
# In Snowflake:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

OAuth 2.0-autentifikation

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

SQL API-autentifikation

Terminal window
# Using JWT token with the SQL API
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

Konfiguration

Grundlæggende opsætning

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # Every 6 hours
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

Feltmapping

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

API-endpoints

EndpointMetodeBeskrivelse
/api/v2/statementsPOSTIndsend SQL-sætninger til udførelse
/api/v2/statements/{statementHandle}GETTjek udførelsesstatus
/api/v2/statements/{statementHandle}/cancelPOSTAnnullér en kørende sætning
/api/v2/statements/{statementHandle}?partition={id}GETHent resultatpartitioner

SQL API-partitioner

Snowflake SQL API returnerer store resultatsæt i partitioner. Hver partition indeholder op til ca. 12 MB data. Brug partitionsparameteren til at iterere gennem resultater.

Kodeeksempler

Initialisér konnektor

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

Synkronisér kundesegmenter via SQL API

// Execute a SQL query via Snowflake SQL REST API
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// Poll for results
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Sync to Brevo via Tajo
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

Reverse ETL-pipeline

// Push computed audiences from Snowflake to Brevo lists
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

Ratebegrænsninger

RessourceGrænseNoter
Samtidige SQL API-forespørgsler20 per brugerPer Snowflake-konto
SQL API-resultatstørrelse12 MB per partitionPaginér med partitions-id’er
Sætningstimeout172.800 sek. (48 t)Konfigurerbar per forespørgsel
API-forespørgslerVarierer efter planBaseret på Snowflake-udgave

Warehouse-omkostninger

Snowflake afregner baseret på computetid. Brug et dedikeret, passende dimensioneret warehouse til Tajo-forespørgsler, og aktivér auto-suspend for at minimere omkostninger.

Fejlfinding

ProblemÅrsagLøsning
Autentifikation mislykkedesUdløbet JWT-tokenRegenerér JWT med gyldig udløbstid
Query-timeoutStort datasætTilføj filtre eller brug inkrementel synkronisering
NetværksfejlIP ikke på whitelistTilføj Tajo-IP’er til Snowflakes netværkspolitik
Manglende kolonnerSkemaændringOpdatér feltmapping-konfigurationen
PartitionsfejlResultat for stortBehandl resultater i mindre partitioner

Fejlfindingstilstand

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

Best practices

  1. Brug et dedikeret warehouse - Undgå konflikt med produktionsworkloads
  2. Implementér inkrementel synkronisering - Forespørg kun ændrede poster siden sidste synkronisering
  3. Aktivér auto-suspend - Konfigurér warehouse til at suspendere efter 5 minutters inaktivitet
  4. Brug nøglepar-autentifikation - Foretræk nøglepar frem for adgangskode-autentifikation
  5. Optimér forespørgsler - Filtrér og projicér kun nødvendige kolonner for hurtigere synkroniseringer
  6. Overvåg credits - Spor Snowflake-creditforbrug for integrationsforespørgsler

Sikkerhed

  • Nøglepar-autentifikation - RSA 2048-bit-kryptering til API-adgang
  • OAuth 2.0 - Tokenbaseret autentifikation med rollescoping
  • Netværkspolitikker - IP-allowlisting for Tajo-serviceendpoints
  • Rollebaseret adgang - Dedikeret Snowflake-rolle med minimumsrettigheder
  • Krypteret dataoverførsel - TLS 1.2+ til al API-kommunikation
  • Datamaskering - Brug Snowflakes dynamiske datamaskering til følsomme felter

Relaterede ressourcer

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
AI-assistent

Hej! Spørg mig om dokumentationen.