Click here to Skip to main content
15,884,826 members
Please Sign up or sign in to vote.
0.00/5 (No votes)
I am using https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-2.1&tabs=visual-studio as a reference to create a background service that processes a queue.

I have an ImportService.cs class that receives a CSV file from an HTTP request. I then want to add that file to a queue, which processes the CSV file and writes the results to the database. This is my service class, where I have an
IBackgroundTaskQueue
instance:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using CsvHelper;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;

namespace Services.Services
{
/// <summary>
/// Takes an uploaded file, stores it in the Azure blob container
/// <c>blobcontainer</c>, pushes a work item onto <see cref="Queue"/> and then
/// calls <c>Add</c> on the upload-data repository.
/// </summary>
public class ImportService : BaseService, IImportService
{
    private readonly IUploadDataRepository _uploadDataRepository;
    private readonly ConfigurationSettings _configurationSettings;

    /// <summary>Queue that background work items are pushed onto.</summary>
    public IBackgroundTaskQueue Queue { get; }

    private const string AZURE_BLOB_CONTAINER = "blobcontainer";

    // Fixed-width, culture-invariant UTC timestamp used inside blob names.
    // It deliberately contains no '/', ':' or spaces: '/' is the virtual
    // directory delimiter in blob names, and the default DateTime.ToString()
    // output depends on the server culture.
    private const string BlobTimestampFormat = "yyyyMMdd'T'HHmmssfff'Z'";

    /// <summary>
    /// Resolves the repository and configuration from <paramref name="services"/>
    /// and stores the queue used for background processing.
    /// </summary>
    /// <param name="services">Service provider, also handed to the base class.</param>
    /// <param name="queue">Queue that receives the background work items.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queue"/> is null.</exception>
    public ImportService(IServiceProvider services, IBackgroundTaskQueue queue) : base(services)
    {
        _uploadDataRepository = services.GetUploadDataRepository();
        _configurationSettings = services.GetService<ConfigurationSettings>();
        Queue = queue ?? throw new ArgumentNullException(nameof(queue));
    }

