版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | http://vearne.cc

1. 前言

笔者连续2篇文章,探讨如何开发一个gin的timeout middleware,但是”百密一疏”啊。仍然考虑的不够周全。
笔者的文章
gin的timeout middleware实现(续)
中实现的程序有2个问题

func Timeout(t time.Duration) gin.HandlerFunc {
    return func(c *gin.Context) {
        // sync.Pool
        buffer := buffpool.GetBuff()

        blw := &SimplebodyWriter{body: buffer, ResponseWriter: c.Writer}
        c.Writer = blw

        // wrap the request context with a timeout
        ctx, cancel := context.WithTimeout(c.Request.Context(), t)
        c.Request = c.Request.WithContext(ctx)

        finish := make(chan struct{})
        // 子协程
        // ****************注意***************
        // 创建的子协程没有recover,存在程序崩溃的风险
        go func() { 
            c.Next()
            finish <- struct{}{}
        }()
        // ****************注意***************
        select {
        case <-ctx.Done():
            // ****************注意***************
            // 子协程和父协程存在同时修改Header的风险
            // 由于Header是个map,可能诱发 
            // fatal error: concurrent map read and map write
            c.Writer.WriteHeader(http.StatusGatewayTimeout)
            // ****************注意***************
            c.Abort()
            // 超时发生, 通知子协程退出
            cancel()
            // 如果超时的话,buffer无法主动清除,只能等待GC回收
        case <-finish:
            // 结果只会在主协程中被写入
            blw.ResponseWriter.Write(buffer.Bytes())
            buffpool.PutBuff(buffer)
        }
    }
}

2. 解决

main.go

package main

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    // **注意** 这里对 gin-gonic/gin 库做了修改
    "github.com/vearne/gin"
    "github.com/vearne/golib/buffpool"
    "github.com/vearne/golib/utils"
    "google.golang.org/grpc"
    pb "google.golang.org/grpc/examples/helloworld/helloworld"
    "log"
    "net/http"
    "sync"
    "time"
)

const (
    address     = "localhost:50051"
    defaultName = "world"
    HandlerFuncTimeout = "E501"

)

var greeter pb.GreeterClient

func init(){
    conn, err := grpc.Dial(address, grpc.WithInsecure())
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    //defer conn.Close()
    greeter = pb.NewGreeterClient(conn)
}

type errResponse struct {
    Code string `json:"code"`
    Msg  string `json:"msg"`
}


type TimeoutWriter struct {
    gin.ResponseWriter
    // body
    body *bytes.Buffer
    // header
    // 变动点2:  让子协程和父协程分别写不同的header
    h http.Header

    mu sync.Mutex
    timedOut    bool
    wroteHeader bool
    // 变动点3:  让子协程和父协程分别写不同的code
    code int
}

func (tw *TimeoutWriter) Write(b []byte) (int, error) {
    tw.mu.Lock()
    defer tw.mu.Unlock()
    if tw.timedOut {
        //return 0, http.ErrHandlerTimeout
        // 已经超时了,就不再写数据
        return 0, nil
    }

    return tw.body.Write(b)
}

func (tw *TimeoutWriter) WriteHeader(code int){
    fmt.Println("----xxx---", "TimeoutWriter-WriteHeader")
    checkWriteHeaderCode(code)
    tw.mu.Lock()
    defer tw.mu.Unlock()
    if tw.timedOut || tw.wroteHeader {
        return
    }
    tw.writeHeader(code)
}

func (tw *TimeoutWriter) writeHeader(code int) {
    tw.wroteHeader = true
    tw.code = code
}

func (tw *TimeoutWriter) WriteHeaderNow(){
    fmt.Println("----xxx---", "TimeoutWriter-WriteHeaderNow")
}

func (tw *TimeoutWriter) Header() http.Header {
    return tw.h
}

func checkWriteHeaderCode(code int) {
    if code < 100 || code > 999 {
        panic(fmt.Sprintf("invalid WriteHeader code %v", code))
    }
}


