Modified rmp_interface.py
Code: Select all
\file rmp_interface.py
\brief This module contains the interface to the RMP
\Platform: Ubuntu 12.04 (Linux)
--------------------------------------------------------------------"""
from crc16 import compute_buffer_crc
from ocu_menus import index_menu
from utils import *
from system_defines import *
import pygame,time,re,math
"""
ROS Related imports
"""
#!/usr/bin/env python
import rospy
from std_msgs.msg import String
from std_msgs.msg import Bool
from std_msgs.msg import Float64
from std_msgs.msg import Float32
#"""
#Callback related functions/methods
#"""
rawVelocity = 0.0;
rawYaw = 0.0;
angle = 0.0;
"""
Main class for the RMP interface
"""
class RMP:
def __init__(self,platform,bitmaps,gamepad):
"""
Initialize the variables used for the main screen
"""
self.platform = platform;
self.operational_state = "OFFLINE";
self.operational_time = 0.0;
self.min_prop_batt_soc = 0.0;
self.aux_batt_soc = 0.0;
self.fsw_1 = 0;
self.fsw_2 = 0;
self.fsw_3 = 0;
self.fsw_4 = 0;
self.mcu_0_fsw = 0;
self.mcu_1_fsw = 0;
self.mcu_2_fsw = 0;
self.mcu_3_fsw = 0;
"""
Initialize the bitmaps and create a dictionary for holding the
user defined feedback data
"""
self.bitmap = [];
self.bitmap.append(bitmaps[0]);
self.bitmap.append(bitmaps[1]);
self.bitmap.append(bitmaps[2]);
self.bitmap.append(bitmaps[3]);
self.user_defined_feedback = dict();
"""
Determine the way the variables should be converted to human readable form. There are
four ways, floating point, hex, IP strings, and the default is integer
"""
item = 0;
for x in range(0,4):
if (x == 0):
fp_mask = RMP_FLOATING_POINT_FEEDBACK_1_MASK;
hex_mask = RMP_HEX_FEEDBACK_1_MASK;
ip_mask = RMP_IP_FEEDBACK_1_MASK;
elif (x == 1):
fp_mask = RMP_FLOATING_POINT_FEEDBACK_2_MASK;
hex_mask = RMP_HEX_FEEDBACK_2_MASK;
ip_mask = RMP_IP_FEEDBACK_2_MASK;
elif (x == 2):
fp_mask = RMP_FLOATING_POINT_FEEDBACK_3_MASK;
hex_mask = RMP_HEX_FEEDBACK_3_MASK;
ip_mask = RMP_IP_FEEDBACK_3_MASK;
else:
fp_mask = RMP_FLOATING_POINT_FEEDBACK_4_MASK;
hex_mask = RMP_HEX_FEEDBACK_4_MASK;
ip_mask = RMP_IP_FEEDBACK_4_MASK;
for i in range(0,32):
if (self.bitmap[x] & (1<<i)):
if (fp_mask & (1<<i)):
self.user_defined_feedback[item] = 0.0;
elif (hex_mask & (1 << i)):
self.user_defined_feedback[item] = hex(0);
elif (ip_mask & (1 << i)):
self.user_defined_feedback[item] = numToDottedQuad(0);
else:
self.user_defined_feedback[item] = 0;
item +=1;
"""
Initialize all the command variables
"""
self.commands = [0] * 3;
self.disable_request = False
self.deadman_switch = 0;
self.powerdown_request = False;
self.standby_request = False;
self.tractor_request = False;
self.balance_request = False;
self.decel_request = False;
self.audio_request = False;
self.velocity_cmd_f = 0.0;
self.yaw_cmd_f = 0.0;
self.raw_velocity_cmd_f = 0.0;
self.raw_yaw_cmd_f = 0.0;
self.angle_cmd_f = 90.0;
self.general_purpose_command = RMP_CMD_NONE;
self.prev_gp_cmd = RMP_CMD_NONE;
self.general_purpose_param = 0;
self.exit_ocu = False;
rospy.init_node('segwaydriver', anonymous=True)
def modeCallback(self, data):
if(data.data == "STANDBY"):
self.standby_request = True;
else:
self.standby_request = False;
if(data.data == "TRACTOR"):
self.tractor_request = True;
else:
self.tractor_request = False
if(data.data == "BALANCE"):
self.balance_request = True;
else:
self.balance_request = False;
def disableCallback(self, data):
self.disable_request = data.data;
def powerCallback(self, data):
self.powerdown_request = data.data;
def decelCallback(self, data):
self.decel_request = data.data;
def audioCallback(self, data):
if (data.data):
self.audio_request = True;
else:
self.audio_request = False;
def rawVelocityCmdCallback(self, data):
#self.raw_velocity_cmd_f = data.data;
if(self.velocity_cmd_f != data.data):
self.velocity_cmd_f = data.data;
def rawYawCmdCallback(self, data):
#self.raw_yaw_cmd_f = data.data;
self.yaw_cmd_f = data.data;
def angleCmdCallback(self, data):
self.angle_cmd_f = data.data;
def update_rmp_commands(self,control,mouse_cmds,joystick=None):
"""
If there is a joystick input use it
"""
if ("Xbox 360" == control):
self.deadman_switch = joystick.get_axis(2);
self.standby_request = joystick.get_button(STANDBY_BUTTON);
self.tractor_request = joystick.get_button(TRACTOR_BUTTON);
self.balance_request = joystick.get_button(BALANCE_BUTTON);
self.decel_request = joystick.get_button(DECEL_BUTTON);
self.powerdown_request = joystick.get_button(POWERDOWN_BUTTON);
self.disable_request = joystick.get_button(DISABLE_BUTTON);
"""
Audio request by pressing A
"""
if (joystick.get_button(0)):
self.audio_request = True;
else:
self.audio_request = False;
"""
If the left trigger is pulled let the commands propagate otherwise
set them to zero.
"""
if (self.deadman_switch > 0.75):
self.raw_velocity_cmd_f = -0.25 * joystick.get_axis(1);
self.raw_yaw_cmd_f = joystick.get_axis(4);
else:
self.raw_velocity_cmd_f = 0.0;
self.raw_yaw_cmd_f = 0.0;
"""
Deadband the commands to account for the poor return to zero.
"""
self.velocity_cmd_f = deadband(self.raw_velocity_cmd_f,0.10);
self.yaw_cmd_f = deadband(self.raw_yaw_cmd_f,0.10);
elif ("Logitech" == control):
self.deadman_switch = joystick.get_button(LOGITECH_BUTTON_7);
self.standby_request = joystick.get_button(LOGITECH_BUTTON_3);
self.tractor_request = joystick.get_button(LOGITECH_BUTTON_1);
self.decel_request = joystick.get_button(LOGITECH_BUTTON_6);
self.powerdown_request = joystick.get_button(LOGITECH_BUTTON_10);
self.disable_request = joystick.get_button(LOGITECH_BUTTON_5);
"""
If the left button is pressed let the commands propagate otherwise
set them to zero.
"""
if (True == self.deadman_switch):
self.raw_velocity_cmd_f = -0.25 * joystick.get_axis(1);
self.raw_yaw_cmd_f = joystick.get_axis(2);
else:
self.raw_velocity_cmd_f = 0.0;
self.raw_yaw_cmd_f = 0.0;
"""
Audio request by pressing 4 and 2 together
"""
if (joystick.get_button(LOGITECH_BUTTON_4)):
self.audio_request = True;
else:
self.audio_request = False;
"""
Deadband the commands to account for the poor return to zero.
"""
self.velocity_cmd_f = deadband(self.raw_velocity_cmd_f,0.10);
self.yaw_cmd_f = deadband(self.raw_yaw_cmd_f,0.10);
elif ("Extreme 3D" == control):
"""
This input mapping is done for the Logitech Extreme 3D controller
exclusively used for the OMNI. If you wish to change it
you must figure out buttons and update this the command structure
differs and therefor this must be used for the OMNI and the OMNI
only.
"""
self.deadman_switch = joystick.get_button(LOGITECH_BUTTON_1);
self.standby_request = joystick.get_button(LOGITECH_BUTTON_3);
self.tractor_request = joystick.get_button(LOGITECH_BUTTON_4);
self.decel_request = joystick.get_button(LOGITECH_BUTTON_5);
self.powerdown_request = joystick.get_button(LOGITECH_BUTTON_8);
self.disable_request = joystick.get_button(LOGITECH_BUTTON_6);
"""
Audio request by pressing Y and A together
"""
if (joystick.get_button(10)) and (joystick.get_button(11)):
self.audio_request = True;
else:
self.audio_request = False;
"""
If the left trigger is pulled let the commands propagate otherwise
set them to zero.
"""
if (self.deadman_switch):
x = deadband(joystick.get_axis(0),0.20);
y = deadband(-joystick.get_axis(1),0.20);
self.yaw_cmd_f = -1.0 * deadband(joystick.get_axis(3),0.20);
self.angle_cmd_f = math.atan2(y,x) * (180.0/math.pi);
if (self.angle_cmd_f < 0.0):
self.angle_cmd_f = 360 + self.angle_cmd_f;
self.velocity_cmd_f = math.sqrt(math.pow(x,2) + math.pow(y,2))
self.velocity_cmd_f = clamp_value_f(self.velocity_cmd_f,0,1.0);
if (RMP_OMNI_PLATFORMS != self.platform):
self.velocity_cmd_f = y
self.yaw_cmd_f = x
else:
"""
Intentionally leave the angle alone, we do not want to change course when the deadman is let go
just come to a stop
"""
self.velocity_cmd_f = 0.0;
self.yaw_cmd_f = 0.0;
elif (control == "Mouse"):
"""
Mouse commands are given by the input stucture, just use
them. Debounce and deadman logic are done in the GUI callbacks
"""
if (mouse_cmds[2] == DISABLE_REQUEST):
self.disable_request = True;
elif (mouse_cmds[2] == POWERDOWN_REQUEST):
self.powerdown_request = True;
elif (mouse_cmds[2]== DTZ_REQUEST):
self.decel_request = True;
elif (mouse_cmds[2] == STANDBY_REQUEST):
self.standby_request = True;
elif (mouse_cmds[2] == TRACTOR_REQUEST):
self.tractor_request = True;
elif (mouse_cmds[2] == BALANCE_REQUEST):
self.balance_request = True;
elif (mouse_cmds[2] == 10):
self.audio_request = True;
else:
self.standby_request = False;
self.tractor_request = False;
self.balance_request = False;
self.decel_request = False;
self.powerdown_request = False;
self.disable_request = False;
self.audio_request = False;
self.velocity_cmd_f = mouse_cmds[0];
self.yaw_cmd_f = mouse_cmds[1];
elif (control == "Autonomous"):
#Node subscribes to topics for every segway command/state
#rospy.init_node('segway', anonymous = False)
rate = rospy.Rate(7.36) #2.9 Hz
#mode Topic correspods to the modes: standby,tractor,balance
rospy.Subscriber("modeRequest", String, self.modeCallback)
#Power Related Topics and Subscriptions
rospy.Subscriber("disableRequest", Bool, self.disableCallback)
rospy.Subscriber("powerdownRequest", Bool, self.powerCallback)
rospy.Subscriber("decelRequest", Bool, self.decelCallback)
#Audio Request Node
rospy.Subscriber("audioRequest", Bool, self.audioCallback)
#Motor request topics
rospy.Subscriber("raw_velocity_cmd_f_value", Float32, self.rawVelocityCmdCallback)
rospy.Subscriber("raw_yaw_cmd_f_value", Float32, self.rawYawCmdCallback)
rospy.Subscriber("angle_cmd_f_value", Float32, self.angleCmdCallback)
#rospy.spin()
rate.sleep()
else:
self.velocity_cmd_f = 0;
self.yaw_cmd_f = 0;
self.general_purpose_command = 0;
self.general_purpose_param = 0;
"""
Assign the commands. If there is a mode request use the CFG message. Otherwise it is
just the regular command
"""
if (True == self.disable_request):
self.general_purpose_command = RMP_CMD_SET_OPERATIONAL_MODE;
self.general_purpose_param = DISABLE_REQUEST;
elif (True == self.powerdown_request):
self.general_purpose_command = RMP_CMD_SET_OPERATIONAL_MODE;
self.general_purpose_param = POWERDOWN_REQUEST;
elif (True == self.decel_request):
self.general_purpose_command = RMP_CMD_SET_OPERATIONAL_MODE;
self.general_purpose_param = DTZ_REQUEST;
elif (True == self.standby_request):
self.general_purpose_command = RMP_CMD_SET_OPERATIONAL_MODE;
self.general_purpose_param = STANDBY_REQUEST;
elif (True == self.tractor_request):
self.general_purpose_command = RMP_CMD_SET_OPERATIONAL_MODE;
self.general_purpose_param = TRACTOR_REQUEST;
elif (True == self.balance_request):
self.general_purpose_command = RMP_CMD_SET_OPERATIONAL_MODE;
self.general_purpose_param = BALANCE_REQUEST;
elif (True == self.audio_request):
self.general_purpose_command = RMP_CMD_SET_AUDIO_COMMAND;
self.general_purpose_param = MOTOR_AUDIO_TEST_SWEEP;
else:
self.general_purpose_command = RMP_CMD_NONE;
self.general_purpose_param = 0;
if (self.general_purpose_command == self.prev_gp_cmd):
self.general_purpose_command = RMP_CMD_NONE;
self.general_purpose_param = 0;
self.prev_gp_cmd = self.general_purpose_command;
"""
now populate the message
"""
if self.general_purpose_command == RMP_CMD_NONE:
if (self.platform == RMP_OMNI_PLATFORMS):
vel_cmd = int(self.velocity_cmd_f * Q15);
yaw_cmd = int(self.yaw_cmd_f * Q15);
ang_cmd = convert_float_to_u32(self.angle_cmd_f);
self.commands[0] = int(RMP_OMNI_CAN_CMD_ID);
self.commands[1] = (((vel_cmd << 16) & 0xFFFF0000) | (yaw_cmd & 0x0000FFFF));
self.commands[2] = int(ang_cmd);
else:
vel_cmd = convert_float_to_u32(self.velocity_cmd_f);
yaw_cmd = convert_float_to_u32(self.yaw_cmd_f);
self.commands[0] = int(RMP_CAN_CMD_ID);
self.commands[1] = int(vel_cmd);
self.commands[2] = int(yaw_cmd);
else:
self.commands[0] = int(RMP_CFG_CMD_ID);
self.commands[1] = int(self.general_purpose_command);
self.commands[2] = int(self.general_purpose_param);
def convert_rmp_data(self,data):
"""
Convert Standard Variables
"""
self.fsw_1 = data[CCU_FSW_1_INDEX];
self.fsw_2 = data[CCU_FSW_2_INDEX];
self.fsw_3 = data[CCU_FSW_3_INDEX];
self.fsw_4 = data[CCU_FSW_4_INDEX];
self.mcu_0_fsw = data[CCU_MCU_0_FSW_INDEX];
self.mcu_1_fsw = data[CCU_MCU_1_FSW_INDEX];
self.mcu_2_fsw = data[CCU_MCU_2_FSW_INDEX];
self.mcu_3_fsw = data[CCU_MCU_3_FSW_INDEX];
self.operational_time = round(convert_u32_to_float(data[CCU_FRAME_COUNT_INDEX]),2);
self.min_prop_batt_soc = round(convert_u32_to_float(data[CCU_MIN_PROP_SOC_INDEX]),1);
self.aux_batt_soc = round(convert_u32_to_float(data[CCU_AUX_BATT_SOC_INDEX]),1);
"""
Decode the operational state
"""
if ((data[CCU_OPERATIONAL_STATE_INDEX] == 0) or (data[CCU_OPERATIONAL_STATE_INDEX] == 1) or (data[CCU_OPERATIONAL_STATE_INDEX] == 2)):
self.operational_state = "SYS INIT"
elif (data[CCU_OPERATIONAL_STATE_INDEX] == 3):
self.operational_state = "STANDBY"
elif (data[CCU_OPERATIONAL_STATE_INDEX] == 4):
self.operational_state = "TRACTOR"
elif (data[CCU_OPERATIONAL_STATE_INDEX] == 5):
self.operational_state = "DISABLE"
else:
self.operational_state = "DISABLE"
"""
Now put all of the data into a dictionary with the variable name and the interpreted value
"""
self.user_defined_feedback.clear();
item = 0;
for x in range(0,4):
if (x == 0):
fp_mask = RMP_FLOATING_POINT_FEEDBACK_1_MASK;
hex_mask = RMP_HEX_FEEDBACK_1_MASK;
ip_mask = RMP_IP_FEEDBACK_1_MASK;
elif (x == 1):
fp_mask = RMP_FLOATING_POINT_FEEDBACK_2_MASK;
hex_mask = RMP_HEX_FEEDBACK_2_MASK;
ip_mask = RMP_IP_FEEDBACK_2_MASK;
elif (x == 2):
fp_mask = RMP_FLOATING_POINT_FEEDBACK_3_MASK;
hex_mask = RMP_HEX_FEEDBACK_3_MASK;
ip_mask = RMP_IP_FEEDBACK_3_MASK;
else:
fp_mask = RMP_FLOATING_POINT_FEEDBACK_4_MASK;
hex_mask = RMP_HEX_FEEDBACK_4_MASK;
ip_mask = RMP_IP_FEEDBACK_4_MASK;
for i in range(0,32):
if (self.bitmap[x] & (1<<i)):
if (fp_mask & (1<<i)):
self.user_defined_feedback[item] = round(convert_u32_to_float(data[item]),3);
elif (hex_mask & (1 << i)):
self.user_defined_feedback[item] = hex(data[item]);
elif (ip_mask & (1 << i)):
self.user_defined_feedback[item] = numToDottedQuad(data[item]);
else:
self.user_defined_feedback[item] = int(data[item]);
item +=1;
def Convert_RMP_Cmds_for_Serial_Interface(commands):
"""
Convert a set of commands for the serial interface
"""
rmp_cmd = [0]*NUM_USB_ETH_BYTES;
rmp_cmd[RMP_USB_ETH_CAN_ID_HIGH_INDEX] = int((commands[0] & 0xFF00) >> 8);
rmp_cmd[RMP_USB_ETH_CAN_ID_LOW_INDEX] = int((commands[0] & 0x00FF));
rmp_cmd[RMP_USB_ETH_CAN_DATA_0_INDEX] = int((commands[1] & 0xFF000000) >> 24);
rmp_cmd[RMP_USB_ETH_CAN_DATA_1_INDEX] = int((commands[1] & 0x00FF0000) >> 16);
rmp_cmd[RMP_USB_ETH_CAN_DATA_2_INDEX] = int((commands[1] & 0x0000FF00) >> 8);
rmp_cmd[RMP_USB_ETH_CAN_DATA_3_INDEX] = int((commands[1] & 0x000000FF));
rmp_cmd[RMP_USB_ETH_CAN_DATA_4_INDEX] = int((commands[2] & 0xFF000000) >> 24);
rmp_cmd[RMP_USB_ETH_CAN_DATA_5_INDEX] = int((commands[2] & 0x00FF0000) >> 16);
rmp_cmd[RMP_USB_ETH_CAN_DATA_6_INDEX] = int((commands[2] & 0x0000FF00) >> 8);
rmp_cmd[RMP_USB_ETH_CAN_DATA_7_INDEX] = int((commands[2] & 0x000000FF));
"""
Compute the CRC for the command
"""
compute_buffer_crc(rmp_cmd,NUM_USB_ETH_BYTES);
"""
Convert the string to char data and return it
"""
rmp_cmd_chars = [];
for x in range(0,len(rmp_cmd)):
rmp_cmd_chars.append(chr(rmp_cmd[x]));
output = ''.join(rmp_cmd_chars);
return output;