版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | http://vearne.cc

1. 前言

KCP协议是一种快速可靠传输ARQ(Automatic Repeat-reQuest)协议,能以比TCP浪费10%-20%的带宽的代价,换取平均延迟降低 30%-40%,且最大延迟降低三倍的传输效果。它跟QUIC协议一样也是基于UDP协议的实现。KCP从TCP协议中借鉴了大量的思路,是理解TCP/IP协议栈的非常好的资料。
补充说明下,无论是QUIC还是KCP只有在弱网络条件下,相比TCP才有优势。

KCP协议在网络分层模型的位置

+-----------------+
| SESSION         |
+-----------------+
| KCP(ARQ)        |
+-----------------+
| FEC(OPTIONAL)   |
+-----------------+
| CRYPTO(OPTIONAL)|
+-----------------+
| UDP(PACKET)     |
+-----------------+
| IP              |
+-----------------+
| LINK            |
+-----------------+
| PHY             |
+-----------------+

KCP的设计者有意识的把KCP依赖的网络通讯给解耦了

纯算法实现,并不负责底层协议(如UDP)的收发,需要使用者自己定义下层数据包的发送方式,以 callback的方式提供给 KCP。

如果读者阅读 skywind3000/kcp 源码,可能会发现如下代码 test.cpp

// 创建模拟网络:丢包率10%,Rtt 60ms~125ms
vnet = new LatencySimulator(10, 60, 125);

测试和验证是不需要再真实网络上进行。 skywind3000/kcp 只是设计了最初的协议, 包括

  • 非延迟ACK
  • 快速重传(TCP协议也有)
  • 非退让流控(拥塞控制,和TCP实现类同)

xtaci/kcp-go 在此基础上,又增加了

  • 加密机制
  • FEC(Forward Error Correction)前向纠错

PS: skywind3000xtaci其实是大学同学

萌叔的这个系列只打算探讨标准的KCP协议实现

2. KCP协议简介

0               4   5   6       8 (BYTE)
+---------------+---+---+-------+
|     conv      |cmd|frg|  wnd  |
+---------------+---+---+-------+   8
|     ts        |     sn        |
+---------------+---------------+  16
|     una       |     len       |
+---------------+---------------+  24
|                               |
|        DATA (optional)        |
|                               |
+-------------------------------+

conv:连接号。UDP是无连接的,conv用于表示来自于哪个客户端。对连接的一种替代, 因为有conv, 所以KCP也是支持多路复用的cmd:命令类型,只有四种

frg:分片,用户数据可能会被分成多个KCP包,发送出去 在xtaci/kcp-go的实现中,这个字段始终为0,以及没有意义了, 详见issues/121 wnd:接收窗口大小,发送方的发送窗口不能超过接收方给出的数值, (其实是接收窗口的剩余大小,这个大小是动态变化的) ts: 时间序列 sn: 序列号 una:下一个可接收的序列号。其实就是确认号,收到sn=10的包,una为11 len:数据长度(DATA的长度) data:用户数据

CMD四种类型

cmd作用备注
IKCP_CMD_PUSH数据推送命令
IKCP_CMD_ACK确认命令
IKCP_CMD_WASK接收窗口大小询问命令
IKCP_CMD_WINS接收窗口大小告知命令

IKCP_CMD_PUSHIKCP_CMD_ACK 关联
IKCP_CMD_WASKIKCP_CMD_WINS 关联

读者可以看出KCP协议本身还是比QUIC简单了不少

3. 示例

kcp_server.go

package main

import (
	"github.com/xtaci/kcp-go"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"
)

const portEcho = "127.0.0.1:8081"
func listenEcho() (net.Listener, error) {
	return kcp.Listen(portEcho)
}

func handleEcho(sess *kcp.UDPSession) {
	sess.SetWindowSize(4096, 4096)
	sess.SetWriteDelay(true)
	sess.SetACKNoDelay(false)
	// NoDelay options
	// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
	// nodelay: 0:disable(default), 1:enable
	// interval: internal update timer interval in millisec, default is 100ms
	// resend: 0:disable fast resend(default), 1:enable fast resend
	// nc: 0:normal congestion control(default), 1:disable congestion control
	sess.SetNoDelay(1, 100, 2, 0)


	for {
		buf := make([]byte, 65536)
		n, err := sess.Read(buf)
		if err != nil {
			panic(err)
		}
		sess.Write(buf[:n])
	}
}

