-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
164 lines (132 loc) · 5.08 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# -*- coding: utf-8 -*-
"""Commute Headlights: listens for Pub/Sub events and triggers LED strip."""
import datetime
import neopixel
import time
from google.cloud import pubsub
# Configure NeoPixel
# These values are handed to neopixel.Adafruit_NeoPixel by strip_init().
LED_COUNT = 59  # Number of LED pixels.
LED_PIN = 12  # GPIO pin connected to the pixels (must support PWM!).
LED_FREQ_HZ = 800000  # LED signal frequency in hertz (usually 800khz)
LED_DMA = 5  # DMA channel to use for generating signal (try 5)
# Also used by strip_blink() and strip_fade() as the "fully on" level that
# the strip is restored to after an animation.
LED_BRIGHTNESS = 255  # Set to 0 for darkest and 255 for brightest
LED_INVERT = False  # True to invert the signal (when using NPN transistor level shift)
# Configure Pub/Sub
# Project and subscription names passed to pubsub_listen() at startup.
PUBSUB_PROJECT = "commute-pebble"
PUBSUB_SUBSCRIPTION = "headlights"
# Commute defines
# String codes that handle_event() compares against the 'orig' and 'dest'
# attributes of a "directions" event.
REQUEST_TYPE_LOCATION = "0"
REQUEST_TYPE_HOME = "1"
REQUEST_TYPE_WORK = "2"
# NeoPixel LED strip functions
def strip_init():
    """Create the NeoPixel strip object, start it and hand it back.

    The strip is built from the module-level LED_* settings and
    ``begin()`` is called on it before it is returned.
    """
    led_strip = neopixel.Adafruit_NeoPixel(
        LED_COUNT,
        LED_PIN,
        LED_FREQ_HZ,
        LED_DMA,
        LED_INVERT,
        LED_BRIGHTNESS
    )
    led_strip.begin()
    return led_strip
def strip_fill(strip, color):
    """Paint every LED with ``color`` and push the result to the strip.

    All pixels are set first; ``show()`` is called once afterwards so the
    whole strip changes at the same moment.
    """
    pixel_total = strip.numPixels()
    led = 0
    while led < pixel_total:
        strip.setPixelColor(led, color)
        led += 1
    strip.show()
def strip_blink(strip, color, wait_ms=100, iterations=3):
    """Flash the whole strip in ``color`` and finish with a fade-out.

    The strip is filled once, then its brightness is toggled between
    LED_BRIGHTNESS and 0 for ``iterations * 2 - 1`` steps (so it both
    starts and ends lit), pausing ``wait_ms`` milliseconds after each
    step. ``strip_fade`` then dims the strip and resets it.
    """
    strip_fill(strip, color)
    pause_s = wait_ms / 1000.0
    for step in range(iterations * 2 - 1):
        # Even steps are the "on" phase, odd steps the "off" phase.
        level = LED_BRIGHTNESS if step % 2 == 0 else 0
        strip.setBrightness(level)
        strip.show()
        time.sleep(pause_s)
    strip_fade(strip)
def strip_spread(strip, color, wait_ms=10):
    """Symmetrically spread a color from the middle of the strip to all LEDs.

    Two cursors start at the centre and move outwards one pixel per step;
    after each step the strip is shown and the function sleeps for
    ``wait_ms`` milliseconds. Once both ends are reached, ``strip_fade``
    dims the strip and resets it.

    Args:
        strip: LED strip object providing ``numPixels``, ``setPixelColor``
            and ``show``.
        color: Color value passed through to ``setPixelColor``.
        wait_ms: Delay between animation steps, in milliseconds.
    """
    num_pixels = strip.numPixels()
    # Floor division keeps the pixel indices integral on Python 3 as well
    # (plain "/" would yield a float there). Using separate start points
    # for the two cursors means an even pixel count starts on the two
    # centre LEDs, so both cursors reach their end of the strip together
    # and the last LED is no longer left unlit.
    left = (num_pixels - 1) // 2
    right = num_pixels // 2
    delay_s = wait_ms / 1000.0
    while left >= 0 and right < num_pixels:
        strip.setPixelColor(left, color)
        strip.setPixelColor(right, color)
        left -= 1
        right += 1
        strip.show()
        time.sleep(delay_s)
    strip_fade(strip)
