This repository was archived by the owner on May 23, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathindex.js
127 lines (102 loc) · 3 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
const QuickLRU = require('quick-lru');
const KnackProducers = require('@optum/knack-producer');
const SchemaRegistry = require('@optum/knack-sr');
const {toAvroBuffer} = require('@optum/knack-avro');
// Module-wide LRU cache (at most 1000 entries) shared by two kinds of values:
// producer clients stored under `p-<broker list>` keys and schema-registry
// lookups stored under `s-<topic>-<type>` keys.
const cache = new QuickLRU({maxSize: 1000});
/**
 * Options used whenever a caller does not supply its own.
 *
 * - `srOptions` is handed to the `SchemaRegistry` constructor.
 * - `producerConfig['metadata.broker.list']` is used to build the cache key
 *   under which a producer client is stored.
 *
 * The whole object is also forwarded to the underlying knack producer.
 */
const defaultOptions = {
  srOptions: {
    scheme: 'http',
    domain: 'localhost:8081',
  },
  producerConfig: {
    'metadata.broker.list': ['localhost:9092'],
  },
};
/**
 * Look up (or lazily create) the producer client for the given options.
 *
 * Clients are keyed solely by the stringified `metadata.broker.list`, so two
 * option objects naming the same brokers share one client instance.
 *
 * @param {object} [options=defaultOptions] - client options; must contain
 *   `producerConfig`.
 * @returns {KnackProducerClient} the cached or newly created client.
 */
const resolveProducer = (options = defaultOptions) => {
  const brokerList = options.producerConfig['metadata.broker.list'];
  const key = `p-${brokerList}`;

  const existing = cache.get(key);
  if (existing) {
    return existing;
  }

  const created = new KnackProducerClient(options);
  cache.set(key, created);
  return created;
};
/**
 * Kafka producer client that encodes message keys and values with
 * `toAvroBuffer`, using schemas fetched from a schema registry.
 *
 * Instances obtained through the static helpers are shared via the
 * module-level cache; `knackConnected` records whether `connect()` has
 * completed on a given instance and is cleared again by `disconnect()`.
 */
class KnackProducerClient {
  /**
   * @param {object} [options=defaultOptions] - forwarded unchanged to the
   *   underlying knack producer; `options.srOptions` is passed to the schema
   *   registry client and `options.useHighLevelProducer` selects
   *   `KnackHighLevelProducer` instead of `KnackProducer`.
   */
  constructor(options = defaultOptions) {
    this._options = options;
    const {srOptions, useHighLevelProducer} = options;
    if (useHighLevelProducer) {
      const {KnackHighLevelProducer} = KnackProducers;
      this._producer = new KnackHighLevelProducer(options);
    } else {
      const {KnackProducer} = KnackProducers;
      this._producer = new KnackProducer(options);
    }

    this._sr = new SchemaRegistry(srOptions);
    this.knackConnected = false;
  }

  /**
   * Resolve the shared client for `options` and connect it.
   *
   * The connected flag is recorded on the client instance (by `connect()`),
   * not on the class: inside a static method `this` is the constructor, so
   * writing the flag there would leave the instance looking disconnected.
   *
   * @param {object} [options=defaultOptions]
   * @returns {Promise<*>} whatever the underlying producer's `connect()` yields.
   */
  static async connectInstance(options = defaultOptions) {
    const producer = resolveProducer(options);
    return producer.connect();
  }

  /**
   * Disconnect every cached entry that exposes a `disconnect` method.
   *
   * The cache also stores schema lookups, so entries that are nullish or
   * have no callable `disconnect` are skipped.
   *
   * @returns {Promise<Array>} results of all `disconnect()` calls.
   */
  static disconnectInstances() {
    const tasks = [];
    for (const entry of cache.values()) {
      if (entry && typeof entry.disconnect === 'function') {
        tasks.push(entry.disconnect());
      }
    }

    return Promise.all(tasks);
  }

  /**
   * Resolve the shared client for `options`, connecting it first when it is
   * not currently marked as connected.
   *
   * @param {object} [options=defaultOptions]
   * @returns {Promise<KnackProducerClient>} the shared client.
   */
  static async resolveInstance(options = defaultOptions) {
    const producer = resolveProducer(options);
    if (!producer.knackConnected) {
      await producer.connect();
    }

    return producer;
  }

  /**
   * Resolve the shared client for `options` without connecting it.
   *
   * @param {object} [options=defaultOptions]
   * @returns {KnackProducerClient}
   */
  static instance(options = defaultOptions) {
    return resolveProducer(options);
  }

  /** @returns {object} the options this client was constructed with. */
  get options() {
    return this._options;
  }

  /** @returns {object} the underlying knack producer. */
  get producer() {
    return this._producer;
  }

  /** @returns {object} the schema registry client. */
  get schemaRegistry() {
    return this._sr;
  }

  /**
   * Encode `val` with the schema registered under subject `<topic>-<type>`.
   *
   * Schema lookups are kept in the module-level cache under `s-<subject>`,
   * so the registry is only queried when no truthy entry is cached.
   *
   * @param {object} params
   * @param {string} params.topic - topic name.
   * @param {string} params.type - subject suffix (`publish` uses 'key' and 'value').
   * @param {*} params.val - value to encode.
   * @returns {Promise<*>} result of `toAvroBuffer(val, schema.schema, schema.id)`.
   */
  async encode({topic, type, val}) {
    const subject = `${topic}-${type}`;
    const cacheKey = `s-${subject}`;
    let schema = cache.get(cacheKey);
    if (!schema) {
      // NOTE: add option to disable throwing an error when 404 returns
      // from sr ... this way it will support publishing json, strings, etc.
      schema = await this.schemaRegistry.getSchemaBySubject(subject);
      cache.set(cacheKey, schema);
    }

    return toAvroBuffer(val, schema.schema, schema.id);
  }

  /**
   * Connect the underlying producer and mark this client as connected.
   *
   * @returns {Promise<*>} whatever the underlying producer's `connect()` yields.
   */
  async connect() {
    const connection = await this.producer.connect();
    this.knackConnected = true;
    return connection;
  }

  /**
   * Pass-through to the underlying producer's `produce`.
   *
   * @param {...*} args - forwarded unchanged.
   * @returns {Promise<*>}
   */
  async produce(...args) {
    return this.producer.produce(...args);
  }

  /**
   * Encode `key` and `value` for `topic` and produce the message with the
   * current time as its timestamp; the second `produce` argument is `null`.
   *
   * @param {object} params
   * @param {string} params.topic
   * @param {*} params.key
   * @param {*} params.value
   * @returns {Promise<*>} result of the underlying `produce` call.
   */
  async publish({topic, key, value}) {
    const encodedKey = await this.encode({topic, type: 'key', val: key});
    const encodedValue = await this.encode({topic, type: 'value', val: value});
    return this.producer.produce(topic, null, encodedValue, encodedKey, Date.now());
  }

  /**
   * Disconnect the underlying producer and clear the connected flag so a
   * later `resolveInstance` call connects again instead of handing back a
   * disconnected client.
   *
   * @returns {Promise<*>} whatever the underlying producer's `disconnect()` yields.
   */
  async disconnect() {
    const disconnection = await this.producer.disconnect();
    this.knackConnected = false;
    return disconnection;
  }
}
module.exports = KnackProducerClient;