Python Admin

Boas Práticas de MongoDB com Python: pymongo, Motor Assíncrono e Aggregation para Times Ágeis Já leu

Introdução ao MongoDB e PyMongo MongoDB é um banco de dados NoSQL orientado a documentos que armazena dados em formato JSON (ou BSON — Binary JSON). Diferentemente de bancos relacionais tradicionais, MongoDB oferece flexibilidade estrutural, permitindo que documentos na mesma coleção tenham esquemas diferentes. Python, através da biblioteca PyMongo, fornece um driver oficial para interagir com o MongoDB de forma simples e intuitiva. PyMongo é a biblioteca padrão para conectar aplicações Python ao MongoDB. Ela abstrai a complexidade do protocolo de comunicação e oferece uma API Pythônica para realizar operações CRUD (Create, Read, Update, Delete), gerenciar índices e executar consultas complexas. Antes de avançarmos para operações assíncronas e agregações, é fundamental compreender como estabelecer conexões básicas e manipular documentos. Instalação e Configuração Básica Para começar, instale o PyMongo via pip: Agora, vamos criar uma conexão simples com o MongoDB: Neste exemplo, criamos uma conexão com o MongoDB, acessamos um banco chamado e a coleção . A operação insere um único

Introdução ao MongoDB e PyMongo

MongoDB é um banco de dados NoSQL orientado a documentos que armazena dados em formato JSON (ou BSON — Binary JSON). Diferentemente de bancos relacionais tradicionais, MongoDB oferece flexibilidade estrutural, permitindo que documentos na mesma coleção tenham esquemas diferentes. Python, através da biblioteca PyMongo, fornece um driver oficial para interagir com o MongoDB de forma simples e intuitiva.

PyMongo é a biblioteca padrão para conectar aplicações Python ao MongoDB. Ela abstrai a complexidade do protocolo de comunicação e oferece uma API Pythônica para realizar operações CRUD (Create, Read, Update, Delete), gerenciar índices e executar consultas complexas. Antes de avançarmos para operações assíncronas e agregações, é fundamental compreender como estabelecer conexões básicas e manipular documentos.

Instalação e Configuração Básica

Para começar, instale o PyMongo via pip:

pip install pymongo

Agora, vamos criar uma conexão simples com o MongoDB:

from pymongo import MongoClient
from datetime import datetime

# Conectar ao servidor MongoDB local
client = MongoClient('mongodb://localhost:27017/')

# Acessar um banco de dados
db = client['minha_aplicacao']

# Acessar uma coleção
usuarios = db['usuarios']

# Inserir um documento
documento = {
    'nome': 'João Silva',
    'email': 'joao@example.com',
    'idade': 28,
    'criado_em': datetime.now()
}

resultado = usuarios.insert_one(documento)
print(f"ID inserido: {resultado.inserted_id}")

Neste exemplo, criamos uma conexão com o MongoDB, acessamos um banco chamado minha_aplicacao e a coleção usuarios. A operação insert_one() insere um único documento e retorna um objeto com o ID gerado automaticamente pelo MongoDB.

Operações CRUD Essenciais

As operações CRUD são fundamentais. Vamos explorar cada uma:

from pymongo import MongoClient
from datetime import datetime

client = MongoClient('mongodb://localhost:27017/')
db = client['minha_aplicacao']
usuarios = db['usuarios']

# CREATE - Inserir múltiplos documentos
novos_usuarios = [
    {'nome': 'Maria', 'email': 'maria@example.com', 'idade': 25},
    {'nome': 'Pedro', 'email': 'pedro@example.com', 'idade': 30},
    {'nome': 'Ana', 'email': 'ana@example.com', 'idade': 27}
]
resultado = usuarios.insert_many(novos_usuarios)
print(f"IDs inseridos: {resultado.inserted_ids}")

# READ - Buscar um documento
usuario = usuarios.find_one({'nome': 'Maria'})
print(f"Usuário encontrado: {usuario['email']}")

# READ - Buscar múltiplos documentos
usuarios_maiores_26 = usuarios.find({'idade': {'$gte': 26}})
for user in usuarios_maiores_26:
    print(f"{user['nome']} tem {user['idade']} anos")

# UPDATE - Atualizar um documento
usuarios.update_one(
    {'nome': 'Maria'},
    {'$set': {'idade': 26, 'atualizado_em': datetime.now()}}
)

# UPDATE - Atualizar múltiplos documentos
usuarios.update_many(
    {'idade': {'$lt': 28}},
    {'$inc': {'idade': 1}}  # Incrementa a idade em 1
)

# DELETE - Deletar um documento
usuarios.delete_one({'nome': 'Pedro'})

