Skip to content

Modules

Top-level package for Bonfo configuration management.

cli

Console script for bonfo.

BonfoContext dataclass

BonfoContext(port: Optional[serial.tools.list_ports_common.ListPortInfo] = None)

Source code in bonfo/cli.py
@dataclass
class BonfoContext:
    # TODO: hook/bring in state_store as a storage backend, possible change to data class and use the data class wizard?
    port: Optional[ListPortInfo] = None
    # Go this direction of only working on one rate/pid profile?
    # Profile to switch to
    # profile: = None
    # Rate profile to act on
    # rate_profile: = None

    _board = None

    @property
    def board(self) -> Optional[Board]:
        if self._board is None and self.port is not None:
            # TODO: instantiate board with stored values to do diffing?
            # Possibly on board connect, it grabs the uid of the board and hydrates from
            # last state that way?
            self._board = Board(self.port.device)
        return self._board

msp special

board

Board package for Bonfo configuration management.

Board

Board is an interface for the serial connection, configuration retrieval and saving of data.

Source code in bonfo/msp/board.py
class Board:
    """Board is an interface for the serial connection, configuration retrieval and saving of data."""

    def __init__(self, port: str, baudrate: int = 115200, trials=100) -> None:
        self.conf = Config()

        # TODO: register configs for saving/applying?
        self.rx_conf = RxConfig()
        self.rc_tuning = RcTuning()

        self.serial_trials = trials
        self.serial_write_lock = BoundedSemaphore(value=1)
        self.serial_read_lock = BoundedSemaphore(value=1)

        self.init_serial(port, baudrate=baudrate)

    def init_serial(self, port: str, baudrate: int = 115200):
        self.serial = serial.Serial(
            port,
            baudrate=baudrate,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=0.1,
            xonxoff=False,
            rtscts=False,
            dsrdtr=False,
            writeTimeout=0,
        )
        logger.debug(self.serial)

    def __enter__(self):
        self.is_serial_open = self.connect(trials=self.serial_trials)

        if self.is_serial_open:
            return self
        else:
            logger.warning("Serial port ({}) not ready/available".format(self.serial.port))
            return False

    def __exit__(self, exc_type, exc_value, traceback):
        if self.serial.is_open:
            self.serial.readlines
            self.serial.close()

        """

        Returns:
        """

    def connect(self, trials=100, delay=0.5):
        """Opens the serial connection with the board.

        Args:
            trials (int, optional): How many times to try and connect. Defaults to 100.
            delay (float, optional): Wait between connection trials. Defaults to 0.5.

        Returns:
            bool: True on a successful connection
        """
        for _ in range(trials):
            try:
                if self.serial.is_open:
                    self.serial.close()
                self.serial.open()
                # self.basic_info()
                return True

            except serial.SerialException as err:
                logger.warning("Error opening the serial port ({0}): {1}".format(self.serial.port, err), exc_info=err)

            except FileNotFoundError as err:
                logger.warning("Port ({0}) not found: {1}".format(self.serial.port, err), exc_info=err)
            time.sleep(delay)

        return False

    def send_msg(self, code: MSP, data=None, blocking=True, timeout=-1):
        """Generates and sends a message with the passed code and data.

        Args:
            code (MSP): MSP code enum or code integer
            data (Any supported message struct, optional): structured data to send,
                converted to binary by struct. Defaults to None.
            blocking (bool, optional): Send message with blocking of other messages. Defaults to True.
            timeout (int, optional): Blocking timeout. Defaults to -1.

        Returns:
            int: Total bytes sent
        """
        buff = out_message_builder(code, fields=data)

        if self.serial_write_lock.acquire(blocking, timeout):
            try:
                sent_bytes = self.serial.write(buff)
            except Exception as e:
                logger.exception("Error writing to serial port", exc_info=e)
            finally:
                self.serial_write_lock.release()
                if sent_bytes > 0:
                    logger.debug("RAW message sent: {}".format(buff))

                return sent_bytes

    def receive_msg(self):

        with self.serial_read_lock:
            # TODO: do this mo-better
            buff = self.serial.readline()
            logger.debug("RAW message received: {}".format(buff))
            if len(buff) > 0:
                return Message.parse(buff)
            return None
connect(self, trials=100, delay=0.5)

Opens the serial connection with the board.

Parameters:

Name Type Description Default
trials int

How many times to try and connect. Defaults to 100.

100
delay float

Wait between connection trials. Defaults to 0.5.

0.5

