Skip to content

Introduce orjson as optional dependency #662

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.8"
python-version: "3.12"

- name: Install dependencies
run: |
Expand Down
35 changes: 34 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ before_script:
- export PATH=$HOME/nats-server:$PATH

install:
- pip install -e .[nkeys,aiohttp,fast-mail-parser]
- pip install -e .[nkeys,aiohttp,fast-mail-parser,orjson]

script:
- make ci
Expand All @@ -39,6 +39,16 @@ jobs:
- bash ./scripts/install_nats.sh
install:
- pip install -e .[fast-mail-parser]
- name: "Python: 3.13/orjson"
python: "3.13"
before_install:
- sudo apt update && sudo apt install gcc build-essential -y
- sudo apt-get install python3-pip
- sudo apt-get install python3-pytest
- pip install --upgrade pip
- bash ./scripts/install_nats.sh
install:
- pip install -e .[fast-mail-parser,orjson]
- name: "Python: 3.12"
python: "3.12"
before_install:
Expand All @@ -49,6 +59,16 @@ jobs:
- bash ./scripts/install_nats.sh
install:
- pip install -e .[fast-mail-parser]
- name: "Python: 3.12/orjson"
python: "3.12"
before_install:
- sudo apt update && sudo apt install gcc build-essential -y
- sudo apt-get install python3-pip
- sudo apt-get install python3-pytest
- pip install --upgrade pip
- bash ./scripts/install_nats.sh
install:
- pip install -e .[fast-mail-parser,orjson]
- name: "Python: 3.11"
python: "3.11"
before_install:
Expand All @@ -59,6 +79,16 @@ jobs:
- bash ./scripts/install_nats.sh
install:
- pip install -e .[fast-mail-parser]
- name: "Python: 3.11/orjson"
python: "3.11"
before_install:
- sudo apt update && sudo apt install gcc build-essential -y
- sudo apt-get install python3-pip
- sudo apt-get install python3-pytest
- pip install --upgrade pip
- bash ./scripts/install_nats.sh
install:
- pip install -e .[fast-mail-parser,orjson]
- name: "Python: 3.11/uvloop"
python: "3.11"
before_install:
Expand All @@ -85,7 +115,10 @@ jobs:
allow_failures:
- name: "Python: 3.8"
- name: "Python: 3.11"
- name: "Python: 3.11/orjson"
- name: "Python: 3.11/uvloop"
- name: "Python: 3.11 (nats-server@main)"
- name: "Python: 3.12"
- name: "Python: 3.12/orjson"
- name: "Python: 3.13"
- name: "Python: 3.13/orjson"
7 changes: 4 additions & 3 deletions nats/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import asyncio
import base64
import ipaddress
import json
import logging
import re
import ssl
Expand All @@ -33,6 +32,8 @@
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple, Union
from urllib.parse import ParseResult, urlparse

from nats.json_util import JsonUtil as json

try:
from fast_mail_parser import parse_email
except ImportError:
Expand Down Expand Up @@ -1621,8 +1622,8 @@ def _connect_command(self) -> bytes:
if self.options["no_echo"] is not None:
options["echo"] = not self.options["no_echo"]

connect_opts = json.dumps(options, sort_keys=True)
return b"".join([CONNECT_OP + _SPC_ + connect_opts.encode() + _CRLF_])
connect_opts = json.dump_bytes(options, sort_keys=True)
return b"".join([CONNECT_OP + _SPC_ + connect_opts + _CRLF_])

