TPL-based TCP server with cancellation support

19 Jul 2013, CPOL

When running a TCP server, there are two points where you have to either block and wait or use an asynchronous Begin/End method pair:

  • When listening for incoming connections
  • When reading from a connected socket

Neither option supports cancellation. It seems the only way to stop the server is to periodically poll TcpListener.Pending() and TcpClient.Available, checking CancellationToken.IsCancellationRequested between polls.
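The polling idea can be sketched in isolation. The helper name AcceptOrCancel, the loopback listener on port 0, and the 200 ms timeout below are assumptions made for illustration, not part of the original sample; CancelAfter requires .NET 4.5 or later.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

public static class PollingAcceptSketch
{
    // Hypothetical helper: polls Pending() until a connection is queued or the
    // token is canceled. Returns null when canceled.
    public static TcpClient AcceptOrCancel(TcpListener listener, CancellationToken token, int pollMs)
    {
        while (!token.IsCancellationRequested)
        {
            if (listener.Pending())
                return listener.AcceptTcpClient(); // a connection is already queued

            Thread.Sleep(pollMs);
        }

        return null;
    }

    public static void Main()
    {
        // Port 0 asks the OS for any free port (an assumption for this demo).
        var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();

        using (var cts = new CancellationTokenSource())
        {
            cts.CancelAfter(200); // nobody is expected to connect, so give up after 200 ms
            var client = AcceptOrCancel(listener, cts.Token, 50);
            Console.WriteLine(client == null ? "canceled" : "accepted");
        }

        listener.Stop();
    }
}
```

The cost of this approach is latency: cancellation is noticed only at the next poll. Replacing Thread.Sleep(pollMs) with token.WaitHandle.WaitOne(pollMs) would let the loop wake as soon as cancellation is requested.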

This sample demonstrates a simple TCP server wrapped as a Task. The task supports cancellation: the listening thread and all client threads are canceled by a single CancellationToken.

The task calls your callback function and passes it the list of lines received from the client. You can write data back to the client and close the connection at the end, like so:

C#
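var cancellationSource = new CancellationTokenSource();

_server = TcpServer.Run(80, cancellationSource.Token, (lines, client, cancellation) =>
    {
        // DoStuff and WriteStuff are placeholders for your own logic.
        while (DoStuff(lines) && !cancellation.IsCancellationRequested)
        {
            client.GetStream().WriteStuff();
        }

        client.Close();
    });

...

// Cancel through the source (a CancellationToken has no Cancel method),
// then wait for the listener and all client handlers to finish.
cancellationSource.Cancel();
_server.Wait();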
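The cancel-then-wait shutdown can be tried without any sockets. In this minimal sketch the worker loops are hypothetical stand-ins for the listener and client threads; the names StartWorker and RunDemo are assumptions made for illustration.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SharedTokenSketch
{
    // Hypothetical stand-in for a listener or client loop: polls until canceled.
    public static Task StartWorker(CancellationToken token, Action onStopped)
    {
        // The token is deliberately not passed to StartNew, so each task runs to
        // completion instead of ending up in the Canceled state.
        return Task.Factory.StartNew(() =>
        {
            while (!token.IsCancellationRequested)
                Thread.Sleep(10);

            onStopped();
        }, TaskCreationOptions.LongRunning);
    }

    public static int RunDemo(int workerCount)
    {
        var stopped = 0;
        var cts = new CancellationTokenSource();

        var workers = Enumerable.Range(0, workerCount)
            .Select(_ => StartWorker(cts.Token, () => Interlocked.Increment(ref stopped)))
            .ToArray();

        cts.Cancel();          // one call signals every worker
        Task.WaitAll(workers); // returns once all loops have exited

        return stopped;
    }

    public static void Main()
    {
        Console.WriteLine(RunDemo(3)); // 3
    }
}
```

Because every loop observes the same token, a single Cancel call is enough, and waiting on the tasks afterwards confirms that all of them have actually stopped.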

This is the server code:

C#
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class TcpServer
{
    private const int SleepTimeout = 50; // milliseconds

    public static Task Run(int port, CancellationToken cancellation,
         Action<List<string>, TcpClient, CancellationToken> onLineReceived)
    {
        return Accept(port, cancellation, (client, token) =>
            {
                // All complete lines received so far on this connection.
                var receivedLines = new List<string>();
                // Text received after the last '\n', carried over to the next read.
                var partialLine = new StringBuilder();
                // A stateful decoder keeps multi-byte UTF-8 characters intact
                // when they are split across two reads.
                var decoder = Encoding.UTF8.GetDecoder();

                try
                {
                    while (!token.IsCancellationRequested && client.Connected)
                    {
                        var available = client.Available;

                        if (available == 0)
                        {
                            Thread.Sleep(SleepTimeout);
                            continue;
                        }

                        var buffer = new byte[available];
                        // Read may return fewer bytes than requested.
                        var read = client.GetStream().Read(buffer, 0, available);

                        var chars = new char[decoder.GetCharCount(buffer, 0, read)];
                        decoder.GetChars(buffer, 0, read, chars, 0);

                        if (AppendLines(receivedLines, partialLine, new string(chars)))
                            onLineReceived(receivedLines, client, token);
                    }
                }
                finally
                {
                    client.Close();
                }
            });
    }

    private static Task Accept(int port, CancellationToken cancellation,
         Action<TcpClient, CancellationToken> onClientAccepted)
    {
        // Touched by the listener thread and by task continuations,
        // so every access is guarded by lock (childTasks).
        var childTasks = new List<Task>();
        var listener = new TcpListener(IPAddress.Any, port);
        listener.Server.SetSocketOption(SocketOptionLevel.Socket,
                 SocketOptionName.ReuseAddress, 1);
        listener.Start();

        return Task.Factory.StartNew(() =>
            {
                while (!cancellation.IsCancellationRequested)
                {
                    if (!listener.Pending())
                    {
                        Thread.Sleep(SleepTimeout);
                        continue;
                    }

                    var client = listener.AcceptTcpClient();

                    var childTask = new Task(() => onClientAccepted(client, cancellation));
                    lock (childTasks) childTasks.Add(childTask);
                    childTask.ContinueWith(t => { lock (childTasks) childTasks.Remove(t); });
                    childTask.Start();
                }
            }, cancellation).ContinueWith(t =>
                {
                    Task[] pending;
                    lock (childTasks) pending = childTasks.ToArray();

                    // Stop the listener even if a client handler faulted.
                    try { Task.WaitAll(pending); }
                    finally { listener.Stop(); }
                });
    }

    // Splits newData on '\n' (dropping a preceding '\r'), appends each completed
    // line to lines, and keeps the unterminated tail in partialLine for the next call.
    // Returns true if at least one line was completed.
    private static bool AppendLines(List<string> lines, StringBuilder partialLine, string newData)
    {
        var completed = false;
        var pos = 0;

        for (var i = 0; i < newData.Length; i++)
        {
            if (newData[i] != '\n') continue;

            partialLine.Append(newData, pos, i - pos);

            // Strip the '\r' of a "\r\n" pair, even if it arrived in an earlier chunk.
            if (partialLine.Length > 0 && partialLine[partialLine.Length - 1] == '\r')
                partialLine.Length--;

            lines.Add(partialLine.ToString());
            partialLine.Clear();

            completed = true;
            pos = i + 1;
        }

        partialLine.Append(newData, pos, newData.Length - pos);
        return completed;
    }
}
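TCP delivers a byte stream, so a line can arrive split across several reads. The following stand-alone sketch shows one way to carry the unterminated tail from one read to the next; the helper name Feed and the sample chunks are assumptions made for illustration, not code from the server above.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class LineSplitSketch
{
    // Hypothetical helper: appends completed lines to 'lines' and keeps the
    // unterminated tail in 'partial'. Returns true if a line was completed.
    public static bool Feed(List<string> lines, StringBuilder partial, string chunk)
    {
        var completed = false;

        foreach (var c in chunk)
        {
            if (c != '\n')
            {
                partial.Append(c);
                continue;
            }

            // Drop the '\r' of a "\r\n" pair, even if it came in an earlier chunk.
            if (partial.Length > 0 && partial[partial.Length - 1] == '\r')
                partial.Length--;

            lines.Add(partial.ToString());
            partial.Clear();
            completed = true;
        }

        return completed;
    }

    public static void Main()
    {
        var lines = new List<string>();
        var partial = new StringBuilder();

        Console.WriteLine(Feed(lines, partial, "GET / HT"));        // False
        Console.WriteLine(Feed(lines, partial, "TP/1.1\r"));        // False
        Console.WriteLine(Feed(lines, partial, "\nHost: a\r\nUs")); // True
        Console.WriteLine(string.Join("|", lines));                 // GET / HTTP/1.1|Host: a
        Console.WriteLine(partial);                                 // Us
    }
}
```

Keeping the tail in a separate buffer means the list only ever contains complete lines, so a callback never sees a half-received line.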

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

