Introduction
A couple of weeks ago, I attended a seminar on PRISM at Microsoft's office. Although I have not been working with GUI for more than two years now, I am always enthusiastic to hear about interesting architectural principles, and PRISM is indeed an interesting one. One of the key concepts of the PRISM architecture is the EventAggregator
component whose responsibility is to enable different application modules to communicate without knowing each other in a publish/subscribe paradigm. It's not entirely a new concept as the earlier CAB (SCSF) also had a similar implementation, and there are also some other Open Source implementations of the EventAggregator pattern like EventBroker and MemBus.
So, you must be asking yourself, why do we need another implementation. Well, the answer lies in the little details of the implementation and on one important feature - the cross process event distribution.
The project I am involved in is a large command and control system which has a rich GUI smart client system running on four screens and divided into three processes. Therefore, in order to fully utilize the EventAggregator
principle, I needed some kind of IPC (Inter Process Communication) enabled library for sending events between GUI instances on a single machine.
The other thing is that some of these existing implementations oblige you to inherit from some base class when you define your events, while the other is limited only to EventHandler<TEventArgs>
event handlers or use an Observer pattern by implementing generic interfaces on subscriber objects.
What I wanted is a simple POCO event definition which I can easily define, serialize when needed for the cross process communication, and publish directly on the EventAgrregator
object.
So the requirements are as follows:
- Subscription may be strong or weak (using the
WeakRefenence
class). - Events are POCO objects.
- Event handlers may be static, public, and nonpublic.
- All event handlers limited to one and single parameter.
- Event handlers may be invoked on the publisher thread, worker thread, or UI thread (a/sync).
- IPC mechanism must be one to many without any configuration.
- Event handlers may be invoked with the derived types of the events (i.e., polymorphic invocation).
- Subscribers may use attributes to mark event handlers, or subscribe directly to some delegate.
Implementation
With all these in mind, I produced the following interface:
public interface IEventRouter
{
void Register(object subscriber);
void Register<T>(Action<T> action);
void Register<T>(Action<T> action, bool useStrongReference,
ThreadingOption tOption, string id);
void Publish(object notification);
void PublishLocal(object notification);
void SendTo(object notification, string subscriptionId);
void Unregister(object subscriber);
void Unregister(string subscriptionId);
bool PolymorphicInvokationEnabled { get; set; }
ILogger Logger { get; set; }
}
The EventRouter
implementation should be singleton as you should use only one instance throughout your process space; however, I didn't implement it as a singleton because most of the EventRouter
usage will be done using some IoC container (I am using Castle Windsor), and there it can be easily configured to singleton lifetime:
IWindsorContainer container = new WindsorContainer();
container.Register(Component.For<ILogger>().ImplementedBy<NLogger>());
container.Register(Component.For<IPCoordinator>().ImplementedBy<WindowsManager>());
container.Register(Component.For<IUISyncronizable>().
ImplementedBy<WPFUISynchronizer>());
container.Register(Component.For<IEventRouter>().
ImplementedBy<EventRouter>().ActAs(LifestyleType.Singleton));
IEventRouter eventRouter = container.Resolve<IEventRouter>();
IPC
The Windows platform offers a wide range of IPC technologies and APIs. Having in mind the requirement of one to many distribution and no configuration, I decided to go with the Windows Messages API, in particular, WM_COPYDATA
and SendMessage
. While browsing on the web, I found this great project called XDMessaging which does almost exactly what I needed: it uses the SendMessageTimeout
API to send the WM_COPYDATA
message, and uses EnumWindows
to find all the top-level windows registered to receive this message. The only mismatch with this implementation is that it uses binary serialized strings to pass data across processes.
So I copy/pasted the core of this library functionality, and replaced its transport medium to binary serialized objects. The implementation itself is pretty simple: each instance creates a top-level zero size window, and uses its window procedure to find WM_COPYDATA
messages.
public class WindowsManager : NativeWindow, IPCoordinator
{
const string ChannelName = "MainChannel";
List<IntPtr> m_windowHandles = new List<IntPtr>();
object m_lock = new object();
public WindowsManager()
{
CreateParams p = new CreateParams();
p.Width = 0;
p.Height = 0;
p.X = 0;
p.Y = 0;
p.Caption = Guid.NewGuid().ToString();
p.Parent = IntPtr.Zero;
base.CreateHandle(p);
Native.SetProp(base.Handle, ChannelName, (int)base.Handle);
}
protected override void WndProc(ref Message m)
{
base.WndProc(ref m);
if (m.Msg == Native.WM_COPYDATA)
{
using (DataGram dataGram = DataGram.FromPointer(m.LParam))
{
if (NotifyEvent != null)
{
NotifyEvent(dataGram.Data);
}
}
}
}
public void BroadcastMessage(object data)
{
IntPtr outPtr = IntPtr.Zero;
lock (m_lock)
{
IEnumerable<IntPtr> windows = GetWindowHandles();
using (DataGram dg = new DataGram(data))
{
Native.COPYDATASTRUCT dataStruct = dg.ToStruct();
foreach (var handle in windows)
{
Native.SendMessageTimeout(handle, Native.WM_COPYDATA,
IntPtr.Zero, ref dataStruct,
Native.SendMessageTimeoutFlags.SMTO_ABORTIFHUNG,
1000, out outPtr);
}
}
}
}
public IEnumerable<IntPtr> GetWindowHandles()
{
m_windowHandles.Clear();
Native.EnumWindows(OnWindowEnum, IntPtr.Zero);
return m_windowHandles;
}
private int OnWindowEnum(IntPtr hWnd, IntPtr lParam)
{
if (hWnd != base.Handle && Native.GetProp(hWnd, ChannelName) != 0)
{
m_windowHandles.Add(hWnd);
}
return 1;
}
#region IPCoordinator Members
public void Broadcast(object notifictaion)
{
ThreadPool.QueueUserWorkItem(state => BroadcastMessage(notifictaion));
}
public event NotificationEventHandler NotifyEvent;
#endregion
}
The message broadcasting executed on a thread pool thread since the SendMessageTimeout
API uses a timeout of 1 second for each window, so it may block (rarely) for N seconds where N is the number of EventRouter
instances.
You can apply any other IPC API by implementing the IPCoordinator
interface and passing it to the EventRouter
constructor:
public interface IPCoordinator
{
void Broadcast(object notifictaion);
event NotificationEventHandler NotifyEvent;
}
Thread synchronization
Since most event consumer handlers would be inside some UI resource, it is necessary to synchronize thread execution in order to avoid the nasty Cross-Thread exception. The current EventRouter
implementation contains thread synchronization for both Windows Forms applications by using System.Threading.SynchronizationContext
, and WPF applications by using System.Windows.Threading.Dispatcher
.
I prefer using the Dispatcher
object since it is created always and never returns null
like the SynchronizationContext.Current
method. The only limitations are that it can only be used with .NET 3.0 or higher and must be constructed on the UI thread.
You can implement you own UI sync by implementing the IUISyncronizable
interface:
public interface IUISyncronizable
{
void SyncInvoke(Delegate method, params object[] args);
void AsyncInvoke(Delegate method, params object[] args);
}
Exception handling
Exceptions thrown by the event handlers do not stop the event propagation, and if the IEventRouter.Logger
property is initialized, will be logged according to the application configuration.
Using the code
Subscribing for event notifications
You can subscribe to events using the EventConsumer
attribute:
[EventConsumer(ThreadingOption = ThreadingOption.UIAsync)]
private void OnNewOrder(OrderAdded notif)
{
}
and then call the IEventRouter.Register(object subscriber)
method.
If you want to explicitly subscribe to some notification, use one of the IEventRouter.Register(Action<T> action)
overloads:
router.Register<Notification>(delegate(Notification n)
{
}
, true, ThreadingOption.DefaultThread, null);
Publishing events
router.Publish(new OrderAdded());
Sending events to specific subscriber/s
router.SendTo(new Notification(), "MySpecialOne");
Using polymorphic invocation
When the EnablePolymorphicInvokation
property is enabled (its disabled by default), your event handlers will be invoked also on event derived types:
public class CustomerRemoved
{
public int CustomerID;
}
public class VipCustomerRemoved : CustomerRemoved
{
...
}
[EventConsumer]
private void OnCustomerChange(CustomerRemoved notif)
{
}
In this case, the OnCustomerChange
method will be called when both CustomerRemoved
and VipCustomerRemoved
events are published.
Removing subscriptions
By default, all subscriptions are implemented using weak references, so when the subscriber object goes out of scope, it will be reclaimed. Nevertheless, you can explicitly unregister the subscriber object either by its reference or by its subscription ID, if it is present. Please note that subscription ID is used like a single or group subscription rather than a unique key.
Some implementation details
The registration of a subscriber object based on reflecting on method calls and finding the EventConsumer
attribute:
public void Register(object subscriber)
{
Type sType = subscriber.GetType();
MethodInfo[] methods = sType.GetMethods(BindingFlags.Public |
BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Static);
foreach (var method in methods)
{
EventConsumer attr =
(EventConsumer)Attribute.GetCustomAttribute(method, typeof(EventConsumer));
if (attr == null)
{
continue;
}
ParameterInfo[] pInfo = method.GetParameters();
if (pInfo.Count() != 1)
{
throw new InvalidOperationException("Method must have a single parameter");
}
Type notifyType = pInfo[0].ParameterType;
SubscriptionInfo info = new SubscriptionInfo();
info.ID = attr.SubscriptionID;
info.Method = method;
info.Target = subscriber;
info.ThreadOption = attr.ThreadingOption;
info.UseStrongReference = attr.UseStrongReference;
AddSubscription(notifyType, info);
}
}
The subscriptions are stored in a MultiMap
object which is actually a dictionary with an event type as a key and a list of subscriptions as a value. MultiMap
is a small extension to a HashList
class from the excellent NGenerics project.
All the actions performed on the MultiMap
instance (subscriptions, invocations, and un-subscriptions) are inside a lock
statement, so the EventRouter
implementation is thread safe.
Notes
When using IEventRouter.Register(Action<T> action)
with an anonymous method, it may be reclaimed right after the first method invocation since there are no strong references to it. So to be on the safe side, either use strong reference for anonymous methods or don't use them at all :)
References
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.