Fork me on GitHub

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | http://vearne.cc

1. 前言

KCP协议是一种快速可靠传输ARQ(Automatic Repeat-reQuest)协议,能以比TCP浪费10%-20%的带宽的代价,换取平均延迟降低 30%-40%,且最大延迟降低三倍的传输效果。它跟QUIC协议一样也是基于UDP协议的实现。KCP从TCP协议中借鉴了大量的思路,是理解TCP/IP协议栈的非常好的资料。
补充说明下,无论是QUIC还是KCP只有在弱网络条件下,相比TCP才有优势。

KCP协议在网络分层模型的位置

+-----------------+
| SESSION         |
+-----------------+
| KCP(ARQ)        |
+-----------------+
| FEC(OPTIONAL)   |
+-----------------+
| CRYPTO(OPTIONAL)|
+-----------------+
| UDP(PACKET)     |
+-----------------+
| IP              |
+-----------------+
| LINK            |
+-----------------+
| PHY             |
+-----------------+

KCP的设计者有意识的把KCP依赖的网络通讯给解耦了

纯算法实现,并不负责底层协议(如UDP)的收发,需要使用者自己定义下层数据包的发送方式,以 callback的方式提供给 KCP。

如果读者阅读 skywind3000/kcp 源码,可能会发现如下代码
test.cpp

// 创建模拟网络:丢包率10%,Rtt 60ms~125ms
vnet = new LatencySimulator(10, 60, 125);

测试和验证是不需要再真实网络上进行。
skywind3000/kcp 只是设计了最初的协议, 包括

  • 非延迟ACK
  • 快速重传(TCP协议也有)
  • 非退让流控(拥塞控制,和TCP实现类同)

xtaci/kcp-go 在此基础上,又增加了

  • 加密机制
  • FEC(Forward Error Correction)前向纠错

PS: skywind3000xtaci其实是大学同学

萌叔的这个系列只打算探讨标准的KCP协议实现

2. KCP协议简介

0               4   5   6       8 (BYTE)
+---------------+---+---+-------+
|     conv      |cmd|frg|  wnd  |
+---------------+---+---+-------+   8
|     ts        |     sn        |
+---------------+---------------+  16
|     una       |     len       |
+---------------+---------------+  24
|                               |
|        DATA (optional)        |
|                               |
+-------------------------------+

conv:连接号。UDP是无连接的,conv用于表示来自于哪个客户端。对连接的一种替代, 因为有conv, 所以KCP也是支持多路复用的
cmd:命令类型,只有四种

frg:分片,用户数据可能会被分成多个KCP包,发送出去
xtaci/kcp-go的实现中,这个字段始终为0,以及没有意义了, 详见issues/121
wnd:接收窗口大小,发送方的发送窗口不能超过接收方给出的数值, (其实是接收窗口的剩余大小,这个大小是动态变化的)
ts: 时间序列
sn: 序列号
una:下一个可接收的序列号。其实就是确认号,收到sn=10的包,una为11
len:数据长度(DATA的长度)
data:用户数据

CMD四种类型

cmd 作用 备注
IKCP_CMD_PUSH 数据推送命令
IKCP_CMD_ACK 确认命令
IKCP_CMD_WASK 接收窗口大小询问命令
IKCP_CMD_WINS 接收窗口大小告知命令

IKCP_CMD_PUSHIKCP_CMD_ACK 关联
IKCP_CMD_WASKIKCP_CMD_WINS 关联

读者可以看出KCP协议本身还是比QUIC简单了不少

3. 示例

kcp_server.go

package main

import (
    "github.com/xtaci/kcp-go"
    "log"
    "net"
    "os"
    "os/signal"
    "syscall"
)

const portEcho = "127.0.0.1:8081"
func listenEcho() (net.Listener, error) {
    return kcp.Listen(portEcho)
}

func handleEcho(sess *kcp.UDPSession) {
    sess.SetWindowSize(4096, 4096)
    sess.SetWriteDelay(true)
    sess.SetACKNoDelay(false)
    // NoDelay options
    // fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
    // nodelay: 0:disable(default), 1:enable
    // interval: internal update timer interval in millisec, default is 100ms
    // resend: 0:disable fast resend(default), 1:enable fast resend
    // nc: 0:normal congestion control(default), 1:disable congestion control
    sess.SetNoDelay(1, 100, 2, 0)


    for {
        buf := make([]byte, 65536)
        n, err := sess.Read(buf)
        if err != nil {
            panic(err)
        }
        sess.Write(buf[:n])
    }
}

