Fork me on GitHub

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc

1. 前言

笔者连续2篇文章,探讨如何开发一个gin的timeout middleware,但是"百密一疏"啊。仍然考虑的不够周全。
笔者的文章
gin的timeout middleware实现(续)
中实现的程序有2个问题

func Timeout(t time.Duration) gin.HandlerFunc {
    return func(c *gin.Context) {
        // sync.Pool
        buffer := buffpool.GetBuff()

        blw := &SimplebodyWriter{body: buffer, ResponseWriter: c.Writer}
        c.Writer = blw

        // wrap the request context with a timeout
        ctx, cancel := context.WithTimeout(c.Request.Context(), t)
        c.Request = c.Request.WithContext(ctx)

        finish := make(chan struct{})
        // 子协程
        // ****************注意***************
        // 创建的子协程没有recover,存在程序崩溃的风险
        go func() { 
            c.Next()
            finish <- struct{}{}
        }()
        // ****************注意***************
        select {
        case <-ctx.Done():
            // ****************注意***************
            // 子协程和父协程存在同时修改Header的风险
            // 由于Header是个map,可能诱发 
            // fatal error: concurrent map read and map write
            c.Writer.WriteHeader(http.StatusGatewayTimeout)
            // ****************注意***************
            c.Abort()
            // 超时发生, 通知子协程退出
            cancel()
            // 如果超时的话,buffer无法主动清除,只能等待GC回收
        case <-finish:
            // 结果只会在主协程中被写入
            blw.ResponseWriter.Write(buffer.Bytes())
            buffpool.PutBuff(buffer)
        }
    }
}

2. 解决

main.go

package main

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    // **注意** 这里对 gin-gonic/gin 库做了修改
    "github.com/vearne/gin"
    "github.com/vearne/golib/buffpool"
    "github.com/vearne/golib/utils"
    "google.golang.org/grpc"
    pb "google.golang.org/grpc/examples/helloworld/helloworld"
    "log"
    "net/http"
    "sync"
    "time"
)

const (
    address     = "localhost:50051"
    defaultName = "world"
    HandlerFuncTimeout = "E501"

)

var greeter pb.GreeterClient

func init(){
    conn, err := grpc.Dial(address, grpc.WithInsecure())
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    //defer conn.Close()
    greeter = pb.NewGreeterClient(conn)
}

type errResponse struct {
    Code string `json:"code"`
    Msg  string `json:"msg"`
}


type TimeoutWriter struct {
    gin.ResponseWriter
    // body
    body *bytes.Buffer
    // header
    // 变动点2:  让子协程和父协程分别写不同的header
    h http.Header

    mu sync.Mutex
    timedOut    bool
    wroteHeader bool
    // 变动点3:  让子协程和父协程分别写不同的code
    code int
}

func (tw *TimeoutWriter) Write(b []byte) (int, error) {
    tw.mu.Lock()
    defer tw.mu.Unlock()
    if tw.timedOut {
        //return 0, http.ErrHandlerTimeout
        // 已经超时了,就不再写数据
        return 0, nil
    }

    return tw.body.Write(b)
}

func (tw *TimeoutWriter) WriteHeader(code int){
    fmt.Println("----xxx---", "TimeoutWriter-WriteHeader")
    checkWriteHeaderCode(code)
    tw.mu.Lock()
    defer tw.mu.Unlock()
    if tw.timedOut {
        return
    }
    tw.writeHeader(code)
}

func (tw *TimeoutWriter) writeHeader(code int) {
    tw.wroteHeader = true
    tw.code = code
}

func (tw *TimeoutWriter) WriteHeaderNow(){
    fmt.Println("----xxx---", "TimeoutWriter-WriteHeaderNow")
}

func (tw *TimeoutWriter) Header() http.Header {
    return tw.h
}

func checkWriteHeaderCode(code int) {
    if code < 100 || code > 999 {
        panic(fmt.Sprintf("invalid WriteHeader code %v", code))
    }
}


