Click here to Skip to main content
15,887,264 members
Please Sign up or sign in to vote.
0.00/5 (No votes)
I'm starting to dabble in the TPL, and I'm looking for comments regarding the code to see if there's something I'm missing. It seems to work, but I can't find any decent intros to the subject. The following is what I've managed to glean from Google.

The tasks I'll eventually be creating will live in a Windows service and must be cancelable. Am I approaching that requirement correctly?

C#
class Program
{
    static void Main(string[] args)
    {
        // create the tasks
        Task[] tasks = new Task[] 
        {
            CreateTask("Task1", 5), 
            CreateTask("Task2", 1)};

        // fire them up
        Parallel.Invoke(()=>tasks[0].Start(), 
                        ()=>tasks[1].Start());

        // wait for them
        Task.WaitAll(tasks);

        // clean up
        foreach(Task task in tasks)
        {
            task.Dispose();
        }
    }

    private static Task CreateTask(string name, int interval)
    {
        Task                    task        = null;
        TaskCreationOptions     taskOptions = TaskCreationOptions.LongRunning;
        CancellationTokenSource tokenSource = new CancellationTokenSource();
        CancellationToken       cancelToken = tokenSource.Token;
        cancelToken.ThrowIfCancellationRequested();

        try
        {
            task = new Task((Object stateObj)=>DoWork(name, interval, cancelToken), taskOptions, cancelToken);
        }
        catch (AggregateException aggEx)
        {
        }
        catch(Exception ex)
        {
        }
        finally
        {
        }
        return task;
    }

    private static void DoWork(string name, int interval, CancellationToken cancelToken)
    {
        int count = 1;
        do
        {
            string msg = string.Format("{2} Tick {0:000} {1}", count, DateTime.Now.ToString("HH:mm:ss"), name);
            Console.WriteLine(msg);
            Thread.Sleep(interval * 1000);
            count++;
        } while (!cancelToken.IsCancellationRequested && count<10);
    }
}


I'm writing a Windows service that will use MEF to dynamically load one or more assemblies. Each assembly will have at least one thread to perform work, and a FileSystemWatcher (possibly in another thread) to monitor its config file for updates. Each assembly will need to be able to cancel its running thread in order to load a new config, and then restart itself. The threads sit-and-spin waiting for a specific datetime to perform their work.

The windows service needs to be able to stop/start these assemblies when the service is started/stopped, or when a new assembly is detected, or an existing one is deleted.

The code above is simply a POC for myself, and I was just curious as to what - if anything - I'm doing wrong, or where I'm not understanding the way the TPL works.

I saw several examples of doing it this way, as well as other ways, but there is no clear explanation as to HOW you're supposed to do it. Since the threads have to be stopped/restarted, it made sense - to me - to do it this way.
Posted
Updated 9-Feb-15 7:27am
v3
Comments
Veronica S. Zotali 9-Feb-15 12:03pm    
Creating and starting tasks is different than Parallelism. Do you want to start your tasks at the same time?

It seems that you are trying to use asynchronous and parallel programming at the same time. What are you trying to achieve?
#realJSOP 9-Feb-15 12:10pm    
A task isn't necessarily a thread, and I need them to run at the same time. They don't have to interact, but they do need to run side-by-side.

so smtg like that won't do ?

C#
private void ProcessRequests(IList<request> requests)
{
            var observer = observers.First();

            var task = new Task(() => requests.ToList().ForEach(ProcessRequest));

            task.Start();

            task.ContinueWith((con) =>
            {
                Console.ForegroundColor = ConsoleColor.Yellow;
               
                observer.OnCompleted();
            });
}
 
Share this answer
 
v2
You're currently calling the new Task(Action<object>, object, CancellationToken) constructor; the TaskCreationOptions you're passing in are ignored, because they're being treated as the state parameter to the action parameter.

You should generally try to avoid creating tasks which aren't already started. There are a few cases where it's necessary, but it's more efficient to use the Task.Factory.StartNew or Task.Run methods.
"Task.Factory.StartNew" vs "new Task(...).Start"[^]

The Parallel.Invoke line doesn't look right to me. You're firing up two threads simply to start the tasks, which will run on a different thread. If you change your CreateTask method to return already-started tasks, you can simply remove this line.

You don't really need to call Dispose on the tasks, particularly in .NET 4.5:
Do I need to dispose of Tasks?[^]

I'd be inclined to pass a single CancellationTokenSource to the CreateTask method, since you'll need to keep hold of it if you want to cancel the tasks.

