pymmWave

pymmWave is an asynchronous TI mmWave library to expedite mmWave development.

A motivating example:

First get the serial ports which your sensor is using.

$ ls /dev/tty.*
... /dev/tty.SLAB_USBtoUART4     /dev/tty.SLAB_USBtoUART

Insert the serial ports to the config and data arguments, and thats it!

The connection may timeout on connect_config or connect_data. The function also may timeout on a read or write after supposedly connecting to the device. If this occurs, verify your serial port configuration, as they may need to be swapped. On Windows, this can be viewed on device manager. This currently may fail on WSL due to their serial support, but this may change. As of v1.1.2, the application will attempt to renegotiate COM connections if they have been misconfigured. It will not raise an error, but will log this.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
 from pymmWave.utils import load_cfg_file
 from pymmWave.sensor import Sensor
 from pymmWave.IWR6843AOP import IWR6843AOP
 from asyncio import get_event_loop, sleep

 sensor1 = IWR6843AOP("1", verbose=False)
 file = load_cfg_file("./example_configs/20fpsspeedy.cfg")

 # Your CONFIG serial port name
 config_connected = sensor1.connect_config('/dev/tty.SLAB_USBtoUART4', 115200)
 if not config_connected:
     print("Config connection failed.")
     exit()

 # Your DATA serial port name
 data_connected = sensor1.connect_data('/dev/tty.SLAB_USBtoUART', 921600)
 if not data_connected:
     print("Data connection failed.")
     exit()

 if not sensor1.send_config(file, max_retries=1):
     print("Sending configuration failed")
     exit()

 # Sets up doppler filtering to remove static noise
 sensor1.configure_filtering(.05)

 # Basic class to print data from the sensor
 async def print_data(sens: Sensor):
     await sleep(2)
     while True:
         print(await sens.get_data()) # get_data() -> DopplerPointCloud.

 event_loop = get_event_loop()

 event_loop.create_task(sensor1.start_sensor())
 event_loop.create_task(print_data(sensor1))
 event_loop.run_forever()

This simple working example demonstrates how with simple asynchronous integration, realtime applications can integrate with TI mmWave sensors easily.

This library was designed to work with the commercial off-the-shelf (COTS) TI IWR6843AOPEVM. While TI does provide a web interface to this product, there were no straightforward Python implementations available.

The goal of this library is such that if you purchase this EVM, within minutes of unboxing, it should be hackable in Python. This library is designed to scale with your use case. In future versions, it will support various algorithms reliant on the abstract implementations presented.

It should be noted that this package requires python >3.9. This is solely due to type hinting. It should be simple to clone the github, and modify the package to support typing classes. We expect this can be accomplished with Python >3.7, though there are no plans to modify the library to support this use-case.

Version 1.1.0 brings a number of simple algorithms. This now supports generic algorithms, and presents some simple, non-optimized algorithms. These algorithms allow for pose estimation, mocking an IMU, and a simple point cloud mean.

Install

pip install numpy scipy pyserial

pip install pymmwave

Roadmap

Currently, this is a simple library providing low-overhead integration with COTS TI mmWave sensors.

The goal is to provide a set of useful RADAR optimized algorithms to expedite research and development. These applications will rely on the abstract Sensor implementations listed below, such that they can be used by developers writing custom RADAR sensor implementations.

We currently do not modify the firmware of the TI RADAR sensors and have no intention to provide support for this. While this tool can be easily modified for custom firmware, that is not the intention of this tool.

We are aware that the COTS firmware can return more data than simply point cloud data, and we intend to support this use case in the future.

Contributors

Thanks to the University of California, Santa Barbara, and AeroVironment for providing the backing for this tools creation.

Notably, Jackie Burd, Tiffany Cowan, Peter Feghali, Yogananda Isukapalli, Cher Lin, Swetha Pillai, Scott Rasmussen, and Phil Tokumaru.

IWR6843AOPEVM Implementation

class pymmWave.IWR6843AOP.IWR6843AOP(name: str, verbose: bool = False)

Abstract Sensor class implementation for interfacing with the COTS TI IWR6843AOP evaluation board. Can be initialized with a public ‘name’, which can be used for sensor reference. This class supports point-cloud retrieval with doppler information on a per-point basis. The class can also be designed to automatically filter by doppler data to limit noisy data points.

configure_filtering(doppler_filtering: float = 0)bool

