Connettore Snowflake
Collega Snowflake a Brevo tramite Tajo per sincronizzare i segmenti cliente dal tuo data warehouse, arricchire i profili dei contatti con dati analitici e alimentare campagne marketing personalizzate con insight dal warehouse.
Panoramica
| Proprietà | Valore |
|---|---|
| Piattaforma | Snowflake |
| Categoria | Data Warehouse (Custom) |
| Complessità di setup | Media |
| Integrazione ufficiale | No |
| Dati sincronizzati | Clienti, Segmenti, Analytics, Eventi |
| Metodo di autenticazione | Key Pair / OAuth 2.0 |
Funzionalità
- Reverse ETL - Invia segmenti cliente da Snowflake alle liste contatti Brevo
- Sync delle audience - Sincronizza audience computate nel warehouse per campagne mirate
- Arricchimento analytics - Arricchisci i contatti Brevo con metriche computate (LTV, score RFM)
- Query SQL-based - Usa la REST API SQL di Snowflake per eseguire query in modo programmatico
- Sync pianificato - Esegui pipeline dati automatizzate a intervalli configurabili
- Supporto multi-statement - Esegui trasformazioni dati complesse in singole chiamate API
Prerequisiti
Prima di iniziare, assicurati di avere:
- Un account Snowflake con ruolo ACCOUNTADMIN o SYSADMIN
- Un account Brevo con accesso API
- Un account Tajo con permessi sui connettori
- Un warehouse Snowflake dedicato per le query di integrazione
- Una network policy che consente gli indirizzi IP Tajo
Autenticazione
Autenticazione Key Pair (consigliata)
# Genera la coppia di chiavi RSAopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assegna la chiave pubblica all'utente Snowflake# In Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';Autenticazione OAuth 2.0
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });Autenticazione SQL API
# Usando un token JWT con la SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'Configurazione
Setup di base
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Ogni 6 ore
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncMapping dei campi
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEEndpoint API
| Endpoint | Metodo | Descrizione |
|---|---|---|
/api/v2/statements | POST | Invia statement SQL per l’esecuzione |
/api/v2/statements/{statementHandle} | GET | Controlla lo stato di esecuzione |
/api/v2/statements/{statementHandle}/cancel | POST | Annulla uno statement in esecuzione |
/api/v2/statements/{statementHandle}?partition={id} | GET | Recupera partizioni di risultato |
Partizioni SQL API
La SQL API di Snowflake restituisce result set grandi in partizioni. Ogni partizione contiene fino a circa 12MB di dati. Usa il parametro partition per iterare sui risultati.
Esempi di codice
Inizializzare il connettore
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});Sincronizzare i segmenti cliente tramite SQL API
// Esegue una query SQL tramite la REST API SQL di Snowflakeconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Polling per i risultatilet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Sincronizza su Brevo tramite Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Pipeline Reverse ETL
// Invia audience computate da Snowflake alle liste Brevoawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});Limiti di rate
| Risorsa | Limite | Note |
|---|---|---|
| Query concorrenti SQL API | 20 per utente | Per account Snowflake |
| Dimensione risultato SQL API | 12MB per partizione | Pagina con ID partizione |
| Statement timeout | 172.800 sec (48h) | Configurabile per query |
| Richieste API | Variabile per piano | In base all’edizione Snowflake |
Costi del warehouse
Snowflake addebita in base al tempo di compute. Usa un warehouse dedicato e dimensionato adeguatamente per le query Tajo e imposta l’auto-suspend per minimizzare i costi.
Risoluzione dei problemi
| Problema | Causa | Soluzione |
|---|---|---|
| Autenticazione fallita | Token JWT scaduto | Rigenera il JWT con scadenza valida |
| Timeout della query | Dataset grande | Aggiungi filtri o usa sync incrementale |
| Errore di rete | IP non in whitelist | Aggiungi gli IP Tajo alla network policy di Snowflake |
| Colonne mancanti | Cambio di schema | Aggiorna la configurazione del field mapping |
| Errore di partizione | Risultato troppo grande | Elabora i risultati in partizioni più piccole |
Modalità debug
connectors: snowflake: debug: true log_level: verbose log_queries: trueBest practice
- Usa un warehouse dedicato - Evita contesa con i workload di produzione
- Implementa sync incrementale - Interroga solo i record modificati dal sync precedente
- Imposta auto-suspend - Configura il warehouse per sospendersi dopo 5 minuti di inattività
- Usa autenticazione key pair - Preferisci key pair all’autenticazione via password
- Ottimizza le query - Filtra e proietta solo le colonne necessarie per sync più veloci
- Monitora i crediti - Traccia il consumo di crediti Snowflake per le query di integrazione
Sicurezza
- Autenticazione key pair - Cifratura RSA 2048-bit per l’accesso API
- OAuth 2.0 - Autenticazione basata su token con scope di ruolo
- Network policy - IP allowlisting per gli endpoint del servizio Tajo
- Accesso basato su ruolo - Ruolo Snowflake dedicato con privilegi minimi necessari
- Trasferimento dati cifrato - TLS 1.2+ per tutte le comunicazioni API
- Data masking - Usa il dynamic data masking di Snowflake per campi sensibili