-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Integrate mobile robots by interfacing with airo-tulip (#147)
Adds mobile robot support with airo-tulip v0.1.0. --------- Co-authored-by: bitscuity <thomas@bitscuit.be>
- Loading branch information
1 parent
5cfb763
commit 86da798
Showing
9 changed files
with
250 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
import math | ||
import time | ||
from threading import Thread | ||
|
||
import numpy as np | ||
from airo_robots.awaitable_action import AwaitableAction | ||
from airo_robots.drives.mobile_robot import CompliantLevel, MobileRobot | ||
from airo_tulip.api.client import KELORobile as KELORobileClient # type: ignore | ||
from airo_tulip.hardware.platform_driver import PlatformDriverType # type: ignore | ||
from airo_typing import Vector3DType | ||
|
||
|
||
class KELORobile(MobileRobot): | ||
"""KELO Robile platform implementation of the MobileRobot interface. | ||
The KELO Robile platform consists of several drives with castor wheels which are connected via EtherCAT and can | ||
be controlled individually. The airo-tulip API, which is used here, provides a higher level interface which | ||
controls the entire platform.""" | ||
|
||
def __init__(self, robot_ip: str, robot_port: int = 49789): | ||
"""Connect to the KELO robot. | ||
The KELO robot should already be running the airo-tulip server. If this is not the case, any messages | ||
sent from the client may be queued and executed once the server is started, resulting in unexpected | ||
and/or sudden movements. | ||
Args: | ||
robot_ip: IP address of the KELO CPU brick. | ||
robot_port: Port to connect on (default: 49789).""" | ||
|
||
self._kelo_robile = KELORobileClient(robot_ip, robot_port) | ||
|
||
# Position control. | ||
self._control_loop_done = True | ||
|
||
def align_drives(self, x: float, y: float, a: float, timeout: float = 1.0) -> AwaitableAction: | ||
"""Align all drives for driving in a direction given by the linear and angular velocities. | ||
Beware that sending any other velocity commands before the awaitable is done may cause unexpected behaviour, | ||
as this affects the "aligned" condition. | ||
Args: | ||
x: The x velocity (linear, m/s). | ||
y: The y velocity (linear, m/s) | ||
a: The angular velocity (rad/s). | ||
timeout: The awaitable will finish after this time at the latest. | ||
Returns: | ||
An AwaitableAction which will check if the drives are aligned, or if the timeout has expired.""" | ||
self._kelo_robile.align_drives(x, y, a, timeout=timeout) | ||
|
||
action_sent_time = time.time_ns() | ||
return AwaitableAction( | ||
lambda: self._kelo_robile.are_drives_aligned() or time.time_ns() - action_sent_time > timeout * 1e9, | ||
default_timeout=2 * timeout, | ||
default_sleep_resolution=0.002, | ||
) | ||
|
||
def set_platform_velocity_target(self, x: float, y: float, a: float, timeout: float) -> AwaitableAction: | ||
self._kelo_robile.set_platform_velocity_target(x, y, a, timeout=timeout) | ||
|
||
action_sent_time = time.time_ns() | ||
return AwaitableAction( | ||
lambda: time.time_ns() - action_sent_time > timeout * 1e9, | ||
default_timeout=2 * timeout, | ||
default_sleep_resolution=0.002, | ||
) | ||
|
||
def _move_platform_to_pose_control_loop( | ||
self, target_pose: Vector3DType, action_start_time: float, action_timeout_time: float, timeout: float | ||
) -> None: | ||
stop = False | ||
while not stop: | ||
current_pose = self._kelo_robile.get_odometry() | ||
delta_pose = target_pose - current_pose | ||
|
||
vel_vec_angle = np.arctan2(delta_pose[1], delta_pose[0]) - current_pose[2] | ||
vel_vec_norm = min(np.linalg.norm(delta_pose[:2]), 0.5) | ||
vel_x = vel_vec_norm * np.cos(vel_vec_angle) | ||
vel_y = vel_vec_norm * np.sin(vel_vec_angle) | ||
|
||
delta_angle = np.arctan2(np.sin(delta_pose[2]), np.cos(delta_pose[2])) | ||
vel_a = max(min(delta_angle, math.pi / 4), -math.pi / 4) | ||
|
||
command_timeout = (action_timeout_time - time.time_ns()) * 1e-9 | ||
if command_timeout >= 0.0: | ||
self._kelo_robile.set_platform_velocity_target(vel_x, vel_y, vel_a, timeout=command_timeout) | ||
|
||
at_target_pose = bool(np.linalg.norm(delta_pose) < 0.01) | ||
stop = at_target_pose or time.time_ns() - action_start_time > timeout * 1e9 | ||
|
||
self._kelo_robile.set_platform_velocity_target(0.0, 0.0, 0.0) | ||
self._control_loop_done = True | ||
|
||
def move_platform_to_pose(self, x: float, y: float, a: float, timeout: float) -> AwaitableAction: | ||
target_pose = np.array([x, y, a]) | ||
action_start_time = time.time_ns() | ||
action_timeout_time = action_start_time + timeout * 1e9 | ||
|
||
self._control_loop_done = False # Will be set to True by the below thread once it's finished. | ||
thread = Thread( | ||
target=self._move_platform_to_pose_control_loop, | ||
args=(target_pose, action_start_time, action_timeout_time, timeout), | ||
) | ||
thread.start() | ||
|
||
return AwaitableAction( | ||
lambda: self._control_loop_done, | ||
default_timeout=2 * timeout, | ||
default_sleep_resolution=0.002, | ||
) | ||
|
||
def enable_compliant_mode( | ||
self, enabled: bool, compliant_level: CompliantLevel = CompliantLevel.COMPLIANT_WEAK | ||
) -> None: | ||
if enabled: | ||
if compliant_level == CompliantLevel.COMPLIANT_WEAK: | ||
self._kelo_robile.set_driver_type(PlatformDriverType.COMPLIANT_WEAK) | ||
elif compliant_level == CompliantLevel.COMPLIANT_MODERATE: | ||
self._kelo_robile.set_driver_type(PlatformDriverType.COMPLIANT_MODERATE) | ||
else: | ||
self._kelo_robile.set_driver_type(PlatformDriverType.COMPLIANT_STRONG) | ||
else: | ||
self._kelo_robile.set_driver_type(PlatformDriverType.VELOCITY) | ||
|
||
def get_odometry(self) -> Vector3DType: | ||
return self._kelo_robile.get_odometry() | ||
|
||
def get_velocity(self) -> Vector3DType: | ||
return self._kelo_robile.get_velocity() | ||
|
||
def reset_odometry(self) -> None: | ||
self._kelo_robile.reset_odometry() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
For information on how to set up the KELO Robile, please refer to the documentation of [airo-tulip](https://pypi.org/project/airo-tulip/). |
31 changes: 31 additions & 0 deletions
31
airo-robots/airo_robots/drives/hardware/manual_robot_testing.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
"""code for manual testing of mobile robot base class implementations. | ||
""" | ||
from airo_robots.drives.hardware.kelo_robile import KELORobile | ||
from airo_robots.drives.mobile_robot import MobileRobot | ||
|
||
|
||
def manually_test_robot_implementation(robot: MobileRobot) -> None: | ||
if isinstance(robot, KELORobile): | ||
input("robot will rotate drives") | ||
robot.align_drives(0.1, 0.0, 0.0, 1.0).wait() | ||
|
||
input("robot will now move forward 10cm") | ||
robot.set_platform_velocity_target(0.1, 0.0, 0.0, 1.0).wait() | ||
|
||
input("robot will now move left 10cm") | ||
robot.set_platform_velocity_target(0.0, 0.1, 0.0, 1.0).wait() | ||
|
||
input("robot will now make two short rotations") | ||
robot.set_platform_velocity_target(0.0, 0.0, 0.1, 1.0).wait() | ||
robot.set_platform_velocity_target(0.0, 0.0, -0.1, 1.0).wait() | ||
|
||
input("robot will now return to original position") | ||
robot.set_platform_velocity_target(-0.1, -0.1, 0.0, 1.0).wait() | ||
|
||
input("robot will now move to 10 cm along +X relative from its starting position with position control") | ||
robot.move_platform_to_pose(0.1, 0.0, 0.0, 10.0).wait() | ||
|
||
|
||
if __name__ == "__main__": | ||
mobi = KELORobile("10.10.129.21") | ||
manually_test_robot_implementation(mobi) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
from abc import ABC, abstractmethod | ||
from enum import Enum | ||
|
||
from airo_robots.awaitable_action import AwaitableAction | ||
from airo_typing import Vector3DType | ||
|
||
|
||
class CompliantLevel(Enum): | ||
"""The level of compliance expected from the mobile robot. | ||
Values may not correspond to identical behaviour on different mobile platforms, but are merely an indication. | ||
A value of weak means a very compliant robot, whereas a value of strong means a slightly compliant robot.""" | ||
|
||
COMPLIANT_WEAK = 1 | ||
COMPLIANT_MODERATE = 2 | ||
COMPLIANT_STRONG = 3 | ||
|
||
|
||
class MobileRobot(ABC): | ||
""" | ||
Base class for a mobile robot. | ||
Mobile robots typically allow to set a target velocity for the entire platform and apply torques to their wheels | ||
to achieve the desired velocity. | ||
All values are in metric units: | ||
- Linear velocities are expressed in meters/second | ||
- Angular velocities are expressed in radians/second | ||
""" | ||
|
||
@abstractmethod | ||
def set_platform_velocity_target(self, x: float, y: float, a: float, timeout: float) -> AwaitableAction: | ||
"""Set the desired platform velocity. | ||
Args: | ||
x: Linear velocity along the X axis. | ||
y: Linear velocity along the Y axis. | ||
a: Angular velocity. | ||
timeout: After this time, the platform will automatically stop. | ||
Returns: | ||
An awaitable action.""" | ||
|
||
@abstractmethod | ||
def move_platform_to_pose(self, x: float, y: float, a: float, timeout: float) -> AwaitableAction: | ||
"""Move the platform to the given pose, without guarantees about the followed path. | ||
Args: | ||
x: Position along the startup pose's X axis. | ||
y: Position along the startup pose's Y axis. | ||
a: Orientation around the startup pose's Z axis. | ||
timeout: After this time, the platform will automatically stop. | ||
Returns: | ||
An awaitable action.""" | ||
|
||
@abstractmethod | ||
def enable_compliant_mode(self, enabled: bool, compliant_level: CompliantLevel) -> None: | ||
"""Enable compliant mode on the robot. | ||
Args: | ||
enabled: If true, will enable compliant mode. Else, will disable compliant mode. | ||
compliant_level: The level of compliance to be expected from the robot. Ignored if `enabled` is `False`.""" | ||
|
||
@abstractmethod | ||
def get_odometry(self) -> Vector3DType: | ||
"""Get the estimated robot pose as a 3D vector comprising the `x`, `y`, and `theta` values relative to the | ||
robot's starting pose. | ||
Returns: A 3D vector with a value for the `x` position, `y` position, and `theta` angle of the robot.""" | ||
|
||
@abstractmethod | ||
def reset_odometry(self) -> None: | ||
"""Reset the robot's odometry to `(0, 0, 0)`.""" | ||
|
||
@abstractmethod | ||
def get_velocity(self) -> Vector3DType: | ||
"""Get the estimated robot's velocity as a 3D vector comprising the `x`, `y` and `theta` values relative to the | ||
robot's starting pose. | ||
Returns: | ||
A 3D vector with a value for the `x` velocity, `y` velocity, and `theta` angular velocity of the robot.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters