-
Notifications
You must be signed in to change notification settings - Fork 0
User/jvollmer/read can with submodule #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
read_can_messages.py
Outdated
signal_definitions = { | ||
"0x20A": {"default": {"name": "ACC_IN", "data_type": "float"}}, | ||
"0x200": {"default": {"name": "MCU_ACC", "data_type": "float"}}, | ||
"0x201": {"default": {"name": "MCU_RGN_BRK", "data_type": "float"}}, | ||
"0x207": { | ||
0: {"name": "MCU_DIR", "data_type": "boolean"}, | ||
1: {"name": "MCU_ECO", "data_type": "boolean"}, | ||
3: {"name": "PRK_BRK_TELEM", "data_type": "boolean"}, | ||
}, | ||
"0x202": {"default": {"name": "LV_12V_TELEM", "data_type": "float"}}, | ||
"0x203": {"default": {"name": "LV_5V_TELEM", "data_type": "float"}}, | ||
"0x204": {"default": {"name": "I_OUT_5V_TELEM", "data_type": "float"}}, | ||
"0x205": {"default": {"name": "I_IN_TELEM", "data_type": "float"}}, | ||
"0x209": {2: {"name": "MCU_MC_ON", "data_type": "boolean"}}, | ||
"0x206": {"default": {"name": "BRK_PRES_TELEM", "data_type": "float"}}, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can remove
read_can_messages.py
Outdated
print(type(id)) | ||
print(type(offset)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can move to logging.debug()
to avoid printing unless we're debugging
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
read_can_messages.py
Outdated
s = s[:-2] | ||
# Add/Update the message data in the processed data format | ||
p = processed.setdefault(id, {}) | ||
categories = SignalInfo(key, s[0], s[1], s[2], s[3], s[4], s[5]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we reference each element s[0]
through s[1]
by name for clarity here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
read_can_messages.py
Outdated
# Add/Update the message data in the processed data format | ||
p = processed.setdefault(id, {}) | ||
categories = SignalInfo(key, s[0], s[1], s[2], s[3], s[4], s[5]) | ||
p[offset] = asdict(categories) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep this as a SignalInfo
instance instead of casting to a dict
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
read_can_messages.py
Outdated
print( | ||
f"New Message: ID={message_data['id']},Name={signal['name']} Value={float_value}, Time Stamp={message_data['timestamp']}" | ||
) | ||
|
||
# handle booleans | ||
else: | ||
for key in signals: | ||
signal = signals[key] | ||
if signal["data_type"] == "boolean": | ||
byte_array = bytes(message.data) | ||
if not byte_array: | ||
logging.error( | ||
f"No data available for boolean signal in CAN ID {can_id}." | ||
) | ||
continue | ||
bit_offset = key | ||
bool_value = bool((byte_array[0] >> bit_offset) & 1) | ||
print( | ||
f"New Message: ID={message_data['id']},Name={signals[key]['name']} Value={bool_value}, Time Stamp={message_data['timestamp']}" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above. Can we change the print()
calls to logging.debug()
to avoid printing by default? If we do want to print by default, can we use logging.info()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
read_can_messages.py
Outdated
|
||
|
||
logging.basicConfig( | ||
level=logging.ERROR, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we decrease to logging.INFO
? Generally, it's good to show info messages but filter out debug messages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
send_messages.py
Outdated
args = parser.parse_args() | ||
# add logging | ||
logging.basicConfig( | ||
level=logging.ERROR, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we decrease to logging.INFO
? Generally, it's good to show info messages but filter out debug messages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
send_messages.py
Outdated
# Allow user to input CAN channel | ||
parser = argparse.ArgumentParser(description="Specify the CAN channel.") | ||
# Define a positional argument for channel | ||
parser.add_argument("channel", type=str, help="CAN channel (e.g., can0, vcan0)") | ||
args = parser.parse_args() | ||
# add logging |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please move arg parsing inside if __name__ == '__main__':
block to avoid argument parsing if imported
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
send_messages.py
Outdated
print(f"Byte out of range (0-255): {data_byte}") | ||
bus.shutdown() | ||
return | ||
data_bytes.append(data_byte) | ||
except ValueError: | ||
logging.error("Invalid data byte input.") | ||
bus.shutdown() | ||
return | ||
|
||
# Create the CAN message | ||
msg = can.Message( | ||
arbitration_id=arbitration_id, data=data_bytes, is_extended_id=False | ||
) | ||
|
||
# Send message using bus.send() | ||
try: | ||
bus.send(msg) | ||
print( | ||
f"Message sent successfully!\n Message Details: ID={msg.arbitration_id}, Data={msg.data}" | ||
) | ||
except can.CanError as e: | ||
print(f"Message failed to send: {e}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above. Please use logging.debug/info/warning/error
as appropriate instead of print
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly nits
read_can_messages.py
Outdated
"timestamp": message.timestamp, | ||
} | ||
|
||
# loop through signal definitions to find can_id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Not looping through
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
read_can_messages.py
Outdated
print(offset) | ||
print(signals_info) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can revert print statements or convert to debug logging statements for committed code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
read_can_messages.py
Outdated
if data_type == "float": | ||
if len(byte_array) < 4: | ||
logging.error( | ||
f"Insufficient data for float signal in CAN ID {can_id:0x}." | ||
) | ||
return | ||
else: | ||
# Unpack the first 4 bytes as a little-endian float. | ||
float_value = struct.unpack("<f", byte_array[:4])[0] | ||
logging.debug( | ||
f"New Message: ID={can_id:0x},Name={signal_name} Value={float_value}, Time Stamp={message_data['timestamp']}" | ||
) | ||
elif data_type == "boolean": | ||
bool_value = bool((byte_array[0] >> offset) & 1) | ||
logging.debug( | ||
f"New Message: ID={can_id:0x},Name={signal_name} Value={bool_value}, Time Stamp={message_data['timestamp']}" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we differentiate between all data types in the data format? Also, can we do a more strict check against the expected length in bytes for each value?
# set up listener | ||
listener = MyListener() | ||
|
||
# A Notifier runs in the background and listens for messages. When a new message arrives, it calls on_message in MyListener. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# A Notifier runs in the background and listens for messages. When a new message arrives, it calls on_message in MyListener. | |
# A Notifier runs in the background and listens for messages. When a new message arrives, it calls on_message_received in MyListener. |
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's supposed to be on_message_received
parser.add_argument("channel", type=str, help="CAN channel (e.g., can0, vcan0)") | ||
args = parser.parse_args() | ||
transmit_can_message() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we split out transmitting and reading functionality for the test script (i.e. add another "tx" argument to enable transmitting (a) message(s))
Parse data