Click here to Skip to main content
15,885,278 members
Articles / Programming Languages / C#

Task Completion On Event

Rate me:
Please Sign up or sign in to vote.
5.00/5 (4 votes)
24 Apr 2016CPOL5 min read 23.8K   188   8   4
A road trip to implement generic TaskCompletionSource for non-generic message event types

Introduction

Recently, I implemented a solution to a specific use case for invoking a continuation (using TaskCompletionSource) for an event that passes in a specific message derived from a base class. For example, consider several messages that can be generated from some "external" source (a couple real life example: an Ingenico iSC480 and also receiving RabbitMQ messages from a BeagleBone):

Image 1

Each of these messages is derived from a base class, so the boilerplate concrete implementation looks like this:

C#
public class Message { }
public class Message1 : Message { }
public class Message2 : Message { }
public class Message3 : Message { }

In my use case, these messages are received effectively asynchronously and the specific message will be received only after some user action has occurred. In my particular case, I know exactly what message I should receive.

It should also be noted that I wanted to avoid writing if-then-else or switch statements based on the message ID or the message packet type. The goals were:

  1. Be able to await for a specific message
  2. Have the underlying implementation automatically cancel the awaiting task if a different message is received.

So, in my particular case, I wanted to be able to do something like this:

C#
// Do something that expects Message2 to be returned at some point
Message2 msg = await device.WaitForMessage<Message2>();
// Do something once Message2 has been received.

The complexity in this occurs partly because the message packets are received through a standard event handler:

C#
public event EventHandler<MessageEventArgs> MessageReceived;

So, the overall workflow looks something like this:

Image 2

Where MessageEventArgs looks like this (at least for the example in this article):

C#
public class MessageEventArgs : EventArgs
{
  public Message Message { get; set; }
}

A very important point to this story is that I am only ever expecting one kind of message to allow the continuation. Anything else should cancel processing the continuation.

A Mock Device

We can implement a mock device like this:

C#
public class MockDevice
{
  public event EventHandler<MessageEventArgs> MessageReceived;

  public void SimulateMessage(int n)
  {
    switch (n)
    {
      case 1:
        MessageReceived.Fire(this, new MessageEventArgs() { Message = new Message1() });
        break;

      case 2:
        MessageReceived.Fire(this, new MessageEventArgs() { Message = new Message2() });
        break;

      case 3:
        MessageReceived.Fire(this, new MessageEventArgs() { Message = new Message3() });
        break;
    }
  }
}

In the actual implementation, there is no switch statement, instead I use class attributes to designate which concrete message packet class gets instantiated for which message, via reflection (as well as populating message-specific properties from the parameters that are acquired separately from the device), but all that is outside the purpose of this article.

A Basic Interface to the Device

The basic interface to the device looks like this:

C#
public class DeviceInterface
{
  public MockDevice Device { get; protected set; }

  public DeviceInterface()
  {
    Device = new MockDevice();
    Device.MessageReceived += OnMessageReceived;
  }

  protected void OnMessageReceived(object sender, MessageEventArgs e)
  {
    // Here we want to set the task completion result.
  }
}

Take 1: Completing With a Message Result

Image 3

TaskCompletionSource (read about it here) is a useful mechanism for external asynchronous operations. From MSDN:

In many scenarios, it is useful to enable a Task<TResult> to represent an external asynchronous operation. TaskCompletionSource<TResult> is provided for this purpose. It enables the creation of a task that can be handed out to consumers, and those consumers can use the members of the task as they would any other. However, unlike most tasks, the state of a task created by a TaskCompletionSource is controlled explicitly by the methods on TaskCompletionSource. This enables the completion of the external asynchronous operation to be propagated to the underlying Task.

I initially used it like this:

C#
public class DeviceInterface
{
  public MockDevice Device { get; protected set; }

  protected TaskCompletionSource<Message> tcs;

  public DeviceInterface()
  {
    Device = new MockDevice();
    Device.MessageReceived += OnMessageReceived;
  }

  public Task<Message> WaitForMessage()
  {
    tcs = new TaskCompletionSource<Message>();

    return tcs.Task;
  }

  protected void OnMessageReceived(object sender, MessageEventArgs e)
  {
    tcs.SetResult(e.Message);
  }
}

Notice the declaration TaskCompletionSource<Message>.  This immediately indicates that we have a problem, because we're specifying the result type as Message, rather than one of the desired sub-classes of Message.  We'll deal with that later.  First, let's just get some basic understanding of how to use TaskCompletionSource under our belt.

A simple test program illustrates that this works:

C#
class Program
{
  static DeviceInterface deviceInterface = new DeviceInterface();

  static void Main(string[] args)
  {
    DoSomethingThatExpectsMessage2();
    Thread.Sleep(500);
    deviceInterface.Device.SimulateMessage(2);
  }

  static async void DoSomethingThatExpectsMessage2()
  {
    Message2 msg = (Message2)await deviceInterface.WaitForMessage();
    Console.WriteLine("Message2 received.");
  }
}

