Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MongoDBEventStore] Feature - Basic implementation of MongoDBEventStoreConsumer #214

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './mongoDBEventStoreConsumer';
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import {
assertDeepEqual,

Check failure on line 2 in src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.e2e.spec.ts

View workflow job for this annotation

GitHub Actions / Build application code

'assertDeepEqual' is defined but never used. Allowed unused vars must match /^_/u
assertEqual,
assertIsNotNull,
assertIsNull,

Check failure on line 5 in src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.e2e.spec.ts

View workflow job for this annotation

GitHub Actions / Build application code

'assertIsNull' is defined but never used. Allowed unused vars must match /^_/u
assertMatches,
assertThatArray,

Check failure on line 7 in src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.e2e.spec.ts

View workflow job for this annotation

GitHub Actions / Build application code

'assertThatArray' is defined but never used. Allowed unused vars must match /^_/u
assertThrowsAsync,

Check failure on line 8 in src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.e2e.spec.ts

View workflow job for this annotation

GitHub Actions / Build application code

'assertThrowsAsync' is defined but never used. Allowed unused vars must match /^_/u
type Message,

Check failure on line 9 in src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.e2e.spec.ts

View workflow job for this annotation

GitHub Actions / Build application code

'Message' is defined but never used. Allowed unused vars must match /^_/u
} from '@event-driven-io/emmett';
import {
MongoDBContainer,
type StartedMongoDBContainer,
} from '@testcontainers/mongodb';
import { MongoClient, MongoNotConnectedError } from 'mongodb';

Check failure on line 15 in src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.e2e.spec.ts

View workflow job for this annotation

GitHub Actions / Build application code

'MongoNotConnectedError' is defined but never used. Allowed unused vars must match /^_/u
import { after, before, describe, it } from 'node:test';
import { MongoDBEventStoreConsumer } from './mongoDBEventStoreConsumer';
import { getMongoDBEventStore, toStreamName } from '../mongoDBEventStore';
import { type ShoppingCartEvent } from '../../testing';
import { v4 as uuid } from 'uuid';

