最近本人在搞毕设,准备做一个oj,其中判题沙盒模块准备用消息队列来实现判题的并行化。下面是用docker来搭建和调试RocketMQ的过程

安装 Namesrv

拉取官方镜像

1
docker pull rocketmqinc/rocketmq:4.4.0

启动容器

1
docker run -d -p 9876:9876 -v {RmHome}/data/namesrv/logs:/root/logs -v {RmHome}/data/namesrv/store:/root/store --name rmqnamesrv -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq:4.4.0 sh mqnamesrv

注意

{RmHome} 要替换成你的宿主机想保存 MQ 的日志与数据的地方,通过 docker 的 -v 参数使用 volume 功能,把你本地的目录映射到容器内的目录上。否则所有数据都默认保存在容器运行时的内存中,重启之后就又回到最初的起点。

安装 broker 服务器

与上一个是同一个docker镜像,无需拉取

创建broker.conf文件

  1. 在 {RmHome}/conf 目录下创建 broker.conf 文件
  2. 在 broker.conf 中写入如下内容
    1
    2
    3
    4
    5
    6
    7
    8
    brokerClusterName = DefaultCluster
    brokerName = broker-a
    brokerId = 0
    deleteWhen = 04
    fileReservedTime = 48
    brokerRole = ASYNC_MASTER
    flushDiskType = ASYNC_FLUSH
    brokerIP1 = {本机局域网 IP}

启动容器

1
docker run -d -p 10911:10911 -p 10909:10909 -v  {RmHome}/data/broker/logs:/root/logs -v  {RmHome}/rocketmq/data/broker/store:/root/store -v  {RmHome}/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf

其中 brokerIP1 需要自己修改,不能是127.0.0.1 因为运行在docker容器里面的,localhost与宿主机器隔离,需要使用连接的局域网ip,否则会有各种异常产生

安装rocketmq控制台

拉取镜像

1
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr={本机局域网 IP}:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t pangliang/rocketmq-console-ng

Drocketmq.namesrv.addr的ip就是上一步配置文件设置的brokerIP1

此时运行若无异常,console日志打出如下,则运行成功

1
2
3
4
5
6
7
[2020-04-03 06:23:30.049]  INFO closeChannel: close the connection to remote address[192.168.31.156:10911] result: true
[2020-04-03 06:23:59.976] INFO closeChannel: close the connection to remote address[192.168.31.156:10911] result: true
[2020-04-03 06:24:00.045] INFO closeChannel: close the connection to remote address[192.168.31.156:9876] result: true
[2020-04-03 06:24:30.031] INFO closeChannel: close the connection to remote address[192.168.31.156:9876] result: true
[2020-04-03 06:24:30.035] INFO closeChannel: close the connection to remote address[192.168.31.156:10911] result: true
[2020-04-03 06:24:59.961] INFO closeChannel: close the connection to remote address[192.168.31.156:9876] result: true
[2020-04-03 06:25:00.027] INFO closeChannel: close the connection to remote address[192.168.31.156:10911] result: true

涉及到的容器如下

1
2
3
4
5
▶ docker container ls
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
fea34a9874b0 pangliang/rocketmq-console-ng "sh -c 'java $JAVA_O…" 13 minutes ago Up 13 minutes 0.0.0.0:8080->8080/tcp ecstatic_ritchie
50d543af5485 rocketmqinc/rocketmq:4.4.0 "sh mqbroker -c /opt…" 17 hours ago Up 6 seconds 0.0.0.0:10909->10909/tcp, 9876/tcp, 0.0.0.0:10911->10911/tcp rmqbroker
f4c9e15adb96 rocketmqinc/rocketmq:4.4.0 "sh mqnamesrv" 17 hours ago Up 17 hours 10909/tcp, 0.0.0.0:9876->9876/tcp, 10911/tcp rmqnamesrv

安装c语言依赖库

第一次跑go程序之前,还要先安装相关依赖包,不然可能会出现

1
2
3
4
rocketmq-client-go/core/cfuns.go:21:10: fatal error: 'rocketmq/CMessageExt.h' file not found
#include "rocketmq/CMessageExt.h"
^~~~~~~~~~~~~~~~~~~~~~~~
1 error generated.

下面是解决步骤

1
2
3
4
5
6
7
8
9
10
11
12
13
1. git clone https://github.com/apache/rocketmq-client-cpp

