buoy_api.interface¶
Interface Objects¶
class Interface(Node)
ROS2 Interface node for commanding and subscribing to buoy controllers and sensors.
Provides service clients and functions to send commands to and receive telemetry from the MBARI WEC controllers:
- AHRS
- Power
- Spring
- Battery
- Trefoil
If user has overridden one of these callbacks in their user-derived class, this will subscribe to the correct topic and use their callback implementation. If user did not define one, a subscriber for that topic will not be set up:
- self.ahrs_callback
- self.battery_callback
- self.spring_callback
- self.power_callback
- self.trefoil_callback
- self.powerbuoy_callback
- self.latent_callback (sim only)
__init__¶
def __init__(node_name,
wait_for_services=False,
check_for_services=True,
**kwargs)
Initialize the Interface node.
Arguments:
node_name
(str
): name of the ROS2 nodecheck_for_services
(bool
): if True, attempt to verify service availability before usewait_for_services
(bool
): if True and if check_for_services, block until all services are availablekwargs
: additional keyword arguments forwarded to ROS 2 Node
spin¶
def spin()
Sets up a MultiThreadedExecutor
and spins the node (blocking).
If you need non-blocking control over program flow, you may skip calling this function, but a
MultiThreadedExecutor
is required for this node. You may call non-blocking spin functions of a
MultiThreadedExecutor
in your own loop.
use_sim_time¶
def use_sim_time(enable=True)
Enable/Disable using sim time in Node clock from /clock.
Arguments:
enable
(bool
): True to use /clock, False to use system time
set_pc_pack_rate_param¶
def set_pc_pack_rate_param(rate_hz=50.0, blocking=True)
Set publish rate of PC Microcontroller telemetry.
Arguments:
rate_hz
(float
): desired publish rate in Hzblocking
(bool
): if True, wait for the service call to complete
set_sc_pack_rate_param¶
def set_sc_pack_rate_param(rate_hz=50.0, blocking=True)
Set publish rate of SC Microcontroller telemetry.
Arguments:
rate_hz
(float
): desired publish rate in Hzblocking
(bool
): if True, wait for the service call to complete
set_pc_pack_rate¶
def set_pc_pack_rate(rate_hz=50, blocking=True)
Set publish rate of PC Microcontroller telemetry.
Arguments:
rate_hz
(float
): desired publish rate in Hzblocking
(bool
): if True, wait for the service call to complete
set_sc_pack_rate¶
def set_sc_pack_rate(rate_hz=50, blocking=True)
Set publish rate of SC Microcontroller telemetry.
Arguments:
rate_hz
(float
): desired publish rate in Hzblocking
(bool
): if True, wait for the service call to complete
send_pump_command¶
def send_pump_command(duration_mins, blocking=True)
Turn pump on for a duration in minutes to raise mean piston position.
Arguments:
duration_mins
(float
): pump on duration in minutesblocking
(bool
): if True, wait for the service call to complete
send_valve_command¶
def send_valve_command(duration_sec, blocking=True)
Turn valve on for a duration in seconds to lower mean piston position.
Arguments:
duration_sec
(float
): valve on duration in secondsblocking
(bool
): if True, wait for the service call to complete
send_pc_wind_curr_command¶
def send_pc_wind_curr_command(wind_curr, blocking=True)
Set winding current setpoint to control piston damping.
Arguments:
wind_curr
(float
): wind current setpoint in Ampsblocking
(bool
): if True, wait for the service call to complete
send_pc_bias_curr_command¶
def send_pc_bias_curr_command(bias_curr, blocking=True)
Set bias current setpoint to control piston damping offset.
A High bias in either direction will move the piston back and forth
Arguments:
bias_curr
(float
): bias current setpoint in Ampsblocking
(bool
): if True, wait for the service call to complete
send_pc_scale_command¶
def send_pc_scale_command(scale, blocking=True)
Set damping gain.
Arguments:
scale
(float
): damping gainblocking
(bool
): if True, wait for the service call to complete
send_pc_retract_command¶
def send_pc_retract_command(retract, blocking=True)
Set additional damping gain in the piston retract direction.
Arguments:
retract
(float
): additional damping gain for retractionblocking
(bool
): if True, wait for the service call to complete
get_inc_wave_height¶
def get_inc_wave_height(self, x, y, t, use_buoy_origin, use_relative_time, timeout=2.0)
Request incident wave height at specific location(s) and time(s).
Arguments:
- x
(float
or list
): x position(s) of desired wave height(s)
- y
(float
or list
): y position(s) of desired wave height(s)
- t
(float
or list
): t time(s) in seconds of sim time of desired wave height(s)
- use_buoy_origin
(bool
): if True, x and y are relative to buoy origin
- use_relative_time
(bool
): if True, desired time(s) are relative to current sim time
else, time(s) will be absolute sim time
- timeout
(float
): if not None, wait for timeout sec for the service call to complete
else, wait forever
Returns:
- list
of IncWaveHeight
data
set_params¶
def set_params()
Set user-defined Node params (e.g. custom controller gains).
ahrs_callback¶
def ahrs_callback(data)
Override this function to subscribe to /ahrs_data to receive XBRecord telemetry.
Arguments:
data
: incoming XBRecord
battery_callback¶
def battery_callback(data)
Override this function to subscribe to /battery_data to receive BCRecord telemetry.
Arguments:
data
: incoming BCRecord
spring_callback¶
def spring_callback(data)
Override this function to subscribe to /spring_data to receive SCRecord telemetry.
Arguments:
data
: incoming SCRecord
power_callback¶
def power_callback(data)
Override this function to subscribe to /power_data to receive PCRecord telemetry.
Arguments:
data
: incoming PCRecord
trefoil_callback¶
def trefoil_callback(data)
Override this function to subscribe to /trefoil_data to receive TFRecord telemetry.
Arguments:
data
: incoming TFRecord
powerbuoy_callback¶
def powerbuoy_callback(data)
Override this function to subscribe to /powerbuoy_data to receive PBRecord telemetry.
PBRecord contains a slice of all microcontroller's telemetry data
Arguments:
data
: incoming PBRecord
latent_callback¶
def latent_callback(self, data)
Override this function to subscribe to /latent_data to receive sim-only LatentData.
Arguments:
data
: incoming LatentData