This is Part 4 of a series of articles about a Clifton Method Core Component in which you will learn about the Semantic Publisher/Subscriber.
Article Series
Introduction
In the previous articles on the Module Manager and Service Manager, I described an dependency inversion through the use of interfaces and dynamic application configuration through the use of modules that implement services. This results in the following high level application architecture:

The problem with this architecture is that the middle bubble "Interfaces." In my opinion, there is still too close of coupling between the application and the services because of the dependency of common interfaces.
Services typically fall into three categories:
- A request for information that either returns immediately, or if it takes some time, the application probably implements a
Task
wrapper around the request. - The initiation of a "computation" (I'll use that term instead of "process") in which the application doesn't care when it returns or the service "fires and event" when the computation is complete.
- Monitoring some device, port, or other asynchronous activity and when a condition is met, it "fires an event" to initiate "computation" on that data to either the application or other services.
While there are many simple services that satisfy the first category, it is really the last two categories that I want to talk about here. For more complex services, the architecture that I found most flexible replaces the "Interfaces" bubble with a publisher/subscriber component:

Here, using the Service Manager to access services via dependency inversion (interfaces) is of course still possible, but in many cases, the primary mechanism becomes the publisher/subscriber. Instead of having a common interface specification, a shared semantic "dictionary" is used.

Using a publisher/subscriber decouples the application and services from needing to know the exact interface specification of another service. Furthermore, different versions of a service can implement different semantic messages, which increases an application's resilience to change.
The publisher/subscriber implementation I present here has the following features:
- Is semantic -- routing to the subscriber is accomplished through the use of typed message "envelopes"
- Implements automatic logging of messages handled by, you guessed it, a logger service of your choice
- Subscriber exceptions are handled by the pub-sub and can be routed to an exception handling service.
- Implements subscriber calls as either a synchronous or asynchronous call
- Utilizes a thread pool for asynchronous processing, queuing the call onto a worker thread with the least number of pending subscriber calls
- Is completely type safe
- Unhandled messages (no subscriber) are thrown away
- Subscribers are (usually) instantiated for each message, facilitating thread safety
- Isolate message publishers and subscribers in "membranes"
- Is itself implemented as a service, so it's accessible to all other services
- Multiple subscribers can receive the same message
- Automatic call on completion to
IDispose
for stateless subscribers implementing IDisposable
Use Cases
I've used this architecture very successfully for:
- Implementing web servers (in fact, it's the backbone of my defacto web server implementation nowadays.)
- Handling RabbitMQ messages.
- Processing asynchronous events from hardware such as credit card readers, pin pads, ID scanners, iButton readers, etc.
- Implementing ATM transaction processing.
- Implementing CefSharp and .NET's browser control as exchangeable services to create WinForm/WPF hosted web applications.
A short list of what the combination of the Module Manager, Service Manager, and Publisher/Subscriber allows me to do:
- Mock communication interfaces, hardware, database I/O, etc.
- Simulate inputs from protocols and hardware, which greatly facilitates workflow testing.
- Quickly configure an application for a variety of hardware and computation configurations.
The semantic publisher/subscriber is also (albeit an earlier incarnation) at the heart of the Higher Order Programming project which I've written about previously.
About Membrane Computing
A key concept in the publisher/subscriber is that of a "membrane." You can think of a membrane as a container, channel, vesicle, whatever. But it keeps the message contained in that membrane "space" unless permeability rules (which I won't discuss in this article) are set up. The term "membrane" comes from the concept membrane computing:
Membrane computing deals with distributed and parallel computing models, processing multisets of symbol objects in a localized manner. Thus, evolution rules allow for evolving objects to be encapsulated into compartments defined by membranes. The communications between compartments and with the environment play an essential role in the processes.
The intuition behind the notion of a membrane is a three-dimensional vesicle from biology. However the concept itself is more general, and a membrane is seen as a separator of two regions. The membrane provides for selective communication between the two regions. As per Gheorghe Păun, the separation is of the Euclidean space into a finite “inside” and an infinite “outside”. The selective communication is where the computing comes in.
The variety of suggestions from biology and the range of possibilities to define the architecture and the functioning of a membrane-based multiset processing device are practically endless. Indeed the membrane computing literature contains a very large number of models. Thus, MC is not merely a theory related to a specific model, it is a framework for devising compartmentalized models.
If the term is weird, deal with it. 
A Simple Hello World Example
It's probably best to start with a semantic "hello world
" example. This is the code (not including the Bootstrap, which is identical to what was used in the previous article):
using System;
using Clifton.Core.Semantics;
namespace SemanticPublisherSubscriberDemo
{
static partial class Program
{
static void Main(string[] args)
{
InitializeBootstrap();
Bootstrap((e) => Console.WriteLine(e.Message));
ISemanticProcessor semProc = serviceManager.Get<ISemanticProcessor>();
semProc.Register<SurfaceMembrane, Subscriber>();
semProc.ProcessInstance<SurfaceMembrane, ST_Message>(m => m.Text = "Hello World", true);
}
}
public class ST_Message : ISemanticType
{
public string Text { get; set; }
public ST_Message()
{
Console.WriteLine("Message Instantiated.");
}
}
public class Subscriber : IReceptor
{
public Subscriber()
{
Console.WriteLine("Subscriber Instantiated.");
}
public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_Message msg)
{
Console.WriteLine(msg.Text);
}
}
}
And the modules.xml file (see previous article) looks like this:
="1.0"="utf-8"
<Modules>
<Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
</Modules>;
The output is:

Dissecting the Hello World Example
Here's what's going on.
Acquire the Publisher/Subscriber
After the bootstrap, the first thing we do is acquire the Semantic Publisher/Subscriber singleton:
ISemanticProcessor semProc = serviceManager.Get<ISemanticProcessor>();
Register a Subscriber
Subscribers are called "Receptors." Remember that.
semProc.Register<SurfaceMembrane, Subscriber>();
The SurfaceMembrane
is a built-in membrane that is used for convenience for messages sent to the subscriber.
Publishing a Message
The final line:
semProc.ProcessInstance<SurfaceMembrane, ST_Message>(m => m.Text = "Hello World", true);
publishes a message. In this example, the message parameters are initialized by an Action<T>
that the publisher/subscriber calls after instantiating the message.
Here, we supply the optional value true
to indicate that the message should be processed on the caller's thread. If we didn't do this, our simple console app would end without actually giving the thread pool time to process the message!
The Message Class
All messages must be derived from ISemanticType
:
public class ST_Message : ISemanticType
This is merely a placeholder that provides compile-time type checking of the generic parameter used in registration and publishing:
public interface ISemanticType { }
All receivers receive the same message instance, so the message should be treated as immutable.
The Subscriber
All subscribers must derive from IReceptor
, (remember that subscribers are also known as "receptors") again used for compile-time type of the generic parameter used in registration:
public interface IReceptor { }
I tend to prefix my semantic types with "ST_
" to distinguish them from other types.
As with ISemanticType
, the interface IReceptor
is actually just a placeholder for compile-time type checking:
public interface IReceptor { }

Because the subscriber is (usually) instantiated for every message, any non-static fields that the subscriber uses are specific to the instance that processes the message. This really helps with thread safety when subscriber is performing a complex task requiring internal state management.
Subscribing to Messages
Each message is received in an overloaded Process
method:
public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_Message msg)
Note that the publisher/subscriber (the semantic processor) and the membrane on which the message is being sent is provided as well. This gives the responder the necessary information to publish a message on the same membrane, if it so chooses.
The class that implements the Process
methods will only receive messages published in the membrane declared when the class was registered. While messages can permeate membranes, we won't be discussing that feature in this article.
Under the Hood
Let's look next how the publisher / subscriber works.
Registering a Subscriber
There are a variety of ways of registering a subscriber, but the two common ways are with or without an initializer:
public void Register<M, T>()
where M : IMembrane, new()
where T : IReceptor
{
Register<T>();
IMembrane membrane = RegisterMembrane<M>();
membraneReceptorTypes[membrane].Add(typeof(T));
}
public void Register<M, T>(Action<IReceptor> receptorInitializer)
where M : IMembrane, new()
where T : IReceptor
{
Register<T>();
Type receptorType = typeof(T);
IMembrane membrane = RegisterMembrane<M>();
membraneReceptorTypes[membrane].Add(receptorType);
receptorInitializers[new MembraneReceptor()
{ Membrane = membrane, ReceptorType = receptorType }] =
new ReceptorInitializer() { Initializer = receptorInitializer };
}
If you provide an initializer, it will be called when the receptor (the subscriber
class) is instantiated, where IReceptor
is an instance of the class being instantiated. Initializers are specific to the membrane containing the receptor (the subscriber
), providing the ability to initialize the same receptor with different parameters depending on the membrane (channel) to which it is associated.
Message Publishing
When a message is published using ProcessInstance
, an initializer for the message can be provided, as we saw in the example above:
public void ProcessInstance<M, T>(Action<T> initializer, bool processOnCallerThread = false)
where M : IMembrane, new()
where T : ISemanticType, new()
{
T inst = new T();
initializer(inst);
ProcessInstance<M, T>(inst, processOnCallerThread);
}
public void ProcessInstance<M, T>(bool processOnCallerThread = false)
where M : IMembrane, new()
where T : ISemanticType, new()
{
T inst = new T();
ProcessInstance<M, T>(inst, processOnCallerThread);
}
Also note the option to invoke the subscriber on the caller thread, which defaults to false
.
Message Processing
The core message processor identifies all receptors (subscribers) in the membrane, instantiates them, and either queues the call or processes it immediately:
protected void ProcessInstance<T>
(IMembrane membrane, IMembrane caller, T obj, bool processOnCallerThread)
where T : ISemanticType
{
Type tsource = obj.GetType();
List<Type> receptors = GetReceptors(membrane, tsource);
Log(membrane, obj);
foreach (Type ttarget in receptors)
{
dynamic target = Activator.CreateInstance(ttarget);
ReceptorInitializer receptorInitializer;
if (receptorInitializers.TryGetValue(new MembraneReceptor()
{ Membrane = membrane, ReceptorType = ttarget }, out receptorInitializer))
{
receptorInitializer.Initializer(target);
}
if (processOnCallerThread)
{
Call(new DynamicCall() { SemanticInstance = obj, Receptor = target,
Proc = () => target.Process(this, membrane, obj) });
}
else
{
threadPool.MinBy(tp => tp.Count).Enqueue(
new DynamicCall() { SemanticInstance = obj, Receptor = target,
Proc = () => target.Process(this, membrane, obj) });
}
}
List<IReceptor> sreceptors = GetStatefulReceptors(membrane, tsource);
foreach (IReceptor receptor in sreceptors)
{
dynamic target = receptor;
if (processOnCallerThread)
{
Call(new DynamicCall() { SemanticInstance = obj, Receptor = target,
Proc = () => target.Process(this, membrane, obj), AutoDispose = false });
}
else
{
threadPool.MinBy(tp => tp.Count).Enqueue(new DynamicCall()
{ SemanticInstance = obj, Receptor = target,
Proc = () => target.Process(this, membrane, obj), AutoDispose = false });
}
}
ProcessInnerTypes(membrane, caller, obj, processOnCallerThread);
PermeateOut(membrane, caller, obj, processOnCallerThread);
}
This looks fairly complicated at first glance, but it's really quite straight forward.
- First, all the receptors (subscribers) in the specified membrane that handle the message are acquired.
- The message is logged, which can be handled by whatever logging service you've implemented.
- Each receptor (subscriber class) is instantiated, optionally initialized, and the callback is executed or queued.
- Stateful receptors (discussed later) are also invoked.
- Two concluding steps are performed
- Messages that implement inner semantic types are also published. This is a bizarre but interesting feature that enables the application to subscribe to semantic messages within the body of a wrapper message. For example, an address message may include semantic types such as Zip Code. A subscriber may be interested in processing Zip Code for its own purposes. This mechanism facilitates creating automatic processing of inner semantic types.
- As in Membrane Computing, a message can permeate other membranes if so configured. This feature enables the application to bridge membranes (channels / containers) with specific messages, so that additional computations on those messages can be performed by subscribers in other membranes.
Processing Inner Types
Handled by:
protected void ProcessInnerTypes(IMembrane membrane, IMembrane caller,
ISemanticType obj, bool processOnCallerThread)
{
var properties = obj.GetType().GetProperties(BindingFlags.Instance |
BindingFlags.Public).Where(
pi => pi.PropertyType.GetInterfaces().Contains(typeof(ISemanticType)));
properties.ForEach(pi =>
{
ISemanticType prop = (ISemanticType)pi.GetValue(obj);
prop.IfNotNull((p) => ProcessInstance(membrane, caller, p, processOnCallerThread));
});
}
A Brief Look at Permeating Membranes

From Wikipedia Membrane Computing
In Membrane Computing, data (or in our case, the message) can permeate a membrane both inwards, to inner membranes, and outwards, to the containing membrane. In the illustration above, the "environment" is represented by the SurfaceMembrane
type.
Permeating Membranes
Handled by:
protected void PermeateOut<T>(IMembrane membrane, IMembrane caller,
T obj, bool processOnCallerThread)
where T : ISemanticType
{
List<IMembrane> pmembranes = ((Membrane)membrane).PermeateTo(obj);
pmembranes.Where(m=>m != caller).ForEach((m) => ProcessInstance
(m, membrane, obj, processOnCallerThread));
}
Infinite recursion is prevented by ignoring the membrane on the original call.
Inbound and Outbound Permeability
A membrane (channel / container) is permeable to both an outer membrane and to any inner membranes.
public List<IMembrane> PermeateTo(ISemanticType st)
{
List<IMembrane> ret = new List<IMembrane>();
Type sttype = st.GetType();
if (outboundPermeableTo.Contains(sttype))
{
if ((parent != null) && (parent.inboundPermeableTo.Contains(sttype)))
{
ret.Add(parent);
}
foreach (Membrane child in childMembranes)
{
if (child.inboundPermeableTo.Contains(sttype))
{
ret.Add(child);
}
}
}
return ret;
}
Membrane Types
A custom membrane (channel) type never implements anything, it's simply used so that the "channel" can be specified as a generic type. It must however derive from Membrane
, for example:
public class LoggerMembrane : Membrane { }
The base class handles the permeability functions.
Setting up the Subscriber Call
In the code illustrated earlier, the call to the subscriber is made either immediately:
Call(new DynamicCall() { SemanticInstance = obj, Receptor = target,
Proc = () => target.Process(this, membrane, obj) });
or is queued on a worker thread:
threadPool.MinBy(tp => tp.Count).Enqueue(
new DynamicCall() { SemanticInstance = obj, Receptor = target,
Proc = () => target.Process(this, membrane, obj) });
The call is implemented as an Action
wrapped by the DynamicCall
class:
public class DynamicCall : ProcessCall
{
public Action Proc { get; set; }
public DynamicCall()
{
AutoDispose = true;
}
public override void MakeCall()
{
Proc();
}
}
The target is of dynamic
type. We use this type so that the call is routed to the correct overloaded Process
method in the subscriber.
Dequeuing Work
Dequeueing the message is handled by a thread in the thread pool:
protected void ProcessPoolItem(object state)
{
ThreadSemaphore<ProcessCall> ts = (ThreadSemaphore<ProcessCall>)state;
while (true)
{
ts.WaitOne();
ProcessCall rc;
if (ts.TryDequeue(out rc))
{
Call(rc);
}
}
}
Note that we are not using .NET's thread pooling or Task
mechanism as these introduce delays in processing the work and are meant for short lived processes. Because the application's subscriber might be a long running process, the thread pool is actual a collection of Thread
instances:
protected void InitializePoolThreads()
{
for (int i = 0; i < MAX_WORKER_THREADS; i++)
{
Thread thread = new Thread(new ParameterizedThreadStart(ProcessPoolItem));
thread.IsBackground = true;
ThreadSemaphore<ProcessCall> ts = new ThreadSemaphore<ProcessCall>();
threadPool.Add(ts);
thread.Start(ts);
}
}
Making the Call
The call itself is wrapped in an exception handler:
protected void Call(ProcessCall rc)
{
try
{
rc.MakeCall();
}
catch (Exception ex)
{
Exception ex2 = ex;
if (!(rc.SemanticInstance is ST_Exception))
{
ProcessInstance(Logger, new ST_Exception(ex), true);
}
while (ex2.InnerException != null)
{
ex2 = ex2.InnerException;
if (!(rc.SemanticInstance is ST_Exception))
{
ProcessInstance(Logger, new ST_Exception(ex2), true);
}
}
}
finally
{
if ( (rc.Receptor is IDisposable) && (rc.AutoDispose) )
{
((IDisposable)rc.Receptor).Dispose();
}
}
}
Exceptions (including inner exceptions) are published on the Logger membrane which the Semantic Processor creates for us.
public IMembrane Logger { get; protected set; }
...
Logger = RegisterMembrane<LoggerMembrane>();
Note that in the finally
block, a receptor (subscriber) has its Dispose
method called if it implements IDisposable
and is a stateless subscriber (one that was instantiated by the Semantic Processor (the Publisher/Subscriber.)
Stateful Subscribers
A stateful subscriber can be useful when, not to be redundant, the overall state of the subscriber needs to be maintained. Some use cases include a subscriber that manages a connection, in which we want to keep the connection open, rather than opening and closing it every time a message involving the connection is processed.
An example of a stateful receptor initialization looks like this:
semProc.Register<SurfaceMembrane>(new StatefulSubscriber());
Note that the subscriber type generic parameter is omitted and instead an instance of the subscriber is passed into the Register
method.
A simple implementation of stateful subscriber looks like this:
public class StatefulSubscriber : IReceptor
{
protected int counter = 0;
public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_Message2 msg)
{
Console.WriteLine(counter + ": " + msg.Text);
++counter;
}
}
Note that it is processing messages of type ST_Message2
, which is simply, for demo purposes, a way of keeping the two examples separate. Publishing messages is exactly the same:
semProc.ProcessInstance<SurfaceMembrane, ST_Message2>(m => m.Text = "Hello World", true);
semProc.ProcessInstance<SurfaceMembrane, ST_Message2>(m => m.Text = "Goodbye World", true);
Here, we publish two messages, with the result being:

We observe that the subscriber instance is preserved.
The Internal Magic
The publisher/subscriber extracts the messages that a stateful subscriber handles through reflection:
public void Register(IMembrane membrane, IReceptor receptor)
{
statefulReceptors.Add(receptor);
Type ttarget = receptor.GetType();
MethodInfo[] methods = ttarget.GetMethods();
foreach (MethodInfo method in methods)
{
if (method.Name == "Process")
{
ParameterInfo[] parameters = method.GetParameters();
InstanceNotify(receptor, parameters[2].ParameterType);
}
}
membranes[membrane.GetType()] = membrane;
membraneReceptorInstances[membrane].Add(receptor);
}
This inspects each Process
method in the class and assumes that these methods will have the expected signature (this code could be improved). The message types are extracted so that later it can be determined whether the stateful subscriber handles the published message.
Performance
From the above code, you've probably noticed that in the typical scenario, a subscriber is being instantiated (and optionally disposed) for every message, and an optional initializer is being executed. Also, there is the use of the dynamic
subscriber instance hides internal reflection. There are also nested levels of calls and the overhead of creating the call, either to be executed immediately or queued on a worker thread. A complex configuration, involving membrane permeation and inner message publishing, adds to the performance overhead.
Conversely, the implementation of this publisher/subscriber is very flexible, thread safe, and the application is resilient to exceptions that the subscriber may throw. This is always the tradeoff when implementing a general purpose component -- useful functionality at the cost of raw performance. Usually, the messages that the publisher/subscriber processes come from low bandwidth events, whether they are user inputs from hardware or even page or web service requests. If you really need a high performance, there are simpler publisher/subscriber implementations, or you may not even want to even use this pattern.
Putting It All Together - An Example
Let's write a web server using the Module Manager, Service Manager, and Publisher Subscriber!
First, A Logger Service
We start with a couple modules defined in the modules.xml file:
<?xml version="1.0" encoding="utf-8" ?>
<Modules>
<Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
<Module AssemblyName='ConsoleLoggerService.dll'/>
</Modules>
Being able to log things (especially exceptions!) is really the first thing any development process should start with. The logger service should be able to handle both calls made to it as a service as well as logging exceptions messages published by the Publisher / Subscriber. Here's the logger service we'll use:
using System;
using Clifton.Core.ModuleManagement;
using Clifton.Core.Semantics;
using Clifton.Core.ServiceManagement;
using ServiceInterfaces;
namespace ConsoleLoggerService
{
public class LoggerModule : IModule
{
public void InitializeServices(IServiceManager serviceManager)
{
serviceManager.RegisterSingleton<IConsoleLoggerService, LoggerService>();
}
}
public class LoggerService : ServiceBase, IConsoleLoggerService, IReceptor
{
public override void FinishedInitialization()
{
ISemanticProcessor semProc = ServiceManager.Get<ISemanticProcessor>();
semProc.Register<LoggerMembrane, GenericTypeLogger>();
semProc.Register<LoggerMembrane, LoggerService>();
}
public void Log(string msg)
{
Console.WriteLine(msg);
}
public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_Exception msg)
{
Log(msg.Exception.Message);
Log(msg.Exception.StackTrace);
}
public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_Log msg)
{
Log(msg.Message);
}
}
public class GenericTypeLogger : IReceptor
{
public void Process(ISemanticProcessor semProc, IMembrane membrane, ISemanticType t)
{
if ( (!(t is ST_Log)) && (!(t is ST_Exception)) )
{
Console.WriteLine("Publishing type: " + t.GetType().Name);
}
}
}
}
There are three things that are interesting about this logger:
- It is a service, so we can treat it as such.
- However, the service also implements
IReceptor
, enabling it to process both ST_Log
messages that the application posts as well as ST_Exception
messages issued by the Publisher/Subscriber on its internal Logger
"channel". - A generic type logger is instantiated as a separate receptor, which always logs (courtesy of the Publisher/Subscriber's ability to issue messages for base types/interface as well) the message type. We ignore log and exception message types, as logging the type, then the actual log or exception message, seems silly.
The above is an interesting implementation because a singleton service is registered, but the Publisher/Subscriber creates an instance for each log message!
A test application shows this all working:
static partial class Program
{
static void Main(string[] args)
{
InitializeBootstrap();
Bootstrap((e) => Console.WriteLine(e.Message));
TestLogging();
Console.WriteLine("Press ENTER to exit the server.");
Console.ReadLine();
}
static void TestLogging()
{
serviceManager.Get<IConsoleLoggerService>().Log("Foobar");
serviceManager.Get<ISemanticProcessor>().ProcessInstance<LoggerMembrane,
ST_Log>(l => l.Message = "Hi there!", true);
serviceManager.Get<ISemanticProcessor>().Register<SurfaceMembrane, ExceptionProcess>();
serviceManager.Get<ISemanticProcessor>().
ProcessInstance<SurfaceMembrane, ST_TestException>();
}
}
public class ST_TestException : ISemanticType { }
public class ExceptionProcess : IReceptor
{
public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_TestException msg)
{
throw new ApplicationException("I Broke!");
}
}
And here's the output:

