聊聊Raft的一个实现(2)-日志提交

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 前言 在我的上一篇文章聊聊Raft的一个实现(1),我简要的介绍了 goraft/raftd。这篇文章我将结合goraft的实现,来聊聊raft中的一些场景 2. 场景1-正常的执行1条WriteCommand命令 在上一篇文章,我们已经提到WriteCommand和NOPCommand、JoinCommand一样,对goraft而言都是LogEntry, 执行它时,这条命令会被分发到整个Cluster,让我们看看其中的详细过程 当前我们有3个node |节点|state|name|connectionString|term|lastLogIndex|commitIndex| |:—|:—|:—|:—|:—|:—| |node1|leader|2832bfa|localhost:4001|17|26|26| |node2|follower|3320b68|localhost:4002|17|26|26| |node3|follower|7bd5bdc|localhost:4003|17|26|26| 从上表可以看出整个集群处于完全一致的状态,我们开始执行WriteCommand step1 client:通过API提交WriteCommand命令 curl -XPOST http://localhost:4001/db/aaa -d 'bbb' step2 node1:收到指令后,生成LogEntry 写入logfile (磁盘文件) 2)添加到Log.entries (内存) step3 node1:等待Heartbeat(周期性由leader发往其它每个node1和node2), 把LogEntry带给其它node(这里的node1,node2状态相同,所以AppendEntriesRequest是一样的) AppendEntriesRequest { "Term": 17, "PrevLogIndex": 26, "PrevLogTerm": 17, "CommitIndex": 26, "LeaderName": "2832bfa", "Entries": [{ "Index": 27, "Term": 17, "CommandName": "write", "Command": "eyJrZXkiOiJhYWEiLCJ2YWx1ZSI6ImJiYiJ9Cg==" }] } 这里对PrevLogIndex做下简单的解释,PrevLogIndex表示的是leader所认为的follower与leader保持一致的最后一个日志index。PrevLogTerm是与PrevLogIndex对应的term。 Command做了base64编码解码后 { "key": "aaa", "value": "bbb" } 现在解释下上面的AppendEntriesRequest,node1(leader)告诉node2(follower) 如果LogIndex 26, 咱们是一致的, 那么Append LogIndex 27 ...

November 29, 2018 · 2 min

聊聊Raft的一个实现(1)--goraft

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 前言 最近花了不少时间来学习raft相关的知识,写这篇文章的目的 1)为了对raft中让我困惑的问题进行总结 为了对其它人的阅读便利,提供必要的帮助 网上raft的实现不少,但能够独立运行的样本工程实在不多。我选择的是 goraft/raftd raft的paper是为数不多的连实现都写的比较清楚的文档,因此在阅读本文前,请确保你已经大致了解过raft的协议 2. 概述 goraft/raftd 依赖 goraft/raft, goraft/raft是分布式一致性算法raft的实现,而goraft/raftd 已经可以称之最简单的分布式key-value数据库(服务)了 聊到一个服务,我们最常用的手法, 看它的接口。 看它的数据如何存储,包括内部的数据结构,数据如何落地 2.1 接口 接口大致可以分为2组 2.1.1 HTTP之上RPC协议 这其实goraft的作者对raft协议所要求的几个RPC的实现,包括 AppendEntries RPC RequestVote RPC Snapshot RPC (非raft协议要求的) SnapshotRecovery RPC (非raft协议要求的) 2.1.2 普通的HTTP API POST /db/{key} 写入key-value curl -XPOST localhost:4001/db/foo -d 'bar' GET /db/{key} 读取key对应的value curl localhost:4001/db/foo POST /join 把某个节点加入集群 curl http://localhost:4001/join -d '{"name":"5f2ac6d","connectionString":"http://localhost:4001"}' 这里有3个有趣的事情 1)这个节点可以是虚假的节点 2)由于JoinCommand 本身也是一条LogEntry, 所以当这条消息被发给Leader时,Cluster中的所有成员都会知道有一个新节点的加入(加入 raft.Server.peers) 3) 1个节点掉线后,它不会从raft.Server.peers移除 为了便于观察raft.Server的内部状态 我又扩充了3个接口 传送门vearne/raftd ...

November 28, 2018 · 2 min

