forked from project-chip/connectedhomeip
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun_darwin_framework_ota_test.py
executable file
·188 lines (153 loc) · 6.71 KB
/
run_darwin_framework_ota_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#! /usr/bin/env -S python3 -B
import io
import json
import logging
import time
from subprocess import PIPE
import click
from chiptest.accessories import AppsRegister
from chiptest.runner import Runner
from chiptest.test_definition import App, ExecutionCapture
from chipyaml.paths_finder import PathsFinder
TEST_NODE_ID = '0x12344321'
TEST_VID = '0xFFF1'
TEST_PID = '0x8001'
class DarwinToolRunner:
    """Run one darwin-framework-tool command as a background subprocess.

    The subprocess is launched through ``runner.RunSubprocess`` with a piped
    stdin, and its captured output is polled for expected messages through
    ``outpipe.CapturedLogContains``.
    """

    def __init__(self, runner, command):
        """Remember the launcher and command line; nothing is started yet.

        Args:
            runner: Object providing ``RunSubprocess`` (used by ``start``).
            command: Argument list handed to ``runner.RunSubprocess``.
        """
        self.process = None
        self.outpipe = None
        self.runner = runner
        # Index passed to (and updated from) CapturedLogContains so that each
        # wait continues from where the previous one left off.
        self.lastLogIndex = 0
        self.command = command
        self.stdin = None

    def start(self):
        """Launch the command without waiting and wrap its stdin for text."""
        # The third returned value (the error pipe) is not used by this class.
        self.process, self.outpipe, _ = self.runner.RunSubprocess(
            self.command, name='DARWIN-TOOL', wait=False, stdin=PIPE)
        # line_buffering=True flushes every written line to the subprocess.
        self.stdin = io.TextIOWrapper(self.process.stdin, line_buffering=True)

    def stop(self):
        """Kill the subprocess if one was started; otherwise do nothing."""
        if self.process:
            self.process.kill()

    def waitForMessage(self, message, timeout=10):
        """Poll the captured output until ``message`` shows up.

        Args:
            message: Text handed to ``outpipe.CapturedLogContains``.
            timeout: Seconds to keep polling before giving up. Defaults to
                10, the value that used to be hard-coded.

        Raises:
            Exception: If the process exits without the message having been
                captured, or if ``timeout`` seconds elapse first.
        """
        logging.debug('Waiting for %s', message)

        start_time = time.monotonic()
        while True:
            ready, self.lastLogIndex = self.outpipe.CapturedLogContains(
                message, self.lastLogIndex)
            if ready:
                break

            if self.process.poll() is not None:
                # A short-lived command can print the message and exit between
                # the check above and this poll, so look one more time before
                # declaring failure.
                ready, self.lastLogIndex = self.outpipe.CapturedLogContains(
                    message, self.lastLogIndex)
                if ready:
                    break
                died_str = ('Process died while waiting for %s, returncode %d' %
                            (message, self.process.returncode))
                logging.error(died_str)
                raise Exception(died_str)

            if time.monotonic() - start_time > timeout:
                raise Exception('Timeout while waiting for %s' % (message,))

            time.sleep(0.1)

        logging.debug('Success waiting for: %s', message)
class InteractiveDarwinTool(DarwinToolRunner):
    """DarwinToolRunner started as ``<binary> interactive start``.

    A fixed prompt string is passed via ``--additional-prompt``; this class
    waits for that string in the captured output after every command.
    NOTE(review): presumably the tool prints the prompt whenever it is ready
    for the next command -- confirm against darwin-framework-tool.
    """

    def __init__(self, runner, binary_path):
        """Build the interactive command line for ``binary_path``.

        Args:
            runner: Forwarded unchanged to ``DarwinToolRunner.__init__``.
            binary_path: Path of the tool binary to run.
        """
        self.prompt = "WAITING FOR COMMANDS NOW"
        interactive_command = [binary_path, "interactive", "start"]
        interactive_command += ["--additional-prompt", self.prompt]
        super().__init__(runner, interactive_command)

    def waitForPrompt(self):
        """Block until the prompt string appears in the captured output."""
        expected_prompt = self.prompt
        self.waitForMessage(expected_prompt)

    def sendCommand(self, command):
        """Write ``command`` as one line to the tool's stdin, then wait for the prompt."""
        log_line = 'Sending command %s' % command
        logging.debug(log_line)
        print(command, file=self.stdin)
        self.waitForPrompt()
# Root click command group of this script; sub-commands attach themselves with
# @main.command(...). The group is created with chain=True.
@click.group(chain=True)
@click.pass_context
def main(context):
    # Deliberately a no-op: the group only exists to host sub-commands.
    # A '#' comment is used instead of a docstring because click would show a
    # docstring as the group's help text.
    return None
def _write_ota_candidate_file(ota_candidate_file, ota_image_file):
    """Write the OTA candidate JSON whose ``otaURL`` is ``ota_image_file``."""
    json_data = {
        "deviceSoftwareVersionModel": [{
            "vendorId": int(TEST_VID, 16),
            "productId": int(TEST_PID, 16),
            "softwareVersion": 2,
            "softwareVersionString": "2.0",
            "cDVersionNumber": 18,
            "softwareVersionValid": True,
            "minApplicableSoftwareVersion": 0,
            "maxApplicableSoftwareVersion": 100,
            "otaURL": ota_image_file
        }]
    }
    with open(ota_candidate_file, "w") as f:
        json.dump(json_data, f)