# DELETE - Deletar múltiplos documentos
usuarios.delete_many({'idade': {'$lt': 25}})

Compreender esses operadores MongoDB ($gte, $set, $inc) é crucial. Eles representam a linguagem de consulta do MongoDB e permitem operações complexas de forma declarativa.

Motor: Programação Assíncrona com MongoDB

Quando sua aplicação precisa lidar com múltiplas conexões simultâneas (servidores web, APIs), as operações síncronas no PyMongo podem se tornar gargalos. Motor é uma biblioteca que torna o PyMongo totalmente assíncrono, permitindo que sua aplicação não bloqueie durante operações de banco de dados.

Motor funciona como um wrapper assíncrono do PyMongo, mantendo a mesma API, mas retornando corrotinas que devem ser aguardadas com await. Isso é particularmente útil em frameworks como FastAPI, Quart e aiohttp, que também são assíncronos por natureza.

Instalação e Conceitos Fundamentais

Instale o Motor:

pip install motor

Antes de escrever código, entenda que o Motor utiliza async/await, o padrão de programação assíncrona do Python moderno. Quando você aguarda uma operação assíncrona, a thread é liberada para processar outras requisições, maximizando a utilização de recursos.

import asyncio
from motor.motor_asyncio import AsyncClient
from datetime import datetime

async def main():
    # Conectar ao MongoDB usando Motor
    client = AsyncClient('mongodb://localhost:27017/')
    db = client['minha_aplicacao']
    usuarios = db['usuarios']

    # Inserir um documento de forma assíncrona
    resultado = await usuarios.insert_one({
        'nome': 'Lucas',
        'email': 'lucas@example.com',
        'idade': 24
    })
    print(f"Documento inserido com ID: {resultado.inserted_id}")

    # Buscar um documento
    usuario = await usuarios.find_one({'nome': 'Lucas'})
    print(f"Usuário: {usuario}")

    # Buscar múltiplos documentos de forma assíncrona
    cursor = usuarios.find({'idade': {'$gte': 20}})
    async for user in cursor:
        print(f"{user['nome']}: {user['idade']} anos")

    # Fechar a conexão
    client.close()

# Executar a corrotina
asyncio.run(main())

A diferença fundamental é que cada operação retorna uma corrotina que deve ser aguardada com await. Isso permite que o event loop do Python processe outras tarefas enquanto o banco de dados processa a requisição.

Integrando Motor com FastAPI

Motor brilha quando integrado com frameworks web assíncronos. Aqui está um exemplo com FastAPI:

from fastapi import FastAPI, HTTPException
from motor.motor_asyncio import AsyncClient
from pydantic import BaseModel
from typing import Optional
from bson import ObjectId

app = FastAPI()

# Configuração do banco de dados
MONGODB_URL = "mongodb://localhost:27017"
client = AsyncClient(MONGODB_URL)
db = client['minha_aplicacao']

# Fechar a conexão ao desligar a aplicação
@app.on_event("shutdown")
async def shutdown_db():
    client.close()

# Modelo Pydantic para validação
class Usuario(BaseModel):
    nome: str
    email: str
    idade: int

class UsuarioResponse(Usuario):
    id: str

    class Config:
        from_attributes = True

@app.post("/usuarios/")
async def criar_usuario(usuario: Usuario):
    collection = db['usuarios']
    resultado = await collection.insert_one(usuario.dict())
    return {"id": str(resultado.inserted_id), **usuario.dict()}

@app.get("/usuarios/{usuario_id}")
async def obter_usuario(usuario_id: str):
    collection = db['usuarios']
    try:
        documento = await collection.find_one({"_id": ObjectId(usuario_id)})
        if not documento:
            raise HTTPException(status_code=404, detail="Usuário não encontrado")
        documento['id'] = str(documento['_id'])
        return documento
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"ID inválido: {str(e)}")

@app.get("/usuarios/")
async def listar_usuarios(skip: int = 0, limit: int = 10):
    collection = db['usuarios']
    cursor = collection.find().skip(skip).limit(limit)
    usuarios = []
    async for doc in cursor:
        doc['id'] = str(doc['_id'])
        usuarios.append(doc)
    return usuarios

@app.put("/usuarios/{usuario_id}")
async def atualizar_usuario(usuario_id: str, usuario: Usuario):
    collection = db['usuarios']
    try:
        resultado = await collection.update_one(
            {"_id": ObjectId(usuario_id)},
            {"$set": usuario.dict()}
        )
        if resultado.matched_count == 0:
            raise HTTPException(status_code=404, detail="Usuário não encontrado")
        return {"mensagem": "Usuário atualizado com sucesso"}
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Erro: {str(e)}")