func echoServer() {
	l, err := listenEcho()
	if err != nil {
		log.Println("listenEcho", err)
		panic(err)
	}

	go func() {
		kcplistener := l.(*kcp.Listener)
		kcplistener.SetReadBuffer(4 * 1024 * 1024)
		kcplistener.SetWriteBuffer(4 * 1024 * 1024)
		kcplistener.SetDSCP(46)
		for {
			s, err := l.Accept()
			if err != nil {
				log.Println("Accept", err)
				return
			}

			// coverage test
			s.(*kcp.UDPSession).SetReadBuffer(4 * 1024 * 1024)
			s.(*kcp.UDPSession).SetWriteBuffer(4 * 1024 * 1024)
			go handleEcho(s.(*kcp.UDPSession))
		}
	}()
}

func main() {
	echoServer()
	ch := make(chan os.Signal)
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT)
	sig := <-ch
	log.Println("get signal", sig)
}

kcp_client.go

package main

import (
	"bytes"
	"fmt"
	"github.com/xtaci/kcp-go"
	"io"
	"log"
	"time"
)

const serverPortEcho = "127.0.0.1:8081"
const N = 100

func dialEcho() (*kcp.UDPSession, error) {
	conn, err := kcp.Dial(serverPortEcho)
	if err != nil {
		fmt.Println(err)
		panic(err)
	}

	return conn.(*kcp.UDPSession), err
}

func test1(){
	sess, err := dialEcho()
	if err != nil {
		panic(err)
	}
	sess.SetWindowSize(4096, 4096)
	sess.SetWriteDelay(true)
	sess.SetACKNoDelay(false)
	// NoDelay options
	// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
	// nodelay: 0:disable(default), 1:enable
	// interval: internal update timer interval in millisec, default is 100ms
	// resend: 0:disable fast resend(default), 1:enable fast resend
	// nc: 0:normal congestion control(default), 1:disable congestion control
	sess.SetNoDelay(1, 100, 2, 0)


	for i := 0; i < N; i++ {
		time.Sleep(1 * time.Second)
		data := time.Now().String()
		sess.Write([]byte(data))
		buf := make([]byte, len(data))
		if n, err := io.ReadFull(sess, buf); err == nil {
			log.Println("got len of(data)", n, data)
			if string(buf[:n]) != data {
				log.Println("不一致", n, len([]byte(data)))
			}
		} else {
			panic(err)
		}

	}
	time.Sleep(1 * time.Second)
	sess.Close()
}


func test2(){
	sess, err := dialEcho()
	if err != nil {
		panic(err)
	}
	sess.SetWindowSize(4096, 4096)
	sess.SetWriteDelay(true)
	sess.SetACKNoDelay(false)
	// NoDelay options
	// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
	// nodelay: 0:disable(default), 1:enable
	// interval: internal update timer interval in millisec, default is 100ms
	// resend: 0:disable fast resend(default), 1:enable fast resend
	// nc: 0:normal congestion control(default), 1:disable congestion control
	sess.SetNoDelay(1, 100, 2, 0)

	var buffer bytes.Buffer
	for i:=0;i< 1000;i++{
		buffer.WriteString(fmt.Sprintf("%5d", i))
	}

	bt := buffer.Bytes()

	for i := 0; i < 1; i++ {
		sess.Write(bt)
		buf := make([]byte, len(bt))
		if n, err := io.ReadFull(sess, buf); err == nil {
			log.Println("got len of(data)", n, buffer.String())
			if string(buf[:n]) != buffer.String() {
				log.Println("不一致", n, len(bt))
			}
		} else {
			panic(err)
		}

	}
	time.Sleep(10 * time.Second)
	sess.Close()
}

func main() {
	// 测试小包
	//test1()
	// 测试拆包的情况
	test2()
}

读者还可以使用chosen0ne/kcp-dissector-plugin 配置wireshark来进行抓包 此处输入图片的描述 抓包的过程,萌叔发现UDP的数据部分,可能包含多个KCP包,如上图,同时有ACKPSH

参考资料

  1. kcp–wiki
  2. KCP协议简介

请我喝瓶饮料

微信支付码