Click here to Skip to main content
15,881,089 members
Please Sign up or sign in to vote.
0.00/5 (No votes)
I am implementing queues following the example at https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-3.1&tabs=visual-studio.

This is how my code looks:

In Startup.cs I am adding my hosted service and background queue:
// Register QueuedHostedService as a hosted service.
services.AddHostedService<QueuedHostedService>();
// Register BackgroundTaskQueue as the singleton implementation of IBackgroundTaskQueue.
services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();


Then I implement the scoped service, hosted service and background queue as follows:

What I have tried:

namespace Services.Services {
  /// <summary>
  /// Background service that takes work items off an <see cref="IBackgroundTaskQueue"/>
  /// one at a time and awaits each of them until cancellation is requested.
  /// </summary>
  public class QueuedHostedService: BackgroundService {
    private readonly ILogger _logger;
    private readonly IServiceProvider _serviceProvider;

    /// <summary>
    /// Stores the injected service provider and queue, and creates a logger for this type.
    /// </summary>
    /// <param name="serviceProvider">Service provider kept by this service.</param>
    /// <param name="taskQueue">Queue the work items are dequeued from.</param>
    /// <param name="loggerFactory">Factory used to create the logger.</param>
    public QueuedHostedService(IServiceProvider serviceProvider, IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory) {
      _serviceProvider = serviceProvider;
      TaskQueue = taskQueue;
      _logger = loggerFactory.CreateLogger<QueuedHostedService>();
    }

    /// <summary>Queue that supplies the work items executed by this service.</summary>
    public IBackgroundTaskQueue TaskQueue {
      get;
    }

    /// <summary>
    /// Dequeues and runs work items sequentially until <paramref name="cancellationToken"/> is signalled.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the dequeue call and to every work item.</param>
    protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
      while (!cancellationToken.IsCancellationRequested) {
        var workItem = await TaskQueue.DequeueAsync(cancellationToken);

        try {
          await workItem(cancellationToken);
        } catch (Exception ex) {
          // A failing work item must not end the loop, but the failure has to be
          // visible: log it instead of silently discarding the exception.
          _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
        }
      }
    }
  }
}


/// <summary>
/// Queue of asynchronous work items, each a delegate that takes a
/// <see cref="CancellationToken"/> and returns a <see cref="Task"/>.
/// </summary>
public interface IBackgroundTaskQueue {
  /// <summary>Adds a work item to the queue.</summary>
  /// <param name="workItem">Delegate to run later.</param>
  void QueueBackgroundWorkItem(Func < CancellationToken, Task > workItem);

  /// <summary>Asynchronously obtains a work item from the queue.</summary>
  /// <param name="cancellationToken">Token supplied by the caller for the dequeue operation.</param>
  /// <returns>A task whose result is a queued work item.</returns>
  Task < Func < CancellationToken, Task >> DequeueAsync(CancellationToken cancellationToken);
}

namespace Services.Services {
  /// <summary>
  /// <see cref="IBackgroundTaskQueue"/> implementation backed by a
  /// <see cref="ConcurrentQueue{T}"/>; a <see cref="SemaphoreSlim"/> is released once
  /// per enqueued item so that dequeuing waits until an item is available.
  /// </summary>
  public class BackgroundTaskQueue: IBackgroundTaskQueue {
    // Both fields are assigned exactly once, so they are readonly.
    private readonly ConcurrentQueue<Func<CancellationToken, Task>> _workItems = new ConcurrentQueue<Func<CancellationToken, Task>>();
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    /// <summary>Enqueues a work item and signals one waiting consumer.</summary>
    /// <param name="workItem">Delegate to run later; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem) {
      if (workItem == null) {
        throw new ArgumentNullException(nameof(workItem));
      }

      // Enqueue before releasing so a woken consumer always finds an item.
      _workItems.Enqueue(workItem);
      _signal.Release();
    }