If you're using .NET 4.5, I'd be inclined to use async / await, which makes TPL code much easier to write: Asynchronous Programming with Async and Await (C# and Visual Basic)[^]

Sacha Barber has a good series of articles on TPL: Task Parallel Library: 1 of n[^]

You might also want to check out Stephen Toub's blog posts: Parallel Programming with .NET[^]




Based on your expanded description, you probably want to break this down into smaller composable blocks.

The following code provides two task-returning methods:
  • WaitUntil returns a cancellable Task which will be completed on a specific date/time (to within 10 seconds);
  • WhenFileChanged returns a cancellable Task which will be completed when a specified file is changed;

C#
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class TaskUtilities
{
    private static Task FromCancellation()
    {
        var tcs = new TaskCompletionSource<bool>();
        tcs.SetCanceled();
        return tcs.Task;
    }

    public static Task WaitUntil(DateTimeOffset triggerTime, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested) return FromCancellation();
        return new WaitUntilPromise(triggerTime, cancellationToken).Task;
    }

    private sealed class WaitUntilPromise
    {
        private const int CheckFrequencyInMilliseconds = 10000; // 10 seconds

        private readonly TaskCompletionSource<bool> _taskCompletionSource = new TaskCompletionSource<bool>();
        private readonly DateTimeOffset _triggerTime;
        private readonly CancellationToken _token;

        private readonly CancellationTokenRegistration _registration;
        private readonly Timer _timer;

        public WaitUntilPromise(DateTimeOffset triggerTime, CancellationToken cancellationToken)
        {
            _triggerTime = triggerTime;
            _token = cancellationToken;

            if (triggerTime <= DateTimeOffset.UtcNow)
            {
                _taskCompletionSource.SetResult(true);
            }
            else
            {
                if (cancellationToken.CanBeCanceled)
                {
                    _registration = cancellationToken.Register(
                        state => ((WaitUntilPromise)state).Complete(),
                        this);
                }

                _timer = new Timer(
                    state => ((WaitUntilPromise)state).CheckTime(),
                    this, 0, CheckFrequencyInMilliseconds);
            }
        }

        public Task Task
        {
            get { return _taskCompletionSource.Task; }
        }

        private void CheckTime()
        {
            if (_triggerTime <= DateTimeOffset.UtcNow)
            {
                Complete();
            }
        }

        private void Complete()
        {
            bool completed;
            if (_token.IsCancellationRequested)
            {
                completed = _taskCompletionSource.TrySetCanceled();
            }
            else
            {
                completed = _taskCompletionSource.TrySetResult(true);
            }
            if (completed)
            {
                if (_timer != null) _timer.Dispose();
                _registration.Dispose();
            }
        }
    }

    public static Task WhenFileChanged(string filePath, CancellationToken cancellationToken)
    {
        if (string.IsNullOrWhiteSpace(filePath))
        {
            throw new ArgumentNullException("filePath");
        }

        filePath = Path.GetFullPath(filePath);

        if (!File.Exists(filePath))
        {
            throw new FileNotFoundException(null, filePath);
        }

        return new WhenFileChangedPromise(filePath, cancellationToken).Task;
    }

    private sealed class WhenFileChangedPromise
    {
        private readonly TaskCompletionSource<bool> _taskCompletionSource = new TaskCompletionSource<bool>();
        private readonly CancellationToken _token;
        private readonly string _filePath;

        private readonly CancellationTokenRegistration _registration;
        private readonly FileSystemWatcher _watcher;

        public WhenFileChangedPromise(string filePath, CancellationToken cancellationToken)
        {
            _filePath = filePath;
            _token = cancellationToken;

            string directoryName = Path.GetDirectoryName(filePath);
            string fileName = Path.GetFileName(filePath);

            _watcher = new FileSystemWatcher(directoryName, fileName);
            _watcher.Changed += OnChanged;

            if (cancellationToken.CanBeCanceled)
            {
                _registration = cancellationToken.Register(
                    state => ((WhenFileChangedPromise)state).Complete(),
                    this);
            }

            _watcher.EnableRaisingEvents = true;
        }

        public Task Task
        {
            get { return _taskCompletionSource.Task; }
        }

        private void OnChanged(object sender, FileSystemEventArgs e)
        {
            if (string.Equals(e.FullPath, _filePath, StringComparison.OrdinalIgnoreCase))
            {
                Complete();
            }
        }

        private void Complete()
        {
            bool completed;
            if (_token.IsCancellationRequested)
            {
                completed = _taskCompletionSource.TrySetCanceled();
            }
            else
            {
                completed = _taskCompletionSource.TrySetResult(true);
            }
            if (completed)
            {
                _watcher.Changed -= OnChanged;
                _watcher.Dispose();
                _registration.Dispose();
            }
        }
    }
}


