Pular para conteúdo

SQS RECEIVE - Receber Mensagens da Fila

O que é este Node?

O SQS RECEIVE é o node responsável por consumir mensagens de filas do AWS SQS para processamento.

Por que este Node existe?

Consumir mensagens de filas de forma controlada é essencial. O SQS RECEIVE existe para:

  1. Polling controlado: Buscar mensagens da fila quando pronto para processar
  2. Long polling: Reduzir custos esperando mensagens chegarem
  3. Batch processing: Processar múltiplas mensagens de uma vez
  4. Visibility timeout: Garantir que mensagem não seja processada em duplicidade
  5. Controle de fluxo: Processar no ritmo adequado
  6. Escalabilidade: Múltiplos consumidores podem processar em paralelo

Como funciona internamente?

Quando o SQS RECEIVE é executado, o sistema:

  1. Autentica na AWS: Usa accessKeyId, secretAccessKey e region
  2. Faz polling na fila: Busca mensagens disponíveis
  3. Aplica long polling: Se waitTimeSeconds > 0, aguarda mensagens
  4. Marca como invisível: Aplica visibility timeout
  5. Retorna mensagens: Array com 0 a maxMessages
  6. Se vazio: Retorna array vazio []
  7. Se erro: Lança exceção

Código interno (aws-executors.service.ts:117-127):

case 'receiveMessages':
  const receiveResult = await sqs.receiveMessage({
    QueueUrl: data.queueUrl,
    MaxNumberOfMessages: data.maxMessages || 10,
    WaitTimeSeconds: data.waitTimeSeconds || 0,
    VisibilityTimeout: data.visibilityTimeout,
  }).promise();
  return {
    success: true,
    messages: receiveResult.Messages || [],
  };

Quando você DEVE usar este Node?

Use SQS RECEIVE sempre que precisar consumir mensagens assíncronas:

Casos de uso:

  1. Worker de processamento: Consumir jobs de uma fila
  2. Event processor: Processar eventos do sistema
  3. Batch jobs: Processar lotes de tarefas
  4. Background tasks: Executar tarefas agendadas
  5. Data pipeline: Processar dados em etapas
  6. Webhook consumer: Processar webhooks recebidos
  7. Email sender: Processar fila de emails

Quando NÃO usar SQS RECEIVE:

  • Trigger automático: Use Lambda com SQS trigger ao invés de polling manual
  • Real-time streaming: Use Kinesis para streaming contínuo
  • Respostas síncronas: Não use filas para respostas imediatas

Parâmetros Detalhados

queueUrl (string, obrigatório)

O que é: URL completa da fila SQS de onde as mensagens serão lidas.

Flow completo para testar:

{
  "name": "Teste SQS Receive - Queue URL",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "sqs_1",
      "type": "aws_sqs",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Buscar Mensagens",
        "parameters": {
          "operation": "receiveMessages",
          "accessKeyId": "{{aws_access_key}}",
          "secretAccessKey": "{{aws_secret_key}}",
          "region": "us-east-1",
          "queueUrl": "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
        }
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Mostrar Resultado",
        "parameters": {
          "message": "📥 Mensagens recebidas: {{messages}}"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 700, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "sqs_1" },
    { "source": "sqs_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

maxMessages (number, opcional)

O que é: Número máximo de mensagens a receber em uma única chamada.

Padrão: 10

Limite: 1 a 10 (AWS limita a 10 por request)

Flow completo para testar:

{
  "name": "Teste SQS Receive - Max Messages",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Informar",
        "parameters": {
          "message": "🔄 Processando batch de mensagens..."
        }
      }
    },
    {
      "id": "sqs_1",
      "type": "aws_sqs",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Receber Batch",
        "parameters": {
          "operation": "receiveMessages",
          "accessKeyId": "{{aws_access_key}}",
          "secretAccessKey": "{{aws_secret_key}}",
          "region": "us-east-1",
          "queueUrl": "{{sqs_queue_url}}",
          "maxMessages": 5
        }
      }
    },
    {
      "id": "message_2",
      "type": "message",
      "position": { "x": 700, "y": 100 },
      "data": {
        "label": "Resultado",
        "parameters": {
          "message": "✅ Recebidas {{messages.length}} mensagens"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 900, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "message_1" },
    { "source": "message_1", "target": "sqs_1" },
    { "source": "sqs_1", "target": "message_2" },
    { "source": "message_2", "target": "end_1" }
  ]
}

