pymmWave
pymmWave is an asynchronous TI mmWave library to expedite mmWave development.
A motivating example:
First, find the serial ports that your sensor is using.
$ ls /dev/tty.*
... /dev/tty.SLAB_USBtoUART4 /dev/tty.SLAB_USBtoUART
Insert the serial ports into the config and data arguments, and that's it!
The connection may time out in connect_config or connect_data, and a read or write may also time out after the device appears to be connected. If this occurs, verify your serial port configuration, as the two ports may need to be swapped. On Windows, the ports can be viewed in Device Manager. This may currently fail on WSL due to its serial support, though this may change. As of v1.1.2, the application will attempt to renegotiate COM connections if they have been misconfigured. It will not raise an error in that case, but it will log the event.
from pymmWave.utils import load_cfg_file
from pymmWave.sensor import Sensor
from pymmWave.IWR6843AOP import IWR6843AOP
from asyncio import get_event_loop, sleep

sensor1 = IWR6843AOP("1", verbose=False)
file = load_cfg_file("./example_configs/20fpsspeedy.cfg")

# Your CONFIG serial port name
config_connected = sensor1.connect_config('/dev/tty.SLAB_USBtoUART4', 115200)
if not config_connected:
    print("Config connection failed.")
    exit()

# Your DATA serial port name
data_connected = sensor1.connect_data('/dev/tty.SLAB_USBtoUART', 921600)
if not data_connected:
    print("Data connection failed.")
    exit()

if not sensor1.send_config(file, max_retries=1):
    print("Sending configuration failed")
    exit()

# Sets up doppler filtering to remove static noise
sensor1.configure_filtering(.05)

# Basic coroutine to print data from the sensor
async def print_data(sens: Sensor):
    await sleep(2)
    while True:
        print(await sens.get_data())  # get_data() -> DopplerPointCloud.

event_loop = get_event_loop()
event_loop.create_task(sensor1.start_sensor())
event_loop.create_task(print_data(sensor1))
event_loop.run_forever()
This working example shows how a realtime application can integrate with a TI mmWave sensor using only a small amount of asynchronous code.
This library was designed to work with the commercial off-the-shelf (COTS) TI IWR6843AOPEVM. While TI does provide a web interface to this product, there were no straightforward Python implementations available.
The goal of this library is that, if you purchase this EVM, it should be hackable in Python within minutes of unboxing. This library is designed to scale with your use case. In future versions, it will support various algorithms that rely on the abstract implementations presented.
Note that this package requires Python >3.9, solely because of the type hinting it uses. It should be simple to clone the GitHub repository and modify the package to use classes from the typing module instead. We expect this would allow it to work with Python >3.7, though there are no plans to modify the library to support this use case.
Version 1.1.0 adds support for generic algorithms and ships a number of simple, non-optimized ones. These algorithms allow for pose estimation, mocking an IMU, and computing a simple point cloud mean.
Install
pip install numpy scipy pyserial
pip install pymmwave
Roadmap
Currently, this is a simple library providing low-overhead integration with COTS TI mmWave sensors.
The goal is to provide a set of useful RADAR optimized algorithms to expedite research and development. These applications will rely on the abstract Sensor implementations listed below, such that they can be used by developers writing custom RADAR sensor implementations.
We currently do not modify the firmware of the TI RADAR sensors and have no intention to provide support for this. While this tool can be easily modified for custom firmware, that is not the intention of this tool.
We are aware that the COTS firmware can return more data than simply point cloud data, and we intend to support this use case in the future.
Contributors
Thanks to the University of California, Santa Barbara, and AeroVironment for providing the backing for this tool's creation.
Notably, Jackie Burd, Tiffany Cowan, Peter Feghali, Yogananda Isukapalli, Cher Lin, Swetha Pillai, Scott Rasmussen, and Phil Tokumaru.
IWR6843AOPEVM Implementation
-
class
pymmWave.IWR6843AOP.
IWR6843AOP
(name: str, verbose: bool = False)¶ Implementation of the abstract Sensor class for interfacing with the COTS TI IWR6843AOP evaluation board. Can be initialized with a public ‘name’, which can be used to refer to the sensor. This class supports point-cloud retrieval with doppler information on a per-point basis. The class can also be configured to automatically filter by doppler data to limit noisy data points.
-
configure_filtering
(doppler_filtering: float = 0) → bool¶ Sets basic doppler filtering to allow for static noise removal. Doppler filtering sets a floor for doppler results, removing points whose doppler value is less than the input.
- Parameters
doppler_filtering (float, optional) – Doppler removal level, recommended to be fairly low. Defaults to 0.
- Returns
success
- Return type
bool
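As a rough illustration of what a doppler floor does, the sketch below filters a small list of points. The (x, y, z, doppler) tuple layout, the helper name filter_by_doppler, and the use of the absolute doppler value are assumptions made for this example and are not taken from the library.

```python
from typing import List, Tuple

# Hypothetical point layout for this sketch: (x, y, z, doppler).
Point = Tuple[float, float, float, float]


def filter_by_doppler(points: List[Point], doppler_floor: float) -> List[Point]:
    """Keep only points whose doppler magnitude reaches the floor.

    Assumption: the comparison uses the absolute doppler value, so points
    moving toward and away from the sensor are treated alike.
    """
    return [p for p in points if abs(p[3]) >= doppler_floor]


cloud = [
    (1.0, 2.0, 0.5, 0.00),   # static clutter
    (0.8, 1.5, 0.4, 0.30),   # moving away
    (0.2, 3.0, 0.1, -0.12),  # moving toward
    (2.0, 0.5, 0.9, 0.01),   # nearly static
]
moving = filter_by_doppler(cloud, 0.05)
print(len(moving), "of", len(cloud), "points kept")
```

With a floor of 0, every point passes, which matches the documented default of no filtering.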
-
connect_config
(com_port: str, baud_rate: int, timeout: int = 1) → bool¶ Connect to the config port. Must be done before sending config. This function will time out after one second by default. The timeout is kept short because it is difficult to know a priori whether a given port is the config port or the data port, and long timeouts would make programmatic connection attempts slow.
- Parameters
com_port (str) – Port name to use
baud_rate (int) – Baud rate
timeout (int, optional) – Timeout in seconds. Defaults to 1.
- Returns
True if successful
- Return type
bool
Example
On macOS, for example:
>>> my_sensor.connect_config('/dev/tty.SLAB_USBtoUART4', 115200)
True
-
connect_data
(com_port: str, baud_rate: int, timeout: int = 1) → bool¶ Connect the data serial port. Must be done before sending config.
- Parameters
com_port (str) – Port name to use
baud_rate (int) – Baud rate
timeout (int, optional) – Timeout. Defaults to 1.
- Returns
True if successful
- Return type
bool
Example
On macOS, for example:
>>> my_sensor.connect_data('/dev/tty.SLAB_USBtoUART', 921600)
True
-
async
get_data
() → pymmWave.data_model.DopplerPointCloud¶ Returns data when it is ready. This function also updates the frequency measurement of the sensor. The awaiting coroutine does not proceed until data is available.
- Returns
The next available point cloud.
- Return type
DopplerPointCloud
-
get_data_nowait
() → Optional[pymmWave.data_model.DopplerPointCloud]¶ Returns data if it is ready, otherwise None. This function also updates the frequency measurement of the sensor if data is available.
- Returns
Data if there is data available, otherwise returns None.
- Return type
Optional[DopplerPointCloud]
-
get_update_freq
() → float¶ Returns the frequency at which the sensor is returning data. This is not equivalent to the true capacity of the sensor, but rather the rate at which the application is successfully getting data.
- Returns
Hz
- Return type
float
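One way such an application-side rate could be tracked is to time successive successful reads. The following is only a sketch under that assumption; UpdateRateMeter and its tick method are hypothetical names and do not describe the library's internals.

```python
import time
from typing import Optional


class UpdateRateMeter:
    """Tracks how often data is actually retrieved, in Hz (illustrative only)."""

    def __init__(self) -> None:
        self._last: Optional[float] = None
        self._freq_hz: float = 0.0

    def tick(self, now: Optional[float] = None) -> None:
        """Record one successful retrieval at time `now` (seconds)."""
        if now is None:
            now = time.monotonic()
        if self._last is not None and now > self._last:
            # Rate is the inverse of the gap between the last two reads.
            self._freq_hz = 1.0 / (now - self._last)
        self._last = now

    def get_update_freq(self) -> float:
        return self._freq_hz


meter = UpdateRateMeter()
for t in (0.0, 0.25, 0.5):  # simulated read times, 0.25 s apart
    meter.tick(now=t)
print(meter.get_update_freq(), "Hz")
```

Because the rate is derived from read times, a slow consumer lowers the reported value even if the sensor itself produces frames faster.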
-
is_alive
() → bool¶ Getter which verifies connection to this particular sensor.
- Returns
True if the sensor is still connected.
- Return type
bool
-
model
() → str¶ Returns the particular model number supported with this class.
- Returns
“IWR6843AOP”
- Return type
str
-
send_config
(config: list, max_retries: int = 1, autoretry_cfg_data: bool = True) → bool¶ Tries to send a TI config, with a simple retry mechanism. Configuration files can be created here: https://dev.ti.com/gallery/view/mmwave/mmWave_Demo_Visualizer/ver/3.5.0/. Future support may be built for creating configuration files. This can be set up to automatically retry on connection failure. With single-device setups, this may allow users to search for devices automatically.
- Parameters
config (list[str]) – List of strings making up the config
max_retries (int, optional) – Number of times to retry on failure. Defaults to 1.
autoretry_cfg_data (bool, optional) – Whether to try swapping the cfg/data COM ports on failure. Defaults to True.
- Returns
If sending was successful
- Return type
bool
- Raises
SerialException – If device is disconnected before completion, SerialExceptions may be raised.
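The retry-and-swap behaviour described for send_config can be pictured with a generic helper. This is a sketch only: send_with_retries, send_once, and swap_ports are hypothetical names, and it assumes that max_retries counts additional attempts after the first one.

```python
from typing import Callable, Dict, List


def send_with_retries(
    send_once: Callable[[List[str]], bool],
    config: List[str],
    max_retries: int = 1,
    swap_ports: Callable[[], None] = lambda: None,
) -> bool:
    """Try to send `config`, retrying up to `max_retries` extra times.

    `send_once` and `swap_ports` are hypothetical callables standing in
    for the serial write and for swapping the config/data ports.
    """
    for attempt in range(max_retries + 1):
        if send_once(config):
            return True
        if attempt < max_retries:
            swap_ports()  # the ports may have been assigned backwards
    return False


# Simulated device: sending only works once the ports have been swapped.
state: Dict[str, int] = {"swapped": 0, "calls": 0}


def fake_send(lines: List[str]) -> bool:
    state["calls"] += 1
    return state["swapped"] == 1


def fake_swap() -> None:
    state["swapped"] = 1


ok = send_with_retries(fake_send, ["sensorStop", "sensorStart"], 1, fake_swap)
print(ok, state["calls"])
```

Keeping the serial write behind a callable makes the retry policy easy to exercise without hardware attached.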
-
async
start_sensor
() → None¶ Starts the sensor and will place data into a queue. The goal of this function is to manage the state of the entire application. Nothing will happen if this function is not run with asyncio. This function attempts to read data from the sensor as quickly as it can, then extract positional+doppler data, and place data into an asyncio.Queue. Since this relies on the asyncio Queue, limitations may stem from asyncio. These issues, mainly revolving around thread safety, can be dealt with at the application layer.
This function also actively attempts to context switch between intervals to minimize overhead.
- Raises
Exception – If the sensor has some failure, a SerialException is thrown.
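The queue-based flow described for start_sensor, get_data, and get_data_nowait can be sketched with a stand-in class. FakeSensor and its frame dictionaries are hypothetical and are not part of pymmWave; only the method names mirror the documented interface.

```python
import asyncio
from typing import List, Optional


class FakeSensor:
    """Hypothetical stand-in that mimics the documented queue pattern."""

    def __init__(self, frames: int) -> None:
        self._frames = frames
        self._queue: asyncio.Queue = asyncio.Queue()
        self._active = False

    async def start_sensor(self) -> None:
        """Producer: push frames into the queue, yielding between frames."""
        self._active = True
        for i in range(self._frames):
            if not self._active:
                break
            await self._queue.put({"frame": i})
            await asyncio.sleep(0)  # context switch so consumers can run
        self._active = False

    async def get_data(self) -> dict:
        """Wait until a frame is available, then return it."""
        return await self._queue.get()

    def get_data_nowait(self) -> Optional[dict]:
        """Return a frame if one is queued, otherwise None."""
        try:
            return self._queue.get_nowait()
        except asyncio.QueueEmpty:
            return None

    def stop_sensor(self) -> None:
        self._active = False


async def main() -> List[dict]:
    sensor = FakeSensor(frames=3)
    producer = asyncio.create_task(sensor.start_sensor())
    received = [await sensor.get_data() for _ in range(3)]
    await producer
    assert sensor.get_data_nowait() is None  # queue has been drained
    return received


frames = asyncio.run(main())
print(frames)
```

The producer only makes progress while the event loop is running, which is why nothing happens if the coroutine is never scheduled.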
-
stop_sensor
()¶ This function attempts to close all serial ports and update internal state booleans.
-
type
() → pymmWave.sensor.Sensor.SensorType¶ Returns enum
Sensor.SensorType
- Returns
Sensor.SensorType.POINT_CLOUD3D
-
Abstract Sensor Class
-
exception
pymmWave.sensor.
InvalidSensorException
(message: str, errors: str)¶
-
class
pymmWave.sensor.
Sensor
¶ Base sensor class. This class exists so that users can implement their own sensor classes, which can then be used easily with our library of algorithms.
-
class
SensorType
(value)¶ Enum class for sensor types
-
error
(*args: Any, **kwargs: Any) → None¶ Report an error to the logger.
-
abstract async
get_data
() → pymmWave.data_model.DataModel¶ Return data from sensor.
-
abstract
get_data_nowait
() → Optional[pymmWave.data_model.DataModel]¶ Return data from sensor if available, otherwise None.
- Returns
Data if there is data available, otherwise returns None.
- Return type
Optional[DataModel]
-
abstract
get_update_freq
() → float¶ Returns the sensor update frequency. This is recommended to be the actual rate of data access.
-
abstract
is_alive
() → bool¶ Check if sensor is still alive
- Returns
True if alive
- Return type
bool
-
log
(*args: Any, **kwargs: Any) → None¶ Log something to the logger.
-
abstract
model
() → str¶ Return the model of a sensor
- Returns
Name of sensor
- Return type
str
-
set_logger
(new_logger: pymmWave.logging.Logger)¶ Replace the default stdout logger with another.
- Parameters
new_logger (Logger) – The logger to use. Must implement Logger base class.
-
abstract async
start_sensor
() → None¶ Asynchronous loop that can be run as a coroutine with asyncio, or other asynchronous libraries.
- Returns
Nothing, will be run as a coroutine!
-
abstract
stop_sensor
()¶ Stop sensor
- Returns
Nothing, kills everything
-
abstract
type
() → pymmWave.sensor.Sensor.SensorType¶ Return the type of a sensor
- Returns
Type of sensor
- Return type
Sensor.SensorType
-
-
class
pymmWave.sensor.
SpatialSensor
(sens: pymmWave.sensor.Sensor, location: tuple, pitch_rads: tuple)¶ Wrapper to provide the notion of a sensor in space
Algorithms
-
class
pymmWave.algos.
Algorithm
¶ Base abstract class for all algorithms.
-
error
(*args: Any, **kwargs: Any) → None¶ Report an error to the logger.
-
log
(*args: Any, **kwargs: Any) → None¶ Log something to the logger.
-
abstract
reset
() → None¶ Reset the state of an algorithm.
-
set_logger
(new_logger: pymmWave.logging.Logger)¶ Replace the default stdout logger with another.
- Parameters
new_logger (Logger) – The logger to use. Must implement Logger base class.
-
-
class
pymmWave.algos.
CloudEstimatedIMU
¶ The goal of this class is to provide an averaged IMU output from input point cloud data. It is highly recommended to set the minimum number of required points relatively high. It is reasonable to assume that if there are fewer datapoints, the device is likely not moving. This estimates IMU state based on an average of the input points, if there is enough data.
-
modify_minimum_datapoints
(val: int) → None¶ Modifies the number of required datapoints to generate data.
- Parameters
val (int) – Any integer, negative indicates all values should be accepted.
-
reset
() → None¶ Reset the state of an algorithm.
-
run
(data: pymmWave.data_model.DopplerPointCloud) → Optional[pymmWave.data_model.ImuVelocityData]¶ Accepts a doppler point cloud, and generates estimated linear and angular velocity.
- Parameters
data (DopplerPointCloud) – Cloud of data, must be of size more than the minimum to be used.
- Returns
If there are not enough data points, None. Otherwise returns an estimate of the IMU.
- Return type
Optional[ImuVelocityData]
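To make the minimum-datapoints gate concrete, the sketch below averages each point's doppler projected along its line of sight and returns None when too few points are usable. It is not the library's estimator: the (x, y, z, doppler) layout and the function name are assumptions, and it produces only a mean radial velocity vector, with no angular component and no sign convention for ego-motion.

```python
import math
from typing import List, Optional, Tuple

# Hypothetical point layout for this sketch: (x, y, z, doppler).
Point = Tuple[float, float, float, float]


def mean_radial_velocity(
    points: List[Point], min_points: int
) -> Optional[Tuple[float, float, float]]:
    """Average each point's doppler projected along its line of sight.

    Returns None when fewer than `min_points` points are usable. A
    negative `min_points` accepts any non-empty cloud.
    """
    vectors = []
    for x, y, z, doppler in points:
        rng = math.sqrt(x * x + y * y + z * z)
        if rng == 0.0:
            continue  # direction is undefined at the origin
        vectors.append((doppler * x / rng, doppler * y / rng, doppler * z / rng))
    if len(vectors) < max(min_points, 1):
        return None
    n = len(vectors)
    return (
        sum(v[0] for v in vectors) / n,
        sum(v[1] for v in vectors) / n,
        sum(v[2] for v in vectors) / n,
    )


sample = [(1.0, 0.0, 0.0, 0.5), (2.0, 0.0, 0.0, 1.5)]
enough = mean_radial_velocity(sample, min_points=2)
too_few = mean_radial_velocity(sample, min_points=3)
print(enough, too_few)
```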
-
-
class
pymmWave.algos.
EstimatedRelativePosition
¶ Simple computed estimate of pose with provided IMU velocity.
-
reset
() → None¶ Reset the state of an algorithm.
-
run
(imu_vel: pymmWave.data_model.ImuVelocityData, t_factor: float = 1, is_moving: bool = True) → pymmWave.data_model.Pose¶ Given IMU velocity and an arbitrary optional factor, estimate the current pose. This also accepts additional user metadata, specifically whether the sensor is actually moving. This may not be known, but if it is, it should be set correctly. Due to the uncertainty of the RADAR sensor, this additional data helps ensure that the algorithm does not misjudge its movement in complex scenes. It remains important to call this function even if is_moving is False, to update the frequency of the function call.
- Parameters
imu_vel (ImuVelocityData) – IMU velocity object.
t_factor (float, optional) – Optional factor to hand-tune this estimate. Simply a multiplier of time. Defaults to 1.
is_moving (bool, optional) – Can be set to False if the user knows that the sensor is not moving. Defaults to True.
- Returns
An estimate of the current pose
- Return type
Pose
-
class
pymmWave.algos.
IMUAdjustedPersistedData
(steps_to_persist: int)¶ The goal of this class is to adjust data points based on some IMU input, and persist data points through time to retain more data. This class will keep track of when data is fed into the algorithm, and will attempt to persist state even when some data is unavailable or poor. Passing 0 at construction disables persistence.
-
change_persisted_steps
(new_steps: int) → bool¶ Will attempt to change the number of steps to persist. Will return True if successful.
- Parameters
new_steps (int) – Should be greater than 0.
- Returns
Boolean representing success
- Return type
bool
-
reset
() → None¶ Reset the state of this algorithm, reset the state of memory, and reset the time it was last called.
-
run
(input_cloud: pymmWave.data_model.DopplerPointCloud, imu_in: pymmWave.data_model.ImuVelocityData) → pymmWave.data_model.DopplerPointCloud¶ Given the current Doppler point cloud with IMU data, this function will return a time adjusted point cloud. This point cloud will include previously entered point_clouds, which will only be as accurate as the data provided. Accuracy can be improved by modifying the allowable persistable steps.
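- Parameters
input_cloud (DopplerPointCloud) – The current Doppler point cloud.
imu_in (ImuVelocityData) – The current IMU velocity data.
- Returns
A time-adjusted point cloud that also includes previously entered point clouds.
- Return type
DopplerPointCloud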
-
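The idea of persisting earlier clouds and adjusting them for motion can be sketched with a bounded deque. PersistedCloud is a hypothetical class that is not part of pymmWave, and it assumes a pure translation between calls, supplied directly as a displacement rather than derived from IMU velocity and elapsed time.

```python
from collections import deque
from typing import Deque, List, Tuple

Vec3 = Tuple[float, float, float]


class PersistedCloud:
    """Keeps the last N clouds and shifts them as the sensor moves."""

    def __init__(self, steps_to_persist: int) -> None:
        # maxlen=0 means nothing is ever remembered.
        self._history: Deque[List[Vec3]] = deque(maxlen=max(steps_to_persist, 0))

    def run(self, cloud: List[Vec3], displacement: Vec3) -> List[Vec3]:
        dx, dy, dz = displacement
        # Move remembered points opposite to the sensor's own displacement.
        shifted = [
            [(x - dx, y - dy, z - dz) for x, y, z in old] for old in self._history
        ]
        self._history.clear()
        self._history.extend(shifted)
        merged = [p for old in self._history for p in old] + list(cloud)
        self._history.append(list(cloud))  # the oldest cloud drops out when full
        return merged


persist = PersistedCloud(steps_to_persist=1)
first = persist.run([(1.0, 0.0, 0.0)], (0.0, 0.0, 0.0))
second = persist.run([(5.0, 0.0, 0.0)], (0.5, 0.0, 0.0))
print(first, second)
```

Raising the number of persisted steps keeps more history at the cost of accumulating any error in the supplied motion.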
-
class
pymmWave.algos.
SimpleMeanDistance
¶ An implementation of a simple point cloud algorithm. Computes the average L2 norm of the input data.
-
reset
() → None¶ Reset the state of an algorithm.
-
run
(input: pymmWave.data_model.DopplerPointCloud) → float¶ Takes in a point cloud and returns the mean. Stateless.
- Parameters
input (DopplerPointCloud) – Input point cloud, ignores doppler data.
- Returns
A float representing the mean from this algorithm.
- Return type
float
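For reference, the average L2 norm of a point cloud can be computed as in the sketch below. The function name and the plain (x, y, z) tuples are assumptions for illustration, as is returning 0.0 for an empty cloud.

```python
import math
from typing import Sequence, Tuple


def mean_l2_distance(points: Sequence[Tuple[float, float, float]]) -> float:
    """Mean Euclidean distance of (x, y, z) points from the origin."""
    if not points:
        return 0.0  # assumed convention for an empty cloud
    total = sum(math.sqrt(x * x + y * y + z * z) for x, y, z in points)
    return total / len(points)


# Distances are 5, 5, and 3, so the mean is 13 / 3.
mean_distance = mean_l2_distance(
    [(3.0, 4.0, 0.0), (0.0, 0.0, 5.0), (1.0, 2.0, 2.0)]
)
print(mean_distance)
```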
-
Logging
-
class
pymmWave.logging.
Logger
¶ Base abstract class for all loggers.
-
abstract
error
(*args: Any, **kwargs: Any) → None¶ Logs the input as an error.
-
abstract
log
(*args: Any, **kwargs: Any) → None¶ Logs the input.
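A custom logger only needs to provide the two documented methods. In the sketch below the Logger base class is a local stand-in that mirrors the documented interface so the example runs without the package installed, and ListLogger is a hypothetical implementation that collects messages in memory.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class Logger(ABC):
    """Stand-in mirroring the documented pymmWave.logging.Logger interface."""

    @abstractmethod
    def log(self, *args: Any, **kwargs: Any) -> None:
        ...

    @abstractmethod
    def error(self, *args: Any, **kwargs: Any) -> None:
        ...


class ListLogger(Logger):
    """Hypothetical logger that collects messages in memory."""

    def __init__(self) -> None:
        self.records: List[str] = []

    def log(self, *args: Any, **kwargs: Any) -> None:
        self.records.append("INFO " + " ".join(str(a) for a in args))

    def error(self, *args: Any, **kwargs: Any) -> None:
        self.records.append("ERROR " + " ".join(str(a) for a in args))


logger = ListLogger()
logger.log("sensor", 1, "connected")
logger.error("config failed")
print(logger.records)
```

With the real package, an instance of such a subclass would be passed to set_logger on a sensor or algorithm.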
-
Data Model
-
class
pymmWave.data_model.
DopplerPointCloud
(data: numpy.ndarray)¶ Fairly lightweight class for X, Y, Z, and doppler data.
-
append
(other: pymmWave.data_model.DopplerPointCloud) → bool¶ Append another DopplerPointCloud object to this one in-place
- Parameters
other (DopplerPointCloud) – Another object of the same type
- Returns
True on success.
- Return type
bool
-
get
() → numpy.ndarray¶ Gets the underlying data container.
- Returns
The underlying data matrix.
- Return type
np.ndarray
-
translate_rotate
(location: tuple, pitch_rads: scipy.spatial.transform.rotation.Rotation)¶ Translates and rotates the underlying object. This is done in-place, no further verification is done.
- Parameters
location (Tuple[float, float, float]) – Tuple of float values to shift the underlying data with: (x meters, y meters, z meters)
pitch_rads (Rotation) – Rotation matrix object from scipy.spatial.transform.rotation.Rotation
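The effect of translating and rotating a cloud can be illustrated without SciPy for the special case of a rotation about the z axis. This sketch is not the library's implementation: the function name is hypothetical, and applying the rotation before the translation is an assumption, since the order used by translate_rotate is not stated here.

```python
import math
from typing import List, Tuple

Vec3 = Tuple[float, float, float]


def rotate_z_then_translate(
    points: List[Vec3], yaw_rads: float, offset: Vec3
) -> List[Vec3]:
    """Rotate points about the z axis, then shift them by `offset`."""
    c, s = math.cos(yaw_rads), math.sin(yaw_rads)
    out = []
    for x, y, z in points:
        xr, yr = c * x - s * y, s * x + c * y  # standard 2D rotation in x/y
        out.append((xr + offset[0], yr + offset[1], z + offset[2]))
    return out


# A point on the x axis, rotated 90 degrees and raised by 1 m.
moved = rotate_z_then_translate([(1.0, 0.0, 0.0)], math.pi / 2, (0.0, 0.0, 1.0))
print(moved)
```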
-
-
class
pymmWave.data_model.
ImuData
(altitude: float, dxdydz: tuple, drolldpitchdyaw: tuple, heading: float)¶
-
get
() → scipy.spatial.transform.rotation.Rotation¶ Returns the change in roll/pitch/yaw as a rotation. This will need to be time adjusted to be useful.
- Returns
Rotation representing the change in roll/pitch/yaw.
- Return type
Rotation
-
-
class
pymmWave.data_model.
ImuVelocityData
(dxdydz: tuple, drolldpitchdyaw: tuple)¶ Class to represent a velocity data point from the IMU.
-
get
() → scipy.spatial.transform.rotation.Rotation¶ Returns the change in roll/pitch/yaw as a rotation. This will need to be time adjusted to be useful.
- Returns
Rotation representing the change in roll/pitch/yaw.
- Return type
Rotation
-
class
pymmWave.data_model.
Pose
¶ A generic representation of pose. The goal is to support basic operations on representations of current physical state.
-
get
() → tuple¶ Returns a tuple representing x, y, z position, and roll, pitch, yaw.
- Returns
(x, y, z, roll, pitch, yaw)
- Return type
tuple[float, float, float, float, float, float]
-
move
(imu_vel: pymmWave.data_model.ImuVelocityData, time_passed: float)¶ Based on some IMU velocity information and the amount of time which has passed, modify the current pose.
- Parameters
imu_vel (ImuVelocityData) – Input IMU data.
time_passed (float) – Time in seconds.
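A pose update from velocity and elapsed time amounts to a simple integration step. The sketch below uses forward Euler integration on a hypothetical SimplePose class. It assumes the velocities are expressed in the same frame as the pose, so no body-to-world rotation is applied, and that may differ from what the library does.

```python
from dataclasses import dataclass
from typing import Tuple

Vec3 = Tuple[float, float, float]


@dataclass
class SimplePose:
    """Hypothetical pose: position in meters, angles in radians."""

    x: float = 0.0
    y: float = 0.0
    z: float = 0.0
    roll: float = 0.0
    pitch: float = 0.0
    yaw: float = 0.0

    def move(self, dxdydz: Vec3, drolldpitchdyaw: Vec3, time_passed: float) -> None:
        """Advance the pose by velocity * elapsed time (forward Euler)."""
        self.x += dxdydz[0] * time_passed
        self.y += dxdydz[1] * time_passed
        self.z += dxdydz[2] * time_passed
        self.roll += drolldpitchdyaw[0] * time_passed
        self.pitch += drolldpitchdyaw[1] * time_passed
        self.yaw += drolldpitchdyaw[2] * time_passed

    def get(self) -> Tuple[float, float, float, float, float, float]:
        return (self.x, self.y, self.z, self.roll, self.pitch, self.yaw)


pose = SimplePose()
pose.move((1.0, 0.0, 0.0), (0.0, 0.0, 0.5), time_passed=2.0)
pose.move((0.0, 0.5, 0.0), (0.0, 0.0, 0.0), time_passed=1.0)
print(pose.get())
```

A multiplier such as t_factor would simply scale time_passed before each step.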
-
Utilities
-
pymmWave.utils.
load_cfg_file
(filepath: str) → list¶ Loads a config file to a list of strings. Wrapper around readlines() with some simple verification.
- Parameters
filepath (str) – Path to the config file. Relative paths should work.
- Returns
List of lines as str
- Return type
list[str]
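A loader of this kind can be sketched as a thin wrapper around readlines(). The function below is an illustration rather than the library's code: load_cfg_lines is a hypothetical name, and skipping blank lines and lines starting with '%' is an assumed form of verification.

```python
import os
import tempfile
from typing import List


def load_cfg_lines(filepath: str) -> List[str]:
    """Read a .cfg file into a list of non-empty command lines."""
    if not os.path.isfile(filepath):
        raise FileNotFoundError(filepath)
    with open(filepath, "r") as f:
        lines = [line.strip() for line in f.readlines()]
    # Assumption: blank lines and '%' comment lines carry no commands.
    return [line for line in lines if line and not line.startswith("%")]


# Write a tiny example file, load it, then clean up.
with tempfile.NamedTemporaryFile("w", suffix=".cfg", delete=False) as tmp:
    tmp.write("% example comment\nsensorStop\n\nsensorStart\n")
    path = tmp.name

commands = load_cfg_lines(path)
os.remove(path)
print(commands)
```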