With .NET 4.5, your assembly's code would then look something like this:
C#
public async Task Run()
{
    while (shouldContinueToScheduleWork)
    {
        CancellationTokenSource tokenSource = new CancellationTokenSource();
        
        Task configChanged = MonitorConfiguration(tokenSource.Token);
        Task scheduleWork = ScheduleWork(tokenSource.Token);
        
        // Wait for one of the tasks to complete:
        Task completed = await Task.WhenAny(configChanged, scheduleWork);
        
        // Cancel the other task:
        tokenSource.Cancel();
        
        if (completed == configChanged)
        {
            await ReloadConfiguration();
        }
    }
}

private Task MonitorConfiguration(CancellationToken cancellationToken)
{
    return TaskUtilities.WhenFileChanged(pathToTheConfiguration, cancellationToken);
}

private async Task ReloadConfiguration()
{
    ...
}

private async Task ScheduleWork(CancellationToken cancellationToken)
{
    await TaskUtilities.WaitUntil(timeWhenWorkShouldRun, cancellationToken);
    
    if (!cancellationToken.IsCancellationRequested)
    {
        await DoWork(cancellationToken);
    }
}

private async Task DoWork(CancellationToken cancellationToken)
{
    ...
}
 
Share this answer
 
v3
Comments
#realJSOP 9-Feb-15 12:47pm    
0) Each task has to be run on a different thread.

1) I might need to know when each thread is canceled. Having a single cancel token for each task won't allow that, will it?

2) I thought that I should specify longrunning on threads that run for more than a second or so. Each thread will be running until cancelled by an external source (for days/weeks/months/years).

3) I honestly don't see how this is easier than the classic threading model (something everyone seems to say). Like I said, I haven't been able to find a decent article on TPL (one that explains it in a way I can understand).

I updated my question (after the code block).
Richard Deeming 9-Feb-15 13:22pm    
0) This is handled automatically by the Task.Run / Task.Factory.StartNew call. The action will be queued on a background thread - typically one from the ThreadPool, although that can be overridden by the TaskScheduler.

With async / await, the code runs sequentially until it hits the first await call for something which isn't already completed. In the example I posted, I use await Task.Yield(); to make sure the method is running on a background thread from the start; without that, it would run on the current thread until the first call to await Task.Delay(...).


1) A single CancellationTokenSource will let you cancel multiple tasks at the same time. If you need to be able to cancel a single task, then you would need a CancellationTokenSource per task.

2) Stephen Toub has a good answer for this on the MSDN forums: https://social.msdn.microsoft.com/Forums/vstudio/en-US/8304b44f-0480-488c-93a4-ec419327183b/when-should-a-taks-be-considered-longrunning[^]

3) Personally, I think the TPL solves a lot of problems with the "classic" approach to threading. For example, if you're using the ThreadPool, there's no standard way to cancel a work-item, or know its current state, or to be notified when it's finished. There's no built-in support for composing work items. Exception management is a pain.

The MSDN documentation[^] is surprisingly not terrible. :)
Richard Deeming 9-Feb-15 14:52pm    
I've posted some updated thoughts based on your updated description.
#realJSOP 9-Feb-15 14:56pm    
Changed it to the following, and it still appears to work:

<pre lang="csharp">
static void Main(string[] args)
{
Task[] tasks = new Task[]{CreateTask("Task1", 1), CreateTask("Task2", 5)};
// wait for them
Task.WaitAll(tasks);
}

private static Task CreateTask(string name, int interval)
{
Task task = null;
TaskCreationOptions taskOptions = TaskCreationOptions.LongRunning;
CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken cancelToken = tokenSource.Token;
cancelToken.ThrowIfCancellationRequested();

try
{
task = Task.Factory.StartNew((Object stateObj)=>DoWork(name, interval, cancelToken), taskOptions, cancelToken);
}
catch (AggregateException aggEx)
{
}
catch(Exception ex)
{
}
finally
{
}
return task;
}
</pre>
Richard Deeming 9-Feb-15 15:03pm    
Your TaskCreationOptions will still be ignored - you're calling the StartNew(Action<object>, object, CancellationToken) overload, so the taskOptions will simply be passed to your delegate as the stateObj parameter.

You either need to call the StartNew(Action<object>, object, CancellationToken, TaskCreationOptions, TaskScheduler) overload, or the StartNew(Action, CancellationToken, TaskCreationOptions, TaskScheduler) overload.

(There doesn't seem to be an overload which takes both a CancellationToken and a TaskCreationOptions without a TaskScheduler.)

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900