Pular para conteúdo

KAFKA - Apache Kafka Streaming

O que é este Node?

O KAFKA é o node responsável por integrar com Apache Kafka para produzir mensagens, consumir streams, criar/deletar topics e listar topics disponíveis.

Por que este Node existe?

Sistemas modernos precisam processar milhões de eventos em tempo real. O KAFKA existe para:

  1. Streaming de dados: Processar fluxos contínuos de dados em tempo real
  2. Event sourcing: Manter histórico imutável de todos os eventos
  3. Log aggregation: Coletar e centralizar logs de múltiplos sistemas
  4. Alta throughput: Processar milhões de mensagens por segundo
  5. Arquitetura event-driven: Base para sistemas orientados a eventos

Como funciona internamente?

Quando o KAFKA é executado, o sistema:

  1. Identifica operação: produce, consume, create_topic, delete_topic, list_topics
  2. Valida parâmetros: Topic, brokers, partition conforme operação
  3. Executa operação: Simula operação Kafka (produção requer kafkajs)
  4. Retorna resultado: Confirmação com detalhes da operação
  5. Se erro: Lança exceção com mensagem

Código interno (infrastructure-executor.service.ts:110-134):

private async executeKafka(parameters: any, context: any): Promise<any> {
  const { operation, topic, message, partition, brokers, groupId } = parameters;

  this.logger.log(`📨 KAFKA - Operation: ${operation}, Topic: ${topic}`);

  switch (operation) {
    case 'produce':
      return this.produceKafkaMessage(topic, message, partition, brokers, context);

    case 'consume':
      return this.consumeKafkaMessages(topic, groupId, brokers, context);

    case 'create_topic':
      return this.createKafkaTopic(topic, parameters.partitions, parameters.replicationFactor, brokers, context);

    case 'delete_topic':
      return this.deleteKafkaTopic(topic, brokers, context);

    case 'list_topics':
      return this.listKafkaTopics(brokers, context);

    default:
      throw new Error(`Unsupported Kafka operation: ${operation}`);
  }
}

Quando você DEVE usar este Node?

Use KAFKA quando precisar streaming de alto volume:

Casos de uso

  1. Real-time analytics: Processar eventos em tempo real para dashboards
  2. Activity tracking: Rastrear ações de usuários em grande escala
  3. Log aggregation: Centralizar logs de microservices
  4. Event sourcing: Armazenar histórico completo de mudanças
  5. IoT data pipeline: Processar streams de sensores IoT

Quando NÃO usar KAFKA

  • RabbitMQ: Use RabbitMQ para filas de trabalho tradicionais (<10k msgs/s)
  • SQS: Use AWS SQS para simplicidade se já está na AWS
  • Baixo volume: Kafka é overhead para poucos eventos
  • Request-response: Use HTTP/REST para comunicação síncrona

Parâmetros Detalhados

operation (string, obrigatório)

O que é: Tipo de operação a ser executada no Kafka.

Valores válidos: - produce: Produzir mensagem para topic - consume: Consumir mensagens de topic - create_topic: Criar novo topic - delete_topic: Deletar topic - list_topics: Listar todos os topics

Flow completo para testar:

