最近本人在搞毕设,准备做一个oj,其中判题沙盒模块准备用消息队列来实现判题的并行化。下面是用docker来搭建和调试RocketMQ的过程
安装 Namesrv
拉取官方镜像
1 | docker pull rocketmqinc/rocketmq:4.4.0 |
启动容器
1 | docker run -d -p 9876:9876 -v {RmHome}/data/namesrv/logs:/root/logs -v {RmHome}/data/namesrv/store:/root/store --name rmqnamesrv -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq:4.4.0 sh mqnamesrv |
注意
{RmHome} 要替换成你的宿主机想保存 MQ 的日志与数据的地方,通过 docker 的 -v 参数使用 volume 功能,把你本地的目录映射到容器内的目录上。否则所有数据都默认保存在容器运行时的内存中,重启之后就又回到最初的起点。
安装 broker 服务器
与上一个是同一个docker镜像,无需拉取
创建broker.conf文件
- 在 {RmHome}/conf 目录下创建 broker.conf 文件
- 在 broker.conf 中写入如下内容
1
2
3
4
5
6
7
8brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = {本机局域网 IP}
启动容器
1 | docker run -d -p 10911:10911 -p 10909:10909 -v {RmHome}/data/broker/logs:/root/logs -v {RmHome}/rocketmq/data/broker/store:/root/store -v {RmHome}/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf |
其中 brokerIP1 需要自己修改,不能是127.0.0.1 因为运行在docker容器里面的,localhost与宿主机器隔离,需要使用连接的局域网ip,否则会有各种异常产生
安装rocketmq控制台
拉取镜像
1 | docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr={本机局域网 IP}:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t pangliang/rocketmq-console-ng |
Drocketmq.namesrv.addr的ip就是上一步配置文件设置的brokerIP1
此时运行若无异常,console日志打出如下,则运行成功1
2
3
4
5
6
7[2020-04-03 06:23:30.049] INFO closeChannel: close the connection to remote address[192.168.31.156:10911] result: true
[2020-04-03 06:23:59.976] INFO closeChannel: close the connection to remote address[192.168.31.156:10911] result: true
[2020-04-03 06:24:00.045] INFO closeChannel: close the connection to remote address[192.168.31.156:9876] result: true
[2020-04-03 06:24:30.031] INFO closeChannel: close the connection to remote address[192.168.31.156:9876] result: true
[2020-04-03 06:24:30.035] INFO closeChannel: close the connection to remote address[192.168.31.156:10911] result: true
[2020-04-03 06:24:59.961] INFO closeChannel: close the connection to remote address[192.168.31.156:9876] result: true
[2020-04-03 06:25:00.027] INFO closeChannel: close the connection to remote address[192.168.31.156:10911] result: true
涉及到的容器如下
1 | ▶ docker container ls |
安装c语言依赖库
第一次跑go程序之前,还要先安装相关依赖包,不然可能会出现
1 | rocketmq-client-go/core/cfuns.go:21:10: fatal error: 'rocketmq/CMessageExt.h' file not found |
下面是解决步骤
1 | 1. git clone https://github.com/apache/rocketmq-client-cpp |
go语言实现producer
参考(官方demo)[https://github.com/apache/rocketmq-client-go/blob/master/examples/producer.go]
- 先在控制台新建一个topic
- 代码中修改 NameServer 和 Topic即可
参考代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main
import (
"fmt"
"github.com/apache/rocketmq-client-go/core"
)
func Main0() {
pConfig := &rocketmq.ProducerConfig{
ClientConfig: rocketmq.ClientConfig{
GroupID: "broker-a",
NameServer: "http://192.168.31.156:9876",
},
ProducerModel: rocketmq.CommonProducer,
}
sendMessage(pConfig)
}
func sendMessage(config *rocketmq.ProducerConfig) {
producer, err := rocketmq.NewProducer(config)
if err != nil {
fmt.Println("create common producer failed, error:", err)
return
}
err = producer.Start()
if err != nil {
fmt.Println("start common producer error", err)
return
}
defer producer.Shutdown()
fmt.Printf("Common producer: %s started... \n", producer)
for i := 0; i < 10; i++ {
msg := fmt.Sprintf("%s-%d", "Hello,Common MQ Message-", i)
result, err := producer.SendMessageSync(&rocketmq.Message{Topic: "test_1", Body: msg})
if err != nil {
fmt.Println("Error:", err)
}
fmt.Printf("send message: %s result: %s\n", msg, result)
}
fmt.Println("shutdown common producer.")
}
func main() {
Main0()
}
运行生产者端代码,完成消息发送1
2
3
4Common producer: ProducerConfig=[GroupId: broker-a, NameServer: http://192.168.31.156:9876, ProducerModel: CommonProducer, ] started...
send message: Hello,Common MQ Message--0 result: [status: SendOK, messageId: C0A81F9C77926C5F71990DBE59BD0000, offset: 1]
send message: Hello,Common MQ Message--1 result: [status: SendOK, messageId: C0A81F9C77926C5F71990DBE59F60001, offset: 1]
send message: Hello,Common MQ Message--2 result: [status: SendOK, messageId: C0A81F9C77926C5F71990DBE5A000002, offset: 2]
go语言实现consumer
参考官方demo
修改对应topic和NameServer即可运行
运行消费者代码,完成消息消费1
2
3
4
5
6
7
8
9
10
11
12
13
14INFO[0000] subscribe topic[test_1] with expression[*] successfully.
consumer: [PushConsumerConfig=[GroupId: broker-a, NameServer: http://192.168.31.156:9876, MessageModel: Clustering, ConsumerModel: Orderly, ], subcribed topics: [test_1, ]] started...
A message received, MessageID:C0A81F9C77926C5F71990DBE59BD0000, Body:Hello,Common MQ Message--0
A message received, MessageID:C0A81F9C77926C5F71990DBE5A000002, Body:Hello,Common MQ Message--2
A message received, MessageID:C0A81F9C77926C5F71990DBE59F60001, Body:Hello,Common MQ Message--1
Consumer Later, MessageID:C0A81F9C77926C5F71990DBE59F60001
A message received, MessageID:C0A81F9C77926C5F71990DBE5A150003, Body:Hello,Common MQ Message--3
A message received, MessageID:C0A81F9C77926C5F71990DBE5A400004, Body:Hello,Common MQ Message--4
A message received, MessageID:C0A81F9C77926C5F71990DBE5A710007, Body:Hello,Common MQ Message--7
A message received, MessageID:C0A81F9C77926C5F71990DBE5A640006, Body:Hello,Common MQ Message--6
A message received, MessageID:C0A81F9C77926C5F71990DBE5A510005, Body:Hello,Common MQ Message--5
A message received, MessageID:C0A81F9C77926C5F71990DBE5A800008, Body:Hello,Common MQ Message--8
Consumer Later, MessageID:C0A81F9C77926C5F71990DBE5A800008
A message received, MessageID:C0A81F9C77926C5F71990DBE5A8D0009, Body:Hello,Common MQ Message--9
参考链接
【Rocketmq】通过 docker 快速搭建 rocketmq 环境
rocketmq 部署启动指南-Docker 版
apache/rocketmq-client-go
go get rocketmq-client-go error #3