def _find_commissioner_node_id(darwin_tool):
    """Return the ``0x...`` commissioner node id found in ``darwin_tool``'s output."""
    darwin_tool.waitForMessage(": Commissioner Node Id")
    nodeIdLine = darwin_tool.outpipe.FindLastMatchingLine('.*: Commissioner Node Id (0x[0-9A-F]+)')
    if not nodeIdLine:
        raise Exception("Unable to find commissioner node id")
    return nodeIdLine.group(1)


def _announce_ota_provider(darwin_tool, ota_candidate_file, commissioner_node_id):
    """Send the provider setup commands and the announce-otaprovider command."""
    darwin_tool.sendCommand("otasoftwareupdateapp candidate-file-path %s" % ota_candidate_file)
    darwin_tool.sendCommand("otasoftwareupdateapp set-reply-params --status 0")
    # The positional arguments are passed exactly as before; their individual
    # meanings are defined by darwin-framework-tool, not by this script.
    darwin_tool.sendCommand("otasoftwareupdaterequestor announce-otaprovider %s 0 0 0 %s 0" %
                            (commissioner_node_id, TEST_NODE_ID))


@main.command('run', help='Execute the test')
@click.option(
    '--darwin-framework-tool',
    help="what darwin-framework-tool to use")
@click.option(
    '--ota-requestor-app',
    help='what ota requestor app to use')
@click.option(
    '--ota-data-file',
    required=True,
    help='The file to use to store our OTA data.  This file does not need to exist.')
@click.option(
    '--ota-image-file',
    required=True,
    help='The file to use to store the OTA image we plan to send.  This file does not need to exist.')
@click.option(
    '--ota-destination-file',
    required=True,
    help='The destination file to use for the requestor\'s download.  This file does not need to exist.')
@click.option(
    '--ota-candidate-file',
    required=True,
    help='The file to use for our OTA candidate JSON.  This file does not need to exist.')
@click.pass_context
def cmd_run(context, darwin_framework_tool, ota_requestor_app, ota_data_file, ota_image_file, ota_destination_file, ota_candidate_file):
    """Run the OTA test end to end.

    Steps, in order: create the OTA image and candidate JSON, start the
    requestor app, pair it with darwin-framework-tool, read the commissioner
    node id, start an interactive darwin-framework-tool session and announce
    it as OTA provider, wait for the requestor to report the download, and
    finally compare the downloaded file with the original data file.

    Args:
        context: click context (unused here).
        darwin_framework_tool: Tool path; looked up via PathsFinder when None.
        ota_requestor_app: Requestor app path; looked up via PathsFinder when None.
        ota_data_file: Path passed to createOtaImage and later compared with
            the download.
        ota_image_file: Path passed to createOtaImage and used as ``otaURL``.
        ota_destination_file: Download path handed to the requestor app.
        ota_candidate_file: Path the candidate JSON is written to.

    Raises:
        Exception: Any failure is logged together with the captured output
            and re-raised after cleanup.
    """
    paths_finder = PathsFinder()

    if darwin_framework_tool is None:
        darwin_framework_tool = paths_finder.get('darwin-framework-tool')
    if ota_requestor_app is None:
        ota_requestor_app = paths_finder.get('chip-ota-requestor-app')

    runner = Runner()
    runner.capture_delegate = ExecutionCapture()

    apps_register = AppsRegister()
    apps_register.init()

    # Tracks whichever tool instance is currently alive so that the cleanup
    # below can stop it.
    darwin_tool = None

    try:
        apps_register.createOtaImage(ota_image_file, ota_data_file, "This is some test OTA data", vid=TEST_VID, pid=TEST_PID)
        _write_ota_candidate_file(ota_candidate_file, ota_image_file)

        requestor_app = App(runner, [ota_requestor_app, '--otaDownloadPath', ota_destination_file])
        apps_register.add('default', requestor_app)
        requestor_app.start()

        pairing_cmd = [darwin_framework_tool, 'pairing', 'code', TEST_NODE_ID, requestor_app.setupCode]
        runner.RunSubprocess(pairing_cmd, name='PAIR', dependencies=[apps_register])

        # pairing get-commissioner-node-id does not seem to work right in interactive mode for some reason
        darwin_tool = DarwinToolRunner(runner, [darwin_framework_tool, 'pairing', 'get-commissioner-node-id'])
        darwin_tool.start()
        commissionerNodeId = _find_commissioner_node_id(darwin_tool)
        darwin_tool.stop()

        darwin_tool = InteractiveDarwinTool(runner, darwin_framework_tool)
        darwin_tool.start()
        darwin_tool.waitForPrompt()
        _announce_ota_provider(darwin_tool, ota_candidate_file, commissionerNodeId)

        # Now wait for the OTA download to finish.
        requestor_app.waitForMessage("OTA image downloaded to %s" % ota_destination_file)

        # Make sure the right thing was downloaded.
        apps_register.compareFiles(ota_data_file, ota_destination_file)

    except Exception:
        logging.error("!!!!!!!!!!!!!!!!!!!! ERROR !!!!!!!!!!!!!!!!!!!!!!")
        runner.capture_delegate.LogContents()
        raise
    finally:
        # Nested try/finally: the registered apps must be torn down even if
        # stopping the tool itself raises.
        try:
            if darwin_tool is not None:
                darwin_tool.stop()
        finally:
            apps_register.killAll()
            apps_register.factoryResetAll()
            apps_register.removeAll()
            apps_register.uninit()
# Script entry point: run the click group only when executed directly.
# auto_envvar_prefix='CHIP' is forwarded to click; per click's documentation
# this allows options to be supplied through CHIP_-prefixed environment
# variables.
if __name__ == '__main__':
    main(
        auto_envvar_prefix='CHIP',
    )