Conector Snowflake

Conecte o Snowflake ao Brevo através do Tajo para sincronizar segmentos de clientes do seu data warehouse, enriquecer perfis de contato com dados de analytics e potencializar campanhas de marketing personalizadas com insights orientados pelo warehouse.

Visão geral

PropriedadeValor
PlataformaSnowflake
CategoriaData Warehouse (Personalizado)
Complexidade de configuraçãoMédia
Integração oficialNão
Dados sincronizadosClientes, Segmentos, Analytics, Eventos
Método de autenticaçãoKey Pair / OAuth 2.0

Recursos

  • Reverse ETL - Envie segmentos de clientes do Snowflake para listas de contatos do Brevo
  • Sincronização de audiências - Sincronize audiências computadas no warehouse para campanhas direcionadas
  • Enriquecimento de analytics - Enriqueça contatos do Brevo com métricas computadas (LTV, scores RFM)
  • Consultas baseadas em SQL - Use a Snowflake SQL REST API para executar consultas programaticamente
  • Sincronização agendada - Execute pipelines de dados automatizados em intervalos configuráveis
  • Suporte multi-statement - Execute transformações de dados complexas em chamadas de API únicas

Pré-requisitos

Antes de começar, certifique-se de ter:

  1. Uma conta Snowflake com role ACCOUNTADMIN ou SYSADMIN
  2. Uma conta Brevo com acesso à API
  3. Uma conta Tajo com permissões de conector
  4. Um warehouse Snowflake dedicado para consultas de integração
  5. Política de rede permitindo os endereços IP do Tajo

Autenticação

Autenticação por Key Pair (Recomendado)

Terminal window
# Generate RSA key pair
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user
# In Snowflake:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

Autenticação OAuth 2.0

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

Autenticação da SQL API

Terminal window
# Using JWT token with the SQL API
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

Configuração

Configuração básica

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # Every 6 hours
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

Mapeamento de campos

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

Endpoints da API

EndpointMétodoDescrição
/api/v2/statementsPOSTEnviar statements SQL para execução
/api/v2/statements/{statementHandle}GETVerificar status de execução
/api/v2/statements/{statementHandle}/cancelPOSTCancelar um statement em execução
/api/v2/statements/{statementHandle}?partition={id}GETRecuperar partições de resultados

Partições da SQL API

A SQL API do Snowflake retorna grandes conjuntos de resultados em partições. Cada partição contém até aproximadamente 12MB de dados. Use o parâmetro partition para iterar pelos resultados.

Exemplos de código

Inicializar o conector

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

Sincronizar segmentos de clientes via SQL API

// Execute a SQL query via Snowflake SQL REST API
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// Poll for results
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Sync to Brevo via Tajo
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

Pipeline de Reverse ETL

// Push computed audiences from Snowflake to Brevo lists
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

Limites de taxa

RecursoLimiteObservações
Consultas simultâneas da SQL API20 por usuárioPor conta Snowflake
Tamanho do resultado da SQL API12MB por partiçãoPagine com IDs de partição
Timeout de statement172.800 seg (48h)Configurável por consulta
Requisições à APIVaria por planoCom base na edição do Snowflake

Custos do warehouse

O Snowflake cobra com base no tempo de compute. Use um warehouse dedicado e apropriadamente dimensionado para consultas do Tajo e configure auto-suspend para minimizar custos.

Solução de problemas

ProblemaCausaSolução
Falha de autenticaçãoToken JWT expiradoRegere o JWT com expiração válida
Timeout de consultaGrande volume de dadosAdicione filtros ou use sincronização incremental
Erro de redeIP não permitidoAdicione os IPs do Tajo à política de rede do Snowflake
Colunas ausentesMudança de schemaAtualize a configuração de mapeamento de campos
Erro de partiçãoResultado muito grandeProcesse resultados em partições menores

Modo de depuração

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

Melhores práticas

  1. Use um warehouse dedicado - Evite contenção com cargas de trabalho de produção
  2. Implemente sincronização incremental - Consulte apenas registros alterados desde a última sincronização
  3. Configure auto-suspend - Configure o warehouse para suspender após 5 minutos de inatividade
  4. Use autenticação por key pair - Prefira key pair em vez de autenticação por senha
  5. Otimize as consultas - Filtre e projete apenas as colunas necessárias para sincronizações mais rápidas
  6. Monitore os créditos - Acompanhe o consumo de créditos do Snowflake para consultas de integração

Segurança

  • Autenticação por key pair - Criptografia RSA 2048-bit para acesso à API
  • OAuth 2.0 - Autenticação baseada em token com escopo de role
  • Políticas de rede - Lista de permissões de IP para endpoints do serviço Tajo
  • Acesso baseado em role - Role Snowflake dedicada com privilégios mínimos necessários
  • Transferência de dados criptografada - TLS 1.2+ para todas as comunicações de API
  • Mascaramento de dados - Use mascaramento dinâmico de dados do Snowflake para campos sensíveis

Recursos relacionados

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
Assistente AI

Olá! Pergunte-me qualquer coisa sobre a documentação.