
Reactive Programming for .NET and C# Developers - An Introduction To IEnumerable, IQueryable, IObservable, and IQbservable

1 Sep 2013, CPOL, 10 min read
Exploring Reactive Programming including a detailed look at Interactive and Reactive Extensions for .NET and C# developers.

Preface


Reactive Extensions have been out in the wild for some time, and in this post we will discuss them in a bit more detail. We'll also touch on IQbservables, perhaps the most mysteriously named interface in the world, maybe after the Higgs boson. Push and pull sequences are everywhere, and now, with devices at one end and the cloud at the other, most data transactions happen via push or pull sequences. Hence, it is essential to grasp the basic concepts of the programming models around them.

First Things First

Let us take a step back and discuss IEnumerable and IQueryable first, before moving on to the reactive IObservable and IQbservable (Qbservables = Queryable Observables; oh yeah, funny name).

IEnumerable<T>

As you may be aware, the IEnumerable model can be viewed as a pull operation. You get an enumerator, and then you iterate over the collection by calling MoveNext until it returns false, which signals that you have passed the final item. Pull models are useful when the environment is requesting data from an external source. To cover some basics: IEnumerable has a GetEnumerator method, which returns an enumerator with a MoveNext() method and a Current property. Side tip: a C# foreach statement can iterate over any type that exposes a suitable GetEnumerator method, even if the type does not implement IEnumerable. Anyway, here is what the non-generic version of IEnumerable looks like:

C#
public interface IEnumerable
{
    IEnumerator GetEnumerator();
}

public interface IEnumerator
{
    Object Current {get;}
    bool MoveNext();
    void Reset();
}
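
To make the pull model concrete, here is a minimal sketch of what a foreach loop roughly expands to, written against the generic interfaces. The class and method names (PullDemo, SumByHand) are made up for this illustration and are not part of any library.

```csharp
using System;
using System.Collections.Generic;

public static class PullDemo
{
    // Sums a sequence by driving the enumerator by hand,
    // which is roughly what a foreach loop expands to.
    public static int SumByHand(IEnumerable<int> source)
    {
        var total = 0;
        using (IEnumerator<int> e = source.GetEnumerator())
        {
            while (e.MoveNext())      // the consumer asks for the next item (the "pull")
            {
                total += e.Current;   // read the item that was just pulled
            }
        }
        return total;
    }

    public static void Main()
    {
        var numbers = new[] { 30, 40, 20, 40 };
        Console.WriteLine(SumByHand(numbers)); // prints 130
    }
}
```

The consumer decides when each item is produced, which is the defining property of a pull sequence.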

Now, LINQ defines a set of operators as extension methods on top of the generic version of IEnumerable, i.e., IEnumerable<T>. By leveraging the type inference support for generic methods, you can invoke these methods on any IEnumerable<T> without specifying the type argument. For example, you can write someStringArray.Count() instead of someStringArray.Count<String>(). You can explore the Enumerable class to find these static extension methods.

The query operators in this case (like Where, Count, etc.) and their related expressions are compiled to IL, and they run in-process, just like any other IL code executed by the CLR. From an implementation point of view, the parameter of a LINQ clause like Where is a lambda expression (as you may already know, from .. select is just syntactic sugar that gets expanded to extension methods of IEnumerable<T>), and in most cases a delegate like Func<T,..> can represent that lambda in memory. But what if you want to apply query operators to items sitting somewhere else? For example, how do you apply LINQ operators to a set of data rows stored in a database table that may be in the cloud, instead of an in-memory collection that is an IEnumerable<T>? That is exactly what IQueryable<T> is for.
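
Before moving on, the in-memory case can be sketched as follows: the same filter written once in query syntax and once as the extension method chain it is translated to, with the lambda held in an ordinary Func delegate. QuerySugarDemo and its method names are invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class QuerySugarDemo
{
    // Query-expression form.
    public static List<int> WithQuerySyntax(IEnumerable<int> source)
    {
        return (from n in source
                where n > 25
                select n * 2).ToList();
    }

    // The equivalent chain of Enumerable extension methods.
    public static List<int> WithMethodSyntax(IEnumerable<int> source)
    {
        Func<int, bool> isLarge = n => n > 25;   // the lambda is just a delegate here
        return source.Where(isLarge).Select(n => n * 2).ToList();
    }

    public static void Main()
    {
        var numbers = new[] { 30, 40, 20, 40 };
        Console.WriteLine(string.Join(", ", WithQuerySyntax(numbers)));  // prints 60, 80, 80
        Console.WriteLine(string.Join(", ", WithMethodSyntax(numbers))); // prints 60, 80, 80
    }
}
```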

IQueryable<T>

IQueryable<T> is an IEnumerable<T> (it inherits from IEnumerable<T>), and it points to a query expression that can be executed in a remote world. The LINQ operators for querying objects of type IQueryable<T> are defined in the Queryable class. They accept their lambda arguments as Expression<Func<T,..>>, which is a System.Linq.Expressions.Expression (you can read about expression trees here), and return a new IQueryable<T> whose expression tree includes the operator call. This tree will be translated to the remote world (say, a SQL system) via a query provider. So, essentially, concrete IQueryable implementations point to a query expression and a query provider, and it is the job of the query provider to translate the query expression to the query language of the remote world where it gets executed. From an implementation point of view, the lambda you pass to a LINQ operator applied on an IQueryable is converted to an Expression<Func<T,..>> instead of a delegate. Expression trees in .NET provide a way to represent code as data, as a kind of abstract syntax tree. Later, the query provider walks this tree to construct an equivalent query in the remote world.

C#
public interface IQueryable : IEnumerable {       
    Type ElementType { get; }
    Expression Expression { get; }
    IQueryProvider Provider { get; }
}
public interface IQueryable<T> : IEnumerable<T>, IQueryable, IEnumerable {
   ..
} 

For example, in LINQ to Entities or LINQ to SQL, the query provider converts the expressions to SQL and hands it over to the database server. You can even inspect the resulting query in the target query language (SQL). In short, the LINQ query operators you apply on an IQueryable are used to build an expression tree, and the query provider translates this tree to build and execute a query in a remote world. Read this article if you are not clear about how expression trees are built from lambdas using Expression<T>.
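
As a small illustration of "code as data", the sketch below captures a lambda as an expression tree, inspects it, and then looks at the tree that Queryable.Where records. It uses an in-memory array wrapped with AsQueryable() purely as a stand-in for a real remote provider, and the ExpressionDemo names are made up for this example.

```csharp
using System;
using System.Linq;
using System.Linq.Expressions;

public static class ExpressionDemo
{
    // The lambda is captured as a tree of nodes rather than as compiled code.
    public static readonly Expression<Func<int, bool>> IsLarge = n => n > 25;

    // Builds a query over an in-memory array and returns its expression tree.
    public static Expression BuildQueryExpression()
    {
        IQueryable<int> query = new[] { 30, 40, 20, 40 }.AsQueryable().Where(IsLarge);
        return query.Expression;
    }

    public static void Main()
    {
        // The tree can be inspected node by node...
        Console.WriteLine(IsLarge.Body.NodeType);           // prints GreaterThan
        Console.WriteLine(IsLarge.Parameters[0].Name);      // prints n

        // ...and compiled back to a delegate when we want to run it locally.
        Func<int, bool> compiled = IsLarge.Compile();
        Console.WriteLine(compiled(30));                    // prints True

        // Queryable.Where only recorded a method-call node; nothing was filtered yet.
        Console.WriteLine(BuildQueryExpression().NodeType); // prints Call
    }
}
```

A real query provider would walk a tree like this one and emit SQL (or another target language) instead of compiling it to a delegate.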

Reactive Extensions

So, now let us get into the anatomy and philosophy of observables.

IObservable<T>

As we discussed, objects of type IEnumerable<T> are pull sequences. But then, in the real world, at times we push things as well, not just pull. (Health Alert: when you do both together, make sure you do it safe.) In a lot of scenarios, the push pattern makes a lot of sense. For example, instead of you waiting in a queue day and night with your neighbors in front of the local post office to collect snail mail, the post office agent will just push the mail to your home when it arrives.

Now, one of the cool things about push and pull sequences is that they are duals. This means IObservable<T> is the dual of IEnumerable<T>; see the code below. To keep the story short, the dual interface of IEnumerable, derived using categorical duality, is IObservable. The story goes that some members of Erik Meijer's team (he was with Microsoft then) had a well deserved temporal megalomaniac hyperactive spike when they discovered this duality. Here is a beautiful paper from Erik on that if you are interested. A brief summary of Erik's paper is below.

C#
//Generic version of IEnumerable, ignoring the non generic IEnumerable base
interface IEnumerable<out T>
{
    IEnumerator<T> GetEnumerator();
}

interface IEnumerator<out T>: IDisposable
{
    bool MoveNext(); // throws Exception
    T Current { get; } 
}

//Its dual IObservable
interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

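// Dual derived mechanically from IEnumerator<T>. The IObserver<T> that actually ships
// in .NET declares OnNext(T value), OnError(Exception error), and OnCompleted() instead.
interface IObserver<in T>
{
    void OnCompleted(bool done);
    void OnError(Exception exception);
    T OnNext { set; } 
}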

Surprisingly, the IObservable implementation looks like the Observer pattern.
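
To see the push side in action without any extra packages, here is a minimal sketch built only on the IObservable<T> and IObserver<T> interfaces from the System namespace. RecordingObserver, ArrayObservable, and PushDemo are hypothetical types written for this example; a real observable would usually push values over time and return a disposable that cancels the subscription.

```csharp
using System;
using System.Collections.Generic;

// A hypothetical observer that simply records whatever it is pushed.
public sealed class RecordingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(int value) { Log.Add("next " + value); }
    public void OnError(Exception error) { Log.Add("error " + error.Message); }
    public void OnCompleted() { Log.Add("completed"); }
}

// A hypothetical observable that pushes a fixed set of values on Subscribe.
public sealed class ArrayObservable : IObservable<int>
{
    private readonly int[] _values;
    public ArrayObservable(params int[] values) { _values = values; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        foreach (var value in _values) observer.OnNext(value); // the producer drives
        observer.OnCompleted();                                 // signal end of sequence
        return new NoopDisposable();   // nothing left to cancel in this sketch
    }

    private sealed class NoopDisposable : IDisposable { public void Dispose() { } }
}

public static class PushDemo
{
    public static void Main()
    {
        var observer = new RecordingObserver();
        new ArrayObservable(30, 40, 20).Subscribe(observer);
        Console.WriteLine(string.Join(", ", observer.Log));
        // prints next 30, next 40, next 20, completed
    }
}
```

Compared with the enumerator loop, control is inverted: the observer never asks for a value, it only reacts to OnNext, OnError, and OnCompleted calls.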

Now, LINQ operators are cool. They are very expressive, and provide an abstraction to query things. So the crazy guys in the Reactive team thought they should take LINQ to work against event streams. Event streams are in fact push sequences, instead of pull sequences. So, they built IObservable. The IObservable fabric lets you write LINQ operators on top of push sequences like event streams, much the same way you query IEnumerable<T>. The LINQ operators for an object of type IObservable<T> are defined in the Observable class. So, how would you implement a LINQ operator, like Where, on an observable to do some filtering? Here is a simple example of the filter operator Where for an IEnumerable and an IObservable (simplified for comparison). In the case of IEnumerable, we dispose the enumerator when we are done traversing.

C#
//Where for IEnumerable

static IEnumerable<T> Where<T>(IEnumerable<T> source, Func<T, bool> predicate)
{
    // foreach(var element in source)
    //   if (predicate(element))
    //        yield return element;

    using (var enumerator = source.GetEnumerator())
    {
        while (enumerator.MoveNext())
        {
            var value= enumerator.Current;
            if (predicate(value))
            {
                yield return value;
            }
        }
    }
}

//Where for IObservable

static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate)
{
   return Observable.Create<T>(observer =>
   {
       return source.Subscribe(Observer.Create<T>(value =>
           {
               try
               {
                   if (predicate(value)) observer.OnNext(value);
               }
               catch (Exception e)
               {
                   observer.OnError(e);
               }
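           },
           observer.OnError,        // forward errors raised by the source
           observer.OnCompleted));  // forward completion of the source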
   });
}

Now, look at the IObservable's Where implementation. In this case, we return the IDisposable handle of the subscription so that the caller can dispose it to stop the subscription. For filtering, we simply create an inner observer, subscribe it to the source, and apply our filtering logic inside it. We wrap that in a new top-level observable, which forwards to its own observer only the values that pass the predicate. Now, you can have any concrete implementation of IObservable<T> that wraps an event source, and then you can query it using Where!! Cool. The Observable class in Reactive Extensions has a few helper methods to create observables from events, like FromEvent and FromEventPattern. Let us create an observable and query the events now. Fortunately, the Rx team already ships the entire implementation of observables and the related query operators, so we don't end up writing custom query operators like this.
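
If you want to experiment with the filtering idea before installing Rx, the sketch below re-creates it using only the interfaces in the System namespace. Everything in MiniRx (DelegateObserver, DelegateObservable, ToPush, WhereSketch, Collect) is a hypothetical stand-in written for this example, not the Rx implementation, and it leaves out details such as catching exceptions thrown by the predicate.

```csharp
using System;
using System.Collections.Generic;

public static class MiniRx
{
    // Observer built from three delegates (a stand-in for Rx's Observer.Create).
    private sealed class DelegateObserver<T> : IObserver<T>
    {
        private readonly Action<T> _onNext;
        private readonly Action<Exception> _onError;
        private readonly Action _onCompleted;
        public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        { _onNext = onNext; _onError = onError; _onCompleted = onCompleted; }
        public void OnNext(T value) { _onNext(value); }
        public void OnError(Exception error) { _onError(error); }
        public void OnCompleted() { _onCompleted(); }
    }

    // Observable built from a subscribe function (a stand-in for Observable.Create).
    private sealed class DelegateObservable<T> : IObservable<T>
    {
        private readonly Func<IObserver<T>, IDisposable> _subscribe;
        public DelegateObservable(Func<IObserver<T>, IDisposable> subscribe) { _subscribe = subscribe; }
        public IDisposable Subscribe(IObserver<T> observer) { return _subscribe(observer); }
    }

    private sealed class NoopDisposable : IDisposable { public void Dispose() { } }

    // Pushes every item of a collection to the subscriber, then completes.
    public static IObservable<T> ToPush<T>(IEnumerable<T> items)
    {
        return new DelegateObservable<T>(observer =>
        {
            foreach (var item in items) observer.OnNext(item);
            observer.OnCompleted();
            return new NoopDisposable();
        });
    }

    // The filter: subscribe an inner observer to the source and forward matches only.
    public static IObservable<T> WhereSketch<T>(this IObservable<T> source, Func<T, bool> predicate)
    {
        return new DelegateObservable<T>(downstream =>
            source.Subscribe(new DelegateObserver<T>(
                value => { if (predicate(value)) downstream.OnNext(value); },
                downstream.OnError,
                downstream.OnCompleted)));
    }

    // Test helper: subscribes and gathers everything that is pushed.
    public static List<T> Collect<T>(IObservable<T> source)
    {
        var seen = new List<T>();
        source.Subscribe(new DelegateObserver<T>(seen.Add, e => { throw e; }, () => { }));
        return seen;
    }

    public static void Main()
    {
        var filtered = ToPush(new[] { 30, 40, 20, 40 }).WhereSketch(n => n > 25);
        Console.WriteLine(string.Join(", ", Collect(filtered))); // prints 30, 40, 40
    }
}
```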

You can run install-package Rx-Main in the NuGet Package Manager Console to install Rx (newer releases of Rx are published as the System.Reactive package), and try out this example that shows event filtering.

C#
//Let us print all ticks between 5 seconds and 20 seconds

//Interval in milli seconds
var timer = new Timer() { Interval = 1000 };
timer.Start();

//Create our event stream which is an Observable
var eventStream = Observable.FromEventPattern<ElapsedEventArgs>(timer, "Elapsed");
var nowTime = DateTime.Now;

//Same as eventStream.Where(item => ...);
var filteredEvents = from e in eventStream
                     let time = e.EventArgs.SignalTime
                     where
                         time > nowTime.AddSeconds(5) &&
                         time < nowTime.AddSeconds(20)
                     select e;
//Subscribe to our observable
filteredEvents.Subscribe(t => Console.WriteLine(DateTime.Now));
Console.WriteLine("Let us wait..");

//Dispose filteredEvents explicitly if you want
Console.ReadKey();

Obviously, in the above example, we could’ve used Observable.Timer – but I just wanted to show how to wrap an external event source with observables. Similarly, you can wrap your Mouse Events or WPF events. You can explore more about Rx and observables, and a few applications here. Let us move on now to IQbservables.

IQbservable<T>

Now, let us focus on IQbservable<T>. IQbservable<T> is the counterpart of IObservable<T>: it represents a query on push sequences or event sources as an expression, much like IQueryable<T> is the counterpart of IEnumerable<T>. So, what exactly does this mean? If you inspect IQbservable, you can see that:

C#
public interface IQbservable<out T> : IQbservable, IObservable<T>
{
}

public interface IQbservable
{
    Type ElementType { get; }
    Expression Expression { get; }
    IQbservableProvider Provider { get; }
} 

You can see that it has an Expression property to represent the LINQ to Observable query, much like IQueryable has an Expression to represent the AST of a LINQ query. The IQbservableProvider is responsible for translating the expression to the language of a remote event source (maybe a stream server in the cloud).

Interactive Extensions

Interactive Extensions, at its core, has a number of new extension methods for IEnumerable<T>, i.e., it adds a number of utility LINQ to Objects query operators. You may have hand coded some of these utility extension methods somewhere in your helpers or utility classes, but now a lot of them are aggregated together by the Rx team. Also, this post assumes you are familiar with the cold IEnumerable model and iterators in C#. Basically, the C# compiler takes each iterator method (a method containing yield return statements) and generates a class for it. So, in a way, each C# iterator internally holds a state machine. You can examine this with a decompiler such as Reflector, on a method that yield returns an IEnumerator<T>. Or better, there is a cool post from my friend Abhishek Sur here, or this post about the implementation of iterators in C#.
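
The state machine is easy to observe from the outside. In the sketch below (IteratorDemo and its Trace list are invented for the illustration), calling the iterator method runs none of its body; each MoveNext call advances it only as far as the next yield return.

```csharp
using System;
using System.Collections.Generic;

public static class IteratorDemo
{
    public static readonly List<string> Trace = new List<string>();

    // The compiler rewrites this method into a state-machine class;
    // none of the body runs until MoveNext is called for the first time.
    public static IEnumerable<int> Numbers()
    {
        Trace.Add("started");
        yield return 1;
        Trace.Add("resumed");
        yield return 2;
    }

    public static void Main()
    {
        var sequence = Numbers();             // only creates the state machine
        Console.WriteLine(Trace.Count);       // prints 0

        using (var e = sequence.GetEnumerator())
        {
            e.MoveNext();                     // runs the body up to the first yield
            Console.WriteLine(e.Current);     // prints 1
            Console.WriteLine(string.Join(", ", Trace)); // prints started
        }
    }
}
```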

More About Interactive Extensions

Fire up a C# console application, and install the Interactive Extensions package using install-package Ix-Main. You can explore the EnumerableEx class in the System.Linq namespace of System.Interactive.dll. Now, let us explore some useful extension methods that got added to IEnumerable.


Examining a Few Utility Methods in Interactive Extensions

Let us quickly examine a few useful Utility methods.

Do

What the simplest version of Do does is pretty interesting. It'll lazily invoke an action on each element in the sequence, when we do the enumeration leveraging the iterator.

C#
//Let us create a set of numbers
var numbers = new int[] { 30, 40, 20, 40 };
var result=numbers.Do(n=>Console.WriteLine(n));

Console.WriteLine("Before Enumeration");

foreach(var item in result)
{
    //The action will be invoked when we actually enumerate
}
Console.WriteLine("After Enumeration");

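And here is the result. Note that the action (in this case, our Console.WriteLine to print the values) is applied only when we enumerate: the output starts with "Before Enumeration", then lists 30, 40, 20, and 40 on separate lines, and ends with "After Enumeration".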

Now, how is the simplest version of the Do method implemented? If you have a quick peek at the Interactive Extensions source code here in CodePlex, you can see the actual implementation. Here is a shortened version:

C#
public static class StolenLinqExtensions
{
    public static IEnumerable<TSource> StolenDo<TSource>(
            this IEnumerable<TSource> source, Action<TSource> onNext)
    {
        //Get the enumerator
        using (var e = source.GetEnumerator())
        {
            while (true)
            {
                //Move next
                if (!e.MoveNext())
                    break;
                var current = e.Current;

                //Call our action on top of the current item
                onNext(current);

                //Yield return
                yield return current;
            }
        }
    }
}

Cool, huh?

DoWhile

DoWhile in Ix is pretty interesting. It generates an enumerable sequence by repeating the source sequence as long as the given condition is true. The condition is checked after each pass, so the source is enumerated at least once.

C#
IEnumerable<TResult> DoWhile<TResult>(IEnumerable<TResult> source, Func<bool> condition)

Consider the following code:

C#
var numbers = new int[] { 30, 40, 20, 40 };
var then = DateTime.Now.Add(new TimeSpan(0, 0, 10));
var results = numbers.DoWhile(() => DateTime.Now < then);

foreach (var r in results)
    Console.WriteLine(r);

As expected, you'll see the foreach loop enumerating results repeatedly as long as the DateTime.Now < then condition holds, i.e., until about 10 seconds have passed.
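
For a deterministic way to see the same behavior, here is a rough sketch of how such an operator could be written, using a counter instead of the clock. DoWhileSketch and RunWithTwoRepeats are hypothetical names; this is an approximation of the semantics described above, not the Ix source.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DoWhileDemo
{
    // Yields the whole source once, then repeats it for as long as
    // condition() returns true (the check happens after each pass).
    public static IEnumerable<T> DoWhileSketch<T>(this IEnumerable<T> source, Func<bool> condition)
    {
        do
        {
            foreach (var item in source)
                yield return item;
        }
        while (condition());
    }

    // The condition is true twice, so the source is enumerated three times in total.
    public static List<int> RunWithTwoRepeats()
    {
        var remaining = 2;
        return new[] { 1, 2 }.DoWhileSketch(() => remaining-- > 0).ToList();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", RunWithTwoRepeats())); // prints 1, 2, 1, 2, 1, 2
    }
}
```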

Scan

Scan takes a sequence and applies an accumulator function to it to generate a sequence of accumulated values. For example, let us create a simple sum accumulator that takes a set of numbers and produces the running sum, adding each number to the total accumulated so far:

C#
var numbers = new int[] { 10, 20, 30, 40 };
 
//0 is just the starting seed value
var results = numbers.Scan(0,(sum, num) => sum+num);

//Print Results. Results will contain 10, 30, 60, 100

//0+10=10
//10+20 = 30
//30 + 30 = 60
//60 + 40 = 100

And you may have a look at the actual Scan implementation, from the Rx repository in CodePlex. Here is an abbreviated version:

C#
IEnumerable<TAccumulate> StolenScan<TSource, TAccumulate>
   (this IEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, 
                       TSource, TAccumulate> accumulator)
{
    var acc = seed;
    foreach (var item in source)
    {
        acc = accumulator(acc, item);
        yield return acc;
    }
}
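
One way to place Scan is next to the standard Aggregate operator: both fold the sequence with the same accumulator, but Aggregate returns only the final value, while Scan yields every intermediate one. The sketch below uses a local ScanSketch method (a hypothetical name, mirroring the abbreviated version above) so that it runs without the Ix package.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ScanDemo
{
    // Local stand-in for Ix's Scan: yields the accumulator after every element.
    public static IEnumerable<TAcc> ScanSketch<TSource, TAcc>(
        this IEnumerable<TSource> source, TAcc seed, Func<TAcc, TSource, TAcc> accumulator)
    {
        var acc = seed;
        foreach (var item in source)
        {
            acc = accumulator(acc, item);
            yield return acc;
        }
    }

    public static void Main()
    {
        var numbers = new[] { 10, 20, 30, 40 };
        Func<int, int, int> add = (sum, n) => sum + n;

        // Every intermediate total.
        Console.WriteLine(string.Join(", ", numbers.ScanSketch(0, add))); // prints 10, 30, 60, 100

        // Only the final total.
        Console.WriteLine(numbers.Aggregate(0, add));                     // prints 100
    }
}
```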

Conclusion

We just touched the tip of the iceberg, as the objective of this post was to introduce you to Ix and Rx. There is a pretty exciting talk from Bart De Smet here that you should not miss. Ix is particularly interesting because of its functional roots. Have a look at the Reactive Extensions repository in CodePlex for more inspiration; it should give you a lot more ideas about a few functional patterns. You may also play with the Ix Providers and Ix Async packages.

And let me take the liberty of embedding the drawing created by Charles, which is a concrete representation of the abstract drawing Bart did on the whiteboard. It sums up this post.

[Figure: representation of the three-dimensional graph of Rx's computational fabric]

We'll discuss more practical scenarios where Rx and Ix come in handy in the future, mainly device to cloud interaction scenarios, complex event processing, task distribution using IScheduler, etc., along with some brilliant add-on libraries others are creating on top of Rx. But this one was for a quick introduction. Happy coding!!

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Architect
India
Architect, Developer, Speaker | Wannabe GUT inventor & Data Scientist | Microsoft MVP in C#