Teste: Receberá até 5 mensagens por vez, mesmo se houver mais na fila.

waitTimeSeconds (number, opcional)

O que é: Tempo de long polling - aguarda mensagens chegarem antes de retornar vazio.

Padrão: 0 (short polling - retorna imediatamente)

Limite: 0 a 20 segundos

Benefício: Reduz custos em até 50% (menos requests vazias)

Flow completo para testar:

{
  "name": "Teste SQS Receive - Long Polling",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Informar",
        "parameters": {
          "message": "⏳ Aguardando mensagens (long polling 20s)..."
        }
      }
    },
    {
      "id": "sqs_1",
      "type": "aws_sqs",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Receber com Long Polling",
        "parameters": {
          "operation": "receiveMessages",
          "accessKeyId": "{{aws_access_key}}",
          "secretAccessKey": "{{aws_secret_key}}",
          "region": "us-east-1",
          "queueUrl": "{{sqs_queue_url}}",
          "maxMessages": 10,
          "waitTimeSeconds": 20
        }
      }
    },
    {
      "id": "condition_1",
      "type": "condition",
      "position": { "x": 700, "y": 100 },
      "data": {
        "label": "Tem Mensagens?",
        "parameters": {
          "variable": "messages.length",
          "operator": ">",
          "value": "0"
        }
      }
    },
    {
      "id": "message_2",
      "type": "message",
      "position": { "x": 900, "y": 50 },
      "data": {
        "label": "Processar",
        "parameters": {
          "message": "✅ Processando {{messages.length}} mensagens"
        }
      }
    },
    {
      "id": "message_3",
      "type": "message",
      "position": { "x": 900, "y": 150 },
      "data": {
        "label": "Vazio",
        "parameters": {
          "message": "📭 Nenhuma mensagem na fila"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 1100, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "message_1" },
    { "source": "message_1", "target": "sqs_1" },
    { "source": "sqs_1", "target": "condition_1" },
    { "source": "condition_1", "target": "message_2", "label": "true" },
    { "source": "condition_1", "target": "message_3", "label": "false" },
    { "source": "message_2", "target": "end_1" },
    { "source": "message_3", "target": "end_1" }
  ]
}

Teste: Se fila vazia, aguarda até 20s antes de retornar vazio. Economiza requests!

visibilityTimeout (number, opcional)

O que é: Tempo (segundos) que mensagem fica invisível após ser lida (previne duplicatas).

Padrão: Configuração da fila (30s por padrão)

Limite: 0 a 43200 segundos (12 horas)

Como funciona: - Mensagem lida → fica invisível por N segundos - Se não deletada → volta para fila após timeout - Se processada → delete antes do timeout

Flow completo para testar:

{
  "name": "Teste SQS Receive - Visibility Timeout",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "sqs_1",
      "type": "aws_sqs",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Receber Mensagem",
        "parameters": {
          "operation": "receiveMessages",
          "accessKeyId": "{{aws_access_key}}",
          "secretAccessKey": "{{aws_secret_key}}",
          "region": "us-east-1",
          "queueUrl": "{{sqs_queue_url}}",
          "maxMessages": 1,
          "visibilityTimeout": 300
        }
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Processar",
        "parameters": {
          "message": "⚙️ Processando mensagem...\n⏰ Invisível por 5 minutos"
        }
      }
    },
    {
      "id": "delay_1",
      "type": "delay",
      "position": { "x": 700, "y": 100 },
      "data": {
        "label": "Simular Processamento",
        "parameters": {
          "seconds": 30
        }
      }
    },
    {
      "id": "sqs_2",
      "type": "aws_sqs",
      "position": { "x": 900, "y": 100 },
      "data": {
        "label": "Deletar Mensagem",
        "parameters": {
          "operation": "deleteMessage",
          "accessKeyId": "{{aws_access_key}}",
          "secretAccessKey": "{{aws_secret_key}}",
          "region": "us-east-1",
          "queueUrl": "{{sqs_queue_url}}",
          "receiptHandle": "{{messages[0].ReceiptHandle}}"
        }
      }
    },
    {
      "id": "message_2",
      "type": "message",
      "position": { "x": 1100, "y": 100 },
      "data": {
        "label": "Confirmar",
        "parameters": {
          "message": "✅ Mensagem processada e deletada!"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 1300, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "sqs_1" },
    { "source": "sqs_1", "target": "message_1" },
    { "source": "message_1", "target": "delay_1" },
    { "source": "delay_1", "target": "sqs_2" },
    { "source": "sqs_2", "target": "message_2" },
    { "source": "message_2", "target": "end_1" }
  ]
}

