Snowflake Konektor
Povežite Snowflake s Brevom putem Taja kako biste sinkronizirali korisničke segmente iz vašeg skladišta podataka, obogatili profile kontakata analitičkim podacima i pokrenuli personalizirane marketinške kampanje uz uvide vođene skladištem.
Pregled
| Svojstvo | Vrijednost |
|---|---|
| Platforma | Snowflake |
| Kategorija | Skladište podataka (Prilagođeno) |
| Složenost postavljanja | Srednje |
| Službena integracija | Ne |
| Sinkronizirani podaci | Korisnici, Segmenti, Analitika, Događaji |
| Metoda autentifikacije | Par ključeva / OAuth 2.0 |
Značajke
- Reverse ETL - Guranje korisničkih segmenata iz Snowflakea na Brevo popise kontakata
- Sinkronizacija publike - Sinkronizacija publika izračunatih u skladištu za ciljane kampanje
- Obogaćivanje analitike - Obogatite Brevo kontakte izračunatim metrikama (LTV, RFM ocjene)
- Upiti temeljeni na SQL-u - Koristite Snowflake SQL REST API za programatsko izvršavanje upita
- Zakazana sinkronizacija - Pokretanje automatiziranih cjevovoda podataka u konfigurabilnim intervalima
- Podrška za višestruke naredbe - Izvršavanje složenih transformacija podataka u jednim API pozivima
Preduvjeti
Prije nego što počnete, osigurajte da imate:
- Snowflake račun s ulogom ACCOUNTADMIN ili SYSADMIN
- Brevo račun s API pristupom
- Tajo račun s dozvolama konektora
- Namjensko Snowflake skladište za integracijske upite
- Mrežnu politiku koja dopušta Tajo IP adrese
Autentifikacija
Autentifikacija parom ključeva (Preporučeno)
# Generirajte RSA par ključevaopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Dodijelite javni ključ Snowflake korisniku# U Snowflakeu:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';OAuth 2.0 autentifikacija
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });SQL API autentifikacija
# Korištenje JWT tokena s SQL API-jemcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'Konfiguracija
Osnovna konfiguracija
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Svakih 6 sati
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncMapiranje polja
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEAPI krajnje točke
| Krajnja točka | Metoda | Opis |
|---|---|---|
/api/v2/statements | POST | Pošaljite SQL naredbe za izvršavanje |
/api/v2/statements/{statementHandle} | GET | Provjerite status izvršavanja |
/api/v2/statements/{statementHandle}/cancel | POST | Otkažite pokrenenu naredbu |
/api/v2/statements/{statementHandle}?partition={id} | GET | Dohvatite particije rezultata |
SQL API particije
Snowflake SQL API vraća velike skupove rezultata u particijama. Svaka particija sadrži do otprilike 12 MB podataka. Koristite parametar particije za iteraciju kroz rezultate.
Primjeri koda
Inicijalizacija konektora
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});Sinkronizacija korisničkih segmenata putem SQL API-ja
// Izvršite SQL upit putem Snowflake SQL REST API-jaconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Anktirajte rezultatelet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Sinkronizacija u Brevo putem Tajafor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Reverse ETL cjevovod
// Guranje izračunatih publika iz Snowflakea na Brevo popiseawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});Ograničenja brzine
| Resurs | Ograničenje | Napomene |
|---|---|---|
| SQL API istovremeni upiti | 20 po korisniku | Po Snowflake računu |
| Veličina rezultata SQL API-ja | 12 MB po particiji | Paginirajte s ID-ovima particija |
| Istek naredbe | 172.800 s (48 h) | Konfigurabilno po upitu |
| API zahtjevi | Varira ovisno o planu | Temelji se na Snowflake izdanju |
Troškovi skladišta
Snowflake naplaćuje prema vremenu računanja. Koristite namjensko, odgovarajuće veličine skladište za Tajo upite i postavite automatsko obustavljanje za minimiziranje troškova.
Rješavanje problema
| Problem | Uzrok | Rješenje |
|---|---|---|
| Autentifikacija neuspješna | Istekli JWT token | Regenerirajte JWT s valjanim istekom |
| Istek upita | Veliki skup podataka | Dodajte filtre ili koristite inkrementalnu sinkronizaciju |
| Mrežna greška | IP nije na listi dopuštenih | Dodajte Tajo IP-ove u Snowflake mrežnu politiku |
| Nedostaju stupci | Promjena sheme | Ažurirajte konfiguraciju mapiranja polja |
| Greška particije | Rezultat prevelik | Obradite rezultate u manjim particijama |
Način otklanjanja grešaka
connectors: snowflake: debug: true log_level: verbose log_queries: truePreporučene prakse
- Koristite namjensko skladište - Izbjegavajte sukob s produkcijskim radnim opterećenjima
- Implementirajte inkrementalnu sinkronizaciju - Upitujte samo promijenjene zapise od posljednje sinkronizacije
- Postavite automatsko obustavljanje - Konfigurirajte skladište da se obustavi nakon 5 minuta neaktivnosti
- Koristite autentifikaciju parom ključeva - Preferirajte par ključeva umjesto autentifikacije lozinkom
- Optimizirajte upite - Filtrirajte i projicirajte samo potrebne stupce za brže sinkronizacije
- Pratite kredite - Pratite potrošnju Snowflake kredita za integracijske upite
Sigurnost
- Autentifikacija parom ključeva - RSA 2048-bitna enkripcija za API pristup
- OAuth 2.0 - Autentifikacija temeljena na tokenu s opsežiranjem uloga
- Mrežne politike - IP lista dopuštenih za Tajo uslužne krajnje točke
- Pristup temeljen na ulogama - Namjenski Snowflake role s minimalnim potrebnim dozvolama
- Šifrirani prijenos podataka - TLS 1.2+ za svu API komunikaciju
- Maskiranje podataka - Koristite Snowflake dinamičko maskiranje podataka za osjetljiva polja