RabbitMq RPC 异步通信

作者 : 慕源网 本文共8762个字,预计阅读时间需要22分钟 发布时间: 2021-12-6 共307人阅读

如果您已经成为开发人员至少几年,那么您很有可能听说过消息代理。消息代理是使服务/系统能够相互通信并可靠地交换信息的软件应用程序。RabbitMq可能是全球使用最广泛的消息代理系统之一。

在本文中,我们将使用 RabbitMq 来实现一个RPC远程过程调用)调用。系统的架构将围绕一个请求队列和一个响应队列(每个客户端都有自己的客户端队列)。我们将使用CorrelationId来区分客户端发出的各个调用。这个想法如下图所示。

RabbitMq RPC 异步通信

为了进一步说明开发客户端和服务器的想法,其每个功能可以总结如下。

  • 客户端
    显示从服务器接收到的用户名列表(只是一些字符串),应该能够向服务器发送一个数字,这将指示要返回的用户名数量。
  • 服务器
    侦听来自不同客户端的可能请求。根据客户端在请求消息中指示的计数生成并返回表示用户名的字符串集合。

服务器

我们将从创建服务器开始。服务器将在请求队列中接收一条消息。该消息还将包含有助于服务器了解要使用的响应队列的元信息。每个请求将包含

  • 实际消息
  • 响应队列名称
  • 相关 ID(唯一标识每个请求)

让我们开始定义和初始化我们的队列和侦听器。

