Snowflake कनेक्टर
अपने data warehouse से customer segments sync करने, analytics data के साथ contact profiles को समृद्ध करने, और warehouse-driven insights के साथ personalized marketing campaigns चलाने हेतु Tajo के माध्यम से Snowflake को Brevo से कनेक्ट करें।
अवलोकन
| Property | Value |
|---|---|
| Platform | Snowflake |
| Category | Data Warehouse (Custom) |
| Setup Complexity | Medium |
| Official Integration | No |
| Data Synced | Customers, Segments, Analytics, Events |
| Auth Method | Key Pair / OAuth 2.0 |
विशेषताएं
- Reverse ETL - Snowflake से customer segments को Brevo contact lists में push करें
- Audience sync - targeted campaigns के लिए warehouse-computed audiences sync करें
- Analytics enrichment - computed metrics (LTV, RFM scores) के साथ Brevo contacts को समृद्ध करें
- SQL-आधारित queries - queries को programmatically execute करने के लिए Snowflake SQL REST API का उपयोग करें
- Scheduled sync - configurable intervals पर automated data pipelines चलाएं
- Multi-statement support - एकल API calls में जटिल data transformations execute करें
पूर्वावश्यकताएं
शुरू करने से पहले, सुनिश्चित करें कि आपके पास हैं:
- ACCOUNTADMIN या SYSADMIN role वाला एक Snowflake account
- API access वाला एक Brevo account
- connector permissions वाला एक Tajo account
- integration queries के लिए एक समर्पित Snowflake warehouse
- Tajo IP addresses की अनुमति देने वाली Network policy
प्रमाणीकरण
Key Pair Authentication (अनुशंसित)
# RSA key pair generate करेंopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Snowflake user को public key assign करें# Snowflake में:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';OAuth 2.0 Authentication
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });SQL API Authentication
# SQL API के साथ JWT token का उपयोगcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'कॉन्फ़िगरेशन
बेसिक सेटअप
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # हर 6 घंटे में
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncField Mapping
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEAPI Endpoints
| Endpoint | Method | विवरण |
|---|---|---|
/api/v2/statements | POST | execution के लिए SQL statements submit करें |
/api/v2/statements/{statementHandle} | GET | execution status जांचें |
/api/v2/statements/{statementHandle}/cancel | POST | running statement cancel करें |
/api/v2/statements/{statementHandle}?partition={id} | GET | result partitions पुनः प्राप्त करें |
SQL API Partitions
Snowflake SQL API बड़े result sets को partitions में लौटाता है। प्रत्येक partition में लगभग 12MB तक data होता है। results के माध्यम से iterate करने के लिए partition parameter का उपयोग करें।
कोड उदाहरण
कनेक्टर शुरू करें
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});SQL API के माध्यम से Customer Segments Sync करें
// Snowflake SQL REST API के माध्यम से SQL query execute करेंconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// results के लिए poll करेंlet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Tajo के माध्यम से Brevo से sync करेंfor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Reverse ETL Pipeline
// computed audiences को Snowflake से Brevo lists में push करेंawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});Rate Limits
| संसाधन | Limit | नोट्स |
|---|---|---|
| SQL API concurrent queries | प्रति user 20 | प्रति Snowflake account |
| SQL API result size | प्रति partition 12MB | partition IDs के साथ paginate करें |
| Statement timeout | 172,800 सेकंड (48h) | प्रति query configurable |
| API requests | Plan के अनुसार भिन्न | Snowflake edition पर आधारित |
Warehouse Costs
Snowflake compute time के आधार पर charge करता है। Tajo queries के लिए एक समर्पित, उपयुक्त आकार के warehouse का उपयोग करें और लागत को न्यूनतम करने के लिए auto-suspend सेट करें।
समस्या निवारण
| समस्या | कारण | समाधान |
|---|---|---|
| Authentication failed | Expired JWT token | valid expiration के साथ JWT पुनः generate करें |
| Query timeout | बड़ा dataset | filters जोड़ें या incremental sync का उपयोग करें |
| Network error | IP whitelisted नहीं | Snowflake network policy में Tajo IPs जोड़ें |
| गायब columns | Schema change | field mapping configuration अपडेट करें |
| Partition error | Result बहुत बड़ा | छोटे partitions में results process करें |
Debug Mode
connectors: snowflake: debug: true log_level: verbose log_queries: trueसर्वोत्तम प्रथाएं
- एक समर्पित warehouse का उपयोग करें - production workloads के साथ contention से बचें
- Incremental sync लागू करें - केवल last sync के बाद से बदले गए records क्वेरी करें
- Auto-suspend सेट करें - warehouse को 5 मिनट की inactivity के बाद suspend होने के लिए कॉन्फ़िगर करें
- Key pair auth का उपयोग करें - password authentication की तुलना में key pair को प्राथमिकता दें
- Queries को अनुकूलित करें - तेज syncs के लिए केवल आवश्यक columns को filter और project करें
- Credits monitor करें - integration queries के लिए Snowflake credit consumption ट्रैक करें
सुरक्षा
- Key pair authentication - API access के लिए RSA 2048-bit encryption
- OAuth 2.0 - role scoping के साथ Token-आधारित authentication
- Network policies - Tajo service endpoints के लिए IP allowlisting
- Role-आधारित access - न्यूनतम आवश्यक privileges के साथ समर्पित Snowflake role
- Encrypted data transfer - सभी API communications के लिए TLS 1.2+
- Data masking - sensitive fields के लिए Snowflake dynamic data masking का उपयोग करें