简单的GOLANG 协程池2 (带Cancel功能)

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 有些朋友可能看我的这篇文章 简单的GOLANG 协程池 这是我做的简单协程池,功能还算完备,但是有些任务执行的时间很长,如果想在协程池运行起来以后就退出,就只能死等。这是非常不友好的,因此我又写了一个新的协程池GContextPool,利用context支持在任务运行过程中,结束任务。原来的GPool仍然可以继续使用。 下面我们来看看用法。 1. 安装 go get github.com/vearne/golib/utils 2. 使用 2.1 创建协程池 // 设定协程池的中协程的数量是30个 cxt, cancel := context.WithTimeout(context.Background(), 1 * time.Second) defer cancel() var p *utils.GContextPool = utils.NewGContextPool(ctx, 30) 2.2 定义任务处理函数 任务处理函数形如 type JobContextFunc func(ctx context.Context, key interface{}) *GPResult 执行任务 GContextPool.ApplyAsync(f JobContextFunc, slice []interface{}) f JobContextFunc 是目标函数 slice []interface{} 任务参数列表 示例 ctx_pool.go package main import ( "context" "fmt" "github.com/vearne/golib/utils" "log" "strconv" "time" ) func JudgeStrWithContext2(ctx context.Context, key interface{}) *utils.GPResult { num, _ := strconv.Atoi(key.(string)) result := &utils.GPResult{} var canceled bool = false for i := 0; i < 60; i++ { select { case <-ctx.Done(): canceled = true result.Value = false result.Err = fmt.Errorf("normal termination") default: time.Sleep(time.Millisecond * 50) } } if !canceled { if num < 450 { result.Value = true } else { result.Value = false } } return result } func main() { cxt, cancel := context.WithTimeout(context.Background(), 1 * time.Second) defer cancel() p := utils.NewGContextPool(cxt,30) slice := make([]interface{}, 0) for i := 0; i < 1000; i++ { slice = append(slice, strconv.Itoa(i)) } result := make([]*utils.GPResult, 0, 10) trueCount := 0 falseCount := 0 start := time.Now() for item := range p.ApplyAsync(JudgeStrWithContext2, slice) { result = append(result, item) if item.Err!= nil{ //log.Println("cancel", item.Err) continue } if item.Value.(bool) { trueCount++ } else { falseCount++ } } log.Printf("cancel, %v, true:%v, false:%v, cost:%v\n", len(result), trueCount, falseCount, time.Since(start)) } 请我喝瓶饮料

November 19, 2018 · 2 min

