使用 WebSocket 通过 ASP.NET Core 构建实时应用程序
介绍
WebSocket 是一种通过单个 TCP 连接提供全双工通信通道的协议,它使浏览器和 Web 服务器之间的更多交互成为可能,从而促进从服务器到服务器的实时数据传输。
在本文中,我将向您展示一个示例,该示例使用 WebSocket 构建有关分发任务的实时应用程序。
假设我们有一个任务中心,有一些任务会发布到任务中心,当中心接收到一些任务时,它应该将任务分发给工人。
我们先来看看结果。
有四个客户端连接到 WebSocket Server,通过 RabbitMQ 管理发布消息后,消息会及时分发到不同的客户端。
这是显示其工作原理的架构图。
让我们来看看如何实现它。
设置RabbitMQ
在编写一些代码之前,我们应该首先运行 RabbitMQ 服务器。最快的方法是使用 Docker。
docker run -p 5672:5672 -p 15672:15672 rabbitmq:management
WebSocket Server
这是最重要的部分!
首先,我们应该在 Startup 类中配置 WebSocket。
public class Startup
{
// other ...
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<Handlers.IDisHandler, Handlers.DisHandler>();
services.AddControllers();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
// other ..
var webSocketOptions = new WebSocketOptions()
{
KeepAliveInterval = TimeSpan.FromSeconds(120),
ReceiveBufferSize = 4 * 1024
};
app.UseWebSockets(webSocketOptions);
app.Use(async (context, next) =>
{
// ws://www.yourdomian.com/push
if (context.Request.Path == "/push")
{
if (context.WebSockets.IsWebSocketRequest)
{
WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync();
try
{
var handler = app.ApplicationServices.GetRequiredService<Handlers.IDisHandler>();
await handler.PushAsync(context, webSocket);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
else
{
context.Response.StatusCode = 400;
}
}
else
{
await next();
}
});
}
}
下一步是处理有关如何向客户端推送消息的逻辑。这里创建一个名为DisHandler的类 来完成它。
在构造函数上创建RabbitMQ连接、通道和消费者,以及消费者的Received事件的重要部分。
我们来看一下消费者的Received事件。
consumer.Received += async (ch, ea) =>
{
var content = Encoding.UTF8.GetString(ea.Body);
Console.WriteLine($"received content = {content}");
// {"TaskName":"Demo", "TaskType":1}
var msg = Newtonsoft.Json.JsonConvert.DeserializeObject<MqMsg>(content);
var workIds = Worker.GetByTaskType(msg.TaskType);
var onlineWorkerIds = _sockets.Keys.Intersect(workIds).ToList();
if (onlineWorkerIds == null || !onlineWorkerIds.Any())
{
if (!ea.Redelivered)
{
Console.WriteLine("No online worker, reject the message and requeue");
// should requeue here
_channel.BasicReject(ea.DeliveryTag, true);
}
else
{
// should not requeue here, but this message will be discarded
_channel.BasicReject(ea.DeliveryTag, false);
}
}
else
{
// free or busy
var randomNumberBuffer = new byte[10];
new RNGCryptoServiceProvider().GetBytes(randomNumberBuffer);
var rd = new Random(BitConverter.ToInt32(randomNumberBuffer, 0));
var index = rd.Next(0, 9999) % onlineWorkerIds.Count;
var workerId = onlineWorkerIds[index];
if (_sockets.TryGetValue(workerId, out var ws) && ws.State == WebSocketState.Open)
{
// simulating handle the message an get the result.
// put your own logic here
var val = msg.TaskName;
if (msg.TaskType != 1) val = $"Special-{msg.TaskName}";
var task = Encoding.UTF8.GetBytes(val);
Console.WriteLine($"send to {workerId}-{val}");
// should ack here? or when to ack is better?
_channel.BasicAck(ea.DeliveryTag, false);
// sending message to specify client
await ws.SendAsync(
new ArraySegment<byte>(task, 0, task.Length),
WebSocketMessageType.Text,
true,
CancellationToken.None);
}
else
{
Console.WriteLine("Not found a worker");
}
}
};
当我们收到消息时,我们应该选择一个可以处理这个任务的worker。
如果没有在线工作人员,则服务器应拒绝该消息并使其重新排队一次。
如果有在线worker,服务器会选择一个可以处理这个任务的worker,这里用一个随机数来模拟这个场景。
我们还应该确保这个worker的连接仍然打开,以便服务器可以向它发送消息。
处理WebSocket请求的入口是PushAsync 方法,它只是维护客户端的连接,当客户端把它的客户端id发送给服务器时,它会记录下来,当客户端与服务器断开连接时,它会删除客户端。
public async Task PushAsync(HttpContext context, WebSocket webSocket)
{
var buffer = new byte[1024 * 4];
WebSocketReceiveResult result =
await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
string clientId = Encoding.UTF8.GetString(buffer, 0, result.Count);
// record the client id and it's websocket instance
if (_sockets.TryGetValue(clientId, out var wsi))
{
if (wsi.State == WebSocketState.Open)
{
Console.WriteLine($"abort the before clientId named {clientId}");
await wsi.CloseAsync(WebSocketCloseStatus.InternalServerError,
"A new client with same id was connected!",
CancellationToken.None);
}
_sockets.AddOrUpdate(clientId, webSocket, (x, y) => webSocket);
}
else
{
Console.WriteLine($"add or update {clientId}");
_sockets.AddOrUpdate(clientId, webSocket, (x, y) => webSocket);
}
while (!result.CloseStatus.HasValue)
{
result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
}
await webSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);
Console.WriteLine("close=" + clientId);
_sockets.TryRemove(clientId, out _);
}
客户
客户端代码仅使用ASP.NET Core WebSocket文档中的示例。
这是您可以在我的 GitHub 页面中找到的源代码。
概括
本文向您展示了一个示例,该示例使用 WebSocket 通过 ASP.NET Core 构建实时应用程序。
常见问题FAQ
- 程序仅供学习研究,请勿用于非法用途,不得违反国家法律,否则后果自负,一切法律责任与本站无关。
- 请仔细阅读以上条款再购买,拍下即代表同意条款并遵守约定,谢谢大家支持理解!