Snowflake конектор

Свържи Snowflake с Brevo през Tajo, за да синхронизираш клиентски сегменти от своя data warehouse, да обогатиш профилите на контактите с аналитични данни и да захраниш персонализирани маркетингови кампании с инсайти, задвижвани от warehouse.

Преглед

СвойствоСтойност
ПлатформаSnowflake
КатегорияData Warehouse (персонализирана)
Сложност на настройкаСредна
Официална интеграцияНе
Синхронизирани данниКлиенти, сегменти, аналитика, събития
Метод на автентикацияKey Pair / OAuth 2.0

Функции

  • Reverse ETL – Изпращай клиентски сегменти от Snowflake към списъци с контакти в Brevo
  • Синхронизация на audience-и – Синхронизирай изчислени в warehouse audience-и за таргетирани кампании
  • Обогатяване с аналитика – Обогати Brevo контактите с изчислени метрики (LTV, RFM скорове)
  • SQL-базирани заявки – Използвай Snowflake SQL REST API, за да изпълняваш заявки програмно
  • Планирана синхронизация – Изпълнявай автоматизирани data pipelines на конфигурируеми интервали
  • Multi-statement поддръжка – Изпълнявай сложни data трансформации в едно API извикване

Предварителни условия

Преди да започнеш, увери се, че имаш:

  1. Snowflake акаунт с роля ACCOUNTADMIN или SYSADMIN
  2. Акаунт в Brevo с API достъп
  3. Акаунт в Tajo с разрешения за конектори
  4. Специален Snowflake warehouse за интеграционни заявки
  5. Network policy, позволяваща IP адресите на Tajo

Автентикация

Key Pair автентикация (препоръчително)

Terminal window
# Generate RSA key pair
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user
# In Snowflake:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

OAuth 2.0 автентикация

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

Автентикация на SQL API

Terminal window
# Using JWT token with the SQL API
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

Конфигурация

Основна настройка

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # Every 6 hours
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

Мапване на полета

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

API крайни точки

Крайна точкаМетодОписание
/api/v2/statementsPOSTИзпрати SQL statements за изпълнение
/api/v2/statements/{statementHandle}GETПровери статуса на изпълнение
/api/v2/statements/{statementHandle}/cancelPOSTОткажи изпълняващ се statement
/api/v2/statements/{statementHandle}?partition={id}GETИзвлечи партиции с резултати

Партиции в SQL API

Snowflake SQL API връща големи резултатни набори в партиции. Всяка партиция съдържа до приблизително 12MB данни. Използвай параметъра за партиция, за да итерираш през резултатите.

Примери с код

Инициализация на конектора

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

Синхронизация на клиентски сегменти чрез SQL API

// Execute a SQL query via Snowflake SQL REST API
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// Poll for results
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Sync to Brevo via Tajo
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

Reverse ETL Pipeline

// Push computed audiences from Snowflake to Brevo lists
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

Ограничения на заявките

РесурсЛимитБележки
Едновременни заявки в SQL API20 на потребителНа Snowflake акаунт
Размер на резултата в SQL API12MB на партицияПагинирай с partition IDs
Timeout на statement172 800 сек (48 ч)Конфигурируем на заявка
API заявкиВарира според планаБазира се на Snowflake edition

Разходи за warehouse

Snowflake таксува на базата на compute време. Използвай специален, подходящо оразмерен warehouse за Tajo заявките и настрой auto-suspend, за да минимизираш разходите.

Отстраняване на проблеми

ПроблемПричинаРешение
Неуспешна автентикацияИзтекъл JWT tokenРегенерирай JWT с валиден срок на валидност
Timeout на заявкаГолям datasetДобави филтри или използвай инкрементална синхронизация
Мрежова грешкаIP не е в whitelistДобави IP-тата на Tajo в network policy на Snowflake
Липсващи колониПромяна в схематаОбнови конфигурацията за мапване на полета
Грешка в партицияРезултатът е твърде голямОбработвай резултатите в по-малки партиции

Режим за дебъг

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

Добри практики

  1. Използвай специален warehouse – Избягвай конкуренция с production workloads
  2. Внедри инкрементална синхронизация – Заявявай само променените записи от последната синхронизация
  3. Настрой auto-suspend – Конфигурирай warehouse да се суспендира след 5 минути неактивност
  4. Използвай key pair автентикация – Предпочитай key pair пред автентикация с парола
  5. Оптимизирай заявките – Филтрирай и заявявай само необходимите колони за по-бързи синхронизации
  6. Мониторирай кредитите – Проследявай потреблението на Snowflake кредити за интеграционни заявки

Сигурност

  • Key pair автентикация – RSA 2048-битово криптиране за API достъп
  • OAuth 2.0 – Token-базирана автентикация със scoping по роля
  • Network policies – IP allowlisting за крайните точки на услугата Tajo
  • Достъп на базата на роли – Специална Snowflake роля с минимално необходимите права
  • Криптиран трансфер на данни – TLS 1.2+ за всички API комуникации
  • Маскиране на данни – Използвай Snowflake dynamic data masking за чувствителни полета

Свързани ресурси

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
AI асистент

Здравейте! Попитайте ме за документацията.