-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdownsample
executable file
·176 lines (138 loc) · 6.05 KB
/
downsample
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#!/usr/bin/env python
import os
import sys
import json
import time
import redis
import Queue
import getopt
import threading
from monotonic import time as mttime
from rpjios.Util import parse_redis_url, hostname
from rpjios.Types import Constants
def inject_custom_opts(r_opts, connect_timeout=10.0):
    """Add this script's connection settings to a dict of Redis client options.

    The dict is modified in place and also returned, so the call can be used
    directly as ``redis.StrictRedis(**inject_custom_opts(opts))``.

    Args:
        r_opts: dict of keyword arguments destined for the Redis client.
        connect_timeout: value stored under ``socket_connect_timeout``.
            Defaults to 10.0, the value this script has always used.

    Returns:
        The same ``r_opts`` dict, with ``socket_connect_timeout`` set
        (overwriting any existing entry for that key).
    """
    r_opts['socket_connect_timeout'] = connect_timeout
    return r_opts
def parseOptOpts(optstr):
    """Parse a ``name[:key=value,key=value...]`` command-line option string.

    Args:
        optstr: the raw option string, or None when the option was not given.

    Returns:
        A ``(name, opts)`` tuple. ``name`` is the text before the first ``:``
        (None when ``optstr`` is None). ``opts`` is a dict built from the
        ``key=value`` pairs, or None when ``optstr`` does not consist of
        exactly two ``:``-separated parts. Any value that names an existing
        regular file is replaced by the JSON document loaded from that file.

    Raises:
        ValueError: if an option item has no ``=`` separator, or if a
            referenced file does not contain valid JSON.
    """
    if optstr is None:
        return (None, None)

    optSplit = optstr.split(":")
    optName = optSplit[0]
    opts = None

    # Options are only recognized for the exact "name:options" shape.
    if len(optSplit) == 2:
        opts = {}
        for item in optSplit[1].split(','):
            if not item:
                # Tolerate "name:" and stray/trailing commas.
                continue
            # Split on the first '=' only, so values may themselves contain '='.
            key, sep, val = item.partition('=')
            if not sep:
                raise ValueError("Bad option '{}': expected key=value".format(item))
            # Only regular files can be JSON-loaded; a directory that happens
            # to match the value is left as a plain string.
            if os.path.isfile(val):
                with open(val, 'r') as optFile:
                    val = json.load(optFile)
            opts[key] = val

    return (optName, opts)
def printOptOpts(opts):
    """Print each option's name, value and value type, one per line.

    Prints nothing at all when ``opts`` is None or empty.
    """
    if not opts:
        return
    # Single-argument print(...) emits the same text whether it is parsed as
    # a print statement or as a call to the print function.
    print(" Options:")
    for name, value in opts.items():
        line = " '{}' = {} ({})".format(name, value, type(value))
        print(line)
