Como implementar o padrão Observer de forma eficiente e thread-safe

Question154

Tagsdesign-patterns, observer, gof, behavioral-patterns, events, thread-safety, performance, notifications

Introdução

O padrão Observer é um padrão comportamental que define uma relação de dependência um-para-muitos entre objetos, permitindo que múltiplos objetos sejam automaticamente notificados quando o estado de um objeto observado muda. A implementação eficiente e thread-safe é crucial em ambientes concorrentes modernos.

Conceito-chave

O Observer estabelece um mecanismo de notificação loose-coupled onde o subject (observado) mantém uma lista de observers (observadores) e os notifica automaticamente sobre mudanças de estado. A eficiência e thread-safety são alcançadas através de técnicas como copy-on-iteration, weak references, e mecanismos de sincronização adequados.

Tópicos Relevantes

Implementação básica thread-safe

// Interface do Observer
public interface IObserver<T>
{
    void Update(T data);
}

// Interface do Subject
public interface ISubject<T>
{
    void Subscribe(IObserver<T> observer);
    void Unsubscribe(IObserver<T> observer);
    void Notify(T data);
}

// Implementação thread-safe com ReaderWriterLockSlim
public class ThreadSafeSubject<T> : ISubject<T>
{
    private readonly HashSet<IObserver<T>> _observers;
    private readonly ReaderWriterLockSlim _lock;

    public ThreadSafeSubject()
    {
        _observers = new HashSet<IObserver<T>>();
        _lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
    }

    public void Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException(nameof(observer));

