ASP.NET Core 9: Creating Asynchronous APIs to Handle Complex Operations Using the .NET Channel Class from the System.Threading.Channels Namespace
REST APIs are the standard for building services in most Line-of-Business applications. APIs are the most common means of data communication across homogeneous and heterogeneous, cross-platform applications. They offer seamless integration between applications for smooth data communication while keeping the architecture loosely coupled. Whether you are developing a web application, a mobile application, or a browser app, APIs play a very important role in effective data communication, typically in JSON format. APIs also have extended capabilities such as file upload and download.
With all these great uses of APIs, large data processing on the API side can still degrade performance and delay the response, and client applications may block while waiting for that delayed response. This situation typically occurs when several client applications send large payloads, e.g. JSON files, images, etc., to the API, which then has to process these files. Naturally, processing such resources causes a considerable delay in the API's execution. In this case, the API developer has to use wise coding practices to offload the processing work on the API end, so that the endpoint executes smoothly and returns a response to the client immediately. The question is: what to do here?
Well, .NET offers asynchronous programming through the Thread and Task classes, and from the .NET Framework onwards (and hence in .NET Core and later) we have the async/await approach to implement non-blocking asynchronous code. But when we have large, time-consuming operations to perform, we need a better approach. The Channel class can be a better solution here.
System.Threading.Channels.Channel Class
The .NET Channel class is part of the System.Threading.Channels namespace and provides a set of synchronization data structures for passing data between producers and consumers asynchronously. This is particularly useful in scenarios where you need to manage data flow in a producer/consumer model. A channel is a concept for passing data between producers and consumers: it wraps a queue to which multiple producers can write data and from which multiple consumers can read it. Figure 1 demonstrates the concept of the channel:
Figure 1: The Channel Concept
In .NET, channels use a ConcurrentQueue as the underlying data structure for thread safety, so multiple producers and consumers can perform read/write operations safely.
Producer/Consumer Model:
Channels implement the producer/consumer model: producers asynchronously produce data and consumers asynchronously consume it. The data passes through a first-in, first-out (FIFO) queue. A channel is strongly typed: a Channel&lt;T&gt; carries items of type T, so producers write and consumers read values of that one type.
Bounded and Unbounded Channels:
Bounded Channels:
These channels have a maximum capacity. When the limit is reached, the producer waits (by default) until space becomes available; the exact full-channel behavior is configurable. You can create a bounded channel using the Channel.CreateBounded&lt;T&gt;(capacity) method.
Unbounded Channels:
These channels have no capacity limit and can be used by any number of readers and writers concurrently, with no fixed limit on the number of messages. You can create an unbounded channel using the Channel.CreateUnbounded&lt;T&gt;() method.
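To make the two factory methods concrete, here is a minimal, self-contained console sketch (not part of this article's project) that creates both kinds of channel and runs a simple produce/consume loop:

using System.Threading.Channels;

// Bounded channel: writers wait asynchronously once 10 items are queued
var bounded = Channel.CreateBounded<string>(new BoundedChannelOptions(10)
{
    FullMode = BoundedChannelFullMode.Wait
});

// Unbounded channel: no capacity limit
var unbounded = Channel.CreateUnbounded<string>();

var producer = Task.Run(async () =>
{
    for (var i = 1; i <= 5; i++)
    {
        await bounded.Writer.WriteAsync($"message-{i}");
    }
    bounded.Writer.Complete(); // signal that no more items will be written
});

// ReadAllAsync yields items as they arrive and completes when the writer completes
await foreach (var message in bounded.Reader.ReadAllAsync())
{
    Console.WriteLine(message);
}
await producer;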
Synchronization and Concurrency:
Channels manage synchronization internally and offer various consumption models through factory creation options, allowing multiple producers and consumers to interact concurrently.
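As a quick illustration of this concurrency, the sketch below (again a standalone console example, not part of this article's project) runs three producers and two consumers against one shared channel; the channel itself handles all synchronization, with no explicit locks:

using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();

// Three producers writing concurrently to the same channel
var producers = Enumerable.Range(1, 3).Select(p => Task.Run(async () =>
{
    for (var i = 0; i < 4; i++)
        await channel.Writer.WriteAsync(p * 100 + i);
})).ToArray();

// Two consumers reading concurrently; each item is delivered to exactly one consumer
var consumers = Enumerable.Range(1, 2).Select(c => Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
        Console.WriteLine($"Consumer {c} read {item}");
})).ToArray();

await Task.WhenAll(producers);
channel.Writer.Complete(); // lets ReadAllAsync complete for both consumers
await Task.WhenAll(consumers);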
More information about the Channel class can be found at this link.
The Implementation
In this article, we will implement a channel in an ASP.NET Core 9 API project. We will design the API to accept JSON files from the client; the API will write these files to a channel, and a background service will read the files from the channel and process them, writing the JSON data into the database. Figure 2 demonstrates the implementation of the application.
Figure 2: The Implementation
As shown in the numbered points in Figure 2, the client application will access the API endpoint to upload JSON files, so the endpoint will act as the Producer. The Producer writes these files into the channel. Further, the files will be read by the Consumer, created using an ASP.NET Core background service. The background service will process the JSON files and write their data into the SQL Server database.
Step 1: Let's create a SQL Server database named EComm and a table in it named ProductInfo, as shown in Listing 1.
Create Database EComm
GO
USE [EComm]
GO
CREATE TABLE [dbo].[ProductInfo](
    [ProductId] [varchar](20) NOT NULL,
    [ProductName] [varchar](100) NOT NULL,
    [CategoryName] [varchar](100) NOT NULL,
    [Description] [varchar](200) NOT NULL,
    [UnitPrice] [int] NOT NULL,
    [ProductRecordId] [int] IDENTITY(1,1) NOT NULL,
    PRIMARY KEY CLUSTERED
    (
        [ProductId] ASC
    ) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF,
            ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON,
            OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY]
GO
Listing 1: The SQL Server Database and Table
Step 2: Open Visual Studio 2022 and create an ASP.NET Core API project targeting .NET 9. Name this project 'API_JSONFile_NETChannel'. In this project, add the following Entity Framework Core packages so that we can access the SQL Server database:
- Microsoft.EntityFrameworkCore
- Microsoft.EntityFrameworkCore.Design
- Microsoft.EntityFrameworkCore.Relational
- Microsoft.EntityFrameworkCore.SqlServer
- Microsoft.EntityFrameworkCore.Tools
Once the packages are installed, run the following command to scaffold the database context and entity from the EComm database:

dotnet ef dbcontext scaffold "Data Source=.;Initial Catalog=EComm;Integrated Security=SSPI;TrustServerCertificate=True" Microsoft.EntityFrameworkCore.SqlServer -o Models -t ProductInfo
The above command adds a Models folder to the project containing the EcommContext and ProductInfo classes.
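For reference, the scaffolded ProductInfo entity should look roughly like the sketch below, mirroring the table created in Step 1; the exact generated code may differ slightly depending on the EF Core tools version:

namespace API_JSONFile_NETChannel.Models;

// Approximate shape of the scaffolded entity for the ProductInfo table
public partial class ProductInfo
{
    public string ProductId { get; set; } = null!;
    public string ProductName { get; set; } = null!;
    public string CategoryName { get; set; } = null!;
    public string Description { get; set; } = null!;
    public int UnitPrice { get; set; }
    public int ProductRecordId { get; set; }
}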
Move the connection string from the OnConfiguring() method of the EcommContext class to appsettings.json, as shown in Listing 2.
"ConnectionStrings": { "AppConnStr": "Data Source=.;Initial Catalog=EComm;Integrated Security=SSPI;TrustServerCertificate=True" }
Listing 2: The Connection String
Step 3: In the project, add new folders named uploads, processedFiles, and FileProcessor. The uploads folder will store files uploaded by the client, and the processedFiles folder will store all processed files (files that have been read from the channel and whose data has been inserted into the SQL Server database). In the FileProcessor folder, add a new class file named JsonFileProcessor.cs. This file will contain the JsonFileProcessor class, which has methods to read JSON files and insert their contents into the SQL Server database table. Each JSON file is deserialized into a List of the ProductInfo class, and this list is then written to the DbSet of ProductInfo created using Entity Framework in Step 2. Listing 3 shows the code for the JsonFileProcessor class.
using API_JSONFile_NETChannel.Models;
using System.Text.Json;

namespace API_JSONFile_NETChannel.FileProcessor
{
    public class JsonFileProcessor(EcommContext ctx)
    {
        private readonly string _uploadsFolder = "uploads";
        private readonly string _processedFilesFolder = "processedFiles";

        public async Task ProcessFilesAsync(string file)
        {
            var products = await ReadJsonFileAsync(file);
            await InsertProductsIntoDatabaseAsync(products);
            MoveFileToProcessedFolder(file);
        }

        /// <summary>
        /// Read the JSON file and deserialize it into a List of ProductInfo
        /// </summary>
        private async Task<List<ProductInfo>> ReadJsonFileAsync(string filePath)
        {
            using var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read);
            var products = await JsonSerializer.DeserializeAsync<List<ProductInfo>>(stream);
            return products ?? new List<ProductInfo>();
        }

        /// <summary>
        /// Insert the ProductInfo list into the ProductInfo table
        /// </summary>
        private async Task InsertProductsIntoDatabaseAsync(List<ProductInfo> products)
        {
            // Simulate a long-running operation
            await Task.Delay(7000);
            await ctx.ProductInfos.AddRangeAsync(products);
            await ctx.SaveChangesAsync();
        }

        /// <summary>
        /// Once the JSON file's data is inserted into the table, move the file
        /// to the 'processedFiles' folder, renaming it if a file with the same
        /// name already exists there
        /// </summary>
        private void MoveFileToProcessedFolder(string filePath)
        {
            var fileName = Path.GetFileName(filePath);
            var destinationPath = Path.Combine(_processedFilesFolder, fileName);
            if (File.Exists(destinationPath))
            {
                var fileExtension = Path.GetExtension(fileName);
                var fileNameWithoutExtension = Path.GetFileNameWithoutExtension(fileName);
                var counter = 1;
                while (File.Exists(destinationPath))
                {
                    var newFileName = $"{fileNameWithoutExtension}_{counter}{fileExtension}";
                    destinationPath = Path.Combine(_processedFilesFolder, newFileName);
                    counter++;
                }
            }
            File.Move(filePath, destinationPath);
        }
    }
}
Listing 3: The JsonFileProcessor class
The JsonFileProcessor class is constructor-injected with the EcommContext class (using a C# primary constructor). It has the following methods:
- ReadJsonFileAsync(): reads the JSON file and deserializes its contents into a List of the ProductInfo class.
- InsertProductsIntoDatabaseAsync(): uses Entity Framework Core to insert the data from the List of ProductInfo into the ProductInfo table.
- MoveFileToProcessedFolder(): once the contents of the file are inserted into the table, moves the file from the uploads folder to the processedFiles folder. If a file with the same name is already present in the processedFiles folder, the file is renamed using the counter variable.
- ProcessFilesAsync(): invokes the other (private) methods of the class to complete processing a file into the database table.
Step 4: In the FileProcessor folder, add a new class file named ChannellingJob.cs. This class wraps the channel: it exposes one method to write file names into the channel and another to read them back and process them. Listing 4 shows the code for the ChannellingJob class.

using System.Threading.Channels;

namespace API_JSONFile_NETChannel.FileProcessor
{
    public class ChannellingJob
    {
        private readonly Channel<string> _channel;
        private readonly JsonFileProcessor _processor;

        public ChannellingJob(JsonFileProcessor processor, Channel<string> channel)
        {
            _channel = channel;
            _processor = processor;
        }

        /// <summary>
        /// Write a file name to the channel
        /// </summary>
        /// <param name="file"></param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public async Task EnqueueFileAsync(string file, CancellationToken cancellationToken = default)
        {
            await _channel.Writer.WriteAsync(file, cancellationToken);
        }

        /// <summary>
        /// Read file names from the channel and process them
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public async Task DequeueFilesAsync(CancellationToken cancellationToken)
        {
            // ReadAllAsync waits until data is available and keeps yielding
            // items until the channel is completed or cancellation is requested
            await foreach (var filePath in _channel.Reader.ReadAllAsync(cancellationToken))
            {
                await _processor.ProcessFilesAsync(filePath);
            }
        }
    }
}
Listing 4: The ChannellingJob class
The ChannellingJob class has the following methods:
- EnqueueFileAsync(): accepts a file name as an input parameter and writes it into the channel.
- DequeueFilesAsync(): reads file names from the channel as they become available. Each file name is passed to the ProcessFilesAsync() method of the JsonFileProcessor class, where the file is processed and its contents are saved to the database table.
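Note that in this design the channel is never completed, so the consumer loop runs for the lifetime of the service. If you ever need to end it gracefully, e.g. during a controlled shutdown, you could add a method like the hypothetical one below (not part of this article's code) to ChannellingJob:

// Hypothetical addition: completing the writer makes Reader.ReadAllAsync()
// finish once the remaining file names are drained, so DequeueFilesAsync returns
public void CompleteChannel()
{
    _channel.Writer.Complete();
}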
Step 5: In the project, add a new class file named FileProcessorBackgroundService.cs. This background service is the Consumer: it reads file names from the channel by invoking DequeueFilesAsync(). Listing 5 shows the code.

using API_JSONFile_NETChannel.FileProcessor;
using System.Threading.Channels;

namespace API_JSONFile_NETChannel
{
    /// <summary>
    /// The Consumer
    /// </summary>
    public class FileProcessorBackgroundService : BackgroundService
    {
        private readonly IServiceProvider _serviceProvider;
        private readonly Channel<string> _channel;

        public FileProcessorBackgroundService(IServiceProvider serviceProvider, Channel<string> channel)
        {
            _serviceProvider = serviceProvider;
            _channel = channel;
        }

        /// <summary>
        /// Consume data from the channel by invoking the DequeueFilesAsync() method
        /// </summary>
        /// <param name="stoppingToken"></param>
        /// <returns></returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    // JsonFileProcessor depends on the scoped EcommContext,
                    // so resolve it from a new scope on each iteration
                    using (var scope = _serviceProvider.CreateScope())
                    {
                        var jsonFileProcessor = scope.ServiceProvider.GetRequiredService<JsonFileProcessor>();
                        var channellingJob = new ChannellingJob(jsonFileProcessor, _channel);

                        // Conditional execution
                        if (ShouldExecute())
                        {
                            await channellingJob.DequeueFilesAsync(stoppingToken);
                        }
                    }
                }
                catch (Exception)
                {
                    // Log the exception or handle it accordingly
                    throw;
                }

                // Wait for a specified interval before checking again; adjust as needed
                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            }
        }

        private bool ShouldExecute()
        {
            const string uploadsFolderPath = "uploads";
            return Directory.Exists(uploadsFolderPath) && Directory.GetFiles(uploadsFolderPath).Length > 0;
        }
    }
}
Listing 5: The FileProcessorBackgroundService class
Step 6: In Program.cs, add code to register the EcommContext, ChannellingJob, and JsonFileProcessor classes in the dependency container. Since we will be uploading JSON files, we need to add the Antiforgery service. We also need to register the FileProcessorBackgroundService class as a hosted service, and, most importantly, we need to create an unbounded channel and register it as a singleton so that it is available as a single global channel shared by all requests across all sessions of the API. Listing 6 shows the registration code.
.....
builder.Services.AddDbContext<EcommContext>(options =>
{
    options.UseSqlServer(builder.Configuration.GetConnectionString("AppConnStr"));
});
builder.Services.AddScoped<ChannellingJob>();
builder.Services.AddScoped<JsonFileProcessor>();
builder.Services.AddAntiforgery();
builder.Services.AddOpenApi();
builder.Services.AddSingleton(Channel.CreateUnbounded<string>());
builder.Services.AddHostedService<FileProcessorBackgroundService>();
......
Listing 6: The dependency registration code
In Program.cs, add an endpoint that will accept files from the client application. Listing 7 shows the code for the endpoint.
........
app.MapPost("/upload", async (IFormFileCollection files, ChannellingJob channellingJob) => { // ChannellingJob channellingJob = new ChannellingJob(ctx); if (files == null || files.Count == 0) { return Results.BadRequest("No files uploaded"); } var filePaths = new List<string>(); foreach (var file in files) { if (file.Length > 0) { var filePath = Path.Combine("uploads"); string fileName = Path.GetFileName(file.FileName); var fileExtension = Path.GetExtension(fileName); var fileNameWithoutExtension = Path.GetFileNameWithoutExtension(fileName); var counter = Guid.NewGuid(); var newFileName = $"{fileNameWithoutExtension}_{counter}{fileExtension}"; filePath = Path.Combine(filePath, newFileName); using (var stream = new FileStream(filePath, FileMode.Create)) { await file.CopyToAsync(stream); } filePaths.Add(filePath); // The Producer await channellingJob.EnqueueFileAsync(filePath); } } return Results.Accepted($"Accepted All Uploaded Files"); }) .WithName("UploadJsonFiles").DisableAntiforgery();
......
Listing 7: Endpoint for accepting files from the client
The endpoint code in Listing 7 shows that files are accepted from the client using the IFormFileCollection interface. The endpoint also receives the ChannellingJob instance from the dependency services. The code shows that each uploaded file is renamed using a GUID, so that if multiple sessions upload files with the same name, each file is renamed before being created in the uploads folder, avoiding a same-file-name exception. The endpoint uses the ChannellingJob instance to call its EnqueueFileAsync() method to add the file name to the channel; hence this endpoint acts as the producer.
To test the application, I have used two JSON files, File1.json and File2.json, with the sample content shown in Listing 8.
File1.json

[
    { "ProductId": "Prod-0001", "ProductName": "Product-1", "CategoryName": "Category1", "Description": "Some Product", "UnitPrice": 10000 },
    { "ProductId": "Prod-0002", "ProductName": "Product-2", "CategoryName": "Category2", "Description": "Some Product", "UnitPrice": 11000 },
    { "ProductId": "Prod-0003", "ProductName": "Product-3", "CategoryName": "Category1", "Description": "Some Product", "UnitPrice": 12000 },
    { "ProductId": "Prod-0004", "ProductName": "Product-4", "CategoryName": "Category2", "Description": "Some Product", "UnitPrice": 13000 }
]

File2.json

[
    { "ProductId": "Prod-0005", "ProductName": "Product-5", "CategoryName": "Category3", "Description": "Some Product", "UnitPrice": 13000 },
    { "ProductId": "Prod-0006", "ProductName": "Product-6", "CategoryName": "Category4", "Description": "Some Product", "UnitPrice": 15000 },
    { "ProductId": "Prod-0007", "ProductName": "Product-7", "CategoryName": "Category3", "Description": "Some Product", "UnitPrice": 16000 },
    { "ProductId": "Prod-0008", "ProductName": "Product-8", "CategoryName": "Category4", "Description": "Some Product", "UnitPrice": 14000 }
]
Listing 8: File1.json and File2.json
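If you prefer testing from code rather than a REST client, a minimal console client such as the sketch below can upload the two files; the base address https://localhost:7001 is an assumption, so adjust it to match your launch profile:

// Test client sketch; the port number is an assumption
using var client = new HttpClient();
using var content = new MultipartFormDataContent();

foreach (var path in new[] { "File1.json", "File2.json" })
{
    var fileContent = new StreamContent(File.OpenRead(path));
    fileContent.Headers.ContentType =
        new System.Net.Http.Headers.MediaTypeHeaderValue("application/json");
    // Each file becomes one multipart form part bound to IFormFileCollection
    content.Add(fileContent, "files", Path.GetFileName(path));
}

var response = await client.PostAsync("https://localhost:7001/upload", content);
Console.WriteLine(response.StatusCode); // expected: Accepted (202)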
Run the application and, using Advanced REST Client (ARC), upload these files as shown in Figure 3.
Figure 3: Uploading Files using ARC
Once the Send button is clicked, you will see that the files are uploaded into the uploads folder, as shown in Figure 4.
Figure 4: Uploads folder
ARC shows an immediate Accepted response, as shown in Figure 5.
Figure 5: Response from API
The background service consumes the file names from the channel; the files are then processed and moved to the processedFiles folder, as shown in Figure 6.
Figure 6: Processed files in processedFiles folder
The data from the JSON files is inserted into the ProductInfo table of the EComm database, as shown in Figure 7.
Figure 7: Data in Table
That's it. The API is not blocked: it sends the response to the client immediately once the file name is added to the channel, without waiting for the file to be processed. This is what an asynchronous API creation strategy really is: there is no blocking of the response, and the API responds immediately. You can handle exceptions in the endpoint as well as in the background service and surface error responses using notification techniques.
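For example, a variation of DequeueFilesAsync (a sketch, not this article's final code) could catch per-file failures so that one bad file does not stop the background service:

public async Task DequeueFilesAsync(CancellationToken cancellationToken)
{
    await foreach (var filePath in _channel.Reader.ReadAllAsync(cancellationToken))
    {
        try
        {
            await _processor.ProcessFilesAsync(filePath);
        }
        catch (Exception ex)
        {
            // Log and continue with the next file; a real implementation might
            // move the file to a 'failed' folder or notify the client
            Console.Error.WriteLine($"Failed to process {filePath}: {ex.Message}");
        }
    }
}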
One thing I want to make clear: this solution is suitable when the API is hosted with a single-server hosting strategy. For a distributed deployment of the API, consider alternatives such as writing the file to Azure Blob Storage or to a Redis cache, and processing it from those stores with an Azure Function using a Blob trigger or a Redis trigger.
The code for this article can be downloaded from this link.