玩转KCP(1)-快速开始
版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc
1. 前言
KCP协议是一种快速可靠传输ARQ(Automatic Repeat-reQuest)协议,能以比TCP浪费10%-20%的带宽的代价,换取平均延迟降低 30%-40%,且最大延迟降低三倍的传输效果。它跟QUIC协议一样也是基于UDP协议的实现。KCP从TCP协议中借鉴了大量的思路,是理解TCP/IP协议栈的非常好的资料。
补充说明下,无论是QUIC还是KCP只有在弱网络条件下,相比TCP才有优势。
KCP协议在网络分层模型的位置
+-----------------+
| SESSION |
+-----------------+
| KCP(ARQ) |
+-----------------+
| FEC(OPTIONAL) |
+-----------------+
| CRYPTO(OPTIONAL)|
+-----------------+
| UDP(PACKET) |
+-----------------+
| IP |
+-----------------+
| LINK |
+-----------------+
| PHY |
+-----------------+
KCP的设计者有意识的把KCP依赖的网络通讯给解耦了
纯算法实现,并不负责底层协议(如UDP)的收发,需要使用者自己定义下层数据包的发送方式,以 callback的方式提供给 KCP。
如果读者阅读 skywind3000/kcp 源码,可能会发现如下代码
test.cpp
// 创建模拟网络:丢包率10%,Rtt 60ms~125ms
vnet = new LatencySimulator(10, 60, 125);
测试和验证是不需要再真实网络上进行。
skywind3000/kcp 只是设计了最初的协议, 包括
- 非延迟ACK
- 快速重传(TCP协议也有)
- 非退让流控(拥塞控制,和TCP实现类同)
xtaci/kcp-go 在此基础上,又增加了
- 加密机制
- FEC(Forward Error Correction)前向纠错
PS: skywind3000
和xtaci
其实是大学同学
萌叔的这个系列只打算探讨标准的KCP协议实现
2. KCP协议简介
0 4 5 6 8 (BYTE)
+---------------+---+---+-------+
| conv |cmd|frg| wnd |
+---------------+---+---+-------+ 8
| ts | sn |
+---------------+---------------+ 16
| una | len |
+---------------+---------------+ 24
| |
| DATA (optional) |
| |
+-------------------------------+
conv
:连接号。UDP是无连接的,conv用于表示来自于哪个客户端。对连接的一种替代, 因为有conv
, 所以KCP也是支持多路复用的。
cmd
:命令类型,只有四种
frg
:分片,用户数据可能会被分成多个KCP包,发送出去
在xtaci/kcp-go
的实现中,这个字段始终为0,以及没有意义了, 详见issues/121
wnd
:接收窗口大小,发送方的发送窗口不能超过接收方给出的数值, (其实是接收窗口的剩余大小,这个大小是动态变化的)
ts
: 时间序列
sn
: 序列号
una
:下一个可接收的序列号。其实就是确认号,收到sn=10的包,una为11
len
:数据长度(DATA的长度)
data
:用户数据
CMD四种类型
cmd | 作用 | 备注 |
---|---|---|
IKCP_CMD_PUSH | 数据推送命令 | |
IKCP_CMD_ACK | 确认命令 | |
IKCP_CMD_WASK | 接收窗口大小询问命令 | |
IKCP_CMD_WINS | 接收窗口大小告知命令 |
IKCP_CMD_PUSH
和IKCP_CMD_ACK
关联
IKCP_CMD_WASK
和IKCP_CMD_WINS
关联
读者可以看出KCP协议本身还是比QUIC简单了不少
3. 示例
package main
import (
"github.com/xtaci/kcp-go"
"log"
"net"
"os"
"os/signal"
"syscall"
)
const portEcho = "127.0.0.1:8081"
func listenEcho() (net.Listener, error) {
return kcp.Listen(portEcho)
}
func handleEcho(sess *kcp.UDPSession) {
sess.SetWindowSize(4096, 4096)
sess.SetWriteDelay(true)
sess.SetACKNoDelay(false)
// NoDelay options
// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
// nodelay: 0:disable(default), 1:enable
// interval: internal update timer interval in millisec, default is 100ms
// resend: 0:disable fast resend(default), 1:enable fast resend
// nc: 0:normal congestion control(default), 1:disable congestion control
sess.SetNoDelay(1, 100, 2, 0)
for {
buf := make([]byte, 65536)
n, err := sess.Read(buf)
if err != nil {
panic(err)
}
sess.Write(buf[:n])
}
}
func echoServer() {
l, err := listenEcho()
if err != nil {
log.Println("listenEcho", err)
panic(err)
}
go func() {
kcplistener := l.(*kcp.Listener)
kcplistener.SetReadBuffer(4 * 1024 * 1024)
kcplistener.SetWriteBuffer(4 * 1024 * 1024)
kcplistener.SetDSCP(46)
for {
s, err := l.Accept()
if err != nil {
log.Println("Accept", err)
return
}
// coverage test
s.(*kcp.UDPSession).SetReadBuffer(4 * 1024 * 1024)
s.(*kcp.UDPSession).SetWriteBuffer(4 * 1024 * 1024)
go handleEcho(s.(*kcp.UDPSession))
}
}()
}
func main() {
echoServer()
ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT)
sig := <-ch
log.Println("get signal", sig)
}
package main
import (
"bytes"
"fmt"
"github.com/xtaci/kcp-go"
"io"
"log"
"time"
)
const serverPortEcho = "127.0.0.1:8081"
const N = 100
func dialEcho() (*kcp.UDPSession, error) {
conn, err := kcp.Dial(serverPortEcho)
if err != nil {
fmt.Println(err)
panic(err)
}
return conn.(*kcp.UDPSession), err
}
func test1(){
sess, err := dialEcho()
if err != nil {
panic(err)
}
sess.SetWindowSize(4096, 4096)
sess.SetWriteDelay(true)
sess.SetACKNoDelay(false)
// NoDelay options
// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
// nodelay: 0:disable(default), 1:enable
// interval: internal update timer interval in millisec, default is 100ms
// resend: 0:disable fast resend(default), 1:enable fast resend
// nc: 0:normal congestion control(default), 1:disable congestion control
sess.SetNoDelay(1, 100, 2, 0)
for i := 0; i < N; i++ {
time.Sleep(1 * time.Second)
data := time.Now().String()
sess.Write([]byte(data))
buf := make([]byte, len(data))
if n, err := io.ReadFull(sess, buf); err == nil {
log.Println("got len of(data)", n, data)
if string(buf[:n]) != data {
log.Println("不一致", n, len([]byte(data)))
}
} else {
panic(err)
}
}
time.Sleep(1 * time.Second)
sess.Close()
}
func test2(){
sess, err := dialEcho()
if err != nil {
panic(err)
}
sess.SetWindowSize(4096, 4096)
sess.SetWriteDelay(true)
sess.SetACKNoDelay(false)
// NoDelay options
// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
// nodelay: 0:disable(default), 1:enable
// interval: internal update timer interval in millisec, default is 100ms
// resend: 0:disable fast resend(default), 1:enable fast resend
// nc: 0:normal congestion control(default), 1:disable congestion control
sess.SetNoDelay(1, 100, 2, 0)
var buffer bytes.Buffer
for i:=0;i< 1000;i++{
buffer.WriteString(fmt.Sprintf("%5d", i))
}
bt := buffer.Bytes()
for i := 0; i < 1; i++ {
sess.Write(bt)
buf := make([]byte, len(bt))
if n, err := io.ReadFull(sess, buf); err == nil {
log.Println("got len of(data)", n, buffer.String())
if string(buf[:n]) != buffer.String() {
log.Println("不一致", n, len(bt))
}
} else {
panic(err)
}
}
time.Sleep(10 * time.Second)
sess.Close()
}
func main() {
// 测试小包
//test1()
// 测试拆包的情况
test2()
}
读者还可以使用chosen0ne/kcp-dissector-plugin 配置wireshark来进行抓包
抓包的过程,萌叔发现UDP的数据部分,可能包含多个KCP包,如上图,同时有ACK
和PSH