
RX: How to extend RX with Fluent extensions. Demonstrating a batched, a pausable and a synchronised observable

21 Oct 2014 (CPOL)
Demonstrates how to extend RX with fluent extensions, using a synchronised, a pausable and a batched observable.

 

Introduction

If you use RX heavily, you will undoubtedly come across gaps in the functionality provided. This article takes you through how to fill those gaps by extending RX with fluent extensions.

Background

Extending RX is a lot easier than you might imagine, and can be achieved in a few lines of code. All you really need are two elements: an IObservable implementation and an extension method.

The IObservable implementation

The IObservable interface is a very simple contract with only one method, Subscribe, which takes a single IObserver argument and returns an IDisposable. Before continuing, a quick word on the difference between hot and cold observables. A hot observable publishes data even if no observers are subscribed, whereas a cold observable only publishes when an observer subscribes. Most of the provided observables are cold: they only generate a sequence when an observer subscribes.

Why is that important for us? When creating your IObservable implementation, hot and cold observables require very different implementations.
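To make the distinction concrete, here is a minimal sketch that uses only the base class library (no RX package). The names ColdNumbers, HotNumbers, RecordingObserver, Unsubscriber and HotColdDemo are hypothetical and exist only for this illustration; they are not part of RX or of the article's download.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: an observer that simply records what it receives
public sealed class RecordingObserver : IObserver<int>
{
    public List<int> Received { get; } = new List<int>();
    public void OnNext(int value) => Received.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

// Hypothetical helper: runs an action once when disposed
public sealed class Unsubscriber : IDisposable
{
    private Action _onDispose;
    public Unsubscriber(Action onDispose) { _onDispose = onDispose; }
    public void Dispose()
    {
        var action = _onDispose;
        _onDispose = null;
        action?.Invoke();
    }
}

// Cold: each subscriber gets its own complete sequence, produced at subscription time
public sealed class ColdNumbers : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 1; i <= 3; i++) observer.OnNext(i);
        observer.OnCompleted();
        return new Unsubscriber(() => { });
    }
}

// Hot: values are published whether or not anyone is listening
public sealed class HotNumbers : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        lock (_observers) _observers.Add(observer);
        return new Unsubscriber(() => { lock (_observers) _observers.Remove(observer); });
    }

    public void Publish(int value)
    {
        IObserver<int>[] snapshot;
        lock (_observers) snapshot = _observers.ToArray();
        foreach (var o in snapshot) o.OnNext(value);
    }
}

public static class HotColdDemo
{
    public static void Run()
    {
        var cold = new ColdNumbers();
        var first = new RecordingObserver();
        var second = new RecordingObserver();
        cold.Subscribe(first);
        cold.Subscribe(second);
        Console.WriteLine(string.Join(",", first.Received));  // 1,2,3
        Console.WriteLine(string.Join(",", second.Received)); // 1,2,3

        var hot = new HotNumbers();
        var early = new RecordingObserver();
        var late = new RecordingObserver();
        hot.Publish(1);        // nobody is subscribed yet, so this value is lost
        hot.Subscribe(early);
        hot.Publish(2);
        hot.Subscribe(late);
        hot.Publish(3);
        Console.WriteLine(string.Join(",", early.Received)); // 2,3
        Console.WriteLine(string.Join(",", late.Received));  // 3
    }
}
```

Both cold subscribers see the full sequence, while the hot subscribers only see what was published after they subscribed.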

A Hot implementation

When creating a hot implementation, every subscription shares the same generated sequence. This means the class needs to start generating the sequence either before any observer subscribes or when the first observer subscribes. It also means that each subscribing observer must be captured and held in a collection until its subscription is disposed, at which point it must be removed from the collection, as shown below:

C#
private readonly HashSet<IObserver<T>> _observers = new HashSet<IObserver<T>>();

public IDisposable Subscribe(IObserver<T> observer)
{
    // If not already added, add the observer
    lock (_observers)
    {
        if (_observers.Contains(observer)) return Disposable.Empty;
        _observers.Add(observer);
    }

    // Create a disposable delegate that removes the observer from the set
    // when Dispose is called
    return Disposable.Create(() =>
    {
        lock (_observers)
        {
            _observers.Remove(observer);
        }
    });
}

When generating the sequence, each of the observers will need to be notified of each element generated.

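C#
// Assumes T is int for this example
public void Generate()
{
    int i = 0;
    while (true)
    {
        // Notify every registered observer of the current element
        lock (_observers)
        {
            foreach (var o in _observers)
                o.OnNext(i);
        }
        i++;
        Thread.Sleep(50);
    }
}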

A Cold implementation

While a cold implementation doesn't need to maintain a list of observers, it has a different requirement. Because a sequence is produced only when an observer subscribes, and only for that observer, the sequence must be generated at the time of the subscription. All state and objects required to create the sequence must be created at that point and must not be shared between subscriptions, as in the example below:

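C#
public IDisposable Subscribe(IObserver<int> observer)
{
    // Each subscription gets its own cancellation source and its own thread
    var cts = new CancellationTokenSource();
    var thread = new Thread(() =>
    {
        int i = 0;
        while (!cts.IsCancellationRequested)
        {
            observer.OnNext(i++);
            Thread.Sleep(50);
        }
    });
    thread.IsBackground = true;
    thread.Start();

    // Disposing the subscription stops the loop
    // (Thread.Abort is not supported on current .NET runtimes)
    return Disposable.Create(() => cts.Cancel());
}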

A Functional approach

An alternative, more functional approach is to use the Observable.Create method, which does not require you to write a class and certainly makes more sense for simpler implementations. The method has several overloads, but probably the easiest is the one that accepts a function taking an IObserver parameter and returning an IDisposable. The same principles still apply: all state and objects required to create the sequence must be created inside that function and must not be shared. See below for an example:

C#
// 'source' is assumed to be an existing IObservable<T> in the enclosing scope
Observable.Create<string>(observer =>
{
    return source.Subscribe(
        a => observer.OnNext(a.ToString()),
        ex => observer.OnError(ex),
        () => observer.OnCompleted());
});

 

Interacting with an existing sequence

If you are going to create a Fluent extension, you will most likely want to interact with an existing sequence and either modify its output or add some form of behavior. This is actually very simple: all you need is the existing observable. Pass it as an argument to your class constructor and then subscribe to it, either in your class's Subscribe method (cold) or when you start to generate your sequence (hot). See the hot and cold examples below:

Hot Example

C#
//
// A constructor that takes an observable, which will be the source for the records
//
public ToStringObservable(IObservable<T> observable)
{
    _observable = observable;
}

// Subscribes to the source once and forwards every notification to all observers
public void Generate()
{
    _dispose = _observable.Subscribe(
        a =>
        {
            lock (_observers)
            {
                foreach (var o in _observers)
                    o.OnNext(a.ToString());
            }
        },
        ex =>
        {
            lock (_observers)
            {
                foreach (var o in _observers)
                    o.OnError(ex);
            }
        },
        () =>
        {
            lock (_observers)
            {
                foreach (var o in _observers)
                    o.OnCompleted();
            }
        });
}

Cold Example

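C#
public class ToStringObservable<T> : IObservable<string>
{
    private readonly IObservable<T> _observable;

    public ToStringObservable(IObservable<T> observable)
    {
        _observable = observable;
    }

    // Each subscriber gets its own subscription to the source
    public IDisposable Subscribe(IObserver<string> observer)
    {
        return _observable.Subscribe(
            a => observer.OnNext(a.ToString()),
            e => observer.OnError(e),
            () => observer.OnCompleted());
    }
}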

Creating the Fluent extension

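This is probably the easiest part of the whole process: you simply construct and return your IObservable implementation. For hot (shared) observables this makes less sense, since each call to the extension method would construct a new instance rather than share an existing one.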

C#
// Converts each element of the sequence to a string
// (extension methods must be declared in a static class)
public static IObservable<string> ToStringObservable<T>(this IObservable<T> observable)
{
    return new ToStringObservable<T>(observable);
}

A Cold Functional Fluent extension

Following the earlier functional example, below is an example of how to create a Fluent extension with a functional approach. In short, you're taking the functionality from the class and moving it into the Observable.Create method.

C#
// Converts a sequence into a pausable sequence
public static IObservable<T> ToPausable<T>(this IObservable<T> observable,
                                           PauseState pause)
{
    // Call Observable.Create to build the sequence
    return Observable.Create<T>(observer =>
    {
        // Queue to store the sequence items while paused
        var queue = new Queue<T>();

        // As we subscribe to multiple sequences we will hold several disposables,
        // so a composite disposable collection becomes very useful
        var disposables = new CompositeDisposable();

        // Watch for the pause state to be set to unpaused
        // so we can replay the collected items
        disposables.Add(pause.StateChanged
            .Where(p => !p)
            .Subscribe(_ =>
            {
                lock (queue)
                {
                    while (queue.Count > 0)
                    {
                        observer.OnNext(queue.Dequeue());
                    }
                }
            }));

        // Subscribe to the source sequence and, while paused, collect the items
        disposables.Add(observable.Subscribe(a =>
            {
                lock (queue)
                {
                    if (pause.Paused)
                        queue.Enqueue(a);
                    else
                        observer.OnNext(a);
                }
            }, e => observer.OnError(e), () => observer.OnCompleted()));

        return disposables;
    });
}
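The PauseState type used above is not shown in this article. As a rough sketch only, it could look something like the class below, which exposes the two members the extension relies on (Paused and StateChanged). The Pause and Resume methods and the PauseStateDemo class are assumptions made for this illustration, and the version in the zip file may well differ. The sketch assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical shape for PauseState: a flag plus a notification stream
public sealed class PauseState
{
    private readonly Subject<bool> _stateChanged = new Subject<bool>();
    private volatile bool _paused;

    // True while the sequence should be held back
    public bool Paused => _paused;

    // Publishes the new value of Paused each time Pause or Resume is called
    public IObservable<bool> StateChanged => _stateChanged;

    public void Pause() => Set(true);
    public void Resume() => Set(false);

    private void Set(bool paused)
    {
        // Update the flag first so subscribers reading Paused see the new value
        _paused = paused;
        _stateChanged.OnNext(paused);
    }
}

public static class PauseStateDemo
{
    // Returns the state changes observed for one pause/resume cycle
    public static List<bool> Run()
    {
        var seen = new List<bool>();
        var pause = new PauseState();
        using (pause.StateChanged.Subscribe(p => seen.Add(p)))
        {
            pause.Pause();
            pause.Resume();
        }
        return seen;
    }
}
```

With this shape, the Where(p => !p) filter in the extension fires only when Resume is called, which is the point at which the queued items are replayed.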

 

A simple batched observable

Rx provides the SelectMany LINQ extension, which turns a single element into an enumeration, but what if you want to batch updates together and process them at the same time (for example, to limit the number of calls to the dispatcher)? One answer is a batch observable.

Below is an IObservable implementation that observes notifications from a sequence and groups them into batches of a specified size before sending them to an observer. All state required for the batching must be created inside the Subscribe method, because this method can be called multiple times and each observer expects its own batches rather than shared ones.

C#
//
// Batched observable class that batches records in a specified size
//
public class BatchObservable<T> : IObservable<IEnumerable<T>>
{
  private readonly IObservable<T> _observable;
  private readonly int _batchSize;
  
  //
  // A constructor that takes an observable, which will be the source for the records
  //
  public BatchObservable(IObservable<T> observable, int batchSize)
  {
    _observable = observable;
    _batchSize = batchSize;
  }
  
  //
  // Subscribes the batch observable
  // The records are held in a list until enough records are found 
  // at which point the observer is informed
  //
  public IDisposable Subscribe(IObserver<IEnumerable<T>> observer)
  {
     var batch = new List<T>(_batchSize);
     return _observable.Subscribe( a => 
         { 
             batch.Add(a);
             if( batch.Count == _batchSize)
             {
                observer.OnNext(batch);
                // Start a fresh list rather than clearing the one just handed over,
                // so an observer that keeps the batch does not see it emptied
                batch = new List<T>(_batchSize);
             }
         }, e => observer.OnError(e), () =>
         {
             // Send any remaining partial batch before completing
             if (batch.Count > 0) observer.OnNext(batch);
             observer.OnCompleted();
         });
  }

}

Next we need the extension method.  Quite simply this takes the same parameters as the batch class and we will call the method ToBatch.

C#
//
// Extension method to create the BatchObservable
//
public static IObservable<IEnumerable<T>> ToBatch<T>(this IObservable<T> observable, int batchSize)
{
    return new BatchObservable<T>(observable, batchSize);
}

To use the method, we simply call it on an observable.

C#
//
// Call the ToBatch method
//
observable.ToBatch(100);
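For comparison, the same batching idea could also be written with the functional approach described earlier. The following is only a sketch: ToBatchFunctional and BatchSketch are hypothetical names chosen for this illustration, the System.Reactive package is assumed to be referenced, and flushing a partial final batch on completion is a choice made in this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class BatchSketch
{
    // Hypothetical functional variant of a batching extension, built on Observable.Create
    public static IObservable<IList<T>> ToBatchFunctional<T>(this IObservable<T> source, int batchSize)
    {
        return Observable.Create<IList<T>>(observer =>
        {
            // State lives inside the lambda, so every subscriber gets its own batch
            var batch = new List<T>(batchSize);
            return source.Subscribe(
                item =>
                {
                    batch.Add(item);
                    if (batch.Count == batchSize)
                    {
                        observer.OnNext(batch);
                        // Hand over the full list and start a fresh one
                        batch = new List<T>(batchSize);
                    }
                },
                ex => observer.OnError(ex),
                () =>
                {
                    // Flush whatever is left before signalling completion
                    if (batch.Count > 0) observer.OnNext(batch);
                    observer.OnCompleted();
                });
        });
    }

    public static void Run()
    {
        Observable.Range(1, 6)
            .ToBatchFunctional(2)
            .Subscribe(b => Console.WriteLine(string.Join(",", b)));
        // prints 1,2 then 3,4 then 5,6
    }
}
```

Rx also ships a built-in Buffer operator that takes a count, which may be preferable in production code; the hand-written versions are mainly useful for seeing how such an operator is put together.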

Other examples

The zip file contains the following example observables:

LatestByKeyObservable     Provides the most recent element for each item, based on a key.

PausableObservable        An observable that can be paused.

SynchronisedObservable    Synchronises two observables based on a specified comparison.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Technical Lead
United Kingdom