Returns:

Type Description
bool

True on a successful connection

Source code in bonfo/msp/board.py
def connect(self, trials=100, delay=0.5):
    """Opens the serial connection with the board.

    Args:
        trials (int, optional): How many times to try and connect. Defaults to 100.
        delay (float, optional): Wait between connection trials. Defaults to 0.5.

    Returns:
        bool: True on a successful connection
    """
    for _ in range(trials):
        try:
            if self.serial.is_open:
                self.serial.close()
            self.serial.open()
            # self.basic_info()
            return True

        except serial.SerialException as err:
            logger.warning("Error opening the serial port ({0}): {1}".format(self.serial.port, err), exc_info=err)

        except FileNotFoundError as err:
            logger.warning("Port ({0}) not found: {1}".format(self.serial.port, err), exc_info=err)
        time.sleep(delay)

    return False
send_msg(self, code, data=None, blocking=True, timeout=-1)

Generates and sends a message with the passed code and data.

Parameters:

Name Type Description Default
code MSP

MSP code enum or code integer

required
data Any supported message struct

structured data to send, converted to binary by struct. Defaults to None.

None
blocking bool

Send message with blocking of other messages. Defaults to True.

True
timeout int

Blocking timeout. Defaults to -1.

-1

Returns:

Type Description
int

Total bytes sent

Source code in bonfo/msp/board.py
def send_msg(self, code: MSP, data=None, blocking=True, timeout=-1):
    """Generates and sends a message with the passed code and data.

    Args:
        code (MSP): MSP code enum or code integer
        data (Any supported message struct, optional): structured data to send,
            converted to binary by struct. Defaults to None.
        blocking (bool, optional): Send message with blocking of other messages. Defaults to True.
        timeout (int, optional): Blocking timeout. Defaults to -1.

    Returns:
        int: Total bytes sent
    """
    buff = out_message_builder(code, fields=data)

    if self.serial_write_lock.acquire(blocking, timeout):
        try:
            sent_bytes = self.serial.write(buff)
        except Exception as e:
            logger.exception("Error writing to serial port", exc_info=e)
        finally:
            self.serial_write_lock.release()
            if sent_bytes > 0:
                logger.debug("RAW message sent: {}".format(buff))

            return sent_bytes

codes

MSP (IntEnum)

An enumeration.