func Timeout(t time.Duration) gin.HandlerFunc {
    return func(c *gin.Context) {
        // wrap the request context with a timeout
        // sync.Pool
        buffer := buffpool.GetBuff()

        tw := &TimeoutWriter{body: buffer, ResponseWriter: c.Writer, h: make(http.Header)}
        c.Writer = tw


        ctx, cancel := context.WithTimeout(c.Request.Context(), t)
        c.Request = c.Request.WithContext(ctx)

        // channel 容量必须大于0
        // 否则母协程因超时退出,子协程可能永远无法退出
        finish := make(chan struct{}, 1) 
        panicChan := make(chan interface{}, 1)
        go func() {
            // 变动点1: 增加子协程的recover
            defer func() {
                if p := recover(); p != nil {
                    fmt.Println("handler error", p, string(utils.Stack()))
                    panicChan <- p
                }
            }()

            c.Next()
            finish <- struct{}{}
        }()

        select {
        case p := <-panicChan:
            panic(p)
        case <-ctx.Done():
            tw.mu.Lock()
            defer tw.mu.Unlock()

            tw.ResponseWriter.WriteHeader(http.StatusServiceUnavailable)
            bt, _ := json.Marshal(errResponse{Code: HandlerFuncTimeout,
                Msg: http.ErrHandlerTimeout.Error()})
            tw.ResponseWriter.Write(bt)
            c.Abort()
            cancel()
            tw.timedOut = true
            // 如果超时的话,buffer无法主动清除,只能等待GC回收
        case <-finish:
            tw.mu.Lock()
            defer tw.mu.Unlock()
            dst := tw.ResponseWriter.Header()
            for k, vv := range tw.Header() {
                dst[k] = vv
            }
            fmt.Println("tw.code", tw.code)
            tw.ResponseWriter.WriteHeader(tw.code)
            tw.ResponseWriter.Write(buffer.Bytes())
            buffpool.PutBuff(buffer)
        }
    }
}


func short(c *gin.Context) {
    time.Sleep(1 * time.Second)
    // 子协程操作的header,其实是TimeoutWriter中的Header
    c.JSON(http.StatusOK, gin.H{"hello": "world"})
}

func nocontent(c *gin.Context) {
    //c.Status(204)
    time.Sleep(1 * time.Second)
    c.Data(http.StatusNoContent, "", []byte{})
}

func long(c *gin.Context) {
    name := defaultName
    ctx := c.Request.Context()
    r, err := greeter.SayHello(ctx, &pb.HelloRequest{Name: name})
    if err != nil {
        log.Printf("could not greet: %v\n", err)
        return
    }
    log.Printf("Greeting: %s", r.Message)
    c.JSON(http.StatusOK, gin.H{"hello": "world"})
}

func main() {
    // create new gin without any middleware
    engine := gin.Default()

    // add timeout middleware with 2 second duration
    engine.Use(Timeout(time.Second * 1))

    // create a handler that will last 1 seconds
    engine.GET("/short", short)

    // create a route that will last 5 seconds
    engine.GET("/long", long)

    engine.GET("/nocontent", nocontent)

    // run the server
    log.Fatal(engine.Run(":8080"))
}

3. 后记

3.1 建议大家务必阅读下参考资料1

其中有golang标准库中给出的TimeoutHandler实现

3.2 笔者修改了 gin-gonic/gin 库

context.go

// Status sets the HTTP response code.
func (c *Context) Status(code int) {
    // c.writermem.WriteHeader(code)
    c.Writer.WriteHeader(code) 
}

3.3 子协程和父协程的并发问题

子协程和父协程操作(写操作)的是不同的
1. Http code
2. header
3. body
因此没有并发问题
也不会再产生

[GIN-debug] [WARNING] Headers were already written

4. 特别提示

*gin.Context 中

    // Keys is a key/value pair exclusively for the context of each request.
    Keys map[string]interface{}

对Keys的操作,仍有并发风险存在

func (c *Context) Set(key string, value interface{})

func (c *Context) Get(key string) (value interface{}, exists bool)

如果想要在上下文传递变量,可以使用

4.1 写入变量

c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), "key1", value1))

4.2 读取变量

value1 := ctx.Request.Context().Value("key1")

5. 参考资料

  1. golang标准库中给出的TimeoutHandler实现

后记

  • 2019年6月6日 gin-gonic/gin v1.5的开发计划中,已经包含了对 Status()函数的修改, 预计完成时间是2019年8月15日, 让我们耐心的等一下吧.

  • 2020年2月8日 v1.5.0已经于2019年11月28日发布,使用此版本即可

  • 2020年6月11日 笔者封装了一个库,可以直接使用了 vearne/gin-timeout


如果我的文章对你有帮助,你可以给我打赏以促使我拿出更多的时间和精力来分享我的经验和思考总结。

微信支付码

发表评论

电子邮件地址不会被公开。

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据