版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | http://vearne.cc

1. 引言

Gossip是一种去中心化、容错并保证最终一致性的协议。它的基本思想是通过不断的和集群中的节点gossip交换信息,经过 O(log(n))个回合, gossip协议即可将信息传递到所有的节点。

这里介绍Gossip的一个实现库hashicorp/memberlist,并讲一下需要注意的事项。

2. hashicorp/memberlist

memberlist是HashiCorp公司开源的Gossip库,这个库被consul(也是HashiCorp公司开源的)所引用。
它是SWIM的一个扩展实现。

下面的例子test_gossip.go中它被用来做集群节点发现

package main

import (
    "flag"
    "fmt"
    "github.com/hashicorp/memberlist"
    // "net"
    "os"
    "strconv"
    "time"
)

var (
    bindPort = flag.Int("port", 8001, "gossip port")
)

func main() {
    flag.Parse()
    hostname, _ := os.Hostname()
    config := memberlist.DefaultLocalConfig()
    config.Name = hostname + "-" + strconv.Itoa(*bindPort)
    // config := memberlist.DefaultLocalConfig()
    config.BindPort = *bindPort
    config.AdvertisePort = *bindPort

    fmt.Println("config.DisableTcpPings", config.DisableTcpPings)
    fmt.Println("config.IndirectChecks", config.IndirectChecks)
    fmt.Println("config.RetransmitMult", config.RetransmitMult)

    fmt.Println("config.PushPullInterval", config.PushPullInterval)

    fmt.Println("config.ProbeInterval", config.ProbeInterval)

    fmt.Println("config.GossipInterval", config.GossipInterval)
    fmt.Println("config.GossipNodes", config.GossipNodes)

    fmt.Println("config.BindPort", config.BindPort)

    list, err := memberlist.Create(config)
    if err != nil {
        panic("Failed to create memberlist: " + err.Error())
    }

    // Join an existing cluster by specifying at least one known member.
    // 配置种子节点, 这里我直接写死了
    _, err = list.Join([]string{"127.0.0.1:8001", "127.0.0.1:8002"})
    fmt.Println("err", err)

    if err != nil {
        panic("Failed to join cluster: " + err.Error())
    }

    // Ask for members of the cluster
    for {
        fmt.Println("-------------start--------------")
        for _, member := range list.Members() {
            fmt.Printf("Member: %s %s\n", member.Name, member.Addr)
        }
        fmt.Println("-------------end--------------")
        time.Sleep(time.Second * 3)

    }

}

可以直接在单机上进行测试,启动

go run test_gossip.go --port 8001
go run test_gossip.go --port 8002
go run test_gossip.go --port 8003

程序会不断在窗口打印发现的所有集群节点, 形如

-------------start--------------
Member: MacBook-Pro.local-8002 192.168.13.110
Member: MacBook-Pro.local-8001 192.168.13.110
Member: MacBook-Pro.local-8003 192.168.13.110
-------------end--------------

如果需要做其它信息交换,请阅读参考资料2

3. 基本过程

3.1 状态

节点的state有3种
alive 节点是”活的”
suspect 对于PingMsg没有应答或应答超时,这个节点的状态是”可疑的”
dead 节点”已死亡”

此处输入图片的描述
1) 如果节点B无法被对节点A发出的PingMsg(这里是作者自定义的UDP协议,不是ICMP)进行响应,或者响应超时,它会被节点A标为suspect, 如果suspect持续一段时间(或它收到足够多的其它节点关于B的SuspectMsg),节点A会在集群中广播SuspectMsg,告知集群中的其它节点,节点B很可疑
2)如果B收到了针对它的SuspectMsg,这显然是对它的不利言论,B可以通过发送AliveMsg告知对方, “I’m alive”。那么在对方节点看来B的state从suspect变为alive
3) 如果一段时间内,B的状态仍然是suspect, 那么对节点A而言,B的状态会被置为dead
4) 如果节点B在down掉一段时间后,重新上线,它可以通过与种子节点的Gossip(push/pull) 重新被认为alive

3.2 动作

memberlist所有动作都可以在schedule()看到, 动作共有3种

