Skip to content

Commit bb96183

Browse files
committed
Initialize on listen()
1 parent 463ee4d commit bb96183

File tree

1 file changed

+14
-1
lines changed

1 file changed

+14
-1
lines changed

src/mq.ts

+14-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import type {
44
MessageQueueListenOptions,
55
} from "@fedify/fedify";
66
import type { Sql } from "postgres";
7+
import postgres from "postgres";
78

89
/**
910
* Options for the PostgreSQL message queue.
@@ -94,6 +95,7 @@ export class PostgresMessageQueue implements MessageQueue {
9495
handler: (message: any) => void | Promise<void>,
9596
options: MessageQueueListenOptions = {},
9697
): Promise<void> {
98+
await this.initialize();
9799
const { signal } = options;
98100
const poll = async () => {
99101
if (signal?.aborted) return;
@@ -155,14 +157,23 @@ export class PostgresMessageQueue implements MessageQueue {
155157
*/
156158
async initialize(): Promise<void> {
157159
if (this.#initialized) return;
158-
await this.#sql`
160+
try {
161+
await this.#sql`
159162
CREATE TABLE IF NOT EXISTS ${this.#sql(this.#tableName)} (
160163
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
161164
message jsonb NOT NULL,
162165
delay interval DEFAULT '0 seconds',
163166
created timestamp with time zone DEFAULT CURRENT_TIMESTAMP
164167
);
165168
`;
169+
} catch (e) {
170+
if (
171+
!(e instanceof postgres.PostgresError &&
172+
e.constraint_name === "pg_type_typname_nsp_index")
173+
) {
174+
throw e;
175+
}
176+
}
166177
this.#initialized = true;
167178
}
168179

@@ -173,3 +184,5 @@ export class PostgresMessageQueue implements MessageQueue {
173184
await this.#sql`DROP TABLE IF EXISTS ${this.#sql(this.#tableName)};`;
174185
}
175186
}
187+
188+
// cSpell: ignore typname

0 commit comments

Comments
 (0)