Python Admin

Dominando asyncio Avançado em Python: Semáforos, Locks e Padrões de Concorrência em Projetos Reais Já leu

Introdução: O Problema da Concorrência Controlada Quando trabalhamos com em Python, rapidamente nos deparamos com um desafio fundamental: múltiplas corrotinas precisam acessar o mesmo recurso compartilhado de forma segura e ordenada. Imagine um cenário onde 10 corrotinas tentam escrever em um arquivo simultaneamente, ou onde várias requisições HTTP concorrem pelo mesmo pool de conexões. Sem mecanismos de controle, isso resulta em race conditions, corrupção de dados e comportamento impredizível. Os Semáforos, Locks e padrões avançados de concorrência existem justamente para resolver esse problema. Neste artigo, você aprenderá não apenas como usá-los, mas por que cada um existe e quando aplicar cada estratégia. Vamos além dos tutoriais básicos e explorar casos reais de produção. Locks (Travas): O Fundamento da Exclusão Mútua O que é um Lock e por que precisamos dele Um Lock é o mecanismo mais simples para garantir que apenas uma corrotina execute um trecho crítico de código por vez. Pense nele como uma porta que só uma pessoa

Introdução: O Problema da Concorrência Controlada

Quando trabalhamos com asyncio em Python, rapidamente nos deparamos com um desafio fundamental: múltiplas corrotinas precisam acessar o mesmo recurso compartilhado de forma segura e ordenada. Imagine um cenário onde 10 corrotinas tentam escrever em um arquivo simultaneamente, ou onde várias requisições HTTP concorrem pelo mesmo pool de conexões. Sem mecanismos de controle, isso resulta em race conditions, corrupção de dados e comportamento impredizível.

Os Semáforos, Locks e padrões avançados de concorrência existem justamente para resolver esse problema. Neste artigo, você aprenderá não apenas como usá-los, mas por que cada um existe e quando aplicar cada estratégia. Vamos além dos tutoriais básicos e explorar casos reais de produção.

Locks (Travas): O Fundamento da Exclusão Mútua

O que é um Lock e por que precisamos dele

Um Lock é o mecanismo mais simples para garantir que apenas uma corrotina execute um trecho crítico de código por vez. Pense nele como uma porta que só uma pessoa pode abrir ao mesmo tempo. Se outra corrotina tentar entrar enquanto a porta está fechada, ela aguarda sua vez.

Em asyncio, um Lock é criado e gerenciado através da classe asyncio.Lock. A sintaxe é clara: você adquire o lock, executa seu código crítico e libera o lock. Se não liberar (ou se ocorrer uma exceção), você pode travar toda a sua aplicação. Por isso, sempre use a declaração async with.

import asyncio

# Simulando um recurso compartilhado (contador)
contador = 0
lock = asyncio.Lock()

async def incrementar():
    global contador
    # SEM LOCK - PERIGOSO (NÃO FAÇA ASSIM)
    # temp = contador
    # await asyncio.sleep(0.001)
    # contador = temp + 1

    # COM LOCK - SEGURO
    async with lock:
        temp = contador
        await asyncio.sleep(0.001)  # Simula uma operação demorada
        contador = temp + 1

async def main():
    global contador
    contador = 0

    # 100 corrotinas tentando incrementar simultaneamente
    await asyncio.gather(*[incrementar() for _ in range(100)])
    print(f"Contador final: {contador}")
    print(f"Esperado: 100")

asyncio.run(main())

Saída esperada:

Contador final: 100
Esperado: 100

Observe que usamos async with lock: para adquirir e liberar automaticamente. Isso é essencial para evitar deadlocks mesmo em caso de exceção. Se você usar await lock.acquire() manualmente, nunca esqueça do correspondente lock.release().

Quando um Lock não é suficiente

Locks são binários: ou você tem acesso, ou não tem. Mas e se você quer permitir que 5 corrotinas acessem um recurso simultaneamente, mas bloquear a 6ª? Para isso, precisamos de Semáforos.

Semáforos: Controlando o Acesso em Múltiplas Instâncias

Entendendo a Semântica de Semáforos

Um Semáforo é um contador que começa com um valor inicial (digamos, 5). Quando uma corrotina tenta acessar o recurso, o contador decrementa. Se o contador chegar a 0, novas corrotinas aguardam até que algo libere acesso (incrementando o contador novamente). Isso é perfeito para limitar concorrência em pools de recursos.

import asyncio
import time

# Semáforo que permite no máximo 3 corrotinas simultâneas
semaforo = asyncio.Semaphore(3)

