gin的timeout middleware实现(续2)
版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc
1. 前言
笔者连续2篇文章,探讨如何开发一个gin的timeout middleware,但是"百密一疏"啊。仍然考虑的不够周全。
笔者的文章
gin的timeout middleware实现(续)
中实现的程序有2个问题
func Timeout(t time.Duration) gin.HandlerFunc {
return func(c *gin.Context) {
// sync.Pool
buffer := buffpool.GetBuff()
blw := &SimplebodyWriter{body: buffer, ResponseWriter: c.Writer}
c.Writer = blw
// wrap the request context with a timeout
ctx, cancel := context.WithTimeout(c.Request.Context(), t)
c.Request = c.Request.WithContext(ctx)
finish := make(chan struct{})
// 子协程
// ****************注意***************
// 创建的子协程没有recover,存在程序崩溃的风险
go func() {
c.Next()
finish <- struct{}{}
}()
// ****************注意***************
select {
case <-ctx.Done():
// ****************注意***************
// 子协程和父协程存在同时修改Header的风险
// 由于Header是个map,可能诱发
// fatal error: concurrent map read and map write
c.Writer.WriteHeader(http.StatusGatewayTimeout)
// ****************注意***************
c.Abort()
// 超时发生, 通知子协程退出
cancel()
// 如果超时的话,buffer无法主动清除,只能等待GC回收
case <-finish:
// 结果只会在主协程中被写入
blw.ResponseWriter.Write(buffer.Bytes())
buffpool.PutBuff(buffer)
}
}
}
2. 解决
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
// **注意** 这里对 gin-gonic/gin 库做了修改
"github.com/vearne/gin"
"github.com/vearne/golib/buffpool"
"github.com/vearne/golib/utils"
"google.golang.org/grpc"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
"log"
"net/http"
"sync"
"time"
)
const (
address = "localhost:50051"
defaultName = "world"
HandlerFuncTimeout = "E501"
)
var greeter pb.GreeterClient
func init(){
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
//defer conn.Close()
greeter = pb.NewGreeterClient(conn)
}
type errResponse struct {
Code string `json:"code"`
Msg string `json:"msg"`
}
type TimeoutWriter struct {
gin.ResponseWriter
// body
body *bytes.Buffer
// header
// 变动点2: 让子协程和父协程分别写不同的header
h http.Header
mu sync.Mutex
timedOut bool
wroteHeader bool
// 变动点3: 让子协程和父协程分别写不同的code
code int
}
func (tw *TimeoutWriter) Write(b []byte) (int, error) {
tw.mu.Lock()
defer tw.mu.Unlock()
if tw.timedOut {
//return 0, http.ErrHandlerTimeout
// 已经超时了,就不再写数据
return 0, nil
}
return tw.body.Write(b)
}
func (tw *TimeoutWriter) WriteHeader(code int){
fmt.Println("----xxx---", "TimeoutWriter-WriteHeader")
checkWriteHeaderCode(code)
tw.mu.Lock()
defer tw.mu.Unlock()
if tw.timedOut {
return
}
tw.writeHeader(code)
}
func (tw *TimeoutWriter) writeHeader(code int) {
tw.wroteHeader = true
tw.code = code
}
func (tw *TimeoutWriter) WriteHeaderNow(){
fmt.Println("----xxx---", "TimeoutWriter-WriteHeaderNow")
}
func (tw *TimeoutWriter) Header() http.Header {
return tw.h
}
func checkWriteHeaderCode(code int) {
if code < 100 || code > 999 {
panic(fmt.Sprintf("invalid WriteHeader code %v", code))
}
}
func Timeout(t time.Duration) gin.HandlerFunc {
return func(c *gin.Context) {
// wrap the request context with a timeout
// sync.Pool
buffer := buffpool.GetBuff()
tw := &TimeoutWriter{body: buffer, ResponseWriter: c.Writer, h: make(http.Header)}
c.Writer = tw
ctx, cancel := context.WithTimeout(c.Request.Context(), t)
c.Request = c.Request.WithContext(ctx)
// channel 容量必须大于0
// 否则母协程因超时退出,子协程可能永远无法退出
finish := make(chan struct{}, 1)
panicChan := make(chan interface{}, 1)
go func() {
// 变动点1: 增加子协程的recover
defer func() {
if p := recover(); p != nil {
fmt.Println("handler error", p, string(utils.Stack()))
panicChan <- p
}
}()
c.Next()
finish <- struct{}{}
}()
select {
case p := <-panicChan:
panic(p)
case <-ctx.Done():
tw.mu.Lock()
defer tw.mu.Unlock()
tw.ResponseWriter.WriteHeader(http.StatusServiceUnavailable)
bt, _ := json.Marshal(errResponse{Code: HandlerFuncTimeout,
Msg: http.ErrHandlerTimeout.Error()})
tw.ResponseWriter.Write(bt)
c.Abort()
cancel()
tw.timedOut = true
// 如果超时的话,buffer无法主动清除,只能等待GC回收
case <-finish:
tw.mu.Lock()
defer tw.mu.Unlock()
dst := tw.ResponseWriter.Header()
for k, vv := range tw.Header() {
dst[k] = vv
}
fmt.Println("tw.code", tw.code)
tw.ResponseWriter.WriteHeader(tw.code)
tw.ResponseWriter.Write(buffer.Bytes())
buffpool.PutBuff(buffer)
}
}
}
func short(c *gin.Context) {
time.Sleep(1 * time.Second)
// 子协程操作的header,其实是TimeoutWriter中的Header
c.JSON(http.StatusOK, gin.H{"hello": "world"})
}
func nocontent(c *gin.Context) {
//c.Status(204)
time.Sleep(1 * time.Second)
c.Data(http.StatusNoContent, "", []byte{})
}
func long(c *gin.Context) {
name := defaultName
ctx := c.Request.Context()
r, err := greeter.SayHello(ctx, &pb.HelloRequest{Name: name})
if err != nil {
log.Printf("could not greet: %v\n", err)
return
}
log.Printf("Greeting: %s", r.Message)
c.JSON(http.StatusOK, gin.H{"hello": "world"})
}
func main() {
// create new gin without any middleware
engine := gin.Default()
// add timeout middleware with 2 second duration
engine.Use(Timeout(time.Second * 1))
// create a handler that will last 1 seconds
engine.GET("/short", short)
// create a route that will last 5 seconds
engine.GET("/long", long)
engine.GET("/nocontent", nocontent)
// run the server
log.Fatal(engine.Run(":8080"))
}
3. 后记
3.1 建议大家务必阅读下参考资料1
其中有golang标准库中给出的TimeoutHandler实现
3.2 笔者修改了 gin-gonic/gin 库
context.go
// Status sets the HTTP response code.
func (c *Context) Status(code int) {
// c.writermem.WriteHeader(code)
c.Writer.WriteHeader(code)
}
3.3 子协程和父协程的并发问题
子协程和父协程操作(写操作)的是不同的
1. Http code
2. header
3. body
因此没有并发问题
也不会再产生
[GIN-debug] [WARNING] Headers were already written
4. 特别提示
*gin.Context 中
// Keys is a key/value pair exclusively for the context of each request.
Keys map[string]interface{}
对Keys的操作,仍有并发风险存在
func (c *Context) Set(key string, value interface{})
func (c *Context) Get(key string) (value interface{}, exists bool)
如果想要在上下文传递变量,可以使用
4.1 写入变量
c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), "key1", value1))
4.2 读取变量
value1 := ctx.Request.Context().Value("key1")
5. 参考资料
后记
- 2019年6月6日
gin-gonic/gin
v1.5的开发计划中,已经包含了对 Status()函数的修改, 预计完成时间是2019年8月15日, 让我们耐心的等一下吧. -
2020年2月8日
v1.5.0
已经于2019年11月28日发布,使用此版本即可 -
2020年6月11日 笔者封装了一个库,可以直接使用了 vearne/gin-timeout
- 2021年5月13日 萌叔不保证文章中代码的准确性,请以 vearne/gin-timeout 为准
recover一般会用单独的中间件实现的,只是搞不太懂为什么没有超时也要处理response,一般来说如果超时直接写入StatusGatewayTimeout到响应内容然后Abort(),如果没有超时,这个中间件不需要管任何事情
错误recover交给专门的中间件来做。https://github.com/vearne/gin-timeout/blob/master/example/panic.go
gin-contrib/timeout 这个库和你写的好像….