state.go

// Schedule is used to ensure the Tick is performed periodically. This
// function is safe to call multiple times. If the memberlist is already
// scheduled, then it won't do anything.
func (m *Memberlist) schedule() {
    m.tickerLock.Lock()
    defer m.tickerLock.Unlock()

    // If we already have tickers, then don't do anything, since we're
    // scheduled
    if len(m.tickers) > 0 {
        return
    }

    // Create the stop tick channel, a blocking channel. We close this
    // when we should stop the tickers.
    stopCh := make(chan struct{})

    // Create a new probeTicker
    if m.config.ProbeInterval > 0 {
        t := time.NewTicker(m.config.ProbeInterval)
        // 动作1
        go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe)
        m.tickers = append(m.tickers, t)
    }

    // Create a push pull ticker if needed
    if m.config.PushPullInterval > 0 {
        // 动作2
        go m.pushPullTrigger(stopCh)
    }

    // Create a gossip ticker if needed
    if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 {
        t := time.NewTicker(m.config.GossipInterval)
        // 动作3
        go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip)
        m.tickers = append(m.tickers, t)
    }

    // If we made any tickers, then record the stopTick channel for
    // later.
    if len(m.tickers) > 0 {
        m.stopTick = stopCh
    }
}

1) 动作1 probe

周期性的探测所有集群中状态为alivesuspect的节点

2) 动作2 pushpull

周期性的从已知的alive的集群节点中选1个节点进行push/pull交换信息
交换的信息包含2种
a) 集群信息
b) 用户自定义的状态信息,需要1个实现Delegate接口的struct(请阅读参考资料2)
config.go

type Config struct {
    // Delegate and Events are delegates for receiving and providing
    // data to memberlist via callback mechanisms. For Delegate, see
    // the Delegate interface. For Events, see the EventDelegate interface.
    //
    // The DelegateProtocolMin/Max are used to guarantee protocol-compatibility
    // for any custom messages that the delegate might do (broadcasts,
    // local/remote state, etc.). If you don't set these, then the protocol
    // versions will just be zero, and version compliance won't be done.

    Delegate                Delegate
    ... 
}
3) 动作3 gossip

不要被函数名欺骗了,其实它是广播所有处于dead的节点(只广播一次)

3.3 注意

这里需要说明的是
1)广播其实也是一种Gossip,发布者并不把消息发给集群中的每一个节点,而是随机挑选n个(默认是3个),将消息发送出去
2)处于dead状态的节点,仍然会被保留在集群信息中一段时间,以便Push/Pull的时候,这个状态能够被扩散出去

4. 关于配置种子节点的说明

看一种极端情况,集群中有A、B、C 3个几点
A为种子节点
B、C为普通节点

表格中是节点A、B、C在不同时刻看到的集群的情况
T1时刻, 节点A、B、C都认为集群中有3个节点
T2时刻, 节点A down了
T3时刻, 节点B、C都认为集群中有[B,C]2个节点
T4时刻, 节点A 重新上线,但是它所已知的集群就只有[A]1个节点,节点B、C 做push/pull的时候也只会在[B、C]中选取, 因此集群分裂了

要避免这种问题,方法是多配种子节点,只要确保即使有节点down机的情况下,只要所有的种子节点不同时down机即可。

其它的Gossip实现

  • Apache Gossip communicates using UDP written in Java, has support for arbitrary data and CRDT types.
  • gossip-python utilizes the TCP stack and it is possible to share data via the constructed network as well.
  • Smudge is written in Go and uses UDP to exchange status information; it also allows broadcasts of arbitrary data across the constructed network.

参考资料

  1. Gossip protocol
  2. gossip协议的原理和实战应用
  3. hashicorp/memberlist
  4. Gossip 协议简介
  5. THE SWIM MEMBERSHIP PROTOCOL

如果我的文章对你有帮助,你可以给我打赏以促使我拿出更多的时间和精力来分享我的经验和思考总结。

微信支付码

anyShare分享到:

发表评论

电子邮件地址不会被公开。 必填项已用*标注

This site uses Akismet to reduce spam. Learn how your comment data is processed.