Question — 154
Tags —
Introdução
O padrão Observer é um padrão comportamental que define uma relação de dependência um-para-muitos entre objetos, permitindo que múltiplos objetos sejam automaticamente notificados quando o estado de um objeto observado muda. A implementação eficiente e thread-safe é crucial em ambientes concorrentes modernos.
Conceito-chave
O Observer estabelece um mecanismo de notificação loose-coupled onde o subject (observado) mantém uma lista de observers (observadores) e os notifica automaticamente sobre mudanças de estado. A eficiência e thread-safety são alcançadas através de técnicas como copy-on-iteration, weak references, e mecanismos de sincronização adequados.
Tópicos Relevantes
Implementação básica thread-safe
// Interface do Observer
public interface IObserver<T>
{
void Update(T data);
}
// Interface do Subject
public interface ISubject<T>
{
void Subscribe(IObserver<T> observer);
void Unsubscribe(IObserver<T> observer);
void Notify(T data);
}
// Implementação thread-safe com ReaderWriterLockSlim
public class ThreadSafeSubject<T> : ISubject<T>
{
private readonly HashSet<IObserver<T>> _observers;
private readonly ReaderWriterLockSlim _lock;
public ThreadSafeSubject()
{
_observers = new HashSet<IObserver<T>>();
_lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
}
public void Subscribe(IObserver<T> observer)
{
if (observer == null) throw new ArgumentNullException(nameof(observer));
_lock.EnterWriteLock();
try
{
_observers.Add(observer);
}
finally
{
_lock.ExitWriteLock();
}
}
public void Unsubscribe(IObserver<T> observer)
{
if (observer == null) return;
_lock.EnterWriteLock();
try
{
_observers.Remove(observer);
}
finally
{
_lock.ExitWriteLock();
}
}
public void Notify(T data)
{
// Copy-on-iteration pattern para thread-safety
IObserver<T>[] observersCopy;
_lock.EnterReadLock();
try
{
observersCopy = _observers.ToArray();
}
finally
{
_lock.ExitReadLock();
}
// Notificar fora do lock para evitar deadlocks
foreach (var observer in observersCopy)
{
try
{
observer.Update(data);
}
catch (Exception ex)
{
// Log exception mas continue notificando outros observers
OnObserverException?.Invoke(observer, ex);
}
}
}
public event Action<IObserver<T>, Exception> OnObserverException;
public void Dispose()
{
_lock?.EnterWriteLock();
try
{
_observers?.Clear();
}
finally
{
_lock?.ExitWriteLock();
_lock?.Dispose();
}
}
}
Implementação com Weak References
// Observer com weak references para evitar vazamentos de memória
public class WeakReferenceSubject<T> : ISubject<T>
{
private readonly List<WeakReference<IObserver<T>>> _observers;
private readonly object _lock = new object();
public WeakReferenceSubject()
{
_observers = new List<WeakReference<IObserver<T>>>();
}
public void Subscribe(IObserver<T> observer)
{
if (observer == null) throw new ArgumentNullException(nameof(observer));
lock (_lock)
{
// Remove referencias mortas antes de adicionar nova
CleanupDeadReferences();
_observers.Add(new WeakReference<IObserver<T>>(observer));
}
}
public void Unsubscribe(IObserver<T> observer)
{
if (observer == null) return;
lock (_lock)
{
for (int i = _observers.Count - 1; i >= 0; i--)
{
if (_observers[i].TryGetTarget(out var target) &&
ReferenceEquals(target, observer))
{
_observers.RemoveAt(i);
break;
}
}
}
}
public void Notify(T data)
{
List<IObserver<T>> activeObservers;
lock (_lock)
{
activeObservers = new List<IObserver<T>>(_observers.Count);
for (int i = _observers.Count - 1; i >= 0; i--)
{
if (_observers[i].TryGetTarget(out var observer))
{
activeObservers.Add(observer);
}
else
{
// Remove referência morta
_observers.RemoveAt(i);
}
}
}
// Notificar fora do lock
Parallel.ForEach(activeObservers, observer =>
{
try
{
observer.Update(data);
}
catch (Exception ex)
{
OnObserverException?.Invoke(observer, ex);
}
});
}
private void CleanupDeadReferences()
{
for (int i = _observers.Count - 1; i >= 0; i--)
{
if (!_observers[i].TryGetTarget(out _))
{
_observers.RemoveAt(i);
}
}
}
public event Action<IObserver<T>, Exception> OnObserverException;
}
Implementação com Async/Await
// Observer assíncrono
public interface IAsyncObserver<T>
{
Task UpdateAsync(T data, CancellationToken cancellationToken = default);
}
public class AsyncSubject<T> : IDisposable
{
private readonly ConcurrentBag<IAsyncObserver<T>> _observers;
private readonly SemaphoreSlim _semaphore;
public AsyncSubject()
{
_observers = new ConcurrentBag<IAsyncObserver<T>>();
_semaphore = new SemaphoreSlim(1, 1);
}
public async Task SubscribeAsync(IAsyncObserver<T> observer)
{
if (observer == null) throw new ArgumentNullException(nameof(observer));
await _semaphore.WaitAsync();
try
{
_observers.Add(observer);
}
finally
{
_semaphore.Release();
}
}
public async Task NotifyAsync(T data, CancellationToken cancellationToken = default)
{
var observers = _observers.ToArray();
var tasks = observers.Select(async observer =>
{
try
{
await observer.UpdateAsync(data, cancellationToken);
}
catch (Exception ex)
{
OnObserverException?.Invoke(observer, ex);
}
});
await Task.WhenAll(tasks);
}
public async Task NotifyAsync(T data, TimeSpan timeout)
{
using (var cts = new CancellationTokenSource(timeout))
{
await NotifyAsync(data, cts.Token);
}
}
public event Action<IAsyncObserver<T>, Exception> OnObserverException;
public void Dispose()
{
_semaphore?.Dispose();
}
}
Exemplo Prático
Sistema de monitoramento de preços de ações com diferentes tipos de observers:
// Modelo de dados
public class StockPrice
{
public string Symbol { get; set; }
public decimal Price { get; set; }
public decimal Change { get; set; }
public DateTime Timestamp { get; set; }
}
// Observer para logging
public class StockPriceLogger : IObserver<StockPrice>
{
private readonly ILogger _logger;
public StockPriceLogger(ILogger logger)
{
_logger = logger;
}
public void Update(StockPrice data)
{
_logger.Log($"[{data.Timestamp}] {data.Symbol}: ${data.Price} ({data.Change:+0.00;-0.00;0.00})");
}
}
// Observer para alertas
public class StockPriceAlertService : IAsyncObserver<StockPrice>
{
private readonly INotificationService _notificationService;
private readonly Dictionary<string, decimal> _alertThresholds;
public StockPriceAlertService(INotificationService notificationService)
{
_notificationService = notificationService;
_alertThresholds = new Dictionary<string, decimal>();
}
public void SetAlert(string symbol, decimal threshold)
{
_alertThresholds[symbol] = threshold;
}
public async Task UpdateAsync(StockPrice data, CancellationToken cancellationToken = default)
{
if (_alertThresholds.TryGetValue(data.Symbol, out var threshold))
{
if (Math.Abs(data.Change) >= threshold)
{
var message = $"ALERT: {data.Symbol} moved {data.Change:+0.00;-0.00;0.00} to ${data.Price}";
await _notificationService.SendAlertAsync(message, cancellationToken);
}
}
}
}
// Observer para cache/armazenamento
public class StockPriceCache : IObserver<StockPrice>
{
private readonly ConcurrentDictionary<string, StockPrice> _cache;
private readonly IRepository<StockPrice> _repository;
public StockPriceCache(IRepository<StockPrice> repository)
{
_cache = new ConcurrentDictionary<string, StockPrice>();
_repository = repository;
}
public void Update(StockPrice data)
{
// Atualizar cache
_cache.AddOrUpdate(data.Symbol, data, (key, oldValue) => data);
// Persistir de forma assíncrona (fire-and-forget)
Task.Run(async () =>
{
try
{
await _repository.SaveAsync(data);
}
catch (Exception ex)
{
// Log error but don't throw
Console.WriteLine($"Error saving stock price: {ex.Message}");
}
});
}
public StockPrice GetLatestPrice(string symbol)
{
_cache.TryGetValue(symbol, out var price);
return price;
}
}
// Sistema principal de monitoramento
public class StockPriceMonitor : IDisposable
{
private readonly ThreadSafeSubject<StockPrice> _syncSubject;
private readonly AsyncSubject<StockPrice> _asyncSubject;
private readonly Timer _priceUpdateTimer;
private readonly Random _random = new Random();
public StockPriceMonitor()
{
_syncSubject = new ThreadSafeSubject<StockPrice>();
_asyncSubject = new AsyncSubject<StockPrice>();
// Simular atualizações de preço
_priceUpdateTimer = new Timer(SimulatePriceUpdate, null,
TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
}
public void SubscribeSync(IObserver<StockPrice> observer)
{
_syncSubject.Subscribe(observer);
}
public async Task SubscribeAsync(IAsyncObserver<StockPrice> observer)
{
await _asyncSubject.SubscribeAsync(observer);
}
private async void SimulatePriceUpdate(object state)
{
var symbols = new[] { "AAPL", "GOOGL", "MSFT", "AMZN" };
var symbol = symbols[_random.Next(symbols.Length)];
var price = new StockPrice
{
Symbol = symbol,
Price = 100 + _random.Next(-50, 51),
Change = (_random.NextDouble() - 0.5) * 10,
Timestamp = DateTime.UtcNow
};
// Notificar observers síncronos
_syncSubject.Notify(price);
// Notificar observers assíncronos
await _asyncSubject.NotifyAsync(price, TimeSpan.FromSeconds(5));
}
public void Dispose()
{
_priceUpdateTimer?.Dispose();
_syncSubject?.Dispose();
_asyncSubject?.Dispose();
}
}
// Uso do sistema
public class StockMonitoringService
{
public async Task StartMonitoringAsync()
{
var monitor = new StockPriceMonitor();
var logger = new ConsoleLogger();
var notificationService = new EmailNotificationService();
var repository = new SqlStockPriceRepository();
// Configurar observers
var priceLogger = new StockPriceLogger(logger);
var alertService = new StockPriceAlertService(notificationService);
var priceCache = new StockPriceCache(repository);
// Configurar alertas
alertService.SetAlert("AAPL", 5.0m);
alertService.SetAlert("GOOGL", 10.0m);
// Registrar observers
monitor.SubscribeSync(priceLogger);
monitor.SubscribeSync(priceCache);
await monitor.SubscribeAsync(alertService);
// Manter sistema rodando
Console.WriteLine("Monitoramento iniciado. Pressione qualquer tecla para parar...");
Console.ReadKey();
monitor.Dispose();
}
}
Benefícios
1. Thread-Safety
- ReaderWriterLockSlim para performance otimizada em cenários read-heavy
- Copy-on-iteration previne modificações concorrentes durante notificação
- Tratamento de exceções isolado evita corrupção de estado
2. Gerenciamento de Memória
- Weak references previnem vazamentos de memória
- Cleanup automático de referências mortas
- Disposable pattern para liberação adequada de recursos
3. Performance
- Paralelização de notificações quando apropriado
- Minimização de tempo em locks críticos
- Otimizações para cenários read-heavy vs write-heavy
4. Robustez
- Tratamento de exceções não afeta outros observers
- Timeouts para operações assíncronas
- Cancellation tokens para operações longas
5. Flexibilidade
- Suporte tanto para operações síncronas quanto assíncronas
- Diferentes estratégias de notificação
- Extensibilidade através de interfaces bem definidas
Considerações de Implementação
Escolha da estratégia de sincronização:
lock
simples para casos básicosReaderWriterLockSlim
para cenários read-heavyConcurrentCollection
para operações lock-free quando possível
Weak vs Strong References:
- Weak references para prevenir vazamentos de memória
- Strong references quando lifecycle é controlado explicitamente
Notificação Síncrona vs Assíncrona:
- Síncrona para operações rápidas e determinísticas
- Assíncrona para operações I/O ou processamento pesado
Tratamento de Exceções:
- Isolamento de falhas entre observers
- Logging adequado para debugging
- Recovery gracioso quando possível