        _lock.EnterWriteLock();
        try
        {
            _observers.Add(observer);
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    public void Unsubscribe(IObserver<T> observer)
    {
        if (observer == null) return;

        _lock.EnterWriteLock();
        try
        {
            _observers.Remove(observer);
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    public void Notify(T data)
    {
        // Copy-on-iteration pattern para thread-safety
        IObserver<T>[] observersCopy;
        
        _lock.EnterReadLock();
        try
        {
            observersCopy = _observers.ToArray();
        }
        finally
        {
            _lock.ExitReadLock();
        }

        // Notificar fora do lock para evitar deadlocks
        foreach (var observer in observersCopy)
        {
            try
            {
                observer.Update(data);
            }
            catch (Exception ex)
            {
                // Log exception mas continue notificando outros observers
                OnObserverException?.Invoke(observer, ex);
            }
        }
    }

    public event Action<IObserver<T>, Exception> OnObserverException;

    public void Dispose()
    {
        _lock?.EnterWriteLock();
        try
        {
            _observers?.Clear();
        }
        finally
        {
            _lock?.ExitWriteLock();
            _lock?.Dispose();
        }
    }
}

Implementação com Weak References

// Observer com weak references para evitar vazamentos de memória
public class WeakReferenceSubject<T> : ISubject<T>
{
    private readonly List<WeakReference<IObserver<T>>> _observers;
    private readonly object _lock = new object();

    public WeakReferenceSubject()
    {
        _observers = new List<WeakReference<IObserver<T>>>();
    }

    public void Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException(nameof(observer));

        lock (_lock)
        {
            // Remove referencias mortas antes de adicionar nova
            CleanupDeadReferences();
            _observers.Add(new WeakReference<IObserver<T>>(observer));
        }
    }

    public void Unsubscribe(IObserver<T> observer)
    {
        if (observer == null) return;

        lock (_lock)
        {
            for (int i = _observers.Count - 1; i >= 0; i--)
            {
                if (_observers[i].TryGetTarget(out var target) && 
                    ReferenceEquals(target, observer))
                {
                    _observers.RemoveAt(i);
                    break;
                }
            }
        }
    }

    public void Notify(T data)
    {
        List<IObserver<T>> activeObservers;

        lock (_lock)
        {
            activeObservers = new List<IObserver<T>>(_observers.Count);
            
            for (int i = _observers.Count - 1; i >= 0; i--)
            {
                if (_observers[i].TryGetTarget(out var observer))
                {
                    activeObservers.Add(observer);
                }
                else
                {
                    // Remove referência morta
                    _observers.RemoveAt(i);
                }
            }
        }

        // Notificar fora do lock
        Parallel.ForEach(activeObservers, observer =>
        {
            try
            {
                observer.Update(data);
            }
            catch (Exception ex)
            {
                OnObserverException?.Invoke(observer, ex);
            }
        });
    }

    private void CleanupDeadReferences()
    {
        for (int i = _observers.Count - 1; i >= 0; i--)
        {
            if (!_observers[i].TryGetTarget(out _))
            {
                _observers.RemoveAt(i);
            }
        }
    }

    public event Action<IObserver<T>, Exception> OnObserverException;
}

Implementação com Async/Await

// Observer assíncrono
public interface IAsyncObserver<T>
{
    Task UpdateAsync(T data, CancellationToken cancellationToken = default);
}

public class AsyncSubject<T> : IDisposable
{
    private readonly ConcurrentBag<IAsyncObserver<T>> _observers;
    private readonly SemaphoreSlim _semaphore;

    public AsyncSubject()
    {
        _observers = new ConcurrentBag<IAsyncObserver<T>>();
        _semaphore = new SemaphoreSlim(1, 1);
    }

    public async Task SubscribeAsync(IAsyncObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException(nameof(observer));

        await _semaphore.WaitAsync();
        try
        {
            _observers.Add(observer);
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public async Task NotifyAsync(T data, CancellationToken cancellationToken = default)
    {
        var observers = _observers.ToArray();
        
        var tasks = observers.Select(async observer =>
        {
            try
            {
                await observer.UpdateAsync(data, cancellationToken);
            }
            catch (Exception ex)
            {
                OnObserverException?.Invoke(observer, ex);
            }
        });

        await Task.WhenAll(tasks);
    }

    public async Task NotifyAsync(T data, TimeSpan timeout)
    {
        using (var cts = new CancellationTokenSource(timeout))
        {
            await NotifyAsync(data, cts.Token);
        }
    }

    public event Action<IAsyncObserver<T>, Exception> OnObserverException;

    public void Dispose()
    {
        _semaphore?.Dispose();
    }
}

Exemplo Prático

Sistema de monitoramento de preços de ações com diferentes tipos de observers:

// Modelo de dados
public class StockPrice
{
    public string Symbol { get; set; }
    public decimal Price { get; set; }
    public decimal Change { get; set; }
    public DateTime Timestamp { get; set; }
}

// Observer para logging
public class StockPriceLogger : IObserver<StockPrice>
{
    private readonly ILogger _logger;

    public StockPriceLogger(ILogger logger)
    {
        _logger = logger;
    }

    public void Update(StockPrice data)
    {
        _logger.Log($"[{data.Timestamp}] {data.Symbol}: ${data.Price} ({data.Change:+0.00;-0.00;0.00})");
    }
}

// Observer para alertas
public class StockPriceAlertService : IAsyncObserver<StockPrice>
{
    private readonly INotificationService _notificationService;
    private readonly Dictionary<string, decimal> _alertThresholds;

    public StockPriceAlertService(INotificationService notificationService)
    {
        _notificationService = notificationService;
        _alertThresholds = new Dictionary<string, decimal>();
    }

    public void SetAlert(string symbol, decimal threshold)
    {
        _alertThresholds[symbol] = threshold;
    }

    public async Task UpdateAsync(StockPrice data, CancellationToken cancellationToken = default)
    {
        if (_alertThresholds.TryGetValue(data.Symbol, out var threshold))
        {
            if (Math.Abs(data.Change) >= threshold)
            {
                var message = $"ALERT: {data.Symbol} moved {data.Change:+0.00;-0.00;0.00} to ${data.Price}";
                await _notificationService.SendAlertAsync(message, cancellationToken);
            }
        }
    }
}

// Observer para cache/armazenamento
public class StockPriceCache : IObserver<StockPrice>
{
    private readonly ConcurrentDictionary<string, StockPrice> _cache;
    private readonly IRepository<StockPrice> _repository;

    public StockPriceCache(IRepository<StockPrice> repository)
    {
        _cache = new ConcurrentDictionary<string, StockPrice>();
        _repository = repository;
    }

    public void Update(StockPrice data)
    {
        // Atualizar cache
        _cache.AddOrUpdate(data.Symbol, data, (key, oldValue) => data);

        // Persistir de forma assíncrona (fire-and-forget)
        Task.Run(async () =>
        {
            try
            {
                await _repository.SaveAsync(data);
            }
            catch (Exception ex)
            {
                // Log error but don't throw
                Console.WriteLine($"Error saving stock price: {ex.Message}");
            }
        });
    }

    public StockPrice GetLatestPrice(string symbol)
    {
        _cache.TryGetValue(symbol, out var price);
        return price;
    }
}

// Sistema principal de monitoramento
public class StockPriceMonitor : IDisposable
{
    private readonly ThreadSafeSubject<StockPrice> _syncSubject;
    private readonly AsyncSubject<StockPrice> _asyncSubject;
    private readonly Timer _priceUpdateTimer;
    private readonly Random _random = new Random();

    public StockPriceMonitor()
    {
        _syncSubject = new ThreadSafeSubject<StockPrice>();
        _asyncSubject = new AsyncSubject<StockPrice>();
        
        // Simular atualizações de preço
        _priceUpdateTimer = new Timer(SimulatePriceUpdate, null, 
            TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
    }

    public void SubscribeSync(IObserver<StockPrice> observer)
    {
        _syncSubject.Subscribe(observer);
    }

    public async Task SubscribeAsync(IAsyncObserver<StockPrice> observer)
    {
        await _asyncSubject.SubscribeAsync(observer);
    }

    private async void SimulatePriceUpdate(object state)
    {
        var symbols = new[] { "AAPL", "GOOGL", "MSFT", "AMZN" };
        var symbol = symbols[_random.Next(symbols.Length)];
        
        var price = new StockPrice
        {
            Symbol = symbol,
            Price = 100 + _random.Next(-50, 51),
            Change = (_random.NextDouble() - 0.5) * 10,
            Timestamp = DateTime.UtcNow
        };

        // Notificar observers síncronos
        _syncSubject.Notify(price);

        // Notificar observers assíncronos
        await _asyncSubject.NotifyAsync(price, TimeSpan.FromSeconds(5));
    }

    public void Dispose()
    {
        _priceUpdateTimer?.Dispose();
        _syncSubject?.Dispose();
        _asyncSubject?.Dispose();
    }
}

// Uso do sistema
public class StockMonitoringService
{
    public async Task StartMonitoringAsync()
    {
        var monitor = new StockPriceMonitor();
        var logger = new ConsoleLogger();
        var notificationService = new EmailNotificationService();
        var repository = new SqlStockPriceRepository();

        // Configurar observers
        var priceLogger = new StockPriceLogger(logger);
        var alertService = new StockPriceAlertService(notificationService);
        var priceCache = new StockPriceCache(repository);

        // Configurar alertas
        alertService.SetAlert("AAPL", 5.0m);
        alertService.SetAlert("GOOGL", 10.0m);

        // Registrar observers
        monitor.SubscribeSync(priceLogger);
        monitor.SubscribeSync(priceCache);
        await monitor.SubscribeAsync(alertService);

        // Manter sistema rodando
        Console.WriteLine("Monitoramento iniciado. Pressione qualquer tecla para parar...");
        Console.ReadKey();

        monitor.Dispose();
    }
}

Benefícios

1. Thread-Safety

  • ReaderWriterLockSlim para performance otimizada em cenários read-heavy
  • Copy-on-iteration previne modificações concorrentes durante notificação
  • Tratamento de exceções isolado evita corrupção de estado

2. Gerenciamento de Memória

  • Weak references previnem vazamentos de memória
  • Cleanup automático de referências mortas
  • Disposable pattern para liberação adequada de recursos

3. Performance

  • Paralelização de notificações quando apropriado
  • Minimização de tempo em locks críticos
  • Otimizações para cenários read-heavy vs write-heavy

4. Robustez

  • Tratamento de exceções não afeta outros observers
  • Timeouts para operações assíncronas
  • Cancellation tokens para operações longas

5. Flexibilidade

  • Suporte tanto para operações síncronas quanto assíncronas
  • Diferentes estratégias de notificação
  • Extensibilidade através de interfaces bem definidas

Considerações de Implementação

Escolha da estratégia de sincronização:

  • lock simples para casos básicos
  • ReaderWriterLockSlim para cenários read-heavy
  • ConcurrentCollection para operações lock-free quando possível

Weak vs Strong References:

  • Weak references para prevenir vazamentos de memória
  • Strong references quando lifecycle é controlado explicitamente

Notificação Síncrona vs Assíncrona:

  • Síncrona para operações rápidas e determinísticas
  • Assíncrona para operações I/O ou processamento pesado

Tratamento de Exceções:

  • Isolamento de falhas entre observers
  • Logging adequado para debugging
  • Recovery gracioso quando possível