The output illustrates:
- using the logger as a service
- publishing a log message
- logging generic type messages
- the Publisher/Subscriber handling an exception and sending it to our logger
So now, we have the logger module done.
Next, a Web Server
I prefer using my own technology rather than IIS, ASP.NET, Razor, MVC, whatever. So we'll write a simple web server, built from several services.
An HTTP Listener Service
Here's an implementation of a very simple HTTP listener. It receives requests and posts the request as a message to the Publisher/Subscriber:
using System.IO;
using System.Net;
using System.Threading.Tasks;
using Clifton.Core.ExtensionMethods;
using Clifton.Core.ModuleManagement;
using Clifton.Core.Semantics;
using Clifton.Core.ServiceManagement;
using Semantics;
using ServiceInterfaces;
namespace WebServerService
{
public class WebServerModule : IModule
{
public void InitializeServices(IServiceManager serviceManager)
{
serviceManager.RegisterSingleton<IWebServerService, WebServer>();
}
}
public class WebServer : ServiceBase, IWebServerService
{
protected HttpListener listener;
protected ILoggerService logger;
protected ISemanticProcessor semProc;
protected bool httpOnly;
public virtual void Start(string ip, int port)
{
logger = ServiceManager.Get<ILoggerService>();
semProc = ServiceManager.Get<ISemanticProcessor>();
listener = new HttpListener();
string url = IpWithPort(ip, port);
logger.Log("Listening on " + ip + ":" + port);
listener.Prefixes.Add(url);
}
listener.Start();
Task.Run(() => WaitForConnection(listener));
}
protected virtual void WaitForConnection(object objListener)
{
HttpListener listener = (HttpListener)objListener;
while (true)
{
HttpListenerContext context = listener.GetContext();
string verb = context.Request.HttpMethod;
string path = context.Request.RawUrl.LeftOf("?").RightOf("/");
string parms = context.Request.RawUrl.RightOf("?");
logger.Log(verb + ": " + path);
string data = new StreamReader(context.Request.InputStream,
context.Request.ContentEncoding).ReadToEnd();
ServiceManager.Get<ISemanticProcessor>().ProcessInstance<WebServerMembrane,
ST_HttpRequest>(r =>
{
r.Context = context;
r.Verb = verb;
r.Path = path;
r.Parameters = parms;
r.Data = data;
});
}
}
protected string IpWithPort(string ip, int port)
{
string ret;
if (port == 80)
{
ret = "http://" + ip + "/";
}
else
{
ret = "http://" + ip + ":" + port.ToString() + "/";
}
return ret;
}
}
}
The message gets handled on a separate thread, letting the listener loop return immediately to the activity of waiting for another connection.
We add this module to our modules.xml file:
<?xml version="1.0" encoding="utf-8" ?>
<Modules>
<Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
<Module AssemblyName='ConsoleLoggerService.dll'/>
<Module AssemblyName='WebServerService.dll'/>
</Modules>
And we can test the server by adding a single line to our application:
static void Main(string[] args)
{
InitializeBootstrap();
Bootstrap((e) => Console.WriteLine(e.Message));
serviceManager.Get<IWebServerService>().Start("127.0.0.1", 80);
Console.WriteLine("Press ENTER to exit the server.");
Console.ReadLine();
}
Even though there is no subscriber, our logger will tell us that it has received the web request message:

