grpc的反射机制

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 前言 读者们可能使用过 fullstorydev/grpcurl 来对grpc服务进行调用调试。 假定有一个简单的grpc的服务 main.go package main import ( "context" "log" "net" "google.golang.org/grpc" pb "google.golang.org/grpc/examples/helloworld/helloworld" "google.golang.org/grpc/reflection" ) const ( port = ":50051" ) // server is used to implement helloworld.GreeterServer. type server struct { pb.UnimplementedGreeterServer } // SayHello implements helloworld.GreeterServer func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { log.Printf("Received: %v", in.GetName()) return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil } func main() { lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() helloService := &server{} pb.RegisterGreeterServer(s, helloService) // 注册反射服务 // Register reflection service on gRPC server. reflection.Register(s) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } 使用grpcurl来请求grpc服务 ...

May 11, 2021 · 2 min

一个关于go module的有趣话题

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 引言 假如你正在使用go mod管理某个项目的代码库依赖 case1: 某个项目的tag是超过v1版本的,你该怎么定义module case2: 某个项目要同时进行2个版本的开发,v2和v3 你该怎么定义module 2. 问题 为了验证效果,萌叔创建了项目 vearne/mod-multi-version 这个代码库只有一个文件 package mmv import "fmt" const Versoin = "v1.0.0" func PrintVersion(){ fmt.Println("version:", Versoin) } 其它项目使用vearne/mod-multi-version package main import ( "github.com/vearne/mod-multi-version" ) func main() { mmv.PrintVersion() } 代码库的tag列表中包含以下tag v1.0.0 v1.0.2 v2.0.1-Alpha v2.0.5 v2.0.6 v3.0.0 v3.0.1 但如果你试图拉取v2.0.1-Alpha ╰─$ go get github.com/vearne/mod-multi-version@v2.0.1-Alpha go get: github.com/vearne/mod-multi-version@v2.0.1-Alpha: invalid version: module contains a go.mod file, so major version must be compatible: should be v0 or v1, not v2 错误提示主版本号只能是v0或者v1 ...

February 25, 2021 · 1 min

聊聊k8s调试工具kt-connect的实现

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 引言 kt-connect是阿里开源的k8s的调试工具,它的作用类似于VPN,能够打通k8s集群和本地的网络。 传送门: alibaba/kt-connect 它有3种模式 Connect 本地网络直接访问k8s集群网络 Exchange 转发集群流量到本地 Service Mesh 支持 另外它提供了一个Dashboard可以查看k8s集群内的所有可访问的service资源以及正在进行调试的Connect和Exchange数量, 用处不大。 2. 使用介绍 这里萌叔只简单介绍Connect和Exchange2种模式,更详细的使用说明见参考资料1 2.1 Connect模式 sudo ktctl -i ik8share/kt-connect-shadow:stable connect 注意: kt-connect 依赖sshuttle, 且运行时必须拥有root权限。另外sshuttle 又依赖了iptables(linux操作系统), ptctl(macOS) -i 参数指定镜像的地址 这里ik8share/kt-connect-shadow:stable是镜像的名字,阿里默认提供的镜像地址rdc-incubator/kt-connect-shadow在萌叔的测试k8s集群中无法正常拉取。这里提供了一个docker hub的镜像地址。 2.2 Exchange模式 sudo ktctl -n test -i ik8share/kt-connect-shadow:stable exchange dm-backend-v0-0-1 --expose 3000 注意: 这里的dm-backend-v0-0-1 是k8s集群中Deployment资源的名称。 该命令会将所有发往dm-backend-v0-0-1所属Pod的3000端口的请求都转发到本地的3000端口上。 总结:Connect和Exchange 模式都是单向的,一个是从集群外部到集群内部,一个是从集群内部到集群外部。 3. 原理和实现 kt-connect设计巧妙,且最大限度的避免了重复发明轮子,值得称赞。 3.1 Connect模式 我们先来看看Connect模式要达到的目标 假定 Namespace: test Service: sv-backend-v0-0-1 Pod的IP: 172.20.1.29 ...

December 1, 2020 · 1 min

