Pular para conteúdo

MONGODB AGGREGATE - Pipeline de Agregação MongoDB

O que é este Node?

O MONGODB AGGREGATE é o node responsável por executar pipelines de agregação no MongoDB, permitindo transformações complexas, agrupamentos, cálculos e análises de dados.

Por que este Node existe?

Queries simples não são suficientes para análises complexas de dados. O MONGODB AGGREGATE existe para:

  1. Análises complexas: Realizar operações avançadas como GROUP BY, SUM, AVG, COUNT
  2. Transformar dados: Modificar estrutura de documentos com $project, $unwind, $lookup
  3. Múltiplos estágios: Combinar várias operações em um pipeline
  4. Performance otimizada: MongoDB executa pipeline de forma eficiente

Como funciona internamente?

Quando o MONGODB AGGREGATE é executado, o sistema:

  1. Valida configuração: Verifica se as credenciais MongoDB foram fornecidas
  2. Estabelece conexão: Conecta ao servidor MongoDB usando as credenciais
  3. Seleciona coleção: Acessa a coleção especificada no banco de dados
  4. Processa pipeline: Substitui variáveis em todos os estágios do pipeline
  5. Executa aggregate: Passa o pipeline para MongoDB.aggregate()
  6. Processa resultados: Converte cursor em array de documentos
  7. Retorna dados: Documentos transformados e contagem total
  8. Fecha conexão: Encerra a conexão com MongoDB

Código interno (mongodb.executor.ts:208-223):

private async aggregate(
  db: Db,
  collectionName: string,
  pipeline: any[],
  context: ExecutionContext,
): Promise<any> {
  const replacedPipeline = this.replaceObjectVariables(pipeline, context.variables);

  const collection = db.collection(collectionName);
  const documents = await collection.aggregate(replacedPipeline).toArray();

  return {
    documents,
    count: documents.length,
  };
}

Quando você DEVE usar este Node?

Use MONGODB AGGREGATE sempre que precisar de análises e transformações complexas:

Casos de uso

  1. Relatórios: "Total de vendas por categoria e mês"
  2. Agrupamentos: "Contar clientes por cidade e estado"
  3. Joins: "Buscar pedidos com dados completos dos clientes ($lookup)"
  4. Estatísticas: "Calcular média, soma, mínimo, máximo de valores"

Quando NÃO usar MONGODB AGGREGATE

  • Busca simples: Use MONGODB FIND para queries básicas (mais rápido)
  • Buscar 1 documento: Use MONGODB FIND_ONE para buscas diretas
  • Modificar dados: Aggregate é read-only, use UPDATE para modificar

Parâmetros Detalhados

operation (string, obrigatório)

O que é: Define qual operação MongoDB será executada. Para pipelines de agregação, use "aggregate".

Padrão: Nenhum (obrigatório)

Flow completo para testar:

