版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | http://vearne.cc

1. 前言

笔者连续2篇文章,探讨如何开发一个gin的timeout middleware,但是"百密一疏"啊。仍然考虑的不够周全。 笔者的文章 gin的timeout middleware实现(续) 中实现的程序有2个问题

func Timeout(t time.Duration) gin.HandlerFunc {
	return func(c *gin.Context) {
		// sync.Pool
		buffer := buffpool.GetBuff()

		blw := &SimplebodyWriter{body: buffer, ResponseWriter: c.Writer}
		c.Writer = blw

		// wrap the request context with a timeout
		ctx, cancel := context.WithTimeout(c.Request.Context(), t)
		c.Request = c.Request.WithContext(ctx)

		finish := make(chan struct{})
		// 子协程
		// ****************注意***************
		// 创建的子协程没有recover,存在程序崩溃的风险
		go func() { 
			c.Next()
			finish <- struct{}{}
		}()
        // ****************注意***************
		select {
		case <-ctx.Done():
		    // ****************注意***************
		    // 子协程和父协程存在同时修改Header的风险
		    // 由于Header是个map,可能诱发 
		    // fatal error: concurrent map read and map write
			c.Writer.WriteHeader(http.StatusGatewayTimeout)
			// ****************注意***************
			c.Abort()
			// 超时发生, 通知子协程退出
			cancel()
			// 如果超时的话,buffer无法主动清除,只能等待GC回收
		case <-finish:
			// 结果只会在主协程中被写入
			blw.ResponseWriter.Write(buffer.Bytes())
			buffpool.PutBuff(buffer)
		}
	}
}

2. 解决

main.go

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	// **注意** 这里对 gin-gonic/gin 库做了修改
	"github.com/vearne/gin"
	"github.com/vearne/golib/buffpool"
	"github.com/vearne/golib/utils"
	"google.golang.org/grpc"
	pb "google.golang.org/grpc/examples/helloworld/helloworld"
	"log"
	"net/http"
	"sync"
	"time"
)

const (
	address     = "localhost:50051"
	defaultName = "world"
	HandlerFuncTimeout = "E501"

)

var greeter pb.GreeterClient

