Conector Snowflake

Conectați Snowflake la Brevo prin Tajo pentru a sincroniza segmentele de clienți din depozitul de date, a îmbogăți profilurile de contact cu date analitice și a alimenta campanii de marketing personalizate cu informații bazate pe depozit.

Prezentare generală

ProprietateValoare
PlatformăSnowflake
CategorieDepozit de date (Personalizat)
Complexitate configurareMedie
Integrare oficialăNu
Date sincronizateClienți, Segmente, Analize, Evenimente
Metodă de autentificarePereche de chei / OAuth 2.0

Funcționalități

  • Reverse ETL - Transmiteți segmentele de clienți din Snowflake la listele de contacte Brevo
  • Sincronizare audiențe - Sincronizați audiențele calculate în depozit pentru campanii targetate
  • Îmbogățire cu analize - Îmbogățiți contactele Brevo cu metrici calculate (LTV, scoruri RFM)
  • Interogări bazate pe SQL - Utilizați Snowflake SQL REST API pentru a executa interogări programatic
  • Sincronizare programată - Rulați pipeline-uri de date automatizate la intervale configurabile
  • Suport instrucțiuni multiple - Executați transformări complexe de date într-un singur apel API

Cerințe preliminare

Înainte de a începe, asigurați-vă că aveți:

  1. Un cont Snowflake cu rolul ACCOUNTADMIN sau SYSADMIN
  2. Un cont Brevo cu acces API
  3. Un cont Tajo cu permisiuni de conector
  4. Un depozit Snowflake dedicat pentru interogările de integrare
  5. Politică de rețea care permite adresele IP Tajo

Autentificare

Autentificare cu pereche de chei (Recomandat)

Terminal window
# Generate RSA key pair
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user
# In Snowflake:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

Autentificare OAuth 2.0

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

Autentificare SQL API

Terminal window
# Using JWT token with the SQL API
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

Configurare

Configurare de bază

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # Every 6 hours
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

Mapare câmpuri

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

Puncte finale API

Punct finalMetodăDescriere
/api/v2/statementsPOSTTrimite instrucțiuni SQL pentru execuție
/api/v2/statements/{statementHandle}GETVerifică statusul execuției
/api/v2/statements/{statementHandle}/cancelPOSTAnulează o instrucțiune în curs
/api/v2/statements/{statementHandle}?partition={id}GETRecuperează partițiile de rezultate

Partiții SQL API

Snowflake SQL API returnează seturi de rezultate mari în partiții. Fiecare partiție conține aproximativ 12MB de date. Utilizați parametrul partition pentru a itera prin rezultate.

Exemple de cod

Inițializare conector

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

Sincronizare segmente clienți prin SQL API

// Execute a SQL query via Snowflake SQL REST API
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// Poll for results
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Sync to Brevo via Tajo
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

Pipeline Reverse ETL

// Push computed audiences from Snowflake to Brevo lists
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

Limite de rată

ResursăLimităNote
Interogări concurente SQL API20 per utilizatorPer cont Snowflake
Dimensiune rezultat SQL API12MB per partițiePaginați cu ID-uri de partiție
Timeout instrucțiune172.800 sec (48h)Configurabil per interogare
Cereri APIVariază după planBazat pe ediția Snowflake

Costuri depozit

Snowflake taxează în funcție de timpul de calcul. Utilizați un depozit dedicat, de dimensiune corespunzătoare pentru interogările Tajo și setați auto-suspend pentru a minimiza costurile.

Depanare

ProblemăCauzăSoluție
Autentificare eșuatăToken JWT expiratRegenerați JWT cu expirare validă
Timeout interogareSet de date mareAdăugați filtre sau utilizați sincronizare incrementală
Eroare de rețeaIP neadăugat la lista albăAdăugați IP-urile Tajo la politica de rețea Snowflake
Coloane lipsăModificare schemăActualizați configurația mapării câmpurilor
Eroare partițieRezultate prea mariProcesați rezultatele în partiții mai mici

Modul de depanare

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

Bune practici

  1. Utilizați un depozit dedicat - Evitați contenciunea cu volumele de lucru de producție
  2. Implementați sincronizare incrementală - Interogați doar înregistrările modificate de la ultima sincronizare
  3. Setați auto-suspend - Configurați depozitul să se suspende după 5 minute de inactivitate
  4. Utilizați autentificarea cu pereche de chei - Preferați perechea de chei față de autentificarea cu parolă
  5. Optimizați interogările - Filtrați și proiectați doar coloanele necesare pentru sincronizări mai rapide
  6. Monitorizați creditele - Urmăriți consumul de credite Snowflake pentru interogările de integrare

Securitate

  • Autentificare cu pereche de chei - Criptare RSA 2048-bit pentru accesul API
  • OAuth 2.0 - Autentificare bazată pe token cu scopare pe rol
  • Politici de rețea - Allowlisting IP pentru punctele finale ale serviciului Tajo
  • Acces bazat pe rol - Rol Snowflake dedicat cu privilegii minime necesare
  • Transfer de date criptat - TLS 1.2+ pentru toate comunicațiile API
  • Mascare date - Utilizați mascarea dinamică a datelor Snowflake pentru câmpurile sensibile

Resurse conexe

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
Asistent AI

Bună! Întreabă-mă orice despre documentație.