主题
TCP下方过快, 超过设备处理能力, 数据丢失
问题原因分析
在物联网(IoT)设备通信中,为了增强操作的可控性,通常会将设备的操作细化为多个指令。这种做法虽然提高了控制的精确度,但也带来了数据传输速度超过设备处理能力的问题。
设备接收数据的速度超过了处理速度,导致缓冲区(缓存)中的条目数达到上限。新到达的数据可能会被丢弃,导致数据丢失
。
解决方案
Netty流量控制(Flow Control)
(1) 设置高低水位
java
serverBootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 8 * 1024 * 1024));
注:高低水位配合后面的isWritable使用
增加channel.isWritable()的判断
java
@Slf4j
@Component
public class RabbitMqCosumer {
@RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_tcp"),
exchange = @Exchange(value = "topic_exchange_reply",type="topic"),key="tcp.#"))
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
String body = new String(message.getBody());
log.info("receive -> " + body);
TcpMsgDto tcpMsgDto = JSONUtil.toBean(body, TcpMsgDto.class);
Channel socketChannel = ChannelMap.getInstance().getChannel(tcpMsgDto.getImei(), tcpMsgDto.getChannelId());
//增加channel.isWritable()的判断
if (socketChannel != null && socketChannel.isActive() && socketChannel.isWritable()) {
ByteBuf buf = Unpooled.buffer(); // 非池化
// 标准协议
buf.writeBytes(new byte[]{(byte) 0xFF, (byte) 0xFF});
buf.writeBytes(tcpMsgDto.getData().getBytes(StandardCharsets.UTF_8));
buf.writeBytes("\n".getBytes(StandardCharsets.UTF_8));
socketChannel.writeAndFlush(buf);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//确认消息消费成功
} else {
// 这里可以打印日志和抛出异常进入mq重试机制
}
}
}
(2) Nagle算法
ChannelOption.TCP_NODELAY: Nagle算法可以将小的数据包合并成一个大包发送,减少数据包数量,从而降低流量压力。
java
serverBootstrap.option(ChannelOption.TCP_NODELAY, false);
(3) IdleStateHandler (空闲状态处理)
使用IdleStateHandler设置读写超时,检测设备的空闲状态,在一定时间内无数据交互时,暂停或减慢数据的发送速率。
java
int READ_IDEL_TIME_OUT = 0; // 读超时
int WRITE_IDEL_TIME_OUT = 0;// 写超时
/**
* 30分钟
* 所有超时(读或写), 自动断开连接
*/
int ALL_IDEL_TIME_OUT = 30 * 60;
try {
serverBootstrap.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
.option(ChannelOption.SO_BACKLOG, 1024)
// 池化并使用直接内存
.childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// Decoders
// pipeline.addLast("lineDecoder", new LineBasedFrameDecoder(2048));
// pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new IdleStateHandler(READ_IDEL_TIME_OUT,
WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.SECONDS)); // 1
// Encoder
// pipeline.addLast("stringEncoder", new NettyStringEncoder());
// pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
// pipeline.addLast("lineEncoder", new LineEncoder());
// pipeline.addLast("bikeHandler", nettyInboundHandler);
}
});
ChannelFuture future = server.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
parentGroup.shutdownGracefully();
parentGroup = null;
childGroup.shutdownGracefully();
childGroup = null;
}
(4) 自定义指令发送间隔
其他
数据批处理 将细化的多个小指令合并成更大的数据块进行传输,这样可以减少设备处理的次数,并提高数据传输的效率。可以通过应用层协议调整数据包的大小,避免设备处理过多的小数据包。缺点:需要设备支持
使用MQTT协议