在 ASP.NET Core 中发布 RabbitMQ 消息

作者 : 慕源网 本文共4279个字,预计阅读时间需要11分钟 发布时间: 2022-03-16 共687人阅读

介绍

前几天写了一篇文章,介绍如何在 ASP.NET Core 中通过后台服务消费 RabbitMQ 消息。

在本文中,您将学习如何发布 RabbitMQ 消息。

启动 RabbitMQ 服务

强烈推荐

海量程序代码,编程资源,无论你是小白还是大神研究借鉴别人优秀的源码产品学习成熟的专业技术强势助力帮你提高技巧与技能。在此处获取,给你一个全面升级的机会。只有你更值钱,才能更赚钱

海量源码程序,学习别人的产品设计思维与技术实践

RabbitMQ 设置

在appsettings.json 中添加 RabbitMQ 的配置

{  
  "Logging": {  
    "LogLevel": {  
      "Default": "Warning"  
    }  
  },  
  "AllowedHosts": "*",  
  "rabbit": {  
    "UserName": "guest",  
    "Password": "guest",  
    "HostName": "localhost",  
    "VHost": "/",  
    "Port": 5672  
  }  
} 
在appsettings.json中创建一个映射rabbit 部分的类
public class RabbitOptions  
{  
    public string UserName { get; set; }  
  
    public string Password { get; set; }  
  
    public string HostName { get; set; }  
  
    public int Port { get; set; } = 5672;  
  
    public string VHost { get; set; } = "/";  
}  

重用 RabbitMQ 连接的通道

为什么要复用渠道?

根据官方的.NET/C# Client API Guide文档,我们可以考虑重用通道,因为这些通道是长期存在的,但是由于许多可恢复的协议错误会导致通道关闭,因此每次操作关闭和打开新通道通常是不必要的。

在这里,我们将使用对象池来完成这项工作!微软提供了一个名为Microsoft.Extensions.ObjectPool的包 可以帮助我们简化一些工作。

在使用对象池之前,首先要声明通道的策略。在这里,我们创建了一个名为RabbitModelPooledObjectPolicy的类 ,它实现 了 IPooledObjectPolicy<IModel>

using Microsoft.Extensions.ObjectPool;  
using Microsoft.Extensions.Options;  
using RabbitMQ.Client;  
  
public class RabbitModelPooledObjectPolicy : IPooledObjectPolicy<IModel>  
{  
    private readonly RabbitOptions _options;  
  
    private readonly IConnection _connection;  
  
    public RabbitModelPooledObjectPolicy(IOptions<RabbitOptions> optionsAccs)  
    {  
        _options = optionsAccs.Value;  
        _connection = GetConnection();  
    }  
  
    private IConnection GetConnection()  
    {  
        var factory = new ConnectionFactory()  
        {  
            HostName = _options.HostName,  
            UserName = _options.UserName,  
            Password = _options.Password,  
            Port = _options.Port,  
            VirtualHost = _options.VHost,  
        };  
  
        return factory.CreateConnection();  
    }  
  
    public IModel Create()  
    {  
        return _connection.CreateModel();  
    }  
  
    public bool Return(IModel obj)  
    {  
        if (obj.IsOpen)  
        {  
            return true;  
        }  
        else  
        {  
            obj?.Dispose();  
            return false;  
        }  
    }  
}  

其中有两个重要的方法,一个是Create,一个是Return。

Create 方法告诉池如何创建通道对象。

Return 方法告诉池,如果通道对象仍然处于可以使用的状态,我们应该将其返回到池中;否则,我们下次不应该使用它。

RabbitMQ 管理器

我们创建一个管理接口来处理 Publish 方法。

public interface IRabbitManager  
{  
    void Publish<T>(T message, string exchangeName, string exchangeType, string routeKey)   
        where T : class;  
}  

下面的代码演示了它的一个实现类。

public class RabbitManager : IRabbitManager  
{  
    private readonly DefaultObjectPool<IModel> _objectPool;  
  
