This article mainly covers:
- A brief introduction to MQTT
- Setting up the EMQX MQTT broker on CentOS
A brief introduction to MQTT
Setting up the EMQX broker on CentOS
Installation
curl -s https://assets.emqx.com/scripts/install-emqx-rpm.sh | sudo bash
yum install emqx
emqx start | stop | restart
emqx_ctl status
## Uninstall
emqx uninstall
# Add an admin user
emqx_ctl admins add gujc gujc@123
# Change the admin user's password
emqx_ctl admins passwd admin admin@123
Default TCP ports
Port | Purpose |
---|---|
1883 | MQTT protocol port |
8883 | MQTT/SSL port |
8083 | MQTT/WebSocket port |
8080 | HTTP API port |
18083 | Dashboard management console port |
For details, see the earlier post "centos搭建EMQ mqtt服务器".
Subscribing and publishing real-time messages with MQTT + Flink
For details, see the post "MQTT+Flink实现实时消息的订阅与发布".
Installing mosquitto on CentOS
While using EMQX, I found that the QoS setting did not seem to take effect. The symptoms were as follows.
With QoS=1:
- If no consumer is running, published messages are simply dropped
- After starting a consumer, the previously dropped messages are not redelivered and are never consumed; however, once the consumer is running, newly published messages are consumed normally
- Stop the consumer again and publish new messages: they are stored in something like a queue, and when the consumer is started again it consumes all of them in one go
- Stop the consumer and also close the session, then publish: the messages are dropped again
With QoS=0 the behavior is exactly the same as with QoS=1, so the QoS setting had no visible effect. A minimal reproduction sketch is shown below.
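To make the scenario concrete, here is a minimal reproduction sketch using the Eclipse Paho Java client (the same library the Flume source later in this post is built on). The broker URL, topic, and client ids are placeholders; cleanSession = false and QoS 1 are the settings whose behavior was being tested.
import org.eclipse.paho.client.mqttv3.*;

public class QosRepro {
    public static void main(String[] args) throws MqttException {
        String broker = "tcp://localhost:1883"; // placeholder broker address
        // "Consumer": persistent session (cleanSession = false) with a QoS 1 subscription,
        // so the broker is expected to queue messages published while it is offline.
        MqttClient consumer = new MqttClient(broker, "qos-test-consumer");
        MqttConnectOptions opts = new MqttConnectOptions();
        opts.setCleanSession(false);
        consumer.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) { }
            @Override
            public void messageArrived(String topic, MqttMessage msg) {
                System.out.println("received: " + new String(msg.getPayload()));
            }
            @Override
            public void deliveryComplete(IMqttDeliveryToken token) { }
        });
        consumer.connect(opts);
        consumer.subscribe("qos-test", 1);

        // "Producer": publish one message at QoS 1.
        MqttClient producer = new MqttClient(broker, "qos-test-producer");
        producer.connect();
        producer.publish("qos-test", "hello".getBytes(), 1, false);
        producer.disconnect();
        // Disconnecting the consumer (while keeping its session) and publishing again
        // reproduces the queueing scenario described above.
    }
}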
To cross-check, I decided to test the same scenario with mosquitto. First, install it:
yum -y install epel-release
yum -y install mosquitto
# start mosquitto
systemctl start mosquitto
# enable start on boot
systemctl enable mosquitto
Test cases:
- Publish a message:
mosquitto_pub -h localhost -t "test" -i "client2" -q 0 -m "bbb"  # -q 0 means QoS=0
Parameter description:
-d  print debug information
-f  use the contents of the specified file as the message payload
-h  host to connect to, default localhost
-i  the clientId to use for this client
-I  a clientId prefix to use for this client
-m  message payload
-n  send a null (empty) message
-p  port to connect to (lowercase p)
-q  QoS level (0, 1, 2)
-t  topic to publish to
-u  username for the broker
-P  password for the broker (uppercase P)
-V  MQTT protocol version
--will-payload  a message sent by the broker when this client disconnects unexpectedly; must be used together with --will-topic
--will-qos  QoS of the Will message; must be used together with --will-topic
--will-retain  mark the Will message as a retained message (kept by the broker after it is published); must be used together with --will-topic
--will-topic  the topic the Will message is published to
- Subscribe to messages:
mosquitto_sub -c -h localhost -t "test" -i "client1" -q 0  # -c disables 'clean session'
Parameter description:
-c  set 'clean session' to false, so the subscription is kept even after the connection is lost; when the client reconnects it can still receive the messages published while it was disconnected
-d  print debug information
-h  host to connect to, default localhost
-i  clientId
-I  clientId prefix
-k  keepalive: send a PING to the broker every given number of seconds to signal that the client is still connected, default 60 seconds
-q  the maximum QoS at which to receive messages, default 0
-R  do not show stale (retained) messages
-t  topic to subscribe to
-v  print messages verbosely (topic together with payload)
-u  username for the broker
-P  password for the broker (uppercase P)
--will-payload  a message sent by the broker when this client disconnects unexpectedly; must be used together with --will-topic
--will-qos  QoS of the Will message; must be used together with --will-topic
--will-retain  mark the Will message as a retained message (kept by the broker after it is published); must be used together with --will-topic
--will-topic  the topic the Will message is published to
Test result: with mosquitto everything behaves as expected, so I am not sure whether EMQX was simply misconfigured somewhere or whether the open-source edition has a bug.
Custom flume-mqtt source
Writing a custom Flume source is fairly simple and there are plenty of examples online, so here is the code directly.
package org.apache.flume.source.mqtt;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
public class MqttSource extends AbstractSource
implements Configurable, PollableSource {
private static final Logger log = LoggerFactory.getLogger(MqttSource.class);
private String host;
private String topic;
private Integer qos;
private Integer batchSize;
private Boolean cleanSession;
private Integer connectionTimeout;
private Integer keepAliveInterval;
private String username;
private String password;
private Boolean retryConnection;
private String clientId = MqttSourceConstants.getUuid();
private MemoryPersistence memoryPersistence = null;
private MqttConnectOptions mqttConnectOptions = null;
private MqttClient mqttClient = null;
private Context context;
private Event event;
private final List<Event> eventList = new ArrayList<Event>();
/**
* Poll the source. Incoming MQTT messages are delivered asynchronously by the
* callback registered in start(); this method keeps the subscription alive and
* reports whether the connection is healthy.
*
* @return Status.READY or Status.BACKOFF
*/
@Override
public Status process() {
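// The subscription is (re)issued on every poll so that it is restored after a reconnect;
// a repeated SUBSCRIBE to an already-subscribed topic simply replaces the existing subscription.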
if (null != mqttClient && mqttClient.isConnected()) {
if (null != topic && null != qos) {
try {
mqttClient.subscribe(topic, qos);
} catch (MqttException e) {
log.error("Subscription topic {} has an exception, the exception is: {}", topic, e.fillInStackTrace());
}
} else {
throw new ConfigurationException("Mqtt host or topic config error.");
}
return Status.READY;
} else {
if (retryConnection) {
log.error("The MQTT connection is disconnected and reconnected ......");
reConnect();
return Status.READY;
}
return Status.BACKOFF;
}
}
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
/**
* Mqtt configure
*
* @param context
*/
@Override
public void configure(Context context) {
this.context = context;
host = context.getString(MqttSourceConstants.HOST);
log.info("host =====> {}" ,host);
if (host == null) {
throw new ConfigurationException("Mqtt host must be specified.");
}
topic = context.getString(MqttSourceConstants.TOPIC);
log.info("topic =====> {}" ,topic);
if (topic == null) {
throw new ConfigurationException("Mqtt topic must be specified.");
}
qos = context.getInteger(MqttSourceConstants.QOS, MqttSourceConstants.DEFAULT_QOS);
batchSize = context.getInteger(MqttSourceConstants.BATCH_SIZE);
cleanSession = context.getBoolean(MqttSourceConstants.IF_SESSION_CLEAN, MqttSourceConstants.DEFAULT_IF_SESSION_CLEAN);
connectionTimeout = context.getInteger(MqttSourceConstants.CONNECTION_TIMEOUT, MqttSourceConstants.DEFAULT_CONNECTION_TIMEOUT);
keepAliveInterval = context.getInteger(MqttSourceConstants.KEEP_ALIVE_INTERVAL, MqttSourceConstants.DEFAULT_KEEP_ALIVE_INTERVAL);
username = context.getString(MqttSourceConstants.USERNAME);
password = context.getString(MqttSourceConstants.PASSWORD);
retryConnection = context.getBoolean(MqttSourceConstants.RETRY_CONNECTION, MqttSourceConstants.DEFAULT_RETRY_CONNECTION);
}
/**
* Start Job
*/
@Override
public synchronized void start() {
log.info("Starting mqtt source job ......");
mqttConnectOptions = new MqttConnectOptions();
memoryPersistence = new MemoryPersistence();
if (null != memoryPersistence && null != host) {
try {
mqttClient = new MqttClient(host, clientId, memoryPersistence);
} catch (MqttException e) {
throw new ConfigurationException("Mqtt host config error and error message : {}", e.fillInStackTrace());
}
}
if (null != mqttConnectOptions) {
mqttConnectOptions.setCleanSession(cleanSession);
mqttConnectOptions.setConnectionTimeout(connectionTimeout);
mqttConnectOptions.setKeepAliveInterval(keepAliveInterval);
if (null != username) {
mqttConnectOptions.setUserName(username);
}
if (null != password) {
mqttConnectOptions.setPassword(password.toCharArray());
}
if (null != mqttClient && !mqttClient.isConnected()) {
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
log.error("MqttClient disconnect call back retry connect......");
reConnect();
}
@Override
public void messageArrived(String topic, MqttMessage message) {
event = EventBuilder.withBody(message.getPayload());
if (null == batchSize) {
getChannelProcessor().processEvent(event);
} else {
// Buffer events and flush the whole batch once batchSize is reached,
// so the message that triggers the flush is not dropped.
eventList.add(event);
if (eventList.size() >= batchSize) {
getChannelProcessor().processEventBatch(eventList);
eventList.clear();
}
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
try {
mqttClient.connect(mqttConnectOptions);
} catch (MqttException e) {
log.error("Get the MQTT connection exception, exception information is : {}", e.fillInStackTrace());
reConnect();
}
}
}
}
/**
* Reconnect
*/
public void reConnect() {
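// Back off for 10 seconds before reconnecting with the existing options;
// if the client or options have not been created yet, go through start() again.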
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
log.error("Retry Get the MQTT Thread sleep exception, exception information is : {}", e.fillInStackTrace());
}
if (null != mqttClient && !mqttClient.isConnected() && null != mqttConnectOptions) {
try {
mqttClient.connect(mqttConnectOptions);
} catch (MqttException e) {
log.error("Retry Get the MQTT connection exception, exception information is : {}", e.fillInStackTrace());
}
} else {
start();
}
}
/**
* close source
*/
@Override
public synchronized void stop() {
if (mqttClient != null) {
try {
// Paho requires the client to be disconnected before close() is called.
if (mqttClient.isConnected()) {
mqttClient.disconnect();
}
mqttClient.close();
} catch (MqttException e) {
log.error("mqttClient close an error occurs : {}", e.fillInStackTrace());
}
}
if (mqttConnectOptions != null) {
mqttConnectOptions = null;
}
if (memoryPersistence != null) {
try {
memoryPersistence.close();
} catch (MqttPersistenceException e) {
log.error("memoryPersistence close an error occurs : {}", e.fillInStackTrace());
}
}
log.info("Mqtt Source {} stopped success.", getName());
super.stop();
}
}
package org.apache.flume.source.mqtt;
import java.util.UUID;
public class MqttSourceConstants {
public static final String HOST = "host";
public static final String TOPIC = "topic";
public static final String QOS = "qos";
public static final String BATCH_SIZE = "batchSize";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String IF_SESSION_CLEAN = "cleanSession";
public static final String CONNECTION_TIMEOUT = "connectionTimeout";
public static final String KEEP_ALIVE_INTERVAL = "keepAliveInterval";
public static final String RETRY_CONNECTION = "retryConnection";
public static final Integer DEFAULT_QOS = 1;
public static final Boolean DEFAULT_IF_SESSION_CLEAN = false;
public static final Integer DEFAULT_CONNECTION_TIMEOUT = 30;
public static final Integer DEFAULT_KEEP_ALIVE_INTERVAL = 60;
public static final Boolean DEFAULT_RETRY_CONNECTION = false;
public static String getUuid() {
return UUID.randomUUID().toString().replaceAll("-", "").substring(0, 8);
}
}
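To deploy the source, package the two classes above together with the Paho dependency (org.eclipse.paho.client.mqttv3) into a jar and put it on Flume's classpath, for example under $FLUME_HOME/lib, so that the type org.apache.flume.source.mqtt.MqttSource can be loaded.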
Job configuration:
# Configuration file name: mqtt.conf
# Declare the agent's sources, channels and sinks
agent.sources = mqtt
agent.channels = memory-channel
agent.sinks = logger
agent.sources.mqtt.type = org.apache.flume.source.mqtt.MqttSource
agent.sources.mqtt.host = tcp://150.158.190.192:1883
agent.sources.mqtt.topic = flume-demo
agent.sources.mqtt.qos = 1
#agent.sources.mqtt.batchSize = 1000
agent.sources.mqtt.cleanSession = false
agent.sources.mqtt.connectionTimeout = 10
agent.sources.mqtt.keepAliveInterval = 100
#agent.sources.mqtt.username = admin
#agent.sources.mqtt.password = admin@123
agent.sources.mqtt.retryConnection = true
agent.channels.memory-channel.type = memory
agent.sinks.logger.type = logger
agent.sources.mqtt.channels = memory-channel
agent.sinks.logger.channel = memory-channel
Starting the job:
flume-ng agent --name agent --conf $FLUME_HOME/conf --conf-file mqtt.conf -Dflume.root.logger=INFO,console
Note that the value of --name must match the agent name used in the configuration file. When I first tested I had set it to mqtt, so Flume could not pick up the job configuration:
02 十一月 2022 17:29:35,413 INFO [main] (org.apache.flume.node.Application.startAllComponents:207) - Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }
How to avoid losing data
Every MQTT client has a session, and in EMQX every session has an inflight queue that guards against data loss when a client goes offline because of network problems. To make use of it:
1. On the subscribing client, set:
cleanSession = false
2. Set the corresponding option on the Dashboard page
3. In the EMQX broker configuration `/etc/emqx/emqx.conf`:
force_shutdown {
enable = true
max_message_queue_len = 100000
max_heap_size = 1024MB
}
Note: with the default force_shutdown settings, data will be lost.
- If setting 2 is not configured, only the most recent 1000 messages are kept
- If setting 3 is not configured, the subscribing client is automatically disconnected right after it reconnects, and the EMQX log shows
Context: maximum heap size reached
This means that the corresponding client process has reached its maximum heap size limit, after which the process is forcibly killed by EMQX. This mechanism exists to protect EMQX's availability: it prevents a client process's memory usage from growing without bound and eventually causing an EMQX OOM. A client process's heap usage mainly comes from unacknowledged or undelivered messages in the inflight window and the message queue, and such a backlog usually means the client cannot consume messages fast enough.
For details, see "How to troubleshoot client connection and subscription problems?".