From af02746838d56fad5c3cb02aa03f24635e924de5 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 17:39:20 +0200 Subject: [PATCH 01/15] :sparkles: Implement JsonUtil for switching between json and orjson Signed-off-by: ff137 --- nats/json_util.py | 66 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 nats/json_util.py diff --git a/nats/json_util.py b/nats/json_util.py new file mode 100644 index 00000000..7a5e334f --- /dev/null +++ b/nats/json_util.py @@ -0,0 +1,66 @@ +"""A module providing a utility class for handling JSON-related operations.""" + +import json +from typing import Any + +try: + import orjson +except ImportError: + orjson = None + + +class JsonUtil: + """A utility class for handling JSON-related operations. + This class provides static methods for formatting JSON strings, and + for converting between Python objects and JSON strings/files. It uses + the `orjson` library where possible for its speed advantages, but reverts + to the standard `json` library where `orjson` does not support the required + functionality. + """ + + @staticmethod + def dumps(obj, *args, **kwargs) -> str: + """Convert a Python object into a json string. + Args: + obj: The data to be converted + *args: Extra arguments to pass to the orjson.dumps() function + **kwargs: Extra keyword arguments to pass to the orjson.dumps() function + Returns: + The json string representation of obj + """ + + if orjson is None: + return json.dumps(obj, *args, **kwargs) + else: + return orjson.dumps(obj, *args, **kwargs).decode("utf-8") + + @staticmethod + def dump_bytes(obj, *args, **kwargs) -> bytes: + """Convert a Python object into a bytes string. + Args: + obj: The data to be converted + *args: Extra arguments to pass to the orjson.dumps() function + **kwargs: Extra keyword arguments to pass to the orjson.dumps() function + Returns: + The json string representation of obj + """ + + if orjson is None: + return json.dumps(obj, *args, **kwargs).encode("utf-8") + else: + return orjson.dumps(obj, *args, **kwargs) + + @staticmethod + def loads(s: str, *args, **kwargs) -> Any: + """Parse a JSON string into a Python object. + Args: + s: The JSON string to be parsed + *args: Extra arguments to pass to the orjson.loads() function + **kwargs: Extra keyword arguments to pass to the orjson.loads() function + Returns: + The Python representation of s + """ + if orjson is None: + return json.loads(s, *args, **kwargs) + else: + return orjson.loads(s, *args, **kwargs) From ba06441029024e38e950136d46792f23d670e44d Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 17:48:09 +0200 Subject: [PATCH 02/15] :art: Replace `import json` with `from nats.json_util import JsonUtil as json` Signed-off-by: ff137 --- nats/aio/client.py | 3 ++- nats/aio/msg.py | 2 +- nats/aio/subscription.py | 10 ++-------- nats/js/client.py | 12 ++---------- nats/js/manager.py | 2 +- nats/js/object_store.py | 2 +- nats/micro/service.py | 2 +- nats/protocol/parser.py | 2 +- tests/test_client.py | 2 +- tests/test_compatibility.py | 2 +- tests/test_js.py | 2 +- 11 files changed, 14 insertions(+), 27 deletions(-) diff --git a/nats/aio/client.py b/nats/aio/client.py index be379af8..91f4884d 100644 --- a/nats/aio/client.py +++ b/nats/aio/client.py @@ -17,7 +17,6 @@ import asyncio import base64 import ipaddress -import json import logging import ssl import string @@ -32,6 +31,8 @@ from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple, Union from urllib.parse import ParseResult, urlparse +from nats.json_util import JsonUtil as json + try: from fast_mail_parser import parse_email except ImportError: diff --git a/nats/aio/msg.py b/nats/aio/msg.py index 5724706a..980ef528 100644 --- a/nats/aio/msg.py +++ b/nats/aio/msg.py @@ -14,11 +14,11 @@ from __future__ import annotations import datetime -import json from dataclasses import dataclass from typing import TYPE_CHECKING, Dict, List, Optional, Union from nats.errors import Error, MsgAlreadyAckdError, NotJSMessageError +from nats.json_util import JsonUtil as json if TYPE_CHECKING: from nats import NATS diff --git a/nats/aio/subscription.py b/nats/aio/subscription.py index 31fbb887..c17fe9ea 100644 --- a/nats/aio/subscription.py +++ b/nats/aio/subscription.py @@ -15,17 +15,11 @@ from __future__ import annotations import asyncio -from typing import ( - TYPE_CHECKING, - AsyncIterator, - Awaitable, - Callable, - List, - Optional, -) +from typing import TYPE_CHECKING, AsyncIterator, Awaitable, Callable, List, Optional from uuid import uuid4 from nats import errors + # Default Pending Limits of Subscriptions from nats.aio.msg import Msg diff --git a/nats/js/client.py b/nats/js/client.py index d26413c0..34d32b08 100644 --- a/nats/js/client.py +++ b/nats/js/client.py @@ -15,19 +15,10 @@ from __future__ import annotations import asyncio -import json import time from email.parser import BytesParser from secrets import token_hex -from typing import ( - TYPE_CHECKING, - Any, - Awaitable, - Callable, - Dict, - List, - Optional, -) +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional import nats.errors import nats.js.errors @@ -50,6 +41,7 @@ VALID_BUCKET_RE, ObjectStore, ) +from nats.json_util import JsonUtil as json if TYPE_CHECKING: from nats import NATS diff --git a/nats/js/manager.py b/nats/js/manager.py index bfd5937f..c3778941 100644 --- a/nats/js/manager.py +++ b/nats/js/manager.py @@ -15,13 +15,13 @@ from __future__ import annotations import base64 -import json from email.parser import BytesParser from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional from nats.errors import NoRespondersError from nats.js import api from nats.js.errors import APIError, NotFoundError, ServiceUnavailableError +from nats.json_util import JsonUtil as json if TYPE_CHECKING: from nats import NATS diff --git a/nats/js/object_store.py b/nats/js/object_store.py index 70ce3d3b..f52796b8 100644 --- a/nats/js/object_store.py +++ b/nats/js/object_store.py @@ -15,7 +15,6 @@ import asyncio import base64 import io -import json import re from dataclasses import dataclass from datetime import datetime, timezone @@ -34,6 +33,7 @@ ObjectNotFoundError, ) from nats.js.kv import MSG_ROLLUP_SUBJECT +from nats.json_util import JsonUtil as json VALID_BUCKET_RE = re.compile(r"^[a-zA-Z0-9_-]+$") VALID_KEY_RE = re.compile(r"^[-/_=\.a-zA-Z0-9]+$") diff --git a/nats/micro/service.py b/nats/micro/service.py index 159530a3..9aa46503 100644 --- a/nats/micro/service.py +++ b/nats/micro/service.py @@ -1,6 +1,5 @@ from __future__ import annotations -import json import re import time from asyncio import Event @@ -21,6 +20,7 @@ from nats.aio.client import Client from nats.aio.msg import Msg from nats.aio.subscription import Subscription +from nats.json_util import JsonUtil as json from .request import Handler, Request, ServiceError diff --git a/nats/protocol/parser.py b/nats/protocol/parser.py index 6b8c7255..8f45650b 100644 --- a/nats/protocol/parser.py +++ b/nats/protocol/parser.py @@ -17,11 +17,11 @@ from __future__ import annotations -import json import re from typing import Any, Dict from nats.errors import ProtocolError +from nats.json_util import JsonUtil as json MSG_RE = re.compile( b"\\AMSG\\s+([^\\s]+)\\s+([^\\s]+)\\s+(([^\\s]+)[^\\S\r\n]+)?(\\d+)\r\n" diff --git a/tests/test_client.py b/tests/test_client.py index 27b25d56..0c38272f 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,6 +1,5 @@ import asyncio import http.client -import json import os import ssl import time @@ -12,6 +11,7 @@ import nats.errors import pytest from nats.aio.client import Client as NATS, __version__ +from nats.json_util import JsonUtil as json from tests.utils import ( ClusteringDiscoveryAuthTestCase, ClusteringTestCase, diff --git a/tests/test_compatibility.py b/tests/test_compatibility.py index 838c3a8a..98bca630 100644 --- a/tests/test_compatibility.py +++ b/tests/test_compatibility.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -import json import os from dataclasses import dataclass, field from typing import Any, Dict, List, Optional @@ -9,6 +8,7 @@ import nats from nats.aio.subscription import Subscription +from nats.json_util import JsonUtil as json from nats.micro.request import ServiceError from nats.micro.service import ( EndpointConfig, diff --git a/tests/test_js.py b/tests/test_js.py index fce4dd44..ad354993 100644 --- a/tests/test_js.py +++ b/tests/test_js.py @@ -2,7 +2,6 @@ import base64 import datetime import io -import json import random import re import string @@ -20,6 +19,7 @@ from nats.aio.msg import Msg from nats.errors import * from nats.js.errors import * +from nats.json_util import JsonUtil as json from tests.utils import * try: From d3380980be9f9d8030f3565aa9edbf4672a81705 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 17:51:20 +0200 Subject: [PATCH 03/15] :sparkles: Handle sort_keys option in JsonUtil Signed-off-by: ff137 --- nats/json_util.py | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/nats/json_util.py b/nats/json_util.py index 7a5e334f..d98e746b 100644 --- a/nats/json_util.py +++ b/nats/json_util.py @@ -18,20 +18,35 @@ class JsonUtil: functionality. """ + @staticmethod + def _handle_sort_keys(kwargs): + """Internal helper to handle sort_keys parameter for orjson compatibility. + Args: + kwargs: The keyword arguments dictionary to modify + Returns: + Modified kwargs dictionary + """ + if kwargs.pop("sort_keys", False): + option = kwargs.get("option", 0) | orjson.OPT_SORT_KEYS + kwargs["option"] = option + return kwargs + @staticmethod def dumps(obj, *args, **kwargs) -> str: """Convert a Python object into a json string. Args: obj: The data to be converted - *args: Extra arguments to pass to the orjson.dumps() function - **kwargs: Extra keyword arguments to pass to the orjson.dumps() function + *args: Extra arguments to pass to the dumps() function + **kwargs: Extra keyword arguments to pass to the dumps() function. + Special handling for 'sort_keys' which is translated to + orjson.OPT_SORT_KEYS when using orjson. Returns: The json string representation of obj """ - if orjson is None: return json.dumps(obj, *args, **kwargs) else: + kwargs = JsonUtil._handle_sort_keys(kwargs) return orjson.dumps(obj, *args, **kwargs).decode("utf-8") @staticmethod @@ -39,15 +54,17 @@ def dump_bytes(obj, *args, **kwargs) -> bytes: """Convert a Python object into a bytes string. Args: obj: The data to be converted - *args: Extra arguments to pass to the orjson.dumps() function - **kwargs: Extra keyword arguments to pass to the orjson.dumps() function + *args: Extra arguments to pass to the dumps() function + **kwargs: Extra keyword arguments to pass to the dumps() function. + Special handling for 'sort_keys' which is translated to + orjson.OPT_SORT_KEYS when using orjson. Returns: - The json string representation of obj + The json string representation of obj as bytes """ - if orjson is None: return json.dumps(obj, *args, **kwargs).encode("utf-8") else: + kwargs = JsonUtil._handle_sort_keys(kwargs) return orjson.dumps(obj, *args, **kwargs) @staticmethod From 13f03d0abfecfa3d9fcf53b315eef5189b646ea4 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 17:55:14 +0200 Subject: [PATCH 04/15] :white_check_mark: Handle difference in expected dumped string from orjson Signed-off-by: ff137 --- tests/test_client.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/test_client.py b/tests/test_client.py index 0c38272f..e739b971 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -37,6 +37,14 @@ def test_default_connect_command(self): nc.options["no_echo"] = False got = nc._connect_command() expected = f'CONNECT {{"echo": true, "lang": "python3", "pedantic": false, "protocol": 1, "verbose": false, "version": "{__version__}"}}\r\n' + + try: + import orjson + # If using orjson, expected string is without spaces + expected = expected.replace(" ", "") + except ImportError: + pass + self.assertEqual(expected.encode(), got) def test_default_connect_command_with_name(self): @@ -48,6 +56,14 @@ def test_default_connect_command_with_name(self): nc.options["no_echo"] = False got = nc._connect_command() expected = f'CONNECT {{"echo": true, "lang": "python3", "name": "secret", "pedantic": false, "protocol": 1, "verbose": false, "version": "{__version__}"}}\r\n' + + try: + import orjson + # If using orjson, expected string is without spaces + expected = expected.replace(" ", "") + except ImportError: + pass + self.assertEqual(expected.encode(), got) From 8bf712f4855f96db0342afbf098b5e8ec8db9ad3 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 18:04:07 +0200 Subject: [PATCH 05/15] :zap: Replace .dumps.encode calls with .dump_bytes Signed-off-by: ff137 --- nats/aio/client.py | 4 ++-- nats/aio/msg.py | 2 +- nats/js/client.py | 6 ++--- nats/js/manager.py | 46 ++++++++++++++++++------------------- nats/js/object_store.py | 6 ++--- nats/micro/service.py | 6 ++--- tests/test_micro_service.py | 4 ++-- 7 files changed, 37 insertions(+), 37 deletions(-) diff --git a/nats/aio/client.py b/nats/aio/client.py index 91f4884d..7fbf8379 100644 --- a/nats/aio/client.py +++ b/nats/aio/client.py @@ -1583,8 +1583,8 @@ def _connect_command(self) -> bytes: if self.options["no_echo"] is not None: options["echo"] = not self.options["no_echo"] - connect_opts = json.dumps(options, sort_keys=True) - return b"".join([CONNECT_OP + _SPC_ + connect_opts.encode() + _CRLF_]) + connect_opts = json.dump_bytes(options, sort_keys=True) + return b"".join([CONNECT_OP + _SPC_ + connect_opts + _CRLF_]) async def _process_ping(self) -> None: """ diff --git a/nats/aio/msg.py b/nats/aio/msg.py index 980ef528..835b6281 100644 --- a/nats/aio/msg.py +++ b/nats/aio/msg.py @@ -132,7 +132,7 @@ async def nak(self, delay: Union[int, float, None] = None) -> None: if delay: json_args["delay"] = int(delay * 10**9) # from seconds to ns if json_args: - payload += b" " + json.dumps(json_args).encode() + payload += b" " + json.dump_bytes(json_args) await self._client.publish(self.reply, payload) self._ackd = True diff --git a/nats/js/client.py b/nats/js/client.py index 34d32b08..51fec6eb 100644 --- a/nats/js/client.py +++ b/nats/js/client.py @@ -1127,7 +1127,7 @@ async def _fetch_one( await self._nc.publish( self._nms, - json.dumps(next_req).encode(), + json.dump_bytes(next_req), self._deliver, ) @@ -1212,7 +1212,7 @@ async def _fetch_n( next_req["no_wait"] = True await self._nc.publish( self._nms, - json.dumps(next_req).encode(), + json.dump_bytes(next_req), self._deliver, ) await asyncio.sleep(0) @@ -1278,7 +1278,7 @@ async def _fetch_n( await self._nc.publish( self._nms, - json.dumps(next_req).encode(), + json.dump_bytes(next_req), self._deliver, ) await asyncio.sleep(0) diff --git a/nats/js/manager.py b/nats/js/manager.py index c3778941..9278f1a9 100644 --- a/nats/js/manager.py +++ b/nats/js/manager.py @@ -60,9 +60,9 @@ async def find_stream_name_by_subject(self, subject: str) -> str: """ req_sub = f"{self._prefix}.STREAM.NAMES" - req_data = json.dumps({"subject": subject}) + req_data = json.dump_bytes({"subject": subject}) info = await self._api_request( - req_sub, req_data.encode(), timeout=self._timeout + req_sub, req_data, timeout=self._timeout ) if not info["streams"]: raise NotFoundError @@ -76,12 +76,12 @@ async def stream_info( """ Get the latest StreamInfo by stream name. """ - req_data = "" + req_data = b"" if subjects_filter: - req_data = json.dumps({"subjects_filter": subjects_filter}) + req_data = json.dump_bytes({"subjects_filter": subjects_filter}) resp = await self._api_request( f"{self._prefix}.STREAM.INFO.{name}", - req_data.encode(), + req_data, timeout=self._timeout, ) return api.StreamInfo.from_response(resp) @@ -114,10 +114,10 @@ async def add_stream( "path separators (forward or backward slash), or non-printable characters." ) - data = json.dumps(config.as_dict()) + data = json.dump_bytes(config.as_dict()) resp = await self._api_request( f"{self._prefix}.STREAM.CREATE.{stream_name}", - data.encode(), + data, timeout=self._timeout, ) return api.StreamInfo.from_response(resp) @@ -136,10 +136,10 @@ async def update_stream( if config.name is None: raise ValueError("nats: stream name is required") - data = json.dumps(config.as_dict()) + data = json.dump_bytes(config.as_dict()) resp = await self._api_request( f"{self._prefix}.STREAM.UPDATE.{config.name}", - data.encode(), + data, timeout=self._timeout, ) return api.StreamInfo.from_response(resp) @@ -171,10 +171,10 @@ async def purge_stream( if keep: stream_req["keep"] = keep - req = json.dumps(stream_req) + req = json.dump_bytes(stream_req) resp = await self._api_request( f"{self._prefix}.STREAM.PURGE.{name}", - req.encode(), + req, timeout=self._timeout ) return resp["success"] @@ -198,9 +198,9 @@ async def streams_info(self, offset=0) -> List[api.StreamInfo]: """ resp = await self._api_request( f"{self._prefix}.STREAM.LIST", - json.dumps({ + json.dump_bytes({ "offset": offset - }).encode(), + }), timeout=self._timeout, ) streams = [] @@ -216,9 +216,9 @@ async def streams_info_iterator(self, """ resp = await self._api_request( f"{self._prefix}.STREAM.LIST", - json.dumps({ + json.dump_bytes({ "offset": offset - }).encode(), + }), timeout=self._timeout, ) @@ -240,7 +240,7 @@ async def add_consumer( config = config.evolve(**params) durable_name = config.durable_name req = {"stream_name": stream, "config": config.as_dict()} - req_data = json.dumps(req).encode() + req_data = json.dump_bytes(req) resp = None subject = "" @@ -283,9 +283,9 @@ async def consumers_info( """ resp = await self._api_request( f"{self._prefix}.CONSUMER.LIST.{stream}", - b"" if offset is None else json.dumps({ + b"" if offset is None else json.dump_bytes({ "offset": offset - }).encode(), + }), timeout=self._timeout, ) consumers = [] @@ -318,19 +318,19 @@ async def get_msg( req["last_by_subj"] = None req.pop("last_by_subj", None) req["next_by_subj"] = subject - data = json.dumps(req) + data = json.dump_bytes(req) if direct: # $JS.API.DIRECT.GET.KV_{stream_name}.$KV.TEST.{key} if subject and (seq is None): # last_by_subject type request requires no payload. - data = "" + data = b"" req_subject = f"{self._prefix}.DIRECT.GET.{stream_name}.{subject}" else: req_subject = f"{self._prefix}.DIRECT.GET.{stream_name}" resp = await self._nc.request( - req_subject, data.encode(), timeout=self._timeout + req_subject, data, timeout=self._timeout ) raw_msg = JetStreamManager._lift_msg_to_raw_msg(resp) return raw_msg @@ -389,8 +389,8 @@ async def delete_msg(self, stream_name: str, seq: int) -> bool: """ req_subject = f"{self._prefix}.STREAM.MSG.DELETE.{stream_name}" req = {"seq": seq} - data = json.dumps(req) - resp = await self._api_request(req_subject, data.encode()) + data = json.dump_bytes(req) + resp = await self._api_request(req_subject, data) return resp["success"] async def get_last_msg( diff --git a/nats/js/object_store.py b/nats/js/object_store.py index f52796b8..8610e607 100644 --- a/nats/js/object_store.py +++ b/nats/js/object_store.py @@ -343,7 +343,7 @@ async def put( try: await self._js.publish( meta_subj, - json.dumps(info.as_dict()).encode(), + json.dump_bytes(info.as_dict()), headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT}, ) except Exception as err: @@ -412,7 +412,7 @@ async def update_meta( try: await self._js.publish( meta_subj, - json.dumps(info.as_dict()).encode(), + json.dump_bytes(info.as_dict()), headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT}, ) except Exception as err: @@ -538,7 +538,7 @@ async def delete(self, name: str) -> ObjectResult: try: await self._js.publish( meta_subj, - json.dumps(info.as_dict()).encode(), + json.dump_bytes(info.as_dict()), headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT}, ) finally: diff --git a/nats/micro/service.py b/nats/micro/service.py index 9aa46503..37343537 100644 --- a/nats/micro/service.py +++ b/nats/micro/service.py @@ -859,18 +859,18 @@ async def _handle_ping_request(self, msg: Msg) -> None: metadata=self._metadata, ).to_dict() - await msg.respond(data=json.dumps(ping).encode()) + await msg.respond(data=json.dump_bytes(ping)) async def _handle_info_request(self, msg: Msg) -> None: """Handle an info message.""" info = self.info().to_dict() - await msg.respond(data=json.dumps(info).encode()) + await msg.respond(data=json.dump_bytes(info)) async def _handle_stats_request(self, msg: Msg) -> None: """Handle a stats message.""" stats = self.stats().to_dict() - await msg.respond(data=json.dumps(stats).encode()) + await msg.respond(data=json.dump_bytes(stats)) def control_subject( diff --git a/tests/test_micro_service.py b/tests/test_micro_service.py index 9fa47fc7..b9514b09 100644 --- a/tests/test_micro_service.py +++ b/tests/test_micro_service.py @@ -79,10 +79,10 @@ async def add_handler(request: Request): for _ in range(50): await nc.request( "svc.add", - json.dumps({ + json.dump_bytes({ "x": 22, "y": 11 - }).encode("utf-8") + }) ) for svc in svcs: From 6496a8d96310eca1663bc7422cad040fbda169e7 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 18:08:22 +0200 Subject: [PATCH 06/15] :sparkles: Add orjson as optional dependency Signed-off-by: ff137 --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index a3955ca1..0253b7a4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ classifiers = [ nkeys = ['nkeys'] aiohttp = ['aiohttp'] fast_parse = ['fast-mail-parser'] +orjson = ['orjson'] [tool.setuptools] zip-safe = true From 0ab2ce06b1630ec3082a91288c72297f8f49fae4 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 18:08:42 +0200 Subject: [PATCH 07/15] :construction_worker: Add orjson runs to travis jobs Signed-off-by: ff137 --- .travis.yml | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 2c7578c1..b1c2ca8f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,7 +16,7 @@ before_script: - export PATH=$HOME/nats-server:$PATH install: - - pip install -e .[nkeys,aiohttp,fast-mail-parser] + - pip install -e .[nkeys,aiohttp,fast-mail-parser,orjson] script: - make ci @@ -39,6 +39,16 @@ jobs: - bash ./scripts/install_nats.sh install: - pip install -e .[fast-mail-parser] + - name: "Python: 3.13/orjson" + python: "3.13" + before_install: + - sudo apt update && sudo apt install gcc build-essential -y + - sudo apt-get install python3-pip + - sudo apt-get install python3-pytest + - pip install --upgrade pip + - bash ./scripts/install_nats.sh + install: + - pip install -e .[fast-mail-parser,orjson] - name: "Python: 3.12" python: "3.12" before_install: @@ -49,6 +59,16 @@ jobs: - bash ./scripts/install_nats.sh install: - pip install -e .[fast-mail-parser] + - name: "Python: 3.12/orjson" + python: "3.12" + before_install: + - sudo apt update && sudo apt install gcc build-essential -y + - sudo apt-get install python3-pip + - sudo apt-get install python3-pytest + - pip install --upgrade pip + - bash ./scripts/install_nats.sh + install: + - pip install -e .[fast-mail-parser,orjson] - name: "Python: 3.11" python: "3.11" before_install: @@ -59,6 +79,16 @@ jobs: - bash ./scripts/install_nats.sh install: - pip install -e .[fast-mail-parser] + - name: "Python: 3.11/orjson" + python: "3.11" + before_install: + - sudo apt update && sudo apt install gcc build-essential -y + - sudo apt-get install python3-pip + - sudo apt-get install python3-pytest + - pip install --upgrade pip + - bash ./scripts/install_nats.sh + install: + - pip install -e .[fast-mail-parser,orjson] - name: "Python: 3.11/uvloop" python: "3.11" before_install: @@ -85,7 +115,10 @@ jobs: allow_failures: - name: "Python: 3.8" - name: "Python: 3.11" + - name: "Python: 3.11/orjson" - name: "Python: 3.11/uvloop" - name: "Python: 3.11 (nats-server@main)" - name: "Python: 3.12" + - name: "Python: 3.12/orjson" - name: "Python: 3.13" + - name: "Python: 3.13/orjson" From 511abc2306dbdee048bbfe81c478378a4bd9b435 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 18:08:53 +0200 Subject: [PATCH 08/15] :bug: Remove .encode() Signed-off-by: ff137 --- nats/js/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nats/js/manager.py b/nats/js/manager.py index 9278f1a9..a87b7ec1 100644 --- a/nats/js/manager.py +++ b/nats/js/manager.py @@ -338,7 +338,7 @@ async def get_msg( # Non Direct form req_subject = f"{self._prefix}.STREAM.MSG.GET.{stream_name}" resp_data = await self._api_request( - req_subject, data.encode(), timeout=self._timeout + req_subject, data, timeout=self._timeout ) raw_msg = api.RawStreamMsg.from_response(resp_data["message"]) From a3030f6f3d37c654eada39eb89985b5818b0e00f Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 18:15:06 +0200 Subject: [PATCH 09/15] :arrow_up: Run format check with py3.12 Signed-off-by: ff137 --- .github/workflows/check.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 76224e8e..01a12c54 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -17,7 +17,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.12" - name: Install dependencies run: | From 47065328cb7190affad1a6d5a264ff8e9134e10f Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 18:19:20 +0200 Subject: [PATCH 10/15] :art: yapf format Signed-off-by: ff137 --- nats/js/manager.py | 16 ++++------------ tests/test_client.py | 2 +- tests/test_micro_service.py | 8 +------- 3 files changed, 6 insertions(+), 20 deletions(-) diff --git a/nats/js/manager.py b/nats/js/manager.py index a87b7ec1..f385f143 100644 --- a/nats/js/manager.py +++ b/nats/js/manager.py @@ -173,9 +173,7 @@ async def purge_stream( req = json.dump_bytes(stream_req) resp = await self._api_request( - f"{self._prefix}.STREAM.PURGE.{name}", - req, - timeout=self._timeout + f"{self._prefix}.STREAM.PURGE.{name}", req, timeout=self._timeout ) return resp["success"] @@ -198,9 +196,7 @@ async def streams_info(self, offset=0) -> List[api.StreamInfo]: """ resp = await self._api_request( f"{self._prefix}.STREAM.LIST", - json.dump_bytes({ - "offset": offset - }), + json.dump_bytes({"offset": offset}), timeout=self._timeout, ) streams = [] @@ -216,9 +212,7 @@ async def streams_info_iterator(self, """ resp = await self._api_request( f"{self._prefix}.STREAM.LIST", - json.dump_bytes({ - "offset": offset - }), + json.dump_bytes({"offset": offset}), timeout=self._timeout, ) @@ -283,9 +277,7 @@ async def consumers_info( """ resp = await self._api_request( f"{self._prefix}.CONSUMER.LIST.{stream}", - b"" if offset is None else json.dump_bytes({ - "offset": offset - }), + b"" if offset is None else json.dump_bytes({"offset": offset}), timeout=self._timeout, ) consumers = [] diff --git a/tests/test_client.py b/tests/test_client.py index e739b971..015d7ca0 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -37,7 +37,7 @@ def test_default_connect_command(self): nc.options["no_echo"] = False got = nc._connect_command() expected = f'CONNECT {{"echo": true, "lang": "python3", "pedantic": false, "protocol": 1, "verbose": false, "version": "{__version__}"}}\r\n' - + try: import orjson # If using orjson, expected string is without spaces diff --git a/tests/test_micro_service.py b/tests/test_micro_service.py index b9514b09..4b936c5c 100644 --- a/tests/test_micro_service.py +++ b/tests/test_micro_service.py @@ -77,13 +77,7 @@ async def add_handler(request: Request): svcs.append(svc) for _ in range(50): - await nc.request( - "svc.add", - json.dump_bytes({ - "x": 22, - "y": 11 - }) - ) + await nc.request("svc.add", json.dump_bytes({"x": 22, "y": 11})) for svc in svcs: info = svc.info() From 50d233ca02fb5080799dc6aa649dcb79c92a52d6 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 18:22:48 +0200 Subject: [PATCH 11/15] :art: Update docstrings Signed-off-by: ff137 --- nats/json_util.py | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/nats/json_util.py b/nats/json_util.py index d98e746b..516ef36b 100644 --- a/nats/json_util.py +++ b/nats/json_util.py @@ -10,12 +10,20 @@ class JsonUtil: - """A utility class for handling JSON-related operations. - This class provides static methods for formatting JSON strings, and - for converting between Python objects and JSON strings/files. It uses - the `orjson` library where possible for its speed advantages, but reverts - to the standard `json` library where `orjson` does not support the required - functionality. + """A utility class for handling JSON serialization operations. + This class provides static methods for converting between Python objects and JSON + strings/bytes. It uses the `orjson` library when available for its performance + advantages, falling back to the standard `json` library when `orjson` is not + installed. + + The class handles compatibility between the two libraries, particularly for options + like 'sort_keys' which have different implementations in each library. All methods + maintain consistent behavior regardless of which JSON library is being used. + + Methods: + dumps(obj, *args, **kwargs) -> str: Converts object to JSON string + dump_bytes(obj, *args, **kwargs) -> bytes: Converts object to JSON bytes + loads(s, *args, **kwargs) -> Any: Parses JSON string into Python object """ @staticmethod @@ -24,7 +32,7 @@ def _handle_sort_keys(kwargs): Args: kwargs: The keyword arguments dictionary to modify Returns: - Modified kwargs dictionary + Modified kwargs dictionary with orjson-compatible options """ if kwargs.pop("sort_keys", False): option = kwargs.get("option", 0) | orjson.OPT_SORT_KEYS @@ -33,7 +41,7 @@ def _handle_sort_keys(kwargs): @staticmethod def dumps(obj, *args, **kwargs) -> str: - """Convert a Python object into a json string. + """Convert a Python object into a JSON string. Args: obj: The data to be converted *args: Extra arguments to pass to the dumps() function @@ -41,7 +49,7 @@ def dumps(obj, *args, **kwargs) -> str: Special handling for 'sort_keys' which is translated to orjson.OPT_SORT_KEYS when using orjson. Returns: - The json string representation of obj + str: A JSON string representation of obj """ if orjson is None: return json.dumps(obj, *args, **kwargs) @@ -51,7 +59,7 @@ def dumps(obj, *args, **kwargs) -> str: @staticmethod def dump_bytes(obj, *args, **kwargs) -> bytes: - """Convert a Python object into a bytes string. + """Convert a Python object into a JSON bytes string. Args: obj: The data to be converted *args: Extra arguments to pass to the dumps() function @@ -59,7 +67,7 @@ def dump_bytes(obj, *args, **kwargs) -> bytes: Special handling for 'sort_keys' which is translated to orjson.OPT_SORT_KEYS when using orjson. Returns: - The json string representation of obj as bytes + bytes: A JSON bytes string representation of obj """ if orjson is None: return json.dumps(obj, *args, **kwargs).encode("utf-8") From 52135e23e7249d9c8fd4e69fceeeefb57309a651 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 18:23:40 +0200 Subject: [PATCH 12/15] :art: isort Signed-off-by: ff137 --- nats/aio/subscription.py | 10 ++++++++-- nats/js/client.py | 10 +++++++++- tests/test_client.py | 2 ++ 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/nats/aio/subscription.py b/nats/aio/subscription.py index c17fe9ea..31fbb887 100644 --- a/nats/aio/subscription.py +++ b/nats/aio/subscription.py @@ -15,11 +15,17 @@ from __future__ import annotations import asyncio -from typing import TYPE_CHECKING, AsyncIterator, Awaitable, Callable, List, Optional +from typing import ( + TYPE_CHECKING, + AsyncIterator, + Awaitable, + Callable, + List, + Optional, +) from uuid import uuid4 from nats import errors - # Default Pending Limits of Subscriptions from nats.aio.msg import Msg diff --git a/nats/js/client.py b/nats/js/client.py index 51fec6eb..68aea6f1 100644 --- a/nats/js/client.py +++ b/nats/js/client.py @@ -18,7 +18,15 @@ import time from email.parser import BytesParser from secrets import token_hex -from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Dict, + List, + Optional, +) import nats.errors import nats.js.errors diff --git a/tests/test_client.py b/tests/test_client.py index 015d7ca0..a919cb28 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -40,6 +40,7 @@ def test_default_connect_command(self): try: import orjson + # If using orjson, expected string is without spaces expected = expected.replace(" ", "") except ImportError: @@ -59,6 +60,7 @@ def test_default_connect_command_with_name(self): try: import orjson + # If using orjson, expected string is without spaces expected = expected.replace(" ", "") except ImportError: From 71686e2160808db10e2bd66e80aa9cb98b710395 Mon Sep 17 00:00:00 2001 From: ff137 Date: Thu, 13 Feb 2025 20:56:07 +0200 Subject: [PATCH 13/15] :heavy_plus_sign: Add orjson to setup.py extras_require Signed-off-by: ff137 --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 7e8cc758..215a9467 100644 --- a/setup.py +++ b/setup.py @@ -10,6 +10,7 @@ "nkeys": ["nkeys"], "aiohttp": ["aiohttp"], "fast_parse": ["fast-mail-parser"], + "orjson": ["orjson"], }, packages=["nats", "nats.aio", "nats.micro", "nats.protocol", "nats.js"], package_data={"nats": ["py.typed"]}, From e23ddbdff261df9b3ac07e684206b069d23a0a83 Mon Sep 17 00:00:00 2001 From: ff137 Date: Wed, 12 Mar 2025 10:54:10 +0200 Subject: [PATCH 14/15] :white_check_mark: Fix expected string when using orjson Signed-off-by: ff137 --- tests/test_client.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/test_client.py b/tests/test_client.py index a919cb28..001462cb 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -41,11 +41,10 @@ def test_default_connect_command(self): try: import orjson - # If using orjson, expected string is without spaces - expected = expected.replace(" ", "") + # If using orjson, expected string is without spaces (except for first space after CONNECT) + expected = expected.replace(" ", "").replace("CONNECT", "CONNECT ") except ImportError: pass - self.assertEqual(expected.encode(), got) def test_default_connect_command_with_name(self): @@ -61,8 +60,8 @@ def test_default_connect_command_with_name(self): try: import orjson - # If using orjson, expected string is without spaces - expected = expected.replace(" ", "") + # If using orjson, expected string is without spaces (except for first space after CONNECT) + expected = expected.replace(" ", "").replace("CONNECT", "CONNECT ") except ImportError: pass From 33f6f379717fcfc97feb3226f9e90e81ce8b3f7b Mon Sep 17 00:00:00 2001 From: ff137 Date: Wed, 9 Apr 2025 10:52:15 +0200 Subject: [PATCH 15/15] Empty commit