emqx集群搭建及nginx负载均衡

项目中涉及物联网功能需要搭建mqtt服务。这里使用emqx搭建mqtt集群。为什么使用emqx呢?因为免费开源的mqtt集群没有比它做的更好的了,单机百万并发,友好的dashboard。之前做物联网项目也是用的这个中间件。这里就不再阐述了,有需要可以去官网查看。本文主要记录emqx的集群搭建及nginx负载均衡。

EMQX介绍

EMQX 是一款完全开源,高度可伸缩,高可用的分布式 MQTT 消息服务器,适用于 IoT、M2M 和移动应用程序,可处理千万级别的并发客户端。
从 3.0 版本开始,EMQX 完整支持 MQTT V5.0 协议规范,向下兼容 MQTT V3.1 和 V3.1.1,并支持 MQTT-SN、CoAP、LwM2M、WebSocket 和 STOMP 等通信协议。EMQX 3.0 单集群可支持千万级别的 MQTT 并发连接。

EMQX集群搭建

这里我们使用docker搭建伪集群。规划是搭建3台,参数如下

1
2
3
emqx1: 172.20.0.2 1883 18083 4369
emqx1: 172.20.0.3 1884 18084 4370
emqx1: 172.20.0.4 1885 18085 4371

因为我们没有emqx的配置文件,首先搭建一个初始化的emqx docker容器

1
2
docker run -d --name emqx -p 1883:1883  -p 18083:18083 -p 4369:4369 emqx/emqx
docker cp emqx:/opt/emqx /usr/local/emqx1

然后进入容器,将初始化的配置拷贝出来

搭建第一台EMQX

首先创建一个docker子网,所有的容器都使用这个子网。命令如下

1
docker network create -d bridge --subnet 172.20.0.0/16 emqx

然后运行如下命令搭建第一台emqx docker

1
docker run -d --name emqx1 --net emqx --ip=172.20.0.2 --restart=always --hostname emqx -p 18083:18083 -p 1883:1883 -p 4369:4369 -v /usr/local/emqx1/emqx:/opt/emqx  emqx/emqx:4.4.2

搭建完成后配置容器的host

1
2
3
4
5
6
7
cd /usr/local/emqx1/emqx/etc
vim emqx.conf
node.name = emqx@172.20.0.2
cd plugins/
vim emqx_dashboard.conf
dashborad.default_user.login = xx
dashborad.default_user.password = xx

后面两台直接复制emqx1的文件夹,并挂载到容器即可,还需要在emqx.conf下配置节点的ip,命令如下

1
2
3
4
cp -r emqx1 emqx2 
docker run -d --name emqx2 --net emqx --ip=172.20.0.3 --restart=always --hostname emqx -p 18084:18083 -p 1884:1883 -p 4370:4369 -v /usr/local/emqx2/emqx:/opt/emqx emqx/emqx:4.4.2
cp -r emqx1 emqx3
docker run -d --name emqx3 --net emqx --ip=172.20.0.4 --restart=always --hostname emqx -p 18085:18083 -p 1885:1883 -p 4371:4369 -v /usr/local/emqx3/emqx:/opt/emqx emqx/emqx:4.4.2

最后我们需要将节点连接起来,

1
2
3
4
docker exec -it emqx1 /bin/sh
./bin/emqx_ctl cluster join emqx@172.20.0.3
./bin/emqx_ctl cluster join emqx@172.20.0.4
./bin/emqx_ctl cluster status

最后显示的样子如下

访问控制的样子如下

Nginx代理Emqx负载均衡

集群搭建好需要搭建前端的负载均衡,让所有的连接分布在不同的节点上,这里使用nginx作为TCP负载均衡工具。配置如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
stream{
upstream emqx_cluster {
zone tcp_servers 64k;
hash $remote_addr;
server 127.0.0.1:1883 max_fails=2 fail_timeout=30s;
server 127.0.0.1:1884 max_fails=2 fail_timeout=30s;
server 127.0.0.1:1885 max_fails=2 fail_timeout=30s;
}

server{
listen 2883 so_keepalive=on;
proxy_connect_timeout 10s;
proxy_timeout 20s;
proxy_pass emqx_cluster;
}
}

需要注意,stream需要与http同级

Java测试负载负载均衡

我们使用官网的代码,做了一点修改,开了个线程去循环发送和接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@Slf4j
@Configuration
public class App {

@Bean
MqttClient client() {
String subTopic = "testtopic/#";
String pubTopic = "testtopic/1";
String content = "Hello World";
int qos = 2;
String broker = "tcp://xx.xx.xx.xx:2883";
String clientId = "emqx_test";
MemoryPersistence persistence = new MemoryPersistence();
MqttClient client = null;
try {
client = new MqttClient(broker, clientId, persistence);
// MQTT 连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("xxx");
connOpts.setPassword("xxxxxxxxxxxxxx".toCharArray());
// 保留会话
connOpts.setCleanSession(true);
// 设置回调
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
log.error("连接丢失: {}", cause);
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.error("接收到消息: 主题 {},消息: {}", topic, new String(message.getPayload()));
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.error("消息已经送达: ID: {}", token.getMessageId());
}
});

// 建立连接
log.info("连接到broker: " + broker);
client.connect(connOpts);
// 订阅
client.subscribe(subTopic);
// 消息发布所需参数
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
MqttClient finalClient = client;
new Thread(() -> {
try {
while (true) {
finalClient.publish(pubTopic, message);
Thread.sleep(1000);
}
} catch (MqttException | InterruptedException e) {
e.printStackTrace();
}
}).start();
return client;
} catch (MqttException e) {
e.printStackTrace();
}
return null;
}
}

日志如下

现在所有的连接也分布在不同的节点

作者

Labradors

发布于

2022-04-26

更新于

2022-04-27

许可协议

评论