// Rpc Client on Server
internal class RpcClient: IDisposable {
    private
    const string QueueName = "UserRpcQueue";
    private IConnection _connection;
    private IModel _channel;
    private bool _isDisposed;
    public void InitializeAndRun() {
        var factory = new ConnectionFactory {
            HostName = "localhost",
        };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        _channel.QueueDeclare(queue: QueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        _channel.BasicQos(0, 1, false);
        var consumer = new EventingBasicConsumer(_channel);
        _channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
        consumer.Received += Consumer_Received; // we will come to this in a bit
    }
}

在上面的代码中,我们初始化了一个名为“ UserRpcQueue ”的队列。请注意,我们已将Auto Acknowledgment设置为 false。这很重要,因为我们只需要在执行方法并将响应发回后才需要发送确认。

现在让我们扩展Consumer_Recieved方法并决定我们需要对每个传入请求做什么以及我们如何响应它。请记住,我们需要发送对传入请求的确认。

private void Consumer_Received(object ? sender, BasicDeliverEventArgs ea) {
    LogMessage?.Invoke("Recieved Request from client..");
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    var replyProps = _channel.CreateBasicProperties();
    replyProps.CorrelationId = ea.BasicProperties.CorrelationId;
    replyProps.ReplyTo = ea.BasicProperties.ReplyTo;
    var value = int.Parse(message);
    LogMessage?.Invoke($ "Preparing to generate {value} User Names");
    var userNames = GenerateUserNames(value); // the Worker method, which needs to be executed by the Server.
    foreach(var user in userNames) {
        LogMessage?.Invoke(user);
    }
    var response = JsonSerializer.Serialize < IEnumerable < string >> (userNames);
    var responseBody = Encoding.UTF8.GetBytes(response);
    _channel.BasicPublish(exchange: "", routingKey: replyProps.ReplyTo, basicProperties: replyProps, body: responseBody);
    _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}

服务器所做的实际上非常简单(是的,真正的技巧在于客户端)。它从特定队列中的一个客户端接收请求,在这种情况下,名为“ UserRpcQueue ”。然后它解包消息以检索用于响应的队列和客户端将使用的关联 ID,以唯一标识每个请求(和后续响应)。

一旦获得此信息,它现在将处理实际消息并执行所需的操作。在这种情况下,它是生成几个字符串(用户名),其中要生成的字符串数量是客户端发送的消息。

然后通过客户端提供的回复队列名称将操作的响应发送回客户端。它还记得将它在请求中收到的CorrelationId作为响应的一部分打包。服务器将使用关联 id 来识别单个请求(我们稍后会介绍)。此时,服务器也准备好确认客户端接收和处理请求。

服务器上RpcClient的完整源代码如下。

// Rpc Client on Server
internal class RpcClient: IDisposable {
    private
    const string QueueName = "UserRpcQueue";
    private IConnection _connection;
    private IModel _channel;
    private bool _isDisposed;
    public void InitializeAndRun() {
        var factory = new ConnectionFactory {
            HostName = "localhost",
        };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        _channel.QueueDeclare(queue: QueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        _channel.BasicQos(0, 1, false);
        var consumer = new EventingBasicConsumer(_channel);
        _channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
        consumer.Received += Consumer_Received;
    }
    private void Consumer_Received(object ? sender, BasicDeliverEventArgs ea) {
        LogMessage?.Invoke("Recieved Request from client..");
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        var replyProps = _channel.CreateBasicProperties();
        replyProps.CorrelationId = ea.BasicProperties.CorrelationId;
        replyProps.ReplyTo = ea.BasicProperties.ReplyTo;
        var value = int.Parse(message);
        LogMessage?.Invoke($ "Preparing to generate {value} User Names");
        var userNames = GenerateUserNames(value);
        foreach(var user in userNames) {
            LogMessage?.Invoke(user);
        }
        var response = JsonSerializer.Serialize < IEnumerable < string >> (userNames);
        var responseBody = Encoding.UTF8.GetBytes(response);
        _channel.BasicPublish(exchange: "", routingKey: replyProps.ReplyTo, basicProperties: replyProps, body: responseBody);
        _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    }
    public IEnumerable < string > GenerateUserNames(int count) {
        return Enumerable.Range(1, count).Select(x => $ "UserName {x}");
    }
    public delegate void LogMessageHandler(string message);
    public event LogMessageHandler LogMessage;
    private void Dispose(bool disposing) {
        if (_isDisposed) return;
        if (disposing) {
            _channel.Close();
        }
        _isDisposed = true;
    }
    public void Dispose() {
        Dispose(true);
        GC.SuppressFinalize(this);
    }~RpcClient() {
        Dispose(false);
    }
}

客户端

此时,我们已准备好设置我们的客户端。客户端应该能够发送一个请求消息,该消息是一个数字,指示服务器需要发回的用户名数量。正如在我们的服务器代码中所注意到的,它还会发送一些额外的信息。我们现在将看到为什么客户端发送这些信息以及它如何需要它们来处理来自服务器的响应。

第一步当然是初始化我们需要的队列。请求将在名为“ UserRpcQueue ”的公共队列上发送。但是,响应队列专用于每个客户端。每个客户端维护自己唯一命名的响应队列。

// Rpc Client on Client
internal class RpcClient: IDisposable {
    private IConnection _connection;
    private IModel _channel;
    private string _responseQueueName;
    private string QueueName = "UserRpcQueue";
    private bool _isDisposed;
    public void Initialiaze() {
        var factory = new ConnectionFactory {
            HostName = "localhost",
        };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        _responseQueueName = _channel.QueueDeclare().QueueName;
        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += Consumer_Received;
        _channel.BasicConsume(queue: _responseQueueName, consumer: consumer, autoAck: true);
    }
}

正如您在上面的代码中看到的,我们正在设置连接以侦听名为“UserRpcQueue”的队列。我们还创建了一个唯一的队列名称_responseQueueName,用于从服务器接收针对此特定客户端实例的响应。在我们为Recieved处理程序(在本例中为Consumer_Recieved)添加代码之前,让我们添加发送消息的方法。

private ConcurrentDictionary < string, TaskCompletionSource < string >> _activeTaskQueue = new ConcurrentDictionary < string, TaskCompletionSource < string >> ();
public Task < string > SendAsync(string message) {
    var basicProperties = _channel.CreateBasicProperties();
    basicProperties.ReplyTo = _responseQueueName;
    var messageId = Guid.NewGuid().ToString();
    basicProperties.CorrelationId = messageId;
    var taskCompletionSource = new TaskCompletionSource < string > ();
    var messageToSend = Encoding.UTF8.GetBytes(message);
    _channel.BasicPublish(exchange: "", routingKey: "UserRpcQueue", basicProperties: basicProperties, body: messageToSend);
    _activeTaskQueue.TryAdd(messageId, taskCompletionSource);
    return taskCompletionSource.Task;
}

SendAsync  方法将被用来发送消息到服务器。然而,它的作用远不止于此。第一的。它将一些关于客户端的元信息添加到消息包中(虽然不是实际的消息)。这包括唯一的响应队列名称和一个额外的 Guid,称为CorrelationId

一旦我们将请求发布到请求队列,我们​​将向字典添加一个条目。这个字典activeTaskQueue跟踪客户端发送给客户端的所有活动请求。我们打算确保客户端能够向服务器发送多个请求,而不必等待来自客户端的每个响应。但是我们希望将每个请求与其独特的响应相关联。这是相关性 id 派上用场的地方。我们创建并作为 CorrelationId 传递的唯一 Guid 存储在字典中。一旦我们收到响应,我们将使用关联的TaskCompletionSource实例来设置结果并完成任务。

我们该怎么做呢?让我们添加之前跳过的Consumer_Recieved处理程序。

private void Consumer_Received(object ? sender, BasicDeliverEventArgs args) {
    var body = args.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    if (_activeTaskQueue.TryRemove(args.BasicProperties.CorrelationId, out
            var taskCompletionSource)) {
        taskCompletionSource.SetResult(message);
    }
}

收到响应后,我们将从活动任务字典中删除关联的条目,并使用TaskCompletionSource的实例设置结果。

让我们看看 RpcClient for Client 的完整代码。

internal class RpcClient: IDisposable {
    private IConnection _connection;
    private IModel _channel;
    private string _responseQueueName;
    private string QueueName = "UserRpcQueue";
    private bool _isDisposed;
    private ConcurrentDictionary < string, TaskCompletionSource < string >> _activeTaskQueue = new ConcurrentDictionary < string, TaskCompletionSource < string >> ();
    public void Initialiaze() {
        var factory = new ConnectionFactory {
            HostName = "localhost",
        };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        _responseQueueName = _channel.QueueDeclare().QueueName;
        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += Consumer_Received;
        _channel.BasicConsume(queue: _responseQueueName, consumer: consumer, autoAck: true);
    }
    private void Consumer_Received(object ? sender, BasicDeliverEventArgs args) {
        var body = args.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        if (_activeTaskQueue.TryRemove(args.BasicProperties.CorrelationId, out
                var taskCompletionSource)) {
            taskCompletionSource.SetResult(message);
        }
    }
    public Task < string > SendAsync(string message) {
        var basicProperties = _channel.CreateBasicProperties();
        basicProperties.ReplyTo = _responseQueueName;
        var messageId = Guid.NewGuid().ToString();
        basicProperties.CorrelationId = messageId;
        var taskCompletionSource = new TaskCompletionSource < string > ();
        var messageToSend = Encoding.UTF8.GetBytes(message);
        _channel.BasicPublish(exchange: "", routingKey: "UserRpcQueue", basicProperties: basicProperties, body: messageToSend);
        _activeTaskQueue.TryAdd(messageId, taskCompletionSource);
        return taskCompletionSource.Task;
    }
    private void Dispose(bool disposing) {
        if (_isDisposed) return;
        if (disposing) {
            _channel.Close();
        }
        _isDisposed = true;
    }
    public void Dispose() {
        Dispose(true);
        GC.SuppressFinalize(this);
    }~RpcClient() {
        Dispose(false);
    }
}

您现在可以使用 RpcClient 如下。

public async Task ExecuteFetchCommand()
{
    var response = await _rpcClient.SendAsync(Count.ToString());
    LogMessages.Add($"Generating {Count} UserNames");
    await foreach(var userName in DeserializeStreaming<string>(response))
    {
        LogMessages.Add(userName);
    }
}

所以这很容易实现,对吗?这就是 RabbitMq 的全部优势——它非常简单易用,你仍然可以用它做很多事情。请注意,尽管我们使用了客户端-服务器架构来说明我们的示例,但在现实生活中,它并不需要如此。它可以是任何两个有兴趣以 RPC 方式相互查询的服务。

如果您有兴趣查看本文中讨论的完整源代码,其中包含相同内容。请注意,示例应用程序是作为 WPF 应用程序构建的。

 


慕源网 » RabbitMq RPC 异步通信

常见问题FAQ

程序仅供学习研究,请勿用于非法用途,不得违反国家法律,否则后果自负,一切法律责任与本站无关。
请仔细阅读以上条款再购买,拍下即代表同意条款并遵守约定,谢谢大家支持理解!

发表评论

开通VIP 享更多特权,建议使用QQ登录