Snowflake 커넥터

Tajo를 통해 Snowflake를 Brevo에 연결하여 데이터 웨어하우스의 고객 세그먼트를 동기화하고, 분석 데이터로 연락처 프로필을 보강하며, 웨어하우스 기반 인사이트로 맞춤형 마케팅 캠페인을 강화하십시오.

개요

속성
플랫폼Snowflake
카테고리데이터 웨어하우스 (Custom)
설정 복잡도중간
공식 통합아니오
동기화 데이터고객, 세그먼트, 분석, 이벤트
인증 방법키 페어 / OAuth 2.0

기능

  • Reverse ETL - Snowflake의 고객 세그먼트를 Brevo 연락처 목록으로 푸시
  • 오디언스 동기화 - 타겟 캠페인을 위해 웨어하우스에서 계산된 오디언스 동기화
  • 분석 강화 - 계산된 지표(LTV, RFM 점수)로 Brevo 연락처 보강
  • SQL 기반 쿼리 - Snowflake SQL REST API를 사용하여 프로그래매틱하게 쿼리 실행
  • 예약 동기화 - 설정 가능한 간격으로 자동화된 데이터 파이프라인 실행
  • 다중 명령문 지원 - 단일 API 호출로 복잡한 데이터 변환 실행

사전 요구 사항

시작하기 전에 다음이 준비되어 있는지 확인하십시오.

  1. ACCOUNTADMIN 또는 SYSADMIN 역할이 있는 Snowflake 계정
  2. API 접근이 가능한 Brevo 계정
  3. 커넥터 권한이 있는 Tajo 계정
  4. 통합 쿼리 전용 Snowflake 웨어하우스
  5. Tajo IP 주소를 허용하는 네트워크 정책

인증

키 페어 인증 (권장)

Terminal window
# RSA 키 페어 생성
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Snowflake 사용자에게 공개 키 할당
# Snowflake에서:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

OAuth 2.0 인증

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

SQL API 인증

Terminal window
# SQL API에 JWT 토큰 사용
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

구성

기본 설정

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # 6시간마다
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

필드 매핑

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

API 엔드포인트

엔드포인트메서드설명
/api/v2/statementsPOST실행을 위해 SQL 명령문 제출
/api/v2/statements/{statementHandle}GET실행 상태 확인
/api/v2/statements/{statementHandle}/cancelPOST실행 중인 명령문 취소
/api/v2/statements/{statementHandle}?partition={id}GET결과 파티션 가져오기

SQL API 파티션

Snowflake SQL API는 대용량 결과 세트를 파티션으로 반환합니다. 각 파티션에는 약 12MB의 데이터가 포함됩니다. 결과를 반복하려면 partition 매개변수를 사용하십시오.

코드 예제

커넥터 초기화

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

SQL API를 통한 고객 세그먼트 동기화

// Snowflake SQL REST API를 통해 SQL 쿼리 실행
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// 결과 폴링
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Tajo를 통해 Brevo로 동기화
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

Reverse ETL 파이프라인

// Snowflake의 계산된 오디언스를 Brevo 목록으로 푸시
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

속도 제한

리소스제한참고
SQL API 동시 쿼리사용자당 20개Snowflake 계정당
SQL API 결과 크기파티션당 12MB파티션 ID로 페이지네이션
명령문 시간 초과172,800초 (48시간)쿼리당 구성 가능
API 요청플랜에 따라 다름Snowflake 에디션 기반

웨어하우스 비용

Snowflake는 컴퓨팅 시간을 기준으로 요금을 부과합니다. Tajo 쿼리에 전용으로 적절한 크기의 웨어하우스를 사용하고 비용을 최소화하기 위해 자동 일시 중지를 설정하십시오.

문제 해결

문제원인해결 방법
인증 실패만료된 JWT 토큰유효한 만료 시간으로 JWT 재생성
쿼리 시간 초과대용량 데이터 세트필터 추가 또는 증분 동기화 사용
네트워크 오류IP가 허용 목록에 없음Snowflake 네트워크 정책에 Tajo IP 추가
누락된 열스키마 변경필드 매핑 구성 업데이트
파티션 오류결과가 너무 큼결과를 더 작은 파티션으로 처리

디버그 모드

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

모범 사례

  1. 전용 웨어하우스 사용 - 프로덕션 워크로드와의 경합 방지
  2. 증분 동기화 구현 - 마지막 동기화 이후 변경된 레코드만 쿼리
  3. 자동 일시 중지 설정 - 5분 비활성 후 일시 중지되도록 웨어하우스 구성
  4. 키 페어 인증 사용 - 비밀번호 인증보다 키 페어 선호
  5. 쿼리 최적화 - 더 빠른 동기화를 위해 필요한 열만 필터링 및 프로젝션
  6. 크레딧 모니터링 - 통합 쿼리의 Snowflake 크레딧 소비 추적

보안

  • 키 페어 인증 - API 접근용 RSA 2048비트 암호화
  • OAuth 2.0 - 역할 범위 지정이 포함된 토큰 기반 인증
  • 네트워크 정책 - Tajo 서비스 엔드포인트에 대한 IP 허용 목록
  • 역할 기반 접근 - 최소 필요 권한을 가진 전용 Snowflake 역할
  • 암호화된 데이터 전송 - 모든 API 통신에 TLS 1.2+ 사용
  • 데이터 마스킹 - 민감한 필드에 Snowflake 동적 데이터 마스킹 사용

관련 리소스

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
AI 어시스턴트

안녕하세요! 문서에 대해 무엇이든 물어보세요.