-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy patholed-display
executable file
·187 lines (152 loc) · 5.79 KB
/
oled-display
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
#!/usr/bin/env python
import sys
import time
import json
import threading
from Queue import Queue
from collections import deque
from monotonic import time as mt
from datetime import timedelta
from rpjios import Util
from rpjios.SubscriberBase import PSubscriber
import Adafruit_GPIO.SPI as SPI
import Adafruit_SSD1306
from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont
DEFAULT_GPIO_RESET_PIN = 24
class OLEDScreen(object):
def __init__(self, reset_pin=DEFAULT_GPIO_RESET_PIN):
self._disp = Adafruit_SSD1306.SSD1306_128_32(rst=reset_pin)
self._disp.begin()
self.clear()
self._img = Image.new('1', (self._disp.width, self._disp.height))
self._draw = ImageDraw.Draw(self._img)
self._font = ImageFont.load_default()
self.__ic = 0
@property
def width(self):
return self._disp.width
@property
def height(self):
return self._disp.height
@property
def font(self):
return self._font
def clear(self):
self._disp.clear()
self._disp.display()
def draw(self):
return self._draw
def render(self):
self.__ic += 1
if not (self.__ic % 10):
self._img.save("/tmp/oled-display.bmp")
self.__ic = 0
self._disp.image(self._img)
self._disp.display()
class OLEDSubscriber(object):
def __init__(self):
self._hn = Util.hostname()
self._q = Queue()
self._counts = {'hosts':{},'total':0}
self._stats = {'mps':deque(maxlen=30)}
self._run = True
self._psub = PSubscriber("*").add_listener(lambda x: self._q.put(x))
self._statthread = threading.Thread(target=self._statthreadfunc)
self._statthread.daemon = True
self._subthread = threading.Thread(target=self._subthreadfunc)
self._subthread.daemon = True
self._statthread.start()
self._subthread.start()
def __del__(self):
self._run = False
self._subthread.join()
self._statthreadfunc.join()
@property
def statistics(self):
mpsl = list(self._stats['mps'])
return {'per_second': (float(sum(mpsl)) / float(len(mpsl)) if len(mpsl) > 0 else 0),
'counts': self._counts }
@property
def local_interest(self):
return {k: 0.0 if not len(v) else (float(sum(v)) / float(len(v))) for (k, v) in self._local_interest.items()}
@local_interest.setter
def local_interest(self, li_list):
self._local_interest = {k: deque(maxlen=30) for k in li_list}
def _statthreadfunc(self):
_lc = 0
_lb = 0
while self._run:
_s = mt.time()
_cc = self._counts['total']
if _cc > 0:
self._stats['mps'].append(_cc - _lc)
_lc = _cc
time.sleep(1.0 - (mt.time() - _s))
def _subthreadfunc(self):
def _tvik(tree, key):
chk_f = (lambda l_key, l_tree: reduce(lambda b, kx: b + (kx if kx.startswith(l_key) else ""), l_tree.keys(), ""))
if type(tree) is dict:
chk_v = chk_f(key, tree)
if not bool(chk_v):
for k in tree.iterkeys():
tmp = _tvik(tree[k], key)
if tmp is not None:
return tmp
else:
return tree[chk_v]
return None
while self._run:
msg = self._q.get()
(host, mtype, ename) = msg["channel"].split(':')
self._counts['hosts'][host] = 0 if host not in self._counts['hosts'] else (self._counts['hosts'][host] + 1)
self._counts['total'] += 1
if 'data' in msg and host == self._hn:
jd = msg["data"] if type(msg["data"]) == dict else json.loads(msg["data"])
if "value" in jd:
en = None
jd = jd["value"]
if type(jd) == dict:
for (k, v) in {k: _tvik(jd, k) for k in self._local_interest.keys()}.items():
if v is not None:
self._local_interest[k].append(v)
else:
if ename in self._local_interest:
self._local_interest[ename].append(jd)
if __name__ == '__main__':
d = {}
os = OLEDScreen()
osd = os.draw()
x = 0
top = -2
lastMark = mt.time()
tchar = ':'
ols = OLEDSubscriber()
ols.local_interest = ["temp", "hum", "pres", "Soil"]
while 1:
_lst = mt.time()
osd.rectangle((0,0,os.width,os.height), outline=0, fill=0)
_s = ols.statistics
_stc = float(_s["counts"]["total"])
_dc = 0
while _stc >= 1000.0:
_stc /= 1000.0
_dc += 1
_tcs_fs = "{: >6.0f}{}" if _dc == 0 else ("{: >4.2f}{}" if int(_stc / 10.0) == 0 else "{: >5.1f}{}")
_tcs = _tcs_fs.format(_stc, {0:"",1:"K",2:"M",3:"B"}[_dc])
L1 = "[MSGS] {: >6s}, {:0.2f}/s".format(_tcs, _s["per_second"])
_li = ols.local_interest
L2 = "{:0.1f}F {:0.1f}% {:0.2f}".format(_li["temp"], _li["hum"], _li["pres"])
tm = mt.localtime()
if (_lst - lastMark) > 1.0:
tchar = ' ' if tchar is ':' else ':'
lastMark = _lst
_soil = int(((float(_li["Soil"]) / 200.0) * 1.4286) if bool(_li["Soil"]) else 0)
L3 = "<{:02d}{}{:02d}{}{:02d}> {}".format((tm.tm_hour%12), tchar, tm.tm_min, tchar, tm.tm_sec,
"[S] {}{}".format("." * (5 - _soil), "|" * _soil))
osd.text((x, top), str(L1), font=os.font, fill=255)
osd.text((x, top+12), str(L2), font=os.font, fill=255)
osd.text((x, top+23), str(L3), font=os.font, fill=255)
os.render()
time.sleep(1.0 - (mt.time() - _lst))