{
  "name": "Teste MongoDB Aggregate - Operation",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "mongodb_1",
      "type": "mongodb",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Agregar Vendas",
        "operation": "aggregate",
        "config": {
          "connectionString": "mongodb://localhost:27017",
          "database": "ecommerce"
        },
        "collection": "pedidos",
        "pipeline": [
          { "$group": { "_id": "$status", "total": { "$sum": 1 } } }
        ],
        "responseVariable": "resultado"
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Mostrar Resultado",
        "parameters": {
          "message": "✅ Agregação executada! {{resultado.count}} grupos"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 700, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "mongodb_1" },
    { "source": "mongodb_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

Teste: 1. Execute o flow 2. Espere: "✅ Agregação executada! X grupos"

config (object, obrigatório)

O que é: Objeto contendo as credenciais e configurações de conexão com o MongoDB.

Estrutura: - connectionString (string): String de conexão completa MongoDB - OU host, port, user, password: Configuração individual - database (string, obrigatório): Nome do banco de dados

Flow completo para testar:

{
  "name": "Teste MongoDB Aggregate - Config",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "mongodb_1",
      "type": "mongodb",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Agregar com Auth",
        "operation": "aggregate",
        "config": {
          "host": "localhost",
          "port": 27017,
          "database": "analytics",
          "user": "admin",
          "password": "senha123"
        },
        "collection": "eventos",
        "pipeline": [
          { "$match": { "tipo": "click" } },
          { "$count": "total" }
        ],
        "responseVariable": "resultado"
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Confirmar",
        "parameters": {
          "message": "Análise concluída"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 700, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "mongodb_1" },
    { "source": "mongodb_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

Teste: 1. Configure MongoDB com autenticação 2. Execute o flow 3. Espere: "Análise concluída"

collection (string, obrigatório)

O que é: Nome da coleção MongoDB onde o pipeline de agregação será executado.

Flow completo para testar:

{
  "name": "Teste MongoDB Aggregate - Collection",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "mongodb_1",
      "type": "mongodb",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Agregar Coleção",
        "operation": "aggregate",
        "config": {
          "connectionString": "mongodb://localhost:27017",
          "database": "crm"
        },
        "collection": "clientes",
        "pipeline": [
          { "$group": { "_id": "$cidade", "total": { "$sum": 1 } } },
          { "$sort": { "total": -1 } }
        ],
        "responseVariable": "cidadesTop"
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Mostrar Resultado",
        "parameters": {
          "message": "{{cidadesTop.count}} cidades encontradas"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 700, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "mongodb_1" },
    { "source": "mongodb_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

Teste: 1. Execute o flow 2. Espere: "X cidades encontradas"

pipeline (array, obrigatório)

O que é: Array de estágios de agregação MongoDB ($match, $group, $project, $sort, $limit, etc).

Estágios comuns: - $match: Filtrar documentos (como WHERE em SQL) - $group: Agrupar por campo (como GROUP BY em SQL) - $project: Selecionar/transformar campos - $sort: Ordenar resultados - $limit: Limitar quantidade de resultados - $lookup: JOIN com outra coleção - $unwind: Descompactar arrays

Flow completo para testar:

{
  "name": "Teste MongoDB Aggregate - Pipeline",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "mongodb_1",
      "type": "mongodb",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Pipeline Completo",
        "operation": "aggregate",
        "config": {
          "connectionString": "mongodb://localhost:27017",
          "database": "ecommerce"
        },
        "collection": "pedidos",
        "pipeline": [
          {
            "$match": {
              "status": "concluido",
              "dataCriacao": { "$gte": "2024-01-01T00:00:00.000Z" }
            }
          },
          {
            "$group": {
              "_id": "$categoria",
              "totalVendas": { "$sum": "$valor" },
              "quantidadePedidos": { "$sum": 1 },
              "mediaTicket": { "$avg": "$valor" }
            }
          },
          {
            "$sort": { "totalVendas": -1 }
          },
          {
            "$limit": 10
          }
        ],
        "responseVariable": "topCategorias"
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Mostrar Top",
        "parameters": {
          "message": "📊 Top {{topCategorias.count}} categorias mais vendidas"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 700, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "mongodb_1" },
    { "source": "mongodb_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

Teste: 1. Execute o flow 2. Espere: "📊 Top X categorias mais vendidas"

responseVariable (string, opcional)

O que é: Nome da variável onde o resultado do aggregate será armazenado (contém documents e count).

Padrão: Se não especificado, resultado não é armazenado

Flow completo para testar:

{
  "name": "Teste MongoDB Aggregate - Response Variable",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "mongodb_1",
      "type": "mongodb",
      "position": { "x": 300, "y": 100 },
      "data": {
        "label": "Salvar Estatísticas",
        "operation": "aggregate",
        "config": {
          "connectionString": "mongodb://localhost:27017",
          "database": "analytics"
        },
        "collection": "acessos",
        "pipeline": [
          {
            "$group": {
              "_id": { "$dateToString": { "format": "%Y-%m-%d", "date": "$timestamp" } },
              "acessos": { "$sum": 1 }
            }
          },
          { "$sort": { "_id": -1 } },
          { "$limit": 7 }
        ],
        "responseVariable": "acessosUltimos7Dias"
      }
    },
    {
      "id": "message_1",
      "type": "message",
      "position": { "x": 500, "y": 100 },
      "data": {
        "label": "Usar Variável",
        "parameters": {
          "message": "📈 Estatísticas dos últimos 7 dias:\n{{acessosUltimos7Dias.count}} dias com dados"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 700, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "mongodb_1" },
    { "source": "mongodb_1", "target": "message_1" },
    { "source": "message_1", "target": "end_1" }
  ]
}

Teste: 1. Execute o flow 2. Espere: "📈 Estatísticas dos últimos 7 dias: X dias com dados"

Parâmetros

Campo Tipo Obrigatório Descrição
operation string Sim Tipo de operação MongoDB ("aggregate")
config object Sim Configuração de conexão MongoDB
collection string Sim Nome da coleção
pipeline array Sim Array de estágios de agregação
responseVariable string Não Variável para armazenar resultado

Exemplo 1: Relatório de Vendas por Categoria

Objetivo: Calcular total de vendas, quantidade e ticket médio por categoria

JSON para Importar

{
  "name": "Relatório de Vendas por Categoria",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "message_start",
      "type": "message",
      "position": { "x": 250, "y": 100 },
      "data": {
        "label": "Iniciando",
        "parameters": {
          "message": "📊 Gerando relatório de vendas..."
        }
      }
    },
    {
      "id": "mongodb_aggregate",
      "type": "mongodb",
      "position": { "x": 400, "y": 100 },
      "data": {
        "label": "Agregar Vendas",
        "operation": "aggregate",
        "config": {
          "connectionString": "mongodb://localhost:27017",
          "database": "ecommerce"
        },
        "collection": "pedidos",
        "pipeline": [
          {
            "$match": {
              "status": "concluido"
            }
          },
          {
            "$group": {
              "_id": "$categoria",
              "totalVendas": { "$sum": "$valor" },
              "quantidade": { "$sum": 1 },
              "ticketMedio": { "$avg": "$valor" },
              "maiorVenda": { "$max": "$valor" },
              "menorVenda": { "$min": "$valor" }
            }
          },
          {
            "$project": {
              "categoria": "$_id",
              "totalVendas": { "$round": ["$totalVendas", 2] },
              "quantidade": 1,
              "ticketMedio": { "$round": ["$ticketMedio", 2] },
              "maiorVenda": 1,
              "menorVenda": 1,
              "_id": 0
            }
          },
          {
            "$sort": { "totalVendas": -1 }
          }
        ],
        "responseVariable": "relatorio"
      }
    },
    {
      "id": "message_result",
      "type": "message",
      "position": { "x": 550, "y": 100 },
      "data": {
        "label": "Mostrar Relatório",
        "parameters": {
          "message": "✅ Relatório gerado!\n\n📊 {{relatorio.count}} categorias analisadas\n\nPrimeira categoria:\nTotal: R$ {{relatorio.documents[0].totalVendas}}\nQuantidade: {{relatorio.documents[0].quantidade}}\nTicket Médio: R$ {{relatorio.documents[0].ticketMedio}}"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 700, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "message_start" },
    { "source": "message_start", "target": "mongodb_aggregate" },
    { "source": "mongodb_aggregate", "target": "message_result" },
    { "source": "message_result", "target": "end_1" }
  ]
}

Saída esperada:

Sistema: 📊 Gerando relatório de vendas...
Sistema: ✅ Relatório gerado!

📊 5 categorias analisadas

Primeira categoria:
Total: R$ 45890.50
Quantidade: 152
Ticket Médio: R$ 301.91

Exemplo 2: Clientes por Cidade (Top 10)

Objetivo: Contar e ranquear as 10 cidades com mais clientes

JSON para Importar

{
  "name": "Clientes por Cidade - Top 10",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "mongodb_aggregate",
      "type": "mongodb",
      "position": { "x": 250, "y": 100 },
      "data": {
        "label": "Agrupar por Cidade",
        "operation": "aggregate",
        "config": {
          "connectionString": "mongodb://localhost:27017",
          "database": "crm"
        },
        "collection": "clientes",
        "pipeline": [
          {
            "$match": {
              "ativo": true,
              "cidade": { "$exists": true, "$ne": null }
            }
          },
          {
            "$group": {
              "_id": {
                "cidade": "$cidade",
                "estado": "$estado"
              },
              "totalClientes": { "$sum": 1 }
            }
          },
          {
            "$project": {
              "cidade": "$_id.cidade",
              "estado": "$_id.estado",
              "totalClientes": 1,
              "_id": 0
            }
          },
          {
            "$sort": { "totalClientes": -1 }
          },
          {
            "$limit": 10
          }
        ],
        "responseVariable": "topCidades"
      }
    },
    {
      "id": "condition_1",
      "type": "condition",
      "position": { "x": 400, "y": 100 },
      "data": {
        "label": "Tem Resultados?",
        "conditions": [
          {
            "variable": "topCidades.count",
            "operator": "greater_than",
            "value": "0",
            "targetNode": "message_success"
          }
        ],
        "defaultTarget": "message_empty"
      }
    },
    {
      "id": "message_success",
      "type": "message",
      "position": { "x": 550, "y": 50 },
      "data": {
        "label": "Mostrar Top",
        "parameters": {
          "message": "🏆 Top 10 Cidades:\n\n1º {{topCidades.documents[0].cidade}}/{{topCidades.documents[0].estado}}: {{topCidades.documents[0].totalClientes}} clientes\n2º {{topCidades.documents[1].cidade}}/{{topCidades.documents[1].estado}}: {{topCidades.documents[1].totalClientes}} clientes"
        }
      }
    },
    {
      "id": "message_empty",
      "type": "message",
      "position": { "x": 550, "y": 150 },
      "data": {
        "label": "Vazio",
        "parameters": {
          "message": "ℹ️ Nenhum cliente encontrado"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 700, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "mongodb_aggregate" },
    { "source": "mongodb_aggregate", "target": "condition_1" },
    { "source": "message_success", "target": "end_1" },
    { "source": "message_empty", "target": "end_1" }
  ]
}

Saída esperada:

Sistema: 🏆 Top 10 Cidades:

1º São Paulo/SP: 523 clientes
2º Rio de Janeiro/RJ: 387 clientes

Exemplo 3: Análise de Vendas Mensais

Objetivo: Calcular total de vendas e ticket médio por mês dos últimos 6 meses

JSON para Importar

{
  "name": "Análise de Vendas Mensais",
  "nodes": [
    {
      "id": "start_1",
      "type": "start",
      "position": { "x": 100, "y": 100 },
      "data": { "label": "Início" }
    },
    {
      "id": "variable_1",
      "type": "variable",
      "position": { "x": 250, "y": 100 },
      "data": {
        "label": "Data Limite",
        "parameters": {
          "variables": [
            { "name": "data6MesesAtras", "value": "{{date_subtract_months(now, 6)}}" }
          ]
        }
      }
    },
    {
      "id": "mongodb_aggregate",
      "type": "mongodb",
      "position": { "x": 400, "y": 100 },
      "data": {
        "label": "Análise Mensal",
        "operation": "aggregate",
        "config": {
          "connectionString": "mongodb://localhost:27017",
          "database": "ecommerce"
        },
        "collection": "pedidos",
        "pipeline": [
          {
            "$match": {
              "status": "concluido",
              "dataCriacao": { "$gte": "{{data6MesesAtras}}" }
            }
          },
          {
            "$group": {
              "_id": {
                "ano": { "$year": "$dataCriacao" },
                "mes": { "$month": "$dataCriacao" }
              },
              "totalVendas": { "$sum": "$valor" },
              "quantidadePedidos": { "$sum": 1 },
              "ticketMedio": { "$avg": "$valor" }
            }
          },
          {
            "$project": {
              "mesAno": {
                "$concat": [
                  { "$toString": "$_id.mes" },
                  "/",
                  { "$toString": "$_id.ano" }
                ]
              },
              "totalVendas": { "$round": ["$totalVendas", 2] },
              "quantidadePedidos": 1,
              "ticketMedio": { "$round": ["$ticketMedio", 2] },
              "_id": 0
            }
          },
          {
            "$sort": { "_id.ano": -1, "_id.mes": -1 }
          }
        ],
        "responseVariable": "vendasMensais"
      }
    },
    {
      "id": "message_result",
      "type": "message",
      "position": { "x": 550, "y": 100 },
      "data": {
        "label": "Mostrar Análise",
        "parameters": {
          "message": "📈 Análise dos últimos {{vendasMensais.count}} meses:\n\nTotal de vendas: R$ {{vendasMensais.documents[0].totalVendas}}\nPedidos: {{vendasMensais.documents[0].quantidadePedidos}}\nTicket Médio: R$ {{vendasMensais.documents[0].ticketMedio}}"
        }
      }
    },
    {
      "id": "end_1",
      "type": "end",
      "position": { "x": 700, "y": 100 },
      "data": { "label": "Fim" }
    }
  ],
  "edges": [
    { "source": "start_1", "target": "variable_1" },
    { "source": "variable_1", "target": "mongodb_aggregate" },
    { "source": "mongodb_aggregate", "target": "message_result" },
    { "source": "message_result", "target": "end_1" }
  ]
}

Saída esperada:

Sistema: 📈 Análise dos últimos 6 meses:

Total de vendas: R$ 125890.50
Pedidos: 423
Ticket Médio: R$ 297.61

Resposta do Node

{
  "documents": [
    {
      "categoria": "Eletrônicos",
      "totalVendas": 45890.50,
      "quantidade": 152,
      "ticketMedio": 301.91
    },
    {
      "categoria": "Livros",
      "totalVendas": 12340.00,
      "quantidade": 234,
      "ticketMedio": 52.74
    }
  ],
  "count": 2
}

Boas Práticas

SIM:

  • Use $match no início do pipeline para filtrar dados cedo
  • Combine múltiplos estágios para transformações complexas
  • Use $project para limitar campos retornados (melhora performance)
  • Aplique $limit para evitar retornar milhares de documentos
  • Use índices nos campos usados em $match e $group
  • Teste pipeline no MongoDB Compass antes de usar no flow

NÃO:

  • Não use aggregate para buscas simples (use FIND)
  • Não esqueça $match para filtrar antes de agrupar
  • Não retorne todos os campos se precisa apenas de alguns
  • Não faça $lookup sem índices (pode ser muito lento)
  • Não ignore a ordem dos estágios (impacta performance)

Dicas

💡 Dica 1: A ordem dos estágios importa! Filtre ($match) antes de agrupar ($group)

💡 Dica 2: Use $project para renomear campos e _id: 0 para remover o campo _id

💡 Dica 3: $round é útil para formatar valores decimais em relatórios

💡 Dica 4: Combine $unwind para trabalhar com arrays dentro de documentos

💡 Dica 5: Para JOIN entre coleções, use $lookup (equivalente ao LEFT JOIN do SQL)

Próximos Nodes

MONGODB FIND - Busca simples sem agregação → MONGODB INSERT - Criar novos documentos → MONGODB UPDATE - Modificar documentos → MONGODB DELETE - Remover documentos