@@ -520,148 +520,174 @@ impl Decoder for LogstashDecoder {
520
520
}
521
521
// https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#data-frame-type
522
522
LogstashDecoderReadState :: ReadFrame ( protocol, LogstashFrameType :: Data ) => {
523
- let mut rest = src. as_ref ( ) ;
524
-
525
- if rest. remaining ( ) < 8 {
523
+ let Some ( frame) = decode_data_frame ( protocol, src) else {
526
524
return Ok ( None ) ;
527
- }
528
- let sequence_number = rest. get_u32 ( ) ;
529
- let pair_count = rest. get_u32 ( ) ;
530
-
531
- let mut fields = BTreeMap :: < KeyString , serde_json:: Value > :: new ( ) ;
532
- for _ in 0 ..pair_count {
533
- if src. remaining ( ) < 4 {
534
- return Ok ( None ) ;
535
- }
536
- let key_length = rest. get_u32 ( ) as usize ;
537
-
538
- if rest. remaining ( ) < key_length {
539
- return Ok ( None ) ;
540
- }
541
- let ( key, right) = rest. split_at ( key_length) ;
542
- rest = right;
543
-
544
- if src. remaining ( ) < 4 {
545
- return Ok ( None ) ;
546
- }
547
- let value_length = rest. get_u32 ( ) as usize ;
548
- if rest. remaining ( ) < value_length {
549
- return Ok ( None ) ;
550
- }
551
- let ( value, right) = rest. split_at ( value_length) ;
552
- rest = right;
553
-
554
- fields. insert (
555
- String :: from_utf8_lossy ( key) . into ( ) ,
556
- String :: from_utf8_lossy ( value) . into ( ) ,
557
- ) ;
558
- }
559
-
560
- let remaining = rest. remaining ( ) ;
561
- let byte_size = src. remaining ( ) - remaining;
562
-
563
- src. advance ( byte_size) ;
525
+ } ;
564
526
565
- let frames = vec ! [ (
566
- LogstashEventFrame {
567
- protocol,
568
- sequence_number,
569
- fields,
570
- } ,
571
- byte_size,
572
- ) ]
573
- . into ( ) ;
574
-
575
- LogstashDecoderReadState :: PendingFrames ( frames)
527
+ LogstashDecoderReadState :: PendingFrames ( [ frame] . into ( ) )
576
528
}
577
529
// https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#json-frame-type
578
530
LogstashDecoderReadState :: ReadFrame ( protocol, LogstashFrameType :: Json ) => {
579
- let mut rest = src. as_ref ( ) ;
580
-
581
- if rest. remaining ( ) < 8 {
531
+ let Some ( frame) = decode_json_frame ( protocol, src) ? else {
582
532
return Ok ( None ) ;
583
- }
584
- let sequence_number = rest. get_u32 ( ) ;
585
- let payload_size = rest. get_u32 ( ) as usize ;
533
+ } ;
586
534
587
- if rest. remaining ( ) < payload_size {
535
+ LogstashDecoderReadState :: PendingFrames ( [ frame] . into ( ) )
536
+ }
537
+ // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#compressed-frame-type
538
+ LogstashDecoderReadState :: ReadFrame ( _protocol, LogstashFrameType :: Compressed ) => {
539
+ let Some ( frames) = decode_compressed_frame ( src) ? else {
588
540
return Ok ( None ) ;
589
- }
541
+ } ;
590
542
591
- let ( slice, right) = rest. split_at ( payload_size) ;
592
- rest = right;
543
+ LogstashDecoderReadState :: PendingFrames ( frames)
544
+ }
545
+ } ;
546
+ }
547
+ }
548
+ }
593
549
594
- let fields_result: Result < BTreeMap < KeyString , serde_json:: Value > , _ > =
595
- serde_json:: from_slice ( slice) . context ( JsonFrameFailedDecodeSnafu { } ) ;
550
+ /// Decode the Lumberjack version 1 protocol, which use the Key:Value format.
551
+ fn decode_data_frame (
552
+ protocol : LogstashProtocolVersion ,
553
+ src : & mut BytesMut ,
554
+ ) -> Option < ( LogstashEventFrame , usize ) > {
555
+ let mut rest = src. as_ref ( ) ;
596
556
597
- let remaining = rest. remaining ( ) ;
598
- let byte_size = src. remaining ( ) - remaining;
557
+ if rest. remaining ( ) < 8 {
558
+ return None ;
559
+ }
560
+ let sequence_number = rest. get_u32 ( ) ;
561
+ let pair_count = rest. get_u32 ( ) ;
562
+ if pair_count == 0 {
563
+ return None ; // Invalid number of fields
564
+ }
599
565
600
- src. advance ( byte_size) ;
566
+ let mut fields = BTreeMap :: < KeyString , serde_json:: Value > :: new ( ) ;
567
+ for _ in 0 ..pair_count {
568
+ let ( key, value, right) = decode_pair ( rest) ?;
569
+ rest = right;
601
570
602
- match fields_result {
603
- Ok ( fields) => {
604
- let frames = vec ! [ (
605
- LogstashEventFrame {
606
- protocol,
607
- sequence_number,
608
- fields,
609
- } ,
610
- byte_size,
611
- ) ]
612
- . into ( ) ;
571
+ fields. insert (
572
+ String :: from_utf8_lossy ( key) . into ( ) ,
573
+ String :: from_utf8_lossy ( value) . into ( ) ,
574
+ ) ;
575
+ }
613
576
614
- LogstashDecoderReadState :: PendingFrames ( frames)
615
- }
616
- Err ( err) => return Err ( err) ,
617
- }
618
- }
619
- // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#compressed-frame-type
620
- LogstashDecoderReadState :: ReadFrame ( _protocol, LogstashFrameType :: Compressed ) => {
621
- let mut rest = src. as_ref ( ) ;
577
+ let byte_size = bytes_remaining ( src, rest) ;
578
+ src. advance ( byte_size) ;
622
579
623
- if rest. remaining ( ) < 4 {
624
- return Ok ( None ) ;
625
- }
626
- let payload_size = rest. get_u32 ( ) as usize ;
580
+ Some ( (
581
+ LogstashEventFrame {
582
+ protocol,
583
+ sequence_number,
584
+ fields,
585
+ } ,
586
+ byte_size,
587
+ ) )
588
+ }
627
589
628
- if rest. remaining ( ) < payload_size {
629
- src. reserve ( payload_size) ;
630
- return Ok ( None ) ;
631
- }
590
+ fn decode_pair ( mut rest : & [ u8 ] ) -> Option < ( & [ u8 ] , & [ u8 ] , & [ u8 ] ) > {
591
+ if rest. remaining ( ) < 4 {
592
+ return None ;
593
+ }
594
+ let key_length = rest. get_u32 ( ) as usize ;
632
595
633
- let ( slice, right) = rest. split_at ( payload_size) ;
634
- rest = right;
596
+ if rest. remaining ( ) < key_length {
597
+ return None ;
598
+ }
599
+ let ( key, right) = rest. split_at ( key_length) ;
600
+ rest = right;
635
601
636
- let mut buf = {
637
- let mut buf = Vec :: new ( ) ;
602
+ if rest. remaining ( ) < 4 {
603
+ return None ;
604
+ }
605
+ let value_length = rest. get_u32 ( ) as usize ;
606
+ if rest. remaining ( ) < value_length {
607
+ return None ;
608
+ }
609
+ let ( value, right) = rest. split_at ( value_length) ;
610
+ Some ( ( key, value, right) )
611
+ }
638
612
639
- let res = ZlibDecoder :: new ( io:: Cursor :: new ( slice) )
640
- . read_to_end ( & mut buf)
641
- . context ( DecompressionFailedSnafu )
642
- . map ( |_| BytesMut :: from ( & buf[ ..] ) ) ;
613
+ fn decode_json_frame (
614
+ protocol : LogstashProtocolVersion ,
615
+ src : & mut BytesMut ,
616
+ ) -> Result < Option < ( LogstashEventFrame , usize ) > , DecodeError > {
617
+ let mut rest = src. as_ref ( ) ;
643
618
644
- let remaining = rest. remaining ( ) ;
645
- let byte_size = src. remaining ( ) - remaining;
619
+ if rest. remaining ( ) < 8 {
620
+ return Ok ( None ) ;
621
+ }
622
+ let sequence_number = rest. get_u32 ( ) ;
623
+ let payload_size = rest. get_u32 ( ) as usize ;
646
624
647
- src. advance ( byte_size) ;
625
+ if rest. remaining ( ) < payload_size {
626
+ return Ok ( None ) ;
627
+ }
648
628
649
- res
650
- } ? ;
629
+ let ( slice , right ) = rest . split_at ( payload_size ) ;
630
+ rest = right ;
651
631
652
- let mut decoder = LogstashDecoder :: new ( ) ;
632
+ let fields: BTreeMap < KeyString , serde_json:: Value > =
633
+ serde_json:: from_slice ( slice) . context ( JsonFrameFailedDecodeSnafu { } ) ?;
653
634
654
- let mut frames = VecDeque :: new ( ) ;
635
+ let byte_size = bytes_remaining ( src, rest) ;
636
+ src. advance ( byte_size) ;
655
637
656
- while let Some ( s) = decoder. decode ( & mut buf) ? {
657
- frames. push_back ( s) ;
658
- }
638
+ Ok ( Some ( (
639
+ LogstashEventFrame {
640
+ protocol,
641
+ sequence_number,
642
+ fields,
643
+ } ,
644
+ byte_size,
645
+ ) ) )
646
+ }
659
647
660
- LogstashDecoderReadState :: PendingFrames ( frames)
661
- }
662
- } ;
663
- }
648
+ fn decode_compressed_frame (
649
+ src : & mut BytesMut ,
650
+ ) -> Result < Option < VecDeque < ( LogstashEventFrame , usize ) > > , DecodeError > {
651
+ let mut rest = src. as_ref ( ) ;
652
+
653
+ if rest. remaining ( ) < 4 {
654
+ return Ok ( None ) ;
664
655
}
656
+ let payload_size = rest. get_u32 ( ) as usize ;
657
+
658
+ if rest. remaining ( ) < payload_size {
659
+ src. reserve ( payload_size) ;
660
+ return Ok ( None ) ;
661
+ }
662
+
663
+ let ( slice, right) = rest. split_at ( payload_size) ;
664
+ rest = right;
665
+
666
+ let mut buf = Vec :: new ( ) ;
667
+
668
+ let res = ZlibDecoder :: new ( io:: Cursor :: new ( slice) )
669
+ . read_to_end ( & mut buf)
670
+ . context ( DecompressionFailedSnafu )
671
+ . map ( |_| BytesMut :: from ( & buf[ ..] ) ) ;
672
+
673
+ let byte_size = bytes_remaining ( src, rest) ;
674
+ src. advance ( byte_size) ;
675
+
676
+ let mut buf = res?;
677
+
678
+ let mut decoder = LogstashDecoder :: new ( ) ;
679
+
680
+ let mut frames = VecDeque :: new ( ) ;
681
+
682
+ while let Some ( s) = decoder. decode ( & mut buf) ? {
683
+ frames. push_back ( s) ;
684
+ }
685
+ Ok ( Some ( frames) )
686
+ }
687
+
688
+ fn bytes_remaining ( src : & BytesMut , rest : & [ u8 ] ) -> usize {
689
+ let remaining = rest. remaining ( ) ;
690
+ src. remaining ( ) - remaining
665
691
}
666
692
667
693
impl From < LogstashEventFrame > for Event {
@@ -685,6 +711,7 @@ impl From<LogstashEventFrame> for SmallVec<[Event; 1]> {
685
711
#[ cfg( test) ]
686
712
mod test {
687
713
use bytes:: BufMut ;
714
+ use futures:: Stream ;
688
715
use rand:: { thread_rng, Rng } ;
689
716
use tokio:: io:: { AsyncReadExt , AsyncWriteExt } ;
690
717
use vector_lib:: lookup:: OwnedTargetPath ;
@@ -715,26 +742,32 @@ mod test {
715
742
test_protocol ( EventStatus :: Rejected , false ) . await ;
716
743
}
717
744
745
+ async fn start_logstash (
746
+ status : EventStatus ,
747
+ ) -> ( SocketAddr , impl Stream < Item = Event > + Unpin ) {
748
+ let ( sender, recv) = SourceSender :: new_test_finalize ( status) ;
749
+ let address = next_addr ( ) ;
750
+ let source = LogstashConfig {
751
+ address : address. into ( ) ,
752
+ tls : None ,
753
+ permit_origin : None ,
754
+ keepalive : None ,
755
+ receive_buffer_bytes : None ,
756
+ acknowledgements : true . into ( ) ,
757
+ connection_limit : None ,
758
+ log_namespace : None ,
759
+ }
760
+ . build ( SourceContext :: new_test ( sender, None ) )
761
+ . await
762
+ . unwrap ( ) ;
763
+ tokio:: spawn ( source) ;
764
+ wait_for_tcp ( address) . await ;
765
+ ( address, recv)
766
+ }
767
+
718
768
async fn test_protocol ( status : EventStatus , sends_ack : bool ) {
719
769
let events = assert_source_compliance ( & SOCKET_PUSH_SOURCE_TAGS , async {
720
- let ( sender, recv) = SourceSender :: new_test_finalize ( status) ;
721
- let address = next_addr ( ) ;
722
- let source = LogstashConfig {
723
- address : address. into ( ) ,
724
- tls : None ,
725
- permit_origin : None ,
726
- keepalive : None ,
727
- receive_buffer_bytes : None ,
728
- acknowledgements : true . into ( ) ,
729
- connection_limit : None ,
730
- log_namespace : None ,
731
- }
732
- . build ( SourceContext :: new_test ( sender, None ) )
733
- . await
734
- . unwrap ( ) ;
735
- tokio:: spawn ( source) ;
736
- wait_for_tcp ( address) . await ;
737
-
770
+ let ( address, recv) = start_logstash ( status) . await ;
738
771
spawn_collect_n (
739
772
send_req ( address, & [ ( "message" , "Hello, world!" ) ] , sends_ack) ,
740
773
recv,
@@ -773,6 +806,18 @@ mod test {
773
806
req. into ( )
774
807
}
775
808
809
+ #[ test]
810
+ fn v1_decoder_does_not_panic ( ) {
811
+ let seq = thread_rng ( ) . gen_range ( 1 ..u32:: MAX ) ;
812
+ let req = encode_req ( seq, & [ ( "message" , "Hello, World!" ) ] ) ;
813
+ for i in 0 ..req. len ( ) - 1 {
814
+ assert ! (
815
+ decode_data_frame( LogstashProtocolVersion :: V1 , & mut BytesMut :: from( & req[ ..i] ) )
816
+ . is_none( )
817
+ ) ;
818
+ }
819
+ }
820
+
776
821
async fn send_req ( address : SocketAddr , pairs : & [ ( & str , & str ) ] , sends_ack : bool ) {
777
822
let seq = thread_rng ( ) . gen_range ( 1 ..u32:: MAX ) ;
778
823
let mut socket = tokio:: net:: TcpStream :: connect ( address) . await . unwrap ( ) ;
0 commit comments