async def trabalho_com_limite(id_tarefa):
    async with semaforo:
        print(f"[{time.time():.2f}] Tarefa {id_tarefa} iniciada")
        await asyncio.sleep(2)  # Simula trabalho demorado
        print(f"[{time.time():.2f}] Tarefa {id_tarefa} concluída")

async def main():
    # Cria 10 tarefas, mas apenas 3 executam simultaneamente
    inicio = time.time()
    await asyncio.gather(*[trabalho_com_limite(i) for i in range(10)])
    tempo_total = time.time() - inicio

    print(f"\nTempo total: {tempo_total:.2f}s")
    print(f"Esperado: ~7s (10 tarefas / 3 em paralelo = 4 rodadas, 3 esperas)")

asyncio.run(main())

Saída esperada (timestamps relativos):

[0.00] Tarefa 0 iniciada
[0.00] Tarefa 1 iniciada
[0.00] Tarefa 2 iniciada
[2.00] Tarefa 0 concluída
[2.00] Tarefa 3 iniciada
[2.00] Tarefa 1 concluída
[2.00] Tarefa 4 iniciada
...
Tempo total: 7.00s
Esperado: ~7s

A grande diferença entre Lock e Semáforo é que o Lock é sempre binário (1 acesso por vez), enquanto o Semáforo é configurável. Use Semáforo quando quiser controlar um número máximo de acessos simultâneos.

BoundedSemaphore: Proteção contra Erros

Existe uma variante chamada BoundedSemaphore que previne que você libere mais acesso do que o limite inicial. Isso protege contra bugs onde você chama release() sem correspondente acquire().

import asyncio

# BoundedSemaphore com limite de 2
semaforo = asyncio.BoundedSemaphore(2)

async def exemplo_error():
    await semaforo.acquire()
    print(f"Adquirido. Valor interno: {semaforo._value}")

    # Tentar liberar mais vezes que o limite
    semaforo.release()
    print(f"Liberado 1x. Valor interno: {semaforo._value}")

    try:
        semaforo.release()  # Aqui vai dar erro
    except ValueError as e:
        print(f"Erro capturado: {e}")

asyncio.run(exemplo_error())

Padrões Avançados de Concorrência

Condition: Sincronização com Predicados

Um Condition combina um Lock com um mecanismo de notificação. É útil quando uma corrotina precisa esperar por uma condição específica ser satisfeita por outra corrotina. Diferente de um Lock puro, você pode "acordar" seletivamente corrotinas que estão aguardando.

import asyncio

# Simulando um produtor-consumidor
condicao = asyncio.Condition()
buffer = []

async def produtor():
    """Adiciona itens ao buffer e notifica consumidores"""
    for i in range(5):
        async with condicao:
            buffer.append(f"item-{i}")
            print(f"Produzido: item-{i}, buffer agora: {buffer}")
            condicao.notify_all()  # Acorda TODOS os consumidores

        await asyncio.sleep(0.5)

async def consumidor(id_consumidor):
    """Aguarda itens no buffer"""
    while True:
        async with condicao:
            # Aguarda enquanto buffer estiver vazio
            await condicao.wait_for(lambda: len(buffer) > 0)

            item = buffer.pop(0)
            print(f"  Consumidor-{id_consumidor} consumiu: {item}")

            # Se foi o último item, acordar produtores se houvesse
            if len(buffer) == 0:
                print(f"  Consumidor-{id_consumidor}: buffer vazio")

        await asyncio.sleep(0.1)

        # Saída artificial para não rodar infinitamente
        if id_consumidor == 0 and len(buffer) == 0:
            break

async def main():
    # Cria produtor e 2 consumidores
    prod = asyncio.create_task(produtor())
    cons1 = asyncio.create_task(consumidor(1))
    cons2 = asyncio.create_task(consumidor(2))

    await prod
    # Aguarda consumidores finalizarem
    await asyncio.sleep(2)

asyncio.run(main())

Saída esperada:

Produzido: item-0, buffer agora: ['item-0']
  Consumidor-1 consumiu: item-0
  Consumidor-1: buffer vazio
Produzido: item-1, buffer agora: ['item-1']
  Consumidor-2 consumiu: item-1
  ...

Event: Sinalização Simples

Um Event é ainda mais simples que um Condition. É um booleano que pode ser "setado" (True) ou "limpo" (False). As corrotinas podem aguardar até que o evento seja setado.

import asyncio

evento_inicio = asyncio.Event()