Of course, the browser waits patiently for a response, which we're not giving it!
A Semantic Router Service
Let's add a semantic router. This will have the responsibility of mapping the request verb and path to a semantic type, rather than a method that handles the path. The semantic type is dynamically instantiated and either populated with query values on the URL itself or JSON data in the data stream. I'll put together a rather simple one:
using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Reflection;
using Newtonsoft.Json;
using Clifton.Core.Utils;
using Clifton.Core.ModuleManagement;
using Clifton.Core.Semantics;
using Clifton.Core.ServiceManagement;
using Semantics;
using ServiceInterfaces;
namespace SemanticWebRouterService
{
public struct Route
{
public string Verb { get; set; }
public string Path { get; set; }
}
public class SemanticWebRouterModule : IModule
{
public void InitializeServices(IServiceManager serviceManager)
{
serviceManager.RegisterSingleton<ISemanticWebRouterService, WebRouterService>();
}
}
public class WebRouterService : ServiceBase, ISemanticWebRouterService
{
protected Dictionary<Route, Type> semanticRoutes;
public WebRouterService()
{
semanticRoutes = new Dictionary<Route, Type>();
}
public override void FinishedInitialization()
{
base.FinishedInitialization2();
ServiceManager.Get<ISemanticProcessor>().Register<WebServerMembrane, RouteProcessor>();
}
public void Register<T>(string verb, string path) where T : ISemanticRoute
{
semanticRoutes[new Route() { Verb = verb.ToLower(), Path = path.ToLower() }] = typeof(T);
}
public void RouteRequest(ST_HttpRequest req)
{
Route route = new Route() { Verb = req.Verb.ToLower(), Path = req.Path.ToLower() };
Type routeHandler;
bool found = semanticRoutes.TryGetValue(route, out routeHandler);
ISemanticRoute semanticRoute = null;
if (found)
{
semanticRoute = InstantiateRouteHandler(routeHandler, req);
semanticRoute.Context = req.Context;
ServiceManager.Get<ISemanticProcessor>().
ProcessInstance<WebServerMembrane>(semanticRoute);
}
else
{
ServiceManager.Get<ILoggerService>().Log("Route not found.");
}
}
protected ISemanticRoute InstantiateRouteHandler(Type routeHandler, ST_HttpRequest req)
{
ISemanticRoute semanticRoute = (ISemanticRoute)Activator.CreateInstance(routeHandler);
if (!string.IsNullOrEmpty(req.Data))
{
JsonConvert.PopulateObject(req.Data, semanticRoute);
}
else if (req.Verb.ToLower() == "get")
{
PopulateFromQueryString(req, semanticRoute);
}
return semanticRoute;
}
protected void PopulateFromQueryString(ST_HttpRequest req, ISemanticRoute semanticRoute)
{
NameValueCollection nvc = req.Context.Request.QueryString;
foreach (string key in nvc.AllKeys)
{
PropertyInfo pi = semanticRoute.GetType().GetProperty(key,
BindingFlags.Public | BindingFlags.Instance | BindingFlags.IgnoreCase);
if (pi != null)
{
object valOfType = Converter.Convert
(Uri.UnescapeDataString(nvc[key].Replace('+', ' ')), pi.PropertyType);
pi.SetValue(semanticRoute, valOfType);
}
}
}
}
public class RouteProcessor : IReceptor
{
public void Process(ISemanticProcessor semProc, IMembrane membrane, ST_HttpRequest req)
{
semProc.ServiceManager.Get<ISemanticWebRouterService>().RouteRequest(req);
}
}
}
The pattern here should emerging:
- Create a class that implements
IModule
- Register the service
- In this case, the service registers a subscriber to the
ST_HttpRequest
message - The message subscriber simply calls back into the service for processing
Testing
Let's add it to our modules.xml file:
<?xml version="1.0" encoding="utf-8" ?>
<Modules>
<Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
<Module AssemblyName='ConsoleLoggerService.dll'/>
<Module AssemblyName='WebServerService.dll'/>
<Module AssemblyName='SemanticWebRouterService.dll'/>
</Modules>
Next, we'll write register a route that logs setting a property so we can test at least the query parameter initialization process. Here's how we register the route:
ISemanticWebRouterService router = serviceManager.Get<ISemanticWebRouterService>();
router.Register<ST_Foobar>("get", "foobar");

