Snowflake konektor
Prepoj Snowflake s Brevo cez Tajo na synchronizáciu zákazníckych segmentov z dátového skladu, obohacovanie profilov kontaktov o analytické dáta a poháňanie personalizovaných marketingových kampaní poznatkami zo skladu.
Prehľad
| Vlastnosť | Hodnota |
|---|---|
| Platforma | Snowflake |
| Kategória | Dátový sklad (Vlastná) |
| Zložitosť nastavenia | Stredná |
| Oficiálna integrácia | Nie |
| Synchronizované dáta | Zákazníci, Segmenty, Analytika, Udalosti |
| Metóda autentifikácie | Key Pair / OAuth 2.0 |
Funkcie
- Reverse ETL - Prenášaj zákaznícke segmenty zo Snowflake do zoznamov kontaktov Brevo
- Synchronizácia publík - Synchronizuj publíká vypočítané v sklade pre cielené kampane
- Obohacovanie o analytiku - Obohacuj kontakty Brevo o vypočítané metriky (LTV, RFM skóre)
- Dopyty v SQL - Použi Snowflake SQL REST API na programové spúšťanie dopytov
- Plánovaná synchronizácia - Spúšťaj automatizované datové pipeline v konfigurovateľných intervaloch
- Podpora viacerých príkazov - Spúšťaj komplexné transformácie dát v jednom API volaní
Predpoklady
Predtým, než začneš, uisti sa, že máš:
- Snowflake účet s rolou ACCOUNTADMIN alebo SYSADMIN
- Brevo účet s API prístupom
- Tajo účet s oprávneniami pre konektory
- Dedikovaný Snowflake warehouse pre integračné dopyty
- Sieťovú politiku povoľujúcu IP adresy Tajo
Autentifikácia
Autentifikácia pomocou Key Pair (odporúčané)
# Generate RSA key pairopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user# In Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';OAuth 2.0 autentifikácia
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });Autentifikácia SQL API
# Using JWT token with the SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'Konfigurácia
Základné nastavenie
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Every 6 hours
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncMapovanie polí
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEAPI endpointy
| Endpoint | Metóda | Popis |
|---|---|---|
/api/v2/statements | POST | Odoslanie SQL príkazov na spustenie |
/api/v2/statements/{statementHandle} | GET | Kontrola stavu vykonávania |
/api/v2/statements/{statementHandle}/cancel | POST | Zrušenie bežiaceho príkazu |
/api/v2/statements/{statementHandle}?partition={id} | GET | Načítanie partícií výsledkov |
Partície SQL API
Snowflake SQL API vracia veľké výsledkové sady v partíciách. Každá partícia obsahuje približne 12 MB dát. Použi parameter partition na iteráciu cez výsledky.
Ukážky kódu
Inicializácia konektora
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});Synchronizácia zákazníckych segmentov cez SQL API
// Execute a SQL query via Snowflake SQL REST APIconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Poll for resultslet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Sync to Brevo via Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Reverse ETL pipeline
// Push computed audiences from Snowflake to Brevo listsawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});Obmedzenia rýchlosti
| Zdroj | Limit | Poznámky |
|---|---|---|
| Súbežné dopyty SQL API | 20 na používateľa | Na Snowflake účet |
| Veľkosť výsledkov SQL API | 12 MB na partíciu | Stránkovanie pomocou ID partícií |
| Timeout príkazu | 172 800 sek (48 hod) | Konfigurovateľné na dopyt |
| API požiadavky | Závisí od plánu | Podľa edície Snowflake |
Náklady na warehouse
Snowflake účtuje poplatky na základe výpočtového času. Použi dedikovaný, vhodne veľký warehouse pre dopyty Tajo a nastav auto-suspend na minimalizáciu nákladov.
Riešenie problémov
| Problém | Príčina | Riešenie |
|---|---|---|
| Autentifikácia zlyhala | Vypršaný JWT token | Znovu vygeneruj JWT s platnou expiráciou |
| Timeout dopytu | Veľký dataset | Pridaj filtre alebo použi inkrementálnu synchronizáciu |
| Chyba siete | IP nie je na zozname povolených | Pridaj IP adresy Tajo do sieťovej politiky Snowflake |
| Chýbajúce stĺpce | Zmena schémy | Aktualizuj konfiguráciu mapovania polí |
| Chyba partície | Výsledok je príliš veľký | Spracovávaj výsledky v menších partíciách |
Debug režim
connectors: snowflake: debug: true log_level: verbose log_queries: trueOdporúčané postupy
- Použi dedikovaný warehouse - Vyhni sa súťaženiu s produkčnými záťažami
- Implementuj inkrementálnu synchronizáciu - Dopytuj iba zmenené záznamy od poslednej synchronizácie
- Nastav auto-suspend - Nakonfiguruj warehouse na pozastavenie po 5 minútach nečinnosti
- Použi autentifikáciu pomocou key pair - Uprednostni key pair pred autentifikáciou heslom
- Optimalizuj dopyty - Filtruj a projektuj iba potrebné stĺpce pre rýchlejšie synchronizácie
- Monitoruj kredity - Sleduj spotrebu kreditov Snowflake pre integračné dopyty
Bezpečnosť
- Autentifikácia pomocou key pair - RSA 2048-bit šifrovanie pre API prístup
- OAuth 2.0 - Autentifikácia na základe tokenov so scopingom rolí
- Sieťové politiky - IP allowlisting pre endpointy Tajo
- Prístup na základe rolí - Dedikovaná Snowflake rola s minimálnymi potrebnými oprávneniami
- Šifrovaný prenos dát - TLS 1.2+ pre všetku API komunikáciu
- Maskovanie dát - Použi dynamické maskovanie dát Snowflake pre citlivé polia