@app.delete("/usuarios/{usuario_id}")
async def deletar_usuario(usuario_id: str):
    collection = db['usuarios']
    try:
        resultado = await collection.delete_one({"_id": ObjectId(usuario_id)})
        if resultado.deleted_count == 0:
            raise HTTPException(status_code=404, detail="Usuário não encontrado")
        return {"mensagem": "Usuário deletado com sucesso"}
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Erro: {str(e)}")

Este exemplo demonstra como Motor integra perfeitamente com FastAPI, mantendo a aplicação responsiva mesmo sob carga elevada. Cada requisição HTTP é processada de forma assíncrona, e enquanto aguarda a resposta do banco de dados, o event loop pode processar outras requisições.

Agregação: Processamento Avançado de Dados

A agregação no MongoDB é um framework poderoso para transformar, filtrar e processar documentos no próprio servidor. Diferentemente de buscas simples, agregações permitem operações complexas como agrupamento, projeção, ordenação e cálculos, tudo de forma eficiente no lado do servidor. Isso reduz drasticamente a quantidade de dados transferidos para a aplicação.

Conceitos de Pipeline de Agregação

Uma agregação no MongoDB funciona como um pipeline: documentos passam por uma série de estágios (stages), cada um transformando os dados até obter o resultado final. Os estágios mais comuns são:

  • $match: Filtra documentos (equivalente a WHERE no SQL)
  • $group: Agrupa documentos por um campo específico
  • $project: Seleciona/renomeia campos
  • $sort: Ordena documentos
  • $limit: Limita o número de documentos
  • $lookup: Join com outra coleção
  • $unwind: Desconstrói arrays em documentos separados
  • $addFields: Adiciona novos campos calculados

Vamos ilustrar com um exemplo prático:

from pymongo import MongoClient
from datetime import datetime, timedelta

client = MongoClient('mongodb://localhost:27017/')
db = client['loja']
vendas = db['vendas']

# Inserir dados de exemplo
vendas.insert_many([
    {
        'data': datetime.now() - timedelta(days=10),
        'vendedor': 'João',
        'produto': 'Notebook',
        'quantidade': 2,
        'preco_unitario': 3000
    },
    {
        'data': datetime.now() - timedelta(days=8),
        'vendedor': 'Maria',
        'produto': 'Mouse',
        'quantidade': 10,
        'preco_unitario': 50
    },
    {
        'data': datetime.now() - timedelta(days=5),
        'vendedor': 'João',
        'produto': 'Teclado',
        'quantidade': 5,
        'preco_unitario': 150
    },
    {
        'data': datetime.now() - timedelta(days=3),
        'vendedor': 'Maria',
        'produto': 'Monitor',
        'quantidade': 1,
        'preco_unitario': 1200
    },
])

# Pipeline de agregação: Total de vendas por vendedor
pipeline = [
    {
        '$group': {
            '_id': '$vendedor',
            'total_vendido': {
                '$sum': {'$multiply': ['$quantidade', '$preco_unitario']}
            },
            'quantidade_produtos': {'$sum': '$quantidade'}
        }
    },
    {
        '$sort': {'total_vendido': -1}
    }
]

resultado = list(vendas.aggregate(pipeline))
for doc in resultado:
    print(f"Vendedor: {doc['_id']}, Total: R$ {doc['total_vendido']}, Quantidade: {doc['quantidade_produtos']}")

Este pipeline primeiro agrupa as vendas por vendedor, calculando o total de vendas e a quantidade total de produtos, depois ordena pelo total de vendas em ordem decrescente.

Agregações Complexas com Lookups e Unwinding

Para casos mais avançados, combinamos múltiplos estágios:

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['escola']

# Criar coleções de exemplo
alunos = db['alunos']
notas = db['notas']

# Limpar dados anteriores
alunos.delete_many({})
notas.delete_many({})

# Inserir alunos
alunos.insert_many([
    {'_id': 1, 'nome': 'Alice'},
    {'_id': 2, 'nome': 'Bob'},
    {'_id': 3, 'nome': 'Carlos'}
])

# Inserir notas (referenciando alunos)
notas.insert_many([
    {'aluno_id': 1, 'materia': 'Matemática', 'nota': 8.5},
    {'aluno_id': 1, 'materia': 'Português', 'nota': 9.0},
    {'aluno_id': 2, 'materia': 'Matemática', 'nota': 7.0},
    {'aluno_id': 2, 'materia': 'Português', 'nota': 8.0},
    {'aluno_id': 3, 'materia': 'Matemática', 'nota': 9.5},
    {'aluno_id': 3, 'materia': 'Português', 'nota': 7.5},
])

