Fork me on GitHub

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc

环境

软件版本

NSQ v1.2.0

golang

1.12

go-nsq

github.com/nsqio/go-nsq v1.0.7

1.引言

NSQ 是一个基于 Go 语言的分布式实时消息平台,它具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。它的吞吐能力非常强,在单实例4核CPU可以达到19wop/s。足以应付日常使用 (见参考资料1) 。

不过它没有内建的数据副本机制(如果某个nsqd宕机,则该nsqd上的消息会丢失),且默认数据也不持久化(需要将mem_queue_size修改为0),使用NSQ仍然需要谨慎。

当我们考查一个消息队列,除了吞吐能力和可靠性,安全性也是重要的考察点。NSQ默认提供了鉴权机制(见查考资料2)

2. 使用鉴权

2.1 启用鉴权&对鉴权服务的要求

启动鉴权

nsqd --auth-http-address=nsqauth.example.com:4181 --http-address=127.0.0.1:4151

NSQ把权限的管理委托给了外部的鉴权服务,通过auth-http-address指定

官方给的鉴权服务的参考代码是 jehiah/nsqauth-contrib
笔者跟据它的思路重新实现了Golang的版本 vearne/go-nsqauthd

NSQ要求鉴权服务能够对如下的HTTP进行响应

/auth?secret=xxxx&remote_ip=39.156.69.79&tls=true

nsqd会拿着客户端的信息到鉴权服务进行鉴权

  • secret是客户端提供的秘钥(相当于redis中的密码)
  • remote_ip是客户端的IP地址
  • tls表示客户端与nsqd之间的通讯是否有TLS保护

鉴权返回形如如下的信息,表明此用户可以操作的topic

{
    "identity": "xxxx",
    "ttl": 3600,
    "Authorizations": [
        {
            "channels": [
                ".*"
            ],
            "topic": "topic1",
            "permissions": [
                "subscribe",  // 有topic1上的订阅权限
                "publish"    //  有topic1上的发布权限
            ]
        }
    ]
}

vearne/go-nsqauthd为了简单期间,将秘钥直接存储在了csv文件中。

login ip tls_required topic channel subscribe publish
4BFE467B-FCBA-4519-BAC8-E9A3C57EDEB6 false test .* subscribe publish
test_local 127.0.0.1 true local .* subscribe publish
test_publish 127.0.0.1/24 false test_publish .* publish

其中login对应secret是秘钥信息, ip可以是某个子网, 如果不写表示没有ip限制,channeltopic都支持通配符的表达式。

2.3 Golang Client示例

生产者

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
)

func main() {
    config := nsq.NewConfig()
    config.AuthSecret = "4BFE467B-FCBA-4519-BAC8-E9A3C57EDEB6"
    q, _ := nsq.NewProducer("127.0.0.1:4150", config)
    err := q.Publish("test", []byte("test12"))
    if err != nil {
        fmt.Println(err)
    }
}

消费者

package main

import (
    //"fmt"
    "log"

    "github.com/nsqio/go-nsq"
)

func main() {
    config := nsq.NewConfig()
    config.AuthSecret = "4BFE467B-FCBA-4519-BAC8-E9A3C57EDEB6"
    q, _ := nsq.NewConsumer("test", "test", config)
    q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
        log.Printf("Got a message: %v", string(message.Body))
        return nil
    }))
    err := q.ConnectToNSQD("127.0.0.1:4150")
    if err != nil {
        log.Panic("Could not connect")
    }

    <-q.StopChan
}

多个消费者并发消费

package main

import (
    //"fmt"
    "log"

    "github.com/nsqio/go-nsq"
)

func main() {
    config := nsq.NewConfig()
    config.AuthSecret = "4BFE467B-FCBA-4519-BAC8-E9A3C57EDEB6"
    q, _ := nsq.NewConsumer("test", "test", config)
    // 启动5个协程来消费
    q.AddConcurrentHandlers(&NSQHandler{}, 5)
    err := q.ConnectToNSQD("127.0.0.1:4150")
    if err != nil {
        log.Panic("Could not connect")
    }

    <-q.StopChan
}

type NSQHandler struct {
    NSQProducter *nsq.Producer
}

func (handler *NSQHandler) HandleMessage(msg *nsq.Message) error {
    log.Printf("Got a message: %v", string(msg.Body))
    return nil
}

2.4 注意

1) 目前nsqd的鉴权只针对TCP协议有效,对HTTP协议不鉴权,因此笔者建议将
http-address改为127.0.0.1:4151, 或者通过防火墙进行有效的保护

NOTE: It is expected when using authorization that only the nsqd TCP protocol is exposed to external clients, not the HTTP(S) endpoints. See the note below about exposing stats and lookup to clients with auth.

2) nsqd通过消息的确认机制(FIN CMD)来确保消息至少被消费一次(at least once)。对于go-nsq, 只要HandleMessage()不返回异常,go-nsq会自动发送FIN CMD,自动确认。如果返回异常,除非达到最大重试次数,否则go-nsq会将发送REQ CMD, 使消息重新被放回消息队列,以便将来重试。

参考资料

  1. benchmark
  2. nsq-auth

请我喝瓶饮料

微信支付码

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据