玩转KCP(3)-流量控制

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 前言 KCP协议的很多东西都是脱胎于TCP协议,所以他们在思想和实现上是完全相通的。xtaci/kcp-go 包含FEC也不过4000多行代码,skywind3000/kcp 主要是C/C++的代码,也就2000多行,萌叔建议大家都去阅读下源码。 慢启动、拥塞避免、拥塞发生、快速重传,这些概念都非常唬人,但看完代码你会发现不过尔尔。 在开始正式的文章之前,萌叔打算问几个问题? 流控是为了保护谁? 在实现中如何体现? 2. 流控是为了保护谁? TCP是全双工,这里简化一下,我们只看半双工的情况 Sender发送数据给Receiver 1) Sender中的应用程序把数据写入到本机的发送缓冲区 2) 数据从发送缓冲区写入到链路中,链路可能是由实际的光缆、电缆、多个路由器节点组成。 3)数据从链路转交到Receiver的接收缓冲区 4)数据从接收缓冲区交给Receiver的应用程序 发送缓冲区大小是有限的,它必须被保护起来 Sender和Receiver之间的链路的收发能力也是有限的,且是与网络中的其它节点共享的,因此Link也必须受到保护 接收缓冲区大小也是受限的,它也应该受到保护 3. 在实现中如何体现? 在实际实现中每一个需要保护的点,都有与之对应的参数,先上结论 3.1 使用发送端的发送窗口(snd_wnd)保护本机的发送缓冲区 3.2 使用拥塞窗口(cwnd)来保护发送端与接收端之间的链路 cwnd是动态变化的值, 算法与TCP协议基本相同 3.3 使用接收端的接收窗口(rmt_wnd, 表示接收窗口的空闲大小)保护接收端的接收缓冲区 rmt_wnd对应KCP协议的wnd, 由接收端汇报 回顾一下KCP协议 0 4 5 6 8 (BYTE) +---------------+---+---+-------+ | conv |cmd|frg| wnd | +---------------+---+---+-------+ 8 | ts | sn | +---------------+---------------+ 16 | una | len | +---------------+---------------+ 24 | | | DATA (optional) | | | +-------------------------------+ 4. 分析一次完整的写入动作 4.1 发送窗口和接收窗口对写入的影响 在KCP中,数据被拆分成Segment后 ...

May 10, 2020 · 3 min