Source code in bonfo/msp/codes.py
class MSP(IntEnum):
    API_VERSION = 1
    FC_VARIANT = 2
    FC_VERSION = 3
    BOARD_INFO = 4
    BUILD_INFO = 5

    NAME = 10
    SET_NAME = 11

    BATTERY_CONFIG = 32
    SET_BATTERY_CONFIG = 33
    MODE_RANGES = 34
    SET_MODE_RANGE = 35
    FEATURE_CONFIG = 36
    SET_FEATURE_CONFIG = 37
    BOARD_ALIGNMENT_CONFIG = 38
    SET_BOARD_ALIGNMENT_CONFIG = 39
    CURRENT_METER_CONFIG = 40
    SET_CURRENT_METER_CONFIG = 41
    MIXER_CONFIG = 42
    SET_MIXER_CONFIG = 43
    RX_CONFIG = 44
    SET_RX_CONFIG = 45
    LED_COLORS = 46
    SET_LED_COLORS = 47
    LED_STRIP_CONFIG = 48
    SET_LED_STRIP_CONFIG = 49
    RSSI_CONFIG = 50
    SET_RSSI_CONFIG = 51
    ADJUSTMENT_RANGES = 52
    SET_ADJUSTMENT_RANGE = 53
    CF_SERIAL_CONFIG = 54
    SET_CF_SERIAL_CONFIG = 55
    VOLTAGE_METER_CONFIG = 56
    SET_VOLTAGE_METER_CONFIG = 57
    SONAR = 58
    PID_CONTROLLER = 59
    SET_PID_CONTROLLER = 60
    ARMING_CONFIG = 61
    SET_ARMING_CONFIG = 62
    RX_MAP = 64
    SET_RX_MAP = 65
    BF_CONFIG = 66  # deprecated
    SET_BF_CONFIG = 67  # deprecated
    SET_REBOOT = 68
    BF_BUILD_INFO = 69  # deprecated
    DATAFLASH_SUMMARY = 70
    DATAFLASH_READ = 71
    DATAFLASH_ERASE = 72
    LOOP_TIME = 73
    SET_LOOP_TIME = 74
    FAILSAFE_CONFIG = 75
    SET_FAILSAFE_CONFIG = 76
    RXFAIL_CONFIG = 77
    SET_RXFAIL_CONFIG = 78
    SDCARD_SUMMARY = 79
    BLACKBOX_CONFIG = 80
    SET_BLACKBOX_CONFIG = 81
    TRANSPONDER_CONFIG = 82
    SET_TRANSPONDER_CONFIG = 83
    OSD_CONFIG = 84
    SET_OSD_CONFIG = 85
    OSD_CHAR_READ = 86
    OSD_CHAR_WRITE = 87
    VTX_CONFIG = 88
    SET_VTX_CONFIG = 89
    ADVANCED_CONFIG = 90
    SET_ADVANCED_CONFIG = 91
    FILTER_CONFIG = 92
    SET_FILTER_CONFIG = 93
    PID_ADVANCED = 94
    SET_PID_ADVANCED = 95
    SENSOR_CONFIG = 96
    SET_SENSOR_CONFIG = 97
    SPECIAL_PARAMETERS = 98  # deprecated
    ARMING_DISABLE = 99
    SET_SPECIAL_PARAMETERS = 99  # deprecated
    IDENT = 100  # deprecated
    STATUS = 101
    RAW_IMU = 102
    SERVO = 103
    MOTOR = 104
    RC = 105
    RAW_GPS = 106
    COMP_GPS = 107
    ATTITUDE = 108
    ALTITUDE = 109
    ANALOG = 110
    RC_TUNING = 111
    PID = 112
    BOX = 113  # deprecatedD
    MISC = 114  # deprecated
    BOXNAMES = 116
    PIDNAMES = 117
    WP = 118  # deprecated
    BOXIDS = 119
    SERVO_CONFIGURATIONS = 120
    MOTOR_3D_CONFIG = 124
    RC_DEADBAND = 125
    SENSOR_ALIGNMENT = 126
    LED_STRIP_MODECOLOR = 127

    VOLTAGE_METERS = 128
    CURRENT_METERS = 129
    BATTERY_STATE = 130
    MOTOR_CONFIG = 131
    GPS_CONFIG = 132
    COMPASS_CONFIG = 133
    GPS_RESCUE = 135

    STATUS_EX = 150

    UID = 160
    GPS_SV_INFO = 164

    GPSSTATISTICS = 166

    DISPLAYPORT = 182

    COPY_PROFILE = 183

    BEEPER_CONFIG = 184
    SET_BEEPER_CONFIG = 185

    SET_RAW_RC = 200
    SET_RAW_GPS = 201  # deprecated
    SET_PID = 202
    SET_BOX = 203  # deprecated
    SET_RC_TUNING = 204
    ACC_CALIBRATION = 205
    MAG_CALIBRATION = 206
    SET_MISC = 207  # deprecated
    RESET_CONF = 208
    SET_WP = 209  # deprecated
    SELECT_SETTING = 210
    SET_HEADING = 211  # deprecated
    SET_SERVO_CONFIGURATION = 212
    SET_MOTOR = 214
    SET_MOTOR_3D_CONFIG = 217
    SET_RC_DEADBAND = 218
    SET_RESET_CURR_PID = 219
    SET_SENSOR_ALIGNMENT = 220
    SET_LED_STRIP_MODECOLOR = 221
    SET_MOTOR_CONFIG = 222
    SET_GPS_CONFIG = 223
    SET_COMPASS_CONFIG = 224
    SET_GPS_RESCUE = 225
    # SET_CHANNEL_FORWARDING =  # TODO: find

    MODE_RANGES_EXTRA = 238
    SET_ACC_TRIM = 239
    ACC_TRIM = 240
    SERVO_MIX_RULES = 241
    SET_SERVO_MIX_RULE = 242  # deprecated
    SET_4WAY_IF = 245  # deprecated
    SET_RTC = 246
    RTC = 247  # deprecated
    SET_BOARD_INFO = 248  # deprecated
    SET_SIGNATURE = 249  # deprecated

    EEPROM_WRITE = 250
    DEBUGMSG = 253  # deprecated
    DEBUG = 254

    # INAV/v2 specific codes
    SETTING = 0x1003
    SET_SETTING = 0x1004

    COMMON_MOTOR_MIXER = 0x1005
    COMMON_SET_MOTOR_MIXER = 0x1006

    COMMON_SETTING_INFO = 0x1007
    COMMON_PG_LIST = 0x1008

    INAV_CF_SERIAL_CONFIG = 0x1009
    INAV_SET_CF_SERIAL_CONFIG = 0x100A

    INAV_STATUS = 0x2000
    INAV_OPTICAL_FLOW = 0x2001
    INAV_ANALOG = 0x2002
    INAV_MISC = 0x2003
    INAV_SET_MISC = 0x2004
    INAV_BATTERY_CONFIG = 0x2005
    INAV_SET_BATTERY_CONFIG = 0x2006
    INAV_RATE_PROFILE = 0x2007
    INAV_SET_RATE_PROFILE = 0x2008
    INAV_AIR_SPEED = 0x2009
    INAV_OUTPUT_MAPPING = 0x200A

    INAV_MIXER = 0x2010
    INAV_SET_MIXER = 0x2011

    INAV_OSD_LAYOUTS = 0x2012
    INAV_OSD_SET_LAYOUT_ITEM = 0x2013
    INAV_OSD_ALARMS = 0x2014
    INAV_OSD_SET_ALARMS = 0x2015
    INAV_OSD_PREFERENCES = 0x2016
    INAV_OSD_SET_PREFERENCES = 0x2017

    INAV_MC_BRAKING = 0x200B
    INAV_SET_MC_BRAKING = 0x200C

    INAV_SELECT_BATTERY_PROFILE = 0x2018

    INAV_DEBUG = 0x2019

    INAV_BLACKBOX_CONFIG = 0x201A
    INAV_SET_BLACKBOX_CONFIG = 0x201B

    INAV_SENSOR_CONFIG = 0x201C
    INAV_SET_SENSOR_CONFIG = 0x201D
    INAV_TEMPERATURES = 0x201E

    INAV_SERVO_MIXER = 0x2020
    INAV_SET_SERVO_MIXER = 0x2021
    INAV_LOGIC_CONDITIONS = 0x2022
    INAV_SET_LOGIC_CONDITIONS = 0x2023
    INAV_LOGIC_CONDITIONS_STATUS = 0x2026

    INAV_PID = 0x2030
    INAV_SET_PID = 0x2031

    INAV_OPFLOW_CALIBRATION = 0x203

