KAFKA - Apache Kafka Streaming
O que é este Node?
O KAFKA é o node responsável por integrar com Apache Kafka para produzir mensagens, consumir streams, criar/deletar topics e listar topics disponíveis.
Por que este Node existe?
Sistemas modernos precisam processar milhões de eventos em tempo real. O KAFKA existe para:
- Streaming de dados: Processar fluxos contínuos de dados em tempo real
- Event sourcing: Manter histórico imutável de todos os eventos
- Log aggregation: Coletar e centralizar logs de múltiplos sistemas
- Alta throughput: Processar milhões de mensagens por segundo
- Arquitetura event-driven: Base para sistemas orientados a eventos
Como funciona internamente?
Quando o KAFKA é executado, o sistema:
- Identifica operação: produce, consume, create_topic, delete_topic, list_topics
- Valida parâmetros: Topic, brokers, partition conforme operação
- Executa operação: Simula operação Kafka (produção requer kafkajs)
- Retorna resultado: Confirmação com detalhes da operação
- Se erro: Lança exceção com mensagem
Código interno (infrastructure-executor.service.ts:110-134):
private async executeKafka(parameters: any, context: any): Promise<any> {
const { operation, topic, message, partition, brokers, groupId } = parameters;
this.logger.log(`📨 KAFKA - Operation: ${operation}, Topic: ${topic}`);
switch (operation) {
case 'produce':
return this.produceKafkaMessage(topic, message, partition, brokers, context);
case 'consume':
return this.consumeKafkaMessages(topic, groupId, brokers, context);
case 'create_topic':
return this.createKafkaTopic(topic, parameters.partitions, parameters.replicationFactor, brokers, context);
case 'delete_topic':
return this.deleteKafkaTopic(topic, brokers, context);
case 'list_topics':
return this.listKafkaTopics(brokers, context);
default:
throw new Error(`Unsupported Kafka operation: ${operation}`);
}
}
Quando você DEVE usar este Node?
Use KAFKA quando precisar streaming de alto volume:
Casos de uso
- Real-time analytics: Processar eventos em tempo real para dashboards
- Activity tracking: Rastrear ações de usuários em grande escala
- Log aggregation: Centralizar logs de microservices
- Event sourcing: Armazenar histórico completo de mudanças
- IoT data pipeline: Processar streams de sensores IoT
Quando NÃO usar KAFKA
- RabbitMQ: Use RabbitMQ para filas de trabalho tradicionais (<10k msgs/s)
- SQS: Use AWS SQS para simplicidade se já está na AWS
- Baixo volume: Kafka é overhead para poucos eventos
- Request-response: Use HTTP/REST para comunicação síncrona
Parâmetros Detalhados
operation (string, obrigatório)
O que é: Tipo de operação a ser executada no Kafka.
Valores válidos:
- produce: Produzir mensagem para topic
- consume: Consumir mensagens de topic
- create_topic: Criar novo topic
- delete_topic: Deletar topic
- list_topics: Listar todos os topics
Flow completo para testar:
{
"name": "Teste KAFKA - Produce",
"nodes": [
{
"id": "start_1",
"type": "start",
"position": { "x": 100, "y": 100 },
"data": { "label": "Início" }
},
{
"id": "kafka_1",
"type": "kafka",
"position": { "x": 300, "y": 100 },
"data": {
"label": "Produzir Evento",
"parameters": {
"operation": "produce",
"topic": "user-events",
"message": { "event": "user_signup", "userId": 123 },
"brokers": ["localhost:9092"]
}
}
},
{
"id": "message_1",
"type": "message",
"position": { "x": 500, "y": 100 },
"data": {
"label": "Confirmar",
"parameters": {
"message": "Evento publicado no Kafka!"
}
}
},
{
"id": "end_1",
"type": "end",
"position": { "x": 700, "y": 100 },
"data": { "label": "Fim" }
}
],
"edges": [
{ "source": "start_1", "target": "kafka_1" },
{ "source": "kafka_1", "target": "message_1" },
{ "source": "message_1", "target": "end_1" }
]
}
Teste: Mensagem é produzida no topic Kafka.
topic (string, obrigatório)
O que é: Nome do topic Kafka onde mensagens serão produzidas/consumidas.
Flow completo para testar:
{
"name": "Teste KAFKA - Topic",
"nodes": [
{
"id": "start_1",
"type": "start",
"position": { "x": 100, "y": 100 },
"data": { "label": "Início" }
},
{
"id": "kafka_1",
"type": "kafka",
"position": { "x": 300, "y": 100 },
"data": {
"label": "Criar Topic",
"parameters": {
"operation": "create_topic",
"topic": "orders",
"partitions": 3,
"replicationFactor": 2,
"brokers": ["localhost:9092"]
}
}
},
{
"id": "message_1",
"type": "message",
"position": { "x": 500, "y": 100 },
"data": {
"label": "Confirmar",
"parameters": {
"message": "Topic 'orders' criado com sucesso!"
}
}
},
{
"id": "end_1",
"type": "end",
"position": { "x": 700, "y": 100 },
"data": { "label": "Fim" }
}
],
"edges": [
{ "source": "start_1", "target": "kafka_1" },
{ "source": "kafka_1", "target": "message_1" },
{ "source": "message_1", "target": "end_1" }
]
}
Teste: Topic é criado no cluster Kafka.
message (any, obrigatório para produce)
O que é: Conteúdo da mensagem a ser produzida no topic.
Flow completo para testar:
{
"name": "Teste KAFKA - Message",
"nodes": [
{
"id": "start_1",
"type": "start",
"position": { "x": 100, "y": 100 },
"data": { "label": "Início" }
},
{
"id": "input_1",
"type": "input",
"position": { "x": 300, "y": 100 },
"data": {
"label": "Pedir Ação",
"parameters": {
"message": "Qual ação do usuário?",
"variable": "acao"
}
}
},
{
"id": "kafka_1",
"type": "kafka",
"position": { "x": 500, "y": 100 },
"data": {
"label": "Registrar Evento",
"parameters": {
"operation": "produce",
"topic": "user-activity",
"message": {
"eventType": "{{acao}}",
"userId": 456,
"timestamp": "{{now}}",
"metadata": {
"source": "lumina-flow",
"version": "1.0"
}
},
"brokers": ["localhost:9092"]
}
}
},
{
"id": "message_1",
"type": "message",
"position": { "x": 700, "y": 100 },
"data": {
"label": "Confirmar",
"parameters": {
"message": "Ação '{{acao}}' registrada no Kafka!"
}
}
},
{
"id": "end_1",
"type": "end",
"position": { "x": 900, "y": 100 },
"data": { "label": "Fim" }
}
],
"edges": [
{ "source": "start_1", "target": "input_1" },
{ "source": "input_1", "target": "kafka_1" },
{ "source": "kafka_1", "target": "message_1" },
{ "source": "message_1", "target": "end_1" }
]
}
Teste: Evento estruturado é produzido no topic.
partition (number, opcional)
O que é: Número da partição específica para produzir (padrão: 0).
Flow completo para testar:
{
"name": "Teste KAFKA - Partition",
"nodes": [
{
"id": "start_1",
"type": "start",
"position": { "x": 100, "y": 100 },
"data": { "label": "Início" }
},
{
"id": "kafka_1",
"type": "kafka",
"position": { "x": 300, "y": 100 },
"data": {
"label": "Produzir em P2",
"parameters": {
"operation": "produce",
"topic": "logs",
"partition": 2,
"message": { "level": "error", "msg": "Database connection failed" },
"brokers": ["localhost:9092"]
}
}
},
{
"id": "message_1",
"type": "message",
"position": { "x": 500, "y": 100 },
"data": {
"label": "Confirmar",
"parameters": {
"message": "Mensagem enviada para partição 2!"
}
}
},
{
"id": "end_1",
"type": "end",
"position": { "x": 700, "y": 100 },
"data": { "label": "Fim" }
}
],
"edges": [
{ "source": "start_1", "target": "kafka_1" },
{ "source": "kafka_1", "target": "message_1" },
{ "source": "message_1", "target": "end_1" }
]
}
Teste: Mensagem vai para partição específica.
brokers (array, opcional)
O que é: Lista de brokers Kafka (padrão: ["localhost:9092"]).
groupId (string, obrigatório para consume)
O que é: ID do consumer group (para balanceamento de carga).
partitions (number, opcional)
O que é: Número de partições ao criar topic (padrão: 1).
replicationFactor (number, opcional)
O que é: Fator de replicação ao criar topic (padrão: 1).
Parâmetros
| Campo | Tipo | Obrigatório | Descrição |
|---|---|---|---|
| operation | string | Sim | produce, consume, create_topic, delete_topic, list_topics |
| topic | string | Sim | Nome do topic Kafka |
| message | any | Sim* | Mensagem a produzir (*para produce) |
| partition | number | Não | Número da partição (padrão: 0) |
| brokers | array | Não | Lista de brokers (padrão: ["localhost:9092"]) |
| groupId | string | Sim* | ID do consumer group (*para consume) |
| partitions | number | Não | Partições ao criar topic (padrão: 1) |
| replicationFactor | number | Não | Fator de replicação (padrão: 1) |
Exemplo 1: Rastrear Eventos de Usuário
Objetivo: Produzir eventos de ações de usuários
JSON para Importar
{
"name": "KAFKA - Rastrear Eventos",
"nodes": [
{
"id": "start_1",
"type": "start",
"position": { "x": 100, "y": 100 },
"data": { "label": "Início" }
},
{
"id": "input_1",
"type": "input",
"position": { "x": 300, "y": 100 },
"data": {
"label": "Pedir Nome",
"parameters": {
"message": "Qual seu nome?",
"variable": "nome"
}
}
},
{
"id": "kafka_1",
"type": "kafka",
"position": { "x": 500, "y": 100 },
"data": {
"label": "Registrar Signup",
"parameters": {
"operation": "produce",
"topic": "user-events",
"message": {
"eventType": "user_signup",
"userName": "{{nome}}",
"timestamp": "{{now}}",
"source": "whatsapp",
"metadata": {
"flowId": "signup-flow-v1",
"channel": "lumina"
}
},
"brokers": ["localhost:9092"]
}
}
},
{
"id": "message_1",
"type": "message",
"position": { "x": 700, "y": 100 },
"data": {
"label": "Boas-vindas",
"parameters": {
"message": "Bem-vindo, {{nome}}! Seu cadastro foi registrado."
}
}
},
{
"id": "end_1",
"type": "end",
"position": { "x": 900, "y": 100 },
"data": { "label": "Fim" }
}
],
"edges": [
{ "source": "start_1", "target": "input_1" },
{ "source": "input_1", "target": "kafka_1" },
{ "source": "kafka_1", "target": "message_1" },
{ "source": "message_1", "target": "end_1" }
]
}
Saída esperada:
Sistema: Qual seu nome?
Usuário: João Silva
Sistema: Bem-vindo, João Silva! Seu cadastro foi registrado.
Exemplo 2: Criar Topic para Logs
Objetivo: Criar topic particionado para logs de aplicação
JSON para Importar
{
"name": "KAFKA - Criar Topic Logs",
"nodes": [
{
"id": "start_1",
"type": "start",
"position": { "x": 100, "y": 100 },
"data": { "label": "Início" }
},
{
"id": "kafka_1",
"type": "kafka",
"position": { "x": 300, "y": 100 },
"data": {
"label": "Criar Topic",
"parameters": {
"operation": "create_topic",
"topic": "application-logs",
"partitions": 6,
"replicationFactor": 3,
"brokers": ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
}
}
},
{
"id": "message_1",
"type": "message",
"position": { "x": 500, "y": 100 },
"data": {
"label": "Confirmar",
"parameters": {
"message": "Topic 'application-logs' criado!\n6 partições, fator de replicação 3"
}
}
},
{
"id": "kafka_2",
"type": "kafka",
"position": { "x": 700, "y": 100 },
"data": {
"label": "Produzir Log",
"parameters": {
"operation": "produce",
"topic": "application-logs",
"message": {
"level": "info",
"service": "api-gateway",
"message": "Topic initialized successfully",
"timestamp": "{{now}}"
},
"brokers": ["kafka1:9092"]
}
}
},
{
"id": "message_2",
"type": "message",
"position": { "x": 900, "y": 100 },
"data": {
"label": "Sucesso",
"parameters": {
"message": "Primeiro log publicado!"
}
}
},
{
"id": "end_1",
"type": "end",
"position": { "x": 1100, "y": 100 },
"data": { "label": "Fim" }
}
],
"edges": [
{ "source": "start_1", "target": "kafka_1" },
{ "source": "kafka_1", "target": "message_1" },
{ "source": "message_1", "target": "kafka_2" },
{ "source": "kafka_2", "target": "message_2" },
{ "source": "message_2", "target": "end_1" }
]
}
Saída esperada:
Sistema: Topic 'application-logs' criado!
6 partições, fator de replicação 3
Sistema: Primeiro log publicado!
Exemplo 3: Listar Topics
Objetivo: Descobrir topics disponíveis no cluster
JSON para Importar
{
"name": "KAFKA - Listar Topics",
"nodes": [
{
"id": "start_1",
"type": "start",
"position": { "x": 100, "y": 100 },
"data": { "label": "Início" }
},
{
"id": "kafka_1",
"type": "kafka",
"position": { "x": 300, "y": 100 },
"data": {
"label": "Listar Topics",
"parameters": {
"operation": "list_topics",
"brokers": ["localhost:9092"]
}
}
},
{
"id": "message_1",
"type": "message",
"position": { "x": 500, "y": 100 },
"data": {
"label": "Mostrar Topics",
"parameters": {
"message": "Topics disponíveis:\n{{kafka_1.topics}}"
}
}
},
{
"id": "end_1",
"type": "end",
"position": { "x": 700, "y": 100 },
"data": { "label": "Fim" }
}
],
"edges": [
{ "source": "start_1", "target": "kafka_1" },
{ "source": "kafka_1", "target": "message_1" },
{ "source": "message_1", "target": "end_1" }
]
}
Saída esperada:
Sistema: Topics disponíveis:
example-topic-1, example-topic-2, logs
Resposta do Node
Produce Operation
{
"success": true,
"action": "kafka_message_produced",
"topic": "user-events",
"partition": 0,
"message": { "event": "user_signup", "userId": 123 },
"brokers": ["localhost:9092"],
"timestamp": "2025-01-15T10:30:00.000Z"
}
Create Topic Operation
{
"success": true,
"action": "kafka_topic_created",
"topic": "orders",
"partitions": 3,
"replicationFactor": 2,
"timestamp": "2025-01-15T10:30:00.000Z"
}
List Topics Operation
{
"success": true,
"action": "kafka_topics_listed",
"topics": ["example-topic-1", "example-topic-2", "logs"],
"brokers": ["localhost:9092"],
"timestamp": "2025-01-15T10:30:00.000Z"
}
Conceitos Kafka
Topic
Canal onde mensagens são publicadas (como tabela de database).
Partition
Subdivisão de topic para paralelismo e escalabilidade.
Broker
Servidor Kafka que armazena e serve dados.
Consumer Group
Grupo de consumers que dividem consumo de partições.
Offset
Posição sequencial de mensagem em partição.
Replication Factor
Número de cópias de cada partição (para tolerância a falhas).
Boas Práticas
✅ SIM:
- Use múltiplas partições para alta throughput
- Defina retention policy apropriado para seus dados
- Use consumer groups para escalabilidade
- Monitore lag de consumers
- Planeje schema de mensagens (ex: Avro, JSON Schema)
❌ NÃO:
- Não crie topics com partição única para alto volume
- Não ignore offsets - pode processar duplicados
- Não produza mensagens > 1MB (configure max.message.bytes)
- Não delete topics em produção sem backup
- Não use Kafka como database de longo prazo
Dicas
💡 Produção: Esta implementação é simulada. Em produção, instale kafkajs no projeto
💡 Partitioning: Mais partições = mais paralelismo, mas mais overhead
💡 Key: Use message key para garantir ordem dentro de partição
💡 Monitoring: Use ferramentas como Kafka Manager, Confluent Control Center
💡 Schemas: Use Schema Registry para evolução de schemas
Próximo Node
→ RABBITMQ - Message queue tradicional → AWS_KINESIS - Streaming gerenciado da AWS → REDIS - Pub/sub leve