Pular para conteúdo

RABBITMQ - Message Queue Operations

O que é este Node?

O RABBITMQ é o node responsável por integrar com RabbitMQ message broker para publicar mensagens, consumir filas, criar/gerenciar filas e obter informações de filas.

Por que este Node existe?

Sistemas distribuídos precisam comunicação assíncrona confiável. O RABBITMQ existe para:

  1. Comunicação assíncrona: Desacoplar sistemas que não precisam se comunicar em tempo real
  2. Filas de trabalho: Distribuir tarefas entre múltiplos workers
  3. Garantia de entrega: Mensagens não se perdem mesmo se consumer estiver offline
  4. Load balancing: Distribuir carga entre múltiplos consumers
  5. Event-driven architecture: Implementar arquitetura orientada a eventos

Como funciona internamente?

Quando o RABBITMQ é executado, o sistema:

  1. Identifica operação: publish, consume, create_queue, delete_queue, get_queue_info, purge_queue
  2. Valida parâmetros: Nome da fila, exchange, routing key conforme operação
  3. Executa operação: Simula operação RabbitMQ (produção requer amqplib)
  4. Retorna resultado: Confirmação da operação com detalhes
  5. Se erro: Lança exceção

Código interno (infrastructure-executor.service.ts:78-105):

private async executeRabbitMQ(parameters: any, context: any): Promise<any> {
  const { operation, queueName, exchange, routingKey, message, connectionUrl } = parameters;

  this.logger.log(`🐰 RABBITMQ - Operation: ${operation}, Queue: ${queueName}`);

  switch (operation) {
    case 'publish':
      return this.publishToQueue(queueName, message, exchange, routingKey, connectionUrl, context);

    case 'consume':
      return this.consumeFromQueue(queueName, connectionUrl, context);

    case 'create_queue':
      return this.createQueue(queueName, parameters.options, connectionUrl, context);

    case 'delete_queue':
      return this.deleteQueue(queueName, connectionUrl, context);

    case 'get_queue_info':
      return this.getQueueInfo(queueName, connectionUrl, context);

    case 'purge_queue':
      return this.purgeQueue(queueName, connectionUrl, context);

    default:
      throw new Error(`Unsupported RabbitMQ operation: ${operation}`);
  }
}

Quando você DEVE usar este Node?

Use RABBITMQ quando precisar comunicação assíncrona confiável:

Casos de uso

  1. Processamento em background: Enviar tarefas pesadas para workers
  2. Notificações assíncronas: Enviar emails, SMS sem bloquear flow
  3. Integração de sistemas: Comunicar sistemas diferentes de forma desacoplada
  4. Event sourcing: Publicar eventos que múltiplos sistemas consomem
  5. Rate limiting: Controlar taxa de processamento via filas

Quando NÃO usar RABBITMQ

  • Kafka: Use KAFKA para streaming de grandes volumes (>100k msgs/s)
  • SQS: Use AWS SQS se já está na AWS
  • Sync API: Use WEBHOOK se resposta imediata é necessária

Parâmetros Detalhados

operation (string, obrigatório)

O que é: Tipo de operação a ser executada no RabbitMQ.

Valores válidos: - publish: Publicar mensagem em fila - consume: Consumir mensagens de fila - create_queue: Criar nova fila - delete_queue: Deletar fila - get_queue_info: Obter informações da fila - purge_queue: Limpar todas as mensagens da fila

Flow completo para testar:

