Snowflake 连接器
通过 Tajo 将 Snowflake 连接到 Brevo,从数据仓库同步客户细分,用分析数据丰富联系人档案,并用仓库驱动的洞察为个性化营销活动赋能。
概览
| 属性 | 值 |
|---|---|
| 平台 | Snowflake |
| 类别 | 数据仓库(自定义) |
| 设置复杂度 | 中等 |
| 官方集成 | 否 |
| 同步数据 | 用户、细分、分析、事件 |
| 认证方式 | 密钥对 / OAuth 2.0 |
功能
- 反向 ETL - 将 Snowflake 中的客户细分推送到 Brevo 联系人列表
- 受众同步 - 同步仓库计算的受众,用于精准活动
- 分析富化 - 用计算指标(LTV、RFM 评分)丰富 Brevo 联系人
- 基于 SQL 的查询 - 使用 Snowflake SQL REST API 以编程方式执行查询
- 定时同步 - 按可配置时间间隔运行自动化数据管道
- 多语句支持 - 在单次 API 调用中执行复杂的数据转换
前提条件
开始之前,请确保您已具备:
- 具有 ACCOUNTADMIN 或 SYSADMIN 角色的 Snowflake 账户
- 具有 API 访问权限的 Brevo 账户
- 具有连接器权限的 Tajo 账户
- 专用于集成查询的 Snowflake 虚拟仓库
- 允许 Tajo IP 地址的网络策略
认证
密钥对认证(推荐)
# Generate RSA key pairopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user# In Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';OAuth 2.0 认证
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });SQL API 认证
# Using JWT token with the SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'配置
基础设置
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Every 6 hours
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_sync字段映射
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEAPI 端点
| 端点 | 方法 | 描述 |
|---|---|---|
/api/v2/statements | POST | 提交 SQL 语句执行 |
/api/v2/statements/{statementHandle} | GET | 检查执行状态 |
/api/v2/statements/{statementHandle}/cancel | POST | 取消正在运行的语句 |
/api/v2/statements/{statementHandle}?partition={id} | GET | 检索结果分区 |
SQL API 分区
Snowflake SQL API 以分区形式返回大型结果集。每个分区包含约 12 MB 的数据。使用分区参数迭代结果。
代码示例
初始化连接器
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});通过 SQL API 同步客户细分
// Execute a SQL query via Snowflake SQL REST APIconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Poll for resultslet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Sync to Brevo via Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}反向 ETL 管道
// Push computed audiences from Snowflake to Brevo listsawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});速率限制
| 资源 | 限制 | 说明 |
|---|---|---|
| SQL API 并发查询 | 每用户 20 个 | 每个 Snowflake 账户 |
| SQL API 结果大小 | 每分区 12 MB | 使用分区 ID 分页 |
| 语句超时 | 172,800 秒(48 小时) | 每次查询可配置 |
| API 请求 | 因计划而异 | 基于 Snowflake 版本 |
虚拟仓库费用
Snowflake 按计算时间收费。为 Tajo 查询使用专用且适当大小的虚拟仓库,并设置自动挂起以最小化成本。
故障排除
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 认证失败 | JWT 令牌过期 | 以有效过期时间重新生成 JWT |
| 查询超时 | 数据集过大 | 添加过滤条件或使用增量同步 |
| 网络错误 | IP 未白名单 | 将 Tajo IP 添加到 Snowflake 网络策略 |
| 列缺失 | 架构变更 | 更新字段映射配置 |
| 分区错误 | 结果过大 | 以较小分区处理结果 |
调试模式
connectors: snowflake: debug: true log_level: verbose log_queries: true最佳实践
- 使用专用虚拟仓库 - 避免与生产工作负载争用资源
- 实施增量同步 - 只查询上次同步后变更的记录
- 设置自动挂起 - 配置虚拟仓库在 5 分钟无活动后挂起
- 使用密钥对认证 - 优先密钥对认证而非密码认证
- 优化查询 - 只过滤和投影所需列以加快同步速度
- 监控积分 - 追踪集成查询的 Snowflake 积分消耗
安全
- 密钥对认证 - RSA 2048 位加密进行 API 访问
- OAuth 2.0 - 带角色范围的基于令牌的认证
- 网络策略 - Tajo 服务端点的 IP 白名单
- 基于角色的访问 - 具有最低所需权限的专用 Snowflake 角色
- 加密数据传输 - 所有 API 通信使用 TLS 1.2+
- 数据脱敏 - 对敏感字段使用 Snowflake 动态数据脱敏