diff --git a/python/examples/pose_relay.py b/python/examples/pose_relay.py new file mode 100644 index 00000000..66b8021d --- /dev/null +++ b/python/examples/pose_relay.py @@ -0,0 +1,421 @@ +#!/usr/bin/env python3 + +"""Relay a pose from a source device to a target device via ExternalPoseInput, +accounting for source/target lever arms and target body mounting with respect to +source body. +""" + +from datetime import datetime +import os +import sys +import time + +import numpy as np +import pymap3d as pm + +# Add the Python root directory (fusion-engine-client/python/) to the import +# search path to enable FusionEngine imports if this application is being run +# directly out of the repository and is not installed as a pip package. +root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..')) +sys.path.insert(0, root_dir) + +from fusion_engine_client.analysis.attitude import ( + get_enu_rotation_matrix, euler_angles_to_dcm, dcm_to_euler_angles, +) +from fusion_engine_client.messages import MessageType, PoseMessage +from fusion_engine_client.messages.measurements import ( + ExternalPoseInput, SystemTimeSource, +) +from fusion_engine_client.messages.defs import SolutionType, Timestamp +from fusion_engine_client.parsers import FusionEngineDecoder, FusionEngineEncoder +from fusion_engine_client.utils import trace as logging +from fusion_engine_client.utils.argument_parser import ArgumentParser +from fusion_engine_client.utils.transport_utils import * +from fusion_engine_client.utils.bin_utils import bytes_to_hex + +logger = logging.getLogger('point_one.fusion_engine.pose_relay') + +# Coordinate frame abbreviations: +# enu - Local navigation frame (East, North, Up) +# ecef - Earth-Centered, Earth-Fixed frame +# sb - Source body frame +# tb - Target body frame + +# Calculates the target YPR in the local navigation frame given the source YPR +# in the local navigation frame and the target-to-source-body rotation. 
def calculate_ypr_enu_to_tb_deg(ypr_enu_to_sb_deg,
                                rotmat_tb_to_sb) -> np.ndarray:
    """Compute the target body's YPR (in degrees) in the local ENU frame.

    Composes the source body attitude with the fixed target-to-source-body
    mounting rotation: C_tb_to_enu = C_sb_to_enu @ C_tb_to_sb.

    Note: euler_angles_to_dcm()/dcm_to_euler_angles() use X, Y, Z (roll,
    pitch, yaw) angle ordering, so the YPR vectors are reversed on the way
    in and out.
    """
    c_enu_to_sb = euler_angles_to_dcm(ypr_enu_to_sb_deg[::-1],
                                      order='321', deg=True)
    c_tb_to_enu = c_enu_to_sb.T @ rotmat_tb_to_sb
    # dcm_to_euler_angles() expects the ENU-to-body rotation (the transpose).
    rpy_enu_to_tb_deg = dcm_to_euler_angles(c_tb_to_enu.T,
                                            order='321', deg=True)
    return rpy_enu_to_tb_deg[::-1]


def calculate_jacobian_ypr_target_wrt_source(ypr_enu_to_sb_deg,
                                             rotmat_tb_to_sb,
                                             delta_deg=1e-6) -> np.ndarray:
    """Approximate the Jacobian of target YPR with respect to source YPR.

    Uses a central-difference numerical derivative: each source YPR component
    is perturbed by +/- delta_deg and the resulting change in the target YPR
    (from calculate_ypr_enu_to_tb_deg()) fills the corresponding column.
    """
    jacobian = np.zeros((3, 3))
    for axis in range(3):
        step = np.zeros(3)
        step[axis] = delta_deg
        ypr_tb_plus_deg = calculate_ypr_enu_to_tb_deg(
            ypr_enu_to_sb_deg + step, rotmat_tb_to_sb)
        ypr_tb_minus_deg = calculate_ypr_enu_to_tb_deg(
            ypr_enu_to_sb_deg - step, rotmat_tb_to_sb)
        jacobian[:, axis] = \
            (ypr_tb_plus_deg - ypr_tb_minus_deg) / (2.0 * delta_deg)
    return jacobian


# Helper function to transform source pose to target pose.
+def transform_pose_to_target(source_pose_msg, + source_lever_arm_m, target_lever_arm_m, + ypr_tb_to_sb_deg, vec_tb_to_sb_m) -> dict: + have_target_rot = not np.allclose(ypr_tb_to_sb_deg, 0.0) + if have_target_rot: + rotmat_tb_to_sb = \ + euler_angles_to_dcm(ypr_tb_to_sb_deg[::-1], order='321', deg=True) + else: + rotmat_tb_to_sb = np.eye(3) + + # ---------- Position ---------- + source_position_ecef_m = np.array( + pm.geodetic2ecef(source_pose_msg.lla_deg[0], + source_pose_msg.lla_deg[1], + source_pose_msg.lla_deg[2])) + + rotmat_ecef_to_enu = get_enu_rotation_matrix( + source_pose_msg.lla_deg[0], source_pose_msg.lla_deg[1], deg=True) + rotmat_enu_to_ecef = rotmat_ecef_to_enu.T + + # We will still pass through NAN attitude, but for the sake of + # the position transformation we can treat it as no rotation. + ypr_enu_to_sb_deg = source_pose_msg.ypr_deg + if np.any(np.isnan(ypr_enu_to_sb_deg)): + rotmat_enu_to_sb = np.eye(3) + else: + rotmat_enu_to_sb = euler_angles_to_dcm( + ypr_enu_to_sb_deg[::-1], order='321', deg=True) + rotmat_sb_to_enu = rotmat_enu_to_sb.T + rotmat_sb_to_ecef = rotmat_enu_to_ecef @ rotmat_sb_to_enu + + vec_tb_to_sb_in_ecef_m = rotmat_sb_to_ecef @ vec_tb_to_sb_m + + # Apply lever arm correction to get target output position in + # ECEF. 
+ # target_position_ecef_m = + # source_position_ecef_m - (source lever arm in ECEF) + # + vec_tb_to_sb_in_ecef_m + (target lever arm in ECEF) + target_position_ecef_m = source_position_ecef_m \ + - rotmat_sb_to_ecef @ source_lever_arm_m + vec_tb_to_sb_in_ecef_m \ + + rotmat_sb_to_ecef @ rotmat_tb_to_sb @ target_lever_arm_m + + source_position_enu_cov_m2 = np.diag(source_pose_msg.position_std_enu_m ** 2) + target_position_ecef_cov_m2 = \ + rotmat_enu_to_ecef @ source_position_enu_cov_m2 @ rotmat_ecef_to_enu + target_position_ecef_std_m = np.sqrt(np.diag(target_position_ecef_cov_m2)) + + # ---------- Attitude ---------- + if have_target_rot and not np.any(np.isnan(ypr_enu_to_sb_deg)): + # YPR does not transform linearly, so we need to + # pre and post multiply by the Jacobian. + ypr_enu_to_tb_deg = calculate_ypr_enu_to_tb_deg( + ypr_enu_to_sb_deg, rotmat_tb_to_sb) + jacobian = calculate_jacobian_ypr_target_wrt_source( + ypr_enu_to_sb_deg, rotmat_tb_to_sb) + ypr_enu_to_sb_cov_deg2 = np.diag(source_pose_msg.ypr_std_deg ** 2) + ypr_enu_to_tb_cov_deg2 = jacobian @ ypr_enu_to_sb_cov_deg2 @ jacobian.T + ypr_enu_to_tb_std_deg = np.sqrt(np.diag(ypr_enu_to_tb_cov_deg2)) + else: + ypr_enu_to_tb_deg = ypr_enu_to_sb_deg.copy() + ypr_enu_to_tb_std_deg = source_pose_msg.ypr_std_deg.copy() + + # ---------- Velocity ---------- + # IMPORTANT!: Assume target is not moving relative to source, + # and that the source angular rate is zero, whether or not + # the source velocity is non-zero. We print a warning here + # just as a reminder. + + have_source_vel = not np.any(np.isnan(source_pose_msg.velocity_body_mps)) + if have_source_vel: + velocity_sb_mps = source_pose_msg.velocity_body_mps + # source_velocity_enu_mps = (source body velocity in ENU) + # + (source velocity relative to source body in ENU) + # Last term is assumed zero. 
+ source_velocity_enu_mps = rotmat_sb_to_enu @ velocity_sb_mps + + # target_velocity_enu_mps = source_velocity_enu_mps + # + (target velocity wrt source in ENU) + # Last term is assumed zero. + target_velocity_enu_mps = source_velocity_enu_mps + velocity_sb_cov_m2ps2 = np.diag(source_pose_msg.velocity_std_body_mps ** 2) + target_velocity_enu_cov_m2ps2 = \ + rotmat_sb_to_enu @ velocity_sb_cov_m2ps2 @ rotmat_enu_to_sb + target_velocity_enu_std_mps = \ + np.sqrt(np.diag(target_velocity_enu_cov_m2ps2)) + else: + target_velocity_enu_mps = np.full(3, np.nan) + target_velocity_enu_std_mps = np.full(3, np.nan) + + return { + "position_ecef_m": target_position_ecef_m, + "position_std_ecef_m": target_position_ecef_std_m, + "ypr_deg": ypr_enu_to_tb_deg, + "ypr_std_deg": ypr_enu_to_tb_std_deg, + "velocity_enu_mps": target_velocity_enu_mps, + "velocity_std_enu_mps": target_velocity_enu_std_mps, + } + + +# Helper function to convert target pose to ExternalPoseInput message. +def transformed_pose_to_external_pose_input(source_pose_msg, transformed_pose) \ + -> ExternalPoseInput: + ext_pose = ExternalPoseInput() + + # Timestamps. 
+ if source_pose_msg.gps_time: + ext_pose.details.measurement_time = source_pose_msg.gps_time + ext_pose.details.measurement_time_source = SystemTimeSource.GPS_TIME + else: + ext_pose.details.measurement_time = Timestamp() + ext_pose.details.measurement_time_source = SystemTimeSource.INVALID + + ext_pose.solution_type = source_pose_msg.solution_type + ext_pose.flags = ExternalPoseInput.FLAG_RESET_POSITION_DATA + + ext_pose.position_ecef_m = transformed_pose["position_ecef_m"].astype(np.float64) + ext_pose.position_std_ecef_m = transformed_pose["position_std_ecef_m"].astype(np.float32) + + ext_pose.ypr_deg = transformed_pose["ypr_deg"].astype(np.float32) + ext_pose.ypr_std_deg = transformed_pose["ypr_std_deg"].astype(np.float32) + + ext_pose.velocity_enu_mps = transformed_pose["velocity_enu_mps"].astype(np.float32) + ext_pose.velocity_std_enu_mps = transformed_pose["velocity_std_enu_mps"].astype(np.float32) + + return ext_pose + + +def main(): + parser = ArgumentParser(description="""Relay pose from a source device to +a target device via ExternalPoseInput. + +Connects to a source device, receives PoseMessage outputs, transforms the first +valid pose while accounting for lever arm differences and target mounting, and +sends an ExternalPoseInput message to the target device. + +Lever arms are specified in each body frame (+x forward, +y left, +z up) for +each source and target, and represent the position of each device's output point +(i.e., the point whose position appears in the PoseMessage or ExternalPoseInput) +relative to the body origin. + +Target rotation is specified as Yaw-Pitch-Roll (YPR, degrees), describing the +orientation of the target device's body frame relative to the source's body +frame. See @ref euler_angles_to_dcm for rotation convention details. + +Target translation is specified as XYZ (meters), describing the position of the +target device's body frame origin relative to the source's body frame origin. 
+ +Examples: + # Source at body origin, target at [1, 0, 0] (1m forward), no rotation + # or translation: + ./pose_relay.py tcp://192.168.1.100:30202 tcp://192.168.1.200:30201 \\ + --target-lever-arm 1 0 0 + + # Source output lever arm at [0.5, 0, 1.0], target at [1.0, -0.3, 0.8]: + ./pose_relay.py tcp://192.168.1.100:30202 tcp://192.168.1.200:30201 \\ + --source-lever-arm 0.5 0 1.0 --target-lever-arm 1.0 -0.3 0.8 \\ + + # Target is rotated 90 degrees in yaw relative to vehicle body, and + # translated 0.5m forward and 0.2m left relative to vehicle body: + ./pose_relay.py tcp://192.168.1.100:30202 tcp://192.168.1.200:30201 \\ + --target-rotation 90 0 0 --target-translation 0.5 0.2 0 +""") + + parser.add_argument( + 'source', type=str, + help="Source device transport (provides PoseMessage).\n" + TRANSPORT_HELP_STRING) + + parser.add_argument( + 'target', type=str, + help="Target device transport (receives ExternalPoseInput).\n" + TRANSPORT_HELP_STRING) + + parser.add_argument( + '--source-lever-arm', type=float, nargs=3, default=[0.0, 0.0, 0.0], + metavar=('X', 'Y', 'Z'), + help="""Output lever arm of the source device in the source vehicle's +body frame (meters). This is the position of the source device's reported output +point relative to the source vehicle body origin: [x_forward, y_left, z_up].""") + + parser.add_argument( + '--target-lever-arm', type=float, nargs=3, default=[0.0, 0.0, 0.0], + metavar=('X', 'Y', 'Z'), + help="""Output lever arm of the target device in the target vehicle's +body frame (meters). This is the position of the target device's desired output +point relative to the target vehicle body origin: [x_forward, y_left, z_up]. 
+ +This should match the output lever arm configured on the target device, so +that the ExternalPoseInput position corresponds to the point the target will +report in its own PoseMessage after initialization.""") + + parser.add_argument( + '--target-rotation', type=float, nargs=3, default=[0.0, 0.0, 0.0], + metavar=('YAW', 'PITCH', 'ROLL'), + help="""Orientation of the target device body frame relative to the +source's body frame, specified as YPR (degrees). This describes how the target +device is mounted on the source vehicle. E.g., if the target is rotated 90 +degrees in yaw relative to the source vehicle, specify: 90 0 0.""") + + parser.add_argument( + '--target-translation', type=float, nargs=3, default=[0.0, 0.0, 0.0], + metavar=('X', 'Y', 'Z'), + help="""Translation of the target device body frame relative to the +source's body frame, specified as XYZ (meters). This describes how the target +device is mounted on the source vehicle. E.g., if the target is translated 1 +meter forward relative to the source vehicle, specify: 1 0 0.""") + + parser.add_argument( + '-v', '--verbose', action='count', default=0, + help="Print verbose/trace debugging messages.") + + options = parser.parse_args() + + # Configure logging. + logging.basicConfig( + format='%(asctime)s - %(levelname)s - %(name)s:%(lineno)d - %(message)s', + stream=sys.stderr) + + if options.verbose == 0: + logger.setLevel(logging.INFO) + elif options.verbose == 1: + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.DEBUG) + logging.getLogger('point_one.fusion_engine.parsers').setLevel(logging.DEBUG) + + # Parse lever arms and transforms. + # B - source body frame, b - target body frame, t - target output, s - source output + source_lever_arm_m = np.array(options.source_lever_arm) + target_lever_arm_m = np.array(options.target_lever_arm) + + # Target rotation. + ypr_tb_to_sb_deg = np.array(options.target_rotation) + + # Target translation. 
+ vec_tb_to_sb_m = np.array(options.target_translation) + + logger.info(f"Source output lever arm: [{source_lever_arm_m[0]:.3f}, {source_lever_arm_m[1]:.3f}, {source_lever_arm_m[2]:.3f}] m") + logger.info(f"Target output lever arm: [{target_lever_arm_m[0]:.3f}, {target_lever_arm_m[1]:.3f}, {target_lever_arm_m[2]:.3f}] m") + logger.info(f"Target-to-source-body rotation: [{ypr_tb_to_sb_deg[0]:.1f}, {ypr_tb_to_sb_deg[1]:.1f}, {ypr_tb_to_sb_deg[2]:.1f}] deg") + logger.info(f"Target-to-source-body translation: [{vec_tb_to_sb_m[0]:.3f}, {vec_tb_to_sb_m[1]:.3f}, {vec_tb_to_sb_m[2]:.3f}] m") + + # Connect to source and target. + response_timeout_sec = 3.0 + try: + source_transport = create_transport(options.source, + timeout_sec=response_timeout_sec, + print_func=logger.info) + except Exception as e: + logger.error(f"Failed to connect to source: {e}") + sys.exit(1) + + try: + target_transport = create_transport(options.target, + timeout_sec=response_timeout_sec, + print_func=logger.info) + except Exception as e: + logger.error(f"Failed to connect to target: {e}") + sys.exit(1) + + logger.info("Connected. Waiting for pose messages from source...") + + # Main loop. + decoder = FusionEngineDecoder(return_bytes=False) + encoder = FusionEngineEncoder() + + start_time = datetime.now() + + poses_received = 0 + poses_forwarded = 0 + + try: + while True: + # Need to specify read size or read waits for end of file character. + # This returns immediately even if 0 bytes are available. 
+ received_data = recv_from_transport(source_transport, 64) + + if len(received_data) == 0: + time.sleep(0.1) + continue + + messages = decoder.on_data(received_data) + for header, message in messages: + if header.message_type != MessageType.POSE: + continue + + poses_received += 1 + source_pose_msg: PoseMessage = message + + logger.debug("Received pose message: \n" + + f"LLA Position: [{source_pose_msg.lla_deg[0]:.6f}, {source_pose_msg.lla_deg[1]:.6f}, {source_pose_msg.lla_deg[2]:.3f}] deg\n" + + f"Attitude: [{source_pose_msg.ypr_deg[0]:.1f}, {source_pose_msg.ypr_deg[1]:.1f}, {source_pose_msg.ypr_deg[2]:.1f}] deg\n" + + f"Body Velocity: [{source_pose_msg.velocity_body_mps[0]:.2f}, {source_pose_msg.velocity_body_mps[1]:.2f}, {source_pose_msg.velocity_body_mps[2]:.2f}] m/s\n" + + "Attempting to transform and forward...") + + if source_pose_msg.solution_type == SolutionType.Invalid: + logger.debug("Skipping invalid pose solution.") + continue + + if np.any(np.isnan(source_pose_msg.lla_deg)): + logger.debug("Skipping pose with NaN position.") + continue + + if np.linalg.norm(source_pose_msg.velocity_body_mps) > 0.1: + logger.warning( + "Source velocity is non-zero. We assume no relative motion between source and target. " + "If the target is moving relative to the source, relayed velocity will be incorrect." + ) + + transformed_pose = transform_pose_to_target( + source_pose_msg, source_lever_arm_m, target_lever_arm_m, + ypr_tb_to_sb_deg, vec_tb_to_sb_m) + + ext_pose = transformed_pose_to_external_pose_input( + source_pose_msg, transformed_pose) + + # Encode and send. 
+ encoded_data = encoder.encode_message(ext_pose) + logger.debug(bytes_to_hex(encoded_data, bytes_per_row=16, bytes_per_col=2)) + + target_transport.send(encoded_data) + + poses_forwarded += 1 + logger.info( + f"Forwarded pose: " + f"ECEF Position: [{transformed_pose['position_ecef_m'][0]:.2f}, {transformed_pose['position_ecef_m'][1]:.2f}, {transformed_pose['position_ecef_m'][2]:.2f}] m, " + f"Attitude: [{transformed_pose['ypr_deg'][0]:.1f}, {transformed_pose['ypr_deg'][1]:.1f}, {transformed_pose['ypr_deg'][2]:.1f}] deg, " + f"ENU Velocity: [{transformed_pose['velocity_enu_mps'][0]:.2f}, {transformed_pose['velocity_enu_mps'][1]:.2f}, {transformed_pose['velocity_enu_mps'][2]:.2f}] m/s") + break + + if poses_forwarded > 0: + break + + source_transport.close() + target_transport.close() + + except KeyboardInterrupt: + pass + finally: + elapsed = (datetime.now() - start_time).total_seconds() + logger.info(f"Done. Pose relayed in {elapsed:.1f} seconds.") + + +if __name__ == "__main__": + main() diff --git a/python/fusion_engine_client/analysis/attitude.py b/python/fusion_engine_client/analysis/attitude.py index 14a2d180..ffa89c04 100644 --- a/python/fusion_engine_client/analysis/attitude.py +++ b/python/fusion_engine_client/analysis/attitude.py @@ -49,3 +49,133 @@ def get_enu_rotation_matrix(latitude, longitude, deg=False): C_enu_e = C_ned_e[(1, 0, 2), :] C_enu_e[2, :] = -C_enu_e[2, :] return C_enu_e + + +def axis_rotation_dcm(angle, axis, deg=False): + """! + @brief Create a direction cosine matrix rotating a coordinate frame around the specified axis. + + The resulting DCM represents a passive transformation of a vector expressed in the original coordinate frame (@f$ + \alpha @f$) to one expressed in the rotated coordinate frame (@f$ \beta @f$). + + @param angle The desired rotation angle. + @param axis The axis to be rotated around (@c 'x', @c 'y', or @c 'z'). + @param deg If @c True, interpret the input angles as degrees. Otherwise, interpret as radians. 
+ + @return The 3x3 rotation matrix @f$ C_\alpha^\beta @f$. + """ + if deg: + angle = np.deg2rad(angle) + + cos = np.cos(angle) + sin = np.sin(angle) + + if axis == 'x': + return np.array(((1, 0, 0), + (0, cos, sin), + (0, -sin, cos))) + elif axis == 'y': + return np.array(((cos, 0, -sin), + ( 0, 1, 0), + (sin, 0, cos))) + elif axis == 'z': + return np.array((( cos, sin, 0), + (-sin, cos, 0), + ( 0, 0, 1))) + else: + raise RuntimeError('Invalid axis specification.') + + +def euler_angles_to_dcm(x, y=None, z=None, order='321', deg=False): + """! + @brief Convert an Euler (Tait-Bryan) angle attitude (in radians) into a direction cosine matrix. + + The input angles represent the rotation from coordinate frame @f$ \alpha @f$ to coordinate frame @f$ \beta @f$ (@f$ + C_\alpha^\beta @f$. This is frequently the rotation from the navigation frame (local level frame) to the body frame, + @f$ C_n^b @f$. + + @note + This DCM represents a passive transformation of a vector from one coordinate frame to another, as opposed to an + active rotation of a vector within a single coordinate frame. + + This function supports any intrinsic rotation order through the @c order argument. The most common convention is a + three-two-one rotation (@c '321'), in which the coordinate frame is first rotated around the Z-axis (i.e., the third + axis) by the angle @c z, then the second axis in the resulting intermediate coordinate frame (the Y'-axis), and + finally the third axis in the last intermediate coordinate frame (the X''-axis). 3-2-1 rotation angles are commonly + referred to as yaw, pitch, and roll for the Z, Y, and X rotations respectively. + + You may specify all three angles separately using the @c x, @c y, and @c z parameters, or provide a Numpy array + containing all three angles. + + @param x The angle to rotate around the X-axis (i.e., axis 1). May be a Numpy array containing the rotation angles + for all three axes in the order X, Y, Z. 
+ @param y The angle to rotate around the Y-axis (i.e., axis 2). + @param z The angle to rotate around the Z-axis (i.e., axis 3). + @param order A string specifying the rotation order to be applied. + @param deg If @c True, interpret the input angles as degrees. Otherwise, interpret as radians. + + @return The 3x3 rotation matrix rotating from the @f$ \alpha @f$ frame to the @f$ \beta @f$ frame (@f$ + C_\alpha^\beta @f$). + """ + if isinstance(x, np.ndarray): + if y is not None or z is not None: + raise RuntimeError('Cannot specify Y/Z angles when using a Numpy array.') + else: + angles = x + else: + if y is None or z is None: + raise RuntimeError('You must specify Y and Z angles.') + else: + angles = np.array((x, y, z)) + + if deg: + angles = np.deg2rad(angles) + + # 3-2-1 order written explicitly for efficiency since it's the most common order. Taken from Groves section 2.2.2. + if order == '321': + sin_phi, sin_theta, sin_psi = np.sin(angles) + cos_phi, cos_theta, cos_psi = np.cos(angles) + + return np.array(( + (cos_theta * cos_psi, cos_theta * sin_psi, -sin_theta), + (-cos_phi * sin_psi + sin_phi * sin_theta * cos_psi, + cos_phi * cos_psi + sin_phi * sin_theta * sin_psi, + sin_phi * cos_theta), + (sin_phi * sin_psi + cos_phi * sin_theta * cos_psi, + -sin_phi * cos_psi + cos_phi * sin_theta * sin_psi, + cos_phi * cos_theta) + )) + else: + dcm = {} + dcm['1'] = axis_rotation_dcm(angles[0], axis='x') + dcm['2'] = axis_rotation_dcm(angles[1], axis='y') + dcm['3'] = axis_rotation_dcm(angles[2], axis='z') + + return dcm[order[2]].dot(dcm[order[1]]).dot(dcm[order[0]]) + + +def dcm_to_euler_angles(dcm, order='321', deg=False): + """! + @brief Convert a direction cosine matrix into an Euler (Tait-Bryan) angle attitude representation. + + Convert a DCM representing the rotation from coordinate frame @f$ \alpha @f$ to coordinate frame @f$ \beta @f$ into + corresponding Euler angles with the specified rotation order. + + @param dcm The 3x3 rotation matrix. 
+ @param order The desired Euler angle order. + @param deg If @c True, return angles in degrees. Otherwise, return radians. + + @return A Numpy array containing the X, Y, and Z rotation angles. + """ + if order == '321': + x = np.arctan2(dcm[1, 2], dcm[2, 2]) + y = -np.arcsin(dcm[0, 2]) + z = np.arctan2(dcm[0, 1], dcm[0, 0]) + else: + raise RuntimeError('Unsupported rotation order.') + + result = np.array((x, y, z)) + if deg: + result = np.rad2deg(result) + + return result diff --git a/python/fusion_engine_client/messages/defs.py b/python/fusion_engine_client/messages/defs.py index 7897aca8..52c72da2 100644 --- a/python/fusion_engine_client/messages/defs.py +++ b/python/fusion_engine_client/messages/defs.py @@ -106,6 +106,7 @@ class MessageType(IntEnum): IMU_INPUT = 11004 GNSS_ATTITUDE_OUTPUT = 11005 RAW_GNSS_ATTITUDE_OUTPUT = 11006 + EXTERNAL_POSE_INPUT = 11007 # Vehicle measurement messages. DEPRECATED_WHEEL_SPEED_MEASUREMENT = 11101 diff --git a/python/fusion_engine_client/messages/measurements.py b/python/fusion_engine_client/messages/measurements.py index 1c33f5e4..bef22c23 100644 --- a/python/fusion_engine_client/messages/measurements.py +++ b/python/fusion_engine_client/messages/measurements.py @@ -1417,6 +1417,130 @@ def to_numpy(cls, messages: Sequence['RawGNSSAttitudeOutput']): result.update(MeasurementDetails.to_numpy([m.details for m in messages])) return result +################################################################################ +# External Pose Measurements +################################################################################ + + +class ExternalPoseInput(MessagePayload): + """! @brief External pose measurement input. + + Position is expressed in the ECEF frame. Orientation is the yaw, pitch, roll + (YPR) angles in the local ENU frame. Velocity is expressed in the local ENU + frame. Any elements that are not available should be set to `nan`. Standard + deviation fields are specified in the same units. 
+ """ + MESSAGE_TYPE = MessageType.EXTERNAL_POSE_INPUT + MESSAGE_VERSION = 0 + + FLAG_RESET_POSITION_DATA = 0x1 + + _STRUCT = struct.Struct(' (bytes, int): + if buffer is None: + buffer = bytearray(self.calcsize()) + offset = 0 + + initial_offset = offset + + offset += self.details.pack(buffer, offset, return_buffer=False) + + offset += self.pack_values( + self._STRUCT, buffer, offset, + int(self.solution_type), + self.flags, + self.position_ecef_m[0], self.position_ecef_m[1], self.position_ecef_m[2], + self.position_std_ecef_m[0], self.position_std_ecef_m[1], self.position_std_ecef_m[2], + self.ypr_deg[0], self.ypr_deg[1], self.ypr_deg[2], + self.ypr_std_deg[0], self.ypr_std_deg[1], self.ypr_std_deg[2], + self.velocity_enu_mps[0], self.velocity_enu_mps[1], self.velocity_enu_mps[2], + self.velocity_std_enu_mps[0], self.velocity_std_enu_mps[1], self.velocity_std_enu_mps[2]) + if return_buffer: + return buffer + else: + return offset - initial_offset + + def unpack(self, buffer: bytes, offset: int = 0, + message_version: int = MessagePayload._UNSPECIFIED_VERSION) -> int: + initial_offset = offset + + offset += self.details.unpack(buffer, offset) + + (solution_type_int, + self.flags, + self.position_ecef_m[0], self.position_ecef_m[1], self.position_ecef_m[2], + self.position_std_ecef_m[0], self.position_std_ecef_m[1], self.position_std_ecef_m[2], + self.ypr_deg[0], self.ypr_deg[1], self.ypr_deg[2], + self.ypr_std_deg[0], self.ypr_std_deg[1], self.ypr_std_deg[2], + self.velocity_enu_mps[0], self.velocity_enu_mps[1], self.velocity_enu_mps[2], + self.velocity_std_enu_mps[0], self.velocity_std_enu_mps[1], self.velocity_std_enu_mps[2]) = \ + self._STRUCT.unpack_from(buffer, offset) + offset += self._STRUCT.size + + self.solution_type = SolutionType(solution_type_int) + + return offset - initial_offset + + @classmethod + def calcsize(cls) -> int: + return MeasurementDetails.calcsize() + cls._STRUCT.size + + @classmethod + def to_numpy(cls, messages: 
Sequence['ExternalPoseInput']): + result = { + 'solution_type': np.array([int(m.solution_type) for m in messages], dtype=int), + 'flags': np.array([m.flags for m in messages], dtype=np.uint32), + 'position_ecef_m': np.array([m.position_ecef_m for m in messages]).T, + 'position_std_ecef_m': np.array([m.position_std_ecef_m for m in messages]).T, + 'ypr_deg': np.array([m.ypr_deg for m in messages]).T, + 'ypr_std_deg': np.array([m.ypr_std_deg for m in messages]).T, + 'velocity_enu_mps': np.array([m.velocity_enu_mps for m in messages]).T, + 'velocity_std_enu_mps': np.array([m.velocity_std_enu_mps for m in messages]).T, + } + result.update(MeasurementDetails.to_numpy([m.details for m in messages])) + return result + + def __getattr__(self, item): + if item == 'p1_time': + return self.details.p1_time + else: + return super().__getattr__(item) + + def __repr__(self): + result = super().__repr__()[:-1] + result += f', solution_type={self.solution_type}, position_ecef={self.position_ecef_m}, ' \ + f'ypr_deg={self.ypr_deg}, velocity_enu_mps={self.velocity_enu_mps}]' + return result + + def __str__(self): + string = 'External Pose Measurement @ %s\n' % str(self.details.p1_time) + string += ' Solution type: %s\n' % self.solution_type.name + string += ' Position (ECEF): %.2f, %.2f, %.2f (m, m, m)\n' % tuple(self.position_ecef_m) + string += ' Position std (ECEF): %.2f, %.2f, %.2f (m, m, m)\n' % tuple(self.position_std_ecef_m) + string += ' YPR: %.2f, %.2f, %.2f (deg, deg, deg)\n' % tuple(self.ypr_deg) + string += ' YPR std: %.2f, %.2f, %.2f (deg, deg, deg)\n' % tuple(self.ypr_std_deg) + string += ' Velocity (ENU): %.2f, %.2f, %.2f (m/s, m/s, m/s)\n' % tuple(self.velocity_enu_mps) + string += ' Velocity std (ENU): %.2f, %.2f, %.2f (m/s, m/s, m/s)' % tuple(self.velocity_std_enu_mps) + return string + ################################################################################ # Binary Sensor Data Definitions 
################################################################################ diff --git a/python/tests/test_message_defs.py b/python/tests/test_message_defs.py index 1fd66802..a6b09fc7 100644 --- a/python/tests/test_message_defs.py +++ b/python/tests/test_message_defs.py @@ -98,7 +98,7 @@ def test_find_message_types(): # Use wildcards to match multiple types. assert MessagePayload.find_matching_message_types(['pose*']) == {MessageType.POSE, MessageType.POSE_AUX} assert MessagePayload.find_matching_message_types(['*pose*']) == {MessageType.POSE, MessageType.POSE_AUX, - MessageType.ROS_POSE} + MessageType.EXTERNAL_POSE_INPUT, MessageType.ROS_POSE} assert MessagePayload.find_matching_message_types(['pose*', 'gnssi*']) == {MessageType.POSE, MessageType.POSE_AUX, MessageType.GNSS_INFO} assert MessagePayload.find_matching_message_types(['pose*,gnssi*']) == {MessageType.POSE, MessageType.POSE_AUX, diff --git a/src/point_one/fusion_engine/messages/defs.h b/src/point_one/fusion_engine/messages/defs.h index caecde81..58de69e9 100644 --- a/src/point_one/fusion_engine/messages/defs.h +++ b/src/point_one/fusion_engine/messages/defs.h @@ -54,6 +54,7 @@ enum class MessageType : uint16_t { IMU_INPUT = 11004, ///< @ref IMUInput GNSS_ATTITUDE_OUTPUT = 11005, ///< @ref GNSSAttitudeOutput RAW_GNSS_ATTITUDE_OUTPUT = 11006, ///< @ref RawGNSSAttitudeOutput + EXTERNAL_POSE_INPUT = 11007, ///< @ref ExternalPoseInput // Vehicle measurement messages. 
DEPRECATED_WHEEL_SPEED_MEASUREMENT = @@ -176,6 +177,9 @@ P1_CONSTEXPR_FUNC const char* to_string(MessageType type) { case MessageType::RAW_GNSS_ATTITUDE_OUTPUT: return "Raw GNSS Attitude Output"; + case MessageType::EXTERNAL_POSE_INPUT: + return "External Pose Input"; + case MessageType::DEPRECATED_WHEEL_SPEED_MEASUREMENT: return "Wheel Speed Measurement"; @@ -353,6 +357,7 @@ P1_CONSTEXPR_FUNC bool IsCommand(MessageType message_type) { case MessageType::IMU_INPUT: case MessageType::GNSS_ATTITUDE_OUTPUT: case MessageType::RAW_GNSS_ATTITUDE_OUTPUT: + case MessageType::EXTERNAL_POSE_INPUT: case MessageType::DEPRECATED_WHEEL_SPEED_MEASUREMENT: case MessageType::DEPRECATED_VEHICLE_SPEED_MEASUREMENT: case MessageType::WHEEL_TICK_INPUT: diff --git a/src/point_one/fusion_engine/messages/measurements.h b/src/point_one/fusion_engine/messages/measurements.h index e5da0463..bb00e11b 100644 --- a/src/point_one/fusion_engine/messages/measurements.h +++ b/src/point_one/fusion_engine/messages/measurements.h @@ -1264,6 +1264,87 @@ struct P1_ALIGNAS(4) RawGNSSAttitudeOutput : public MessagePayload { float position_std_enu_m[3] = {NAN, NAN, NAN}; }; +//////////////////////////////////////////////////////////////////////////////// +// External Pose Measurements +//////////////////////////////////////////////////////////////////////////////// + +/** + * @brief External pose measurement input (@ref + * MessageType::EXTERNAL_POSE_INPUT, version 1.0). + * @ingroup measurement_messages + * + * This message is an input to the device containing pose and velocity + * measurements generated by an external source (for example, another Point One + * device or a vision system). + * + * Position is expressed in the ECEF frame, and velocity is expressed in the + * local ENU frame. 
@ref position_ecef_m should correspond to the output lever + * arm point configured on the receiving device (see @ref + * ConfigType::OUTPUT_LEVER_ARM), so that the position matches the point the + * device will report in its own @ref PoseMessage after initialization. + * + * Orientation is specified as yaw, pitch, roll (YPR) angles in the local ENU + * frame, following the same intrinsic Euler-321 convention as @ref + * PoseMessage::ypr_deg. + * + * Any elements that are not available should be set to `NAN`. Standard + * deviation fields are specified in the same units as the corresponding + * measurement. + */ +struct P1_ALIGNAS(4) ExternalPoseInput : public MessagePayload { + static constexpr MessageType MESSAGE_TYPE = MessageType::EXTERNAL_POSE_INPUT; + static constexpr uint8_t MESSAGE_VERSION = 0; + + /** Equivalent to sending a reset message. See @ref ResetRequest. */ + static constexpr uint32_t FLAG_RESET_POSITION_DATA = 0x01; + + /** Measurement timestamp and additional information. */ + MeasurementDetails details; + + /** The pose solution type provided by the external source. */ + SolutionType solution_type = SolutionType::Invalid; + + uint8_t reserved[3] = {0}; + + /** A bitmask of flags associated with the measurement. */ + uint32_t flags = 0; + + /** + * An estimate of the device's output position (in meters), resolved in the + * ECEF frame. + */ + double position_ecef_m[3] = {NAN, NAN, NAN}; + + /** + * An estimate of the device's output position standard deviation (in meters), + * resolved in the ECEF frame. + */ + float position_std_ecef_m[3] = {NAN, NAN, NAN}; + + /** An estimate of the device's output orientation (in degrees), resolved in + * the local ENU tangent plane. See @ref PoseMessage::ypr_deg for a complete + * rotation definition. + */ + float ypr_deg[3] = {NAN, NAN, NAN}; + + /** An estimate of the device's output orientation standard deviation (in + * degrees). 
+ */ + float ypr_std_deg[3] = {NAN, NAN, NAN}; + + /** + * An estimate of the device's output velocity (in m/s), resolved in the local + * ENU tangent plane. + */ + float velocity_enu_mps[3] = {NAN, NAN, NAN}; + + /** + * An estimate of the device's output velocity standard deviation (in m/s), + * resolved in the local ENU tangent plane. + */ + float velocity_std_enu_mps[3] = {NAN, NAN, NAN}; +}; + //////////////////////////////////////////////////////////////////////////////// // Binary Sensor Data Definitions ////////////////////////////////////////////////////////////////////////////////