Commit f27a3bb

use new unique jobs implementation (#38)

* use new unique jobs implementation

  This moves the library to use the new unique jobs implementation from riverqueue/river#590 and migrates the SQLAlchemy driver to use a unified insertion path, allowing bulk inserts to use unique jobs.

* sort args before hashing, support partial arg extraction
* work around sqlc nullable array value type issue
* documentation updates, changelog
* remove whitespace from unique key json component
1 parent 580a39c commit f27a3bb

21 files changed: +720 −1613 lines

.github/workflows/ci.yaml (+2 −2)

```diff
@@ -63,7 +63,7 @@ jobs:
         run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE ${TEST_DATABASE_NAME};" ${ADMIN_DATABASE_URL}

       - name: river migrate-up
-        run: river migrate-up --database-url "$TEST_DATABASE_URL" --max-steps 5 # temporarily include max steps so tests can pass with unique fixes
+        run: river migrate-up --database-url "$TEST_DATABASE_URL"

       - name: Test
         run: rye test
@@ -109,7 +109,7 @@ jobs:
         run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE ${DATABASE_NAME};" ${ADMIN_DATABASE_URL}

       - name: river migrate-up
-        run: river migrate-up --database-url "$DATABASE_URL" --max-steps 5 # temporarily include max steps so tests can pass with unique fixes
+        run: river migrate-up --database-url "$DATABASE_URL"

       - name: Run examples
         run: rye run python3 -m examples.all
```

CHANGELOG.md (+23 −1)

````diff
@@ -7,6 +7,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ## [Unreleased]

+### Breaking
+
+- **Breaking change:** The return type of `Client#insert_many` and `Client#insert_many_tx` has been changed. Rather than returning just the number of rows inserted, it returns an array of all the `InsertResult` values for each inserted row. Unique conflicts which are skipped as duplicates are indicated in the same fashion as single inserts (the `unique_skipped_as_duplicated` attribute), and in such cases the conflicting row will be returned instead. [PR #38](https://github.com/riverqueue/riverqueue-python/pull/38).
+- **Breaking change:** Unique jobs no longer allow total customization of their states when using the `by_state` option. The pending, scheduled, available, and running states are required whenever customizing this list.
+
+### Added
+
+- The `UniqueOpts` class gains an `exclude_kind` option for cases where uniqueness needs to be guaranteed across multiple job types.
+- Unique jobs utilizing `by_args` can now also opt to have a subset of the job's arguments considered for uniqueness. For example, you could choose to consider only the `customer_id` field while ignoring the other fields:
+
+  ```python
+  UniqueOpts(by_args=["customer_id"])
+  ```
+
+  Any fields considered in uniqueness are also sorted alphabetically in order to guarantee a consistent result across implementations, even if the encoded JSON isn't sorted consistently.
+
+### Changed
+
+- Unique jobs have been improved to allow bulk insertion of unique jobs via `Client#insert_many`.
+
+  This updated implementation is significantly faster due to the removal of advisory locks in favor of an index-backed uniqueness system, while allowing some flexibility in which job states are considered. However, not all states may be removed from consideration when using the `by_state` option; pending, scheduled, available, and running states are required whenever customizing this list.
+
 ## [0.7.0] - 2024-07-30

 ### Changed
@@ -79,4 +101,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ### Added

-- Initial release, supporting insertion through [SQLAlchemy](https://www.sqlalchemy.org/) and its underlying Postgres drivers like [`psycopg2`](https://pypi.org/project/psycopg2/) or [`asyncpg`](https://github.com/MagicStack/asyncpg) (for async).
+- Initial release, supporting insertion through [SQLAlchemy](https://www.sqlalchemy.org/) and its underlying Postgres drivers like [`psycopg2`](https://pypi.org/project/psycopg2/) or [`asyncpg`](https://github.com/MagicStack/asyncpg) (for async).
````

README.md (+5 −13)

````diff
@@ -48,8 +48,8 @@ class JobArgs(Protocol):
     pass
 ```

-* `kind` is a unique string that identifies them the job in the database, and which a Go worker will recognize.
-* `to_json()` defines how the job will serialize to JSON, which of course will have to be parseable as an object in Go.
+- `kind` is a unique string that identifies them the job in the database, and which a Go worker will recognize.
+- `to_json()` defines how the job will serialize to JSON, which of course will have to be parseable as an object in Go.

 They may also respond to `insert_opts()` with an instance of `InsertOpts` to define insertion options that'll be used for all jobs of the kind.

@@ -95,22 +95,14 @@ insert_res.job
 insert_res.unique_skipped_as_duplicated
 ```

-### Custom advisory lock prefix
-
-Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:
-
-```python
-client = riverqueue.Client(riversqlalchemy.Driver(engine), advisory_lock_prefix=123456)
-```
-
-Doing so has the downside of leaving only 32 bits for River's locks (64 bits total - 32-bit prefix), making them somewhat more likely to conflict with each other.
+Unique jobs can also be inserted in bulk.

 ## Inserting jobs in bulk

 Use `#insert_many` to bulk insert jobs as a single operation for improved efficiency:

 ```python
-num_inserted = client.insert_many([
+results = client.insert_many([
     SimpleArgs(job_num=1),
     SimpleArgs(job_num=2)
 ])
@@ -119,7 +111,7 @@ num_inserted = client.insert_many([
 Or with `InsertManyParams`, which may include insertion options:

 ```python
-num_inserted = client.insert_many([
+results = client.insert_many([
     InsertManyParams(args=SimpleArgs(job_num=1), insert_opts=riverqueue.InsertOpts(max_attempts=5)),
     InsertManyParams(args=SimpleArgs(job_num=2), insert_opts=riverqueue.InsertOpts(queue="high_priority"))
 ])
````
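Since `insert_many` now returns a list of `InsertResult` values rather than a count, callers that relied on the old integer return can count the results instead. The stub classes below are a self-contained sketch of the new return shape only; they are not the real `riverqueue` types:

```python
from dataclasses import dataclass


# Stand-in types illustrating the shape of insert_many's new return value.
@dataclass
class Job:
    id: int


@dataclass
class InsertResult:
    job: Job
    unique_skipped_as_duplicated: bool


def summarize(results: list[InsertResult]) -> tuple[int, int]:
    """Split bulk-insert results into (inserted, skipped-as-duplicate) counts.

    The old integer return is now just len(results); duplicates skipped by
    unique jobs are flagged per row, the same as with single inserts.
    """
    skipped = sum(1 for r in results if r.unique_skipped_as_duplicated)
    return len(results) - skipped, skipped
```

For skipped rows, `result.job` holds the pre-existing conflicting row rather than a newly inserted one, mirroring the single-insert behavior.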

requirements-dev.lock (+1)

```diff
@@ -7,6 +7,7 @@
 # all-features: false
 # with-sources: false
 # generate-hashes: false
+# universal: false

 -e file:.
 asyncpg==0.29.0
```

requirements.lock (+1)

```diff
@@ -7,6 +7,7 @@
 # all-features: false
 # with-sources: false
 # generate-hashes: false
+# universal: false

 -e file:.
 sqlalchemy==2.0.30
```
