Snowflake 连接器

通过 Tajo 将 Snowflake 连接到 Brevo,从数据仓库同步客户细分,用分析数据丰富联系人档案,并用仓库驱动的洞察为个性化营销活动赋能。

概览

属性
平台Snowflake
类别数据仓库(自定义)
设置复杂度中等
官方集成
同步数据用户、细分、分析、事件
认证方式密钥对 / OAuth 2.0

功能

  • 反向 ETL - 将 Snowflake 中的客户细分推送到 Brevo 联系人列表
  • 受众同步 - 同步仓库计算的受众,用于精准活动
  • 分析富化 - 用计算指标(LTV、RFM 评分)丰富 Brevo 联系人
  • 基于 SQL 的查询 - 使用 Snowflake SQL REST API 以编程方式执行查询
  • 定时同步 - 按可配置时间间隔运行自动化数据管道
  • 多语句支持 - 在单次 API 调用中执行复杂的数据转换

前提条件

开始之前,请确保您已具备:

  1. 具有 ACCOUNTADMIN 或 SYSADMIN 角色的 Snowflake 账户
  2. 具有 API 访问权限的 Brevo 账户
  3. 具有连接器权限的 Tajo 账户
  4. 专用于集成查询的 Snowflake 虚拟仓库
  5. 允许 Tajo IP 地址的网络策略

认证

密钥对认证(推荐)

Terminal window
# Generate RSA key pair
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user
# In Snowflake:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

OAuth 2.0 认证

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

SQL API 认证

Terminal window
# Using JWT token with the SQL API
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

配置

基础设置

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # Every 6 hours
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

字段映射

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

API 端点

端点方法描述
/api/v2/statementsPOST提交 SQL 语句执行
/api/v2/statements/{statementHandle}GET检查执行状态
/api/v2/statements/{statementHandle}/cancelPOST取消正在运行的语句
/api/v2/statements/{statementHandle}?partition={id}GET检索结果分区

SQL API 分区

Snowflake SQL API 以分区形式返回大型结果集。每个分区包含约 12 MB 的数据。使用分区参数迭代结果。

代码示例

初始化连接器

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

通过 SQL API 同步客户细分

// Execute a SQL query via Snowflake SQL REST API
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// Poll for results
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Sync to Brevo via Tajo
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

反向 ETL 管道

// Push computed audiences from Snowflake to Brevo lists
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

速率限制

资源限制说明
SQL API 并发查询每用户 20 个每个 Snowflake 账户
SQL API 结果大小每分区 12 MB使用分区 ID 分页
语句超时172,800 秒(48 小时)每次查询可配置
API 请求因计划而异基于 Snowflake 版本

虚拟仓库费用

Snowflake 按计算时间收费。为 Tajo 查询使用专用且适当大小的虚拟仓库,并设置自动挂起以最小化成本。

故障排除

问题原因解决方案
认证失败JWT 令牌过期以有效过期时间重新生成 JWT
查询超时数据集过大添加过滤条件或使用增量同步
网络错误IP 未白名单将 Tajo IP 添加到 Snowflake 网络策略
列缺失架构变更更新字段映射配置
分区错误结果过大以较小分区处理结果

调试模式

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

最佳实践

  1. 使用专用虚拟仓库 - 避免与生产工作负载争用资源
  2. 实施增量同步 - 只查询上次同步后变更的记录
  3. 设置自动挂起 - 配置虚拟仓库在 5 分钟无活动后挂起
  4. 使用密钥对认证 - 优先密钥对认证而非密码认证
  5. 优化查询 - 只过滤和投影所需列以加快同步速度
  6. 监控积分 - 追踪集成查询的 Snowflake 积分消耗

安全

  • 密钥对认证 - RSA 2048 位加密进行 API 访问
  • OAuth 2.0 - 带角色范围的基于令牌的认证
  • 网络策略 - Tajo 服务端点的 IP 白名单
  • 基于角色的访问 - 具有最低所需权限的专用 Snowflake 角色
  • 加密数据传输 - 所有 API 通信使用 TLS 1.2+
  • 数据脱敏 - 对敏感字段使用 Snowflake 动态数据脱敏

相关资源

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
AI 助手

你好!关于文档有任何问题都可以问我。