Neste post vou falar sobre aplicações reativas e quando pensamos nesse assunto nos perguntamos por que desenvolver uma aplicação reativa?

Por que usuários esperam dados reais em tempo real, eles esperam que a confirmação de uma compra online seja confirmada na hora, eles esperam que os jogos online sejam responsivos e respondam a determinadas ações.

A programação reativa baseia-se em fluxos de dados e propagação de mudanças com o modelo de execução de uma linguagem de programação repercutindo automaticamente através do fluxo de dados.

Para se construir aplicações reativas, as aplicações reativas devem ser capazes de:

  • Reagir a eventos – a natureza de ser orientada a eventos e reagir conforme a propagação de um evento
  • Reagir a cargas – Deve ser escalável
  • Reagir a falhas – Ser resiliente a falhas em todos os níveis
  • Reagir aos usuários – Combinar todas as características acima citada para proporcionar uma experiência de usuário interativa.

E para atender a demandas de aplicações reativas eis que surgi o Rx .NET (Reactive Extensions). 

O que é Rx .NET (Reactive Extensions)

O Reactive Extensions (Rx) é uma biblioteca open source para nós desenvolvedores compor programas assíncronos baseados em eventos usando LINQ e o padrão de sequência Observable

Com o Rx podemos representar fluxos de dados assíncronos com Observable, fazer consultas em fluxos de dados assíncronos usando operadores LINQ e também podemos parametrizar o fluxo concorrente de dados usando Schedulers.

Como funciona

Usando o Rx, nós podemos representar múltiplos fluxos de dados assíncronos, esses fluxos de dados podem ser por exemplo cotações de ações, twitters com determinadas hashtag, web service request entre outros e subscrever os eventos do fluxo de dados usando a interface IObserver<T>. A interface IObserver<T> notifica o nossa aplicação quando um evento ocorre e assim podemos tratar esse evento e fazer determinadas ações.

O Rx também fornece métodos para tratamento de exceções, cancelamentos, sincronizações entre outros.

Arquitetura do Rx .NET

RxArch

Usando o Rx .NET (Reactive Extensions)

O Rx .NET está disponível  no NuGet e para usar a biblioteca basta executar esse comando no Package Manager Console: Install-Package Rx-Main -Pre

Vamos a alguns exemplos de como usar o Rx.Net

Observando um determinado Evento:

public static event EventHandler SampleEvent;

static void Main(string[] args)
{
   var eventAsObservable = Observable.FromEventPattern(ev => SampleEvent += ev, 
                                                       ev => SampleEvent -= ev);

   //Create event subscriber
   var s = eventAsObservable.Subscribe(x => Console.WriteLine("evento recebido"));

   Console.WriteLine("Raise event");
   if (SampleEvent != null)
       SampleEvent(null, EventArgs.Empty);

   Thread.Sleep(100);

   Console.WriteLine("Unsubscribe");
   s.Dispose();
   Console.ReadKey();
}

Disparando um Subscribe por Intervalos

static void Main()
{
 
   var observable = Observable.Interval(TimeSpan.FromMilliseconds(750)).TimeInterval();

   using (observable.Subscribe(
          x => Console.WriteLine("{0}: {1}", x.Value, x.Interval)))
   {
     Console.WriteLine("Press any key to unsubscribe");
     Console.ReadKey();
   }

   Console.WriteLine("Press any key to exit");


   Console.ReadKey();
}

Observando um Range de Valores

static void Main(string[] args)
{
    Samples().Wait();
}

static async Task Samples()
{
    var xs = Observable.Range(0, 10, ThreadPoolScheduler.Instance);

    Console.WriteLine("Last  = " + await xs);
    Console.WriteLine("First = " + await xs.FirstAsync());
    Console.WriteLine("Third = " + await xs.ElementAt(3));
    Console.WriteLine("All   = " + string.Join(", ", await xs.ToList()));

    try
    {
        Console.WriteLine("Erro = " + await xs.Select(x => 1 / (5 - x)));
    }
    catch (DivideByZeroException)
    {
        Console.WriteLine("Aconteceu um Erro");
    }
}

Suporte a Subscribe Async

static void Main(string[] args)
{
    var fsw = new FileSystemWatcher(@"C:\")
    {
       IncludeSubdirectories = true,
       EnableRaisingEvents = true
    };

    var changes = from c in Observable.FromEventPattern<FileSystemEventArgs>(fsw, "Changed")
                  select c.EventArgs.FullPath;
    changes.Window(TimeSpan.FromSeconds(5)).Subscribe(async window =>
    {
         var count = await window.Count();
         Console.WriteLine("{0} events last 5 seconds", count);
    });

    Console.ReadLine();
}

Fazendo um Jogo Ping-Pong Reativo

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ReactiveExtensions
{
    class Program
    {
        static void Main(string[] args)
        {
            var ping = new Ping();
            var pong = new Pong();

            Console.WriteLine("Press any key to stop ...");

            var pongSubscription = ping.Subscribe(pong);
            var pingSubscription = pong.Subscribe(ping);

            Console.ReadKey();

            pongSubscription.Dispose();
            pingSubscription.Dispose();

            Console.WriteLine("Ping Pong has completed.");
        }
    }

    class Ping : ISubject<Pong, Ping>
    {
        public void OnNext(Pong value)
        {
            Console.WriteLine("Ping received Pong.");
        }

        public void OnError(Exception exception)
        {
            Console.WriteLine("Ping experienced an exception and had to quit playing.");
        }

        public void OnCompleted()
        {
            Console.WriteLine("Ping finished.");
        }

        public IDisposable Subscribe(IObserver<Ping> observer)
        {
            return Observable.Interval(TimeSpan.FromSeconds(2))
                .Where(n => n < 10)
                .Select(n => this)
                .Subscribe(observer);
        }

        public void Dispose()
        {
            OnCompleted();
        }


    }

    class Pong : ISubject<Ping, Pong>
    {
        public void OnNext(Ping value)
        {
            Console.WriteLine("Pong received Ping.");
        }

        public void OnError(Exception exception)
        {
            Console.WriteLine("Pong experienced an exception and had to quit playing.");
        }

        public void OnCompleted()
        {
            Console.WriteLine("Pong finished.");
        }


        public IDisposable Subscribe(IObserver<Pong> observer)
        {
            return Observable.Interval(TimeSpan.FromSeconds(1.5))
                .Where(n => n < 10)
                .Select(n => this)
                .Subscribe(observer);
        }
        public void Dispose()
        {
            OnCompleted();
        }

        
    }
}

O Rx .NET é a ferramenta ideal para criarmos aplicações reativas e incrivelmente poderosas. Com aplicações reativas conseguimos aumentar o nível de interatividade dos nossos usuários utilizando esse modelo de programação em aplicações web e até jogos.

O que acharam do Rx .NET ? Não deixem de comentar.

Para mais informações sobre o Rx .Net clique aqui.

Abs e até a próxima.

Rafael Cruz

É desenvolvedor .NET há mais de 12 anos, certificado Microsoft desde de 2006, instrutor oficial Microsoft há 5 anos, Pós Graduado em Engenharia de Software pela UFRJ, suas certificações são Microsoft Certified Trainer, Microsoft Certified Solution Developer (Web, Application Lifecycle Management), Microsoft Certified Professional Developer, Microsoft Certified Tecnology Specialist.
Atualmente trabalha como arquiteto de sistema, escreve artigos no .NET Coders e no seu blog rafaelcruz.azurewebsites.net para compartilhar experiências na área de desenvolvimento Web, Aplicativos Móveis e Cloud Solutions.

Twitter LinkedIn 

Comentários

comentarios