describe('MongoDBEventStoreConsumer', () => {

Check failure on line 22 in src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.e2e.spec.ts

View workflow job for this annotation

GitHub Actions / Build application code

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator
let mongodb: StartedMongoDBContainer;
let client: MongoClient;

before(async () => {
mongodb = await new MongoDBContainer().start();
client = new MongoClient(mongodb.getConnectionString(), {
directConnection: true,
});
});

after(async () => {
try {
await client.close();
await mongodb.stop();
} catch (error) {
console.log(error);
}
});

it('should publish applicable messages to subscribers when events are appended in the event store', async () => {

Check failure on line 42 in src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.e2e.spec.ts

View workflow job for this annotation

GitHub Actions / Build application code

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator
const messagesPublishedToProductItemAdded: ShoppingCartEvent[] = [];
const messagesPublishedToDiscountApplied: ShoppingCartEvent[] = [];

const consumer = new MongoDBEventStoreConsumer<ShoppingCartEvent>()
.subscribe({
canHandle: ['ProductItemAdded'],
handle: (events) => {
messagesPublishedToProductItemAdded.push(...events);
},
})
.subscribe({
canHandle: ['DiscountApplied'],
handle: (events) => {
messagesPublishedToDiscountApplied.push(...events);
},
});

const eventStore = getMongoDBEventStore({
client,
hooks: {
onAfterCommit: (events) => {
consumer.publish(events);
},
},
});

const shoppingCardId = uuid();
const streamType = 'shopping_cart';
const streamName = toStreamName(streamType, shoppingCardId);

const productItemEvents: (ShoppingCartEvent & {
type: 'ProductItemAdded';
})[] = [
{
type: 'ProductItemAdded',
data: {
productItem: {
price: 1,
productId: 'productId1',
quantity: 1,
},
},
},
{
type: 'ProductItemAdded',
data: {
productItem: {
price: 2,
productId: 'productId2',
quantity: 2,
},
},
},
{
type: 'ProductItemAdded',
data: {
productItem: {
price: 3,
productId: 'productId3',
quantity: 3,
},
},
},
];

const discountAppliedEvent: ShoppingCartEvent & {
type: 'DiscountApplied';
} = {
type: 'DiscountApplied',
data: {
couponId: 'couponId',
percent: 10,
},
};

await eventStore.appendToStream(streamName, [
// Events appending in any order
productItemEvents[0]!,
discountAppliedEvent,
productItemEvents[1]!,
productItemEvents[2]!,
]);

assertEqual(
productItemEvents.length,
messagesPublishedToProductItemAdded.length,
);
for (const message of messagesPublishedToProductItemAdded) {
const expectedMessage = productItemEvents.find(
(e) =>
// @ts-expect-error expecting this property to exist
e.data.productItem.productId === message.data.productItem.productId,

Check failure on line 134 in src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.e2e.spec.ts

View workflow job for this annotation

GitHub Actions / Build application code

Unsafe member access .productId on an `error` typed value
);
assertIsNotNull(expectedMessage!);
assertMatches(message, expectedMessage);
}

assertEqual(1, messagesPublishedToDiscountApplied.length);
assertMatches(messagesPublishedToDiscountApplied[0], discountAppliedEvent);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import type { Message } from '@event-driven-io/emmett';

export type MongoDBEventStoreConsumerSubscription<MessageType extends Message> =
{
canHandle: MessageType['type'][];
handle: (messages: MessageType[]) => void | Promise<void>;
};

/**
* The `MongoDBEventStoreConsumer` allows you to subscribe handlers to be called when messages are published to the consumer.
*
* @example
*
* ```typescript
* import {
* getMongoDBEventStore,
* MongoDBEventStoreConsumer,
* } from '@event-driven-io/emmett-mongodb';
*
* const consumer = new MongoDBEventStoreConsumer()
* .subscribe({
* canHandle: ['MyEventType'],
* handle: (messages) => {
* // handle messages ...
* },
* })
* .subscribe({
* canHandle: ['AnotherEventType'],
* handle: (messages) => {
* // handle messages ...
* },
* })
*
* const eventStore = getMongoDBEventStore({
* // ...,
* hooks: {
* onAfterCommit: (events) => {
* consumer.publish(events);
* },
* },
* })
*/
export class MongoDBEventStoreConsumer<MessageType extends Message> {
private subscriptions: MongoDBEventStoreConsumerSubscription<MessageType>[];

constructor() {
this.subscriptions = [];
}

publish<MessageType extends Message>(messages: MessageType[]) {
for (const subscription of this.subscriptions) {
const messagesSubscriptionCanHandle = filterMessagesByType(
messages,
subscription.canHandle,
);

if (messagesSubscriptionCanHandle.length < 0) {
continue;
}

// TODO: should this be ran asynchronoously or awaited?
subscription.handle(messagesSubscriptionCanHandle);

Check failure on line 62 in src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.ts

View workflow job for this annotation

GitHub Actions / Build application code

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator
}

return this;
}

subscribe(subscription: MongoDBEventStoreConsumerSubscription<MessageType>) {
this.subscriptions.push(subscription);
return this;
}
}

export function filterMessagesByType<
IncomingMessageType extends Message,
ExpectedMessageType extends Message,
>(
messages: IncomingMessageType[],
types: ExpectedMessageType['type'][],
): ExpectedMessageType[] {
// @ts-expect-error The `type` parameter is how we determine whether or not the `message` is an `ExpectedMessageType`
return messages.filter((m) => types.includes(m.type));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { describe, it } from 'node:test';
import { filterMessagesByType } from './mongoDBEventStoreConsumer';
import { assertEqual, assertThatArray } from 'packages/emmett/src';

describe('MongoDBEventStoreConsumer', () => {
describe('filterMessagesByType', () => {
it('should filter for the correct messages types', () => {
const messages = [
{ type: 'ProductItemAdded', data: {} },
{ type: 'DiscountApplied', data: {} },
{ type: 'ProductItemAdded', data: {} },
{ type: 'DiscountApplied', data: {} },
];
const types = ['ProductItemAdded'];
const result = filterMessagesByType(messages, types);
assertEqual(2, result.length);
assertThatArray(result).allMatch((m) => m.type === 'ProductItemAdded');
});
});
});
1 change: 1 addition & 0 deletions src/packages/emmett-mongodb/src/eventStore/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './mongoDBEventStore';
export * from './projections';
export * from './storage';
export * from './consumers';
19 changes: 11 additions & 8 deletions src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ export type MongoDBEventStoreConnectionOptions =
| MongoDBEventStoreClientOptions
| MongoDBEventStoreConnectionStringOptions;

export type MongoDBEventStoreHookHandlerContext = {
eventStore: MongoDBEventStore;
};

export type MongoDBEventStoreOptions = {
projections?: ProjectionRegistration<
'inline',
Expand All @@ -181,7 +185,10 @@ export type MongoDBEventStoreOptions = {
>[];
storage?: MongoDBEventStoreStorageOptions;
} & MongoDBEventStoreConnectionOptions &
DefaultEventStoreOptions<MongoDBEventStore>;
DefaultEventStoreOptions<
MongoDBEventStore,
MongoDBEventStoreHookHandlerContext
>;

export type MongoDBEventStore = EventStore<MongoDBReadEventMetadata> & {
projections: ProjectionQueries<StreamType>;
Expand Down Expand Up @@ -393,13 +400,9 @@ class MongoDBEventStoreImplementation implements MongoDBEventStore, Closeable {
);
}

await tryPublishMessagesAfterCommit<MongoDBEventStore>(
eventsToAppend,
this.options.hooks,
// {
// TODO: same context as InlineProjectionHandlerContext for mongodb?
// },
);
await tryPublishMessagesAfterCommit(eventsToAppend, this.options.hooks, {
eventStore: this,
});

return {
nextExpectedStreamVersion:
Expand Down