state

Data classes that store parsed information from a MSP message.

Config dataclass

Config(api_version: str = '0.0.0', flight_controller_identifier: str = '', flight_controller_version: str = '', version: int = 0, build_info: str = '', multi_type: int = 0, msp_version: int = 0, capability: int = 0, cycle_time: int = 0, i2c_error: int = 0, active_sensors: int = 0, mode: int = 0, profile: int = 0, uid: list = , accelerometer_trims: list = , name: str = '', display_name: str = 'pilot', num_profiles: int = 3, rate_profile: int = 0, board_type: int = 0, arming_disable_count: int = 0, arming_disable_flags: int = 0, arming_disabled: bool = False, runaway_takeoff_prevention_disabled: bool = False, board_identifier: str = '', board_version: int = 0, comm_capabilities: int = 0, target_name: str = '', board_name: str = '', manufacturer_id: str = '', signature: list = , mcu_type_id: int = 255)

Source code in bonfo/msp/state.py
@dataclass
class Config:
    api_version: str = "0.0.0"
    flight_controller_identifier: str = ''
    flight_controller_version: str = ''
    version: int = 0
    build_info: str = ''
    multi_type: int = 0
    msp_version: int = 0
    capability: int = 0
    cycle_time: int = 0
    i2c_error: int = 0
    active_sensors: int = 0
    mode: int = 0
    profile: int = 0
    uid: list = field(default_factory=lambda: [0, 0, 0])
    accelerometer_trims: list = field(default_factory=lambda: [0, 0])
    name: str = ''
    display_name: str = 'pilot'
    num_profiles: int = 3
    rate_profile: int = 0
    board_type: int = 0
    arming_disable_count: int = 0
    arming_disable_flags: int = 0
    arming_disabled: bool = False
    runaway_takeoff_prevention_disabled: bool = False
    board_identifier: str = ""
    board_version: int = 0
    comm_capabilities: int = 0
    target_name: str = ""
    board_name: str = ""
    manufacturer_id: str = ""
    signature: list = field(default_factory=lambda: [])
    mcu_type_id: int = 255

    @property
    def is_inav(self):
        return "INAV" in self.flight_controller_identifier

DataHandler dataclass

