Fork me on GitHub

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc

1. 前言

分布式锁的场景,大家应该都有遇到过。比如对可靠性有较高要求的系统中,我们需要做主备切换。这时我们可以利用分布式锁,来做选主动作,抢到锁作为主,执行对应的任务,剩余的实例作为备份

redis和zookeeper都可以用来做分布式锁,典型的如redis,可以使用SETNX命令来实现分布式锁。本文将介绍基于consul的分布式锁实现

2. 例子

测试例子
test_lock.go

package main

import (
    "github.com/hashicorp/consul/api"
    "log"
    "strconv"
    "sync"
    "time"
)

func main() {
    wg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go tryLock("mylock", "session"+strconv.Itoa(i), wg)
    }
    wg.Wait()

}

func tryLock(key string, sessionName string, wg *sync.WaitGroup) {
    defer wg.Done()
    // Get a new client
    config := &api.Config{
        Address: "dev1:8500",
        Scheme:  "http",
    }

    client, err := api.NewClient(config)
    if err != nil {
        panic(err)
    }

    opts := &api.LockOptions{
        Key:         key,
        SessionName: sessionName,
    }

    lock, err := client.LockOpts(opts)
    log.Println(sessionName, "try to get lock obj")
    for i := 0; i < 3; i++ {
        leaderCh, err := lock.Lock(nil)
        if err != nil {
            log.Println("err", err, sessionName)
        }
        if leaderCh == nil{
            log.Println("err", err, sessionName)
            continue
        }
        log.Println(sessionName, "lock and sleep")
        time.Sleep(5 * time.Second)
        err = lock.Unlock()
        if err != nil {
            log.Fatal("err", err)
        }
        log.Println(sessionName, "unlock")
        time.Sleep(5 * time.Second)
    }
}

3. 原理

image_1d9peo6elqmc1ia7v31t79ect9.png-31.8kB
consul中锁的主要是依赖KV Store和Session相关API

3.1 创建session

PUT /v1/session/create 创建1个session

3.2 加锁

PUT /v1/kv/{key}?acquire={sessionId}

这里的key是锁的名称
如果成功加锁,则consul返回true, 否则返回false
注意:获取失败,接口并不阻塞,如果想要加锁,需要再次发起请求
可以使用 GET /v1/kv/{key} 获取锁信息

[
    {
        "LockIndex": 91,
        // 锁的名称
        "Key": "mylock",
        "Flags": 3304740253564472344,
        "Value": null,
        // sessionId
        // 从这里可以看出当前是那个session持有锁
        "Session": "c090b464-23f3-bce1-d999-6163ba6eb91f",
        "CreateIndex": 2588219,
        "ModifyIndex": 2590269
    }
]

3.3 保持会话

PUT /v1/session/renew/{sessionId}

锁是有生命周期,它的生命周期是与session的生命周期一致 因此对于锁的持有者,它需要周期性的执行renew session,以确保session关联的锁不被释放。
image_1d9pg08nb10hudmt96bo2i1maq16.png-70.6kB
hashicorp/consul/api的实现是 每隔TTL/2 执行1次renew

3.4 释放锁

有2种方式可以释放锁
1) 主动释放

PUT /v1/kv/{key}?release={sessionId}

2)被动释放(session超时或者被check为invalidate)
锁释放以后

[
    {
        "LockIndex": 109,
        "Key": "mylock",
        "Flags": 3304740253564472344,
        "Value": null,
        // 可以看到已经没有session信息里
        "CreateIndex": 2588219,
        "ModifyIndex": 2592871
    }
]

4. 注意

1个client释放锁之后,其它client无法立刻获得锁,这可能是由于lock-delay设置引起的。

The final nuance is that sessions may provide a lock-delay. This is a time duration, between 0 and 60 seconds. When a session invalidation takes place, Consul prevents any of the previously held locks from being re-acquired for the lock-delay interval; this is a safeguard inspired by Google's Chubby. The purpose of this delay is to allow the potentially still live leader to detect the invalidation and stop processing requests that may lead to inconsistent state. While not a bulletproof method, it does avoid the need to introduce sleep states into application logic and can help mitigate many issues. While the default is to use a 15 second delay, clients are able to disable this mechanism by providing a zero delay value.

为防止由于网络波动等原因,session的状态被错误的检查为invalidate 导致锁被释放。此时如果其它client需要加锁,则需要等待lock-delay,才能再次加锁成功。(主动释放没有这个问题)
可以将lock-delay设置成0,表示不启用lock-delay机制

后记

其实consul还支持信号量的玩法,也就是说可以有多个client持有锁,详情见参考资料4.

3. 参考资料

  1. 基于redis的分布式锁实现
  2. SETNX
  3. 基于Consul的分布式锁实现
  4. Semaphore with Consul

请我喝瓶饮料

微信支付码

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注