玩转NSQ(3)-漂亮的代码实现

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 创建协程 创建协程示例 n.waitGroup.Wrap(func() { // do something }) util.WaitGroupWrapper package util import ( "sync" ) type WaitGroupWrapper struct { sync.WaitGroup } func (w *WaitGroupWrapper) Wrap(cb func()) { w.Add(1) go func() { cb() w.Done() }() } 2. 协程池 用于从Channel扫描数据到Client func (n *NSQD) queueScanLoop() { ... for { select { case <-workTicker.C: if len(channels) == 0 { continue } case <-refreshTicker.C: channels = n.channels() // 协程池中的协程的数量是动态变化的 // 理想数量是与channel的数量保持一致 n.resizePool(len(channels), workCh, responseCh, closeCh) continue case <-n.exitChan: goto exit } ... } // resizePool adjusts the size of the pool of queueScanWorker goroutines // // 1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax) // func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) { idealPoolSize := int(float64(num) * 0.25) if idealPoolSize < 1 { idealPoolSize = 1 } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax { idealPoolSize = n.getOpts().QueueScanWorkerPoolMax } for { if idealPoolSize == n.poolSize { break } else if idealPoolSize < n.poolSize { // contract closeCh <- 1 n.poolSize-- } else { // expand n.waitGroup.Wrap(func() { n.queueScanWorker(workCh, responseCh, closeCh) }) n.poolSize++ } } } 3. 通讯协议 nsq中数据被分为2类指令型数据,消息型数据, 2种数据类型,格式不相同 ...

October 9, 2019 · 2 min

玩转GRPC(1)-整合JSON编码方式

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 引言 提到GRPC大家想到的就是 HTTP2 + protobuf。HTTP2能够实现多路复用, protobuf提高数据传输的压缩率。但其实GRPC还可以跟Thrift、JSON等编码方式进行整合。 gRPC lets you use encoders other than Protobuf. It has no dependency on Protobuf and was specially made to work with a wide variety of environments. We can see that with a little extra boilerplate, we can use any encoder we want. While this post only covered JSON, gRPC is compatible with Thrift, Avro, Flatbuffers, Cap’n Proto, and even raw bytes! gRPC lets you be in control of how your data is handled. (We still recommend Protobuf though due to strong backwards compatibility, type checking, and performance it gives you.) ...

September 2, 2019 · 3 min

聊聊关于es打分的有趣现象

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 引子 公司内部有简单的搜索引擎,使用ES搭建。 前两天测试人员问我,为什么同一个查询条件,同一条数据,多次查询。score会发生变化。经过验证,确实存在这种问题,那么这种情况到底怎么产生的呢? 2. 例子 来造个例子 2.1 创建index curl -XPUT -H "Content-Type: application/json" dev1:9200/test -d ' { "settings": { "index.number_of_replicas": "2", "index.number_of_shards": "1" }, "mappings": { "_default_": { "dynamic_templates": [], "properties": { "brand": { "type": "keyword" } } } } } 2.2 写入数据 第1次执行 insert1.py import requests for i in range(500): url = "http://dev1:9200/test/car/%d" % (i) res = requests.put(url, json={"brand":"buick", "age":i}) print(i, res.status_code) 第2次执行 insert2.py ...

June 20, 2019 · 2 min

gin的timeout middleware实现(续2)

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 前言 笔者连续2篇文章,探讨如何开发一个gin的timeout middleware,但是"百密一疏"啊。仍然考虑的不够周全。 笔者的文章 gin的timeout middleware实现(续) 中实现的程序有2个问题 func Timeout(t time.Duration) gin.HandlerFunc { return func(c *gin.Context) { // sync.Pool buffer := buffpool.GetBuff() blw := &SimplebodyWriter{body: buffer, ResponseWriter: c.Writer} c.Writer = blw // wrap the request context with a timeout ctx, cancel := context.WithTimeout(c.Request.Context(), t) c.Request = c.Request.WithContext(ctx) finish := make(chan struct{}) // 子协程 // ****************注意*************** // 创建的子协程没有recover,存在程序崩溃的风险 go func() { c.Next() finish <- struct{}{} }() // ****************注意*************** select { case <-ctx.Done(): // ****************注意*************** // 子协程和父协程存在同时修改Header的风险 // 由于Header是个map,可能诱发 // fatal error: concurrent map read and map write c.Writer.WriteHeader(http.StatusGatewayTimeout) // ****************注意*************** c.Abort() // 超时发生, 通知子协程退出 cancel() // 如果超时的话,buffer无法主动清除,只能等待GC回收 case <-finish: // 结果只会在主协程中被写入 blw.ResponseWriter.Write(buffer.Bytes()) buffpool.PutBuff(buffer) } } } 2. 解决 main.go ...

June 6, 2019 · 4 min

聊聊Golang中的锁(1)--可能造成较高的CPU消耗

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 关于Golang中的锁的实现和源码早就文章阐述过了,这里我从实验的角度让大家看看对协程数量对任务执行速度,以及CPU开销的影响 运行环境 CPU: Intel(R) Xeon(R) CPU E5-2650 v4 @ 2.20GHz 8核 内存:16G GO version 1.10 协程数量分别是1, 2, 5, 10, 100, 200下完成1000000000次计数所花 总时间 CPU消耗 CPU峰值 分析 观察图可以发现, 协程数量是1的时候运行总时间反而最短,只耗时2.8秒,协程数量10时,运行总时间最长,耗时21.6秒 协程数量是100时,CPU峰值最大,为530%,协程数量是1是,CPU峰值最小,不到100% 代码 package main import ( "fmt" "sync" "time" ) type LockBox struct{ sync.Mutex Value int } func deal(wpg *sync.WaitGroup, bp *LockBox, count int){ for i:=0;i< count;i++{ bp.Lock() bp.Value++ bp.Unlock() } wpg.Done() } func main() { timeStart := time.Now() //workerCount := 1 //workerCount := 2 workerCount := 200 taskCount := 1000000000 var wg sync.WaitGroup var box LockBox for i:=0;i<workerCount;i++{ wg.Add(1) go deal(&wg, &box, taskCount/workerCount) } wg.Wait() fmt.Println("cost", time.Since(timeStart)) } 请我喝瓶饮料

October 8, 2018 · 1 min

玩转高性能日志库ZAP(3)

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 引言 前几天有朋友问我,zap库是否支持同时打印到多个目标地址,比如1份打印到文件,1份到控制台,1份打印到kafka中。这是所有日志库都会支持的功能,zap当然也不例外。 示例 package main import ( "io/ioutil" "go.uber.org/zap" "go.uber.org/zap/zapcore" "gopkg.in/natefinch/lumberjack" "os" ) func main(){ // First, define our level-handling logic. // 仅打印Error级别以上的日志 highPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool { return lvl >= zapcore.ErrorLevel }) // 打印所有级别的日志 lowPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool { return lvl >= zapcore.DebugLevel }) hook := lumberjack.Logger{ Filename: "/tmp/abc.log", MaxSize: 1024, // megabytes MaxBackups: 3, MaxAge: 7, //days Compress: true, // disabled by default } topicErrors := zapcore.AddSync(ioutil.Discard) fileWriter := zapcore.AddSync(&hook) // High-priority output should also go to standard error, and low-priority // output should also go to standard out. consoleDebugging := zapcore.Lock(os.Stdout) // Optimize the Kafka output for machine consumption and the console output // for human operators. kafkaEncoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()) consoleEncoder := zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()) // Join the outputs, encoders, and level-handling functions into // zapcore.Cores, then tee the four cores together. core := zapcore.NewTee( // 打印在kafka topic中(伪造的case) zapcore.NewCore(kafkaEncoder, topicErrors, highPriority), // 打印在控制台 zapcore.NewCore(consoleEncoder, consoleDebugging, lowPriority), // 打印在文件中 zapcore.NewCore(consoleEncoder, fileWriter, highPriority), ) // From a zapcore.Core, it's easy to construct a Logger. logger := zap.New(core) defer logger.Sync() logger.Info("constructed a info logger", zap.Int("test", 1)) logger.Error("constructed a error logger", zap.Int("test", 2)) } 输出结果 文件中 ...

September 17, 2018 · 2 min