.Net Core redis Sorted Sets实现延迟队列

作者 : 慕源网 本文共2954个字,预计阅读时间需要8分钟 发布时间: 2022-04-27 共650人阅读

在上一篇文章中,我向大家展示了如何在 ASP.NET Core 中通过 Redis 的 keyspace notifications实现延迟队列,我将介绍另一种基于 Redis 的解决方案。

Redis 的数据结构 Sorted Sets 也可以帮助我们解决这个问题。

我们可以将时间戳作为分数score,将数据作为值,把当前时间作为分数,因为Zset是有序的,时间越小的排名越靠前。所以使用Zset作为延时队列就充分利用了score。

Sorted Sets提供了一个命令,该命令可以返回排序集合中的所有元素,每一个成员都有一个所谓的分数:score,其分数介于两个特殊分数之间。 设置0为最小分数,当前时间戳为最大分数,我们可以得到所有时间戳小于当前时间戳的值,通过Sorted Set的命令ZREVRANGEBYSCORE返回指定分数区间内的所有数据进行处理

首先添加一些值。

ZADD task:delay 1583546835 "180"  
ZADD task:delay 1583546864 "181"  
ZADD task:delay 1583546924 "182" 
假设当前时间戳为 1583546860,那么我们可以通过以下命令获取所有值。

我们将从上面的示例中得到值 180,然后我们就可以做我们想做的事情了。

现在,让我们看看如何在 ASP.NET Core 中执行此操作。

创建项目

创建一个新的 ASP.NET Core Web API 项目并安装 CSRedisCore。
<ItemGroup>  
    <PackageReference Include="CSRedisCore" Version="3.4.1" />  
</ItemGroup>  
添加一个名为 ITaskServices 的接口和一个名为 TaskServices 的类。
public interface ITaskServices  
{  
    Task DoTaskAsync();  
  
    Task SubscribeToDo();  
}  
  
public class TaskServices : ITaskServices  
{         
    public async Task DoTaskAsync()  
    {  
        // do something here  
        // ...  
  
        // this operation should be done after some min or sec  
  
        var cacheKey = "task:delay";  
        int sec = new Random().Next(1, 5);  
        var time = DateTimeOffset.Now.AddSeconds(sec).ToUnixTimeSeconds();  
        var taskId = new Random().Next(1, 10000);  
        await RedisHelper.ZAddAsync(cacheKey, (time, taskId));  
        Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} done {taskId} here - {sec}");  
    }  
  
    public async Task SubscribeToDo()  
    {  
        var cacheKey = "task:delay";  
        while (true)  
        {  
            var vals = RedisHelper.ZRangeByScore(cacheKey, -1, DateTimeOffset.Now.ToUnixTimeSeconds(), 1, 0);  
  
            if (vals != null && vals.Length > 0)  
            {  
                var val = vals[0];  
  
                // add a lock here may be more better  
                var rmCount = RedisHelper.ZRem(cacheKey, vals);  
  
                if (rmCount > 0)  
                {  
                    Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");  
                }  
            }  
            else  
            {  
                await Task.Delay(500);  
            }  
        }  
    }  
}

这里我们将使用DateTimeOffset.Now.AddSeconds(sec).ToUnixTimeSeconds()来生成时间戳,sec 参数意味着我们应该在几秒钟后执行任务。

对于延迟执行,它会从 Redis 中轮询值以消耗任务,如果无法获取某些值,则使其休眠 500 毫秒。

当我们从 Redis 中获取一个值时,在我们执行延迟任务之前,我们应该首先将它从 Redis 中移除。这里是这个操作的入口。

[ApiController]  
[Route("api/tasks")]  
public class TaskController : ControllerBase  
{  
    private readonly ITaskServices _svc;  
  
    public TaskController(ITaskServices svc)  
    {  
        _svc = svc;  
    }  
  
    [HttpGet]  
    public async Task<string> Get()  
    {  
        await _svc.DoTaskAsync();  
        return "done";  
    }  
}  

我们将订阅一个BackgroundService

public class SubscribeTaskBgTask : BackgroundService  
{  
    private readonly ILogger _logger;  
    private readonly ITaskServices _taskServices;  
  
    public SubscribeTaskBgTask(ILoggerFactory loggerFactory, ITaskServices taskServices)  
    {  
        this._logger = loggerFactory.CreateLogger<RefreshCachingBgTask>();  
        this._taskServices = taskServices;  
    }  
  
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)  
    {  
        stoppingToken.ThrowIfCancellationRequested();  
        await _taskServices.SubscribeToDo();  
    }  
}  
最后,我们应该在startup 类中注册上述服务。
public class Startup  
{  
    // ...  
      
    public void ConfigureServices(IServiceCollection services)  
    {  
        var csredis = new CSRedis.CSRedisClient("127.0.0.1:6379");  
        RedisHelper.Initialization(csredis);  
  
        services.AddSingleton<ITaskServices, TaskServices>();  
        services.AddHostedService<SubscribeTaskBgTask>();  
  
        services.AddControllers();  
    }  
} 

这是运行此应用程序后的结果。

这是您可以在我的 GitHub 页面中找到的源代码。

总结

本文向您展示了如何使用 Redis 有序队列在 ASP.NET Core 中延迟执行的简单解决方案。

我希望这能帮到您!


慕源网 » .Net Core redis Sorted Sets实现延迟队列

常见问题FAQ

程序仅供学习研究,请勿用于非法用途,不得违反国家法律,否则后果自负,一切法律责任与本站无关。
请仔细阅读以上条款再购买,拍下即代表同意条款并遵守约定,谢谢大家支持理解!

发表评论

开通VIP 享更多特权,建议使用QQ登录