高性能消息中间件 NSQ 解析-nsqd 实现细节介绍

nsq 中单个 nsqd 可以有多个 topic,每个 topic 可以有多个 channel。channel 接收这个 topic 所有消息的副本,从而实现多播分发,而 channel 上的每个消息被均匀的分发给它的订阅者,从而实现负载均衡。

前面 介绍了 nsq 的相关概念以及 nsq 的安装与应用。从本篇开始将会结合源码介绍 nsq 的实现细节。

nsq 中单个 nsqd 可以有多个 topic,每个 topic 可以有多个 channel。channel 接收这个 topic 所有消息的副本,从而实现多播分发,而 channel 上的每个消息被均匀的分发给它的订阅者,从而实现负载均衡。

attachments-2021-03-4yYwJQe8605dac57d164c.png

入口函数

首先看下 nsqd 的入口函数:

//位于 apps/nsqd/main.go:26
func main() {
  prg := &program{}
  if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
    logFatal("%s", err)
  }
}

func (p *program) Init(env svc.Environment) error {
  if env.IsWindowsService() {
    dir := filepath.Dir(os.Args[0])
    return os.Chdir(dir)
  }
  return nil
}

func (p *program) Start() error {
  opts := nsqd.NewOptions()

  flagSet := nsqdFlagSet(opts)
  flagSet.Parse(os.Args[1:])
  ...
}

通过第三方 svc 包进行优雅的后台进程管理,svc.Run() -> svc.Init() -> svc.Start(),启动 nsqd 实例。

配置项初始化

初始化配置项(opts, cfg),加载历史数据(nsqd.LoadMetadata)、持久化最新数据(nsqd.PersistMetadata),然后开启协程,进入 nsqd.Main() 主函数。

// 位于 apps/nsqd/main.go:64
options.Resolve(opts, flagSet, cfg)
  nsqd, err := nsqd.New(opts)
  if err != nil {
    logFatal("failed to instantiate nsqd - %s", err)
  }
  p.nsqd = nsqd

  err = p.nsqd.LoadMetadata()
  if err != nil {
    logFatal("failed to load metadata - %s", err)
  }
  err = p.nsqd.PersistMetadata()
  if err != nil {
    logFatal("failed to persist metadata - %s", err)
  }

  go func() {
    err := p.nsqd.Main()
    if err != nil {
      p.Stop()
      os.Exit(1)
    }
  }()

接着就是初始化 tcpServer, httpServer, httpsServer,然后循环监控队列信息(n.queueScanLoop)、节点信息管理(n.lookupLoop)、统计信息(n.statsdLoop)输出。

// 位于 nsqd/nsqd.go:262
  n.tcpServer.nsqd = n
  n.waitGroup.Wrap(func() {
    exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf))
  })
  httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
  n.waitGroup.Wrap(func() {
    exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
  })
  if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
    httpsServer := newHTTPServer(ctx, true, true)
    n.waitGroup.Wrap(func() {
      exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
    })
  }

  n.waitGroup.Wrap(n.queueScanLoop)
  n.waitGroup.Wrap(n.lookupLoop)
  if n.getOpts().StatsdAddress != "" {
    n.waitGroup.Wrap(n.statsdLoop)
  }

处理请求

分别处理 tcp/http 请求,开启 handler 协程进行并发处理,其中 newHTTPServer 注册路由采用了 Decorate 装饰器模式(后面会进一步解析);

// 位于 nsqd/http.go:44
router := httprouter.New()
  router.HandleMethodNotAllowed = true
  router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf)
  router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf)
  router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.logf)
  s := &httpServer{
    ctx:         ctx,
    tlsEnabled:  tlsEnabled,
    tlsRequired: tlsRequired,
    router:      router,
  }

  router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
  router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))

  // v1 negotiate
  router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
  router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.V1))
  router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.V1))

  // only v1
  router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
  router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))

http-Decorate 路由分发

// 位于 internal/protocol/tcp_server.go:22
for {
    clientConn, err := listener.Accept()
    if err != nil {
      if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
        logf(lg.WARN, "temporary Accept() failure - %s", err)
        runtime.Gosched()
        continue
      }
      // theres no direct way to detect this error because it is not exposed
      if !strings.Contains(err.Error(), "use of closed network connection") {
        return fmt.Errorf("listener.Accept() error - %s", err)
      }
      break
    }
    go handler.Handle(clientConn)
  }

如上的实现为 tcp-handler 处理的主要代码。

tcp 解析协议

tcp 解析 V2 协议,走内部协议封装的 prot.IOLoop(conn) 进行处理;

// 位于 nsqd/tcp.go:34
var prot protocol.Protocol
  switch protocolMagic {
  case "  V2":
    prot = &protocolV2{ctx: p.ctx}
  default:
    protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
    clientConn.Close()
    p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
      clientConn.RemoteAddr(), protocolMagic)
    return
  }

  err = prot.IOLoop(clientConn)
  if err != nil {
    p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
    return
  }

消息生成与消费

通过内部协议进行 p.Exec(执行命令)、p.Send(发送结果),保证每个 nsqd 节点都能正确的进行消息生成与消费,一旦上述过程有 error 都会被捕获处理,确保分布式投递的可靠性。

// 位于 nsqd/protocol_v2.go:79
params := bytes.Split(line, separatorBytes)

    p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)

    var response []byte
    response, err = p.Exec(client, params)
    if err != nil {
      ctx := ""
      if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
        ctx = " - " + parentErr.Error()
      }
      p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

      sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
      if sendErr != nil {
        p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
        break
      }

      // errors of type FatalClientErr should forceably close the connection
      if _, ok := err.(*protocol.FatalClientErr); ok {
        break
      }
      continue
    }

    if response != nil {
      err = p.Send(client, frameTypeResponse, response)
      if err != nil {
        err = fmt.Errorf("failed to send response - %s", err)
        break
      }
    }

nsqd 也会同时开启 tcp 和 http 服务,两个服务都可以提供给生产者和消费者,http 服务还提供给 nsqadmin 获取该 nsqd 本地 topic 和 channel 信息。

小结

本文主要介绍 nsqd,总的来说 nsqd 的实现并不复杂。nsqd 是一个守护进程,负责接收(生产者 producer )、排队(最小堆实现)、投递(消费者 consumer )消息给客户端。nsqd 可以独立运行,但通常是由 nsqlookupd 实例所在集群配置的。


作者:aoho
来源:掘金

  • 发表于 2021-03-26 18:01
  • 阅读 ( 50 )

0 条评论

请先 登录 后评论
z老师
z老师

131 篇文章

作家榜 »

  1. NX小编 1251 文章
  2. 58沈剑 325 文章
  3. 热爱技术的小仓鼠 230 文章
  4. 奈学教育 154 文章
  5. z老师 131 文章
  6. 五十三 58 文章
  7. 李希沅 | 奈学教育 52 文章
  8. 江帅帅 | 奈学教育 32 文章