Sets basic doppler filtering to allow for static noise removal. Doppler filtering sets a floor for doppler results, to remove points less than the input.

Parameters

doppler_filtering (float, optional) – Doppler removal level, recommended to be fairly low. Defaults to 0.

Returns

success

Return type

bool

connect_config(com_port: str, baud_rate: int, timeout: int = 1)bool

Connect to the config port. Must be done before sending config. This function will timeout after a second by default. This timeout period is low since programmatically connecting to serial ports might be difficult with long timeout periods, as it is difficult to know apriori if you are connecting to config or data.

Parameters
  • com_port (str) – Port name to use

  • baud_rate (int) – Baud rate

  • timeout (int, optional) – Timeout. Defaults to 1.

Returns

True if successful

Return type

bool

Example

On MacOS, for example:

>>> my_sensor.connect_config('/dev/tty.SLAB_USBtoUART4', 115200)
True
connect_data(com_port: str, baud_rate: int, timeout: int = 1)bool

Connect the data serial port. Must be done before sending config.

Parameters
  • com_port (str) – Port name to use

  • baud_rate (int) – Baud rate

  • timeout (int, optional) – Timeout. Defaults to 1.

Returns

True if successful

Return type

bool

Example

On MacOS, for example:

>>> my_sensor.connect_data('/dev/tty.SLAB_USBtoUART', 921600)
True
async get_data()pymmWave.data_model.DopplerPointCloud

Returns data when it is ready. This function also updates the frequency measurement of the sensor. This function is blocking.

Returns

[description]

Return type

DopplerPointCloud

get_data_nowait()Optional[pymmWave.data_model.DopplerPointCloud]

Returns data if it is ready, otherwise none. This function also updates the frequency measurement of the sensor if data is available.

Returns

Data if there is data available, otherwise returns None.

Return type

Optional[DopplerPointCloud]

get_update_freq()float

Returns the frequency that the sensor is returning data at. This is not equivalent to the true capacity of the sensor, but rather the rate which the application is successfully getting data.

Returns

Hz

Return type

float

is_alive()bool

Getter which verifies connection to this particular sensor.

Returns

True if the sensor is still connected.

Return type

bool

model()str

Returns the particular model number supported with this class.

Returns

“IWR6843AOP”

Return type

str

send_config(config: list, max_retries: int = 1, autoretry_cfg_data: bool = True)bool

Tried to send a TI config, with a simple retry mechanism. Configuration files can be created here: https://dev.ti.com/gallery/view/mmwave/mmWave_Demo_Visualizer/ver/3.5.0/. Future support may be built for creating configuration files. This can be setup to autoretry on connection failure. With single device setups, this may allow for automated search of devices by users.

Parameters
  • config (list[str]) – List of strings making up the config

  • max_retries (int, optional) – Number of times to retry on failure. Defaults to 1.

  • autoretry_cfg_data (bool, optional) – If on failure to try to swap cfg/data COM ports. Defaults to True.

Returns

If sending was successful

Return type

bool

Raises

SerialException – If device is disconnected before completion, SerialExceptions may be raised.

async start_sensor()None

Starts the sensor and will place data into a queue. The goal of this function is to manage the state of the entire application. Nothing will happen if this function is not run with asyncio. This function attempts to read data from the sensor as quickly as it can, then extract positional+doppler data, and place data into an asyncio.Queue. Since this relies on the asyncio Queue, limitations may stem from asyncio. These issues, mainly revolving around thread safety, can be dealt with at the application layer.

This function also actively attempts to context switch between intervals to minimize overhead.

Raises

Exception – If sensor has some failure, will throw a SerialException.

stop_sensor()

This function attempts to close all serial ports and update internal state booleans.

type()pymmWave.sensor.Sensor.SensorType

Returns enum Sensor.SensorType

Returns

Sensor.SensorType.POINT_CLOUD3D

Abstract Sensor Class

exception pymmWave.sensor.InvalidSensorException(message: str, errors: str)
class pymmWave.sensor.Sensor

Base sensor class. The goal of this class implementation is such that users can implement classes which can then be used with our library of algorithms easily.

class SensorType(value)

Enum class for sensor types

error(*args: Any, **kwargs: Any)None

Report an error to the logger.

abstract async get_data()pymmWave.data_model.DataModel

Return data from sensor.

abstract get_data_nowait()Optional[pymmWave.data_model.DataModel]

Return data from sensor if available, otherwise None.

Returns

Data if there is data available, otherwise returns None.

Return type

Optional[DopplerPointCloud]

abstract get_update_freq()float

Returns the sensor update freq. This is reccommended to be the actual rate of data access.

abstract is_alive()bool

Check if sensor is still alive

Returns

True if alive

Return type

bool

log(*args: Any, **kwargs: Any)None

Log something to the logger.

abstract model()str

Return the model of a sensor

Returns

Name of sensor

Return type

str

set_logger(new_logger: pymmWave.logging.Logger)

Replace the default stdout logger with another.

Parameters

new_logger (Logger) – The logger to use. Must implement Logger base class.

abstract async start_sensor()None

Asynchronous loop that can be run as a corouting with asyncio, or other asynchronous libraries.

Returns

Nothing, will be run as a coroutine!

abstract stop_sensor()

Stop sensor

Returns

Nothing, kills everything

abstract type()pymmWave.sensor.Sensor.SensorType

Return the type of a sensor

Returns

Type of sensor

Return type

str

class pymmWave.sensor.SpatialSensor(sens: pymmWave.sensor.Sensor, location: tuple, pitch_rads: tuple)

Wrapper to provide the notion of a sensor in space

Algorithms

class pymmWave.algos.Algorithm

Base abstract class for all algorithms.

error(*args: Any, **kwargs: Any)None

Report an error to the logger.

log(*args: Any, **kwargs: Any)None

Log something to the logger.

abstract reset()None

Reset the state of an algorithm.

set_logger(new_logger: pymmWave.logging.Logger)

Replace the default stdout logger with another.

Parameters

new_logger (Logger) – The logger to use. Must implement Logger base class.

class pymmWave.algos.CloudEstimatedIMU

The goal of this class is to provide an averaged IMU output from an input point cloud data. It is highly reccomended to set the minium number of required points to be relatively high. It is wholly reasonable to state that if there are less datapoints, then it is likely that the device is not moving. This estimates IMU state based on an average of the input points, if there is enough data.

modify_minimum_datapoints(val: int)None

Modifies the number of required datapoints to generate data.

Parameters

val (int) – Any integer, negative indicates all values should be accepted.

reset()None

Reset the state of an algorithm.

run(data: pymmWave.data_model.DopplerPointCloud)Optional[pymmWave.data_model.ImuVelocityData]

Accepts a doppler point cloud, and generates estimated linear and angular velocity.

Parameters

data (DopplerPointCloud) – Cloud of data, must be of size more than the minimum to be used.

Returns

If there are not enough data points, None. Otherwise returns an estimate of the IMU.

Return type

Optional[ImuVelocityData]

class pymmWave.algos.EstimatedRelativePosition

Simple computed estimate of pose with provided IMU velocity.

Parameters

Algorithm ([type]) – [description]

reset()None

Reset the state of an algorithm.

run(imu_vel: pymmWave.data_model.ImuVelocityData, t_factor: float = 1, is_moving: bool = True)pymmWave.data_model.Pose

Given IMU velocity and an arbitrary optional factor, estimate current pose. This also accepts more user metadata, specifically if the sensor is actually moving. This may not be known, but if it is, then should be set correctly. Due to the uncertainty of the RADAR sensor, this additional data will help ensure that in complex scenes the algorithm does not misunderstand its movement. It remains important to call this function even if is_moving is false, to update the frequency of the function call. :param imu_vel: IMU Velocity object :type imu_vel: ImuVelocityData :param t_factor: Optional factor to hand-tune this estimate. Simply a multiplier of time. Defaults to 1. :type t_factor: float, optional :param is_moving: A boolean which can be set to false if the user knows that the sensor is not moving. Defaults to True. :type is_moving: bool, optional

Returns

An estimate of the current pose

Return type

Pose

class pymmWave.algos.IMUAdjustedPersistedData(steps_to_persist: int)

The goal of this class is to adjust data points based on some IMU input, and persist data points through time to retain more data. This class will keep track of when data is fed into the algorithm, and will attempt to persist state even when some data is unavailable or poor. 0 in construction will not allow any persistence

change_persisted_steps(new_steps: int)bool

Will attempt to change the number of steps to persist. Will return True if successful.

Parameters

new_steps (int) – Should be greater than 0.

Returns

