用 Golang 实现基于 Redis 的安全高效 RPC 通信
前言
RPC(Remote Procedure Call),翻译过来为“远程过程调用”,是一种分布式系统中服务或节点之间的有效通信机制。通过 RPC,某个节点(或客户端)可以很轻松的调用远端(或服务端)的方法或服务,就像在本地调用一样简单。现有的很多 RPC 框架都要求暴露服务端地址,也就是需要知道服务器的 IP 和 RPC 端口。而本篇文章将介绍一种不需要暴露 IP 地址和端口的 RPC 通信方式。这种方式是基于 Redis BRPOP/BLPOP 操作实现的延迟队列,以及 Golang 中的 goroutine 协程异步机制,整个框架非常简单和易于理解,同时也很高效、稳定和安全。这种方式已经应用到了 Crawlab 中的节点通信当中,成为了各节点即时传输信息的主要方式。下面我们将从 Crawlab 早期节点通信方案 PubSub 开始,介绍当时遇到的问题和解决方案,然后如何过渡到现在的 RPC 解决方案,以及它是如何在 Crawlab 中发挥作用的。
基于 PubSub 的方案
PubSub
早期的 Crawlab 是基于 Redis 的 PubSub,也就是发布订阅模式。这是 Redis 中主要用于一对多的单向通信的方案。其用法非常简单:
SUBSCRIBE channel1 channel2 ...
PUBLISH channelx message
Redis的 PubSub 可以用作广播模式,即一个发布者对应多个订阅者。而在Crawlab中,我们只有一个订阅者对应一个发布者的情况(主节点->工作节点: nodes:<node_id> )或一个订阅者对应多个发布者的情况(工作节点->主节点: nodes:master> )。这是为了方便双向通信。
以下为节点通信原理示意图。
各个节点会通过Redis的 PubSub 功能来做相互通信。
所谓 PubSub ,简单来说是一个发布订阅模式。订阅者(Subscriber)会在Redis上订阅(Subscribe)一个通道,其他任何一个节点都可以作为发布者(Publisher)在该通道上发布(Publish)消息。
通信架构
在 Crawlab 中,主节点会订阅 nodes:master 通道,其他节点如果需要向主节点发送消息,只需要向 nodes:master 发布消息就可以了。同理,各工作节点会各自订阅一个属于自己的通道 nodes:<node_id> ( node_id 是MongoDB里的节点ID,是MongoDB ObjectId),如果需要给工作节点发送消息,只需要发布消息到该通道就可以了。
一个网络请求的简单过程如下:
- 客户端(前端应用)发送请求给主节点(API);
- 主节点通过Redis PubSub 的 [nodes: 通道发布消息给相应的工作节点;
- 工作节点收到消息之后,执行一些操作,并将相应的消息通过 nodes:master 通道发布给主节点;
- 主节点收到消息之后,将消息返回给客户端。
不是所有节点通信都是双向的,也就是说,主节点只会单方面对工作节点通信,工作节点并不会返回响应给主节点,所谓的单向通信。以下是Crawlab的通信类型。
chan 和 goroutine
如果您在阅读 Crawlab 源码,会发现节点通信中有大量的 chan 语法,这是 Golang 的一个并发特性。
chan 表示为一个通道,在 Golang 中分为无缓冲和有缓冲的通道,我们用了无缓冲通道来阻塞协程,只有当 chan 接收到信号( chan <- "some signal" ),该阻塞才会释放,协程进行下一步操作)。在请求响应模式中,如果为双向通信,主节点收到请求后会起生成一个无缓冲通道来阻塞该请求,当收到来自工作节点的消息后,向该无缓冲通道赋值,阻塞释放,返回响应给客户端。
go 命令会起一个 goroutine (协程)来完成并发,配合 chan ,该协程可以利用无缓冲通道挂起,等待信号执行接下来的操作。
基于延迟队列的方案
PubSub 方案的问题
PubSub 这种消息订阅-发布设计模式是一种有效的实现节点通信的方式,但是它有两个问题:
- PubSub 的数据是即时的,会随着 Redis 宕机而丢失;
- 写基于 PubSub 的通信服务会要求用到 goroutine 和 channel ,这加大了开发难度,降低了可维护性。
其中,第二个问题是比较棘手的。如果我们希望加入更多的功能,需要写大量的异步代码,这会加大系统模块间的耦合度,造成扩展性很差,而且代码阅读起来很痛苦。
因此,为了解决这个问题,我们采用了基于 Redis 延迟消息队列的 RPC 服务。
延迟队列架构
下图是基于延迟队列架构的 RPC 实现示意图。
每一个节点都有一个客户端(Client)和服务端(Server)。客户端用于发送消息到目标节点(Target Node)并接收其返回的消息,服务端用于接收、处理源节点(Source Node)的消息并返回消息给源节点的客户端。
整个 RPC 通信的流程如下:
- 源节点的客户端通过 LPUSH 将消息推送到 Redis 的 nodes:<node_id> 中,并执行 BRPOP nodes:<node_id>:<msg_id> 阻塞并监听这个消息队列;
- 目标节点的服务端通过 BRPOP 一直在监听 nodes:<node_id> ,收到消息后,通过消息中的 Method 字段执行对应的程序;
- 目标节点执行完毕后,服务端通过 LPUSH 将消息推送到 Redis 的 nodes:<node_id>:<msg_id> 中;
- 由于源节点客户端一直在监听 nodes:<node_id>:<msg_id> 这个消息队列,当目标节点服务端推送消息到这个队列后,源节点客户端将立即收到返回的消息,再做后续处理。
这样,整个节点的通信流程就通过 Redis 完成了。这样做的好处在于不用暴露 HTTP 的 IP 地址和端口,只需要知道节点 ID 即可完成 RPC 通信。
这样设计后的 RPC 代码比较容易理解和维护。每次需要扩展新的通信类别时,只需要继承 rpc.Service 类,实现 ClientHandle (客户端处理方法)和 ServerHandle (服务端处理方法)方法就可以了。
这里多说一下 BRPOP 。它将移出并获取消息队列的最后一个元素, 如果消息队列没有元素会阻塞队列直到等待超时或发现可弹出元素为止。因此,使用 BRPOP 命令相对于轮训或其他方式,可以避免不间断的请求 Redis,避免浪费网络和计算资源。
如果对 Redis 的操作命令不熟悉的,可以参考一下掘金小册 《Redis 深度历险:核心原理与应用实践》 ,这本小册深入介绍了 Redis 的原理以及工程实践,对于应用 Redis 到实际开发中非常实用。
代码实践
讲了这么多理论知识,我们还是需要看看代码的。老师常教育我们:“Talk is cheap. Show me the code.”
由于 Crawlab 后端是 Golang 开发的,要理解以下代码需要一些 Golang 的基础知识。
消息数据结构
首先我们需要定一个传输消息的数据结构。代码如下。
package entity
type RpcMessage struct {
Id string `json:"id"` // 消息ID
Method string `json:"method"` // 消息方法
NodeId string `json:"node_id"` // 节点ID
Params map[string]string `json:"params"` // 参数
Timeout int `json:"timeout"` // 超时
Result string `json:"result"` // 结果
Error string `json:"error"` // 错误
}
这里,我们定义了消息 ID、方法、节点 ID、参数等字段。消息 ID 是 UUID,保证了消息 ID 的唯一性。
基础接口
首先,我们定义一个抽象基础接口,方便让实际业务逻辑模块继承。服务端的处理逻辑在 ServerHandle 中,返回 entity 里的 RpcMessage ,而客户端的逻辑在 ClientHandle 中。
// RPC服务基础类
type Service interface {
ServerHandle() (entity.RpcMessage, error)
ClientHandle() (interface{}, error)
}
客户端通用方法
当我们调用客户端的通用方法的时候,需要实现两个逻辑:
- 发送消息:生成消息 ID,将消息序列化为 JSON,LPUSH 推入 Redis 消息队列;
- 通过 BRPOP 延迟获取返回的消息,返回给调用方。
以下是实现的代码。
// 客户端处理消息函数
func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) {
return func() (replyMsg entity.RpcMessage, err error) {
// 请求ID
msg.Id = uuid.NewV4().String()
// 发送RPC消息
msgStr := utils.ObjectToString(msg)
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", msg.NodeId), msgStr); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 获取RPC回复消息
dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s:%s", msg.NodeId, msg.Id), msg.Timeout)
if err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 反序列化消息
if err := json.Unmarshal([]byte(dataStr), &replyMsg); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 如果返回消息有错误,返回错误
if replyMsg.Error != "" {
return replyMsg, errors.New(replyMsg.Error)
}
return
}
}
服务端处理
服务端处理的逻辑如下,大致的逻辑是:
- 在一个循环中,通过 BRPOP 获取该节点对应的消息;
- 当获取到消息时,生成一个 goroutine 异步处理该消息;
- 继续等待。
您可以在 InitRpcService 这个方法中看到上述逻辑。私有方法 handleMsg 实现了序列化、调用服务端 RPC 服务方法、发送返回消息的逻辑。如果需要拓展 RPC 方法类型,在工厂类方法 GetService 里添加就可以了。
// 获取RPC服务
func GetService(msg entity.RpcMessage) Service {
switch msg.Method {
case constants.RpcInstallLang:
return &InstallLangService{msg: msg}
case constants.RpcInstallDep:
return &InstallDepService{msg: msg}
case constants.RpcUninstallDep:
return &UninstallDepService{msg: msg}
case constants.RpcGetLang:
return &GetLangService{msg: msg}
case constants.RpcGetInstalledDepList:
return &GetInstalledDepsService{msg: msg}
}
return nil
}
// 处理RPC消息
func handleMsg(msgStr string, node model.Node) {
// 反序列化消息
var msg entity.RpcMessage
if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
// 获取service
service := GetService(msg)
// 根据Method调用本地方法
replyMsg, err := service.ServerHandle()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
// 发送返回消息
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
}
// 初始化服务端RPC服务
func InitRpcService() error {
go func() {
for {
// 获取当前节点
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 获取获取消息队列信息
msgStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
if err != nil {
if err != redis.ErrNil {
log.Errorf(err.Error())
debug.PrintStack()
}
continue
}
// 处理消息
go handleMsg(msgStr, node)
}
}()
return nil
}
调用方法例子
Crawlab 的节点上经常需要为爬虫安装一些第三方依赖,例如 pymongo、requests 等。而其中,我们也需要直到某个节点上是否已经安装了某个依赖,这需要跨服务器通信,也就是需要在分布式网络中进行双向通信。而这个逻辑是通过 RPC 实现的。主节点向目标节点发起 RPC 调用,目标节点运行被调用方法,将运行结果也就是安装的依赖列表返回给客户端,客户端再返回给调用者。
下面的代码实现了获取目标节点上已安装的依赖的 RPC 方法。
// 获取已安装依赖服务
// 继承Service基础类
type GetInstalledDepsService struct {
msg entity.RpcMessage
}
// 服务端处理方法
// 重载ServerHandle
func (s *GetInstalledDepsService) ServerHandle() (entity.RpcMessage, error) {
lang := utils.GetRpcParam("lang", s.msg.Params)
deps, err := GetInstalledDepsLocal(lang)
if err != nil {
s.msg.Error = err.Error()
return s.msg, err
}
resultStr, _ := json.Marshal(deps)
s.msg.Result = string(resultStr)
return s.msg, nil
}
// 客户端处理方法
// 重载ClientHandle
func (s *GetInstalledDepsService) ClientHandle() (o interface{}, err error) {
// 发起 RPC 请求,获取服务端数据
s.msg, err = ClientFunc(s.msg)()
if err != nil {
return o, err
}
// 反序列化
var output []entity.Dependency
if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
return o, err
}
o = output
return
}
发起调用
写好了 RPC 服务端和客户端处理方法,就可以轻松编写调用逻辑了。以下是调用获取远端已安装依赖列表的方法。首先由 GetService 工厂类获取之前定义好的 GetInstalledDepsService ,再调用其客户端处理方法 ClientHandle ,然后返回结果。这就像在本地调用方法一样。是不是很简单?
// 获取远端已安装依赖
func GetInstalledDepsRemote(nodeId string, lang string) (deps []entity.Dependency, err error) {
params := make(map[string]string)
params["lang"] = lang
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcGetInstalledDepList,
Params: params,
Timeout: 60,
})
o, err := s.ClientHandle()
if err != nil {
return
}
deps = o.([]entity.Dependency)
return
}
结语
本篇文章主要介绍了一种基于 Redis 延迟队列的 RPC 通信方式,这种方式不用暴露各个节点或服务的 IP 地址或端口,是一种非常安全的方式。而且,这种方式已经用 Golang 在 Crawlab 中实现了双向通信,特别是 Golang 中的天生支持异步的 goroutine,让这种方式的实现变得简单。实际上,这种方式理论上是非常高效的,能够支持高并发数据传输。
但是,在 Crawlab 的实现中还存在一些隐患,也就是它并没有限制服务端的处理并发数量。因此如果传输消息过多时,服务端资源会被占满,导致处理速度变慢甚至宕机的风险。修复方式是在服务端限制并发数量。另外,限于时间的原因,作者还没有来得及测试这种 RPC 通信方式的实际传输效率,容错机制也没有加入。因此总的来说还有很大的提升和优化空间。
虽然如此,这种方式对于 Crawlab 的低并发远程通信来说是足够的了,在实际使用中也没有出现问题,非常稳定。对于隐秘性有要求、希望不暴露地址信息的开发者,我们也推荐将该种方式在实际应用中尝试。
不错