如何为 kyanos 贡献新的协议
背景
kyanos 需要捕获协议消息组成请求响应对以供终端展示,因此 kyanos 需要每个协议的协议解析代码,目前支持HTTP、MySQL和Redis,将来会支持更多。本文将阐述 kyanos 协议解析的整体架构,以协助开发新的协议。
协议解析流程总览
从左边看起,kyanos 在read、write等系统调用上插桩,获取应用进程读写的数据, 将其通过perf event buffer发送到用户空间。
用户空间则根据这个连接是客户端侧还是服务端侧,分别放到reqStreamBuffer或者respStreamBuffer里。 这些数据是单纯的应用协议数据,没有协议头。
kyanos会使用相应协议的解析器解析放到streamBuffer的数据,然后关联解析后的请求和响应,最后根据协议的需要再进行Full Body解析,生成一个“记录”(record)。
一些术语
Message:协议的最基本数据单元,通常有一个header和body。
Request / Response: 请求或响应由一起发送的一个或多个Message组成,这些Message在一起表示一个消息(Note: 对于简单的协议比如HTTP1.1来说,一个Request/Response可能只对应一个Message,但对于像MySQL这种复杂的协议,多个Message组合起来才对应一个Request/Response)
Record:Record代表一个匹配完成的请求响应对。
Step.0-准备工作
我们需要做以下几个事情:
- 实现用户态协议解析
- 定义协议消息类型(即请求和响应的结构)
- 实现一个
Parser
, 具体来说需要实现ProtocolStreamParser
接口,该接口实现了协议解析和请求响应匹配等的具体逻辑。
- 实现内核态的协议推断逻辑。
- 增加命令行子命令,实现过滤逻辑。
- 增加e2e测试。
Step.1-定义协议消息类型
在/agent/protocol目录下新建一个目录,名称为协议名称,例如:kafka。
定义Message
一般来说,你要实现的协议会存在一个通用的Header,这个Header会包含一些元数据,比如一个标志位记录其是请求还是响应,像这些信息你需要存储到你定义的Message的字段里,比如MySQL协议定义的:
type MysqlPacket struct {
FrameBase
seqId byte
msg string
cmd int
isReq bool
}
一般来说,Message或者下面的Request/Response都需要嵌入一个FrameBase,FrameBase定义如下,包含了一些基本信息:
type FrameBase struct {
timestampNs uint64
byteSize int
seq uint64
}
而对于 HTTP 等简单的协议来说,由于其一个Message就对应一个请求或响应,没有必要进行Full Body解析,因此也就没必要定义一个Message了,直接定义Request和Response就可以。
定义请求和响应
请求和响应在Message的上层,由一个或多个Message组成。struct Request
或 struct Response
应包含特定于请求/响应的数据, 并且应该实现ParsedMessage接口,接口定义如下:
type ParsedMessage interface {
FormatToString() string
FormatToSummaryString() string
TimestampNs() uint64
ByteSize() int
IsReq() bool
Seq() uint64
}
方法名 | 作用 |
---|---|
FormatToString() | 将消息格式化为字符串表示形式。 |
TimestampNs() | 返回消息的时间戳(以纳秒为单位)。 |
ByteSize() | 返回消息的字节大小。 |
IsReq() | 判断消息是否为请求。 |
Seq() | 返回消息的字节流序列号, 可以从streamBuffer.Head().LeftBoundary() 获取。 |
HTTP的例子:
type ParsedHttpRequest struct {
FrameBase
Path string
Host string
Method string
buf []byte
}
Note. 在
protocol.BinaryDecoder
中BinaryDecoder
类提供了一些方便的实用程序函数,用于从缓冲区中提取字符、字符串或整数。在下面的实现 ParseFrame、FindFrameBoundary 和 Full Body 解析时,我们应该使用这些函数。
Step.2-实现协议解析
定义好请求响应类型之后,就可以开始实现解析流程了,具体来说需要实现接口:ProtocolStreamParser
type ProtocolStreamParser interface {
ParseStream(streamBuffer *buffer.StreamBuffer, messageType MessageType) ParseResult
FindBoundary(streamBuffer *buffer.StreamBuffer, messageType MessageType, startPos int) int
Match(reqStream *[]ParsedMessage, respStream *[]ParsedMessage) []Record
}
- ParseStream: 解析Message
- FindBoundary: 寻找Message边界
- Match: ReqResp匹配
- Full Body Parse(可选): Full Body 解析
ParseStream (Buffer -> Message/ReqResp)
ParseStream 从网络数据解析出来 Message或者直接解析出来ReqResp。
ParseStream(streamBuffer *buffer.StreamBuffer, messageType MessageType) ParseResult
注意 eBPF 数据事件可能会无序到达,或者丢失事件,因此数据缓冲区中的数据可能缺少数据块,参数 streamBuffer 中通过 streamBuffer.Head
函数获取到目前为止已接收到的缓冲区前面的所有连续数据。因此,此时无法保证数据有效或缓冲区与数据包的开头对齐。
如果返回 ParseResult
中的 state
为 success
,且那么kyanos会自动删除掉ParseResult.ReadBytes
个字节的数据;如果返回invalid
,那么通过 FindBoundary
找到下一个可能的Message
边界;如果返回 needsMoreData
,则不会删除数据,而是稍后重试。
FindBoundary 寻找Message边界
FindBoundary 寻找Message边界,然后从解析失败状态中恢复。
FindBoundary(streamBuffer *buffer.StreamBuffer, messageType MessageType, startPos int) int
conntrack.go 在一个循环中调用ParseStream和FindBoundary,如果ParseStream返回invalid,说明解析遇到了问题,接着会调用FindBoundary找到下一个位置,这个位置一般是Message的开头。如果请求带有一个tag比如request id,那么响应带有相同request id的可能性非常高。
Match (Messages -> reqresp -> record)
ParseStream解析成功的Message会放到对应的ReqQueue和RespQueue中,然后再它们匹配在一起创建Record,主要有两种匹配方法:基于顺序 和 基于标签。
HTTP 等协议使用顺序匹配,而 Kafka 等协议使用基于标签的匹配。
Note: 如果是使用顺序匹配,可以直接使用matchByTimestamp
。
Full Body Parsing 解析整个消息
目前,Full Body Parsing 是Match的一部分。对于大多数协议,我们如果需要解析整个消息体,只有在请求响应匹配之后才可以,比如Kafka需要知道请求的opcode,然后才可以根据opcode解析响应。
Step.3-实现协议推断
在将内核数据抓取到用户态解析之前,我们需要识别出这个流量是什么协议的流量,当连接开启上面有数据传输时,kyanos会基于一些规则判断该流量术语哪种协议,每个协议有自己的规则,HTTP协议如下:
static __always_inline enum message_type_t is_http_protocol(const char *old_buf, size_t count) {
if (count < 5) {
return 0;
}
char buf[4] = {};
bpf_probe_read_user(buf, 4, old_buf);
if (buf[0] == 'H' && buf[1] == 'T' && buf[2] == 'T' && buf[3] == 'P') {
return kResponse;
}
if (buf[0] == 'G' && buf[1] == 'E' && buf[2] == 'T') {
return kRequest;
}
if (buf[0] == 'H' && buf[1] == 'E' && buf[2] == 'A' && buf[3] == 'D') {
return kRequest;
}
if (buf[0] == 'P' && buf[1] == 'O' && buf[2] == 'S' && buf[3] == 'T') {
return kRequest;
}
return kUnknown;
}
Warn:
- 新协议的规则可能会导致误报或漏报,导致影响其他协议的准确性。
- 规则的顺序很重要。
出于这些原因,你需要注意以下几个事情:
- 避免在协议中使用过于通用和常见的模式作为推理规则。例如,根据单独的
0x00
或0x01
的字节判断就不够严格。 - 将更严格、更健壮的规则(例如 HTTP)放在前面。
Step.4-添加命令行子命令并且实现过滤逻辑
需要添加到 watch 和 stat 命令下,增加需要的协议特定的过滤选项。
然后实现protocol.ProtocolFilter
:
type ProtocolFilter interface {
Filter(req ParsedMessage, resp ParsedMessage) bool
FilterByProtocol(bpf.AgentTrafficProtocolT) bool
FilterByRequest() bool
FilterByResponse() bool
}
方法名 | 作用 |
---|---|
Filter | 过滤请求和响应。 |
FilterByProtocol | 是否根据协议类型进行过滤。 |
FilterByRequest | 是否根据请求进行过滤。 |
FilterByResponse | 是否根据响应进行过滤。 |
Step.5-注册协议解析器
在你写的模块下增加init函数,将其写入到 ParsersMap
里,例如:
func init() {
ParsersMap[bpf.AgentTrafficProtocolTKProtocolHTTP] = func() ProtocolStreamParser {
return &HTTPStreamParser{}
}
}
Step.6-添加e2e测试
在testdata目录下添加对应协议的e2e测试,可以参考其他协议的写法(比如test_redis.sh
等)。
其他
调试建议
打印协议解析日志建议使用 common.ProtocolParserLog
. 打开 protocol 解析日志:--protocol-log-level 5
设置协议解析相关log日志为debug级别。
协议解析框架代码在 conntrack.go 的 addDataToBufferAndTryParse
函数里。
协议解析持久化信息
在某些协议中,如果需要在解析过程中保留一些数据(比如kafka中,它存储了请求缓冲区上看到的所有 correlation_id 的集合,而 FindBoundary 只返回respStreamBuffer上之前看到correlation_id的位置。)可以在协议的Parser里自定义一些变量保存(即Parser可以是有状态的),kyanos会为每个连接开启时创建独立的Parser并保持到连接关闭。
总结
恭喜你成功向 Kyanos 添加新协议!由于你的贡献,新的协议解析器将使许多其他人受益!