    /// <summary>
    /// Uploads <paramref name="file"/> to blob storage under a name built from
    /// the user id and the current UTC time, then queues a background work item.
    /// </summary>
    /// <param name="file">The uploaded file.</param>
    /// <param name="userId">Id of the uploading user; becomes the blob name prefix.</param>
    /// <param name="type">Not used by the code visible in this method.</param>
    /// <exception cref="ArgumentNullException"><paramref name="file"/> is null.</exception>
    public async Task UploadToBlobStorage(IFormFile file, int userId, Type type)
    {
        if (file == null)
        {
            throw new ArgumentNullException(nameof(file));
        }

        // GetFileFormat / GetTemporaryPath are declared outside this snippet;
        // presumably they return the file extension and the path of a local
        // temp copy of the upload - TODO confirm.
        var fileFormat = GetFileFormat(file);
        var tempFilePath = await GetTemporaryPath(file);

        // Blob name: "<userId>-<utc timestamp>.<format>".
        var timestamp = DateTime.UtcNow.ToString(BlobTimestampFormat, CultureInfo.InvariantCulture);
        var fileName = userId.ToString(CultureInfo.InvariantCulture) + "-" + timestamp + "." + fileFormat;

        string storageConnectionString = _configurationSettings.ConnectionStrings.BlobStorageConnection;
        CloudStorageAccount account = CloudStorageAccount.Parse(storageConnectionString);
        var blobClient = account.CreateCloudBlobClient();

        // Make sure container is there
        var blobContainer = blobClient.GetContainerReference(AZURE_BLOB_CONTAINER);
        await blobContainer.CreateIfNotExistsAsync();

        // set the permission to blob type
        // NOTE(review): this makes every blob in the container anonymously
        // readable. Confirm that is intended for user-uploaded files.
        await blobContainer.SetPermissionsAsync(new BlobContainerPermissions { PublicAccess = BlobContainerPublicAccessType.Blob });
        CloudBlockBlob blockBlob = blobContainer.GetBlockBlobReference(fileName);
        using (var fileStream = File.OpenRead(tempFilePath))
        {
            await blockBlob.UploadFromStreamAsync(fileStream);
        }
        // NOTE(review): the file at tempFilePath is not deleted here; confirm
        // whether GetTemporaryPath's caller is expected to clean it up.

        // ADD FILE TO QUEUE AND PROCESS IT
        // The delegate body is what runs when the hosted service dequeues the
        // item, so the actual file processing belongs inside it.
        Queue.QueueBackgroundWorkItem(async token =>
        {
            Console.WriteLine("ITEM QUEUED PROCESS IT??");

        });

        // NOTE(review): `uploadData` is not declared anywhere in the visible
        // snippet; it has to be created before this call for the class to compile.
        await _uploadDataRepository.Add(uploadData);
    }
}


Below are the classes I created from the Microsoft example:
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Services.Services.Contracts {
  /// <summary>
  /// Contract for a queue of asynchronous work items. Each work item is a
  /// delegate that takes a <see cref="CancellationToken"/> and returns a
  /// <see cref="Task"/>.
  /// </summary>
  public interface IBackgroundTaskQueue {
    /// <summary>Adds a work item to the queue.</summary>
    /// <param name="workItem">The delegate to queue.</param>
    void QueueBackgroundWorkItem(Func < CancellationToken, Task > workItem);

    /// <summary>Asynchronously retrieves a queued work item.</summary>
    /// <param name="cancellationToken">Token passed to the dequeue operation.</param>
    /// <returns>A task whose result is the dequeued work item delegate.</returns>
    Task < Func < CancellationToken, Task >> DequeueAsync(CancellationToken cancellationToken);
  }
}

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace Services.Services {
  /// <summary>
  /// Background service that keeps pulling work items from an
  /// <see cref="IBackgroundTaskQueue"/> and awaits each one before fetching
  /// the next. A failing work item is logged and does not stop the loop.
  /// </summary>
  public abstract class QueuedHostedService : BackgroundService
  {
    private readonly ILogger _logger;
    private readonly IBackgroundTaskQueue _taskQueue;

    /// <summary>Stores the queue and creates the logger for this service.</summary>
    /// <param name="taskQueue">Queue the work items are read from.</param>
    /// <param name="loggerFactory">Factory used to create the logger.</param>
    protected QueuedHostedService(IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory)
    {
      _taskQueue = taskQueue;
      _logger = loggerFactory.CreateLogger<QueuedHostedService>();
      Console.WriteLine("QueuedHostedService initialized");
    }

    /// <summary>
    /// Dequeues and runs work items until cancellation is requested.
    /// </summary>
    /// <param name="cancellationToken">
    /// Checked before each dequeue and passed to both the dequeue call and the work item.
    /// </param>
    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
      _logger.LogInformation("Queued Hosted Service is starting.");

      while (true)
      {
        if (cancellationToken.IsCancellationRequested)
        {
          return;
        }

        Func<CancellationToken, Task> pending = await _taskQueue.DequeueAsync(cancellationToken);

        try
        {
          Task running = pending(cancellationToken);
          await running;
        }
        catch (Exception error)
        {
          // The placeholder value is the fixed text "workItem", not a
          // description of the delegate that failed.
          _logger.LogError(error, "Error occurred executing {WorkItem}.", "workItem");
        }
      }
    }

    // Not called anywhere in the visible code.
    private void DoWork(object state) => Console.WriteLine("PROCCESS FILEE???");
  }
}

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Services.Services {
  /// <summary>
  /// <see cref="IBackgroundTaskQueue"/> implementation backed by a
  /// <see cref="ConcurrentQueue{T}"/>. A <see cref="SemaphoreSlim"/> starting
  /// at zero is released once per enqueued item and awaited once per dequeue.
  /// </summary>
  public class BackgroundTaskQueue : IBackgroundTaskQueue
  {
    private ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
      new ConcurrentQueue<Func<CancellationToken, Task>>();

    private SemaphoreSlim _signal = new SemaphoreSlim(0);

    /// <summary>Enqueues a work item and releases the semaphore once.</summary>
    /// <param name="workItem">The delegate to queue.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem)
    {
      // The null check runs before anything is enqueued or signalled.
      _workItems.Enqueue(workItem ?? throw new ArgumentNullException(nameof(workItem)));
      _signal.Release();
    }

    /// <summary>
    /// Waits on the semaphore, then takes the next item from the queue.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the semaphore wait.</param>
    /// <returns>The dequeued work item, or null if the queue yielded nothing.</returns>
    public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken)
    {
      await _signal.WaitAsync(cancellationToken);

      return _workItems.TryDequeue(out var next) ? next : null;
    }
  }
}


What I have tried:

My question is: where should that file be processed? In ImportService, or in QueuedHostedService? If in
QueuedHostedService,
how should I pass and access that file? What would be the best practice for that? I wanted to create a
DoWork() function in QueuedHostedService
that processes that file, but I'm not sure how. Or should the processing be done in the ImportService class?
Posted
Updated 8-May-20 3:37am

1 solution

Can't say as I've used that particular scheme for queuing work items but the logic is the same regardless of the scheme. You do the processing in the work item. All the code you've posted deals with the service and its queues, but not the work item itself.
Queue.QueueBackgroundWorkItem(async token =>
        {
            Console.WriteLine("ITEM QUEUED PROCESS IT??");
    });
is the key area. You need to create the work item to be queued and, as part of that creation, save off the blob that contains the file you received in the work item. Then, when the work item is dequeued and executed, it uses that saved blob and processes the file.
 
Share this answer
 
Comments
Member 12885549 8-May-20 9:42am    
So all the processing should go inside here right?
 Queue.QueueBackgroundWorkItem(async token =>
        {
            Console.WriteLine("Processing goes here?");
    });
JudyL_MD 8-May-20 9:58am    
Yes, that's where the reference page you linked to does its work. Replace that WriteLine with the actions you want to execute.

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900