Snowflake Connector

เชื่อมต่อ Snowflake กับ Brevo ผ่าน Tajo เพื่อซิงค์ customer segments จาก data warehouse เพิ่มความสมบูรณ์ให้โปรไฟล์ผู้ติดต่อด้วยข้อมูล analytics และขับเคลื่อนแคมเปญการตลาดที่เป็นส่วนตัวด้วยข้อมูลเชิงลึกจาก warehouse

ภาพรวม

คุณสมบัติค่า
แพลตฟอร์มSnowflake
หมวดหมู่Data Warehouse (แบบกำหนดเอง)
ความซับซ้อนในการตั้งค่าปานกลาง
การผสานรวมอย่างเป็นทางการไม่
ข้อมูลที่ซิงค์ลูกค้า Segments Analytics เหตุการณ์
วิธีการยืนยันตัวตนKey Pair / OAuth 2.0

ฟีเจอร์

  • Reverse ETL - ส่ง customer segments จาก Snowflake ไปยังรายการผู้ติดต่อ Brevo
  • การซิงค์ audience - ซิงค์ audiences ที่คำนวณโดย warehouse สำหรับแคมเปญที่กำหนดเป้าหมาย
  • การเพิ่มความสมบูรณ์ analytics - เพิ่มความสมบูรณ์ให้ผู้ติดต่อ Brevo ด้วย metrics ที่คำนวณแล้ว (LTV, RFM scores)
  • SQL-based queries - ใช้ Snowflake SQL REST API เพื่อรัน queries แบบ programmatic
  • การซิงค์ตามกำหนดเวลา - รัน data pipelines อัตโนมัติตามช่วงเวลาที่กำหนดได้
  • รองรับหลาย statement - รัน data transformations ที่ซับซ้อนในการเรียก API เดียว

ข้อกำหนดเบื้องต้น

ก่อนเริ่มต้น ตรวจสอบให้แน่ใจว่าคุณมี:

  1. บัญชี Snowflake ที่มีบทบาท ACCOUNTADMIN หรือ SYSADMIN
  2. บัญชี Brevo ที่มีสิทธิ์เข้าถึง API
  3. บัญชี Tajo ที่มีสิทธิ์ connector
  4. Snowflake warehouse เฉพาะสำหรับ integration queries
  5. Network policy ที่อนุญาต IP addresses ของ Tajo

การยืนยันตัวตน

การยืนยันตัวตนด้วย Key Pair (แนะนำ)

Terminal window
# Generate RSA key pair
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user
# In Snowflake:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

การยืนยันตัวตนด้วย OAuth 2.0

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

การยืนยันตัวตน SQL API

Terminal window
# Using JWT token with the SQL API
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

การกำหนดค่า

การตั้งค่าพื้นฐาน

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # Every 6 hours
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

การแมปฟิลด์

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

API Endpoints

Endpointเมธอดคำอธิบาย
/api/v2/statementsPOSTส่ง SQL statements สำหรับการรัน
/api/v2/statements/{statementHandle}GETตรวจสอบสถานะการรัน
/api/v2/statements/{statementHandle}/cancelPOSTยกเลิก statement ที่กำลังรัน
/api/v2/statements/{statementHandle}?partition={id}GETดึง result partitions

SQL API Partitions

Snowflake SQL API ส่งคืน result sets ขนาดใหญ่เป็น partitions แต่ละ partition มีข้อมูลประมาณ 12MB ใช้พารามิเตอร์ partition เพื่อวนซ้ำผ่านผลลัพธ์

ตัวอย่างโค้ด

เริ่มต้น Connector

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

ซิงค์ Customer Segments ผ่าน SQL API

// Execute a SQL query via Snowflake SQL REST API
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// Poll for results
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Sync to Brevo via Tajo
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

Reverse ETL Pipeline

// Push computed audiences from Snowflake to Brevo lists
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

ขีดจำกัดอัตรา

ทรัพยากรขีดจำกัดหมายเหตุ
SQL API concurrent queries20 ต่อผู้ใช้ต่อบัญชี Snowflake
ขนาด SQL API result12MB ต่อ partitionแบ่งหน้าด้วย partition IDs
Statement timeout172,800 วินาที (48 ชั่วโมง)กำหนดค่าได้ต่อ query
คำขอ APIแตกต่างตามแผนขึ้นอยู่กับ Snowflake edition

ค่าใช้จ่าย Warehouse

Snowflake เรียกเก็บเงินตามเวลาคำนวณ ใช้ warehouse ขนาดที่เหมาะสมสำหรับ Tajo queries และตั้งค่า auto-suspend เพื่อลดค่าใช้จ่าย

การแก้ไขปัญหา

ปัญหาสาเหตุวิธีแก้
การยืนยันตัวตนล้มเหลวJWT token หมดอายุสร้าง JWT ใหม่พร้อมวันหมดอายุที่ถูกต้อง
Query หมดเวลาDataset ขนาดใหญ่เพิ่มตัวกรองหรือใช้ incremental sync
Network errorIP ไม่ได้อยู่ใน whitelistเพิ่ม Tajo IPs ใน Snowflake network policy
คอลัมน์หายไปSchema เปลี่ยนแปลงอัปเดตการกำหนดค่าการแมปฟิลด์
Partition errorผลลัพธ์ใหญ่เกินไปประมวลผลผลลัพธ์เป็น partitions เล็กกว่า

โหมด Debug

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

แนวทางปฏิบัติที่ดีที่สุด

  1. ใช้ warehouse เฉพาะ - หลีกเลี่ยงการแย่งทรัพยากรกับ workloads production
  2. ใช้ incremental sync - Query เฉพาะ records ที่เปลี่ยนแปลงตั้งแต่ sync ครั้งล่าสุด
  3. ตั้งค่า auto-suspend - กำหนดค่า warehouse ให้หยุดทำงานหลังจาก 5 นาทีที่ไม่มีกิจกรรม
  4. ใช้การยืนยันตัวตนด้วย key pair - ต้องการ key pair มากกว่าการยืนยันตัวตนด้วยรหัสผ่าน
  5. ปรับแต่ง queries - กรองและ project เฉพาะคอลัมน์ที่จำเป็นสำหรับการซิงค์ที่เร็วขึ้น
  6. ตรวจสอบ credits - ติดตามการบริโภค Snowflake credits สำหรับ integration queries

ความปลอดภัย

  • การยืนยันตัวตนด้วย key pair - การเข้ารหัส RSA 2048-bit สำหรับการเข้าถึง API
  • OAuth 2.0 - การยืนยันตัวตนแบบ token พร้อม role scoping
  • Network policies - IP allowlisting สำหรับ service endpoints ของ Tajo
  • การควบคุมการเข้าถึงตามบทบาท - บทบาท Snowflake เฉพาะพร้อมสิทธิ์ขั้นต่ำที่จำเป็น
  • การถ่ายโอนข้อมูลที่เข้ารหัส - TLS 1.2+ สำหรับการสื่อสาร API ทั้งหมด
  • Data masking - ใช้ Snowflake dynamic data masking สำหรับฟิลด์ที่ละเอียดอ่อน

แหล่งข้อมูลที่เกี่ยวข้อง

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
ผู้ช่วย AI

สวัสดี! ถามฉันเกี่ยวกับเอกสารได้เลย