使用 WebSocket 通过 ASP.NET Core 构建实时应用程序

作者 : 慕源网 本文共4309个字,预计阅读时间需要11分钟 发布时间: 2021-12-6 共620人阅读

介绍

WebSocket 是一种通过单个 TCP 连接提供全双工通信通道的协议,它使浏览器和 Web 服务器之间的更多交互成为可能,从而促进从服务器到服务器的实时数据传输。

在本文中,我将向您展示一个示例,该示例使用 WebSocket 构建有关分发任务的实时应用程序。

假设我们有一个任务中心,有一些任务会发布到任务中心,当中心接收到一些任务时,它应该将任务分发给工人。

我们先来看看结果。

有四个客户端连接到 WebSocket Server,通过 RabbitMQ 管理发布消息后,消息会及时分发到不同的客户端。

注意
Clients就是worker,WebSocket Server就是任务中心,RabbitMQ管理模拟向任务中心发送任务,message就是应该处理的任务。

这是显示其工作原理的架构图。

让我们来看看如何实现它。

设置RabbitMQ

在编写一些代码之前,我们应该首先运行 RabbitMQ 服务器。最快的方法是使用 Docker。

docker run -p 5672:5672 -p 15672:15672 rabbitmq:management  

WebSocket Server

这是最重要的部分!

首先,我们应该在 Startup 类中配置 WebSocket。

public class Startup  
{  
    // other ...  
      
    public void ConfigureServices(IServiceCollection services)  
    {  
        services.AddSingleton<Handlers.IDisHandler, Handlers.DisHandler>();  
        services.AddControllers();  
    }  
  
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)  
    {  
        // other ..  
      
        var webSocketOptions = new WebSocketOptions()  
        {  
            KeepAliveInterval = TimeSpan.FromSeconds(120),  
            ReceiveBufferSize = 4 * 1024  
        };  
  
        app.UseWebSockets(webSocketOptions);  
  
        app.Use(async (context, next) =>  
        {  
            // ws://www.yourdomian.com/push  
            if (context.Request.Path == "/push")  
            {  
                if (context.WebSockets.IsWebSocketRequest)  
                {  
                    WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync();  
  
                    try  
                    {  
                        var handler = app.ApplicationServices.GetRequiredService<Handlers.IDisHandler>();  
                        await handler.PushAsync(context, webSocket);  
                    }  
                    catch (Exception ex)  
                    {  
                        Console.WriteLine(ex.Message);  
                    }                         
                }  
                else  
                {  
                    context.Response.StatusCode = 400;  
                }  
            }  
            else  
            {  
                await next();  
            }  
        });  
    }  
}  

下一步是处理有关如何向客户端推送消息的逻辑。这里创建一个名为DisHandler的类 来完成它。

在构造函数上创建RabbitMQ连接、通道和消费者,以及消费者的Received事件的重要部分。

我们来看一下消费者的Received事件。

consumer.Received += async (ch, ea) =>  
{  
    var content = Encoding.UTF8.GetString(ea.Body);  
    Console.WriteLine($"received content = {content}");  
  
    // {"TaskName":"Demo", "TaskType":1}  
    var msg = Newtonsoft.Json.JsonConvert.DeserializeObject<MqMsg>(content);  
  
    var workIds = Worker.GetByTaskType(msg.TaskType);  
    var onlineWorkerIds = _sockets.Keys.Intersect(workIds).ToList();                  
    if (onlineWorkerIds == null || !onlineWorkerIds.Any())  
    {  
        if (!ea.Redelivered)  
        {  
            Console.WriteLine("No online worker, reject the message and requeue");  
            // should requeue here  
            _channel.BasicReject(ea.DeliveryTag, true);  
        }  
        else  
        {       
            // should not requeue here, but this message will be discarded  
            _channel.BasicReject(ea.DeliveryTag, false);  
        }  
    }  
    else  
    {  
        // free or busy  
        var randomNumberBuffer = new byte[10];  
        new RNGCryptoServiceProvider().GetBytes(randomNumberBuffer);  
        var rd = new Random(BitConverter.ToInt32(randomNumberBuffer, 0));                                          
        var index = rd.Next(0, 9999) % onlineWorkerIds.Count;  
        var workerId = onlineWorkerIds[index];  
  
        if (_sockets.TryGetValue(workerId, out var ws) && ws.State == WebSocketState.Open)  
        {  
            // simulating handle the message an get the result.  
            // put your own logic here  
            var val = msg.TaskName;  
            if (msg.TaskType != 1) val = $"Special-{msg.TaskName}";  
  
            var task = Encoding.UTF8.GetBytes(val);  
  
            Console.WriteLine($"send to {workerId}-{val}");  
  
            // should ack here? or when to ack is better?  
            _channel.BasicAck(ea.DeliveryTag, false);  
  
            // sending message to specify client  
            await ws.SendAsync(  
            new ArraySegment<byte>(task, 0, task.Length),   
            WebSocketMessageType.Text,   
            true,   
            CancellationToken.None);  
        }  
        else  
        {  
            Console.WriteLine("Not found a worker");  
        }  
    }  
}; 

当我们收到消息时,我们应该选择一个可以处理这个任务的worker。

如果没有在线工作人员,则服务器应拒绝该消息并使其重新排队一次。

如果有在线worker,服务器会选择一个可以处理这个任务的worker,这里用一个随机数来模拟这个场景。

我们还应该确保这个worker的连接仍然打开,以便服务器可以向它发送消息。

处理WebSocket请求的入口是PushAsync 方法,它只是维护客户端的连接,当客户端把它的客户端id发送给服务器时,它会记录下来,当客户端与服务器断开连接时,它会删除客户端。

public async Task PushAsync(HttpContext context, WebSocket webSocket)  
{  
    var buffer = new byte[1024 * 4];  
    WebSocketReceiveResult result =   
        await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);  
    string clientId = Encoding.UTF8.GetString(buffer, 0, result.Count);  
  
    // record the client id and it's websocket instance  
    if (_sockets.TryGetValue(clientId, out var wsi))  
    {                  
        if (wsi.State == WebSocketState.Open)  
        {  
            Console.WriteLine($"abort the before clientId named {clientId}");  
            await wsi.CloseAsync(WebSocketCloseStatus.InternalServerError,   
                "A new client with same id was connected!",   
                CancellationToken.None);   
        }  
  
        _sockets.AddOrUpdate(clientId, webSocket, (x, y) => webSocket);  
    }  
    else  
    {  
        Console.WriteLine($"add or update {clientId}");  
        _sockets.AddOrUpdate(clientId, webSocket, (x, y) => webSocket);  
    }  
  
    while (!result.CloseStatus.HasValue)  
    {  
        result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);  
    }              
  
    await webSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);  
    Console.WriteLine("close=" + clientId);  
  
    _sockets.TryRemove(clientId, out _);  
} 

客户

客户端代码仅使用ASP.NET Core WebSocket文档中的示例

这是您可以在我的 GitHub 页面中找到的源代码。

概括

本文向您展示了一个示例,该示例使用 WebSocket 通过 ASP.NET Core 构建实时应用程序。


慕源网 » 使用 WebSocket 通过 ASP.NET Core 构建实时应用程序

常见问题FAQ

程序仅供学习研究,请勿用于非法用途,不得违反国家法律,否则后果自负,一切法律责任与本站无关。
请仔细阅读以上条款再购买,拍下即代表同意条款并遵守约定,谢谢大家支持理解!

发表评论

开通VIP 享更多特权,建议使用QQ登录