Skip to content

Commit c6f2281

Browse files
committed
Fixup test server creation and parsing
1 parent ed27916 commit c6f2281

File tree

2 files changed

+65
-51
lines changed

2 files changed

+65
-51
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
'''
2+
File: **mock_server.py**
3+
Copyright (c) 2024 Loupe
4+
https://loupe.team
5+
6+
This file is part of Omniverse_BnR_Bridge_Extension, licensed under the MIT License.
7+
8+
Mock OMJSON server
9+
This is a simple Websockets server.
10+
It acts as a mock of a AR instance with OMJSON, responding
11+
to varaible requests and writing back data.
12+
13+
It's intended to be used for initial testing, adding some convenience
14+
by not having to spin up a whole other simulated PLC.
15+
16+
Here are docs on the message formats OMJSON uses:
17+
https://loupeteam.github.io/LoupeDocs/libraries/omjson/jsonwebsocketserver.html
18+
'''
19+
20+
import asyncio
21+
import json
22+
23+
from websockets.server import serve
24+
25+
# Populate this list of dictionaries with the variables you want to mock
26+
mock_plc_data = [{"TestProg:counter": 0}]
27+
28+
INITIAL_VALUE_NEW_READ_VAR = 0
29+
30+
async def mock_omjson_plc(websocket):
31+
async for message in websocket:
32+
response = {
33+
"type": "readresponse",
34+
"data": []
35+
}
36+
37+
#print(f"Received message from client: {message}")
38+
message_dict = json.loads(message)
39+
40+
if message_dict['type'] == "read":
41+
for plc_var in message_dict["data"]:
42+
for plc_var_dict in mock_plc_data: # for every dict in the list
43+
if plc_var in plc_var_dict.keys(): # if the requested var is in the keys
44+
response["data"].append(plc_var_dict)
45+
# increment the value of the variable every time it's read, just so it changes
46+
plc_var_dict[plc_var] = int(plc_var_dict[plc_var]) + 1
47+
else:
48+
print('not in dict')
49+
50+
await websocket.send(json.dumps(response))
51+
52+
elif message_dict['type'] == "write":
53+
for plc_write_var in message_dict["data"].keys():
54+
for plc_var_dict in mock_plc_data:
55+
if plc_write_var in plc_var_dict.keys():
56+
plc_var_dict[plc_write_var] = message_dict["data"][plc_write_var]
57+
else:
58+
print('write failed, not in dict')
59+
60+
61+
async def main():
62+
async with serve(mock_omjson_plc, "localhost", 8000):
63+
await asyncio.Future()
64+
65+
asyncio.run(main())

exts/loupe.simulation.br_bridge/loupe/simulation/br_bridge/tests/server.py

-51
This file was deleted.

0 commit comments

Comments
 (0)