Kitex 泛化调用案例:基于 API 网关的支付开放平台

泛化调用

泛化调用是不需要依赖生成代码即可对 RPC 服务发起调用的一种特性。通常用于不需要生成代码的中台服务,场景如流量中转、API 网关等。

调用方式

Kitex 泛化调用目前仅支持 Thrift 协议,调用方式如下:

    1. 二进制泛化调用
    1. HTTP 映射泛化调用
    1. Map 映射泛化调用
    1. JSON 映射泛化调用

其中 HTTP 映射泛化对 IDL 编写规范有专门文章介绍《Thrift-HTTP 映射的 IDL 规范》,里面详细介绍了泛化调用解析 Thrift IDL 文件整体规范、约定和已支持的注解。

IDLProvider

HTTP/Map/JSON 映射的泛化调用虽然不需要生成代码,但需要使用者提供 IDL,来定义入参位置和映射关系。

目前 Kitex 有两种 IDLProvider 实现,使用者可以选择指定 IDL 路径,也可以选择传入 IDL 内容。当然也可以根据需求自行扩展 generci.DescriptorProvider。如果有 IDL 管理平台,最好与平台打通,可以及时更新 IDL。

支付开放平台

支付开放平台通常是开放给服务商或商户提供收款记账等能力的服务入口,常见于支付宝、微信、银联等第三方或第四方支付渠道商,特别是前几年发展起来的聚合支付方向。

该演示项目规划要点如下:

  1. 对外暴露的是 HTTP 接口,可以用 Hertz 来做网关入口,根据 HTTP 请求使用 Kitex 泛化调用对请求分发到具体的 RPC 服务;
  2. 需要加签、验签,可以演示 Hertz 自定义 middleware;
  3. 业务服务通常有商户、支付、对账、安全等模块,业务边界清晰,为了演示仅做支付服务;
  4. 关注工程化,如 ORM、分包、代码分层、错误的统一定义及优雅处理等;

工程目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
├── Makefile
├── README.md
├── cmd
│ └── payment
│ ├── main.go
│ ├── wire.go
│ └── wire_gen.go
├── configs
│ └── sql
│ └── payment.sql
├── docker-compose.yaml
├── docs
│ └── open-payment-platform.png
├── go.mod
├── go.sum
├── hertz-gateway
│ ├── README.md
│ ├── biz
│ │ ├── errors
│ │ │ └── errors.go
│ │ ├── handler
│ │ │ └── gateway.go
│ │ ├── middleware
│ │ │ └── gateway_auth.go
│ │ ├── router
│ │ │ └── register.go
│ │ └── types
│ │ └── response.go
│ ├── main.go
│ ├── router.go
│ └── router_gen.go
├── idl
│ ├── common.thrift
│ └── payment.thrift
├── internal
│ ├── README.md
│ └── payment
├── kitex_gen
└── pkg
└── auth
└── auth.go

泛化调用的最简单实现

解析IDL
1
provider, err := generic.NewThriftFileProvider(entry.Name(), idlPath)
构建泛化策略
1
2
//将上面解析到的 IDL 内容,根据场景需要构建 HTTP 的泛化策略
g, err := generic.HTTPThriftGeneric(provider)
生成泛化调用客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
cli, err := genericclient.NewClient(
svcName,
g,
client.WithResolver(nacosResolver),
client.WithTransportProtocol(transport.TTHeader),
client.WithMetaHandler(transmeta.ClientTTHeaderHandler),
)
if err != nil {
hlog.Fatal(err)
}

// 保存映射关系
handler.SvcMap[svcName] = cli

具体实现

网关参数

1
2
3
4
5
6
7
8
{
"sign":"xxx", // 必填,签名
"sign_type":"RSA", // 必填,加签方法
"nonce_str":"J84FJIUH93NFSUH894NJOF", // 必填,随机字符串
"merchant_id":"xxxx", // 必填,用于签名验证
"method":"svc-function-name", // 必填,RPC 调用的具体方法
"biz_params":"{'key':'value'}" // 必填,RPC 业务参数
}

路由规则

将上述的三步构建泛化调用客户端的代码放在了 Hertz 启动服务注册路由时的实现,服务的路由规则是 /gateway/:svc,即构建 gateway 的路由组,使用参数路由知道要泛化调用 RPC 服务的具体服务名。