Image 4

But it doesn't work well.  Consider this:

C#
deviceInterface.Device.SimulateMessage(1);  // let's get a different message!

Image 5

What happened?  To answer that (although hopefully it's obvious), we need to put a try-catch block around the DoSomething method:

Image 6

Quite so.  We're expecting message 2, but we got message 1!

Dynamic Duck Disaster

Image 7

The amusing this is, if we were using a duck-typed language, we wouldn't have to deal with the complexities of generics and getting the right type.  We can do this too!  By using the dynamic keyword, we can simply operate on the type with the assumption that it is the type we are wanting, and if it isn't, well, you'll know at runtime, just like in a duck-typed interpreted language.

C#
dynamic dmsg = await deviceInterface.WaitForMessage();
Console.WriteLine("Message Type: " + dmsg.GetType().ToString());

Image 8

Notice what happens if we receive a "Message1":

C#
deviceInterface.Device.SimulateMessage(1);

Image 9

Very cool!

Do not do this.

Take 2: Checking for the Right Type

Image 10

In this version, I save the expected message type.  This has the advantage of allowing the code to cancel the task (and thus prevent the continuation from running) if the message type is something different than expected:

C#
public class DeviceInterface
{
  public MockDevice Device { get; protected set; }

  protected TaskCompletionSource<Message> tcs;
  protected Type messageType;

  public DeviceInterface()
  {
    Device = new MockDevice();
    Device.MessageReceived += OnMessageReceived;
  }

  public Task<Message> WaitForMessage<T>()
  {
    tcs = new TaskCompletionSource<Message>();
    messageType = typeof(T);

    return tcs.Task;
  }

  protected void OnMessageReceived(object sender, MessageEventArgs e)
  {
    if (e.Message.GetType() == messageType)
    {
      tcs.SetResult(e.Message);
    }
    else
    {
      tcs.SetCanceled();
    }
  }
}

The DoSomething method looks like this now:

C#
static async void DoSomethingThatExpectsMessage2()
{
  try
  {
    Message2 msg = (Message2)await deviceInterface.WaitForMessage<Message2>();
    Console.WriteLine("Message2 received.");
  }
  catch (TaskCanceledException)
  {
    // We don't care
  }
  catch (Exception ex)
  {
    Console.WriteLine(ex.Message);
  }
}

Notice that the desired type is passed in as a generic parameter. While better, there's still things I don't like:

  1. The downcast to the actual message is still required
  2. We now have an independent variable that is holding the message type.

Take 3: Ideally, But Doesn't Compile...

Image 11

Image 12

Take 4: Encapsulating TaskCompletionSource with an Interface

Image 13

In order to accomplish our goal, we have to abstract out the generic type <T> such that the event handler OnMessageReceived doesn't need to know what <T> is.  This requires encapsulating TaskCompletionSource:

C#
internal class TCS<T> : ITCS where T : Message, new()
{
  public Task<T> Task { get { return internalTcs.Task; } }

  protected TaskCompletionSource<T> internalTcs;

  public TCS()
  {
    internalTcs = new TaskCompletionSource<T>();
  }

  public void SetResult(object val)
  {
    internalTcs.SetResult((T)val);
  } 
}

The magic really happens in our SetResult, in that we're passing in an object, but cast it to T on the call to SetResult.  This allows the awaiter to get exactly the type of message that is being awaited, rather than having to downcast the return value of the await.

But we still need that interface, which looks like this:

C#
public interface ITCS
{
  void SetResult(object val);
}

Now, our device interface can be written using the interface to our wrapper:

C#
public class DeviceInterface
{
  public MockDevice Device { get; protected set; }

  protected ITCS tcs;

  public DeviceInterface()
  {
    Device = new MockDevice();
    Device.MessageReceived += OnMessageReceived;
  }

  public Task<T> WaitForMessage<T>() where T : Message, new()
  {
    TCS<T> wrappedTcs = new TCS<T>();
    tcs = wrappedTcs;

    return wrappedTcs.Task;
  }

  protected void OnMessageReceived(object sender, MessageEventArgs e)
  {
    tcs.SetResult(e.Message);
  }
}

Now the message handler doesn't need to know (or care) what the message type is!

The usage is finally what we want:

C#
Message2 msg = await deviceInterface.WaitForMessage<Message2>();

Bells and Whistles

Image 14

In our interface, we should expose some additional methods:

C#
public interface ITCS
{
  void SetResult(object val);
  void TrySetResult(object val);
  void SetCanceled();
  void TrySetCanceled();
}

The implementation should be obvious!

We can also check that the message type is of the type we're expecting to handle in the task completion:

C#
internal class TCS<T> : ITCS where T : Message, new()
{
  public bool IsOfType(Type t)
  {
    return typeof(T) == t;
  }

  public TaskCompletionSource<T> TaskCompletionSource = new TaskCompletionSource<T>();

  public void SetResult(object val)
  {
    Assert.That(IsOfType(val.GetType()), "TrySetResult called for an object of type " + 
     val.GetType().ToString() + 
     " when the task completion is expecting an object of type " + typeof(T).ToString());
  
    internalTcs.SetResult((T)val);
  }

  ...etc...

We can also do some more interesting things in our message handler, like canceling the awaiting task (and thus the continuation) if the wrong message is received:

C#
protected void OnMessageReceived(object sender, MessageEventArgs e)
{
  if (tcs.IsOfType(e.Message.GetType()))
  {
    tcs.SetResult(e.Message);
  }
  else
  {
    tcs.SetCanceled();
  }
}

Which gives us a way to recover from the wrong message:

C#
static async void DoSomethingThatExpectsMessage2()
{
  try
  {
    Message2 msg = await deviceInterface.WaitForMessage<Message2>();
    Console.WriteLine("Message2 received.");
  }
  catch (TaskCanceledException)
  {
    // Do recovery here:
    Console.WriteLine("Task has been canceled!");
  }
  catch (Exception ex)
  {
    Console.WriteLine(ex.Message);
  }
}

Image 15

We can also fire off other events, like an "unsolicited message" event (not illustrated), when we receive an unexpected event.

Which brings me to another point -- we can certainly receive messages when we're not waiting for something, anything. So it'll be useful to implement something like this:

C#
public class DeviceInterface
{
  public event EventHandler<MessageEventArgs> UnsolicitedMessage;

  ...

  protected void OnMessageReceived(object sender, MessageEventArgs e)
  {
    if ((tcs == null) || (tcs.IsDone))
    {
        UnsolicitedMessage.Fire(this, e);
    }
    else
    {
      if (tcs.IsOfType(e.Message.GetType()))
      {
        tcs.SetResult(e.Message);
      }
      else
      {
        tcs.SetCanceled();
        UnsolicitedMessage.Fire(this, e);
      }
    }
  }
}

Because I know that I'm only ever awaiting for one specific response, I can check that there is no awaiting task:

C#
public Task<T> WaitForMessage<T>() where T : Message, new()
{
  Assert.ThatIf(tcs != null, () => tcs.IsDone,
    () => "The task " + tcs.TypeOf.ToString() + " is already awaiting completion.");
  TCS<T> wrappedTcs = new TCS<T>();
  tcs = wrappedTcs;

  return wrappedTcs.Task;
}

This way, if I accidentally do something like this:

C#
DoSomethingThatExpectsMessage2();
DoSomethingThatExpectsMessage3();

Then I get an exception:

Image 16

Extension Methods and Helpers

Image 17

There's a couple extension methods and assertion helpers that I've used in this article which should be self-explanatory:

C#
public static class Assert
{
  public static void That(bool b, string exceptionMessage)
  {
    if (!b) throw new Exception(exceptionMessage);
  }

  // Don't evaluate the message unless b is false.
  public static void That(bool b, Func<string> msg)
  {
    if (!b) throw new Exception(msg());
  }

  /// <summary>
  /// Only evaluate the thenExpr if b is true.
  /// </summary>
  public static void ThatIf(bool b, Func<bool> thenExpr, Func<string> msg)
  {
    if (b)
    {
      Assert.That(thenExpr(), msg);
    }
  }
}

public static class ExtensionMethods
{
  public static void Fire<TEventArgs>(this EventHandler<TEventArgs> theEvent, 
			object sender, TEventArgs e) where TEventArgs : EventArgs
  {
    if (theEvent != null)
    {
      theEvent(sender, e);
    }
  }

  public static bool IsDone(this Task task)
  {
    return task.IsCanceled | task.IsCompleted | task.IsFaulted;
  }
}

Conclusion

Image 18

I hope you enjoyed the road trip!

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Architect Interacx
United States United States
Blog: https://marcclifton.wordpress.com/
Home Page: http://www.marcclifton.com
Research: http://www.higherorderprogramming.com/
GitHub: https://github.com/cliftonm

All my life I have been passionate about architecture / software design, as this is the cornerstone to a maintainable and extensible application. As such, I have enjoyed exploring some crazy ideas and discovering that they are not so crazy after all. I also love writing about my ideas and seeing the community response. As a consultant, I've enjoyed working in a wide range of industries such as aerospace, boatyard management, remote sensing, emergency services / data management, and casino operations. I've done a variety of pro-bono work non-profit organizations related to nature conservancy, drug recovery and women's health.

Comments and Discussions

 
QuestionHate to say it but Pin
Sacha Barber24-Apr-16 7:59
Sacha Barber24-Apr-16 7:59 
AnswerRe: Hate to say it but Pin
Marc Clifton24-Apr-16 12:37
mvaMarc Clifton24-Apr-16 12:37 
GeneralRe: Hate to say it but Pin
Sacha Barber24-Apr-16 20:01
Sacha Barber24-Apr-16 20:01 
GeneralRe: Hate to say it but Pin
Marc Clifton25-Apr-16 3:39
mvaMarc Clifton25-Apr-16 3:39 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.