Arquitetura de Dashboards em Tempo Real: Consumindo Fluxos de Dados de Alta Frequência
Arquitetura de Dashboards em Tempo Real: Consumindo Fluxos de Dados de Alta Frequência
Dashboards tradicionais operam em ciclos de atualização discretos—consultas agendadas, snapshots periódicos. Mas quando você precisa visualizar dados que mudam a cada milissegundo, essa abordagem falha. Arquiteturas de streaming em tempo real exigem uma mudança fundamental: de pull para push, de estado para fluxo, de latência aceitável para latência imperceptível.
O Desafio Fundamental: Latência vs. Consistência
Em sistemas de streaming, você enfrenta um trade-off crítico. WebSockets oferecem latência ultra-baixa (dezenas de milissegundos), mas cada conexão consome recursos do servidor. Apache Kafka fornece durabilidade e replay, mas introduz latência de alguns segundos. A escolha depende do seu caso de uso:
- WebSockets diretos: Ideal para 100-1000 clientes simultâneos com dados críticos (preços, alertas). Cada cliente mantém uma conexão bidirecional persistente.
- Kafka + Consumer Groups: Melhor para 10k+ consumidores, tolerância a latência de 2-5 segundos, e necessidade de replay histórico.
- Híbrido: Kafka como fonte de verdade, WebSockets para distribuição aos clientes—o padrão mais robusto.
Considere backpressure: se seu dashboard não consegue processar eventos tão rápido quanto chegam, você precisa de um mecanismo de fila. Kafka resolve isso naturalmente; WebSockets exigem implementação manual.
Padrão de Arquitetura: Ingestão → Processamento → Distribuição
Uma arquitetura típica tem três camadas:
import asyncio
import json
from collections import deque
from datetime import datetime
class StreamBuffer:
"""Buffer circular para dados de alta frequência com agregação."""
def __init__(self, window_size=100, aggregation_interval=1.0):
self.buffer = deque(maxlen=window_size)
self.aggregation_interval = aggregation_interval
self.last_aggregation = datetime.now()
self.subscribers = set()
async def ingest(self, event):
"""Ingere evento e dispara agregação se intervalo passou."""
self.buffer.append(event)
elapsed = (datetime.now() - self.last_aggregation).total_seconds()
if elapsed >= self.aggregation_interval:
await self.broadcast_aggregation()
self.last_aggregation = datetime.now()
async def broadcast_aggregation(self):
"""Calcula agregação e envia para todos os subscribers."""
if not self.buffer:
return
# Agregação simples: média dos últimos valores
values = [e.get('value', 0) for e in self.buffer]
aggregated = {
'timestamp': datetime.now().isoformat(),
'mean': sum(values) / len(values),
'count': len(values)
}
for subscriber in self.subscribers:
await subscriber(aggregated)
def subscribe(self, callback):
self.subscribers.add(callback)
Este padrão resolve um problema crítico: dados brutos chegam em alta frequência, mas seu dashboard não precisa renderizar cada evento. Agregação em janelas reduz carga de rede em 10-100x.
Gerenciamento de Conexões WebSocket: Escalabilidade Real
WebSockets parecem simples, mas escalar para milhares de conexões exige cuidado:
import weakref
class DashboardHub:
"""Gerencia múltiplas conexões WebSocket com cleanup automático."""
def __init__(self):
self.connections = weakref.WeakSet() # Cleanup automático
self.message_queue = asyncio.Queue(maxsize=10000)
async def register(self, websocket):
self.connections.add(websocket)
try:
async for message in websocket:
await self.message_queue.put(message)
except Exception as e:
print(f"Conexão perdida: {e}")
finally:
# WeakSet remove automaticamente
pass
async def broadcast(self, data):
"""Broadcast com tratamento de desconexões."""
disconnected = set()
for ws in self.connections:
try:
await ws.send(json.dumps(data))
except Exception:
disconnected.add(ws)
# Remove conexões mortas
for ws in disconnected:
self.connections.discard(ws)
Use WeakSet para evitar memory leaks. Cada conexão WebSocket consome ~100KB de memória; 10k conexões = 1GB. Cleanup automático é crítico.
Sincronização de Estado: O Problema do Catch-up
Quando um cliente se conecta, ele precisa do estado atual. Não pode esperar pelo próximo evento—pode levar minutos. Solução:
class StateSnapshot:
"""Mantém snapshot do estado para novos clientes."""
def __init__(self):
self.current_state = {}
self.version = 0
def update(self, key, value):
self.current_state[key] = value
self.version += 1
def get_snapshot(self):
"""Retorna estado + versão para detecção de desincronização."""
return {
'state': self.current_state.copy(),
'version': self.version,
'timestamp': datetime.now().isoformat()
}
Quando um cliente se reconecta, compare versões. Se client_version < server_version, há dados perdidos—reenvie histórico de Kafka.
Considerações de Produção
Monitoramento: Rastreie latência P99 (não apenas média). Um cliente lento pode acumular 100MB de mensagens não entregues. Implemente circuit breakers: desconecte clientes que não conseguem acompanhar.
Compressão: JSON bruto é ineficiente. Use MessagePack ou Protobuf para reduzir payload em 60-70%. Crítico em conexões móveis.
Deduplicação: Em redes instáveis, eventos podem chegar duplicados. Inclua IDs únicos e deduplicar no cliente (Pandas drop_duplicates(subset=['event_id'])).
Integração com Power BI e Pandas
Power BI não consome WebSockets nativamente, mas você pode usar Python como intermediário: WebSocket → Pandas DataFrame → Power BI via API REST. Mantenha um DataFrame em memória, atualize a cada agregação, e envie deltas para Power BI a cada 5 segundos.
Key Takeaways
- Escolha entre WebSockets (latência ultra-baixa, escalabilidade limitada) e Kafka (durabilidade, latência aceitável, escalabilidade massiva) baseado em volume de clientes e tolerância a latência, não em preferência pessoal
- Agregação em janelas de tempo reduz carga de rede em ordens de magnitude—nunca envie dados brutos de alta frequência; sempre processe antes de distribuir
- Sincronização de estado via snapshots com versionamento previne desincronização crítica quando clientes se reconectam ou perdem pacotes
Enjoyed this reading?
SharpStack delivers personalized tech readings every day, calibrated to your skill level. 5 minutes a day to stay sharp.
“Stay sharp. At your pace. Everyday.”