Note that we're not registering a method to handle the route, we're registering a semantic type that gets instantiated for a particular route.
Here's a simple test type:
public class ST_Foobar : SemanticRoute
{
public string Test
{
get { return Test; }
set
{
Program.serviceManager.Get<ILoggerService>().Log
("test parameter set to: " + value.Quote());
}
}
}
Now we can try it:

Of course, this is just a bare bones example. There is no authentication, authorization, session management, and so forth. And we still aren't responding to the browser request!
A few interesting points:
- The router thread will exit after finding (or not) the route, and the route handler message is published in a manner in which a new thread will receive the call.
- Because we're using the publisher/subscriber, the route can be processed my multiple subscribers. Why you'd want to do that, I'm not sure.
- Because we're associating a route with a semantic type (ok, fancy name for a class), we can deserialize the parameters into that class and, as it is actually a semantic message, it fits perfectly into the publisher/subscriber concept.
- Ideally, we would implement the route handlers as modules themselves, keeping the application free of doing the route registration.
- This has the advantage of being able to add new behaviors to our web server by adding modules that define the semantic types for a given route and implementing the subscribers for those messages.
Responders
Lastly, our web server needs a few simple responders, again implemented as a module. Note in this case, the service interface is a placeholder, as there are no public
methods that are directly callable:
using System.Text;
using Clifton.Core.ExtensionMethods;
using Clifton.Core.ModuleManagement;
using Clifton.Core.Semantics;
using Clifton.Core.ServiceManagement;
using Semantics;
namespace WebResponseService
{
public interface IWebResponseService : IService { }
public class WebResponseModule : IModule
{
public void InitializeServices(IServiceManager serviceManager)
{
serviceManager.RegisterSingleton<IWebResponseService, WebResponseService>();
}
}
public class WebResponseService : ServiceBase, IWebResponseService
{
public override void FinishedInitialization()
{
base.FinishedInitialization2();
ServiceManager.Get<ISemanticProcessor>().Register<WebServerMembrane, WebResponder>();
}
}
public class WebResponder : IReceptor
{
public void Process(ISemanticProcessor proc, IMembrane membrane, ST_JsonResponse resp)
{
resp.Context.Response.StatusCode = resp.StatusCode;
resp.Context.Response.ContentType = "text/json";
resp.Context.Response.ContentEncoding = Encoding.UTF8;
byte[] byteData = resp.Json.to_Utf8();
resp.Context.Response.ContentLength64 = byteData.Length;
resp.Context.Response.OutputStream.Write(byteData, 0, byteData.Length);
resp.Context.Response.Close();
}
public void Process(ISemanticProcessor proc, IMembrane membrane, ST_HtmlResponse resp)
{
byte[] utf8data = resp.Html.to_Utf8();
resp.Context.Response.ContentType = "text/html";
resp.Context.Response.ContentEncoding = Encoding.UTF8;
resp.Context.Response.ContentLength64 = utf8data.Length;
resp.Context.Response.OutputStream.Write(utf8data, 0, utf8data.Length);
resp.Context.Response.Close();
}
public void Process(ISemanticProcessor proc, IMembrane membrane, ST_CssResponse resp)
{
byte[] utf8data = resp.Css.to_Utf8();
resp.Context.Response.ContentType = "text/css";
resp.Context.Response.ContentEncoding = Encoding.UTF8;
resp.Context.Response.ContentLength64 = utf8data.Length;
resp.Context.Response.OutputStream.Write(utf8data, 0, utf8data.Length);
resp.Context.Response.Close();
}
public void Process
(ISemanticProcessor proc, IMembrane membrane, ST_JavascriptResponse resp)
{
byte[] utf8data = resp.Javascript.to_Utf8();
resp.Context.Response.ContentType = "text/javascript";
resp.Context.Response.ContentEncoding = Encoding.UTF8;
resp.Context.Response.ContentLength64 = utf8data.Length;
resp.Context.Response.OutputStream.Write(utf8data, 0, utf8data.Length);
resp.Context.Response.Close();
}
public void Process(ISemanticProcessor proc, IMembrane membrane, ST_RouteNotFound resp)
{
resp.Context.Response.StatusCode = 404;
resp.Context.Response.Close();
}
}
}
Now let's go back to the semantic router and publish the ST_RouteNotFound
message for undefined routes:
ServiceManager.Get<ILoggerService>().Log("Route not found.");
ServiceManager.Get<ISemanticProcessor>().ProcessInstance<WebServerMembrane,
ST_RouteNotFound>(r=>r.Context=req.Context);
We add this module to our modules.xml file:
<?xml version="1.0" encoding="utf-8" ?>
<Modules>
<Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
<Module AssemblyName='ConsoleLoggerService.dll'/>
<Module AssemblyName='WebServerService.dll'/>
<Module AssemblyName='SemanticWebRouterService.dll'/>
<Module AssemblyName='WebResponseService.dll'/>
</Modules>
We now have our first successful response to the browser!

