Skip to content

Commit 9de29e6

Browse files
authored
feat: add NMEAQueue to support flexible queueing (#166)
* feat: add NMEAQueue to support flexible queueing * chore: renames examples/tee.py to examples/queue.py * chore: renames examples/tee.py to examples/nmea_queue.py * chore: adds docs, bumps version, updates changelog
1 parent 1442703 commit 9de29e6

9 files changed

+307
-2
lines changed

CHANGELOG.txt

+8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
====================
22
pyais CHANGELOG
33
====================
4+
-------------------------------------------------------------------------------
5+
Version 2.9.0 23 Feb 2025
6+
-------------------------------------------------------------------------------
7+
* added NMEAQueue class
8+
* supports both single-line and multi-line NMEA sentences
9+
* buffers fragments until all parts are available
10+
* handles tag blocks
11+
* supports gatehouse wrappers
412
-------------------------------------------------------------------------------
513
Version 2.8.4 26 Jan 2025
614
-------------------------------------------------------------------------------

docs/index.rst

+1
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ Welcome to Pyais's documentation!
1414
examples
1515
messages
1616
filters
17+
queue

docs/queue.rst

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
NMEAQueue: Assembling Complete NMEA Sentences
2+
=============================================
3+
4+
The ``NMEAQueue`` class provides a robust mechanism for assembling and managing NMEA sentences, including both single-line and multi-line messages. It is designed to handle AIS messages and integrates with ``TagBlockQueue`` for processing tag blocks.
5+
6+
Features
7+
--------
8+
- **Single-Line Sentence Handling**: Single-line NMEA sentences are added directly to the queue.
9+
- **Multi-Line Sentence Assembly**: Multi-line sentences are buffered until all fragments are available, after which they are assembled and added to the queue.
10+
- **Gatehouse Wrappers**: Supports gatehouse wrappers for AIS messages, associating them with the next AIS message in the sequence.
11+
- **Graceful Error Handling**: Invalid or malformed sentences are skipped without interrupting the processing flow.
12+
- **Integration with TagBlockQueue**: If a ``TagBlockQueue`` instance is provided, sentences are added to it for tag block processing.
13+
14+
Constructor
15+
-----------
16+
.. code-block:: python
17+
18+
NMEAQueue(maxsize: int = 0, tbq: typing.Optional[TagBlockQueue] = None)
19+
20+
- ``maxsize``: The maximum size of the queue. Defaults to 0 (unlimited).
21+
- ``tbq``: An optional ``TagBlockQueue`` instance for handling tag blocks.
22+
23+
Methods
24+
-------
25+
- ``put_line(line: bytes, block: bool = True, timeout: typing.Optional[float] = None)``: Adds a line of raw bytes to the queue. This method processes the line, handles multi-line assembly, and integrates with ``TagBlockQueue`` if provided.
26+
- ``get_or_none() -> typing.Optional[NMEASentence]``: Retrieves the last message from the queue in a non-blocking manner. Returns ``None`` if the queue is empty.
27+
28+
Example Usage
29+
-------------
30+
.. code-block:: python
31+
32+
from pyais.stream import TagBlockQueue
33+
from my_module import NMEAQueue
34+
35+
# Initialize a TagBlockQueue
36+
tbq = TagBlockQueue()
37+
38+
# Create an NMEAQueue instance
39+
queue = NMEAQueue(tbq=tbq)
40+
41+
# Add a line of raw bytes to the queue
42+
queue.put_line(b"!AIVDM,1,1,,A,15N:;R0P00PD;88MD5NS8v2P00,0*3C")
43+
44+
# Retrieve the assembled sentence
45+
sentence = queue.get_or_none()
46+
if sentence:
47+
print(sentence)
48+
49+
Notes
50+
-----
51+
- **Error Handling**: The ``put_line`` method skips invalid messages (e.g., malformed sentences or those with non-printable characters) without raising exceptions.
52+
- **Buffering**: Multi-line sentences are buffered using a unique key based on the sequence ID and channel. Once all fragments are received, the sentence is assembled and added to the queue.
53+
54+
This class simplifies the process of handling NMEA sentences, making it easier to work with AIS data streams in real-time applications.

examples/nmea_queue.py

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import pathlib
2+
from pyais.queue import NMEAQueue
3+
4+
5+
if __name__ == '__main__':
6+
7+
# Example 1: use a queue to read NMEA/AIS messages from a file
8+
filename = pathlib.Path(__file__).parent.joinpath('../tests/mixed.txt')
9+
10+
q = NMEAQueue()
11+
12+
with open(filename, 'rb') as fd:
13+
for line in fd.readlines():
14+
q.put_line(line)
15+
16+
if x := q.get_or_none():
17+
print(x.decode())
18+
else:
19+
print(line)
20+
21+
# Example 2: put lines into the queue manually
22+
23+
q = NMEAQueue()
24+
q.qsize() # Initially empty
25+
26+
# Raw text.
27+
q.put_line(b'Hello there!')
28+
q.qsize() # Still empty
29+
30+
# Put a multi-line message into the queue
31+
q.put_line(b'!AIVDM,2,1,1,A,55?MbV02;H;s<HtKR20EHE:0@T4@Dn2222222216L961O5Gf0NSQEp6ClRp8,0*1C')
32+
q.put_line(b'!AIVDM,2,2,1,A,88888888880,2*25')
33+
q.qsize() # Returns 1
34+
q.get_or_none()
35+
q.qsize() # Empty again
36+
37+
# A multi-line message with tag blocks
38+
q.put_line(b'\\g:1-2-73874*A\\!AIVDM,1,1,,A,15MrVH0000KH<:V:NtBLoqFP2H9:,0*2F')
39+
q.put_line(b'\\g:2-2-73874,n:157037*A\\!AIVDM,1,1,,B,100h00PP0@PHFV`Mg5gTH?vNPUIp,0*3B')
40+
q.get_or_none()

pyais/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from pyais.tracker import AISTracker, AISTrack
66

77
__license__ = 'MIT'
8-
__version__ = '2.8.4'
8+
__version__ = '2.9.0'
99
__author__ = 'Leon Morten Richter'
1010

1111
__all__ = (

pyais/queue.py

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
2+
import queue
3+
import typing
4+
5+
from pyais.exceptions import InvalidNMEAMessageException, NonPrintableCharacterException, UnknownMessageException
6+
from pyais.messages import AISSentence, GatehouseSentence, NMEASentence, NMEASentenceFactory
7+
from pyais.stream import TagBlockQueue
8+
9+
10+
class NMEAQueue(queue.Queue[AISSentence]):
11+
"""Assembles complete NMEA sentences.
12+
13+
Single-line sentences are added to the queue directly. Multi-line sentences
14+
are buffered until all fragments are available, after which they are added to the queue."""
15+
16+
def __init__(self, maxsize: int = 0, tbq: typing.Optional[TagBlockQueue] = None) -> None:
17+
super().__init__(maxsize)
18+
self.tbq = tbq
19+
self.buffer: typing.Dict[typing.Tuple[int, str], typing.List[typing.Optional[AISSentence]]] = {}
20+
self.last_wrapper: typing.Optional[GatehouseSentence] = None
21+
22+
def __add_to_tbq(self, sentence: NMEASentence) -> None:
23+
if not self.tbq:
24+
# Tag Block Queue not defined. Do nothing.
25+
return
26+
self.tbq.put_sentence(sentence)
27+
28+
def put(self, item: object, block: bool = True, timeout: typing.Optional[float] = None) -> None:
29+
"""This method only exists to please mypy. Use put_line instead."""
30+
raise ValueError('do not call NMEAQueue.put() directly. Use NMEAQueue.put_line() instead!')
31+
32+
def put_line(self, line: bytes, block: bool = True, timeout: typing.Optional[float] = None) -> None:
33+
"""Put a line of raw bytes, as part of an NMEA sentence, into the queue."""
34+
try:
35+
sentence = NMEASentenceFactory.produce(line)
36+
self.__add_to_tbq(sentence)
37+
if sentence.TYPE == GatehouseSentence.TYPE:
38+
# Remember gatehouse wrappers for the next AIS message
39+
sentence = typing.cast(GatehouseSentence, sentence)
40+
self.last_wrapper = sentence
41+
return None
42+
except (InvalidNMEAMessageException, NonPrintableCharacterException, UnknownMessageException, IndexError):
43+
# Be gentle and just skip invalid messages
44+
return None
45+
46+
if not sentence.TYPE == AISSentence.TYPE:
47+
return None
48+
49+
sentence = typing.cast(AISSentence, sentence)
50+
51+
if sentence.is_single:
52+
if self.last_wrapper:
53+
# Check if there was a wrapper message right before this line
54+
sentence.wrapper_msg = self.last_wrapper
55+
self.last_wrapper = None
56+
super().put(sentence, block, timeout)
57+
else:
58+
# Instead of None use -1 as a seq_id
59+
seq_id = sentence.seq_id
60+
if seq_id is None:
61+
seq_id = -1
62+
63+
# seq_id and channel make a unique stream
64+
slot = (seq_id, sentence.channel)
65+
66+
if slot not in self.buffer:
67+
# Create a new array in the buffer that has enough space for all fragments
68+
self.buffer[slot] = [None, ] * max(sentence.fragment_count, 0xff)
69+
70+
self.buffer[slot][sentence.frag_num - 1] = sentence
71+
msg_parts = self.buffer[slot][0:sentence.fragment_count]
72+
73+
# Check if all fragments are found
74+
not_none_parts = [m for m in msg_parts if m is not None]
75+
if len(not_none_parts) == sentence.fragment_count:
76+
# Assemble the full message and clear the buffer
77+
full = AISSentence.assemble_from_iterable(not_none_parts)
78+
del self.buffer[slot]
79+
super().put(full, block, timeout)
80+
81+
def get_or_none(self) -> typing.Optional[NMEASentence]:
82+
"""Non-blocking helper method to retrieve the last message, if one is available"""
83+
try:
84+
return self.get(block=False)
85+
except queue.Empty:
86+
return None

tests/mixed.txt

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# taken from https://www.aishub.net/ais-dispatcher
2+
!AIVDM,1,1,,A,13HOI:0P0000VOHLCnHQKwvL05Ip,0*23
3+
!AIVDM,1,1,,A,133sVfPP00PD>hRMDH@jNOvN20S8,0*7F
4+
!AIVDM,1,1,,B,100h00PP0@PHFV`Mg5gTH?vNPUIp,0*3B
5+
!AIVDM,1,1,,B,13eaJF0P00Qd388Eew6aagvH85Ip,0*45
6+
7+
$GPGGA,184353.07,1929.045,S,02410.506,E,1,04,2.6,100.00,M,-33.9,M,,0000*6D
8+
$GPRTE,2,1,c,0,PBRCPK,PBRTO,PTELGR,PPLAND,PYAMBU,PPFAIR,PWARRN,PMORTL,PLISMR*73
9+
10+
!AIVDM,1,1,,A,14eGrSPP00ncMJTO5C6aBwvP2D0?,0*7A
11+
!AIVDM,1,1,,A,15MrVH0000KH<:V:NtBLoqFP2H9:,0*2F
12+
13+
$GPR00,A,B,C*29
14+
foobar
15+
!AIVDM,2,1,1,A,55?MbV02;H;s<HtKR20EHE:0@T4@Dn2222222216L961O5Gf0NSQEp6ClRp8,0*1C
16+
!AIVDM,2,2,1,A,88888888880,2*25
17+
18+
$IIMWV,271.0,R,000.2,N,A*3B
19+
# Lines with a leading `#` are ignored
20+
# Also invalid lines or invalid messages are ignored
21+
IamNotAnAisMessage1111
22+
# Tag blocks are also supported
23+
\g:1-2-73874,n:157036,s:r003669945,c:12415440354*A\!AIVDM,1,1,,B,15N4cJ005Jrek0H@9nDW5608EP,013
24+
\g:1-2-27300,n:636994,s:b003669710,c:1428621738*5F\!SAVDM,2,1,2,B,55Mw@A7J1adAL@?;7WPl58F0U<h4pB222222220t1PN5553fN4g?`4iSp5Rc,0*26
25+
\g:2-2-27300,n:636995*15\!SAVDM,2,2,2,B,iP`88888880,2*5E
26+
\g:1-2-73874*A\!AIVDM,1,1,,A,15MrVH0000KH<:V:NtBLoqFP2H9:,0*2F
27+
\g:2-2-73874,n:157037*A\!AIVDM,1,1,,B,100h00PP0@PHFV`Mg5gTH?vNPUIp,0*3B
28+
29+
$PGHP,1,2008,5,9,0,0,0,10,338,2,,1,09*17
30+
!AIVDM,1,1,,B,15NBj>PP1gG>1PVKTDTUJOv00<0M,0*09
31+
32+
# More comments
33+
34+
!AIVDM,1,1,,A,16:=?;0P00`SstvFnFbeGH6L088h,0*44
35+
$PGHP,1,2009,5,9,0,0,0,10,338,2,,1,09*17

tests/test_examples.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,4 @@ def test_run_every_file(self):
3333
if csv_file.exists():
3434
csv_file.unlink()
3535

36-
assert i == 23
36+
assert i == 24

tests/test_queue.py

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import pathlib
2+
import unittest
3+
4+
from pyais.queue import NMEAQueue
5+
from pyais.stream import FileReaderStream
6+
7+
8+
class QueueTestCase(unittest.TestCase):
9+
10+
def test_against_file_reader_stream(self):
11+
# HAVING a file with all sorts of NMEA messages as well as other kinds of messages
12+
filename = pathlib.Path(__file__).parent.joinpath('mixed.txt')
13+
14+
# WHEN streaming NMEA/AIS messages from this files using FileReaderStream
15+
expected = []
16+
with FileReaderStream(filename) as stream:
17+
for msg in stream:
18+
expected.append(msg)
19+
20+
# WHEN reading the same file putting the lines into a NMEAQueue
21+
q = NMEAQueue()
22+
actual = []
23+
with open(filename, 'rb') as fd:
24+
for line in fd.readlines():
25+
q.put_line(line)
26+
if x := q.get_or_none():
27+
actual.append(x)
28+
29+
# THEN both methods of reading a file return the exact same result
30+
self.assertEqual(len(expected), len(actual), 'expected list differs from actual list')
31+
self.assertEqual(expected, actual, 'expected list differs from actual list')
32+
33+
for a, b, in zip(expected, actual):
34+
self.assertEqual(a, b)
35+
self.assertEqual(a.wrapper_msg, b.wrapper_msg)
36+
if a.tag_block or b.tag_block:
37+
a.tag_block.init()
38+
b.tag_block.init()
39+
self.assertEqual(a.tag_block.text, b.tag_block.text)
40+
41+
def test_that_put_raises_value_error(self):
42+
q = NMEAQueue()
43+
44+
with self.assertRaises(ValueError):
45+
q.put(b"line")
46+
47+
def test_manually(self):
48+
49+
q = NMEAQueue()
50+
assert q.qsize() == 0 # Initially empty
51+
52+
# Raw text.
53+
q.put_line(b'Hello there!')
54+
assert q.qsize() == 0 # Still empty
55+
assert q.get_or_none() is None
56+
57+
# Put a multi-line message into the queue
58+
q.put_line(b'!AIVDM,2,1,1,A,55?MbV02;H;s<HtKR20EHE:0@T4@Dn2222222216L961O5Gf0NSQEp6ClRp8,0*1C')
59+
assert q.qsize() == 0 # Still empty
60+
q.put_line(b'!AIVDM,2,2,1,A,88888888880,2*25')
61+
assert q.qsize() == 1 # Returns 1
62+
assert q.get_or_none() is not None
63+
assert q.qsize() == 0 # Empty again
64+
65+
# A multi-line message with tag blocks
66+
q.put_line(b'\\g:1-2-73874*A\\!AIVDM,1,1,,A,15MrVH0000KH<:V:NtBLoqFP2H9:,0*2F')
67+
q.put_line(b'\\g:2-2-73874,n:157037*A\\!AIVDM,1,1,,B,100h00PP0@PHFV`Mg5gTH?vNPUIp,0*3B')
68+
assert q.qsize() == 2
69+
assert q.get_or_none() is not None
70+
assert q.qsize() == 1
71+
assert q.get_or_none() is not None
72+
assert q.qsize() == 0
73+
74+
q.put_line(b'!AIVDM,1,1,,A,169a:nP01g`hm4pB7:E0;@0L088i,0*5E')
75+
q.put_line(b'!AIVDM,1,1,,A,169a:nP01g`hm4pB7:E0;@0L088i,0*5E')
76+
q.put_line(b'!AIVDM,1,1,,A,169a:nP01g`hm4pB7:E0;@0L088i,0*5E')
77+
assert q.qsize() == 3
78+
79+
80+
if __name__ == '__main__':
81+
unittest.main()

0 commit comments

Comments
 (0)