这部分实现可参看 route.go 文件中 registerGateway 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func registerGateway(r *server.Hertz) {
group := r.Group("/gateway").Use(middleware.GatewayAuth()...)

if handler.SvcMap == nil {
handler.SvcMap = make(map[string]genericclient.Client)
}
idlPath := "./idl/"
c, err := os.ReadDir(idlPath)
if err != nil {
hlog.Fatalf("new thrift file provider failed: %v", err)
}
nacosResolver, err := resolver.NewDefaultNacosResolver()
if err != nil {
hlog.Fatalf("err:%v", err)
}

for _, entry := range c {
if entry.IsDir() || entry.Name() == "common.thrift" {
continue
}
svcName := strings.ReplaceAll(entry.Name(), ".thrift", "")

provider, err := generic.NewThriftFileProvider(entry.Name(), idlPath)
if err != nil {
hlog.Fatalf("new thrift file provider failed: %v", err)
break
}
//generic.JSONThriftGeneric()
//将上面解析到的 IDL 内容,根据场景需要构建 HTTP 的泛化策略
g, err := generic.HTTPThriftGeneric(provider)
if err != nil {
hlog.Fatal(err)
}
cli, err := genericclient.NewClient(
svcName,
g,
client.WithResolver(nacosResolver),
client.WithTransportProtocol(transport.TTHeader),
client.WithMetaHandler(transmeta.ClientTTHeaderHandler),
)
if err != nil {
hlog.Fatal(err)
}

// 保存映射关系
handler.SvcMap[svcName] = cli

//路由到处理函数
group.POST("/:svc", handler.Gateway)
}
}

发起泛化调用

路由匹配成功之后,走到绑定的 handler.Gateway 处理函数即是发起泛化调用的关键点。

首先根据 handler.SvcMap,获取泛化调用客户端 genericclient.Client,然后根据路由参数 :svc 和 POST 参数 biz_params 、method 拼凑相关参数,进行泛化调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
svcName := c.Param("svc")
cli, ok := SvcMap[svcName]
if !ok {
c.JSON(http.StatusOK, errors.New(common.Err_BadRequest))
return
}
var params requiredParams
if err := c.BindAndValidate(&params); err != nil {
hlog.Error(err)
c.JSON(http.StatusOK, errors.New(common.Err_ServerMethodNotFound))
return
}

req, err := http.NewRequest(http.MethodPost, "", bytes.NewBuffer([]byte(params.BizParams)))
if err != nil {
hlog.Warnf("new http request failed: %v", err)
c.JSON(http.StatusOK, errors.New(common.Err_RequestServerFail))
return
}
// 这里要留意 IDL 相关注解
req.URL.Path = fmt.Sprintf("/%s/%s", svcName, params.Method)

customReq, err := generic.FromHTTPRequest(req)
if err != nil {
hlog.Errorf("convert request failed: %v", err)
c.JSON(http.StatusOK, errors.New(common.Err_ServerHandleFail))
return
}
resp, err := cli.GenericCall(ctx, "", customReq)
respMap := make(map[string]interface{})
if err != nil {
hlog.Errorf("GenericCall err:%v", err)
bizErr, ok := kerrors.FromBizStatusError(err)
if !ok {
c.JSON(http.StatusOK, errors.New(common.Err_ServerHandleFail))
return
}
respMap[types.ResponseErrCode] = bizErr.BizStatusCode()
respMap[types.ResponseErrMessage] = bizErr.BizMessage()
c.JSON(http.StatusOK, respMap)
return
}
realResp, ok := resp.(*generic.HTTPResponse)
if !ok {
c.JSON(http.StatusOK, errors.New(common.Err_ServerHandleFail))
return
}
realResp.Body[types.ResponseErrCode] = 0
realResp.Body[types.ResponseErrMessage] = "ok"
c.JSON(http.StatusOK, realResp.Body)

为了更好的演示支付网关,这里做了签名验证和返回参数加签的代码。

签名

首先在路由组注册时,给 /gateway 路由组注册了一个 GatewayAuth 的中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func registerGateway(r *server.Hertz) {
group := r.Group("/gateway").Use(middleware.GatewayAuth()...)
}

type AuthParam struct {
Sign string `form:"sign,required" json:"sign"`
SignType string `form:"sign_type,required" json:"sign_type"`
MerchantId string `form:"merchant_id,required" json:"merchant_id"`
NonceStr string `form:"nonce_str,required" json:"nonce_str"`
}

