Snowflake конектор
Свържи Snowflake с Brevo през Tajo, за да синхронизираш клиентски сегменти от своя data warehouse, да обогатиш профилите на контактите с аналитични данни и да захраниш персонализирани маркетингови кампании с инсайти, задвижвани от warehouse.
Преглед
| Свойство | Стойност |
|---|---|
| Платформа | Snowflake |
| Категория | Data Warehouse (персонализирана) |
| Сложност на настройка | Средна |
| Официална интеграция | Не |
| Синхронизирани данни | Клиенти, сегменти, аналитика, събития |
| Метод на автентикация | Key Pair / OAuth 2.0 |
Функции
- Reverse ETL – Изпращай клиентски сегменти от Snowflake към списъци с контакти в Brevo
- Синхронизация на audience-и – Синхронизирай изчислени в warehouse audience-и за таргетирани кампании
- Обогатяване с аналитика – Обогати Brevo контактите с изчислени метрики (LTV, RFM скорове)
- SQL-базирани заявки – Използвай Snowflake SQL REST API, за да изпълняваш заявки програмно
- Планирана синхронизация – Изпълнявай автоматизирани data pipelines на конфигурируеми интервали
- Multi-statement поддръжка – Изпълнявай сложни data трансформации в едно API извикване
Предварителни условия
Преди да започнеш, увери се, че имаш:
- Snowflake акаунт с роля ACCOUNTADMIN или SYSADMIN
- Акаунт в Brevo с API достъп
- Акаунт в Tajo с разрешения за конектори
- Специален Snowflake warehouse за интеграционни заявки
- Network policy, позволяваща IP адресите на Tajo
Автентикация
Key Pair автентикация (препоръчително)
# Generate RSA key pairopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user# In Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';OAuth 2.0 автентикация
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });Автентикация на SQL API
# Using JWT token with the SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'Конфигурация
Основна настройка
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Every 6 hours
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncМапване на полета
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEAPI крайни точки
| Крайна точка | Метод | Описание |
|---|---|---|
/api/v2/statements | POST | Изпрати SQL statements за изпълнение |
/api/v2/statements/{statementHandle} | GET | Провери статуса на изпълнение |
/api/v2/statements/{statementHandle}/cancel | POST | Откажи изпълняващ се statement |
/api/v2/statements/{statementHandle}?partition={id} | GET | Извлечи партиции с резултати |
Партиции в SQL API
Snowflake SQL API връща големи резултатни набори в партиции. Всяка партиция съдържа до приблизително 12MB данни. Използвай параметъра за партиция, за да итерираш през резултатите.
Примери с код
Инициализация на конектора
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});Синхронизация на клиентски сегменти чрез SQL API
// Execute a SQL query via Snowflake SQL REST APIconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Poll for resultslet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Sync to Brevo via Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Reverse ETL Pipeline
// Push computed audiences from Snowflake to Brevo listsawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});Ограничения на заявките
| Ресурс | Лимит | Бележки |
|---|---|---|
| Едновременни заявки в SQL API | 20 на потребител | На Snowflake акаунт |
| Размер на резултата в SQL API | 12MB на партиция | Пагинирай с partition IDs |
| Timeout на statement | 172 800 сек (48 ч) | Конфигурируем на заявка |
| API заявки | Варира според плана | Базира се на Snowflake edition |
Разходи за warehouse
Snowflake таксува на базата на compute време. Използвай специален, подходящо оразмерен warehouse за Tajo заявките и настрой auto-suspend, за да минимизираш разходите.
Отстраняване на проблеми
| Проблем | Причина | Решение |
|---|---|---|
| Неуспешна автентикация | Изтекъл JWT token | Регенерирай JWT с валиден срок на валидност |
| Timeout на заявка | Голям dataset | Добави филтри или използвай инкрементална синхронизация |
| Мрежова грешка | IP не е в whitelist | Добави IP-тата на Tajo в network policy на Snowflake |
| Липсващи колони | Промяна в схемата | Обнови конфигурацията за мапване на полета |
| Грешка в партиция | Резултатът е твърде голям | Обработвай резултатите в по-малки партиции |
Режим за дебъг
connectors: snowflake: debug: true log_level: verbose log_queries: trueДобри практики
- Използвай специален warehouse – Избягвай конкуренция с production workloads
- Внедри инкрементална синхронизация – Заявявай само променените записи от последната синхронизация
- Настрой auto-suspend – Конфигурирай warehouse да се суспендира след 5 минути неактивност
- Използвай key pair автентикация – Предпочитай key pair пред автентикация с парола
- Оптимизирай заявките – Филтрирай и заявявай само необходимите колони за по-бързи синхронизации
- Мониторирай кредитите – Проследявай потреблението на Snowflake кредити за интеграционни заявки
Сигурност
- Key pair автентикация – RSA 2048-битово криптиране за API достъп
- OAuth 2.0 – Token-базирана автентикация със scoping по роля
- Network policies – IP allowlisting за крайните точки на услугата Tajo
- Достъп на базата на роли – Специална Snowflake роля с минимално необходимите права
- Криптиран трансфер на данни – TLS 1.2+ за всички API комуникации
- Маскиране на данни – Използвай Snowflake dynamic data masking за чувствителни полета