Snowflake konektor

Prepoj Snowflake s Brevo cez Tajo na synchronizáciu zákazníckych segmentov z dátového skladu, obohacovanie profilov kontaktov o analytické dáta a poháňanie personalizovaných marketingových kampaní poznatkami zo skladu.

Prehľad

VlastnosťHodnota
PlatformaSnowflake
KategóriaDátový sklad (Vlastná)
Zložitosť nastaveniaStredná
Oficiálna integráciaNie
Synchronizované dátaZákazníci, Segmenty, Analytika, Udalosti
Metóda autentifikácieKey Pair / OAuth 2.0

Funkcie

  • Reverse ETL - Prenášaj zákaznícke segmenty zo Snowflake do zoznamov kontaktov Brevo
  • Synchronizácia publík - Synchronizuj publíká vypočítané v sklade pre cielené kampane
  • Obohacovanie o analytiku - Obohacuj kontakty Brevo o vypočítané metriky (LTV, RFM skóre)
  • Dopyty v SQL - Použi Snowflake SQL REST API na programové spúšťanie dopytov
  • Plánovaná synchronizácia - Spúšťaj automatizované datové pipeline v konfigurovateľných intervaloch
  • Podpora viacerých príkazov - Spúšťaj komplexné transformácie dát v jednom API volaní

Predpoklady

Predtým, než začneš, uisti sa, že máš:

  1. Snowflake účet s rolou ACCOUNTADMIN alebo SYSADMIN
  2. Brevo účet s API prístupom
  3. Tajo účet s oprávneniami pre konektory
  4. Dedikovaný Snowflake warehouse pre integračné dopyty
  5. Sieťovú politiku povoľujúcu IP adresy Tajo

Autentifikácia

Autentifikácia pomocou Key Pair (odporúčané)

Terminal window
# Generate RSA key pair
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user
# In Snowflake:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

OAuth 2.0 autentifikácia

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

Autentifikácia SQL API

Terminal window
# Using JWT token with the SQL API
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

Konfigurácia

Základné nastavenie

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # Every 6 hours
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

Mapovanie polí

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

API endpointy

EndpointMetódaPopis
/api/v2/statementsPOSTOdoslanie SQL príkazov na spustenie
/api/v2/statements/{statementHandle}GETKontrola stavu vykonávania
/api/v2/statements/{statementHandle}/cancelPOSTZrušenie bežiaceho príkazu
/api/v2/statements/{statementHandle}?partition={id}GETNačítanie partícií výsledkov

Partície SQL API

Snowflake SQL API vracia veľké výsledkové sady v partíciách. Každá partícia obsahuje približne 12 MB dát. Použi parameter partition na iteráciu cez výsledky.

Ukážky kódu

Inicializácia konektora

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

Synchronizácia zákazníckych segmentov cez SQL API

// Execute a SQL query via Snowflake SQL REST API
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// Poll for results
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Sync to Brevo via Tajo
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

Reverse ETL pipeline

// Push computed audiences from Snowflake to Brevo lists
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

Obmedzenia rýchlosti

ZdrojLimitPoznámky
Súbežné dopyty SQL API20 na používateľaNa Snowflake účet
Veľkosť výsledkov SQL API12 MB na partíciuStránkovanie pomocou ID partícií
Timeout príkazu172 800 sek (48 hod)Konfigurovateľné na dopyt
API požiadavkyZávisí od plánuPodľa edície Snowflake

Náklady na warehouse

Snowflake účtuje poplatky na základe výpočtového času. Použi dedikovaný, vhodne veľký warehouse pre dopyty Tajo a nastav auto-suspend na minimalizáciu nákladov.

Riešenie problémov

ProblémPríčinaRiešenie
Autentifikácia zlyhalaVypršaný JWT tokenZnovu vygeneruj JWT s platnou expiráciou
Timeout dopytuVeľký datasetPridaj filtre alebo použi inkrementálnu synchronizáciu
Chyba sieteIP nie je na zozname povolenýchPridaj IP adresy Tajo do sieťovej politiky Snowflake
Chýbajúce stĺpceZmena schémyAktualizuj konfiguráciu mapovania polí
Chyba partícieVýsledok je príliš veľkýSpracovávaj výsledky v menších partíciách

Debug režim

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

Odporúčané postupy

  1. Použi dedikovaný warehouse - Vyhni sa súťaženiu s produkčnými záťažami
  2. Implementuj inkrementálnu synchronizáciu - Dopytuj iba zmenené záznamy od poslednej synchronizácie
  3. Nastav auto-suspend - Nakonfiguruj warehouse na pozastavenie po 5 minútach nečinnosti
  4. Použi autentifikáciu pomocou key pair - Uprednostni key pair pred autentifikáciou heslom
  5. Optimalizuj dopyty - Filtruj a projektuj iba potrebné stĺpce pre rýchlejšie synchronizácie
  6. Monitoruj kredity - Sleduj spotrebu kreditov Snowflake pre integračné dopyty

Bezpečnosť

  • Autentifikácia pomocou key pair - RSA 2048-bit šifrovanie pre API prístup
  • OAuth 2.0 - Autentifikácia na základe tokenov so scopingom rolí
  • Sieťové politiky - IP allowlisting pre endpointy Tajo
  • Prístup na základe rolí - Dedikovaná Snowflake rola s minimálnymi potrebnými oprávneniami
  • Šifrovaný prenos dát - TLS 1.2+ pre všetku API komunikáciu
  • Maskovanie dát - Použi dynamické maskovanie dát Snowflake pre citlivé polia

Súvisiace zdroje

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
AI asistent

Ahoj! Opýtajte sa ma na dokumentáciu.