Skip to content

TCP下方过快, 超过设备处理能力, 数据丢失

问题原因分析

在物联网(IoT)设备通信中,为了增强操作的可控性,通常会将设备的操作细化为多个指令。这种做法虽然提高了控制的精确度,但也带来了数据传输速度超过设备处理能力的问题。

设备接收数据的速度超过了处理速度,导致缓冲区(缓存)中的条目数达到上限。新到达的数据可能会被丢弃,导致数据丢失

解决方案

Netty流量控制(Flow Control)

(1) 设置高低水位

java
serverBootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 8 * 1024 * 1024));

注:高低水位配合后面的isWritable使用

增加channel.isWritable()的判断

java
@Slf4j
@Component
public class RabbitMqCosumer {

    @RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_tcp"),
            exchange = @Exchange(value = "topic_exchange_reply",type="topic"),key="tcp.#"))
    public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
        String body = new String(message.getBody());
        log.info("receive -> " + body);
        TcpMsgDto tcpMsgDto = JSONUtil.toBean(body, TcpMsgDto.class);
        Channel socketChannel = ChannelMap.getInstance().getChannel(tcpMsgDto.getImei(), tcpMsgDto.getChannelId());
        //增加channel.isWritable()的判断
        if (socketChannel != null && socketChannel.isActive() && socketChannel.isWritable()) {
            ByteBuf buf = Unpooled.buffer(); // 非池化

            // 标准协议
            buf.writeBytes(new byte[]{(byte) 0xFF, (byte) 0xFF});
            buf.writeBytes(tcpMsgDto.getData().getBytes(StandardCharsets.UTF_8));
            buf.writeBytes("\n".getBytes(StandardCharsets.UTF_8));

            socketChannel.writeAndFlush(buf);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//确认消息消费成功
        } else {
            // 这里可以打印日志和抛出异常进入mq重试机制
        }
    }
}

(2) Nagle算法

Nagle算法基础扫盲

ChannelOption.TCP_NODELAY: Nagle算法可以将小的数据包合并成一个大包发送,减少数据包数量,从而降低流量压力。

java
serverBootstrap.option(ChannelOption.TCP_NODELAY, false);

(3) IdleStateHandler (空闲状态处理)

使用IdleStateHandler设置读写超时,检测设备的空闲状态,在一定时间内无数据交互时,暂停或减慢数据的发送速率。

java
int READ_IDEL_TIME_OUT = 0; // 读超时
int WRITE_IDEL_TIME_OUT = 0;// 写超时
/**
 * 30分钟
 * 所有超时(读或写), 自动断开连接
 */
int ALL_IDEL_TIME_OUT = 30 * 60;

try {
    serverBootstrap.group(parentGroup, childGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
        .option(ChannelOption.SO_BACKLOG, 1024)
        // 池化并使用直接内存
        .childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                // Decoders
                // pipeline.addLast("lineDecoder", new LineBasedFrameDecoder(2048));
                // pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));

                pipeline.addLast(new LoggingHandler(LogLevel.INFO));

                pipeline.addLast(new IdleStateHandler(READ_IDEL_TIME_OUT,
                        WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.SECONDS)); // 1
                // Encoder
//                        pipeline.addLast("stringEncoder", new NettyStringEncoder());
//                        pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
//                        pipeline.addLast("lineEncoder", new LineEncoder());

                // pipeline.addLast("bikeHandler", nettyInboundHandler);
            }
        });
    ChannelFuture future = server.bind(port).sync();
    future.channel().closeFuture().sync();
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    parentGroup.shutdownGracefully();
    parentGroup = null;
    childGroup.shutdownGracefully();
    childGroup = null;
}

(4) 自定义指令发送间隔

参考: 延迟任务处理 HashedWheelTimer

其他

  • 数据批处理 将细化的多个小指令合并成更大的数据块进行传输,这样可以减少设备处理的次数,并提高数据传输的效率。可以通过应用层协议调整数据包的大小,避免设备处理过多的小数据包。缺点:需要设备支持

  • 使用MQTT协议

粤ICP备20009776号