Snowflake 커넥터
Tajo를 통해 Snowflake를 Brevo에 연결하여 데이터 웨어하우스의 고객 세그먼트를 동기화하고, 분석 데이터로 연락처 프로필을 보강하며, 웨어하우스 기반 인사이트로 맞춤형 마케팅 캠페인을 강화하십시오.
개요
| 속성 | 값 |
|---|---|
| 플랫폼 | Snowflake |
| 카테고리 | 데이터 웨어하우스 (Custom) |
| 설정 복잡도 | 중간 |
| 공식 통합 | 아니오 |
| 동기화 데이터 | 고객, 세그먼트, 분석, 이벤트 |
| 인증 방법 | 키 페어 / OAuth 2.0 |
기능
- Reverse ETL - Snowflake의 고객 세그먼트를 Brevo 연락처 목록으로 푸시
- 오디언스 동기화 - 타겟 캠페인을 위해 웨어하우스에서 계산된 오디언스 동기화
- 분석 강화 - 계산된 지표(LTV, RFM 점수)로 Brevo 연락처 보강
- SQL 기반 쿼리 - Snowflake SQL REST API를 사용하여 프로그래매틱하게 쿼리 실행
- 예약 동기화 - 설정 가능한 간격으로 자동화된 데이터 파이프라인 실행
- 다중 명령문 지원 - 단일 API 호출로 복잡한 데이터 변환 실행
사전 요구 사항
시작하기 전에 다음이 준비되어 있는지 확인하십시오.
- ACCOUNTADMIN 또는 SYSADMIN 역할이 있는 Snowflake 계정
- API 접근이 가능한 Brevo 계정
- 커넥터 권한이 있는 Tajo 계정
- 통합 쿼리 전용 Snowflake 웨어하우스
- Tajo IP 주소를 허용하는 네트워크 정책
인증
키 페어 인증 (권장)
# RSA 키 페어 생성openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Snowflake 사용자에게 공개 키 할당# Snowflake에서:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';OAuth 2.0 인증
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });SQL API 인증
# SQL API에 JWT 토큰 사용curl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'구성
기본 설정
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # 6시간마다
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_sync필드 매핑
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEAPI 엔드포인트
| 엔드포인트 | 메서드 | 설명 |
|---|---|---|
/api/v2/statements | POST | 실행을 위해 SQL 명령문 제출 |
/api/v2/statements/{statementHandle} | GET | 실행 상태 확인 |
/api/v2/statements/{statementHandle}/cancel | POST | 실행 중인 명령문 취소 |
/api/v2/statements/{statementHandle}?partition={id} | GET | 결과 파티션 가져오기 |
SQL API 파티션
Snowflake SQL API는 대용량 결과 세트를 파티션으로 반환합니다. 각 파티션에는 약 12MB의 데이터가 포함됩니다. 결과를 반복하려면 partition 매개변수를 사용하십시오.
코드 예제
커넥터 초기화
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});SQL API를 통한 고객 세그먼트 동기화
// Snowflake SQL REST API를 통해 SQL 쿼리 실행const response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// 결과 폴링let status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Tajo를 통해 Brevo로 동기화for (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Reverse ETL 파이프라인
// Snowflake의 계산된 오디언스를 Brevo 목록으로 푸시await tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});속도 제한
| 리소스 | 제한 | 참고 |
|---|---|---|
| SQL API 동시 쿼리 | 사용자당 20개 | Snowflake 계정당 |
| SQL API 결과 크기 | 파티션당 12MB | 파티션 ID로 페이지네이션 |
| 명령문 시간 초과 | 172,800초 (48시간) | 쿼리당 구성 가능 |
| API 요청 | 플랜에 따라 다름 | Snowflake 에디션 기반 |
웨어하우스 비용
Snowflake는 컴퓨팅 시간을 기준으로 요금을 부과합니다. Tajo 쿼리에 전용으로 적절한 크기의 웨어하우스를 사용하고 비용을 최소화하기 위해 자동 일시 중지를 설정하십시오.
문제 해결
| 문제 | 원인 | 해결 방법 |
|---|---|---|
| 인증 실패 | 만료된 JWT 토큰 | 유효한 만료 시간으로 JWT 재생성 |
| 쿼리 시간 초과 | 대용량 데이터 세트 | 필터 추가 또는 증분 동기화 사용 |
| 네트워크 오류 | IP가 허용 목록에 없음 | Snowflake 네트워크 정책에 Tajo IP 추가 |
| 누락된 열 | 스키마 변경 | 필드 매핑 구성 업데이트 |
| 파티션 오류 | 결과가 너무 큼 | 결과를 더 작은 파티션으로 처리 |
디버그 모드
connectors: snowflake: debug: true log_level: verbose log_queries: true모범 사례
- 전용 웨어하우스 사용 - 프로덕션 워크로드와의 경합 방지
- 증분 동기화 구현 - 마지막 동기화 이후 변경된 레코드만 쿼리
- 자동 일시 중지 설정 - 5분 비활성 후 일시 중지되도록 웨어하우스 구성
- 키 페어 인증 사용 - 비밀번호 인증보다 키 페어 선호
- 쿼리 최적화 - 더 빠른 동기화를 위해 필요한 열만 필터링 및 프로젝션
- 크레딧 모니터링 - 통합 쿼리의 Snowflake 크레딧 소비 추적
보안
- 키 페어 인증 - API 접근용 RSA 2048비트 암호화
- OAuth 2.0 - 역할 범위 지정이 포함된 토큰 기반 인증
- 네트워크 정책 - Tajo 서비스 엔드포인트에 대한 IP 허용 목록
- 역할 기반 접근 - 최소 필요 권한을 가진 전용 Snowflake 역할
- 암호화된 데이터 전송 - 모든 API 통신에 TLS 1.2+ 사용
- 데이터 마스킹 - 민감한 필드에 Snowflake 동적 데이터 마스킹 사용