From b66afaa62c91f123d863423d98479cdbfd4cdf99 Mon Sep 17 00:00:00 2001 From: Tom Knapen Date: Sat, 13 Mar 2021 07:44:49 +0100 Subject: [PATCH 1/3] wait until channel read completes before decoding packet when reading large amounts of data, this data may be read in multiple passes. we must wait until all data has been read before trying to decode a packet from this data. --- Sources/MQTT/MQTTChannelHandler.swift | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/Sources/MQTT/MQTTChannelHandler.swift b/Sources/MQTT/MQTTChannelHandler.swift index 8af068f..5998e39 100644 --- a/Sources/MQTT/MQTTChannelHandler.swift +++ b/Sources/MQTT/MQTTChannelHandler.swift @@ -13,6 +13,7 @@ final class MQTTChannelHandler: ChannelInboundHandler { private let decoder: MQTTDecoder weak var delegate: MQTTChannelHandlerDelegate? + private var packetData = Data() init(decoder: MQTTDecoder = MQTTDecoder()) { self.decoder = decoder @@ -21,19 +22,25 @@ final class MQTTChannelHandler: ChannelInboundHandler { func channelRead(context _: ChannelHandlerContext, data: NIOAny) { var buf = unwrapInboundIn(data) if let bytes = buf.readBytes(length: buf.readableBytes) { - var data = Data(bytes) - do { - let packet = try decoder.decode(data: &data) - delegate?.didReceive(packet: packet) - } catch let error as DecodeError { - delegate?.didCatch(decodeError: error) - } catch { - // unhandled error - fatalError("Error while decoding packet data: \(error.localizedDescription)") - } + packetData.append(contentsOf: bytes) } } + func channelReadComplete(context: ChannelHandlerContext) { + var data = packetData + packetData = Data() + + do { + let packet = try decoder.decode(data: &data) + delegate?.didReceive(packet: packet) + } catch let error as DecodeError { + delegate?.didCatch(decodeError: error) + } catch { + // unhandled error + fatalError("Error while decoding packet data: \(error.localizedDescription)") + } + } + func channelActive(context: ChannelHandlerContext) { delegate?.channelActive(channel: context.channel) } From 44d9b0eadb519d376ec27f0b7d26f757069c64c3 Mon Sep 17 00:00:00 2001 From: Tom Knapen Date: Sat, 13 Mar 2021 07:47:55 +0100 Subject: [PATCH 2/3] forward operations to next _ChannelInboundHandler in pipeline as suggested by the documentation --- Sources/MQTT/MQTTChannelHandler.swift | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/Sources/MQTT/MQTTChannelHandler.swift b/Sources/MQTT/MQTTChannelHandler.swift index 5998e39..645b7eb 100644 --- a/Sources/MQTT/MQTTChannelHandler.swift +++ b/Sources/MQTT/MQTTChannelHandler.swift @@ -19,7 +19,9 @@ final class MQTTChannelHandler: ChannelInboundHandler { self.decoder = decoder } - func channelRead(context _: ChannelHandlerContext, data: NIOAny) { + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + defer { context.fireChannelRead(data) } + var buf = unwrapInboundIn(data) if let bytes = buf.readBytes(length: buf.readableBytes) { packetData.append(contentsOf: bytes) @@ -27,6 +29,8 @@ final class MQTTChannelHandler: ChannelInboundHandler { } func channelReadComplete(context: ChannelHandlerContext) { + defer { context.fireChannelReadComplete() } + var data = packetData packetData = Data() @@ -42,10 +46,12 @@ final class MQTTChannelHandler: ChannelInboundHandler { } func channelActive(context: ChannelHandlerContext) { + defer { context.fireChannelActive() } delegate?.channelActive(channel: context.channel) } - func channelInactive(context _: ChannelHandlerContext) { + func channelInactive(context: ChannelHandlerContext) { + defer { context.fireChannelInactive() } delegate?.channelInactive() } } From c57f262f60a6f3c7ca9c4b03282adad4b3b130e0 Mon Sep 17 00:00:00 2001 From: Tom Knapen Date: Sat, 13 Mar 2021 07:56:40 +0100 Subject: [PATCH 3/3] convert indentation to spaces --- Sources/MQTT/MQTTChannelHandler.swift | 38 +++++++++++++-------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/Sources/MQTT/MQTTChannelHandler.swift b/Sources/MQTT/MQTTChannelHandler.swift index 645b7eb..c004130 100644 --- a/Sources/MQTT/MQTTChannelHandler.swift +++ b/Sources/MQTT/MQTTChannelHandler.swift @@ -13,45 +13,45 @@ final class MQTTChannelHandler: ChannelInboundHandler { private let decoder: MQTTDecoder weak var delegate: MQTTChannelHandlerDelegate? - private var packetData = Data() + private var packetData = Data() init(decoder: MQTTDecoder = MQTTDecoder()) { self.decoder = decoder } func channelRead(context: ChannelHandlerContext, data: NIOAny) { - defer { context.fireChannelRead(data) } + defer { context.fireChannelRead(data) } var buf = unwrapInboundIn(data) if let bytes = buf.readBytes(length: buf.readableBytes) { - packetData.append(contentsOf: bytes) + packetData.append(contentsOf: bytes) } } - func channelReadComplete(context: ChannelHandlerContext) { - defer { context.fireChannelReadComplete() } + func channelReadComplete(context: ChannelHandlerContext) { + defer { context.fireChannelReadComplete() } - var data = packetData - packetData = Data() + var data = packetData + packetData = Data() - do { - let packet = try decoder.decode(data: &data) - delegate?.didReceive(packet: packet) - } catch let error as DecodeError { - delegate?.didCatch(decodeError: error) - } catch { - // unhandled error - fatalError("Error while decoding packet data: \(error.localizedDescription)") - } - } + do { + let packet = try decoder.decode(data: &data) + delegate?.didReceive(packet: packet) + } catch let error as DecodeError { + delegate?.didCatch(decodeError: error) + } catch { + // unhandled error + fatalError("Error while decoding packet data: \(error.localizedDescription)") + } + } func channelActive(context: ChannelHandlerContext) { - defer { context.fireChannelActive() } + defer { context.fireChannelActive() } delegate?.channelActive(channel: context.channel) } func channelInactive(context: ChannelHandlerContext) { - defer { context.fireChannelInactive() } + defer { context.fireChannelInactive() } delegate?.channelInactive() } }