feat(zero-pg): support async post-mutation tasks in PushProcessor #4281

Status: Open, 1 commit to merge into main

@erikmunson (Contributor) commented Apr 29, 2025

Discord thread with more context: https://discord.com/channels/830183651022471199/1363537428165558372

Background

This adds first-class support to PushProcessor for executing async tasks transactionally with mutations — i.e. tasks are only run if a mutation succeeds. Tasks queued by failed mutations are skipped.

zbugs already had a postCommitTasks pattern written in userland, but it was not transactional. If a mutation's code threw or the database failed to commit, it would still execute the tasks. While this behavior is theoretically possible to build by inspecting the push response payload returned by process(), it seems more ergonomic to keep app devs out of the business of directly parsing push requests and responses for very common use cases. It's also likely that without first class support devs will end up re-building various versions of this, not all of which will be reliably transactional.

tx.after()

PushProcessor now provides DatabaseProvider implementations with an after function, which can be used to enqueue async tasks for a given mutation. The ZQL ServerTransaction interface forwards this after function along as a top level field:

tx.after(() =>
  postToDiscord({
    title: `${modifierUser.login} reported an issue`,
    message: [issue.title, clip(issue.description ?? '')]
      .filter(Boolean)
      .join('\n'),
    link: `https://bugs.rocicorp.dev/issue/${issue.shortID}`,
  }),
);

When a mutation successfully commits to the DB, the tasks it enqueued are committed to the set of tasks to be executed for the overall push request. When all the mutations in the push have been handled, all committed tasks are executed. If the promise returned by a task rejects, the exception is sent to the logger but does not fail the push request. Since the mutations for those tasks have already been committed and are making their way to the client via replication, it doesn't make sense to fail the push — work should continue to progress similar to other forms of mutation error.
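
The commit-then-run flow described above can be modeled in a few lines. This is a minimal sketch, not the PushProcessor implementation: `processPush`, `Mutation`, and `logError` are hypothetical names, and a thrown error stands in for a transaction that failed to commit.

```typescript
// Hypothetical model of "tasks run only if their mutation commits".
// None of these names come from the PushProcessor source.
type Task = () => Promise<void>;
type Mutation = (after: (task: Task) => void) => Promise<void>;

async function processPush(
  mutations: Mutation[],
  logError: (error: unknown) => void,
): Promise<boolean[]> {
  const committedTasks: Task[] = [];
  const committed: boolean[] = [];

  for (const mutate of mutations) {
    const pending: Task[] = [];
    try {
      // Stand-in for running the mutator inside a database transaction.
      await mutate(task => {
        pending.push(task);
      });
      // The mutation committed, so its tasks join the push-wide set.
      committedTasks.push(...pending);
      committed.push(true);
    } catch (error) {
      // The mutation failed: its pending tasks are dropped and never run.
      committed.push(false);
    }
  }

  // Run every committed task; a failing task is logged, not rethrown.
  await Promise.all(
    committedTasks.map(async task => {
      try {
        await task();
      } catch (error) {
        logError(error);
      }
    }),
  );
  return committed;
}
```

In this model a task queued by a mutation that later throws is never invoked, and a rejected task only reaches `logError`, which mirrors the behavior the description lays out.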

Async mode

The default behavior of process() is to wait for all post-commit tasks to settle before returning. This is necessary for serverless functions where responding to the push request will suspend the runtime immediately.

When running in a long-lived server process, the more common pattern is to execute async work after a response has been sent to the client, which can achieve higher throughput and lower latency for workloads where the async work takes a while to complete. For example, if a post-commit task in push A takes 3 seconds, a client might have its mutations in push B queued up behind that task in zero-cache, waiting for push A to respond before continuing.

This PR introduces an async configuration option which allows opt-in backgrounding of post-commit tasks. In this mode process() returns without waiting for post-commit tasks and tracks them in the background.

To ensure background tasks are not interrupted, a new close() method is added which prevents new pushes from being processed and returns a promise that resolves when all in-flight tasks have settled. This method mimics the behavior of other libraries designed for long-lived server processes, such as fastify.close(). It can be used in developers' existing graceful shutdown code alongside the other close()-style methods:

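const server = fastify()
const processor = new PushProcessor(/* connection/db details */, { async: true })

// Push endpoint: in async mode process() responds without waiting
// for post-commit tasks, which keep running in the background.
server.post('/api/push', async function (request, reply) {
  return processor.process(/* mutators, etc. */)
})

await server.listen()

// Graceful shutdown: stop accepting requests and wait for in-flight tasks.
process.on('SIGTERM', async () => {
  await Promise.all([server.close(), processor.close()])
  process.exit(0)
})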

This option could be used for backgrounding other async work in future changes, if desired.
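
One way to picture the async mode and close() together is a small in-flight tracker. The sketch below is an assumption about the general shape, not code from this PR: `TaskTracker`, `run`, and the injected `logError` are hypothetical names.

```typescript
// Hypothetical tracker for backgrounded tasks; not the PushProcessor source.
type Task = () => Promise<void>;

class TaskTracker {
  private readonly inFlight = new Set<Promise<void>>();
  private closed = false;

  constructor(private readonly logError: (error: unknown) => void) {}

  /** Starts tasks without awaiting them. Throws once close() has been called. */
  run(tasks: Task[]): void {
    if (this.closed) {
      throw new Error('tracker is closed');
    }
    for (const task of tasks) {
      const settled = this.runOne(task);
      this.inFlight.add(settled);
      // runOne never rejects, so this cleanup always runs.
      void settled.then(() => {
        this.inFlight.delete(settled);
      });
    }
  }

  /** Rejects new work and resolves once every in-flight task has settled. */
  async close(): Promise<void> {
    this.closed = true;
    await Promise.all(Array.from(this.inFlight));
  }

  // Runs one task and logs a failure instead of propagating it.
  private async runOne(task: Task): Promise<void> {
    try {
      await task();
    } catch (error) {
      this.logError(error);
    }
  }
}
```

Because each tracked promise is guaranteed to resolve, close() cannot be rejected by a failing task, which keeps shutdown code like the SIGTERM handler above simple.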

Future work

Some serverless runtimes like Vercel support some form of waitUntil or after function natively, which allows responses to return to clients while async work runs in the background. Other libraries check for the existence of these special globals in the runtime to automatically background async work. I believe it would be straightforward to do this in PushProcessor, running all post commit tasks for a push inside a waitUntil handler automatically even if the async flag is set to false.
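
A rough sketch of that idea, assuming the runtime's hook has already been located and is passed in as a plain function. `settlePostCommit` and the `WaitUntil` type are hypothetical; how a given runtime exposes its hook is deliberately left out.

```typescript
// Hypothetical: `waitUntil` stands in for whatever hook the runtime provides.
type WaitUntil = (promise: Promise<unknown>) => void;

async function settlePostCommit(
  runTasks: () => Promise<void>,
  waitUntil?: WaitUntil,
): Promise<'backgrounded' | 'awaited'> {
  if (waitUntil) {
    // Hand the work to the runtime so the response can be sent right away.
    waitUntil(runTasks());
    return 'backgrounded';
  }
  // No hook available: fall back to waiting before responding.
  await runTasks();
  return 'awaited';
}
```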

@erikmunson
Contributor Author

FYI @tantaman this is another PushProcessor rough edge that i ran into when setting my push endpoint up, feedback welcome.

// Wait for the full mutation flow to complete w/ PG.
for (let i = 0; i < 10; i++) {
  await sleep(10);
}
Contributor Author

don't love the timing stuff in this and subsequent test cases but i didn't see a better way to avoid flaking, if there is a nicer pattern for this from other tests in the codebase point it my way and I will improve it!
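
One common alternative to fixed sleeps is to poll for the expected state with a deadline. The helper below is a generic sketch, not something taken from this codebase; `waitFor` and its option names are hypothetical.

```typescript
// Hypothetical polling helper for tests; names and defaults are assumptions.
const sleep = (ms: number): Promise<void> =>
  new Promise<void>(resolve => {
    setTimeout(resolve, ms);
  });

async function waitFor(
  condition: () => boolean | Promise<boolean>,
  options: {timeoutMs?: number; intervalMs?: number} = {},
): Promise<void> {
  const timeoutMs = options.timeoutMs ?? 1000;
  const intervalMs = options.intervalMs ?? 10;
  const deadline = Date.now() + timeoutMs;
  // Re-check the condition until it holds or the deadline passes.
  while (!(await condition())) {
    if (Date.now() > deadline) {
      throw new Error(`condition not met within ${timeoutMs}ms`);
    }
    await sleep(intervalMs);
  }
}
```

A test would then wait on the observable outcome, for example `await waitFor(() => taskRan)` where `taskRan` is a flag the test's own task sets, so it finishes as soon as the work is done and fails with a clear message if it never happens.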

- } as const satisfies CustomMutatorDefs<typeof schema>;
+ } as const satisfies CustomMutatorDefs<
+   ServerTransaction<typeof schema, PostgresJSTransaction>
+ >;
Contributor Author

necessary — or at least more ergonomic — to narrow the type here because .after() is only on server transactions. The alternative would've been to check for existence of the function or verify that tx.location was server to narrow, which felt not as good. I suppose this is technically a more correct typing of the server mutators anyways 🤷
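
The two options weighed here can be illustrated with simplified stand-in types. `ClientTx`, `ServerTx`, and both helper functions are hypothetical; only the `location` field and `.after()` are mentioned in the discussion, and the real Zero types are richer than this.

```typescript
// Simplified stand-ins for the real transaction types (assumptions).
type Task = () => Promise<void>;

interface ClientTx {
  location: 'client';
}

interface ServerTx {
  location: 'server';
  after(task: Task): void;
}

// Option 1: accept either transaction and narrow at runtime.
function enqueueIfServer(tx: ClientTx | ServerTx, task: Task): boolean {
  if (tx.location === 'server') {
    tx.after(task); // the compiler narrows tx to ServerTx here
    return true;
  }
  return false;
}

// Option 2: type the mutator against ServerTx so no runtime check is needed.
function enqueueOnServer(tx: ServerTx, task: Task): void {
  tx.after(task);
}
```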

@tantaman
Contributor

We also explored some other API ideas for this sort of thing:

  • Letting mutators return arbitrary values. On the server you could return a callback that is some task to run.
const response = await processor.process(mutators, params, body);
// Each returned postCommitTask is a callback, so invoke it to start the work.
const postCommitTasks = response.mutations
  .filter(r => 'postCommitTask' in r.result)
  .map(r => r.result.postCommitTask());
await Promise.allSettled(postCommitTasks);
  • Adding an explicit hook when creating a mutator.
mutators: {
  createIssue: {
    before: ...,
    after: ...,
    mutate: ...,
  },
}

The hook is pretty inflexible and would make it hard (impossible?) to get data out of mutate and into after.

The return value is arguably more flexible than tx.after since the user is in control of how to run the returned callback(s). But... I don't see how we can get type information for the returned items. The user would have to type assert / cast the response objects. Maybe not a big deal? 🤷‍♂️

There was also a third idea to let the user control the transaction boundary on the server but we did not explore it in-depth. Something like:

mutators: {
  createIssue(z) {
    // do stuff
    z.begin((tx) => {
      // actual mutation
    });
    // do other stuff
  }
}

// z.begin is special and does our `errorMode` retry as well as advancing `lmid`

wdyt about tx.after vs the return value approach?

I see @aboodman suggested tx.after. Curious if there was a clear reason to pick that route over a return value.

@erikmunson
Contributor Author

the return value thing would technically work i think, but it also (imo) ends up sort of side-stepping the DX pain around side effects in a way that keeps the PushProcessor implementation simpler but shifts the complexity back out to userland:

  1. still requires introspecting the push response payload, creating a pretty low-level setup step for folks getting this going and coupling user code to the current version of the push protocol/schema
  2. mixes 'i want to send something from the server back to the client' and 'i want to do a side effect' into the return value of the mutator, which i think creates some awkwardness for folks who aren't already deeply familiar with how this works. feels finicky. like do i need to remember to filter out my special callback thingy from every mutation response before returning it to the client? what happens if i don't? and how do i enforce the correct usage of my special thingy across all my mutators, do i need some kind of custom type? some of this could be mitigated by creating a new return type for server mutators that took a standard set of fields including post commit tasks and data to return to the client, but if i play that path out to its conclusion it feels like it ends up at a pretty similar place to how tx.after() would work, except that the API is slightly different inside mutators
  3. still requires users to implement their own task execution, error handling, backgrounding/task tracking and shutdown stuff, etc. — not a huge deal at all. but also still the kind of thing that is a little fiddly extra step that requires thinking about how to handle failure, right in the initial mutator setup flow
  4. requires extra user code to implement the common pattern of letting reusable helper functions enqueue things of their own. e.g. in zbugs the tx is passed into notify, a reusable helper that enqueues tasks to be executed depending on the type of activity. in the return value design, every mutator would need to have some kind of pattern where it creates an array of tasks for that mutation, passes that down into helpers to use, and then in the returned callback for the overall mutator does like Promise.all(tasks.map((task) => task())). lots of extra boilerplate, and also means that you have to think about failure handling again in each of these (i.e. do i want to log failures when they settle, or error the whole thing if one fails? etc.)

i think any or all of these tradeoffs would be totally fine for a use case that was rare/atypical, since it's expected that to do something advanced you might need to take on more responsibility in userland and do more work when updating to new versions of the platform. but given this is the #1 (or maybe #2 after applying permissions?) thing people will want to do in server mutator code i would personally lean toward making the setup experience for side effects really easy with as few things up for interpretation as possible. To my eyes, the updated zbugs push handler and mutator code in this diff feels really approachable to new folks, and i think usage examples in the docs will also feel that way.

if we went the return value route, it feels like the version of it with the best DX would be to have a standard server mutator return value like:

return {
  after: () => foo(),
  data: /* JSON data for the client*/
}

and then PushProcessor would either return the tasks to execute as a separate field in the process() return value, alongside the response for the push, or do something like what this PR does and execute them on the user's behalf. I don't have a good solution for tradeoff #4 above — it feels like no matter what with the return value approach users would need to invent their own patterns for every mutator to pass callbacks up and down between helper functions, in addition to passing the tx object into helpers the way people are already doing.
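
To make tradeoff #4 concrete, here is a side-by-side sketch of a reusable helper under each design. Everything in it is hypothetical (`TxWithAfter`, `Send`, and all four functions); it only illustrates how tasks would be threaded through helpers, not how zbugs or PushProcessor are written.

```typescript
// Hypothetical types for comparing the two designs.
type Task = () => Promise<void>;
type Send = (message: string) => Promise<void>;

interface TxWithAfter {
  after(task: Task): void;
}

// Style A (tx.after): the helper enqueues directly on the transaction.
function notifyViaTx(tx: TxWithAfter, send: Send, message: string): void {
  tx.after(() => send(message));
}

function createIssueViaTx(tx: TxWithAfter, send: Send): void {
  // ...database writes would happen here...
  notifyViaTx(tx, send, 'issue created');
}

// Style B (return value): each mutator owns a task array, threads it
// through its helpers, and combines the tasks when it returns.
function notifyViaArray(tasks: Task[], send: Send, message: string): void {
  tasks.push(() => send(message));
}

function createIssueViaReturn(send: Send): {after: Task; data: null} {
  const tasks: Task[] = [];
  // ...database writes would happen here...
  notifyViaArray(tasks, send, 'issue created');
  return {
    // Every mutator picks its own failure policy; this one rejects on
    // the first failing task.
    after: async () => {
      await Promise.all(tasks.map(task => task()));
    },
    data: null,
  };
}
```

In style B the array plumbing and the failure policy are repeated in every mutator, while in style A they live wherever `after` is implemented.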

@erikmunson
Contributor Author

re: that third idea to have a tx.begin() or similar — i think if there ends up being a strong need to run code before the tx starts that is maybe the inevitable outcome. or at least i can't currently think of another good way to do it. from my vantage point just monitoring the public discord it doesn't seem like there's a clear motivating case for adding it right now, but if the motivation appeared it's a very reasonable idea imo.

unclear to me, though, if users would still need to implement their own async task execution and tracking in that world. e.g. i guess you'd need to use some kind of deferred execution thing if you wanted to run tasks in the background after process() returned to the client. probably a lot simpler to conceptually do that in a world where it's just regular code running outside the tx. also feels like it might (?) make the code sharing between server and client mutators a little weirder to read and write, since i guess you'd need to run the client mutator inside the tx.

curious to see what people's mutators end up looking like in practice — depending on usage patterns this design could end up being the winner long term 🤔

@aboodman
Contributor

Can we pause on this? I'm reviewing (finally) the first PR that refactored the layering and might want to make some changes which would affect this too.
