ASP .NET Core 创建 WebSockets 中间件

作者 : 慕源网 本文共11795个字,预计阅读时间需要30分钟 发布时间: 2022-01-24 共231人阅读

本文探讨了 .NET Core 3 中的 websockets API,并为 ASP.NET Core 3 构建了一个 websockets 中间件。如果您正在考虑为您的 .NET 应用程序添加实时功能,您应该首先探索 SignalR 。

这是一个教学项目,目的是更好地理解如何在比 SignalR 更低的级别处理 WebSocket。

在传统的 Web 范例中,客户端负责启动与服务器的通信,除非客户端先前已请求,否则服务器无法将数据发回。使用 WebSockets,您可以通过单个 TCP 连接在服务器和客户端之间发送数据,通常 WebSockets 用于为现代应用程序提供实时功能。

ASP .NET Core 创建 WebSockets 中间件

您可以在 GitHub 上找到包含本文源代码的存储库 。

您应该首先熟悉 .NET Core 中的 WebSockets –本文提供了很好的介绍 。

ASP .NET Core Middleware

首先,了解什么是中间件以及新的请求管道如何在 ASP .NET Core 中工作,官方 ASP .NET Core 文档中有一篇很棒的文章 。

中间件是组装到应用程序管道中以处理请求和响应的软件组件。每个组件选择是否将请求传递给管道中的下一个组件,并且可以在管道中调用下一个组件之前和之后执行某些操作。请求委托用于构建请求管道。请求委托处理每个 HTTP 请求。

ASP .NET Core 创建 WebSockets 中间件

新管道由一系列RequestDelegate对象组成,一个接一个地被调用,每个组件都可以在下一个委托之前和之后执行操作,或者可以使管道短路,自己处理请求而不进一步传递上下文。

在编写中间件本身之前,我们需要一些处理连接和处理消息的类。

编写 WebSocket 连接管理器

使用 WebSocket 包时,我们首先注意到的是一切都是低级的:我们处理单个连接、缓冲区和取消令牌。没有内置的存储socket的方法,也没有以任何方式识别它们。所以我们正在构建一个类,它将所有活动的socket保存在一个线程安全的集合中,并为每个socket分配一个唯一的 ID,同时还维护集合(获取、添加和删除套接字)。

public class ConnectionManager
{
    private ConcurrentDictionary<string, WebSocket> _sockets = new ConcurrentDictionary<string, WebSocket>();

    public WebSocket GetSocketById(string id)
    {
        return _sockets.FirstOrDefault(p => p.Key == id).Value;
    }

    public ConcurrentDictionary<string, WebSocket> GetAll()
    {
        return _sockets;
    }

    public string GetId(WebSocket socket)
    {
        return _sockets.FirstOrDefault(p => p.Value == socket).Key;
    }
    public void AddSocket(WebSocket socket)
    {
        _sockets.TryAdd(CreateConnectionId(), socket);
    }

    public async Task RemoveSocket(string id)
    {
        WebSocket socket;
        _sockets.TryRemove(id, out socket);

        await socket.CloseAsync(closeStatus: WebSocketCloseStatus.NormalClosure,
                                statusDescription: "Closed by the ConnectionManager",
                                cancellationToken: CancellationToken.None);
    }

    private string CreateConnectionId()
    {
        return Guid.NewGuid().ToString();
    }
}

注意ConcurrentDictionary集合的使用- 这是因为我们将尝试同时使用来自多个用户请求的集合。

编写 WebSocket 处理程序

现在我们有了跟踪已连接客户端的方法,我们需要一个类来处理连接和断开事件并管理从socket发送和接收消息。让我们看看像这样的类可能是什么样子:

public abstract class WebSocketHandler
{
    protected ConnectionManager WebSocketConnectionManager { get; set; }

    public WebSocketHandler(ConnectionManager webSocketConnectionManager)
    {
        WebSocketConnectionManager = webSocketConnectionManager;
    }

    public virtual async Task OnConnected(WebSocket socket)
    {
        WebSocketConnectionManager.AddSocket(socket);
    }

    public virtual async Task OnDisconnected(WebSocket socket)
    {
        await WebSocketConnectionManager.RemoveSocket(WebSocketConnectionManager.GetId(socket));
    }

    public async Task SendMessageAsync(WebSocket socket, string message)
    {
        if(socket.State != WebSocketState.Open)
            return;

        await socket.SendAsync(buffer: new ArraySegment<byte>(array: Encoding.ASCII.GetBytes(message),
                                                                offset: 0,
                                                                count: message.Length),
                                messageType: WebSocketMessageType.Text,
                                endOfMessage: true,
                                cancellationToken: CancellationToken.None);
    }

    public async Task SendMessageAsync(string socketId, string message)
    {
        await SendMessageAsync(WebSocketConnectionManager.GetSocketById(socketId), message);
    }

    public async Task SendMessageToAllAsync(string message)
    {
        foreach(var pair in WebSocketConnectionManager.GetAll())
        {
            if(pair.Value.State == WebSocketState.Open)
                await SendMessageAsync(pair.Value, message);
        }
    }

    public abstract Task ReceiveAsync(WebSocket socket, WebSocketReceiveResult result, byte[] buffer);
}

首先要注意的是类是abstract. 这意味着您需要继承它并为该ReceiveAsync方法提供实际实现,并且您可以覆盖标记为的方法virtual并为每当新socket连接或现有socket发送消息时执行的OnConnectedOnDisconnected方法提供自定义逻辑Close

还有SendMessageAsync向特定的socketIdSendMessageToAllAsync发送消息,向所有连接的客户端发送消息。

中间件本身

到目前为止,我们构建的类有助于维护已连接socket的记录并处理与这些socket之间的消息发送和接收。现在是时候构建实际的中间件了:

与任何中间件一样,它需要接收管道中的下一个组件,同时在调用下一个组件之前和之后RequestDelegate执行操作,并且它需要一个方法。它不必继承或实现任何东西,只需拥有方法即可。HttpContextasync Task InvokeInvoke

public class WebSocketManagerMiddleware
{
    private readonly RequestDelegate _next;
    private WebSocketHandler _webSocketHandler { get; set; }

    public WebSocketManagerMiddleware(RequestDelegate next,
                                        WebSocketHandler webSocketHandler)
    {
        _next = next;
        _webSocketHandler = webSocketHandler;
    }

    public async Task Invoke(HttpContext context)
    {
        if(!context.WebSockets.IsWebSocketRequest)
            return;

        var socket = await context.WebSockets.AcceptWebSocketAsync();
        await _webSocketHandler.OnConnected(socket);

        await Receive(socket, async(result, buffer) =>
        {
            if(result.MessageType == WebSocketMessageType.Text)
            {
                await _webSocketHandler.ReceiveAsync(socket, result, buffer);
                return;
            }

            else if(result.MessageType == WebSocketMessageType.Close)
            {
                await _webSocketHandler.OnDisconnected(socket);
                return;
            }

        });
    }

    private async Task Receive(WebSocket socket, Action<WebSocketReceiveResult, byte[]> handleMessage)
    {
        var buffer = new byte[1024 * 4];

        while(socket.State == WebSocketState.Open)
        {
            var result = await socket.ReceiveAsync(buffer: new ArraySegment<byte>(buffer),
                                                    cancellationToken: CancellationToken.None);

            handleMessage(result, buffer);
        }
    }
}

中间件被传递一个WebSocketHandler和的实现RequestDelegate。如果请求不是 WebSocket 请求,则将中间件短路并返回。如果它是一个 WebSockets 请求,那么它接受连接并将socketOnConnectedWebSocketHandler.

然后,当socket处于Open状态时,它等待新数据。当它接收到新数据时,它决定是将上下文传递给 的ReceiveAsync方法WebSocketHandler(注意为什么需要传递抽象WebSocketHandler类的实际实现)或OnDisconnected方法(如果消息类型是Close)。

添加中间件的扩展方法

很可能在现代应用程序中,您只想将通知和消息发送到连接到应用程序特定部分的客户端(想想 SignalR 集线器)。

使用此中间件,您可以将应用程序的不同路径映射到 的特定实现WebSocketHandler,因此您将获得完全隔离的环境(以及 的不同实例WebSocketConnectionManager,但稍后会详细介绍)。

所以映射中间件是使用以下扩展方法完成的IApplicationBuilder

public static IApplicationBuilder MapWebSocketManager(this IApplicationBuilder app,
                                                        PathString path,
                                                        WebSocketHandler handler)
{
    return app.Map(path, (_app) => _app.UseMiddleware<WebSocketManagerMiddleware>(handler));
}

它接收一个路径,并使用与作为扩展方法参数提供WebSocketManagerMiddleware的特定实现传递的路径映射该路径。WebSocketHandlerMapWebSocketManager

您还需要添加一些服务才能使用它们,这是在另一个扩展方法中完成的IServiceCollection

public static IServiceCollection AddWebSocketManager(this IServiceCollection services)
{
    services.AddTransient<ConnectionManager>();

    foreach(var type in Assembly.GetEntryAssembly().ExportedTypes)
    {
        if(type.GetTypeInfo().BaseType == typeof(WebSocketHandler))
        {
            services.AddSingleton(type);
        }
    }

    return services;
}