func init(){
	conn, err := grpc.Dial(address, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	//defer conn.Close()
	greeter = pb.NewGreeterClient(conn)
}

type errResponse struct {
	Code string `json:"code"`
	Msg  string `json:"msg"`
}


type TimeoutWriter struct {
	gin.ResponseWriter
	// body
	body *bytes.Buffer
	// header
	// 变动点2:  让子协程和父协程分别写不同的header
	h http.Header

	mu sync.Mutex
	timedOut    bool
	wroteHeader bool
	// 变动点3:  让子协程和父协程分别写不同的code
	code int
}

func (tw *TimeoutWriter) Write(b []byte) (int, error) {
	tw.mu.Lock()
	defer tw.mu.Unlock()
	if tw.timedOut {
		//return 0, http.ErrHandlerTimeout
		// 已经超时了,就不再写数据
		return 0, nil
	}

	return tw.body.Write(b)
}

func (tw *TimeoutWriter) WriteHeader(code int){
	fmt.Println("----xxx---", "TimeoutWriter-WriteHeader")
	checkWriteHeaderCode(code)
	tw.mu.Lock()
	defer tw.mu.Unlock()
	if tw.timedOut {
		return
	}
	tw.writeHeader(code)
}

func (tw *TimeoutWriter) writeHeader(code int) {
	tw.wroteHeader = true
	tw.code = code
}

func (tw *TimeoutWriter) WriteHeaderNow(){
	fmt.Println("----xxx---", "TimeoutWriter-WriteHeaderNow")
}

func (tw *TimeoutWriter) Header() http.Header {
	return tw.h
}

func checkWriteHeaderCode(code int) {
	if code < 100 || code > 999 {
		panic(fmt.Sprintf("invalid WriteHeader code %v", code))
	}
}


func Timeout(t time.Duration) gin.HandlerFunc {
	return func(c *gin.Context) {
		// wrap the request context with a timeout
		// sync.Pool
		buffer := buffpool.GetBuff()

		tw := &TimeoutWriter{body: buffer, ResponseWriter: c.Writer, h: make(http.Header)}
		c.Writer = tw


		ctx, cancel := context.WithTimeout(c.Request.Context(), t)
		c.Request = c.Request.WithContext(ctx)

        // channel 容量必须大于0
		// 否则母协程因超时退出,子协程可能永远无法退出
		finish := make(chan struct{}, 1) 
		panicChan := make(chan interface{}, 1)
		go func() {
		    // 变动点1: 增加子协程的recover
			defer func() {
				if p := recover(); p != nil {
					fmt.Println("handler error", p, string(utils.Stack()))
					panicChan <- p
				}
			}()

			c.Next()
			finish <- struct{}{}
		}()

		select {
		case p := <-panicChan:
			panic(p)
		case <-ctx.Done():
			tw.mu.Lock()
			defer tw.mu.Unlock()

			tw.ResponseWriter.WriteHeader(http.StatusServiceUnavailable)
			bt, _ := json.Marshal(errResponse{Code: HandlerFuncTimeout,
				Msg: http.ErrHandlerTimeout.Error()})
			tw.ResponseWriter.Write(bt)
			c.Abort()
			cancel()
			tw.timedOut = true
			// 如果超时的话,buffer无法主动清除,只能等待GC回收
		case <-finish:
			tw.mu.Lock()
			defer tw.mu.Unlock()
			dst := tw.ResponseWriter.Header()
			for k, vv := range tw.Header() {
				dst[k] = vv
			}
			fmt.Println("tw.code", tw.code)
			tw.ResponseWriter.WriteHeader(tw.code)
			tw.ResponseWriter.Write(buffer.Bytes())
			buffpool.PutBuff(buffer)
		}
	}
}


func short(c *gin.Context) {
	time.Sleep(1 * time.Second)
	// 子协程操作的header,其实是TimeoutWriter中的Header
	c.JSON(http.StatusOK, gin.H{"hello": "world"})
}

func nocontent(c *gin.Context) {
	//c.Status(204)
	time.Sleep(1 * time.Second)
	c.Data(http.StatusNoContent, "", []byte{})
}

func long(c *gin.Context) {
	name := defaultName
	ctx := c.Request.Context()
	r, err := greeter.SayHello(ctx, &pb.HelloRequest{Name: name})
	if err != nil {
		log.Printf("could not greet: %v\n", err)
		return
	}
	log.Printf("Greeting: %s", r.Message)
	c.JSON(http.StatusOK, gin.H{"hello": "world"})
}

func main() {
	// create new gin without any middleware
	engine := gin.Default()

	// add timeout middleware with 2 second duration
	engine.Use(Timeout(time.Second * 1))

	// create a handler that will last 1 seconds
	engine.GET("/short", short)

	// create a route that will last 5 seconds
	engine.GET("/long", long)

	engine.GET("/nocontent", nocontent)

	// run the server
	log.Fatal(engine.Run(":8080"))
}

3. 后记

3.1 建议大家务必阅读下参考资料1

其中有golang标准库中给出的TimeoutHandler实现

3.2 笔者修改了 gin-gonic/gin 库

context.go

// Status sets the HTTP response code.
func (c *Context) Status(code int) {
    // c.writermem.WriteHeader(code)
    c.Writer.WriteHeader(code) 
}

3.3 子协程和父协程的并发问题

子协程和父协程操作(写操作)的是不同的

  1. Http code
  2. header
  3. body 因此没有并发问题 也不会再产生
[GIN-debug] [WARNING] Headers were already written

4. 特别提示

*gin.Context 中

	// Keys is a key/value pair exclusively for the context of each request.
	Keys map[string]interface{}

对Keys的操作,仍有并发风险存在

func (c *Context) Set(key string, value interface{})

func (c *Context) Get(key string) (value interface{}, exists bool)

如果想要在上下文传递变量,可以使用

4.1 写入变量

c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), &quot;key1&quot;, value1))

4.2 读取变量

value1 := ctx.Request.Context().Value(&quot;key1&quot;)

5. 参考资料

  1. golang标准库中给出的TimeoutHandler实现

后记

  • 2019年6月6日 gin-gonic/gin v1.5的开发计划中,已经包含了对 Status()函数的修改, 预计完成时间是2019年8月15日, 让我们耐心的等一下吧.

  • 2020年2月8日 v1.5.0已经于2019年11月28日发布,使用此版本即可

  • 2020年6月11日 笔者封装了一个库,可以直接使用了 vearne/gin-timeout

  • 2021年5月13日 萌叔不保证文章中代码的准确性,请以 vearne/gin-timeout 为准


请我喝瓶饮料

微信支付码