RABBITMQ - Message Queue Operations
O que é este Node?
O RABBITMQ é o node responsável por integrar com RabbitMQ message broker para publicar mensagens, consumir filas, criar/gerenciar filas e obter informações de filas.
Por que este Node existe?
Sistemas distribuídos precisam comunicação assíncrona confiável. O RABBITMQ existe para:
- Comunicação assíncrona: Desacoplar sistemas que não precisam se comunicar em tempo real
- Filas de trabalho: Distribuir tarefas entre múltiplos workers
- Garantia de entrega: Mensagens não se perdem mesmo se consumer estiver offline
- Load balancing: Distribuir carga entre múltiplos consumers
- Event-driven architecture: Implementar arquitetura orientada a eventos
Como funciona internamente?
Quando o RABBITMQ é executado, o sistema:
- Identifica operação: publish, consume, create_queue, delete_queue, get_queue_info, purge_queue
- Valida parâmetros: Nome da fila, exchange, routing key conforme operação
- Executa operação: Simula operação RabbitMQ (produção requer amqplib)
- Retorna resultado: Confirmação da operação com detalhes
- Se erro: Lança exceção
Código interno (infrastructure-executor.service.ts:78-105):
private async executeRabbitMQ(parameters: any, context: any): Promise<any> {
const { operation, queueName, exchange, routingKey, message, connectionUrl } = parameters;
this.logger.log(`🐰 RABBITMQ - Operation: ${operation}, Queue: ${queueName}`);
switch (operation) {
case 'publish':
return this.publishToQueue(queueName, message, exchange, routingKey, connectionUrl, context);
case 'consume':
return this.consumeFromQueue(queueName, connectionUrl, context);
case 'create_queue':
return this.createQueue(queueName, parameters.options, connectionUrl, context);
case 'delete_queue':
return this.deleteQueue(queueName, connectionUrl, context);
case 'get_queue_info':
return this.getQueueInfo(queueName, connectionUrl, context);
case 'purge_queue':
return this.purgeQueue(queueName, connectionUrl, context);
default:
throw new Error(`Unsupported RabbitMQ operation: ${operation}`);
}
}
Quando você DEVE usar este Node?
Use RABBITMQ quando precisar comunicação assíncrona confiável:
Casos de uso
- Processamento em background: Enviar tarefas pesadas para workers
- Notificações assíncronas: Enviar emails, SMS sem bloquear flow
- Integração de sistemas: Comunicar sistemas diferentes de forma desacoplada
- Event sourcing: Publicar eventos que múltiplos sistemas consomem
- Rate limiting: Controlar taxa de processamento via filas
Quando NÃO usar RABBITMQ
- Kafka: Use KAFKA para streaming de grandes volumes (>100k msgs/s)
- SQS: Use AWS SQS se já está na AWS
- Sync API: Use WEBHOOK se resposta imediata é necessária
Parâmetros Detalhados
operation (string, obrigatório)
O que é: Tipo de operação a ser executada no RabbitMQ.
Valores válidos:
- publish: Publicar mensagem em fila
- consume: Consumir mensagens de fila
- create_queue: Criar nova fila
- delete_queue: Deletar fila
- get_queue_info: Obter informações da fila
- purge_queue: Limpar todas as mensagens da fila
Flow completo para testar:
{
"name": "Teste RABBITMQ - Publish",
"nodes": [
{
"id": "start_1",
"type": "start",
"position": { "x": 100, "y": 100 },
"data": { "label": "Início" }
},
{
"id": "rabbitmq_1",
"type": "rabbitmq",
"position": { "x": 300, "y": 100 },
"data": {
"label": "Publicar Mensagem",
"parameters": {
"operation": "publish",
"queueName": "notifications",
"message": { "type": "email", "to": "user@example.com" },
"connectionUrl": "amqp://localhost"
}
}
},
{
"id": "message_1",
"type": "message",
"position": { "x": 500, "y": 100 },
"data": {
"label": "Confirmar",
"parameters": {
"message": "Mensagem publicada na fila!"
}
}
},
{
"id": "end_1",
"type": "end",
"position": { "x": 700, "y": 100 },
"data": { "label": "Fim" }
}
],
"edges": [
{ "source": "start_1", "target": "rabbitmq_1" },
{ "source": "rabbitmq_1", "target": "message_1" },
{ "source": "message_1", "target": "end_1" }
]
}
Teste: Mensagem é publicada na fila RabbitMQ.
queueName (string, obrigatório)
O que é: Nome da fila RabbitMQ.
Flow completo para testar:
{
"name": "Teste RABBITMQ - Queue Name",
"nodes": [
{
"id": "start_1",
"type": "start",
"position": { "x": 100, "y": 100 },
"data": { "label": "Início" }
},
{
"id": "rabbitmq_1",
"type": "rabbitmq",
"position": { "x": 300, "y": 100 },
"data": {
"label": "Criar Fila",
"parameters": {
"operation": "create_queue",
"queueName": "email-queue",
"options": { "durable": true },
"connectionUrl": "amqp://localhost"
}
}
},
{
"id": "message_1",
"type": "message",
"position": { "x": 500, "y": 100 },
"data": {
"label": "Confirmar",
"parameters": {
"message": "Fila 'email-queue' criada!"
}
}
},
{
"id": "end_1",
"type": "end",
"position": { "x": 700, "y": 100 },
"data": { "label": "Fim" }
}
],
"edges": [
{ "source": "start_1", "target": "rabbitmq_1" },
{ "source": "rabbitmq_1", "target": "message_1" },
{ "source": "message_1", "target": "end_1" }
]
}
Teste: Fila é criada com nome especificado.
message (any, obrigatório para publish)
O que é: Conteúdo da mensagem a ser publicada (objeto, string, etc).
Flow completo para testar:
{
"name": "Teste RABBITMQ - Message",
"nodes": [
{
"id": "start_1",
"type": "start",
"position": { "x": 100, "y": 100 },
"data": { "label": "Início" }
},
{
"id": "input_1",
"type": "input",
"position": { "x": 300, "y": 100 },
"data": {
"label": "Pedir Email",
"parameters": {
"message": "Email do destinatário:",
"variable": "email"
}
}
},
{
"id": "rabbitmq_1",
"type": "rabbitmq",
"position": { "x": 500, "y": 100 },
"data": {
"label": "Enfileirar Email",
"parameters": {
"operation": "publish",
"queueName": "email-queue",
"message": {
"to": "{{email}}",
"subject": "Bem-vindo!",
"body": "Obrigado por se cadastrar",
"timestamp": "{{now}}"
},
"connectionUrl": "amqp://localhost"
}
}
},
{
"id": "message_1",
"type": "message",
"position": { "x": 700, "y": 100 },
"data": {
"label": "Confirmar",
"parameters": {
"message": "Email enfileirado para {{email}}"
}
}
},
{
"id": "end_1",
"type": "end",
"position": { "x": 900, "y": 100 },
"data": { "label": "Fim" }
}
],
"edges": [
{ "source": "start_1", "target": "input_1" },
{ "source": "input_1", "target": "rabbitmq_1" },
{ "source": "rabbitmq_1", "target": "message_1" },
{ "source": "message_1", "target": "end_1" }
]
}
Teste: Mensagem estruturada é publicada na fila.
exchange (string, opcional)
O que é: Nome do exchange RabbitMQ (padrão: 'default').
Flow completo para testar:
{
"name": "Teste RABBITMQ - Exchange",
"nodes": [
{
"id": "start_1",
"type": "start",
"position": { "x": 100, "y": 100 },
"data": { "label": "Início" }
},
{
"id": "rabbitmq_1",
"type": "rabbitmq",
"position": { "x": 300, "y": 100 },
"data": {
"label": "Publicar com Exchange",
"parameters": {
"operation": "publish",
"queueName": "logs",
"exchange": "logs-exchange",
"routingKey": "error",
"message": { "level": "error", "msg": "System failure" },
"connectionUrl": "amqp://localhost"
}
}
},
{
"id": "message_1",
"type": "message",
"position": { "x": 500, "y": 100 },
"data": {
"label": "Confirmar",
"parameters": {
"message": "Log publicado no exchange!"
}
}
},
{
"id": "end_1",
"type": "end",
"position": { "x": 700, "y": 100 },
"data": { "label": "Fim" }
}
],
"edges": [
{ "source": "start_1", "target": "rabbitmq_1" },
{ "source": "rabbitmq_1", "target": "message_1" },
{ "source": "message_1", "target": "end_1" }
]
}
Teste: Mensagem é roteada via exchange especificado.
routingKey (string, opcional)
O que é: Chave de roteamento para exchanges (padrão: nome da fila).
connectionUrl (string, opcional)
O que é: URL de conexão do RabbitMQ (padrão: 'amqp://localhost').
options (object, opcional)
O que é: Opções ao criar fila (ex: { durable: true }).
Parâmetros
| Campo | Tipo | Obrigatório | Descrição |
|---|---|---|---|
| operation | string | Sim | publish, consume, create_queue, delete_queue, get_queue_info, purge_queue |
| queueName | string | Sim | Nome da fila |
| message | any | Sim* | Mensagem a publicar (*para publish) |
| exchange | string | Não | Nome do exchange (padrão: default) |
| routingKey | string | Não | Chave de roteamento (padrão: queueName) |
| connectionUrl | string | Não | URL do RabbitMQ (padrão: amqp://localhost) |
| options | object | Não | Opções da fila (ex: durable, autoDelete) |
Exemplo 1: Enfileirar Notificações
Objetivo: Publicar notificações para processamento assíncrono
JSON para Importar
{
"name": "RABBITMQ - Enfileirar Notificação",
"nodes": [
{
"id": "start_1",
"type": "start",
"position": { "x": 100, "y": 100 },
"data": { "label": "Início" }
},
{
"id": "input_1",
"type": "input",
"position": { "x": 300, "y": 100 },
"data": {
"label": "Pedir Nome",
"parameters": {
"message": "Qual seu nome?",
"variable": "nome"
}
}
},
{
"id": "rabbitmq_1",
"type": "rabbitmq",
"position": { "x": 500, "y": 100 },
"data": {
"label": "Enfileirar Welcome",
"parameters": {
"operation": "publish",
"queueName": "welcome-emails",
"message": {
"type": "welcome",
"userName": "{{nome}}",
"timestamp": "{{now}}",
"priority": "high"
},
"connectionUrl": "amqp://localhost"
}
}
},
{
"id": "message_1",
"type": "message",
"position": { "x": 700, "y": 100 },
"data": {
"label": "Confirmar",
"parameters": {
"message": "Olá {{nome}}! Email de boas-vindas será enviado em breve."
}
}
},
{
"id": "end_1",
"type": "end",
"position": { "x": 900, "y": 100 },
"data": { "label": "Fim" }
}
],
"edges": [
{ "source": "start_1", "target": "input_1" },
{ "source": "input_1", "target": "rabbitmq_1" },
{ "source": "rabbitmq_1", "target": "message_1" },
{ "source": "message_1", "target": "end_1" }
]
}
Saída esperada:
Sistema: Qual seu nome?
Usuário: Maria
Sistema: Olá Maria! Email de boas-vindas será enviado em breve.
Exemplo 2: Obter Info da Fila
Objetivo: Monitorar status de uma fila
JSON para Importar
{
"name": "RABBITMQ - Info da Fila",
"nodes": [
{
"id": "start_1",
"type": "start",
"position": { "x": 100, "y": 100 },
"data": { "label": "Início" }
},
{
"id": "rabbitmq_1",
"type": "rabbitmq",
"position": { "x": 300, "y": 100 },
"data": {
"label": "Checar Fila",
"parameters": {
"operation": "get_queue_info",
"queueName": "email-queue",
"connectionUrl": "amqp://localhost"
}
}
},
{
"id": "message_1",
"type": "message",
"position": { "x": 500, "y": 100 },
"data": {
"label": "Mostrar Status",
"parameters": {
"message": "Fila: email-queue\nMensagens: {{rabbitmq_1.info.messageCount}}\nConsumers: {{rabbitmq_1.info.consumerCount}}"
}
}
},
{
"id": "end_1",
"type": "end",
"position": { "x": 700, "y": 100 },
"data": { "label": "Fim" }
}
],
"edges": [
{ "source": "start_1", "target": "rabbitmq_1" },
{ "source": "rabbitmq_1", "target": "message_1" },
{ "source": "message_1", "target": "end_1" }
]
}
Saída esperada:
Sistema: Fila: email-queue
Mensagens: 42
Consumers: 3
Exemplo 3: Criar e Limpar Fila
Objetivo: Gerenciar ciclo de vida de filas
JSON para Importar
{
"name": "RABBITMQ - Gerenciar Fila",
"nodes": [
{
"id": "start_1",
"type": "start",
"position": { "x": 100, "y": 100 },
"data": { "label": "Início" }
},
{
"id": "rabbitmq_1",
"type": "rabbitmq",
"position": { "x": 300, "y": 100 },
"data": {
"label": "Criar Fila",
"parameters": {
"operation": "create_queue",
"queueName": "temp-processing",
"options": { "durable": true, "autoDelete": false },
"connectionUrl": "amqp://localhost"
}
}
},
{
"id": "message_1",
"type": "message",
"position": { "x": 500, "y": 100 },
"data": {
"label": "Confirmar Criação",
"parameters": {
"message": "Fila criada! Publicando mensagens..."
}
}
},
{
"id": "rabbitmq_2",
"type": "rabbitmq",
"position": { "x": 700, "y": 100 },
"data": {
"label": "Limpar Fila",
"parameters": {
"operation": "purge_queue",
"queueName": "temp-processing",
"connectionUrl": "amqp://localhost"
}
}
},
{
"id": "message_2",
"type": "message",
"position": { "x": 900, "y": 100 },
"data": {
"label": "Confirmar Limpeza",
"parameters": {
"message": "Fila limpa! Pronta para uso."
}
}
},
{
"id": "end_1",
"type": "end",
"position": { "x": 1100, "y": 100 },
"data": { "label": "Fim" }
}
],
"edges": [
{ "source": "start_1", "target": "rabbitmq_1" },
{ "source": "rabbitmq_1", "target": "message_1" },
{ "source": "message_1", "target": "rabbitmq_2" },
{ "source": "rabbitmq_2", "target": "message_2" },
{ "source": "message_2", "target": "end_1" }
]
}
Saída esperada:
Sistema: Fila criada! Publicando mensagens...
Sistema: Fila limpa! Pronta para uso.
Resposta do Node
Publish Operation
{
"success": true,
"action": "rabbitmq_message_published",
"queue": "notifications",
"exchange": "default",
"routingKey": "notifications",
"message": { "type": "email", "to": "user@example.com" },
"timestamp": "2025-01-15T10:30:00.000Z"
}
Get Queue Info Operation
{
"success": true,
"action": "rabbitmq_queue_info_retrieved",
"queue": "email-queue",
"info": {
"messageCount": 42,
"consumerCount": 3,
"durable": true
},
"timestamp": "2025-01-15T10:30:00.000Z"
}
Conceitos RabbitMQ
Queue (Fila)
Buffer que armazena mensagens até serem consumidas.
Exchange
Roteia mensagens para filas baseado em regras.
Routing Key
Chave usada pelo exchange para rotear mensagens.
Durable
Fila sobrevive a reinicialização do broker.
Consumer
Aplicação que lê mensagens da fila.
Boas Práticas
✅ SIM:
- Use filas duráveis para mensagens importantes
- Nomeie filas de forma descritiva (ex: email-notifications)
- Implemente dead letter queues para mensagens com erro
- Monitore tamanho das filas regularmente
- Use acknowledgments para garantir processamento
❌ NÃO:
- Não acumule milhões de mensagens em uma fila
- Não use RabbitMQ para dados que precisam ser persistidos permanentemente
- Não publique mensagens gigantes (>1MB)
- Não ignore mensagens com erro (implemente retry)
Dicas
💡 Produção: Esta implementação é simulada. Em produção, instale amqplib no projeto
💡 Monitoring: Use RabbitMQ Management UI para monitorar filas
💡 Patterns: Estude padrões: Work Queues, Pub/Sub, Routing, Topics, RPC
💡 Scaling: Múltiplos consumers da mesma fila = load balancing automático
Próximo Node
→ KAFKA - Apache Kafka para streaming → AWS_SQS - Fila gerenciada da AWS → REDIS - Cache e pub/sub