async def trabalhador(id_trabalhador):
    print(f"Trabalhador-{id_trabalhador} aguardando sinal...")
    await evento_inicio.wait()  # Bloqueia até que o evento seja setado
    print(f"Trabalhador-{id_trabalhador} começando trabalho!")
    await asyncio.sleep(1)
    print(f"Trabalhador-{id_trabalhador} concluído!")

async def coordinador():
    await asyncio.sleep(2)
    print("Coordinador: iniciando todos os trabalhadores!")
    evento_inicio.set()  # Todos os aguardadores são desbloqueados

async def main():
    # 3 trabalhadores esperando
    trabalhadores = [asyncio.create_task(trabalhador(i)) for i in range(3)]
    coord = asyncio.create_task(coordinador())

    await asyncio.gather(*trabalhadores, coord)

asyncio.run(main())

Saída esperada:

Trabalhador-0 aguardando sinal...
Trabalhador-1 aguardando sinal...
Trabalhador-2 aguardando sinal...
Coordinador: iniciando todos os trabalhadores!
Trabalhador-0 começando trabalho!
Trabalhador-1 começando trabalho!
Trabalhador-2 começando trabalho!
...

Padrão Prático: Rate Limiting em Requisições HTTP

Um dos casos de uso mais comuns é limitar a taxa de requisições para não sobrecarregar uma API externa. Vamos combinar Semáforo com requisições reais:

import asyncio
import aiohttp

class RateLimiter:
    """
    Limita a taxa de requisições usando Semáforo.
    Garante que apenas `max_concurrent` requisições aconteçam simultaneamente.
    """
    def __init__(self, max_concurrent=5):
        self.semaforo = asyncio.Semaphore(max_concurrent)
        self.session = None

    async def fetch(self, url):
        async with self.semaforo:
            async with self.session.get(url, timeout=5) as response:
                return await response.text()

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *args):
        await self.session.close()

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
    ]

    async with RateLimiter(max_concurrent=2) as limiter:
        # Mesmo com 5 URLs, apenas 2 requisições acontecem por vez
        tasks = [limiter.fetch(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        print(f"Completadas {len([r for r in results if r])} requisições")

# asyncio.run(main())
# Nota: Descomente para testar com aiohttp instalado

Este padrão é fundamental em produção para evitar que sua aplicação seja banida por fazer muitas requisições simultâneas.

Antipadrões Comuns e Como Evitá-los

1. Deadlock: Adquirir locks em ordem diferente

import asyncio

lock_a = asyncio.Lock()
lock_b = asyncio.Lock()

async def tarefa_1():
    async with lock_a:
        await asyncio.sleep(0.1)
        async with lock_b:
            print("Tarefa 1 completada")

async def tarefa_2():
    async with lock_b:
        await asyncio.sleep(0.1)
        async with lock_a:
            print("Tarefa 2 completada")

# Isso causará DEADLOCK!
# asyncio.run(asyncio.gather(tarefa_1(), tarefa_2()))

# SOLUÇÃO: Sempre adquirir na mesma ordem
async def tarefa_1_corrigida():
    async with lock_a:
        await asyncio.sleep(0.1)
        async with lock_b:
            print("Tarefa 1 completada")

async def tarefa_2_corrigida():
    async with lock_a:  # Mesma ordem!
        await asyncio.sleep(0.1)
        async with lock_b:
            print("Tarefa 2 completada")

asyncio.run(asyncio.gather(tarefa_1_corrigida(), tarefa_2_corrigida()))

2. Não liberar o lock em caso de exceção

import asyncio

lock = asyncio.Lock()
contador = 0

async def perigoso():
    """NÃO FAÇA ASSIM"""
    await lock.acquire()
    try:
        contador += 1
        raise ValueError("Ops!")
    finally:
        lock.release()  # Nem sempre chamado se esquecer try/finally

async def seguro():
    """FAÇA ASSIM"""
    async with lock:
        contador += 1
        raise ValueError("Ops!")
        # Lock é liberado automaticamente mesmo com exceção

Conclusão

Você aprendeu que locks são binários e ideais para seções críticas pequenas, enquanto semáforos controlam concorrência em múltiplas instâncias de um recurso. Além disso, Condition e Event resolvem problemas de sincronização entre corrotinas, permitindo comunicação eficiente sem busy-waiting.

O padrão prático que você deve levar: sempre use async with para adquirir locks e semáforos, nunca misture ordens de aquisição de múltiplos locks, e considere usar Semáforo quando quiser limitar concorrência em APIs externas ou recursos finitos.

Referências


Artigos relacionados