func echoServer() {
    l, err := listenEcho()
    if err != nil {
        log.Println("listenEcho", err)
        panic(err)
    }

    go func() {
        kcplistener := l.(*kcp.Listener)
        kcplistener.SetReadBuffer(4 * 1024 * 1024)
        kcplistener.SetWriteBuffer(4 * 1024 * 1024)
        kcplistener.SetDSCP(46)
        for {
            s, err := l.Accept()
            if err != nil {
                log.Println("Accept", err)
                return
            }

            // coverage test
            s.(*kcp.UDPSession).SetReadBuffer(4 * 1024 * 1024)
            s.(*kcp.UDPSession).SetWriteBuffer(4 * 1024 * 1024)
            go handleEcho(s.(*kcp.UDPSession))
        }
    }()
}

func main() {
    echoServer()
    ch := make(chan os.Signal)
    signal.Notify(ch, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT)
    sig := <-ch
    log.Println("get signal", sig)
}

kcp_client.go

package main

import (
    "bytes"
    "fmt"
    "github.com/xtaci/kcp-go"
    "io"
    "log"
    "time"
)

const serverPortEcho = "127.0.0.1:8081"
const N = 100

func dialEcho() (*kcp.UDPSession, error) {
    conn, err := kcp.Dial(serverPortEcho)
    if err != nil {
        fmt.Println(err)
        panic(err)
    }

    return conn.(*kcp.UDPSession), err
}

func test1(){
    sess, err := dialEcho()
    if err != nil {
        panic(err)
    }
    sess.SetWindowSize(4096, 4096)
    sess.SetWriteDelay(true)
    sess.SetACKNoDelay(false)
    // NoDelay options
    // fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
    // nodelay: 0:disable(default), 1:enable
    // interval: internal update timer interval in millisec, default is 100ms
    // resend: 0:disable fast resend(default), 1:enable fast resend
    // nc: 0:normal congestion control(default), 1:disable congestion control
    sess.SetNoDelay(1, 100, 2, 0)


    for i := 0; i < N; i++ {
        time.Sleep(1 * time.Second)
        data := time.Now().String()
        sess.Write([]byte(data))
        buf := make([]byte, len(data))
        if n, err := io.ReadFull(sess, buf); err == nil {
            log.Println("got len of(data)", n, data)
            if string(buf[:n]) != data {
                log.Println("不一致", n, len([]byte(data)))
            }
        } else {
            panic(err)
        }

    }
    time.Sleep(1 * time.Second)
    sess.Close()
}


func test2(){
    sess, err := dialEcho()
    if err != nil {
        panic(err)
    }
    sess.SetWindowSize(4096, 4096)
    sess.SetWriteDelay(true)
    sess.SetACKNoDelay(false)
    // NoDelay options
    // fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
    // nodelay: 0:disable(default), 1:enable
    // interval: internal update timer interval in millisec, default is 100ms
    // resend: 0:disable fast resend(default), 1:enable fast resend
    // nc: 0:normal congestion control(default), 1:disable congestion control
    sess.SetNoDelay(1, 100, 2, 0)

    var buffer bytes.Buffer
    for i:=0;i< 1000;i++{
        buffer.WriteString(fmt.Sprintf("%5d", i))
    }

    bt := buffer.Bytes()

    for i := 0; i < 1; i++ {
        sess.Write(bt)
        buf := make([]byte, len(bt))
        if n, err := io.ReadFull(sess, buf); err == nil {
            log.Println("got len of(data)", n, buffer.String())
            if string(buf[:n]) != buffer.String() {
                log.Println("不一致", n, len(bt))
            }
        } else {
            panic(err)
        }

    }
    time.Sleep(10 * time.Second)
    sess.Close()
}

func main() {
    // 测试小包
    //test1()
    // 测试拆包的情况
    test2()
}

读者还可以使用chosen0ne/kcp-dissector-plugin 配置wireshark来进行抓包
此处输入图片的描述
抓包的过程,萌叔发现UDP的数据部分,可能包含多个KCP包,如上图,同时有ACKPSH

参考资料

  1. kcp–wiki
  2. KCP协议简介

请我喝瓶饮料

微信支付码

发表评论

电子邮件地址不会被公开。

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据