    /// <summary>Waits for the semaphore to be signalled, then removes and returns a work item.</summary>
    /// <param name="cancellationToken">Token that cancels the wait.</param>
    /// <returns>The dequeued work item.</returns>
    public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken) {
      await _signal.WaitAsync(cancellationToken);
      _workItems.TryDequeue(out var workItem);
      return workItem;
    }
  }
}


// scoped service
namespace Services.Services {
  /// <summary>
  /// Import service that forwards direct imports to <see cref="IFileProcessingService"/>
  /// and schedules uploaded files for processing on the background task queue.
  /// </summary>
  public class ImportService: BaseService, IImportService {
    private readonly IFileProcessingService _scopedProcessingService;

    private readonly ConfigurationSettings _configurationSettings;

    /// <summary>Queue that receives the background processing work items.</summary>
    public IBackgroundTaskQueue Queue {
      get;
    }

    private const string AZURE_BLOB_CONTAINER = "blobcontainer";

    /// <summary>Service provider used to create a scope for each queued work item.</summary>
    public IServiceProvider Services {
      get;
    }

    /// <summary>
    /// Stores the service provider and queue, and resolves the configuration settings
    /// and the processing service from <paramref name="services"/>.
    /// </summary>
    /// <param name="services">Service provider passed to the base class and kept for scope creation.</param>
    /// <param name="queue">Background queue that work items are added to.</param>
    public ImportService(IServiceProvider services, IBackgroundTaskQueue queue): base(services) {
      Services = services;
      _configurationSettings = services.GetService<ConfigurationSettings>();
      _scopedProcessingService = services.GetProcessingService();
      Queue = queue;
    }

    // ---- Main file
    /// <summary>Imports a file by delegating to the processing service with the same arguments.</summary>
    public async Task ImportFile(string filePath, long fileSize, int userId, FileFormatType fileFormat, TransactionsDataHeadersMap dataHeadersMap, string delimiter, string dateFormat) {
      await _scopedProcessingService.ImportFile(filePath, fileSize, userId, fileFormat, dataHeadersMap, delimiter, dateFormat);
    }

    /// <summary>
    /// Determines the file format, obtains a temporary path for the upload and
    /// queues the file for background processing.
    /// </summary>
    /// <param name="file">Uploaded file.</param>
    /// <param name="userId">Id of the uploading user.</param>
    /// <param name="type">Transactional data file type passed on to processing.</param>
    public async Task UploadToBlobStorage(IFormFile file, int userId, TransactionalDataFileType type) {
      var fileFormat = GetFileFormat(file);
      var tempFilePath = await GetTemporaryPath(file);
      var fileName = userId.ToString() + "-" + DateTime.Now + "." + fileFormat;
      // ....... //

      ProcessFile(tempFilePath, fileFormat, file, type, userId);
    }

    /// <summary>
    /// Queues a work item that creates a service scope, resolves
    /// <see cref="IFileProcessingService"/> from it and imports the temporary file.
    /// </summary>
    private void ProcessFile(string tempFilePath, FileFormatType fileFormat, IFormFile file, TransactionalDataFileType type, int userId) {
      var delimiter = ",";
      // Read the length up front so the queued delegate does not keep a reference
      // to the request's IFormFile after this method returns.
      var fileSize = file.Length;

      Queue.QueueBackgroundWorkItem(async token => {
        using(var scope = Services.CreateScope()) {
          var scopedProcessingService =
            scope.ServiceProvider
            .GetRequiredService<IFileProcessingService>();

          // do the processing
          // NOTE(review): `type` is compared against the string "csv" here; confirm
          // which value (or `fileFormat`) this switch is meant to test.
          switch (type) {
            case "csv":
              await scopedProcessingService.ImportFile(tempFilePath, fileSize, userId, fileFormat, new Headers(), delimiter ?? ",", "yyyy-MM-dd");
              break;
          }
        }
      });
    }
  }
}


I am adding elements to the queue on each request in the controller. Now I want to add another queue for processing other requests. Is it possible to use another queue with the same hosted service? I have trouble finding examples of how to do that. Should I just add another scoped service and another background queue?
Posted

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900