No more spinny!
Responding with Data from Files (HTML, CSS, etc.)
We'll add one more service for routes that respond with data from HTML/CSS files. Who knows, you may want to replace or extend this with data returned from a server instead, so it makes sense to make this a YAM - Yet Another Module. Again, the service interface is merely a placeholder, as there are no exposed service methods. We'll only handle three file types at the moment: HTML, CSS, and JavaScript:
using System;
using System.IO;
using System.Net;
using Clifton.Core.ExtensionMethods;
using Clifton.Core.ModuleManagement;
using Clifton.Core.Semantics;
using Clifton.Core.ServiceManagement;
using Semantics;
using ServiceInterfaces;
namespace WebFileResponseService
{
public interface IFileResponseService : IService { }
public class FileResponseModule : IModule
{
public void InitializeServices(IServiceManager serviceManager)
{
serviceManager.RegisterSingleton<FileResponseService, FileResponseService>();
}
}
public class FileResponseService : ServiceBase, IFileResponseService
{
public override void FinishedInitialization()
{
base.FinishedInitialization2();
ServiceManager.Get<ISemanticProcessor>().Register<WebServerMembrane, FileResponder>();
}
}
public class FileResponder : IReceptor
{
public void Process(ISemanticProcessor proc, IMembrane membrane, ST_FileResponse resp)
{
ProcessFileRequest(proc, resp.Context);
}
protected void ProcessFileRequest(ISemanticProcessor semProc, HttpListenerContext context)
{
bool handled = false;
string path = context.Request.RawUrl.LeftOf("?").RightOf("/").LeftOfRightmostOf('.');
string ext = context.Request.RawUrl.RightOfRightmostOf('.');
if (String.IsNullOrEmpty(path))
{
path = "index";
}
if (String.IsNullOrEmpty(ext))
{
ext = "html";
}
path = path + "." + ext;
path = Path.Combine("Website", path);
if (File.Exists(path))
{
switch (ext)
{
case "html":
semProc.ProcessInstance<WebServerMembrane, ST_HtmlResponse>(r =>
{
r.Context = context;
r.Html = ReadTextFile(path);
});
break;
case "js":
semProc.ProcessInstance<WebServerMembrane, ST_JavascriptResponse>(r =>
{
r.Context = context;
r.Javascript = ReadTextFile(path);
});
break;
case "css":
semProc.ProcessInstance<WebServerMembrane, ST_CssResponse>(r =>
{
r.Context = context;
r.Css = ReadTextFile(path);
});
break;
}
handled = true;
}
if (!handled)
{
semProc.ServiceManager.Get<ILoggerService>().Log("Route not found.");
semProc.ProcessInstance<WebServerMembrane, ST_RouteNotFound>(r => r.Context = context);
}
}
protected string ReadTextFile(string fn)
{
string text = File.ReadAllText(fn);
return text;
}
protected byte[] ReadBinaryFile(string fn)
{
FileStream fStream = new FileStream(fn, FileMode.Open, FileAccess.Read);
BinaryReader br = new BinaryReader(fStream);
byte[] data = br.ReadBytes((int)fStream.Length);
br.Close();
fStream.Close();
return data;
}
}
}
Testing
Again, we had this module to the modules.xml file:
<?xml version="1.0" encoding="utf-8" ?>
<Modules>
<Module AssemblyName='Clifton.SemanticProcessorService.dll'/>
<Module AssemblyName='ConsoleLoggerService.dll'/>
<Module AssemblyName='WebServerService.dll'/>
<Module AssemblyName='SemanticWebRouterService.dll'/>
<Module AssemblyName='WebResponseService.dll'/>
<Module AssemblyName='WebFileResponseService.dll'/>
</Modules>
Now let's make simple change to our application. We'll use map the ST_FileResponse
semantic message the route "foobar
" so that a specific page will be loaded. Here's the entire program:
static partial class Program
{
static void Main(string[] args)
{
InitializeBootstrap();
Bootstrap((e) => Console.WriteLine(e.Message));
serviceManager.Get<IWebServerService>().Start("127.0.0.1", 80);
ISemanticWebRouterService router = serviceManager.Get<ISemanticWebRouterService>();
router.Register<ST_FileResponse>("get", "foobar");
Console.WriteLine("Press ENTER to exit the server.");
Console.ReadLine();
}
}
The bolded line:
router.Register<ST_FileResponse>("get", "foobar");
does the mapping of the route.
Now let's add an HTML page to our Website folder, which lives in bin\Debug (yes, I hard-coded the website path in the above code, typically I retrieve it from the app.config file using, you guessed it, the AppConfigService
I described in a previous article in this series.