async def _process_ping(self) -> None:
"""
Expand Down
4 changes: 2 additions & 2 deletions nats/aio/msg.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
from __future__ import annotations

import datetime
import json
from dataclasses import dataclass
from typing import TYPE_CHECKING, Dict, List, Optional, Union

from nats.errors import Error, MsgAlreadyAckdError, NotJSMessageError
from nats.json_util import JsonUtil as json

if TYPE_CHECKING:
from nats import NATS
Expand Down Expand Up @@ -132,7 +132,7 @@ async def nak(self, delay: Union[int, float, None] = None) -> None:
if delay:
json_args["delay"] = int(delay * 10**9) # from seconds to ns
if json_args:
payload += b" " + json.dumps(json_args).encode()
payload += b" " + json.dump_bytes(json_args)
await self._client.publish(self.reply, payload)
self._ackd = True

Expand Down
8 changes: 4 additions & 4 deletions nats/js/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from __future__ import annotations

import asyncio
import json
import time
from email.parser import BytesParser
from secrets import token_hex
Expand Down Expand Up @@ -50,6 +49,7 @@
VALID_BUCKET_RE,
ObjectStore,
)
from nats.json_util import JsonUtil as json

if TYPE_CHECKING:
from nats import NATS
Expand Down Expand Up @@ -1135,7 +1135,7 @@ async def _fetch_one(

await self._nc.publish(
self._nms,
json.dumps(next_req).encode(),
json.dump_bytes(next_req),
self._deliver,
)

Expand Down Expand Up @@ -1220,7 +1220,7 @@ async def _fetch_n(
next_req["no_wait"] = True
await self._nc.publish(
self._nms,
json.dumps(next_req).encode(),
json.dump_bytes(next_req),
self._deliver,
)
await asyncio.sleep(0)
Expand Down Expand Up @@ -1286,7 +1286,7 @@ async def _fetch_n(

await self._nc.publish(
self._nms,
json.dumps(next_req).encode(),
json.dump_bytes(next_req),
self._deliver,
)
await asyncio.sleep(0)
Expand Down
52 changes: 22 additions & 30 deletions nats/js/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
from __future__ import annotations

import base64
import json
from email.parser import BytesParser
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional

from nats.errors import NoRespondersError
from nats.js import api
from nats.js.errors import APIError, NotFoundError, ServiceUnavailableError
from nats.json_util import JsonUtil as json

if TYPE_CHECKING:
from nats import NATS
Expand Down Expand Up @@ -60,9 +60,9 @@ async def find_stream_name_by_subject(self, subject: str) -> str:
"""