def strip_fade(strip, wait_ms=10):
    """Dim the strip down in steps, then blank it and restore brightness.

    Brightness is lowered from LED_BRIGHTNESS in decrements of 5 (stopping
    before 0), with a ``wait_ms`` millisecond pause after each step.
    Afterwards every LED is set to black and the brightness is put back to
    LED_BRIGHTNESS so the next animation starts from a clean state.
    """
    pause_s = wait_ms / 1000.0
    level = LED_BRIGHTNESS
    while level > 0:
        strip.setBrightness(level)
        strip.show()
        time.sleep(pause_s)
        level -= 5
    black = neopixel.Color(0, 0, 0)
    strip_fill(strip, black)
    strip.setBrightness(LED_BRIGHTNESS)
    strip.show()
# Listen for and handle Commute events
# RGB components for each recognised Commute event type.
_EVENT_COLORS = {
    "location_work": (255, 100, 0),  # Orange
    "location_home": (0, 100, 0),  # Green
    "home_work": (255, 200, 0),  # Light orange / yellow
    "work_home": (100, 255, 0),  # Light green
    "calendar": (0, 150, 255),  # Blue
    "settings": (255, 0, 50),  # Pink
    "error": (255, 0, 0),  # Red
}


def fire_event(event_type):
    """Show a Commute event on the LED strip.

    Unrecognised event types are ignored. Nothing is displayed at night:
    before 08:00 on weekdays, before 10:00 on weekends, and from 22:00
    onwards on any day (local time). An "error" event blinks the strip;
    every other event spreads its color out from the middle.
    """
    rgb = _EVENT_COLORS.get(event_type)
    if rgb is None:
        return
    color = neopixel.Color(*rgb)

    # Quiet hours: the morning cut-off is later on Saturday and Sunday.
    now = datetime.datetime.now()
    morning_hour = 8 if now.isoweekday() < 6 else 10
    if now.hour < morning_hour or now.hour >= 22:
        return

    strip = strip_init()
    animate = strip_blink if event_type == "error" else strip_spread
    animate(strip, color)
def handle_event(e):
    """Handle an incoming Commute event.

    ``e`` is a mapping of event attributes. A "directions" action is
    translated into a route-specific event type based on its 'orig' and
    'dest' attributes; any other action is used as the event type as-is.
    The resulting event type is passed to ``fire_event``.

    Events with no 'action', or "directions" events whose origin and
    destination are missing or not a known combination, are ignored
    rather than raising ``KeyError``. This function runs inside the
    Pub/Sub callback before the message is acknowledged, so an exception
    here would leave a malformed message un-acked.

    Args:
        e: Mapping with an 'action' key and, for "directions" events,
            'orig' and 'dest' keys holding REQUEST_TYPE_* codes.
    """
    action = e.get('action')
    if action is None:
        return

    # Identify event
    if action == "directions":
        routes = {
            (REQUEST_TYPE_LOCATION, REQUEST_TYPE_WORK): "location_work",
            (REQUEST_TYPE_LOCATION, REQUEST_TYPE_HOME): "location_home",
            (REQUEST_TYPE_HOME, REQUEST_TYPE_WORK): "home_work",
            (REQUEST_TYPE_WORK, REQUEST_TYPE_HOME): "work_home",
        }
        event_type = routes.get((e.get('orig'), e.get('dest')))
        if event_type is None:
            return
    else:
        event_type = action

    # Fire event on LED strip
    fire_event(event_type)
def pubsub_listen(project, subscription_name):
    """Subscribe to Commute events on Pub/Sub and keep the process alive.

    Each received message has its attributes passed to ``handle_event``
    and is then acknowledged. This function never returns.
    """
    client = pubsub.SubscriberClient()
    path = client.subscription_path(project, subscription_name)

    def on_message(msg):
        handle_event(msg.attributes)
        msg.ack()

    # Allow at most one outstanding message at a time.
    limits = pubsub.types.FlowControl(max_messages=1)
    client.subscribe(path, callback=on_message, flow_control=limits)

    # Subscribing does not block, so park the main thread here while
    # messages are processed in the background.
    print('Listening for events on {}'.format(path))
    while True:
        time.sleep(60)
# Enter the Pub/Sub listen loop using the configured project/subscription.
pubsub_listen(project=PUBSUB_PROJECT, subscription_name=PUBSUB_SUBSCRIPTION)