玩转NSQ(1)-鉴权
版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc
环境
软件版本
NSQ v1.2.0
golang
1.12
go-nsq
github.com/nsqio/go-nsq v1.0.7
1.引言
NSQ 是一个基于 Go 语言的分布式实时消息平台,它具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。它的吞吐能力非常强,在单实例4核CPU可以达到19wop/s。足以应付日常使用 (见参考资料1) 。
不过它没有内建的数据副本机制(如果某个nsqd宕机,则该nsqd上的消息会丢失),且默认数据也不持久化(需要将mem_queue_size修改为0),使用NSQ仍然需要谨慎。
当我们考查一个消息队列,除了吞吐能力和可靠性,安全性也是重要的考察点。NSQ默认提供了鉴权机制(见查考资料2)
2. 使用鉴权
2.1 启用鉴权&对鉴权服务的要求
启动鉴权
nsqd --auth-http-address=nsqauth.example.com:4181 --http-address=127.0.0.1:4151
NSQ把权限的管理委托给了外部的鉴权服务,通过auth-http-address指定
官方给的鉴权服务的参考代码是 jehiah/nsqauth-contrib
笔者跟据它的思路重新实现了Golang的版本 vearne/go-nsqauthd
NSQ要求鉴权服务能够对如下的HTTP进行响应
/auth?secret=xxxx&remote_ip=39.156.69.79&tls=true
nsqd会拿着客户端的信息到鉴权服务进行鉴权
- secret是客户端提供的秘钥(相当于redis中的密码)
- remote_ip是客户端的IP地址
- tls表示客户端与nsqd之间的通讯是否有TLS保护
鉴权返回形如如下的信息,表明此用户可以操作的topic
{
"identity": "xxxx",
"ttl": 3600,
"Authorizations": [
{
"channels": [
".*"
],
"topic": "topic1",
"permissions": [
"subscribe", // 有topic1上的订阅权限
"publish" // 有topic1上的发布权限
]
}
]
}
vearne/go-nsqauthd为了简单期间,将秘钥直接存储在了csv文件中。
login | ip | tls_required | topic | channel | subscribe | publish |
---|---|---|---|---|---|---|
4BFE467B-FCBA-4519-BAC8-E9A3C57EDEB6 | false | test | .* | subscribe | publish | |
test_local | 127.0.0.1 | true | local | .* | subscribe | publish |
test_publish | 127.0.0.1/24 | false | test_publish | .* | publish |
其中login
对应secret
是秘钥信息, ip
可以是某个子网, 如果不写表示没有ip限制,channel
和topic
都支持通配符的表达式。
2.3 Golang Client示例
生产者
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
)
func main() {
config := nsq.NewConfig()
config.AuthSecret = "4BFE467B-FCBA-4519-BAC8-E9A3C57EDEB6"
q, _ := nsq.NewProducer("127.0.0.1:4150", config)
err := q.Publish("test", []byte("test12"))
if err != nil {
fmt.Println(err)
}
}
消费者
package main
import (
//"fmt"
"log"
"github.com/nsqio/go-nsq"
)
func main() {
config := nsq.NewConfig()
config.AuthSecret = "4BFE467B-FCBA-4519-BAC8-E9A3C57EDEB6"
q, _ := nsq.NewConsumer("test", "test", config)
q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
log.Printf("Got a message: %v", string(message.Body))
return nil
}))
err := q.ConnectToNSQD("127.0.0.1:4150")
if err != nil {
log.Panic("Could not connect")
}
<-q.StopChan
}
多个消费者并发消费
package main
import (
//"fmt"
"log"
"github.com/nsqio/go-nsq"
)
func main() {
config := nsq.NewConfig()
config.AuthSecret = "4BFE467B-FCBA-4519-BAC8-E9A3C57EDEB6"
q, _ := nsq.NewConsumer("test", "test", config)
// 启动5个协程来消费
q.AddConcurrentHandlers(&NSQHandler{}, 5)
err := q.ConnectToNSQD("127.0.0.1:4150")
if err != nil {
log.Panic("Could not connect")
}
<-q.StopChan
}
type NSQHandler struct {
NSQProducter *nsq.Producer
}
func (handler *NSQHandler) HandleMessage(msg *nsq.Message) error {
log.Printf("Got a message: %v", string(msg.Body))
return nil
}
2.4 注意
1) 目前nsqd的鉴权只针对TCP协议有效,对HTTP协议不鉴权,因此笔者建议将
http-address改为127.0.0.1:4151, 或者通过防火墙进行有效的保护
NOTE: It is expected when using authorization that only the nsqd TCP protocol is exposed to external clients, not the HTTP(S) endpoints. See the note below about exposing stats and lookup to clients with auth.
2) nsqd通过消息的确认机制(FIN CMD)来确保消息至少被消费一次(at least once)。对于go-nsq
, 只要HandleMessage()不返回异常,go-nsq
会自动发送FIN CMD,自动确认。如果返回异常,除非达到最大重试次数,否则go-nsq
会将发送REQ CMD, 使消息重新被放回消息队列,以便将来重试。