    public RabbitManager(IPooledObjectPolicy<IModel> objectPolicy)  
    {  
        _objectPool = new DefaultObjectPool<IModel>(objectPolicy, Environment.ProcessorCount * 2);  
    }  
  
    public void Publish<T>(T message, string exchangeName, string exchangeType, string routeKey)   
        where T : class  
    {  
        if (message == null)  
            return;  
  
        var channel = _objectPool.Get();  
  
        try  
        {  
            channel.ExchangeDeclare(exchangeName, exchangeType, true, false, null);  
  
            var sendBytes = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));  
  
            var properties = channel.CreateBasicProperties();  
            properties.Persistent = true;  
  
            channel.BasicPublish(exchangeName, routeKey, properties, sendBytes);  
        }  
        catch (Exception ex)  
        {  
            throw ex;  
        }  
        finally  
        {  
            _objectPool.Return(channel);                  
        }  
    }  
} 

我们在构造函数中创建一个对象池。在向 RabbitMQ 发布消息之前,我们应该从对象池中获取一个通道,然后构造有效负载。

发布后,无论发布成功与否,我们都应该将此通道对象返回到对象池中。

RabbitMQ 扩展

创建一个扩展方法来简化注册。

public static class RabbitServiceCollectionExtensions  
{  
    public static IServiceCollection AddRabbit(this IServiceCollection services, IConfiguration configuration)  
    {  
        var rabbitConfig = configuration.GetSection("rabbit");  
        services.Configure<RabbitOptions>(rabbitConfig);  
  
        services.AddSingleton<ObjectPoolProvider, DefaultObjectPoolProvider>();  
        services.AddSingleton<IPooledObjectPolicy<IModel>, RabbitModelPooledObjectPolicy>();  
  
        services.AddSingleton<IRabbitManager, RabbitManager>();  
  
        return services;  
    }  
}  

打开Startup 类

public void ConfigureServices(IServiceCollection services)  
{  
    services.AddRabbit(Configuration);  
  
    services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);  
}

RabbitMQ 管理器的使用

我们在ValuesController中添加一些代码 。

[Route("api/[controller]")]  
[ApiController]  
public class ValuesController : ControllerBase  
{  
    private IRabbitManager _manager;  
  
    public ValuesController(IRabbitManager manager)  
    {  
        _manager = manager;  
    }  
  
    // GET api/values  
    [HttpGet]  
    public ActionResult<IEnumerable<string>> Get()  
    {  
        // other opreation  
  
        // if above operation succeed, publish a message to RabbitMQ  
  
        var num = new System.Random().Next(9000);  
  
        // publish message  
        _manager.Publish(new  
        {  
            field1 = $"Hello-{num}",  
            field2 = $"rabbit-{num}"                  
        }, "demo.exchange.topic.dotnetcore", "topic", "*.queue.durable.dotnetcore.#");  
  
        return new string[] { "value1", "value2" };  
    }  
} 

在这里,我们将创建一个名为demo.exchange.topic.dotnetcore的主题类型交换,它还会将消息发送到绑定名为*.queue.durable.dotnetcore.#的路由键的队列。

注意
队列中的消息只会被一个消费者消费。

结果

为了演示,我们创建一个队列并将其绑定到路由键,而不是创建消费者。

发布消息后,我们可以查看消息是否准备好。

我们可以使用 GetMessage 按钮来检查消息。

概括

本文向您展示了如何在 ASP.NET Core 中发布 RabbitMQ 消息。我希望这能帮到您!


慕源网 » 在 ASP.NET Core 中发布 RabbitMQ 消息

常见问题FAQ

程序仅供学习研究,请勿用于非法用途,不得违反国家法律,否则后果自负,一切法律责任与本站无关。
请仔细阅读以上条款再购买,拍下即代表同意条款并遵守约定,谢谢大家支持理解!

发表评论

开通VIP 享更多特权,建议使用QQ登录