{
  "name": "Teste RABBITMQ - Publish",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "rabbitmq_1",
      "type": "rabbitmq",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Publicar Mensagem",
        "parameters": {
          "operation": "publish",
          "queueName": "notifications",
          "message": { "type": "email", "to": "user@example.com" },
          "connectionUrl": "amqp://localhost"
        }
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Confirmar",
        "parameters": {
          "message": "Mensagem publicada na fila!"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 700, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "rabbitmq_1" },
    { "source": "rabbitmq_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

Teste: Mensagem é publicada na fila RabbitMQ.

queueName (string, obrigatório)

O que é: Nome da fila RabbitMQ.

Flow completo para testar:

{
  "name": "Teste RABBITMQ - Queue Name",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "rabbitmq_1",
      "type": "rabbitmq",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Criar Fila",
        "parameters": {
          "operation": "create_queue",
          "queueName": "email-queue",
          "options": { "durable": true },
          "connectionUrl": "amqp://localhost"
        }
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Confirmar",
        "parameters": {
          "message": "Fila 'email-queue' criada!"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 700, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "rabbitmq_1" },
    { "source": "rabbitmq_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

Teste: Fila é criada com nome especificado.

message (any, obrigatório para publish)

O que é: Conteúdo da mensagem a ser publicada (objeto, string, etc).

Flow completo para testar:

{
  "name": "Teste RABBITMQ - Message",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "input_1",
      "type": "input",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Pedir Email",
        "parameters": {
          "message": "Email do destinatário:",
          "variable": "email"
        }
      }
    },
    {
      "id": "rabbitmq_1",
      "type": "rabbitmq",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Enfileirar Email",
        "parameters": {
          "operation": "publish",
          "queueName": "email-queue",
          "message": {
            "to": "{{email}}",
            "subject": "Bem-vindo!",
            "body": "Obrigado por se cadastrar",
            "timestamp": "{{now}}"
          },
          "connectionUrl": "amqp://localhost"
        }
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 700, "y": 100 },
      "data": {
        "label": "Confirmar",
        "parameters": {
          "message": "Email enfileirado para {{email}}"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 900, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "input_1" },
    { "source": "input_1", "target": "rabbitmq_1" },
    { "source": "rabbitmq_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

Teste: Mensagem estruturada é publicada na fila.

exchange (string, opcional)

O que é: Nome do exchange RabbitMQ (padrão: 'default').

Flow completo para testar:

{
  "name": "Teste RABBITMQ - Exchange",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "rabbitmq_1",
      "type": "rabbitmq",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Publicar com Exchange",
        "parameters": {
          "operation": "publish",
          "queueName": "logs",
          "exchange": "logs-exchange",
          "routingKey": "error",
          "message": { "level": "error", "msg": "System failure" },
          "connectionUrl": "amqp://localhost"
        }
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Confirmar",
        "parameters": {
          "message": "Log publicado no exchange!"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 700, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "rabbitmq_1" },
    { "source": "rabbitmq_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

Teste: Mensagem é roteada via exchange especificado.

routingKey (string, opcional)

O que é: Chave de roteamento para exchanges (padrão: nome da fila).

connectionUrl (string, opcional)

O que é: URL de conexão do RabbitMQ (padrão: 'amqp://localhost').

options (object, opcional)

O que é: Opções ao criar fila (ex: { durable: true }).

Parâmetros

Campo Tipo Obrigatório Descrição
operation string Sim publish, consume, create_queue, delete_queue, get_queue_info, purge_queue
queueName string Sim Nome da fila
message any Sim* Mensagem a publicar (*para publish)
exchange string Não Nome do exchange (padrão: default)
routingKey string Não Chave de roteamento (padrão: queueName)
connectionUrl string Não URL do RabbitMQ (padrão: amqp://localhost)
options object Não Opções da fila (ex: durable, autoDelete)

Exemplo 1: Enfileirar Notificações

Objetivo: Publicar notificações para processamento assíncrono

JSON para Importar

{
  "name": "RABBITMQ - Enfileirar Notificação",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "input_1",
      "type": "input",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Pedir Nome",
        "parameters": {
          "message": "Qual seu nome?",
          "variable": "nome"
        }
      }
    },
    {
      "id": "rabbitmq_1",
      "type": "rabbitmq",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Enfileirar Welcome",
        "parameters": {
          "operation": "publish",
          "queueName": "welcome-emails",
          "message": {
            "type": "welcome",
            "userName": "{{nome}}",
            "timestamp": "{{now}}",
            "priority": "high"
          },
          "connectionUrl": "amqp://localhost"
        }
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 700, "y": 100 },
      "data": {
        "label": "Confirmar",
        "parameters": {
          "message": "Olá {{nome}}! Email de boas-vindas será enviado em breve."
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 900, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "input_1" },
    { "source": "input_1", "target": "rabbitmq_1" },
    { "source": "rabbitmq_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

Saída esperada:

Sistema: Qual seu nome?
Usuário: Maria
Sistema: Olá Maria! Email de boas-vindas será enviado em breve.

Exemplo 2: Obter Info da Fila

Objetivo: Monitorar status de uma fila

JSON para Importar

{
  "name": "RABBITMQ - Info da Fila",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "rabbitmq_1",
      "type": "rabbitmq",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Checar Fila",
        "parameters": {
          "operation": "get_queue_info",
          "queueName": "email-queue",
          "connectionUrl": "amqp://localhost"
        }
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Mostrar Status",
        "parameters": {
          "message": "Fila: email-queue\nMensagens: {{rabbitmq_1.info.messageCount}}\nConsumers: {{rabbitmq_1.info.consumerCount}}"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 700, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "rabbitmq_1" },
    { "source": "rabbitmq_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

Saída esperada:

Sistema: Fila: email-queue
Mensagens: 42
Consumers: 3

Exemplo 3: Criar e Limpar Fila

Objetivo: Gerenciar ciclo de vida de filas

JSON para Importar

{
  "name": "RABBITMQ - Gerenciar Fila",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "rabbitmq_1",
      "type": "rabbitmq",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Criar Fila",
        "parameters": {
          "operation": "create_queue",
          "queueName": "temp-processing",
          "options": { "durable": true, "autoDelete": false },
          "connectionUrl": "amqp://localhost"
        }
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Confirmar Criação",
        "parameters": {
          "message": "Fila criada! Publicando mensagens..."
        }
      }
    },
    {
      "id": "rabbitmq_2",
      "type": "rabbitmq",
      "position": { "x": 700, "y": 100 },
      "data": {
        "label": "Limpar Fila",
        "parameters": {
          "operation": "purge_queue",
          "queueName": "temp-processing",
          "connectionUrl": "amqp://localhost"
        }
      }
    },
    {
      "id": "message_2",
      "type": "message",
      "position": { "x": 900, "y": 100 },
      "data": {
        "label": "Confirmar Limpeza",
        "parameters": {
          "message": "Fila limpa! Pronta para uso."
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 1100, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "rabbitmq_1" },
    { "source": "rabbitmq_1", "target": "message_1" },
    { "source": "message_1", "target": "rabbitmq_2" },
    { "source": "rabbitmq_2", "target": "message_2" },
    { "source": "message_2", "target": "end_1" }
  ]
}

Saída esperada:

Sistema: Fila criada! Publicando mensagens...
Sistema: Fila limpa! Pronta para uso.

Resposta do Node

Publish Operation

{
  "success": true,
  "action": "rabbitmq_message_published",
  "queue": "notifications",
  "exchange": "default",
  "routingKey": "notifications",
  "message": { "type": "email", "to": "user@example.com" },
  "timestamp": "2025-01-15T10:30:00.000Z"
}

Get Queue Info Operation

{
  "success": true,
  "action": "rabbitmq_queue_info_retrieved",
  "queue": "email-queue",
  "info": {
    "messageCount": 42,
    "consumerCount": 3,
    "durable": true
  },
  "timestamp": "2025-01-15T10:30:00.000Z"
}

Conceitos RabbitMQ

Queue (Fila)

Buffer que armazena mensagens até serem consumidas.

Exchange

Roteia mensagens para filas baseado em regras.

Routing Key

Chave usada pelo exchange para rotear mensagens.

Durable

Fila sobrevive a reinicialização do broker.

Consumer

Aplicação que lê mensagens da fila.

Boas Práticas

SIM:

  • Use filas duráveis para mensagens importantes
  • Nomeie filas de forma descritiva (ex: email-notifications)
  • Implemente dead letter queues para mensagens com erro
  • Monitore tamanho das filas regularmente
  • Use acknowledgments para garantir processamento

NÃO:

  • Não acumule milhões de mensagens em uma fila
  • Não use RabbitMQ para dados que precisam ser persistidos permanentemente
  • Não publique mensagens gigantes (>1MB)
  • Não ignore mensagens com erro (implemente retry)

Dicas

💡 Produção: Esta implementação é simulada. Em produção, instale amqplib no projeto

💡 Monitoring: Use RabbitMQ Management UI para monitorar filas

💡 Patterns: Estude padrões: Work Queues, Pub/Sub, Routing, Topics, RPC

💡 Scaling: Múltiplos consumers da mesma fila = load balancing automático

Próximo Node

KAFKA - Apache Kafka para streaming → AWS_SQS - Fila gerenciada da AWS → REDIS - Cache e pub/sub