DataHandler(msp_version: int = 1, state: float = 0, flags: int = 0, message_direction: int = -1, code: int = 0, data_view: int = 0, message_length_expected: int = 0, message_length_received: int = 0, message_buffer: list = , message_buffer_uint8_view: list = , message_checksum: int = 0, messageIsJumboFrame: bool = False, crcError: bool = False, callbacks: list = , packet_error: int = 0, unsupported: int = 0, last_received_timestamp: Optional[float] = None, listeners: list = )

Source code in bonfo/msp/state.py
@dataclass
class DataHandler:
    msp_version: int = 1
    state: float = 0
    flags: int = 0
    message_direction: int = -1
    code: int = 0
    data_view: int = 0
    message_length_expected: int = 0
    message_length_received: int = 0
    message_buffer: list = field(default_factory=lambda: [])
    message_buffer_uint8_view: list = field(default_factory=lambda: [])
    message_checksum: int = 0
    messageIsJumboFrame: bool = False
    crcError: bool = False
    callbacks: list = field(default_factory=lambda: [])
    packet_error: int = 0
    unsupported: int = 0
    last_received_timestamp: Optional[float] = None
    listeners: list = field(default_factory=lambda: [])
    # TODO: state property with data class wizard that tracks field changes and logs them?

RcTuning (YAMLWizard) dataclass

RcTuning(rc_rate: float = 0, rc_expo: float = 0, roll_pitch_rate: float = 0, roll_rate: float = 0, pitch_rate: float = 0, yaw_rate: float = 0, dynamic_thr_pid: float = 0, throttle_mid: float = 0, throttle_expo: float = 0, dynamic_thr_breakpoint: float = 0, rc_yaw_expo: float = 0, rcyawrate: float = 0, rcpitchrate: float = 0, rc_pitch_expo: float = 0, roll_rate_limit: int = 1998, pitch_rate_limit: int = 1998, yaw_rate_limit: int = 1998)

Source code in bonfo/msp/state.py
@dataclass
class RcTuning(YAMLWizard):
    # May allow for saving the raw byte value to get more percission than the float value conversion?
    # TODO: decimals in the future? store actual byte values and convert as needed?
    rc_rate: float = 0
    rc_expo: float = 0
    roll_pitch_rate: float = 0  # pre 1.7 api only
    roll_rate: float = 0
    pitch_rate: float = 0
    yaw_rate: float = 0
    dynamic_thr_pid: float = 0
    throttle_mid: float = 0
    throttle_expo: float = 0
    dynamic_thr_breakpoint: float = 0
    rc_yaw_expo: float = 0
    rcyawrate: float = 0
    rcpitchrate: float = 0
    rc_pitch_expo: float = 0
    roll_rate_limit: int = 1998
    pitch_rate_limit: int = 1998
    yaw_rate_limit: int = 1998

    def apply_struct(self, data):
        self.struct.parse(data)
        breakpoint()

RxConfig dataclass

RxConfig(serialrx_provider: int = 0, stick_max: int = 0, stick_center: int = 0, stick_min: int = 0, spektrum_sat_bind: int = 0, rx_min_usec: int = 0, rx_max_usec: int = 0, rc_interpolation: int = 0, rc_interpolation_interval: int = 0, rc_interpolation_channels: int = 0, air_mode_activate_threshold: int = 0, rx_spi_protocol: int = 0, rx_spi_id: int = 0, rx_spi_rf_channel_count: int = 0, fpv_cam_angle_degrees: int = 0, rc_smoothing_type: int = 0, rc_smoothing_input_cutoff: int = 0, rc_smoothing_derivative_cutoff: int = 0, rc_smoothing_input_type: int = 0, rc_smoothing_derivative_type: int = 0)

Source code in bonfo/msp/state.py
@dataclass
class RxConfig:
    serialrx_provider: int = 0
    stick_max: int = 0
    stick_center: int = 0
    stick_min: int = 0
    spektrum_sat_bind: int = 0
    rx_min_usec: int = 0
    rx_max_usec: int = 0
    rc_interpolation: int = 0
    rc_interpolation_interval: int = 0
    rc_interpolation_channels: int = 0
    air_mode_activate_threshold: int = 0
    rx_spi_protocol: int = 0
    rx_spi_id: int = 0
    rx_spi_rf_channel_count: int = 0
    fpv_cam_angle_degrees: int = 0
    rc_smoothing_type: int = 0
    rc_smoothing_input_cutoff: int = 0
    rc_smoothing_derivative_cutoff: int = 0
    rc_smoothing_input_type: int = 0
    rc_smoothing_derivative_type: int = 0