The file is simply:
<h1>Foobar!</h1>
And here's the result:

Conclusion
OK, that's enough for now. I had originally intended to use all the tech for writing a simple game, but I think demonstrating the Publisher/Subscriber with just the web server components is enough for this article! In the demo code, you'll notice the web server demo is actually in the project HuntTheWumpus
. That was the game I was going to use to demonstrate the Publisher/Subscriber, and that will probably be the next article anyways.
To review what has been achieved:
- Further decoupling of services. Except for core services, like the logger, and direct access to the router service for registering routes, the intercommunication is handled entirely by the Publisher/Subscriber. In fact, several of the services don't even implement service methods, the interface is simply placeholders.
- Messages are processed on independent threads, which is perfect for this kind of application.
- Subscribers are instantiated for each message, a desirable feature where thread safety is paramount.
- The Publisher/Subscriber implements logging and exception handling.
So, these four articles demonstrates the four components that are the foundation stones to The Clifton Method. Whether I'm writing a web or client app, I almost always start with these components and the growing library of services that I can just plug into an application. You can peruse the whole kit and kaboodle on GitHub. There are minor variances to the implementation there, vs. what's presented here. Through the process of writing these four articles, I've updated the code in the GitHub repo to reflect bug fixes, and some of the code for minor improvements.
History
- 25th August, 2016: Initial version