{
  "name": "Teste KAFKA - Produce",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "kafka_1",
      "type": "kafka",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Produzir Evento",
        "parameters": {
          "operation": "produce",
          "topic": "user-events",
          "message": { "event": "user_signup", "userId": 123 },
          "brokers": ["localhost:9092"]
        }
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Confirmar",
        "parameters": {
          "message": "Evento publicado no Kafka!"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 700, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "kafka_1" },
    { "source": "kafka_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

Teste: Mensagem é produzida no topic Kafka.

topic (string, obrigatório)

O que é: Nome do topic Kafka onde mensagens serão produzidas/consumidas.

Flow completo para testar:

{
  "name": "Teste KAFKA - Topic",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "kafka_1",
      "type": "kafka",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Criar Topic",
        "parameters": {
          "operation": "create_topic",
          "topic": "orders",
          "partitions": 3,
          "replicationFactor": 2,
          "brokers": ["localhost:9092"]
        }
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Confirmar",
        "parameters": {
          "message": "Topic 'orders' criado com sucesso!"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 700, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "kafka_1" },
    { "source": "kafka_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

Teste: Topic é criado no cluster Kafka.

message (any, obrigatório para produce)

O que é: Conteúdo da mensagem a ser produzida no topic.

Flow completo para testar:

{
  "name": "Teste KAFKA - Message",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "input_1",
      "type": "input",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Pedir Ação",
        "parameters": {
          "message": "Qual ação do usuário?",
          "variable": "acao"
        }
      }
    },
    {
      "id": "kafka_1",
      "type": "kafka",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Registrar Evento",
        "parameters": {
          "operation": "produce",
          "topic": "user-activity",
          "message": {
            "eventType": "{{acao}}",
            "userId": 456,
            "timestamp": "{{now}}",
            "metadata": {
              "source": "lumina-flow",
              "version": "1.0"
            }
          },
          "brokers": ["localhost:9092"]
        }
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 700, "y": 100 },
      "data": {
        "label": "Confirmar",
        "parameters": {
          "message": "Ação '{{acao}}' registrada no Kafka!"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 900, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "input_1" },
    { "source": "input_1", "target": "kafka_1" },
    { "source": "kafka_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

Teste: Evento estruturado é produzido no topic.

partition (number, opcional)

O que é: Número da partição específica para produzir (padrão: 0).

Flow completo para testar:

{
  "name": "Teste KAFKA - Partition",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "kafka_1",
      "type": "kafka",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Produzir em P2",
        "parameters": {
          "operation": "produce",
          "topic": "logs",
          "partition": 2,
          "message": { "level": "error", "msg": "Database connection failed" },
          "brokers": ["localhost:9092"]
        }
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Confirmar",
        "parameters": {
          "message": "Mensagem enviada para partição 2!"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 700, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "kafka_1" },
    { "source": "kafka_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

Teste: Mensagem vai para partição específica.

brokers (array, opcional)

O que é: Lista de brokers Kafka (padrão: ["localhost:9092"]).

groupId (string, obrigatório para consume)

O que é: ID do consumer group (para balanceamento de carga).

partitions (number, opcional)

O que é: Número de partições ao criar topic (padrão: 1).

replicationFactor (number, opcional)

O que é: Fator de replicação ao criar topic (padrão: 1).

Parâmetros

Campo Tipo Obrigatório Descrição
operation string Sim produce, consume, create_topic, delete_topic, list_topics
topic string Sim Nome do topic Kafka
message any Sim* Mensagem a produzir (*para produce)
partition number Não Número da partição (padrão: 0)
brokers array Não Lista de brokers (padrão: ["localhost:9092"])
groupId string Sim* ID do consumer group (*para consume)
partitions number Não Partições ao criar topic (padrão: 1)
replicationFactor number Não Fator de replicação (padrão: 1)

Exemplo 1: Rastrear Eventos de Usuário

Objetivo: Produzir eventos de ações de usuários

JSON para Importar

{
  "name": "KAFKA - Rastrear Eventos",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "input_1",
      "type": "input",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Pedir Nome",
        "parameters": {
          "message": "Qual seu nome?",
          "variable": "nome"
        }
      }
    },
    {
      "id": "kafka_1",
      "type": "kafka",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Registrar Signup",
        "parameters": {
          "operation": "produce",
          "topic": "user-events",
          "message": {
            "eventType": "user_signup",
            "userName": "{{nome}}",
            "timestamp": "{{now}}",
            "source": "whatsapp",
            "metadata": {
              "flowId": "signup-flow-v1",
              "channel": "lumina"
            }
          },
          "brokers": ["localhost:9092"]
        }
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 700, "y": 100 },
      "data": {
        "label": "Boas-vindas",
        "parameters": {
          "message": "Bem-vindo, {{nome}}! Seu cadastro foi registrado."
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 900, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "input_1" },
    { "source": "input_1", "target": "kafka_1" },
    { "source": "kafka_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

Saída esperada:

Sistema: Qual seu nome?
Usuário: João Silva
Sistema: Bem-vindo, João Silva! Seu cadastro foi registrado.

Exemplo 2: Criar Topic para Logs

Objetivo: Criar topic particionado para logs de aplicação

JSON para Importar

{
  "name": "KAFKA - Criar Topic Logs",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "kafka_1",
      "type": "kafka",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Criar Topic",
        "parameters": {
          "operation": "create_topic",
          "topic": "application-logs",
          "partitions": 6,
          "replicationFactor": 3,
          "brokers": ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
        }
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Confirmar",
        "parameters": {
          "message": "Topic 'application-logs' criado!\n6 partições, fator de replicação 3"
        }
      }
    },
    {
      "id": "kafka_2",
      "type": "kafka",
      "position": { "x": 700, "y": 100 },
      "data": {
        "label": "Produzir Log",
        "parameters": {
          "operation": "produce",
          "topic": "application-logs",
          "message": {
            "level": "info",
            "service": "api-gateway",
            "message": "Topic initialized successfully",
            "timestamp": "{{now}}"
          },
          "brokers": ["kafka1:9092"]
        }
      }
    },
    {
      "id": "message_2",
      "type": "message",
      "position": { "x": 900, "y": 100 },
      "data": {
        "label": "Sucesso",
        "parameters": {
          "message": "Primeiro log publicado!"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 1100, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "kafka_1" },
    { "source": "kafka_1", "target": "message_1" },
    { "source": "message_1", "target": "kafka_2" },
    { "source": "kafka_2", "target": "message_2" },
    { "source": "message_2", "target": "end_1" }
  ]
}

Saída esperada:

Sistema: Topic 'application-logs' criado!
6 partições, fator de replicação 3
Sistema: Primeiro log publicado!

Exemplo 3: Listar Topics

Objetivo: Descobrir topics disponíveis no cluster

JSON para Importar

{
  "name": "KAFKA - Listar Topics",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "kafka_1",
      "type": "kafka",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Listar Topics",
        "parameters": {
          "operation": "list_topics",
          "brokers": ["localhost:9092"]
        }
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Mostrar Topics",
        "parameters": {
          "message": "Topics disponíveis:\n{{kafka_1.topics}}"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 700, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "kafka_1" },
    { "source": "kafka_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

Saída esperada:

Sistema: Topics disponíveis:
example-topic-1, example-topic-2, logs

Resposta do Node

Produce Operation

{
  "success": true,
  "action": "kafka_message_produced",
  "topic": "user-events",
  "partition": 0,
  "message": { "event": "user_signup", "userId": 123 },
  "brokers": ["localhost:9092"],
  "timestamp": "2025-01-15T10:30:00.000Z"
}

Create Topic Operation

{
  "success": true,
  "action": "kafka_topic_created",
  "topic": "orders",
  "partitions": 3,
  "replicationFactor": 2,
  "timestamp": "2025-01-15T10:30:00.000Z"
}

List Topics Operation

{
  "success": true,
  "action": "kafka_topics_listed",
  "topics": ["example-topic-1", "example-topic-2", "logs"],
  "brokers": ["localhost:9092"],
  "timestamp": "2025-01-15T10:30:00.000Z"
}

Conceitos Kafka

Topic

Canal onde mensagens são publicadas (como tabela de database).

Partition

Subdivisão de topic para paralelismo e escalabilidade.

Broker

Servidor Kafka que armazena e serve dados.

Consumer Group

Grupo de consumers que dividem consumo de partições.

Offset

Posição sequencial de mensagem em partição.

Replication Factor

Número de cópias de cada partição (para tolerância a falhas).

Boas Práticas

SIM:

  • Use múltiplas partições para alta throughput
  • Defina retention policy apropriado para seus dados
  • Use consumer groups para escalabilidade
  • Monitore lag de consumers
  • Planeje schema de mensagens (ex: Avro, JSON Schema)

NÃO:

  • Não crie topics com partição única para alto volume
  • Não ignore offsets - pode processar duplicados
  • Não produza mensagens > 1MB (configure max.message.bytes)
  • Não delete topics em produção sem backup
  • Não use Kafka como database de longo prazo

Dicas

💡 Produção: Esta implementação é simulada. Em produção, instale kafkajs no projeto

💡 Partitioning: Mais partições = mais paralelismo, mas mais overhead

💡 Key: Use message key para garantir ordem dentro de partição

💡 Monitoring: Use ferramentas como Kafka Manager, Confluent Control Center

💡 Schemas: Use Schema Registry para evolução de schemas

Próximo Node

RABBITMQ - Message queue tradicional → AWS_KINESIS - Streaming gerenciado da AWS → REDIS - Pub/sub leve