ASP.NET Core: Implementing CQRS and MediatR pattern in ASP.NET Core API
In this article, we will see an implementation of the Command Query Responsibility Segregation (CQRS) pattern in ASP.NET Core API with MediatR. In my earlier article, I have already explained the CQRS. In this article, we will focus on Mediator Pattern.
Mediator Pattern
Mediator is a behavioral design pattern that reduces coupling between components of the program. This is achieved by using a special mediator object. This object decouples components by managing indirect communication across them. This pattern makes the application easy to extend, reuse, and modify individual components because they do not have any direct dependency across them. Figure 1, will provide a good idea of the Mediator pattern.
Figure 1: The Mediator Pattern
As shown in Figure 1, the Mediator object is responsible for listening to requests from each service object and passing them to the actual Handler object hence implementing the decoupling across the Services and Handlers. This helps to further maintain the usability of the application even if the handlers are extended, changed, etc.
Using MediatR in ASP.NET Core API for Implementing CQRS
The MediatR is a simple Mediator implementation in .NET. This supports request/response, Commands, Queries, Notifications, and events, Synchronous and Asynchronous intelligent dispatching via C#. Following are some of the types exposed by MediatR
- IMediator, this defines a Mediator for encapsulation of request/response and publishing interaction pattern.
- IRequest, a Marker interface that represents a request with a response.
- IRequetHandler, this is used to define a handler for the request
Figure 2 shows the implementation of the ASP.NET Core API Application
Figure 2: The CQRS with the MidaitR
As shown in Figure 2, we will be creating separate APIs for Read and Write operations. These APIs will use Query and Command to make data Read and Write requests and the MedtatR object will handle these requests using Query and Command Handlers to perform database operations.
The Implementation
We will be implementing this application using entity Framework Core (EF Core), MediatR, AutoMapper, and Custom Middleware. I have already explained the Auto MApper on this link. The Listing 1 shows the Database query
Create Database Company; GO USE [Company] GO CREATE TABLE [dbo].[ProductInfo]( [ProductId] [char](20) Primary Key, [ProductName] [varchar](200) NOT NULL, [Manufacturer] [varchar](200) NOT NULL, [Description] [varchar](400) NOT NULL, [BasePrice] [decimal](18, 0) NOT NULL, [Tax] as ([BasePrice] * 0.2), [TotalPrice] AS ([BasePrice]+([BasePrice] * 0.2)), ) select * from ProductInfo Insert into [ProductInfo] values('Prd-0001','Laptop','MS-IT','Gaming Laptop',120000) Insert into [ProductInfo] values('Prd-0002','Iron','MS-Ect','Household Cotton Iron',2000) Insert into [ProductInfo] values('Prd-0003','Water Bottle','MS-Home Appliances','2 Lts. Cold Storage',200) Insert into [ProductInfo] values('Prd-0004','RAM','MS-IT','32 GB DDR 6 Fast Memory',20000) Insert into [ProductInfo] values('Prd-0005','Mixer','MS-Ect','Kitchen Appliances',16000) Insert into [ProductInfo] values('Prd-0006','Mouse','MS-IT','Optical USB Gaming Mouse',1000) Insert into [ProductInfo] values('Prd-0007','Keyboard','MS-IT','Gaming Lights 110 Keys Stroke Kayboard',2000)
Listing 1: The Database Query
Step 1: Open Visual Studio 2022 and create a new ASP.NET Core API Project Targetted to .NET 7 r .NET 8. Name this project as Core_CQRS_Mediatr. In this project, add the following NuGet Packages
- Microsoft.EntityFrameworkCore
- Microsoft.EntityFrameworCore.Relational
- Microsoft.EntityFrameworCore.SqlServer
- Microsoft.EntityFrameworCore.Tools
- Microsoft.EntityFrameworCore.Design
- AutoMapper
- MediatR
dotnet ef dbcontext scaffold "Data Source=.;Initial Catalog=Company;Integrated Security=SSPI" Microsoft.EntityFrameworkCore.SqlServer -o Models
public class ResponseBase { public string? Message { get; set; } public int StatusCode { get; set; } public bool IsSuccess { get; set; } = false; }
public class ResponseRecord<TEntity>: ResponseBase where TEntity : class { public TEntity? Record { get; set; } }
public class ResponseRecords<TEntity> : ResponseBase where TEntity : class { public IEnumerable<TEntity>? Records { get; set; } }
public class ProductInfoViewModel { [Display(Name ="Product Id")] [Required(ErrorMessage = "Product Id is required.")] public string ProductId { get; set; } = null!; [Display(Name = "Product Name")] [Required(ErrorMessage = "Product Name is required.")] public string ProductName { get; set; } = null!; [Display(Name = "Manufacturer")] [Required(ErrorMessage = "Manufacturer is required.")] public string Manufacturer { get; set; } = null!; [Display(Name = "Description")] [Required(ErrorMessage = "Description is required.")] public string Description { get; set; } = null!; [Display(Name = "Base Price")] [Required(ErrorMessage = "Base Price is required.")] public decimal BasePrice { get; set; } }
public class ProductInfoProfile : Profile { public ProductInfoProfile() { CreateMap<ProductInfo, ProductInfoViewModel>().ReverseMap(); } }
using System.Security.Cryptography; namespace Core_CQRS_Mediatr.CustomMIddlewares { public class ErrorResponse { public string? ErrorMessage { get; set; } public int ErrorCode { get; set; } } /// <summary> /// The Custom Middeware Logic class /// </summary> public class ExceptinHandlerMidleware { RequestDelegate _next; public ExceptinHandlerMidleware(RequestDelegate next) { _next = next; } /// <summary> /// When this Middeware is applied in HttpPipeline, teh RequestDelegate will /// Auto-Invoke this method /// THis method will contain the Logic for the MIddleware /// </summary> /// <param name="ctx"></param> /// <returns></returns> public async Task InvokeAsync(HttpContext ctx) { try { // If no Error Then Continue with the HTTP Pipeline Execution // by invoking the next middleware in the Pipeline await _next(ctx); } catch (Exception ex) { // If error occures, handle the exception by // listeng to the exception and send the error response // This will directly start the Http Response Process // Set the Response Error Code // Either Hard-Code it or therwise read it from // THe database or other configuration respures ctx.Response.StatusCode = 500; string message = ex.Message; var errorResponse = new ErrorResponse() { ErrorCode = ctx.Response.StatusCode, ErrorMessage = message }; // Generate the error response await ctx.Response.WriteAsJsonAsync(errorResponse); } } } // Create an Extension Method class that will register the ExceptinHandlerMidleware class // as custom Middlware public static class ErrorHandlerMiddlewareExtension { public static void UseErrorExtender(this IApplicationBuilder builder) { // Register the ExceptinHandlerMidleware class as custome middleware builder.UseMiddleware<ExceptinHandlerMidleware>(); } } }
public class GetProductInfoQuery : IRequest<ResponseRecords<ProductInfo>> { }
public class GetProductInfoByIdQuery: IRequest<ResponseRecord<ProductInfo>> { public string? Id { get; set; } }
public class RegisterProductInfoCommand : IRequest<ResponseRecord<ProductInfo>> { public ProductInfo ProductInfo { get; set; } public RegisterProductInfoCommand(ProductInfo product) { ProductInfo = product; } }
public class UpdateProductInfoCommand : IRequest<ResponseRecord<ProductInfo>> { public ProductInfo ProductInfo { get; set; } public UpdateProductInfoCommand(ProductInfo product) { ProductInfo = product; } }
public class DeleteProductInfoCommand : IRequest<ResponseRecord<ProductInfo>> { public string Id { get; set; } public DeleteProductInfoCommand(string id) { Id = id; } }
public interface IDataAccessService<TEntity, in TPk> where TEntity : class { Task<ResponseRecords<TEntity>> GetAsync(); Task<ResponseRecord<TEntity>> GetByIdAsync(TPk id); Task<ResponseRecord<TEntity>> CreateAsync(TEntity entity); Task<ResponseRecord<TEntity>> UpdateAsync(TPk id,TEntity entity); Task<ResponseRecord<TEntity>> DeleteAsync(TPk id); }
public class ProductInfoDataAccessService : IDataAccessService<ProductInfo, string> { CompanyContext Ctx; ResponseRecords<ProductInfo> ResponseRecords; ResponseRecord<ProductInfo> ResponseRecord; public ProductInfoDataAccessService(CompanyContext Ctx) { this.Ctx = Ctx; ResponseRecords = new ResponseRecords<ProductInfo>(); ResponseRecord = new ResponseRecord<ProductInfo>(); } async Task<ResponseRecord<ProductInfo>> IDataAccessService<ProductInfo, string>.CreateAsync(ProductInfo entity) { var result = await Ctx.ProductInfos.AddAsync(entity); await Ctx.SaveChangesAsync(); ResponseRecord.Record = result.Entity; ResponseRecord.Message = "Record is Added Successfully."; ResponseRecord.IsSuccess = true; return ResponseRecord; } async Task<ResponseRecord<ProductInfo>> IDataAccessService<ProductInfo, string>.DeleteAsync(string id) { ResponseRecord.Record = await Ctx.ProductInfos.FindAsync(id); if (ResponseRecord.Record is null) throw new Exception($"Recortd with ProductId : {id} is not found."); int recordDeleted = Ctx.ProductInfos.Where(prd => prd.ProductId == id).ExecuteDelete(); if (recordDeleted == 0) { ResponseRecord.Message = "Record Delete Failed"; ResponseRecord.IsSuccess = false; } else { ResponseRecord.Message = "Record is Deleted Successfully."; ResponseRecord.IsSuccess = true; } return ResponseRecord; } async Task<ResponseRecords<ProductInfo>> IDataAccessService<ProductInfo, string>.GetAsync() { ResponseRecords.Records = await Ctx.ProductInfos.ToListAsync(); ResponseRecords.Message = "Record is Read Successfully."; ResponseRecords.IsSuccess = true; return ResponseRecords; } async Task<ResponseRecord<ProductInfo>> IDataAccessService<ProductInfo, string>.GetByIdAsync(string id) { ResponseRecord.Record = await Ctx.ProductInfos.FindAsync(id); ResponseRecord.Message = "Record is Read Successfully."; ResponseRecord.IsSuccess = true; return ResponseRecord; } async Task<ResponseRecord<ProductInfo>> IDataAccessService<ProductInfo, string>.UpdateAsync(string id, ProductInfo entity) { ResponseRecord.Record = await Ctx.ProductInfos.FindAsync(id); if (ResponseRecord.Record is null) throw new Exception($"Record with ProductId : {id} is not found."); int recordUpdated = Ctx.ProductInfos .Where(prd => prd.ProductId == id) .ExecuteUpdate(setters => setters .SetProperty(prd => prd.ProductName, entity.ProductName) .SetProperty(prd => prd.Manufacturer, entity.Manufacturer) .SetProperty(prd => prd.Description, entity.Description) .SetProperty(prd => prd.BasePrice, entity.BasePrice) ); if (recordUpdated == 0) { ResponseRecord.Message = "Record Update Failed"; ResponseRecord.IsSuccess = false; } else { ResponseRecord.Record = await Ctx.ProductInfos.FindAsync(id); ResponseRecord.Message = "Record is Updated Successfully."; ResponseRecord.IsSuccess = true; } return ResponseRecord; } }
public class GetProductInfoListHandler : IRequestHandler<GetProductInfoQuery, ResponseRecords<ProductInfo>> { IDataAccessService<ProductInfo,string> ProductInfoService; public GetProductInfoListHandler(IDataAccessService<ProductInfo, string> productInfoService) { ProductInfoService = productInfoService; } public async Task<ResponseRecords<ProductInfo>> Handle(GetProductInfoQuery request, CancellationToken cancellationToken) { return await ProductInfoService.GetAsync(); } }
public class GetProductInfoByIdHandler : IRequestHandler<GetProductInfoByIdQuery, ResponseRecord<ProductInfo>> { IDataAccessService<ProductInfo, string> ProductInfoService; public GetProductInfoByIdHandler(IDataAccessService<ProductInfo, string> productInfoService) { ProductInfoService = productInfoService; } public async Task<ResponseRecord<ProductInfo>> Handle(GetProductInfoByIdQuery request, CancellationToken cancellationToken) { return await ProductInfoService.GetByIdAsync(request.Id); } }
public class CreateProductInfoHandler : IRequestHandler<RegisterProductInfoCommand, ResponseRecord<ProductInfo>> { IDataAccessService<ProductInfo,string> ProductInfoService; public CreateProductInfoHandler(IDataAccessService<ProductInfo, string> productInfoService) { this.ProductInfoService = productInfoService; } public async Task<ResponseRecord<ProductInfo>> Handle(RegisterProductInfoCommand request, CancellationToken cancellationToken) { return await ProductInfoService.CreateAsync(request.ProductInfo); } }
public class UpdateProductInfoHandler : IRequestHandler<UpdateProductInfoCommand, ResponseRecord<ProductInfo>> { IDataAccessService<ProductInfo,string> ProductInfoService; public UpdateProductInfoHandler(IDataAccessService<ProductInfo, string> productInfoService) { ProductInfoService = productInfoService; } public async Task<ResponseRecord<ProductInfo>> Handle(UpdateProductInfoCommand request, CancellationToken cancellationToken) { return await ProductInfoService.UpdateAsync(request.ProductInfo.ProductId, request.ProductInfo); } }
public class DeleteProductInfoHandler : IRequestHandler<DeleteProductInfoCommand, ResponseRecord<ProductInfo>> { IDataAccessService<ProductInfo, string> ProductInfoService; public DeleteProductInfoHandler(IDataAccessService<ProductInfo, string> productInfoService) { ProductInfoService = productInfoService; } public async Task<ResponseRecord<ProductInfo>> Handle(DeleteProductInfoCommand request, CancellationToken cancellationToken) { return await ProductInfoService.DeleteAsync(request.Id); } }
"ConnectionStrings": { "AppConnStr": "Data Source=.;Initial Catalog=Company;Integrated Security=SSPI;TrustServerCertificate=True" }
// Add services to the container. builder.Services.AddAutoMapper(typeof(Program)); builder.Services.AddMediatR(Assembly.GetExecutingAssembly()); builder.Services.AddDbContext<CompanyContext>( options=> options.UseSqlServer(builder.Configuration.GetConnectionString("AppConnStr"))); builder.Services.AddScoped<IDataAccessService<ProductInfo,string>, ProductInfoDataAccessService>();
..... app.UseErrorExtender(); .....
using Core_CQRS_Mediatr.Commands; using Core_CQRS_Mediatr.Models; using Core_CQRS_Mediatr.Queries; using MediatR; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc; namespace Core_CQRS_Mediatr.Controllers { [Route("api/[controller]")] [ApiController] public class ReadProductInfoController : ControllerBase { IMediator mediator; public ReadProductInfoController(IMediator mediator) { this.mediator = mediator; } [HttpGet] public async Task<IActionResult> Get() { var response = await mediator.Send(new GetProductInfoQuery()); if (!response.IsSuccess) throw new Exception($"Error occurred while reading data"); response.StatusCode = 200; return Ok(response); } [HttpGet("{id}")] public async Task<IActionResult> Get(string id) { var response = await mediator.Send(new GetProductInfoByIdQuery() { Id = id}); if (!response.IsSuccess) throw new Exception($"Error occurred while reading data base don Id= {id}"); response.StatusCode = 200; return Ok(response); } } }
using AutoMapper; using Core_CQRS_Mediatr.Commands; using Core_CQRS_Mediatr.Models; using Core_CQRS_Mediatr.ViewModels; using MediatR; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc.ModelBinding; namespace Core_CQRS_Mediatr.Controllers { [Route("api/[controller]")] [ApiController] public class WriteProductInfoController : ControllerBase { IMediator Mediator; IMapper Mapper; public WriteProductInfoController(IMapper mapper, IMediator mediator) { Mediator = mediator; Mapper = mapper; } [HttpPost] public async Task<IActionResult> Post(ProductInfoViewModel product) { if (!ModelState.IsValid) throw new Exception(GetModelErrorMessagesHelper(ModelState)); ProductInfo prd = Mapper.Map<ProductInfo>(product); var response = await Mediator.Send(new RegisterProductInfoCommand(prd)); if(!response.IsSuccess) throw new Exception($"The create request faild"); response.StatusCode = 200; return Ok(response); } [HttpPut("{id}")] public async Task<IActionResult> Put(string id, ProductInfoViewModel product) { if (String.IsNullOrEmpty(id)) throw new Exception($"The Id={id} value is invalid"); if (!ModelState.IsValid) throw new Exception(GetModelErrorMessagesHelper(ModelState)); if (!id.Equals(product.ProductId)) throw new Exception($"The update request cannot be processed because the provided data is mismatched"); ProductInfo prd = Mapper.Map<ProductInfo>(product); var response = await Mediator.Send(new UpdateProductInfoCommand(prd)); if (!response.IsSuccess) throw new Exception($"The update request faild"); response.StatusCode = 200; return Ok(response); } [HttpDelete("{id}")] public async Task<IActionResult> Delete(string id) { if(String.IsNullOrEmpty(id)) throw new Exception($"The Id={id} value is invalid"); var response = await Mediator.Send(new DeleteProductInfoCommand(id)); if (!response.IsSuccess) throw new Exception($"The delete request faild for Id= {id}"); response.StatusCode = 200; return Ok(response); } private string GetModelErrorMessagesHelper(ModelStateDictionary errors) { string messages = ""; foreach (var item in errors) { for (int j = 0; j < item.Value.Errors.Count; j++) { messages += $"{item.Key} \t {item.Value.Errors[j].ErrorMessage} \n"; } } return messages; } } }