req_sub = f"{self._prefix}.STREAM.NAMES"
req_data = json.dumps({"subject": subject})
req_data = json.dump_bytes({"subject": subject})
info = await self._api_request(
req_sub, req_data.encode(), timeout=self._timeout
req_sub, req_data, timeout=self._timeout
)
if not info["streams"]:
raise NotFoundError
Expand All @@ -76,12 +76,12 @@ async def stream_info(
"""
Get the latest StreamInfo by stream name.
"""
req_data = ""
req_data = b""
if subjects_filter:
req_data = json.dumps({"subjects_filter": subjects_filter})
req_data = json.dump_bytes({"subjects_filter": subjects_filter})
resp = await self._api_request(
f"{self._prefix}.STREAM.INFO.{name}",
req_data.encode(),
req_data,
timeout=self._timeout,
)
return api.StreamInfo.from_response(resp)
Expand Down Expand Up @@ -114,10 +114,10 @@ async def add_stream(
"path separators (forward or backward slash), or non-printable characters."
)

data = json.dumps(config.as_dict())
data = json.dump_bytes(config.as_dict())
resp = await self._api_request(
f"{self._prefix}.STREAM.CREATE.{stream_name}",
data.encode(),
data,
timeout=self._timeout,
)
return api.StreamInfo.from_response(resp)
Expand All @@ -136,10 +136,10 @@ async def update_stream(
if config.name is None:
raise ValueError("nats: stream name is required")

data = json.dumps(config.as_dict())
data = json.dump_bytes(config.as_dict())
resp = await self._api_request(
f"{self._prefix}.STREAM.UPDATE.{config.name}",
data.encode(),
data,
timeout=self._timeout,
)
return api.StreamInfo.from_response(resp)
Expand Down Expand Up @@ -171,11 +171,9 @@ async def purge_stream(
if keep:
stream_req["keep"] = keep

req = json.dumps(stream_req)
req = json.dump_bytes(stream_req)
resp = await self._api_request(
f"{self._prefix}.STREAM.PURGE.{name}",
req.encode(),
timeout=self._timeout
f"{self._prefix}.STREAM.PURGE.{name}", req, timeout=self._timeout
)
return resp["success"]

Expand All @@ -198,9 +196,7 @@ async def streams_info(self, offset=0) -> List[api.StreamInfo]:
"""
resp = await self._api_request(
f"{self._prefix}.STREAM.LIST",
json.dumps({
"offset": offset
}).encode(),
json.dump_bytes({"offset": offset}),
timeout=self._timeout,
)
streams = []
Expand All @@ -216,9 +212,7 @@ async def streams_info_iterator(self,
"""
resp = await self._api_request(
f"{self._prefix}.STREAM.LIST",
json.dumps({
"offset": offset
}).encode(),
json.dump_bytes({"offset": offset}),
timeout=self._timeout,
)

Expand All @@ -240,7 +234,7 @@ async def add_consumer(
config = config.evolve(**params)
durable_name = config.durable_name
req = {"stream_name": stream, "config": config.as_dict()}
req_data = json.dumps(req).encode()
req_data = json.dump_bytes(req)

resp = None
subject = ""
Expand Down Expand Up @@ -283,9 +277,7 @@ async def consumers_info(
"""
resp = await self._api_request(
f"{self._prefix}.CONSUMER.LIST.{stream}",
b"" if offset is None else json.dumps({
"offset": offset
}).encode(),
b"" if offset is None else json.dump_bytes({"offset": offset}),
timeout=self._timeout,
)
consumers = []
Expand Down Expand Up @@ -318,27 +310,27 @@ async def get_msg(
req["last_by_subj"] = None
req.pop("last_by_subj", None)
req["next_by_subj"] = subject
data = json.dumps(req)
data = json.dump_bytes(req)

if direct:
# $JS.API.DIRECT.GET.KV_{stream_name}.$KV.TEST.{key}
if subject and (seq is None):
# last_by_subject type request requires no payload.
data = ""
data = b""
req_subject = f"{self._prefix}.DIRECT.GET.{stream_name}.{subject}"
else:
req_subject = f"{self._prefix}.DIRECT.GET.{stream_name}"

resp = await self._nc.request(
req_subject, data.encode(), timeout=self._timeout
req_subject, data, timeout=self._timeout
)
raw_msg = JetStreamManager._lift_msg_to_raw_msg(resp)
return raw_msg

# Non Direct form
req_subject = f"{self._prefix}.STREAM.MSG.GET.{stream_name}"
resp_data = await self._api_request(
req_subject, data.encode(), timeout=self._timeout
req_subject, data, timeout=self._timeout
)

raw_msg = api.RawStreamMsg.from_response(resp_data["message"])
Expand Down Expand Up @@ -389,8 +381,8 @@ async def delete_msg(self, stream_name: str, seq: int) -> bool:
"""
req_subject = f"{self._prefix}.STREAM.MSG.DELETE.{stream_name}"
req = {"seq": seq}
data = json.dumps(req)
resp = await self._api_request(req_subject, data.encode())
data = json.dump_bytes(req)
resp = await self._api_request(req_subject, data)
return resp["success"]

async def get_last_msg(
Expand Down
8 changes: 4 additions & 4 deletions nats/js/object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import asyncio
import base64
import io
import json
import re
from dataclasses import dataclass
from datetime import datetime, timezone
Expand All @@ -34,6 +33,7 @@
ObjectNotFoundError,
)
from nats.js.kv import MSG_ROLLUP_SUBJECT
from nats.json_util import JsonUtil as json

VALID_BUCKET_RE = re.compile(r"^[a-zA-Z0-9_-]+$")
VALID_KEY_RE = re.compile(r"^[-/_=\.a-zA-Z0-9]+$")
Expand Down Expand Up @@ -343,7 +343,7 @@ async def put(
try:
await self._js.publish(
meta_subj,
json.dumps(info.as_dict()).encode(),
json.dump_bytes(info.as_dict()),
headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT},
)
except Exception as err:
Expand Down Expand Up @@ -412,7 +412,7 @@ async def update_meta(
try:
await self._js.publish(
meta_subj,
json.dumps(info.as_dict()).encode(),
json.dump_bytes(info.as_dict()),
headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT},
)
except Exception as err:
Expand Down Expand Up @@ -538,7 +538,7 @@ async def delete(self, name: str) -> ObjectResult:
try:
await self._js.publish(
meta_subj,
json.dumps(info.as_dict()).encode(),
json.dump_bytes(info.as_dict()),
headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT},
)
finally:
Expand Down
Loading