func Timeout(t time.Duration) gin.HandlerFunc {
    return func(c *gin.Context) {
        // wrap the request context with a timeout
        // sync.Pool
        buffer := buffpool.GetBuff()

        tw := &TimeoutWriter{body: buffer, ResponseWriter: c.Writer, h: make(http.Header)}
        c.Writer = tw


        ctx, cancel := context.WithTimeout(c.Request.Context(), t)
        c.Request = c.Request.WithContext(ctx)

        // channel 容量必须大于0
        // 否则母协程因超时退出,子协程可能永远无法退出
        finish := make(chan struct{}, 1) 
        panicChan := make(chan interface{}, 1)
        go func() {
            // 变动点1: 增加子协程的recover
            defer func() {
                if p := recover(); p != nil {
                    fmt.Println("handler error", p, string(utils.Stack()))
                    panicChan <- p
                }
            }()

            c.Next()
            finish <- struct{}{}
        }()

        select {
        case p := <-panicChan:
            panic(p)
        case <-ctx.Done():
            tw.mu.Lock()
            defer tw.mu.Unlock()

            tw.ResponseWriter.WriteHeader(http.StatusServiceUnavailable)
            bt, _ := json.Marshal(errResponse{Code: HandlerFuncTimeout,
                Msg: http.ErrHandlerTimeout.Error()})
            tw.ResponseWriter.Write(bt)
            c.Abort()
            cancel()
            tw.timedOut = true
            // 如果超时的话,buffer无法主动清除,只能等待GC回收
        case <-finish:
            tw.mu.Lock()
            defer tw.mu.Unlock()
            dst := tw.ResponseWriter.Header()
            for k, vv := range tw.Header() {
                dst[k] = vv
            }
            fmt.Println("tw.code", tw.code)
            tw.ResponseWriter.WriteHeader(tw.code)
            tw.ResponseWriter.Write(buffer.Bytes())
            buffpool.PutBuff(buffer)
        }
    }
}


func short(c *gin.Context) {
    time.Sleep(1 * time.Second)
    // 子协程操作的header,其实是TimeoutWriter中的Header
    c.JSON(http.StatusOK, gin.H{"hello": "world"})
}

func nocontent(c *gin.Context) {
    //c.Status(204)
    time.Sleep(1 * time.Second)
    c.Data(http.StatusNoContent, "", []byte{})
}

func long(c *gin.Context) {
    name := defaultName
    ctx := c.Request.Context()
    r, err := greeter.SayHello(ctx, &pb.HelloRequest{Name: name})
    if err != nil {
        log.Printf("could not greet: %v\n", err)
        return
    }
    log.Printf("Greeting: %s", r.Message)
    c.JSON(http.StatusOK, gin.H{"hello": "world"})
}

func main() {
    // create new gin without any middleware
    engine := gin.Default()

    // add timeout middleware with 2 second duration
    engine.Use(Timeout(time.Second * 1))

    // create a handler that will last 1 seconds
    engine.GET("/short", short)

    // create a route that will last 5 seconds
    engine.GET("/long", long)

    engine.GET("/nocontent", nocontent)

    // run the server
    log.Fatal(engine.Run(":8080"))
}

3. 后记

3.1 建议大家务必阅读下参考资料1

其中有golang标准库中给出的TimeoutHandler实现

3.2 笔者修改了 gin-gonic/gin 库

context.go

// Status sets the HTTP response code.
func (c *Context) Status(code int) {
    // c.writermem.WriteHeader(code)
    c.Writer.WriteHeader(code) 
}

3.3 子协程和父协程的并发问题

子协程和父协程操作(写操作)的是不同的
1. Http code
2. header
3. body
因此没有并发问题
也不会再产生

[GIN-debug] [WARNING] Headers were already written

4. 特别提示

*gin.Context 中

    // Keys is a key/value pair exclusively for the context of each request.
    Keys map[string]interface{}

对Keys的操作,仍有并发风险存在

func (c *Context) Set(key string, value interface{})

func (c *Context) Get(key string) (value interface{}, exists bool)

如果想要在上下文传递变量,可以使用

4.1 写入变量

c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), "key1", value1))

4.2 读取变量

value1 := ctx.Request.Context().Value("key1")

5. 参考资料

  1. golang标准库中给出的TimeoutHandler实现

后记

  • 2019年6月6日 gin-gonic/gin v1.5的开发计划中,已经包含了对 Status()函数的修改, 预计完成时间是2019年8月15日, 让我们耐心的等一下吧.

  • 2020年2月8日 v1.5.0已经于2019年11月28日发布,使用此版本即可

  • 2020年6月11日 笔者封装了一个库,可以直接使用了 vearne/gin-timeout

  • 2021年5月13日 萌叔不保证文章中代码的准确性,请以 vearne/gin-timeout 为准

请我喝瓶饮料

微信支付码

3 对 “gin的timeout middleware实现(续2)”的想法;

  1. recover一般会用单独的中间件实现的,只是搞不太懂为什么没有超时也要处理response,一般来说如果超时直接写入StatusGatewayTimeout到响应内容然后Abort(),如果没有超时,这个中间件不需要管任何事情

    1. 错误recover交给专门的中间件来做。https://github.com/vearne/gin-timeout/blob/master/example/panic.go

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注