saga分布式事务

作者 : 慕源网 本文共6396个字,预计阅读时间需要16分钟 发布时间: 2022-04-26 共266人阅读

saga分布式事务,在微服务中,每个服务都有自己的本地数据库,我们在微服务中所做的一些操作可能涉及多个服务的多个数据库操作。

所以一个分布式事务可能包含多个数据库的本地事务。

由不同的参与者并行更新,我们需要保证数据的一致性。

我们可以遵循一些处理微服务中分布式事务的解决方案,例如 Seata、  servicecomb-pack等。

但是 Seata 和 servicecomb-pack 在 C# 中都不友好且难以使用,因为没有 C# 版本的 SDK。

经过一番探索,我发现了一个名为 DTM的分布式事务框架,用 Golang 编写,使用方便,支持多语言技术栈。

我将使用一种流行的Saga 分布式事务模式,名为 Saga 来介绍如何使用 C# 和这个分布式事务框架进行分布式事务。

什么是Saga?

Saga 首次出现在 Hector Garcaa-Molrna 和 Kenneth Salem 于 1987 年发表的一篇论文中。

其核心思想是将一个长事务拆分为多个短事务,由 Saga 事务协调器协调,如果每个短事务成功完成,则全局事务正常完成,如果一个短事务成功完成,则按照相反的顺序一次调用一个补偿操作。步骤失败。

Saga 模式是一种广泛使用的分布式事务模式。它是异步的和反应式的。

什么是 DTM?

DTM 是一个分布式事务框架,提供跨服务的最终数据一致性。为多种应用场景提供saga、tcc、xa、2阶段消息策略。它还支持多种语言和多种商店引擎来形成交易。

DTM 有一些特点:

  • 非常容易采用
  • 便于使用
  • 多语言支持
  • 易于部署,易于扩展
  • 多种分布式事务协议

有关 DTM 的更多详细信息,您可以访问其官方网站或 Github 页面。

https://en.dtm.pub/

https://github.com/dtm-labs/dtm

经过一系列的介绍,我会用一个经典的跨银行转账的例子来演示如何使用。

设置 DTM 服务器

这里使用 docker 来快速设置 DTM。

docker run -d -p 36789:36789 -p 36790:36790 --name=dtm-svc yedf/dtm:1.13

saga分布式事务

快速体验DTM,使用boltdb存储数据,如果是生产环境部署,记得切换到mysql或者其他存储引擎

注意:支持使用 HTTP 和 GRPC 与 DTM 服务器通信,36789 为 HTTP 端口,36790 为 GRPC 端口。

基本样本

在创建 C# 微服务之前,让我们看一下成功完成 SAGA 事务的典型时序图。

saga分布式事务

我们可以看到,在这个时序图中,事务发起者在定义了整个全局事务的编排信息(包括每一步的正向和反向补偿操作)之后,将其提交给 DTM 服务器,然后由 DTM 服务器执行之前的 SAGA循序渐进的逻辑。

现在,我们将使用 .NET 6  API 创建两个微服务。

首先是转出微服务:

var builder = WebApplication.CreateBuilder(args);

// Add Dmtcli
builder.Services.AddDtmcli(x =>
{
    x.DtmUrl = "http://localhost:36789";
});

var app = builder.Build();

