-
Notifications
You must be signed in to change notification settings - Fork 69
feat(zero-pg): support async post-mutation tasks in PushProcessor #4281
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
@erikmunson is attempting to deploy a commit to the Rocicorp Team on Vercel. A member of the Team first needs to authorize it. |
FYI @tantaman this is another PushProcessor rough edge that i ran into when setting my push endpoint up, feedback welcome. |
// Wait for the full mutation flow to complete w/ PG. | ||
for (let i = 0; i < 10; i++) { | ||
await sleep(10); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't love the timing stuff in this and subsequent test cases but i didn't see a better way to avoid flaking, if there is a nicer pattern for this from other tests in the codebase point it my way and I will improve it!
} as const satisfies CustomMutatorDefs<typeof schema>; | ||
} as const satisfies CustomMutatorDefs< | ||
ServerTransaction<typeof schema, PostgresJSTransaction> | ||
>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
necessary — or at least more ergonomic — to narrow the type here because .after()
is only on server transactions. The alternative would've been to check for existence of the function or verify that tx.location
was server
to narrow, which felt not as good. I suppose this is technically a more correct typing of the server mutators anyways 🤷
We also explored some other API ideas for this sort of thing:
const response = await processor.process(mutators, params, body);
const postCommitTasks = response.mutations.filter(r => 'postCommitTask' in r.result).map(r => r.result.postCommitTask);
await Promise.allSettled(postCommitTasks);
mutators: {
createIssue: {
before: ...,
after: ...,
mutate: ...,
},
} The hook is pretty inflexible and would make it hard (impossible?) to get data out of The return value is arguably more flexible than There was also a third idea to let the user control the transaction boundary on the server but we did not explore it in-depth. Something like: mutators: {
createIssue(z) {
// do stuff
z.begin((tx) => {
// actual mutation
});
// do other stuff
}
}
// z.begin is special and does our `errorMode` retry as well as advancing `lmid` wdyt about I see @aboodman suggested |
the return value thing would technically work i think, but it also (imo) ends up sort of side-stepping the DX pain around side effects in a way that keeps the PushProcessor implementation simpler but shifts the complexity back out to userland:
i think any or all of these tradeoffs would be totally fine for a use case that was rare/atypical, since it's expected that to do something advanced you might need to take on more responsibility in userland and do more work when updating to new versions of the platform. but given this is the #1 (or maybe #2 after applying permisions?) thing people will want to do in server mutator code i would personally lean toward making the setup experience for side effects really easy with as few things up for interpretation as possible. To my eyes, the updated zbugs push handler and mutator code in this diff feels really approachable to new folks, and i think usage examples in the docs will also feel that way. if we went the return value route, it feels like the version of it with the best DX would be to have a standard server mutator return value like: return {
after: () => foo(),
data: /* JSON data for the client*/
} and then PushProcessor would either return the tasks to execute as a separate field in the |
re: that third idea to have a unclear to me, though, if users would still need to implement their own async task execution and tracking in that world. e.g. i guess you'd need to use some kind of deferred execution thing if you wanted to run tasks in the background after curious to see what people's mutators end up looking like in practice — depending on usage patterns this design could end up being the winner long term 🤔 |
Can we pause on this? I reviewing (finally) the first PR tht refactored the layering and might want to make some changes which would affect this too. |
Discord thread with more context: https://discord.com/channels/830183651022471199/1363537428165558372
Background
This adds first-class support to PushProcessor for executing async tasks transactionally with mutations — i.e. tasks are only run if a mutation succeeds. Tasks queued by failed mutations are skipped.
zbugs already had a
postCommitTasks
pattern written in userland, but it was not transactional. If a mutation's code threw or the database failed to commit, it would still execute the tasks. While this behavior is theoretically possible to build by inspecting the push response payload returned byprocess()
, it seems more ergonomic to keep app devs out of the business of directly parsing push requests and responses for very common use cases. It's also likely that without first class support devs will end up re-building various versions of this, not all of which will be reliably transactional.tx.after()
PushProcessor now provides
DatabaseProvider
implementations with anafter
function, which can be used to enqueue async tasks for a given mutation. The ZQLServerTransaction
interface forwards thisafter
function along as a top level field:When a mutation successfully commits to the DB, the tasks it enqueued are committed to the set of tasks to be executed for the overall push request. When all the mutations in the push have been handled, all committed tasks are executed. If the promise returned by a task rejects, the exception is sent to the logger but does not fail the push request. Since the mutations for those tasks have already been committed and are making their way to the client via replication, it doesn't make sense to fail the push — work should continue to progress similar to other forms of mutation error.
Async mode
The default behavior of
process()
is to wait for all post-commit tasks to settle before returning. This is necessary for serverless functions where responding to the push request will suspend the runtime immediately.When running in a long-lived server process, the more common pattern is to execute async work after a response has been sent to the client, which can achieve higher throughput and lower latency for workloads where the async work takes a while to complete. For example if a post commit task in push A takes 3 seconds, a client might be have their mutations in push B queued up behind those tasks in zero-cache waiting for the push to respond before continuing.
This PR introduces an
async
configuration option which allows opt-in backgrounding of post commit tasks. In this modeprocess()
returns immediately and tracks post commit tasks async.To ensure background tasks are not interrupted, a new
close()
method is added which prevents new pushes from being processed and returns a promise that resolves when all in-flight tasks have settled. This method mimics the behavior of other libraries designed for long lived server processes, likefastify.close()
or similar. It can be used in developers' existing graceful shutdown code alongside the otherclose()
-style methods:This option could be used for backgrounding other async work in future changes, if desired.
Future work
Some serverless runtimes like Vercel support some form of
waitUntil
orafter
function natively, which allows responses to return to clients while async work runs in the background. Other libraries check for the existence of these special globals in the runtime to automatically background async work. I believe it would be straightforward to do this in PushProcessor, running all post commit tasks for a push inside awaitUntil
handler automatically even if theasync
flag is set to false.