-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathavro-producer.py
81 lines (73 loc) · 1.88 KB
/
avro-producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#!/usr/bin/python
# docker run -it --network host hikagenji/confluent-kafka-avro-python:latest python
import random
import json
import threading
import time
import sys
from datetime import datetime
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
# Avro schema for the message KEY: a single string field "id" holding the
# order identifier (e.g. "OD-1").  Kept as a raw JSON string because the
# schema-registry serializer works from the parsed Schema object below.
key_schema_str = """
{
"namespace": "indicator",
"name": "key",
"type": "record",
"fields" : [
{
"name" : "id",
"type" : "string",
"default" : ""
}
]
}
"""
key_schema = avro.loads(key_schema_str)  # parsed Schema, passed to AvroProducer as default_key_schema
# Avro schema for the message VALUE: event timestamp (epoch millis via the
# timestamp-millis logical type), an int "side" flag, and the order "id".
value_schema_str = """
{
"namespace": "indicator",
"name": "value",
"type" : "record",
"fields" : [{
"name" : "timestamp",
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis"
},
"default" : -1
},
{
"name" : "side",
"type" : "int",
"default" : 1
},
{
"name" : "id",
"type" : "string",
"default" : ""
}
]
}
"""
value_schema = avro.loads(value_schema_str)  # parsed Schema, passed to AvroProducer as default_value_schema
# Producer wired to the local broker and Confluent schema registry.
# Buffering is tuned for low latency: a large in-memory queue that is
# drained after at most 1 ms.
_producer_config = {
    'bootstrap.servers': 'localhost:29092',         # local Kafka broker
    'queue.buffering.max.messages': 1000000,        # large client-side queue
    'queue.buffering.max.ms': 1,                    # flush batches after 1 ms
    'log.connection.close': False,                  # silence idle-disconnect logs
    'schema.registry.url': 'http://localhost:8081'  # Avro schema registry
}
p = AvroProducer(
    _producer_config,
    default_key_schema=key_schema,
    default_value_schema=value_schema,
)
def createOrders(n):
    """Return the list of n order ids "OD-1" .. "OD-n".

    Fixes an off-by-one: the original used range(1, n), which produced
    only n-1 ids (createOrders(2) yielded just ["OD-1"]).  n <= 0 yields [].
    """
    return ["OD-" + str(i) for i in range(1, n + 1)]
def delivery_callback(err, msg):
    """Report a message's delivery outcome on stderr.

    err -- delivery error, or None/falsy when the message was delivered
    msg -- the produced message; topic/partition/offset are read only on success
    """
    if err:
        # Failure path: msg metadata may be incomplete, report the error only.
        sys.stderr.write('%% Message failed delivery: %s\n' % err)
        return
    sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' %
                     (msg.topic(), msg.partition(), msg.offset()))
# Main publish loop: once per second, produce one record per order id to the
# "parameter" topic, then flush so every batch is fully delivered (and
# delivery_callback fired) before the next tick.
#
# Rewritten from a list comprehension used purely for its side effects (the
# result list `r` was never read) to an explicit for-loop.
while True:
    time.sleep(1)
    for order_id in createOrders(2):
        # Key/value dicts must match the registered Avro key/value schemas.
        p.produce(topic="parameter",
                  key={"id": order_id},
                  value={"timestamp": int(round(time.time() * 1000)),
                         "id": order_id,
                         "side": 1},
                  callback=delivery_callback)
    p.flush()