Snowflake Connector
Forbind Snowflake til Brevo via Tajo for at synkronisere kundesegmenter fra dit data warehouse, berige kontaktprofiler med analysedata og drive personaliserede marketingkampagner med warehouse-baserede indsigter.
Oversigt
| Egenskab | Værdi |
|---|---|
| Platform | Snowflake |
| Kategori | Data warehouse (brugerdefineret) |
| Opsætningskompleksitet | Mellem |
| Officiel integration | Nej |
| Synkroniserede data | Kunder, segmenter, analyser, hændelser |
| Autentifikationsmetode | Nøglepar / OAuth 2.0 |
Funktioner
- Reverse ETL - Skub kundesegmenter fra Snowflake til Brevo-kontaktlister
- Målgruppe-synkronisering - Synkronisér warehouse-beregnede målgrupper til målrettede kampagner
- Analyseberigelse - Berig Brevo-kontakter med beregnede metrikker (LTV, RFM-scores)
- SQL-baserede forespørgsler - Brug Snowflake SQL REST API til at udføre forespørgsler programmatisk
- Planlagt synkronisering - Kør automatiserede datapipelines på konfigurerbare intervaller
- Flerudsagns-understøttelse - Udfør komplekse datatransformationer i enkelte API-kald
Forudsætninger
Før du begynder, skal du sikre dig, at du har:
- En Snowflake-konto med ACCOUNTADMIN- eller SYSADMIN-rollen
- En Brevo-konto med API-adgang
- En Tajo-konto med konnektorrettigheder
- Et dedikeret Snowflake-warehouse til integrationsforespørgsler
- En netværkspolitik, der tillader Tajos IP-adresser
Autentifikation
Nøglepar-autentifikation (anbefalet)
# Generate RSA key pairopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user# In Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';OAuth 2.0-autentifikation
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });SQL API-autentifikation
# Using JWT token with the SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'Konfiguration
Grundlæggende opsætning
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Every 6 hours
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncFeltmapping
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEAPI-endpoints
| Endpoint | Metode | Beskrivelse |
|---|---|---|
/api/v2/statements | POST | Indsend SQL-sætninger til udførelse |
/api/v2/statements/{statementHandle} | GET | Tjek udførelsesstatus |
/api/v2/statements/{statementHandle}/cancel | POST | Annullér en kørende sætning |
/api/v2/statements/{statementHandle}?partition={id} | GET | Hent resultatpartitioner |
SQL API-partitioner
Snowflake SQL API returnerer store resultatsæt i partitioner. Hver partition indeholder op til ca. 12 MB data. Brug partitionsparameteren til at iterere gennem resultater.
Kodeeksempler
Initialisér konnektor
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});Synkronisér kundesegmenter via SQL API
// Execute a SQL query via Snowflake SQL REST APIconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Poll for resultslet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Sync to Brevo via Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Reverse ETL-pipeline
// Push computed audiences from Snowflake to Brevo listsawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});Ratebegrænsninger
| Ressource | Grænse | Noter |
|---|---|---|
| Samtidige SQL API-forespørgsler | 20 per bruger | Per Snowflake-konto |
| SQL API-resultatstørrelse | 12 MB per partition | Paginér med partitions-id’er |
| Sætningstimeout | 172.800 sek. (48 t) | Konfigurerbar per forespørgsel |
| API-forespørgsler | Varierer efter plan | Baseret på Snowflake-udgave |
Warehouse-omkostninger
Snowflake afregner baseret på computetid. Brug et dedikeret, passende dimensioneret warehouse til Tajo-forespørgsler, og aktivér auto-suspend for at minimere omkostninger.
Fejlfinding
| Problem | Årsag | Løsning |
|---|---|---|
| Autentifikation mislykkedes | Udløbet JWT-token | Regenerér JWT med gyldig udløbstid |
| Query-timeout | Stort datasæt | Tilføj filtre eller brug inkrementel synkronisering |
| Netværksfejl | IP ikke på whitelist | Tilføj Tajo-IP’er til Snowflakes netværkspolitik |
| Manglende kolonner | Skemaændring | Opdatér feltmapping-konfigurationen |
| Partitionsfejl | Resultat for stort | Behandl resultater i mindre partitioner |
Fejlfindingstilstand
connectors: snowflake: debug: true log_level: verbose log_queries: trueBest practices
- Brug et dedikeret warehouse - Undgå konflikt med produktionsworkloads
- Implementér inkrementel synkronisering - Forespørg kun ændrede poster siden sidste synkronisering
- Aktivér auto-suspend - Konfigurér warehouse til at suspendere efter 5 minutters inaktivitet
- Brug nøglepar-autentifikation - Foretræk nøglepar frem for adgangskode-autentifikation
- Optimér forespørgsler - Filtrér og projicér kun nødvendige kolonner for hurtigere synkroniseringer
- Overvåg credits - Spor Snowflake-creditforbrug for integrationsforespørgsler
Sikkerhed
- Nøglepar-autentifikation - RSA 2048-bit-kryptering til API-adgang
- OAuth 2.0 - Tokenbaseret autentifikation med rollescoping
- Netværkspolitikker - IP-allowlisting for Tajo-serviceendpoints
- Rollebaseret adgang - Dedikeret Snowflake-rolle med minimumsrettigheder
- Krypteret dataoverførsel - TLS 1.2+ til al API-kommunikation
- Datamaskering - Brug Snowflakes dynamiske datamaskering til følsomme felter