if __name__ == "__main__":
    # Script entry point. Subscribes to a channel pattern on the input Redis
    # server and forwards one out of every `rate` messages per channel to the
    # output Redis server, optionally flattening dict-valued messages into one
    # channel per field. Forwarding uses PUBLISH by default, SET when the
    # pass-through mode is 'key', or a capped list when it is 'list'.
    ops, args = getopt.getopt(sys.argv[1:], 'di:o:m:r:p:t:')
    o_d = {}
    for (flag, flagValue) in ops:
        o_d[flag.replace('-', '')] = flagValue

    if not ('i' in o_d and 'o' in o_d and 'p' in o_d):
        print("Usage: {} -i input_redis_url -o output_redis_url -p subscription_pattern -m mode -r rate -t passthru_mode\n".format(sys.argv[0]))
        sys.exit(0)

    i_urip = parse_redis_url(o_d['i'])
    o_urip = parse_redis_url(o_d['o'])

    if 'host' not in i_urip or 'host' not in o_urip:
        raise Exception("Bad host spec(s)!")

    i_r = redis.StrictRedis(**inject_custom_opts(i_urip))
    o_r = redis.StrictRedis(**inject_custom_opts(o_urip))

    s_r = int(o_d['r']) if 'r' in o_d else 10
    if s_r < 1:
        # A rate below 1 could never be reached by the per-channel counter,
        # so nothing would ever be forwarded.
        raise Exception("Bad rate (must be >= 1)!")

    (mode, modeOpts) = parseOptOpts(o_d['m']) if 'm' in o_d else (None, None)
    (ptMode, ptOpts) = parseOptOpts(o_d['t']) if 't' in o_d else (None, None)

    if ptMode == 'list':
        if not ptOpts:
            ptOpts = {}
        ptOpts['suffix'] = ptOpts['suffix'] if 'suffix' in ptOpts else ".list"
        ptOpts['limit'] = int(ptOpts['limit']) if 'limit' in ptOpts else 1000

    print("Input: {}".format(o_d['i']))
    print("Output: {}".format(o_d['o']))
    print("Pattern: {}".format(o_d['p']))
    print("Rate: {}".format(s_r))
    print("P-thru: {}".format(ptMode))
    printOptOpts(ptOpts)
    print("Mode: {}".format(mode))
    printOptOpts(modeOpts)

    # Flatten mode may be requested without any options; use an empty dict so
    # the option lookups below do not fail on None.
    flattenOpts = modeOpts or {}

    _sep = Constants.NAMESPACE_SEP
    s_d = {}  # per-output-channel count of messages seen since the last forward
    i_ps = i_r.pubsub()
    i_ps.psubscribe(o_d['p'])

    sendQueue = Queue.Queue()

    def listSendFunc(ch, msg):
        # Send function for 'list' pass-through mode: prepend [ts, value] to
        # the list named <channel><sep><suffix> and cap the list at 'limit'
        # entries. It is invoked by sendThread() like the other send functions.
        omsg = json.loads(msg)
        lname = _sep.join([ch, ptOpts['suffix']])
        rpipe = o_r.pipeline(True)
        # Commands queued on a pipeline do not return their results until
        # execute(), so the list length cannot be inspected here. LTRIM is
        # queued unconditionally; it leaves a list shorter than the limit
        # unchanged. The entry is JSON-encoded rather than passed as a raw
        # Python list, which newer redis-py releases refuse to send.
        rpipe.lpush(lname, json.dumps([omsg['ts'], omsg['value']]))
        rpipe.ltrim(lname, 0, ptOpts['limit'] - 1)
        try:
            rpipe.execute()
        except Exception as pipeErr:
            print("listSendFunc() pipe execution failure: {}".format(pipeErr))

    # The send function depends only on the pass-through mode, so it is
    # chosen once here rather than on every forwarded message.
    if ptMode == 'key':
        sendFunc = o_r.set
    elif ptMode == 'list':
        sendFunc = listSendFunc
    else:
        sendFunc = o_r.publish

    def sendThread():
        # Worker loop: stamp each queued message with downsampler metadata
        # under '__ds' (keeping any earlier stamp as 'prev') and send it.
        while 1:
            nextMsg = sendQueue.get()
            try:
                prevFwd = nextMsg['data'].get('__ds')
                # NOTE(review): mttime is whatever `from monotonic import time`
                # binds; confirm this is the clock intended for "ts".
                nextMsg['data']['__ds'] = {"ts": mttime.time(), "host": hostname(), "prev": prevFwd, "rate": s_r}
                nextMsg['send_func'](nextMsg['channel'], json.dumps(nextMsg['data']))
            except Exception as sendErr:
                # Keep the worker alive: if it died, the queue would keep
                # growing and nothing would be forwarded again.
                print("Error: {}".format(sendErr))

    sendThreadInst = threading.Thread(target=sendThread)
    sendThreadInst.daemon = True
    sendThreadInst.start()

    for m in i_ps.listen():
        try:
            if not ('channel' in m and 'data' in m):
                continue

            if 'd' in o_d:
                print(">>> MSG: {}".format(m))

            # Skip (p)subscribe/(p)unsubscribe confirmations.
            if 'subscribe' in m['type']:
                continue

            _d_d = json.loads(m['data'])

            if _d_d['type'] == 'AVAILABLE':
                continue

            _c = m['channel']
            chanElements = _c.split(_sep)
            sourceName = chanElements[-1]
            forwardMsgs = {_c: _d_d}

            if mode == "flatten" and isinstance(_d_d["value"], dict):
                # Fan the dict-valued message out to one channel per field.
                forwardMsgs = {}
                sufMap = flattenOpts.get("mapping", {}).get(sourceName, {})

                for (key, val) in _d_d["value"].items():
                    chanSuffix = sufMap.get(key, key)

                    # A mapping entry of literal false suppresses that field.
                    if chanSuffix is False:
                        continue

                    newChan = _sep.join(list(chanElements) + [chanSuffix])
                    newMsg = dict(_d_d)
                    newMsg["source"] = newChan
                    newMsg["value"] = val
                    forwardMsgs[newChan] = newMsg

                if "include-raw" in flattenOpts:
                    # Also forward the unflattened message on <channel><sep>.raw
                    newChan = _sep.join(list(chanElements) + [".raw"])
                    _d_d["source"] = newChan
                    forwardMsgs[newChan] = dict(_d_d)

            for (key, val) in forwardMsgs.items():
                s_d[key] = s_d.get(key, 0) + 1

                if s_d[key] < s_r:
                    continue

                # Every s_r-th message on a channel is forwarded; the ones in
                # between are dropped.
                sendQueue.put({
                    'send_func': sendFunc,
                    'channel': key,
                    'data': val
                })

                if 'd' in o_d:
                    print("After 'put', queue has ~{} elements".format(sendQueue.qsize()))

                s_d[key] = 0
        except Exception as e:
            # Best effort: report the failure and keep listening.
            print("Error: {}".format(e))