# 消息队列

在日常业务开发需求中,经常会用到消息队列 rocketmq,nsq,kafka等工具来完成诸如排队,秒杀,异步处理等业务;

为了能让我们方便快捷的接入各种消息队列,Orange框架对日常使用的队列生产/队列消费进行了封装,基于接口进行实现,无论使用何种消息队列,都只是一个配置的问题;

领导说用rocketmq我们就用rocketmq,领导说换成redis,我们就给他换成redis,完全不慌,极大简化接入成本;😁😁😁

# 组件说明

  • 组件基于接口实现,不同的消息队列通过配置文件指定
  • 目前支持了 rocketmq、redis驱动;

# 配置说明

如下是完整的配置选项:

[queue]
    driver="redis" // rocketmq 或 redis
    retry=2 // 重试次数
    endpoints=["192.168.1.1:6379"] // brocker 或 redis 服务地址,当用redis驱动是只用填一个主库地址即可
    passwd="123456" // redis驱动专属配置,redis密码
    redisdb=4 // redis驱动专属配置,redis库编号
    timeout=3600 // redis专属配置,队列超时时间(s) ,当队列一直没有被消费到达超时时间则队列会被销毁

# 使用指南

# 注册生产者

  • 通过如下代码注册生产者并向队列发送一条消息;
  • 参数说明,注册方法 NewProducer 需要传入队列 group 名称;
  • 消息发送方法 SendMsg 需要指定消息队列 topic 名称和消息体内容;
client, _:= queue.NewProducer("test")
ret, err := client.SendMsg("test-topic", "Hello World!")

# 注册消费者

消费者采用的是监听并执行回调方法的模式实现,注册好消费者后,程序会持续地监听队列并消费数据;

和生产者一样,消费者方法 NewConsumer 需要传入队列 group 名称;

消费者消息监听方法 ListenReceiveMsgDo 传入消息 topic 和一个消息处理的回调方法,接受到消息后的处理逻辑都放在该回调方法中即可完成消费流程;

client, _ := queue.NewConsumer("test")

client.ListenReceiveMsgDo("test", func(mqMsg queue.MqMsg) {
   fmt.Println("recive===>", mqMsg, mqMsg.BodyString())
})

# 完整示例

  • 先准备好一个redis服务作为示例
  • 在 GOPATH 下创建一个demo目录,目录中创建两个文件 main.go, config.toml;
main.go
------------------
package main

import (
   "gitee.com/zhucheer/orange/app"
   "gitee.com/zhucheer/orange/queue"
   "fmt"
)

func main(){
   router := &Route{}
   app.AppStart(router)
}

type Route struct {
}

func (s *Route) ServeMux() {
   app.NewRouter("").GET("/", func(ctx *app.Context) error {
   
     client, _ := queue.NewProducer("test")

     ret, _ := client.SendMsg("test", "hello mq producer!")
   
      return ctx.ToJson(ret)
   })
}

func (s *Route) Register() {


   client, _ := queue.NewConsumer("test")
   client.ListenReceiveMsgDo("test", func(mqMsg queue.MqMsg) {
      fmt.Println("recive msg===>", mqMsg.BodyString())
   })

}

在配置文件中添加好对应的redis服务信息

config.toml
------------------
[queue]
    driver="redis"
    retry=2
    endpoints=["192.168.1.1:6379"]
    passwd="123456"
    redisdb=4
    timeout=3600

最后拉取依赖并执行(开启 go moduls)

go mod init
go mod tidy
go run main.go --config=config.toml

程序启动后访问 http://127.0.0.1:8088/ 即可看到生产和消费的过程;