diff --git a/Sources/MQTT/MQTTChannelHandler.swift b/Sources/MQTT/MQTTChannelHandler.swift index 8af068f..c004130 100644 --- a/Sources/MQTT/MQTTChannelHandler.swift +++ b/Sources/MQTT/MQTTChannelHandler.swift @@ -13,32 +13,45 @@ final class MQTTChannelHandler: ChannelInboundHandler { private let decoder: MQTTDecoder weak var delegate: MQTTChannelHandlerDelegate? + private var packetData = Data() init(decoder: MQTTDecoder = MQTTDecoder()) { self.decoder = decoder } - func channelRead(context _: ChannelHandlerContext, data: NIOAny) { + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + defer { context.fireChannelRead(data) } + var buf = unwrapInboundIn(data) if let bytes = buf.readBytes(length: buf.readableBytes) { - var data = Data(bytes) - do { - let packet = try decoder.decode(data: &data) - delegate?.didReceive(packet: packet) - } catch let error as DecodeError { - delegate?.didCatch(decodeError: error) - } catch { - // unhandled error - fatalError("Error while decoding packet data: \(error.localizedDescription)") - } + packetData.append(contentsOf: bytes) + } + } + + func channelReadComplete(context: ChannelHandlerContext) { + defer { context.fireChannelReadComplete() } + + var data = packetData + packetData = Data() + + do { + let packet = try decoder.decode(data: &data) + delegate?.didReceive(packet: packet) + } catch let error as DecodeError { + delegate?.didCatch(decodeError: error) + } catch { + // unhandled error + fatalError("Error while decoding packet data: \(error.localizedDescription)") } } func channelActive(context: ChannelHandlerContext) { + defer { context.fireChannelActive() } delegate?.channelActive(channel: context.channel) } - func channelInactive(context _: ChannelHandlerContext) { + func channelInactive(context: ChannelHandlerContext) { + defer { context.fireChannelInactive() } delegate?.channelInactive() } }