.Net Core redis Sorted Sets实现延迟队列
在上一篇文章中,我向大家展示了如何在 ASP.NET Core 中通过 Redis 的 keyspace notifications实现延迟队列,我将介绍另一种基于 Redis 的解决方案。
Redis 的数据结构 Sorted Sets 也可以帮助我们解决这个问题。
我们可以将时间戳作为分数score,将数据作为值,把当前时间作为分数,因为Zset是有序的,时间越小的排名越靠前。所以使用Zset作为延时队列就充分利用了score。
Sorted Sets提供了一个命令,该命令可以返回排序集合中的所有元素,每一个成员都有一个所谓的分数:score,其分数介于两个特殊分数之间。 设置0为最小分数,当前时间戳为最大分数,我们可以得到所有时间戳小于当前时间戳的值,通过Sorted Set的命令ZREVRANGEBYSCORE返回指定分数区间内的所有数据进行处理
首先添加一些值。
ZADD task:delay 1583546835 "180"
ZADD task:delay 1583546864 "181"
ZADD task:delay 1583546924 "182"
假设当前时间戳为 1583546860,那么我们可以通过以下命令获取所有值。
我们将从上面的示例中得到值 180,然后我们就可以做我们想做的事情了。
现在,让我们看看如何在 ASP.NET Core 中执行此操作。
创建项目
创建一个新的 ASP.NET Core Web API 项目并安装 CSRedisCore。
<ItemGroup>
<PackageReference Include="CSRedisCore" Version="3.4.1" />
</ItemGroup>
添加一个名为 ITaskServices 的接口和一个名为 TaskServices 的类。
public interface ITaskServices
{
Task DoTaskAsync();
Task SubscribeToDo();
}
public class TaskServices : ITaskServices
{
public async Task DoTaskAsync()
{
// do something here
// ...
// this operation should be done after some min or sec
var cacheKey = "task:delay";
int sec = new Random().Next(1, 5);
var time = DateTimeOffset.Now.AddSeconds(sec).ToUnixTimeSeconds();
var taskId = new Random().Next(1, 10000);
await RedisHelper.ZAddAsync(cacheKey, (time, taskId));
Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} done {taskId} here - {sec}");
}
public async Task SubscribeToDo()
{
var cacheKey = "task:delay";
while (true)
{
var vals = RedisHelper.ZRangeByScore(cacheKey, -1, DateTimeOffset.Now.ToUnixTimeSeconds(), 1, 0);
if (vals != null && vals.Length > 0)
{
var val = vals[0];
// add a lock here may be more better
var rmCount = RedisHelper.ZRem(cacheKey, vals);
if (rmCount > 0)
{
Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");
}
}
else
{
await Task.Delay(500);
}
}
}
}
这里我们将使用DateTimeOffset.Now.AddSeconds(sec).ToUnixTimeSeconds()来生成时间戳,sec 参数意味着我们应该在几秒钟后执行任务。
对于延迟执行,它会从 Redis 中轮询值以消耗任务,如果无法获取某些值,则使其休眠 500 毫秒。
当我们从 Redis 中获取一个值时,在我们执行延迟任务之前,我们应该首先将它从 Redis 中移除。这里是这个操作的入口。
[ApiController]
[Route("api/tasks")]
public class TaskController : ControllerBase
{
private readonly ITaskServices _svc;
public TaskController(ITaskServices svc)
{
_svc = svc;
}
[HttpGet]
public async Task<string> Get()
{
await _svc.DoTaskAsync();
return "done";
}
}
我们将订阅一个BackgroundService
public class SubscribeTaskBgTask : BackgroundService
{
private readonly ILogger _logger;
private readonly ITaskServices _taskServices;
public SubscribeTaskBgTask(ILoggerFactory loggerFactory, ITaskServices taskServices)
{
this._logger = loggerFactory.CreateLogger<RefreshCachingBgTask>();
this._taskServices = taskServices;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();
await _taskServices.SubscribeToDo();
}
}
最后,我们应该在startup 类中注册上述服务。
public class Startup
{
// ...
public void ConfigureServices(IServiceCollection services)
{
var csredis = new CSRedis.CSRedisClient("127.0.0.1:6379");
RedisHelper.Initialization(csredis);
services.AddSingleton<ITaskServices, TaskServices>();
services.AddHostedService<SubscribeTaskBgTask>();
services.AddControllers();
}
}
这是运行此应用程序后的结果。
常见问题FAQ
- 程序仅供学习研究,请勿用于非法用途,不得违反国家法律,否则后果自负,一切法律责任与本站无关。
- 请仔细阅读以上条款再购买,拍下即代表同意条款并遵守约定,谢谢大家支持理解!