除了添加WebSocketConnectionManager服务之外,它还在执行程序集中搜索继承的类型,WebSocketHandler并使用反射将它们注册为单例(以便每个请求都获得相同的消息处理程序实例)。

现在我们将看看如何在 ASP .NET Core 应用程序中使用它,就像 SignalR 的情况一样,有两种使用场景:

  • 客户端实际发送 WebSockets 请求的地方
  • 一种客户端做一些 MVC 的东西和其他客户端得到通知的地方

场景 1 – 客户端通过 WebSockets 发送消息

使用我们刚刚创建的中间件,我们将构建一个聊天应用程序(我知道,每个人在玩 WebSockets 时都会展示聊天应用程序)。为了使用新创建的中间件,我们需要MapWebSocketManager在新项目的Startup类中调用扩展方法。

public class Startup
{
    // This method gets called by the runtime. Use this method to add services to the container.
    // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddWebSocketManager();
    }

    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        var serviceScopeFactory = app.ApplicationServices.GetRequiredService<IServiceScopeFactory>();
        var serviceProvider = serviceScopeFactory.CreateScope().ServiceProvider;

        app.UseWebSockets();
        app.MapWebSocketManager("/ws", serviceProvider.GetService<ChatMessageHandler>());
        app.UseStaticFiles();
    }
}

首先我们需要UseWebSockets(),然后映射你想要由中间件处理的路径。

我们说过我们需要向MapWebSocketManager方法传递WebSocketHandler. 请记住,在扩展方法中,我们将所有继承的类型都注册WebSocketHandler为框架中的单例。所以此时我们可以使用IServiceProvider给我们这些类的实例。

让我们来看看实际实现的WebSocketHandlerChatMessageHandler

public class ChatMessageHandler : WebSocketHandler
{
    public ChatMessageHandler(ConnectionManager webSocketConnectionManager) : base(webSocketConnectionManager)
    {
    }

    public override async Task OnConnected(WebSocket socket)
    {
        await base.OnConnected(socket);

        var socketId = WebSocketConnectionManager.GetId(socket);
        await SendMessageToAllAsync($"{socketId} is now connected");
    }

    public override async Task ReceiveAsync(WebSocket socket, WebSocketReceiveResult result, byte[] buffer)
    {
        var socketId = WebSocketConnectionManager.GetId(socket);
        var message = $"{socketId} said: {Encoding.UTF8.GetString(buffer, 0, result.Count)}";

        await SendMessageToAllAsync(message);
    }
}

它重写了OnConnected向所有人发送消息的方法,广播新客户端已连接,并实现了该ReceiveAsync方法(通过仅向所有客户端广播该方法)。

此时,您可以创建一个新的消息处理程序来将一个页面上连接的客户端与另一个页面分开,因为会有单独的连接集合。

现在让我们添加一个通过 WebSockets 连接的 Web 客户端:

<input type=text id="textInput" placeholder="Enter your text"/>
<button id="sendButton">Send</button>

<ul id="messages"></ul>

    <script language="javascript" type="text/javascript">
    var uri = "ws://" + window.location.host + "/ws";
    function connect() {
        socket = new WebSocket(uri);
        socket.onopen = function(event) {
            console.log("opened connection to " + uri);
        };
        socket.onclose = function(event) {
            console.log("closed connection from " + uri);
        };
        socket.onmessage = function(event) {
            appendItem(list, event.data);
            console.log(event.data);
        };
        socket.onerror = function(event) {
            console.log("error: " + event.data);
        };
    }
    connect();
    var list = document.getElementById("messages");
    var button = document.getElementById("sendButton");
    button.addEventListener("click", function() {

        var input = document.getElementById("textInput");
        sendMessage(input.value);

        input.value = "";
    });
    function sendMessage(message) {
        console.log("Sending: " + message);
        socket.send(message);
    }
    function appendItem(list, message) {
        var item = document.createElement("li");
        item.appendChild(document.createTextNode(message));
        list.appendChild(item);
    }
</script>

这是一个非常简单的网页,有一个输入表单和一个按钮。它连接到“/ws”路径上的服务器,因此所有请求都将被映射到ChatMessageHandler(回想一下如何ChatMessageHandler映射到接收/ws路径上的请求)。

ASP .NET Core 创建 WebSockets 中间件

我们可以看到消息实时传递给所有连接的客户端。

场景 2 – 从控制器接收实时通知

这次我们将构建一个 MVC 应用程序并让客户端通过 MVC 管道发送数据。然后,通知将从控制器内部触发。

