NSQ分析-概述

消息队列

编程时我们常常会应用到“队列”这一数据结构。在单一程序里应用队列很容易,但当有多个进程,甚至是多个不同机器上的多个进程需要共享一个队列里的数据时,情况就比较复杂。这时不同机器上的不同进程,有的作为生产者推送消息到队列中,有的作为消费者订阅并获取队列中的数据。这时我们就需要一个作为中间件的分布式消息队列。

NSQ是一个Golang实现的开源的分布式消息队列。优点……Go写的东西优点就是轻量易部署,学习门槛相对低点,语言层面支持并发什么的,没用过其它的MQ也不太好对比。不过官方介绍上说,相对于Kafka,NSQ的Feature会少一些,当然学习难度也低点。

项目结构

NSQ的核心组件有三个。从目录结构就能看出来,分别是nsqadmin,nsqd,nsqlookupd。nsqadmin是一套Web端的可视化工具(可有可无),最重要的还是用来提供消息处理和转发的nsqd,和提供服务发现的nsqlookupd。nsqd和nsqlookupd都提供了http和tcp的接口调用。

消息处理逻辑

首先要引入topic和channel的概念。每条消息都有一个topic,这条消息会被推送给订阅了这个topic的消费者。假设我有若干台机器都订阅了某个topic,并且在被推送消息后做的操作相同,那么这几台机器就构成了一个channel。个人觉得可以理解成给消费者做了一次负载均衡。

关于单个nsqd的处理消息的逻辑,官网给了个动图:

nsqd

nsqd中,信息被复制给对应topics下的所有channel。channel把这个消息分配给隶属于channel的某一个消费者。

nsqd具体实现还要更复杂一些,还要考虑流量控制等等问题,留到下回慢慢研究。

nsq收到生产者生产的消息后,需要将消息复制多份,然后推送给对应topic和channel的消费者。

那么,nsq怎么知道哪些消费者订阅了topic为“order_created”的消息呢?我们需要一个类似于微服务里头的注册中心的模块,来实现服务发现的功能,这就是nslookupd所起的作用。

nsqlookupd本质上提供了一个key-value存储的服务,记录了topic,channel,consumer之间的对应关系。

nsq很重要的一点是要支持分布式,官网推荐我们一个生产者对应部署一个nsqd就行了。

应用

官方提供了一个go-nsq直接把通过tcp/http调用的接口封装起来了。通过这个包就能开始在代码里使用nsq了。

试着写了一下Demo。

客户端接受消息:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nsqio/go-nsq"
)

type ConsumerT struct{}

func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
    fmt.Println("from: ", msg.NSQDAddress, " msg: ", string(msg.Body))
    return nil
}

func RunConsumer() {
    c, err := nsq.NewConsumer("test", "hi", nsq.NewConfig()) // topic为test channel为hi
    if err != nil {
        log.Fatal(err)
    }
    c.AddHandler(&ConsumerT{})
    if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil {
        log.Fatal(err)
    }
}

func main() {
    RunConsumer()

    for {
        time.Sleep(3 * time.Second)
    }
}

客户端读取终端中的输入,按行发送消息:

package main

import (
    "bufio"
    "log"
    "fmt"
    "os"

    "github.com/nsqio/go-nsq"
)

var producer *nsq.Producer

func InitProducer(str string) {
    var err error
    producer, err = nsq.NewProducer(str, nsq.NewConfig())
    if err != nil {
        log.Fatal(err)
    }
}

func SendMsg(topic string, msg string) error {
    if producer != nil {
        if msg == "" {
            return nil
        }
        err := producer.Publish(topic, []byte(msg))
        return err
    }
    return fmt.Errorf("Producer is nil")
}

func main() {
    strHost := "127.0.0.1:4150"
    InitProducer(strHost)
    reader := bufio.NewReader(os.Stdin)
    for true {
        data, _, _ := reader.ReadLine()
        command := string(data)
        if command == "stop" {
            break
        }
        err := SendMsg("test", command)
        if err != nil {
            log.Fatal(err)
        }
    }
    producer.Stop()
}