2. mkdir -p /usr/local/include/rocketmq/

3. cp rocketmq-client-cpp/include/* /usr/local/include/rocketmq

4. sh build.sh

5. after build sucess, cp bin/librocketmq.dylib /usr/local/lib

6. install the go SDK.

7. go build your code.

go语言实现producer

参考(官方demo)[https://github.com/apache/rocketmq-client-go/blob/master/examples/producer.go]

  1. 先在控制台新建一个topic
  2. 代码中修改 NameServer 和 Topic即可

参考代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

package main

import (
"fmt"
"github.com/apache/rocketmq-client-go/core"
)

func Main0() {
pConfig := &rocketmq.ProducerConfig{
ClientConfig: rocketmq.ClientConfig{
GroupID: "broker-a",
NameServer: "http://192.168.31.156:9876",
},
ProducerModel: rocketmq.CommonProducer,
}
sendMessage(pConfig)
}
func sendMessage(config *rocketmq.ProducerConfig) {
producer, err := rocketmq.NewProducer(config)

if err != nil {
fmt.Println("create common producer failed, error:", err)
return
}

err = producer.Start()
if err != nil {
fmt.Println("start common producer error", err)
return
}
defer producer.Shutdown()

fmt.Printf("Common producer: %s started... \n", producer)
for i := 0; i < 10; i++ {
msg := fmt.Sprintf("%s-%d", "Hello,Common MQ Message-", i)
result, err := producer.SendMessageSync(&rocketmq.Message{Topic: "test_1", Body: msg})
if err != nil {
fmt.Println("Error:", err)
}
fmt.Printf("send message: %s result: %s\n", msg, result)
}
fmt.Println("shutdown common producer.")
}

func main() {
Main0()
}

运行生产者端代码,完成消息发送

1
2
3
4
Common producer: ProducerConfig=[GroupId: broker-a, NameServer: http://192.168.31.156:9876, ProducerModel: CommonProducer, ] started... 
send message: Hello,Common MQ Message--0 result: [status: SendOK, messageId: C0A81F9C77926C5F71990DBE59BD0000, offset: 1]
send message: Hello,Common MQ Message--1 result: [status: SendOK, messageId: C0A81F9C77926C5F71990DBE59F60001, offset: 1]
send message: Hello,Common MQ Message--2 result: [status: SendOK, messageId: C0A81F9C77926C5F71990DBE5A000002, offset: 2]

go语言实现consumer

参考官方demo

修改对应topic和NameServer即可运行

运行消费者代码,完成消息消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
INFO[0000] subscribe topic[test_1] with expression[*] successfully. 
consumer: [PushConsumerConfig=[GroupId: broker-a, NameServer: http://192.168.31.156:9876, MessageModel: Clustering, ConsumerModel: Orderly, ], subcribed topics: [test_1, ]] started...
A message received, MessageID:C0A81F9C77926C5F71990DBE59BD0000, Body:Hello,Common MQ Message--0
A message received, MessageID:C0A81F9C77926C5F71990DBE5A000002, Body:Hello,Common MQ Message--2
A message received, MessageID:C0A81F9C77926C5F71990DBE59F60001, Body:Hello,Common MQ Message--1
Consumer Later, MessageID:C0A81F9C77926C5F71990DBE59F60001
A message received, MessageID:C0A81F9C77926C5F71990DBE5A150003, Body:Hello,Common MQ Message--3
A message received, MessageID:C0A81F9C77926C5F71990DBE5A400004, Body:Hello,Common MQ Message--4
A message received, MessageID:C0A81F9C77926C5F71990DBE5A710007, Body:Hello,Common MQ Message--7
A message received, MessageID:C0A81F9C77926C5F71990DBE5A640006, Body:Hello,Common MQ Message--6
A message received, MessageID:C0A81F9C77926C5F71990DBE5A510005, Body:Hello,Common MQ Message--5
A message received, MessageID:C0A81F9C77926C5F71990DBE5A800008, Body:Hello,Common MQ Message--8
Consumer Later, MessageID:C0A81F9C77926C5F71990DBE5A800008
A message received, MessageID:C0A81F9C77926C5F71990DBE5A8D0009, Body:Hello,Common MQ Message--9

参考链接

【Rocketmq】通过 docker 快速搭建 rocketmq 环境
rocketmq 部署启动指南-Docker 版
apache/rocketmq-client-go
go get rocketmq-client-go error #3