Boolean representing success

Return type

bool

reset()None

Reset the state of this algorithm, reset the state of memory, and reset the time it was last called.

run(input_cloud: pymmWave.data_model.DopplerPointCloud, imu_in: pymmWave.data_model.ImuVelocityData)pymmWave.data_model.DopplerPointCloud

Given the current Doppler point cloud with IMU data, this function will return a time adjusted point cloud. This point cloud will include previously entered point_clouds, which will only be as accurate as the data provided. Accuracy can be improved by modifying the allowable persistable steps.

Parameters
Returns

[description]

Return type

DopplerPointCloud

class pymmWave.algos.SimpleMeanDistance

An implementation of a simple point cloud algorithm. Computes the average L2 norm of the input data.

reset()None

Reset the state of an algorithm.

run(input: pymmWave.data_model.DopplerPointCloud)float

Takes in a point cloud and returns the mean. Stateless.

Parameters

input (DopplerPointCloud) – Input point cloud, ignores doppler data.

Returns

A float representing the mean from this algorithm.

Return type

MeanFloatValue

Logging

class pymmWave.logging.Logger

Base abstract class for all algorithms.

abstract error(*args: Any, **kwargs: Any)None

Errors the input.

abstract log(*args: Any, **kwargs: Any)None

Logs the input.

class pymmWave.logging.NativeLogger

Logger using python’s logging library. User can set settings out of scope which will persist.

error(*args: Any, **kwargs: Any)None

Errors the input.

log(*args: Any, **kwargs: Any)None

Logs the input.

class pymmWave.logging.StdOutLogger

Very simple print based logger.

error(*args: Any, **kwargs: Any)None

Errors the input.

log(*args: Any, **kwargs: Any)None

Logs the input.

Data Model

class pymmWave.data_model.DataModel

Base Data Class

abstract get()Any

Get the underlying value

class pymmWave.data_model.DopplerPointCloud(data: numpy.ndarray)

Fairly lightweight class for X, Y, Z, and doppler data.

append(other: pymmWave.data_model.DopplerPointCloud)bool

Append another DopplerPointCloud object to this one in-place

Parameters

other (DopplerPointCloud) – Another object of the same type

Returns

If success, true.

Return type

bool

get()numpy.ndarray

Gets the underlying data container.

Returns

The underlying data matrix.

Return type

np.ndarray

translate_rotate(location: tuple, pitch_rads: scipy.spatial.transform.rotation.Rotation)

Translates and rotates the underlying object. This is done in-place, no further verification is done.

Parameters
  • location (Tuple[float, float, float]) – Tuple of float values to shift the underlying data with: (x meters, y meters, z meters)

  • pitch_rads (Rotation) – Rotation matrix object from scipy.spatial.transform.rotation.Rotation

class pymmWave.data_model.ImuData(altitude: float, dxdydz: tuple, drolldpitchdyaw: tuple, heading: float)
get()scipy.spatial.transform.rotation.Rotation

This is a get representing the change in roll/pitch/yaw. This will need to be time adjusted to be useful.

Returns

[description]

Return type

Rotation

class pymmWave.data_model.ImuVelocityData(dxdydz: tuple, drolldpitchdyaw: tuple)

Class to represent a velocity data point from the IMU.

Parameters

DataModel ([type]) – [description]

get()scipy.spatial.transform.rotation.Rotation

This is a get representing the change in roll/pitch/yaw. This will need to be time adjusted to be useful.

Returns

[description]

Return type

Rotation

class pymmWave.data_model.Pose

A generic representation of pose. The goal is to support basic operations on representations of current physical state.

get()tuple

Returns a tuple representing x, y, z position, and roll, pitch, yaw.

Returns

(x, y, z, roll, pitch, yaw)

Return type

tuple[float, float, float, float, float, float]

move(imu_vel: pymmWave.data_model.ImuVelocityData, time_passed: float)

Based on some IMU velocity information and the amount of time which has passed, modify the current pose.

Parameters
  • imu_vel (ImuVelocityData) – Input IMU data.

  • time_passed (float) – Time in seconds.

Utilities

pymmWave.utils.load_cfg_file(filepath: str)list

Loads a config file to a list of strings. Wrapper around readlines() with some simple verification.

Parameters

filepath (str) – Filepath, relative paths should be ok

Returns

List of lines as str

Return type

list[str]

Indices and tables