func GatewayAuth() []app.HandlerFunc {
return []app.HandlerFunc{func(ctx context.Context, c *app.RequestContext) {
var authParam AuthParam

// TODO 签名相关的 key 或私钥应该根据商户号正确获取,这里仅做展示,没有做商户相关逻辑
key := "123"
p, err := auth.NewSignProvider(authParam.SignType, key)
if err != nil {
hlog.Error(err)
c.JSON(http.StatusOK, errors.New(common.Err_Unauthorized))
c.Abort()
return
}
// 验签关键点
if !p.Verify(authParam.Sign, authParam) {
hlog.Error(err)
c.JSON(http.StatusOK, errors.New(common.Err_Unauthorized))
c.Abort()
return
}

c.Next(ctx)

// 响应之后加签回去
data := make(utils.H)
if err = json.Unmarshal(c.Response.Body(), &data); err != nil {
dataJson, _ := json.Marshal(errors.New(common.Err_RequestServerFail))
c.Response.SetBody(dataJson)
return
}
data[types.ResponseNonceStr] = authParam.NonceStr
data[types.ResponseSignType] = authParam.SignType
data[types.ResponseSign] = p.Sign(data)
dataJson, _ := json.Marshal(data)
c.Response.SetBody(dataJson)
}}
}

项目优化

错误处理

在网关和 RPC 服务都要演示错误处理,可能数量比较多。为了规范实现,把错误定义收拢到 IDL 公共协议中去,根据生成的代码返回特定的错误,便于判断和管理。

错误定义

在 idl 目录中新增了 common.thrift 文件,把错误码都枚举出来,并约定不同的服务或地方使用不同的错误码段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
namespace go common

enum Err
{
// gateway 10001- 19999
BadRequest = 10001,
Unauthorized = 10002,
ServerNotFound = 10003,
ServerMethodNotFound = 10004,
RequestServerFail = 10005,
ServerHandleFail = 10006,
ResponseUnableParse = 10007,

// payment 20001- 29999
DuplicateOutOrderNo = 20001,

// other 30001- 93999
Errxxx = 30001,
}

在网关处的错误进行了简单的封装,方便使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Err struct {
ErrCode int64 `json:"err_code"`
ErrMsg string `json:"err_msg"`
}

// New Error, the error_code must be defined in IDL.
func New(errCode common.Err) Err {
return Err{
ErrCode: int64(errCode),
ErrMsg: errCode.String(),
}
}

func (e Err) Error() string {
return e.ErrMsg
}

用例:

1
2
3
4
5
6
import (
"github.com/cloudwego/biz-demo/open-payment-platform/hertz-gateway/biz/errors"
"github.com/cloudwego/biz-demo/open-payment-platform/kitex_gen/common"
)

c.JSON(http.StatusOK, errors.New(common.Err_RequestServerFail))

RPC 服务使用 Kitex 业务异常 的特性支持,只需要在泛化调用客户端和 RPC 服务端制定好相关配置即可。

具体用法如:

1
2
3
4
5
6
import (
"github.com/cloudwego/biz-demo/open-payment-platform/kitex_gen/common"
)
// 这里类型转换较为繁琐,亦可考虑如何简化优化封装
// 比如一个思路是如果想业务异常也想不依赖某个框架用法,如何做
return nil, kerrors.NewBizStatusError(int32(common.Err_DuplicateOutOrderNo), common.Err_DuplicateOutOrderNo.String())

遗留问题

  1. 该项目没有演示配置相关的使用,所以注册中心和数据库配置仅是硬编码;
  2. 签名处理如何获取商户私钥或 key ,需要实际业务考虑;
  3. 错误处理可继续优化;
  4. 泛化调用注解示例较为简单,可根据实际入参和映射关系进行灵活配置;
  5. 整洁架构在业务膨胀之后是否会遇到新的问题;

关于作者

  笔名:Damon,技术爱好者,微服务架构设计,云原生、容器化技术,现从事Go相关,涉及云原生、边缘计算、AI人工智能、云产品Devops落地实践等云原生技术。拿过专利。公众号 交个朋友之猿天地 发起人。个人微信 DamonStatham,星球:《交个朋友之猿田地》,个人网站:交个朋友之猿天地 | 微服务 | 容器化 | 自动化,欢迎來撩。

欢迎关注

公号:交个朋友之猿天地

AI绘画扫我

星球

打赏
  • Copyrights © 2020-2023 交个朋友之猿天地
  • Powered By Hexo | Title - Nothing
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信