app.MapPost("/api/TransOut", (HttpContext httpContext, TransRequest req) =>
{
    Console.WriteLine($"TransOut, QueryString={httpContext.Request.QueryString}");
    Console.WriteLine($"User: {req.UserId}, transfer out {req.Amount} --- forward");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransOutCompensate", (HttpContext httpContext, TransRequest req) =>
{
    Console.WriteLine($"TransOutCompensate, QueryString={httpContext.Request.QueryString}");
    Console.WriteLine($"User: {req.UserId}, transfer out {req.Amount} --- reverse compensation");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.Run("http://*:10000");

第二个是微服务中的转移

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddDtmcli(x =>
{
    x.DtmUrl = "http://localhost:36789";
});

var app = builder.Build();

app.MapPost("/api/TransIn", (HttpContext httpContext, TransRequest req) =>
{
    //
    Console.WriteLine($"TransIn, QueryString={httpContext.Request.QueryString}");
    Console.WriteLine($"User: {req.UserId}, transfer in {req.Amount} --- forward");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransInCompensate", (HttpContext httpContext, TransRequest req) =>
{
    Console.WriteLine($"TransInCompensate, QueryString={httpContext.Request.QueryString}");
    Console.WriteLine($"User: {req.UserId}, transfer out {req.Amount} --- reverse compensation");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.Run("http://*:10001");

注意:为简单起见,这里不介绍具体的数据库操作,而是用日志输出代替。

添加一个控制台应用程序来模拟事务发起者。我们应该从 nuget 添加以下包之一。

dotnet add package Dtmcli --version 1.0.0

or

dotnet add package Dtmgrpc --version 1.0.0

注意:Dtmcli 使用 HTTP 协议与 DTM 服务器通信,而 Dtmgrpc 使用 GRPC 协议。

以下示例将用于 Dtmcli 演示如何提交 SAGA 事务。

var services = new ServiceCollection();

services.AddDtmcli(x =>
{
    x.DtmUrl = "http://localhost:36789";
});

var provider = services.BuildServiceProvider();
var dtmClient = provider.GetRequiredService<IDtmClient>();

var outApi = "http://192.168.0.101:10000/api";
var inApi = "http://192.168.0.101:10001/api";

var userOutReq = new Common.TransRequest() { UserId = "1", Amount = -30 };
var userInReq = new Common.TransRequest() { UserId = "2", Amount = 30 };

var cts = new CancellationTokenSource();

var gid = await dtmClient.GenGid(cts.Token);
var saga = new Saga(dtmClient, gid)
        .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
        .Add(inApi + "/TransIn", inApi + "/TransInCompensate", userInReq)
        .EnableWaitResult()
        ;

await saga.Submit(cts.Token);

Console.WriteLine($"Submit SAGA transaction, gid is {gid}.");

几个简单的步骤,一个完整的SAGA分布式事务就写好了!!

让我们看看这个例子的结果是什么。

saga分布式事务

saga分布式事务

运行本例后,从应用日志中可以看到转入转出都成功了,从dtm服务器的日志中可以看到DTM服务器是如何调用微服务API的。

虽然已经完成了一个成功的例子,但是这个例子太简单了,转入转出都是一次性成功,并不能体现微服务世界遇到的网络抖动、机器宕机、进程崩溃等问题。

在事务域中,异常是一个关键的考虑因素,它可能导致不一致。当我们在做分布式事务的时候,那么分布式中的异常出现的频率就更高了,异常的设计和处理就显得更加重要了。

针对这些异常,DTM首创了子事务屏障技术,可以帮助我们轻松解决异常问题,大大降低了使用分布式事务的门槛。

子事务屏障可以达到如下效果,见图。

saga分布式事务

子事务(sub-transaction)屏障

同样,我们先看一下这个失败的SAGA分布式事务的时序图。

saga分布式事务

在这个例子中,我们将模拟一些异常情况,看看是否可以正确处理子事务屏障,以保证事务的准确性

让我们添加一个 API 来模拟操作中的失败传输。

app.MapPost("/api/TransInError", (HttpContext httpContext, TransRequest req) =>
{
    Console.WriteLine($"TransIn, QueryString={httpContext.Request.QueryString}");
    Console.WriteLine($"User: {req.UserId}, transfer in {req.Amount} --- forward");

    // status code = 409 || content contains FAILURE
    // return Ok(TransResponse.BuildFailureResponse());
    return Results.StatusCode(409);
});

并更改 saga 实例。

var saga = new Saga(dtmClient, gid)
        .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
        .Add(inApi + "/TransInError", inApi + "/TransInCompensate", userInReq)
        .EnableWaitResult()
        ;

在这种情况下会发生什么?

saga分布式事务

最直接的结果是分布式事务直接失败,因为事务发起者抛出了异常。

事务失败很常见,我们要确保数据的一致性是正确的。

但是还有一种更严重的情况,传输的前向操作失败,但是传输的回滚操作也进行了!本来,受让方在这笔交易中本应多出一些钱,但实际上是亏本。

做正向操作的补偿是容易出错的,因为正向操作的失败可能发生在事务提交之前或之后。子事务屏障为我们处理了这些情况。如果在承诺之后发生前向操作失败,障碍将要求赔偿;如果失败发生在承诺之前,障碍将忽略补偿。

子事务屏障技术的原理是在本地数据库中创建一个分支操作状态表dtm_barrier,唯一键为 gid-branch_id-branch_op.

  1. 打开本地事务
  2. 对于当前操作op(forward|try|confirm|cancel),insert忽略一行gid-branchid-[op],如果插入不成功,commit事务返回成功(常用幂等性控制方式)
  3. 如果当前操作是cancel|reverse,则insert忽略一行 gid-branchid-try,如果插入成功(注意是成功),则提交事务并返回成功
  4. 调用barrier内的业务逻辑,如果业务返回成功,则事务提交返回成功;如果业务返回失败,则事务回滚并返回失败

接下来让我们尝试这种子事务屏障技术。

SDK 为我们提供了以下方法。

public Task Call(DbConnection db, Func<DbTransaction, Task> busiCall);

我们需要做的是在 busiCall 中编写我们自己的逻辑。

以下代码改进了转入的补偿 API,并演示了如何使用子事务屏障。

app.MapPost("/api/BarrierTransInCompensate", async (HttpContext httpContext, TransRequest req, IBranchBarrierFactory factory) =>
{
    Console.WriteLine($"TransIn, QueryString={httpContext.Request.QueryString}");

    // create barrier from query
    var barrier = factory.CreateBranchBarrier(httpContext.Request.Query);

    using var db = Db.GeConn();
    await barrier.Call(db, async (tx) =>
    {
        // some exception occure, should not output this one.
        Console.WriteLine($"User: {req.UserId}, transfer in {req.Amount} --- sub-transaction handle!");

        await Task.CompletedTask;
    });

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

我们希望 sub-transaction handle 这里不会输出日志。

var saga = new Saga(dtmClient, gid)
        .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
        .Add(inApi + "/TransInError", inApi + "/BarrierTransInCompensate", userInReq)
        .EnableWaitResult()
        ;
saga分布式事务

这个事务调用的结果确实和我们预期的一样,并且还从sdk中输出了一些信息供我们参考。

Will not exec busiCall, isNullCompensation=True, isDuplicateOrPend=False

SDK 告诉我们它不会执行当前的业务操作,因为当前的操作是空值补偿。

与前面的例子相比,子事务屏障技术确实可以帮助我们轻松处理异常情况并保证数据的一致性。

这是您可以在我的 Github 页面上找到的源代码。

概括

本文介绍了一些简单的saga分布式事务知识,以及DTM这个非常简单易用的分布式事务框架,并用它完成了几个简单的例子。

SAGA模式是DTM中最常用的模式之一,DTM也支持2-phase message、TCC、XA模式。

我们还可以利用 DTM 探索很多场景下的分布式事务解决方案。

我希望这能帮到您!


慕源网 » saga分布式事务

常见问题FAQ

程序仅供学习研究,请勿用于非法用途,不得违反国家法律,否则后果自负,一切法律责任与本站无关。
请仔细阅读以上条款再购买,拍下即代表同意条款并遵守约定,谢谢大家支持理解!

发表评论

开通VIP 享更多特权,建议使用QQ登录