聊聊go-redis的一些高级用法

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 前言 说到Golang的Redis库,用到最多的恐怕是 redigo 和 go-redis。其中 redigo 不支持对集群的访问。 本文想聊聊go-redis 2个高级用法 2. 开启对Cluster中Slave Node的访问 在一个负载比较高的Redis Cluster中,如果允许对slave节点进行读操作将极大的提高集群的吞吐能力。 开启对Slave 节点的访问,受以下3个参数的影响 type ClusterOptions struct { // Enables read-only commands on slave nodes. ReadOnly bool // Allows routing read-only commands to the closest master or slave node. // It automatically enables ReadOnly. RouteByLatency bool // Allows routing read-only commands to the random master or slave node. // It automatically enables ReadOnly. RouteRandomly bool ... } go-redis 选择节点的逻辑如下 ...

November 18, 2018 · 2 min

openresty写入用户唯一标识(cookie)

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 前言 最近打算在blog里面,增加个性化推荐,以增加访问量。这一切的前提是,我要能够标识出每个用户,以及记录用户的浏览记录。这里笔者采用openresty在cookie中写入用户标识,然后在日志中记录下用户的浏览记录,供后续分析使用 2. 详解 1)nginx 配置 upstream blog { server 127.0.0.1:8080; } server { listen 80; #端口 server_name blog.vearne.cc; location ^~ /archives/ { rewrite_by_lua_file 'conf/lua/set-ck.lua'; proxy_pass http://blog; } location /{ proxy_pass http://blog; } } 2)Lua脚本 set-ck.lua 如果用户没有对应的cookie "UT_ID", 则注入cookie的同时,返回一个302临时跳转, 客户再次访问时,由于已经拥有响应cookie,所以可以直接访问后端服务 local cookie_name = "cookie_UT_ID" local request_uri = ngx.var.request_uri if ngx.var[cookie_name] == nil then local uuid = io.open("/proc/sys/kernel/random/uuid", "r"):read() local mycookie = string.format("UT_ID=%s; Expires=%s", uuid, ngx.cookie_time(ngx.time() + 86400 * 1000)) ngx.header["Set-Cookie"] = mycookie ngx.header["Location"] = request_uri ngx.exit(302) end 这里另一个有趣的地方是,可以直接通过读取文件 "/proc/sys/kernel/random/uuid" 来生成一个uuid ...

November 5, 2018 · 1 min

Golang标准库的读写文件,没有开启用户空间文件缓冲区?

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 大家都知道写文件时数据流转的顺序是 用户空间文件缓冲区 -> 内核空间文件缓冲区 -> 内核空间IO队列 默认的ANSI C库,对用户空间文件缓冲区有三种方式 全缓冲 行缓冲 无缓冲 难道Golang没有?笔者不敢断定,但有以下的对比试验或许能说明些问题 write.go package main import ( "os" "log" "time" ) func main() { file, err := os.OpenFile("test.txt", os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666) if err != nil { log.Fatal(err) } for i := 0; i < 1000; i++ { time.Sleep(2 * time.Second) file.Write([]byte("xxxx")) } file.Close() } write2.go write2.go 使用了标准库提供的bufio, 给写文件增加了4096byte的缓冲区 package main import ( "os" "log" "time" "bufio" ) func main(){ file, err := os.OpenFile("test.txt", os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666) writer := bufio.NewWriterSize(file, 4096) if err!= nil{ log.Fatal(err) } for i:=0;i<1000;i++{ time.Sleep(2 * time.Second) writer.Write([]byte("xxxx")) } writer.Flush() file.Close() } 使用strace跟踪系统调用, 在没有使用bufio的情况如下 ...

November 2, 2018 · 2 min

聊聊几种传文件的方式

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 聊聊几种服务器之间传文件的方式 1. 使用scp命令 scp [-1246BCpqrv] [-c cipher] [-F ssh_config] [-i identity_file] [-l limit] [-o ssh_option] [-P port] [-S program] [[user@]host1:]file1 ... [[user@]host2:]file2 DESCRIPTION :scp copies files between hosts on a network. scp 支持传输单个文件和文件夹 scp /home/test/1.mp3 root@192.168.8.100:/home/root/music 2. 使用python 可以直接使用下面命令,启动简单的文件服务器(单线程) python2 python -m SimpleHTTPServer 8080 python3 python3 -m http.server --cgi 8080 下载 wget http://192.168.10.100:8080/tt.png 注意 请自行替换服务地址和文件名 3. 使用nc命令 接收端 nc -l 8080 > tt.png 发送端 nc 192.168.10.100 8080 < tt.png 注意 请自行替换服务地址和文件名 ...

October 31, 2018 · 1 min

关联规则算法-Eclat

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 最近打算给我的博客做个性化推荐,初步选定使用关联规则算法来实现。 常见关联规则算法又有Apriori算法、FP-树频集算法、Eclat算法 3种。 请务必阅读参考资料1,了解Support(支持度), Confidence(置信度), Lift(提升度) Association rules are usually required to satisfy a user-specified minimum support and a user-specified minimum confidence at the same time. Association rule generation is usually split up into two separate steps: A minimum support threshold is applied to find all frequent itemsets in a database. A minimum confidence constraint is applied to these frequent itemsets in order to form rules. While the second step is straightforward, the first step needs more attention. ...

October 23, 2018 · 1 min

gin 统计请求状态信息

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 引言 gin-gonic/gin 是Golang API 开发中最常用到的web框架。我们可以轻松的写一个gin的中间件获取HTTP的状态码, 然后暴露给phrometheus。但是如果我想获取的body体中的错误码呢? 2. 官方的例子 package main import ( "time" "log" "github.com/gin-gonic/gin" ) func Logger() gin.HandlerFunc { return func(c *gin.Context) { t := time.Now() // Set example variable c.Set("example", "12345") // before request c.Next() // after request latency := time.Since(t) log.Println("latency", latency) // access the status we are sending status := c.Writer.Status() log.Println("status_code", status) } } func main() { r := gin.New() r.Use(Logger()) r.GET("/test", func(c *gin.Context) { example := c.MustGet("example").(string) // it would print: "12345" log.Println(example) }) // Listen and serve on 0.0.0.0:8080 r.Run(":8080") } 3. 读取body中的错误码 假定我们的服务在请求处理失败的情况下,返回如下结构体 ...

October 16, 2018 · 2 min

玩转高性能日志库zap(4)--自定义日志格式

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 前言 uber开源的高性能日志库zap, 除了性能远超logrus之外,还有很多诱人的功能,比如支持数据采样、支持通过HTTP服务动态调整日志级别。 前些天有同事问我,zap默认打印的日志格式,我不喜欢,可否自定义一下。 这个是日志库应该有的功能,zap怎么可能不支持。 下面我们来谈谈如何自定义日志格式,在zap中,它被称为Encoder, 在日志库中logrus,被做Formatter 2. 默认格式 目前 zap只支持2种日志格式 2.1 console 1.5392310605133479e+09 info golab/test_zap.go:34 Info log {"name": "buick2008", "age": 15} 2.2 json {"level":"info","ts":1539234945.009923,"caller":"golab/test_zap.go:34","msg":"Info log","name":"buick2008","age":15} 3. 实现 为了实现自定义的Encoder 需要实现 zapcore.Encoder zapcore.PrimitiveArrayEncoder 所定义的接口 大部分接口实现可以照抄zapcore.jsonEncoder的代码实现 真正需要改动的是 EncodeEntry(ent zapcore.Entry, fields [] zapcore.Field) (*buffer.Buffer, error) 注册Encoder // 注册Encoder zap.RegisterEncoder("console2", MyEncoding) 用法示例 package main import ( "fmt" "go.uber.org/zap" "go.uber.org/zap/zapcore" "time" "github.com/vearne/golab/myencoder" ) func MyEncoding(config zapcore.EncoderConfig) (zapcore.Encoder, error) { return myencoder.NewConsole2Encoder(config, true), nil } func main() { // 注册一个Encoder zap.RegisterEncoder("console2", MyEncoding) // 默认是Info级别 logcfg := zap.NewProductionConfig() // 启用自定义的Encoding logcfg.Encoding = "console2" logger, err := logcfg.Build() if err != nil { fmt.Println("err", err) } defer logger.Sync() for i := 0; i < 3; i++ { time.Sleep(1 * time.Second) logger.Info("some message", zap.String("name", "buick2008"), zap.Int("age", 15)) } } 打印出的日志效果 ...

October 11, 2018 · 1 min