Teste: Mensagem fica invisível por 5 minutos. Se processar e deletar antes, não volta.

Parâmetros

Campo Tipo Obrigatório Descrição
operation string Sim Deve ser "receiveMessages"
accessKeyId string Sim AWS Access Key ID
secretAccessKey string Sim AWS Secret Access Key
region string Sim Região AWS
queueUrl string Sim URL da fila SQS
maxMessages number Não Máx msgs (1-10, padrão: 10)
waitTimeSeconds number Não Long polling (0-20s, padrão: 0)
visibilityTimeout number Não Timeout invisibilidade (0-43200s)

Exemplo 1: Worker de Processamento

Objetivo: Consumir e processar mensagens de uma fila

JSON para Importar

{
  "name": "Worker de Processamento",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Status",
        "parameters": {
          "message": "🔄 Worker iniciado - buscando mensagens..."
        }
      }
    },
    {
      "id": "sqs_1",
      "type": "aws_sqs",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Receber Mensagens",
        "parameters": {
          "operation": "receiveMessages",
          "accessKeyId": "{{aws_access_key}}",
          "secretAccessKey": "{{aws_secret_key}}",
          "region": "us-east-1",
          "queueUrl": "{{worker_queue_url}}",
          "maxMessages": 3,
          "waitTimeSeconds": 20,
          "visibilityTimeout": 120
        }
      }
    },
    {
      "id": "condition_1",
      "type": "condition",
      "position": { "x": 700, "y": 100 },
      "data": {
        "label": "Tem Mensagens?",
        "parameters": {
          "variable": "messages.length",
          "operator": ">",
          "value": "0"
        }
      }
    },
    {
      "id": "message_2",
      "type": "message",
      "position": { "x": 900, "y": 50 },
      "data": {
        "label": "Processar",
        "parameters": {
          "message": "⚙️ Processando {{messages.length}} mensagens...\n\nMensagem 1: {{messages[0].Body}}"
        }
      }
    },
    {
      "id": "sqs_2",
      "type": "aws_sqs",
      "position": { "x": 1100, "y": 50 },
      "data": {
        "label": "Deletar Processada",
        "parameters": {
          "operation": "deleteMessage",
          "accessKeyId": "{{aws_access_key}}",
          "secretAccessKey": "{{aws_secret_key}}",
          "region": "us-east-1",
          "queueUrl": "{{worker_queue_url}}",
          "receiptHandle": "{{messages[0].ReceiptHandle}}"
        }
      }
    },
    {
      "id": "message_3",
      "type": "message",
      "position": { "x": 1300, "y": 50 },
      "data": {
        "label": "Sucesso",
        "parameters": {
          "message": "✅ Mensagens processadas com sucesso!"
        }
      }
    },
    {
      "id": "message_4",
      "type": "message",
      "position": { "x": 900, "y": 150 },
      "data": {
        "label": "Vazio",
        "parameters": {
          "message": "📭 Nenhuma mensagem na fila no momento"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 1500, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "message_1" },
    { "source": "message_1", "target": "sqs_1" },
    { "source": "sqs_1", "target": "condition_1" },
    { "source": "condition_1", "target": "message_2", "label": "true" },
    { "source": "condition_1", "target": "message_4", "label": "false" },
    { "source": "message_2", "target": "sqs_2" },
    { "source": "sqs_2", "target": "message_3" },
    { "source": "message_3", "target": "end_1" },
    { "source": "message_4", "target": "end_1" }
  ]
}

Saída esperada:

Sistema: 🔄 Worker iniciado - buscando mensagens...
[aguarda até 20s]
Sistema: ⚙️ Processando 3 mensagens...

Mensagem 1: {"orderId": "123", "action": "process"}
Sistema: ✅ Mensagens processadas com sucesso!

Exemplo 2: Email Processor

Objetivo: Processar fila de emails para envio

JSON para Importar