我们将构建一个标准的 MVC 应用程序,我们将添加WebSocketManagerMiddleware我们创建的。这Startup是这个应用程序的类:

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    var serviceScopeFactory = app.ApplicationServices.GetRequiredService<IServiceScopeFactory>();
    var serviceProvider = serviceScopeFactory.CreateScope().ServiceProvider;

    app.UseWebSockets();
    app.MapWebSocketManager("/ws", serviceProvider.GetService<NotificationsMessageHandler>());
    app.UseStaticFiles();
    app.UseEndpoints(endpoints =>
    {
        endpoints.MapControllers();
    });
}

由于这次处理程序类只管理连接的客户端,不会自己处理请求,我们可以不ReceiveAsync实现该方法:

public class NotificationsMessageHandler : WebSocketHandler
{
    public NotificationsMessageHandler(WebSocketConnectionManager webSocketConnectionManager) : base(webSocketConnectionManager)
    {
    }

    public override Task ReceiveAsync(WebSocket socket, WebSocketReceiveResult result, byte[] buffer)
    {
        throw new NotImplementedException();
    }
}

和控制器:

public class MessagesController : Controller
{
    private NotificationsMessageHandler _notificationsMessageHandler { get; set; }

    public MessagesController(NotificationsMessageHandler notificationsMessageHandler)
    {
        _notificationsMessageHandler = notificationsMessageHandler;
    }

    [HttpGet]
    public async Task SendMessage([FromQueryAttribute]string message)
    {
        await _notificationsMessageHandler.SendMessageToAllAsync(message);
    }
}

在控制器中,我们将有一个NotificationsMessageHandler实例来处理向所有连接的客户端发送通知。

HTML 页面和之前的很相似,但是这次消息会使用我们创建的 API 发送,所以是通过 HTTP 而不是 WebSockets,所以只有sendMessage方法不同:

function sendMessage(message) {
    console.log("Sending: " + message);

    $.ajax({
        url: "http://" + window.location.host + "/api/messages/sendmessage?message=" + message,
        method: 'GET'
    });
}

让我们看看它是如何工作的:

ASP .NET Core 创建 WebSockets 中间件

用户没有注意到任何区别,但客户端通过 HTTP 请求发送数据并通过 WebSockets 接收通知。

额外 – 控制台客户端

您还可以使用 C# 控制台客户端连接到我们创建的应用程序。

此示例使用System.Net.WebSockets包含ClientWebSocket该类的命名空间:

public class Program
{
    public static void Main(string[] args)
    {
        RunWebSockets().GetAwaiter().GetResult();
    }

    private static async Task RunWebSockets()
    {
        var client = new ClientWebSocket();
        await client.ConnectAsync(new Uri("ws://localhost:5000/ws"), CancellationToken.None);

        Console.WriteLine("Connected!");

        var sending = Task.Run(async() =>
        {
            string line;
            while((line = Console.ReadLine()) != null && line != String.Empty)
            {
                var bytes = Encoding.UTF8.GetBytes(line);
                await client.SendAsync(new ArraySegment<byte>(bytes), WebSocketMessageType.Text, true, CancellationToken.None);
            }

            await client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
        });

        var receiving = Receiving(client);

        await Task.WhenAll(sending, receiving);
    }

    private static async Task Receiving(ClientWebSocket client)
    {
        var buffer = new byte[1024 * 4];

        while(true)
        {
            var result = await client.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);

            if(result.MessageType == WebSocketMessageType.Text)
                Console.WriteLine(Encoding.UTF8.GetString(buffer, 0, result.Count));

            else if(result.MessageType == WebSocketMessageType.Close)
            {
                await client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
                break;
            }
        }
    }
}

此时没有 WebSockets 的高级客户端,所以这就是您在 .NET 客户端应用程序中管理连接的方式。

请注意,聊天应用程序和此控制台客户端示例都必须正在运行。

ASP .NET Core 创建 WebSockets 中间件

结论

在本文中,我们从用于管理 WebSocket 连接的 API 开始,并创建了一个可用作 ASP .NET Core 3 中间件的连接管理器。虽然结果本身应用于任何生产目的,但它可以作为一个很好的实验来了解如何在 .NET Core 3 中管理 WebSocket 连接。

 


慕源网 » ASP .NET Core 创建 WebSockets 中间件

常见问题FAQ

程序仅供学习研究,请勿用于非法用途,不得违反国家法律,否则后果自负,一切法律责任与本站无关。
请仔细阅读以上条款再购买,拍下即代表同意条款并遵守约定,谢谢大家支持理解!

发表评论

开通VIP 享更多特权,建议使用QQ登录