# Pipeline: Média de notas por aluno
pipeline = [
    {
        '$group': {
            '_id': '$aluno_id',
            'media': {'$avg': '$nota'},
            'notas_obtidas': {'$push': '$nota'}
        }
    },
    {
        '$lookup': {
            'from': 'alunos',
            'localField': '_id',
            'foreignField': '_id',
            'as': 'info_aluno'
        }
    },
    {
        '$unwind': '$info_aluno'
    },
    {
        '$project': {
            '_id': 0,
            'nome': '$info_aluno.nome',
            'media': {'$round': ['$media', 2]},
            'total_notas': {'$size': '$notas_obtidas'},
            'situacao': {
                '$cond': [
                    {'$gte': ['$media', 7]},
                    'Aprovado',
                    'Reprovado'
                ]
            }
        }
    },
    {
        '$sort': {'media': -1}
    }
]

resultado = list(notas.aggregate(pipeline))
for doc in resultado:
    print(f"{doc['nome']}: Média {doc['media']} - {doc['situacao']} ({doc['total_notas']} notas)")

Neste exemplo complexo, fazemos:
1. Agrupamos notas por aluno, calculando a média
2. Fazemos um lookup (join) com a coleção de alunos
3. Desconstruímos o array de alunos com $unwind
4. Projetamos campos customizados com lógica condicional ($cond)
5. Ordenamos pelo resultado final

Agregação Assíncrona com Motor

Motor também oferece suporte a agregações assíncronas:

import asyncio
from motor.motor_asyncio import AsyncClient

async def agregacao_assincrona():
    client = AsyncClient('mongodb://localhost:27017/')
    db = client['vendas_online']
    pedidos = db['pedidos']

    # Inserir dados de exemplo
    await pedidos.insert_many([
        {'cliente': 'Alice', 'valor': 150.00, 'status': 'entregue'},
        {'cliente': 'Bob', 'valor': 320.00, 'status': 'entregue'},
        {'cliente': 'Alice', 'valor': 80.00, 'status': 'pendente'},
        {'cliente': 'Carlos', 'valor': 450.00, 'status': 'entregue'},
        {'cliente': 'Bob', 'valor': 200.00, 'status': 'cancelado'},
    ])

    # Pipeline de agregação
    pipeline = [
        {
            '$match': {'status': 'entregue'}
        },
        {
            '$group': {
                '_id': '$cliente',
                'total_gasto': {'$sum': '$valor'},
                'quantidade_pedidos': {'$sum': 1}
            }
        },
        {
            '$sort': {'total_gasto': -1}
        }
    ]

    # Executar agregação de forma assíncrona
    cursor = pedidos.aggregate(pipeline)
    print("=== Clientes com mais compras entregues ===")
    async for doc in cursor:
        print(f"{doc['_id']}: R$ {doc['total_gasto']:.2f} ({doc['quantidade_pedidos']} pedidos)")

    client.close()

asyncio.run(agregacao_assincrona())

Este exemplo demonstra como usar agregações com Motor, permitindo processamento eficiente de grandes volumes de dados sem bloquear a aplicação.

Dicas de Performance em Agregações

Agregações são poderosas, mas devem ser otimizadas. Use índices nos campos usados em $match para melhorar drasticamente a performance. Posicione $match o mais cedo possível no pipeline, reduzindo o volume de dados processados nos estágios subsequentes. Evite $lookup em coleções muito grandes, pois isso tem custo computacional elevado.

# Exemplo de criação de índices
vendas = db['vendas']
vendas.create_index('vendedor')  # Índice simples
vendas.create_index([('data', -1), ('vendedor', 1)])  # Índice composto

Conclusão

Nesta aula aprofundada, você aprendeu que MongoDB com Python oferece flexibilidade estrutural aliada a simplicidade de uso, seja através do PyMongo para aplicações síncronas ou Motor para sistemas de alta concorrência. A escolha entre PyMongo e Motor depende da arquitetura de sua aplicação: se você está construindo APIs assíncronas modernas com FastAPI, Quart ou aiohttp, Motor é a solução natural; caso contrário, PyMongo oferece simplicidade sem comprometer funcionalidade.

Em segundo lugar, agregações são o coração do processamento avançado no MongoDB, permitindo que você realize operações complexas no servidor em vez de trazer dados brutos para a aplicação. Dominar pipelines de agregação transforma-o de um usuário básico a um profissional capaz de extrair insights de dados em tempo real, otimizando performance e reduzindo consumo de banda.

Por fim, a combinação de Motor + FastAPI + Agregações cria aplicações escaláveis e eficientes, capazes de servir centenas de requisições simultâneas sem degradação de performance. Pratique construindo pequenos projetos, domine os operadores de agregação, e você terá domínio completo dessa stack moderna e poderosa.

Referências


Artigos relacionados