{
  "name": "Email Processor",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "sqs_1",
      "type": "aws_sqs",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Buscar Emails",
        "parameters": {
          "operation": "receiveMessages",
          "accessKeyId": "{{aws_access_key}}",
          "secretAccessKey": "{{aws_secret_key}}",
          "region": "us-east-1",
          "queueUrl": "{{email_queue_url}}",
          "maxMessages": 5,
          "waitTimeSeconds": 10,
          "visibilityTimeout": 60
        }
      }
    },
    {
      "id": "variable_1",
      "type": "variable",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Extrair Dados",
        "parameters": {
          "variable": "emailData",
          "value": "{{messages[0].Body}}"
        }
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 700, "y": 100 },
      "data": {
        "label": "Info Email",
        "parameters": {
          "message": "📧 Enviando email para: {{emailData.to}}\nAssunto: {{emailData.subject}}"
        }
      }
    },
    {
      "id": "sqs_2",
      "type": "aws_sqs",
      "position": { "x": 900, "y": 100 },
      "data": {
        "label": "Deletar da Fila",
        "parameters": {
          "operation": "deleteMessage",
          "accessKeyId": "{{aws_access_key}}",
          "secretAccessKey": "{{aws_secret_key}}",
          "region": "us-east-1",
          "queueUrl": "{{email_queue_url}}",
          "receiptHandle": "{{messages[0].ReceiptHandle}}"
        }
      }
    },
    {
      "id": "message_2",
      "type": "message",
      "position": { "x": 1100, "y": 100 },
      "data": {
        "label": "Confirmar",
        "parameters": {
          "message": "✅ Email enviado e removido da fila!"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 1300, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "sqs_1" },
    { "source": "sqs_1", "target": "variable_1" },
    { "source": "variable_1", "target": "message_1" },
    { "source": "message_1", "target": "sqs_2" },
    { "source": "sqs_2", "target": "message_2" },
    { "source": "message_2", "target": "end_1" }
  ]
}

Saída esperada:

Sistema: 📧 Enviando email para: jose@example.com
Assunto: Bem-vindo!
Sistema: ✅ Email enviado e removido da fila!

Resposta do Node

{
  "success": true,
  "messages": [
    {
      "MessageId": "f8a7d2e1-c5b4-4d6a-9e8c-1f2a3b4c5d6e",
      "ReceiptHandle": "AQEBzW...handle-string...xyz==",
      "Body": "{\"orderId\":\"123\",\"action\":\"process\"}",
      "MD5OfBody": "7d8b5e2a...",
      "Attributes": {
        "SentTimestamp": "1642512000000",
        "ApproximateReceiveCount": "1"
      },
      "MessageAttributes": {
        "Priority": {
          "StringValue": "high",
          "DataType": "String"
        }
      }
    }
  ]
}

Estrutura da Mensagem

Cada mensagem recebida contém:

{
  MessageId: string;           // ID único
  ReceiptHandle: string;       // Handle para deletar
  Body: string;                // Conteúdo (JSON string)
  MD5OfBody: string;          // Checksum
  Attributes: {
    SentTimestamp: string;     // Quando foi enviada
    ApproximateReceiveCount: string;  // Tentativas
    ApproximateFirstReceiveTimestamp: string;
  };
  MessageAttributes?: object;  // Atributos customizados
}

Boas Práticas

SIM:

  • Use long polling (waitTimeSeconds: 20) para reduzir custos
  • Configure visibilityTimeout > tempo de processamento
  • Delete mensagens após processar com sucesso
  • Implemente retry logic com DLQ
  • Processe em batch quando possível
  • Monitore ApproximateReceiveCount
  • Use try-catch para erros de processamento
  • Configure alarmes CloudWatch

NÃO:

  • Não use short polling desnecessariamente (caro)
  • Não esqueça de deletar mensagens processadas
  • Não configure visibilityTimeout muito curto
  • Não processe mensagens sem idempotência
  • Não ignore ReceiptHandle (necessário para delete)
  • Não deixe mensagens voltarem infinitamente

Fluxo Completo: Receive → Process → Delete

1. receiveMessage()
   ↓ mensagem fica invisível
2. Processar mensagem
   ↓ se sucesso
3. deleteMessage(ReceiptHandle)
   ↓ mensagem removida
4. Fim

Se falhar ou timeout:
   → mensagem volta para fila
   → tentativas incrementam
   → após maxReceiveCount → DLQ

Dicas

💡 ApproximateReceiveCount: Monitore para detectar mensagens problemáticas

💡 ReceiptHandle: Sempre salve para poder deletar depois

💡 Batch Processing: Processe todas do array, não só a primeira

💡 Idempotência: Mensagem pode chegar duplicada, prepare-se

💡 Timeout Strategy: visibility > tempo processamento + margem

Próximo Node

SQS DELETE - Deletar mensagem da fila → SQS SEND - Enviar mensagem para fila → SNS PUBLISH - Publicar em tópico SNS