Source code for py_websockets_bot

# Copyright (c) 2014, Dawn Robotics Ltd
# All rights reserved.

# Redistribution and use in source and binary forms, with or without 
# modification, are permitted provided that the following conditions are met:

# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.

# 2. Redistributions in binary form must reproduce the above copyright notice, 
# this list of conditions and the following disclaimer in the documentation 
# and/or other materials provided with the distribution.

# 3. Neither the name of the Dawn Robotics Ltd nor the names of its contributors 
# may be used to endorse or promote products derived from this software without 
# specific prior written permission.

# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE 
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""
This package provides the :class:`WebsocketsBot` class which uses websockets to communicate with
and control a robot running the `raspberry_pi_camera_bot <https://bitbucket.org/DawnRobotics/raspberry_pi_camera_bot/>`_
robot_web_server.py


WebsocketsBot
=============

    .. autoclass:: WebsocketsBot()
        :special-members:
        :members:

CameraStreamingData
===================

    .. autoclass:: CameraStreamingData()
        :members:
        
ImageReaderProcess
==================

    .. autoclass:: ImageReaderProcess()
        :members:
        
MotionVectorsReaderProcess
==========================

    .. autoclass:: MotionVectorsReaderProcess()
        :members:
"""

import math
import time
import json
import httplib
import websocket
import multiprocessing

import robot_config
import mini_driver

try:
    import numpy as np
    import cv2
except ImportError:
    print "Error: Both numpy and OpenCV (cv2) are required"
    numpy = object()    # Mock up for docs build
    cv2 = object()

#---------------------------------------------------------------------------------------------------
[docs]class ImageReaderProcess( multiprocessing.Process ): #----------------------------------------------------------------------------------------------- def __init__( self, hostname, camera_port, image_queue, stream_small_images=False, max_update_rate_hz=60.0 ): multiprocessing.Process.__init__( self ) self._hostname = hostname self._camera_port = camera_port self._image_queue = image_queue self._stream_small_images = stream_small_images self._max_update_rate_hz = max_update_rate_hz if self._max_update_rate_hz <= 0.0: self._max_update_rate_hz = 1.0 self._exit_event = multiprocessing.Event() #----------------------------------------------------------------------------------------------- def shutdown( self ): self._exit_event.set() #----------------------------------------------------------------------------------------------- def run( self ): # Run in a loop continually getting images from the server # TODO: Convert to using action=streaming while not self._exit_event.is_set(): start_time = time.time() image_data = None try: # Make the request to the web server connection = httplib.HTTPConnection( self._hostname, self._camera_port, timeout=3.0 ) if self._stream_small_images: connection.request( "GET", "/small_image/?action=snapshot" ) else: connection.request( "GET", "/?action=snapshot" ) response = connection.getresponse() if response.status == httplib.OK: image_data = response.read() except: pass # Ignore exceptions which arise from connection errors or invalid data if image_data != None: if not self._image_queue.full(): self._image_queue.put_nowait( ( image_data, time.time() ) ) else: pass #if self._stream_small_images: #print "Dropped small image frame" #else: #print "Dropped image frame" end_time = time.time() sleep_ime = 1.0/self._max_update_rate_hz - (end_time - start_time) if sleep_ime > 0.0: time.sleep( sleep_ime ) self._image_queue.cancel_join_thread() #---------------------------------------------------------------------------------------------------
[docs]class MotionVectorsReaderProcess( multiprocessing.Process ): #----------------------------------------------------------------------------------------------- def __init__( self, hostname, camera_port, motion_vectors_queue, max_update_rate_hz=60.0 ): multiprocessing.Process.__init__( self ) self._hostname = hostname self._camera_port = camera_port self._motion_vectors_queue = motion_vectors_queue self._max_update_rate_hz = max_update_rate_hz if self._max_update_rate_hz <= 0.0: self._max_update_rate_hz = 1.0 self._exit_event = multiprocessing.Event() #----------------------------------------------------------------------------------------------- def shutdown( self ): self._exit_event.set() #----------------------------------------------------------------------------------------------- def run( self ): # Run in a loop continually getting motion vectors from the server # TODO: Convert to using action=streaming while not self._exit_event.is_set(): start_time = time.time() motion_vectors_data = None try: # Make the request to the web server connection = httplib.HTTPConnection( self._hostname, self._camera_port ) connection.request( "GET", "/motion_vectors/?action=snapshot" ) response = connection.getresponse() if response.status == httplib.OK: motion_vectors_data = response.read() except: pass # Ignore exceptions which arise from connection errors or invalid data if motion_vectors_data != None: if not self._motion_vectors_queue.full(): self._motion_vectors_queue.put_nowait( ( motion_vectors_data, time.time() ) ) else: pass #print "Dropped motion vectors frame" end_time = time.time() sleep_ime = 1.0/self._max_update_rate_hz - (end_time - start_time) if sleep_ime > 0.0: time.sleep( sleep_ime ) self._motion_vectors_queue.cancel_join_thread() #---------------------------------------------------------------------------------------------------
[docs]class CameraStreamingData: #----------------------------------------------------------------------------------------------- def __init__( self, data_processing_routine ): self.reader_process = None self.callback = None self.data_queue = multiprocessing.Queue( 5 ) self.last_processed_data = None self.last_processed_data_time = 0.0 self.data_processing_routine = data_processing_routine #---------------------------------------------------------------------------------------------------
[docs]class WebsocketsBot: """Interface class for talking to a robot running the `raspberry_pi_camera_bot <https://bitbucket.org/DawnRobotics/raspberry_pi_camera_bot/>`_ robot_web_server.py using websockets""" TIME_BETWEEN_CAMERA_KEEP_ALIVES = 0.25 #-----------------------------------------------------------------------------------------------
[docs] def __init__( self, hostname="localhost", port=80, camera_port=8080 ): """Constructs a :py:class:`WebsocketsBot` :param str hostname: The ip address of the robot :param int port: The port of the robot's web server :param int camera_port: The port of the web server providing camera images""" self._hostname = hostname self._port = port self._camera_port = camera_port url = "ws://{0}:{1}/robot_control/websocket".format( hostname, port ) print "Connecting to", url self._websocket = websocket.create_connection( url ) # These placeholders keep track of the process of streaming different types of data from # the robot's camera self._streaming_data = { "image" : CameraStreamingData( self._process_raw_image_data ), "small_image" : CameraStreamingData( self._process_raw_image_data ), "motion_vector" : CameraStreamingData( self._process_raw_motion_vectors_data ) } self._last_camera_keep_alive_time = 0.0 #-----------------------------------------------------------------------------------------------
[docs] def disconnect( self ): """This routine should be called when the :py:class:`WebsocketsBot` is no longer needed""" for data_name in self._streaming_data: self._stop_streaming( data_name ) #-----------------------------------------------------------------------------------------------
[docs] def start_streaming_camera_images( self, image_callback=None ): """This routine must be called to start streaming images from the robot's camera. Images can either be obtained in a blocking fashion, by calling :py:func:`get_latest_camera_image`, or an image callback can be provided. :param func image_callback: A function to call with an image and timestamp when \ an image is obtained.""" def create_streaming_process( data_queue ): return ImageReaderProcess( self._hostname, self._camera_port, data_queue ) self._start_streaming( "image", create_streaming_process, image_callback ) #-----------------------------------------------------------------------------------------------
[docs] def stop_streaming_camera_images( self ): """Stops streaming images from the robot's camera""" self._stop_streaming( "image" ) #-----------------------------------------------------------------------------------------------
[docs] def get_latest_camera_image( self, max_image_age=None ): """This routine will block until an image is obtained from the robot's camera. :param float max_image_age: Specifies the maximum age of the image to retrieve in seconds. \ If this is None, then the routine will retrieve the first available image. If the currently \ available image is older than this age then the routine blocks until a newer image is retrieved. :return: a tuple containing the image and its timestamp :raises Exception: if :py:func:`start_streaming_camera_images` has not been called""" return self._get_latest_streaming_data( "image", "start_streaming_camera_images", max_image_age ) #-----------------------------------------------------------------------------------------------
[docs] def start_streaming_small_camera_images( self, image_callback=None ): """This routine must be called to start streaming 'small' images from the robot's camera. Images can either be obtained in a blocking fashion, by calling :py:func:`get_latest_small_camera_image`, or an image callback can be provided. A small image is the normal camera image, reduced to a size of 160x120. Processing smaller images can give a large speed up to a lot of computer vision algorithms, and this is very important when running on a computationally constrained platform such as the Raspberry Pi. :param func image_callback: A function to call with an image and timestamp when \ an image is obtained.""" def create_streaming_process( data_queue ): return ImageReaderProcess( self._hostname, self._camera_port, data_queue, stream_small_images=True ) self._start_streaming( "small_image", create_streaming_process, image_callback ) #-----------------------------------------------------------------------------------------------
[docs] def stop_streaming_small_camera_images( self ): """Stops streaming 'small' images from the robot's camera""" self._stop_streaming( "small_image" ) #-----------------------------------------------------------------------------------------------
[docs] def get_latest_small_camera_image( self, max_image_age=None ): """This routine will block until an image is obtained from the robot's camera. :param float max_image_age: Specifies the maximum age of the image to retrieve in seconds. \ If this is None, then the routine will retrieve the first available image. If the currently \ available image is older than this age then the routine blocks until a newer image is retrieved. :return: a tuple containing the image and its timestamp :raises Exception: if :py:func:`start_streaming_small_camera_images` has not been called""" return self._get_latest_streaming_data( "small_image", "start_streaming_small_camera_images", max_image_age ) #-----------------------------------------------------------------------------------------------
[docs] def start_streaming_motion_vectors( self, motion_vectors_callback=None ): """This routine must be called to start streaming motion vectors from the robot's camera. Motion vectors can either be obtained in a blocking fashion, by calling :py:func:`get_latest_motion_vectors`, or a callback can be provided. :param func motion_vectors_callback: A function to call with motion vectors and a timestamp when \ a new batch of motion vectors is obtained.""" def create_streaming_process( data_queue ): return MotionVectorsReaderProcess( self._hostname, self._camera_port, data_queue ) self._start_streaming( "motion_vector", create_streaming_process, motion_vectors_callback ) #-----------------------------------------------------------------------------------------------
[docs] def stop_streaming_motion_vectors( self ): """Stops streaming motion vectors from the robot's camera""" self._stop_streaming( "motion_vector" ) #-----------------------------------------------------------------------------------------------
[docs] def get_latest_motion_vectors( self, max_motion_vectors_age=None ): """This routine will block until an image is obtained from the robot's camera. :param float max_motion_vectors_age: Specifies the maximum age of the motion vectors to retrieve \ in seconds. If this is None, then the routine will retrieve the first available set of motion \ vectors. If the currently available motion vectors are older than this age then the routine blocks \ until a newer set of motion vectors is retrieved. :return: a tuple containing the motion vectors and a timestamp :raises Exception: if :py:func:`stop_streaming_motion_vectors` has not been called""" return self._get_latest_streaming_data( "motion_vector", "stop_streaming_motion_vectors", max_motion_vectors_age ) #-----------------------------------------------------------------------------------------------
def _start_streaming( self, data_name, create_streaming_process_func, callback=None ): streaming_data = self._streaming_data[ data_name ] if streaming_data.reader_process == None: streaming_data.reader_process = create_streaming_process_func( streaming_data.data_queue ) streaming_data.reader_process.start() streaming_data.callback = callback self._update_camera_keep_alive() #----------------------------------------------------------------------------------------------- def _stop_streaming( self, data_name ): streaming_data = self._streaming_data[ data_name ] if streaming_data.reader_process != None: # Start the shutdown streaming_data.reader_process.shutdown() # Clear out the data queue while not streaming_data.data_queue.empty(): streaming_data.data_queue.get_nowait() # Wait for the process to finish streaming_data.reader_process.join() # Clear out the process and callback streaming_data.reader_process = None streaming_data.callback = None # Double check that the queue is clear while not streaming_data.data_queue.empty(): streaming_data.data_queue.get_nowait() #----------------------------------------------------------------------------------------------- def _get_latest_streaming_data( self, data_name, start_streaming_routine_name, max_data_age=None ): streaming_data = self._streaming_data[ data_name ] if streaming_data.reader_process == None: raise Exception( "Trying to get data before {0} has been called".format( start_streaming_routine_name ) ) processed_data = streaming_data.last_processed_data processed_data_time = streaming_data.last_processed_data_time while processed_data == None \ or ( max_data_age != None and time.time() - processed_data_time > max_data_age ): time.sleep( 0.001 ) self.update() processed_data = streaming_data.last_processed_data processed_data_time = streaming_data.last_processed_data_time return processed_data, processed_data_time #----------------------------------------------------------------------------------------------- def _update_camera_keep_alive( self ): # If we're trying to read camera images or motion vectors then we need # to periodically poke the server streaming_data_from_camera = False for data_name in self._streaming_data: if self._streaming_data[ data_name ].reader_process != None: streaming_data_from_camera = True break if streaming_data_from_camera: if time.time() - self._last_camera_keep_alive_time >= self.TIME_BETWEEN_CAMERA_KEEP_ALIVES: self._websocket.send( "StartStreaming" ) self._last_camera_keep_alive_time = time.time() #-----------------------------------------------------------------------------------------------
[docs] def set_motor_speeds( self, left_motor_speed, right_motor_speed ): """Sets the motor speeds of the robot :param float left_motor_speed: Left motor speed from 0 to 100% :param float right_motor_speed: Right motor speed from 0 to 100%""" self._websocket.send( "SetMotorSpeeds {0} {1}".format( left_motor_speed, right_motor_speed ) ) #-----------------------------------------------------------------------------------------------
[docs] def set_neck_angles( self, pan_angle_degrees, tilt_angle_degrees ): """Sets the neck angles of the robot in degrees. The centre point for each motor is 90 degrees :param float pan_angle_degrees: Pan servo angle from 0 to 180 degrees :param float tilt_angle_degrees: Tilt servo angle from 0 to 180 degrees""" self._websocket.send( "SetNeckAngles {0} {1}".format( pan_angle_degrees, tilt_angle_degrees ) ) #-----------------------------------------------------------------------------------------------
[docs] def set_motor_joystick_pos( self, joystickX, joystickY ): """Passes a movement joystick command to the robot. The magnitude of the joystick input should be less than or equal to 1.0. So :math:`\sqrt{ \mathrm{joystickX}^2 + \mathrm{joystickY}^2 } \le 1.0,` however, if it's not then it will be constrained automatically. :param float joystickX: Joystick X input from -1.0 (left) to 1.0 (right) :param float joystickY: Joystick Y input from -1.0 (backwards) to 1.0 (forwards)""" self._websocket.send( "Move {0} {1}".format( joystickX, joystickY ) ) #-----------------------------------------------------------------------------------------------
[docs] def set_neck_joystick_pos( self, joystickX, joystickY ): """Passes a neck joystick command to the robot. The magnitude of the joystick input should be less than or equal to 1.0. So :math:`\sqrt{ \mathrm{joystickX}^2 + \mathrm{joystickY}^2 } \le 1.0,` however, if it's not then it will be constrained automatically. :param float joystickX: Joystick X input from -1.0 (left) to 1.0 (right) :param float joystickY: Joystick Y input from -1.0 (down) to 1.0 (up)""" self._websocket.send( "PanTilt {0} {1}".format( joystickX, joystickY ) ) #-----------------------------------------------------------------------------------------------
[docs] def get_robot_status_dict( self ): """Gets a dictionary containing various status variables from the robot. If robot sensor data is returned then it will be returned as a dictionary of sensor readings in the entry called 'sensors'. Each sensor reading is a dictionary consisting of a 'data' entry and a 'timestamp' entry, where the timestamp is the system clock time of the robot in seconds when the reading was taken. :return: A dictionary containing status variables from the robot, along with the client time when the status dictionary was received. :rtype: (dict, float)""" self._websocket.send( "GetRobotStatus" ) status_dict = json.loads( self._websocket.recv() ) read_time = time.time() return status_dict, read_time #-----------------------------------------------------------------------------------------------
[docs] def get_battery_voltage( self ): """Reads the battery voltage from the robot. :return: The battery voltage read from the robot, along with the client time when the battery voltage was received. :rtype: (float, float)""" voltage = 0.0 status_dict, read_time = self.get_robot_status_dict() if "batteryVoltage" in status_dict: voltage = status_dict[ "batteryVoltage" ] else: read_time = None # No valid reading made return voltage, read_time #-----------------------------------------------------------------------------------------------
[docs] def get_robot_config( self ): """Reads the current configuration of the robot. :return: A :py:class:`RobotConfig` object containing configuration parameters for the robot :rtype: :py:class:`RobotConfig`""" self._websocket.send( "GetConfig" ) config_dict = json.loads( self._websocket.recv() ) config = robot_config.RobotConfig() config.readDataFromConfigDict( config_dict ) return config #-----------------------------------------------------------------------------------------------
[docs] def set_robot_config( self, config ): """Sends configuration parameters to the robot. In order to avoid disrupting existing configuration settings it is recommended that you call :py:func:`get_robot_config` first, modify the configuration parameters you're interested in, and then call this routine with the modified robot configuration. :param :py:class:`RobotConfig` config: The configuration to send to the robot""" config_dict = config.getConfigDict() self._websocket.send( "SetConfig {0}".format( json.dumps( config_dict, default=lambda o: o.__dict__, separators=(',',':') ) ) ) #-----------------------------------------------------------------------------------------------
[docs] def estimate_robot_time_offset( self ): """Estimates the time difference between our system clock and the robot's system clock. We use a simple algorithm based on the method given at http://www.mine-control.com/zack/timesync/timesync.html The estimation process will take about 10 to 20 seconds so should only be called occasionally (perhaps once every 30 minutes or so). TODO: Get this to work asynchronously... :return: The offset to add to the system time in order to get to the robot's time :rtype: float""" NUM_SAMPLES = 6 INTER_SAMPLE_WAIT_TIME = 1.5 samples = [] for i in range( NUM_SAMPLES ): # Record the current time on the client start_time = time.time() # Get the current time on the server self._websocket.send( "GetServerTime" ) server_time = json.loads( self._websocket.recv() )[ "serverTime" ] # Estimate the latency as half the round trip time end_time = time.time() latency = (end_time - start_time)/2.0 # Calculate the offset to get from client time to server time offset = server_time - end_time + latency samples.append( ( latency, offset ) ) if i < NUM_SAMPLES - 1: time.sleep( INTER_SAMPLE_WAIT_TIME ) # Sort the samples on latency and extract the median latency sorted_samples = sorted( samples, key=lambda o: o[ 0 ] ) median_latency = sorted_samples[ int( NUM_SAMPLES/2 ) ][ 0 ] # Calculate the standard deviation of the latency latency_sum = 0.0 for i in range( NUM_SAMPLES ): latency_sum += sorted_samples[ i ][ 0 ] mean_latency = latency_sum/NUM_SAMPLES squared_diff_sum = 0.0 for i in range( NUM_SAMPLES ): squared_diff_sum += (mean_latency - sorted_samples[ i ][ 0 ])**2 latency_sd = math.sqrt( squared_diff_sum/NUM_SAMPLES ) # Calculate the average offset from all samples within one standard deviation of the # median latency offset_sum = 0.0 num_offset_samples = 0 for i in range( NUM_SAMPLES ): if abs( median_latency - sorted_samples[ i ][ 0 ] ) <= latency_sd: offset_sum += sorted_samples[ i ][ 1 ] num_offset_samples += 1 return offset_sum/num_offset_samples #-----------------------------------------------------------------------------------------------
[docs] def centre_neck( self ): """Centres the neck of the robot""" self._websocket.send( "Centre" ) #-----------------------------------------------------------------------------------------------
[docs] def power_off_robot( self ): """Instructs the Pi on the robot to shut down""" self._websocket.send( "Shutdown" ) #-----------------------------------------------------------------------------------------------
[docs] def update( self ): """This routine must be called periodically to ensure that background commications with the robot (to keep camera images streaming for example) happen on a regular basis.""" # Keep camera alive if needed self._update_camera_keep_alive() # Update streaming data for data_name in self._streaming_data: streaming_data = self._streaming_data[ data_name ] num_items_to_process = streaming_data.data_queue.qsize() while num_items_to_process > 0 and not streaming_data.data_queue.empty(): num_items_to_process -= 1 raw_data, data_time = streaming_data.data_queue.get_nowait() if raw_data != None: processed_data = streaming_data.data_processing_routine( raw_data ) if processed_data != None: streaming_data.last_processed_data = processed_data streaming_data.last_processed_data_time = data_time if streaming_data.callback != None: streaming_data.callback( processed_data, data_time ) #-----------------------------------------------------------------------------------------------
def _process_raw_image_data( self, image_data ): image = None try: npArray = np.fromstring( image_data, np.uint8 ) image = cv2.imdecode( npArray, cv2.CV_LOAD_IMAGE_COLOR ) except ( TypeError, cv2.error ): pass # Ignore exceptions which arise from invalid data return image #----------------------------------------------------------------------------------------------- def _process_raw_motion_vectors_data( self, motion_vectors_data ): motion_vectors = None if motion_vectors_data != None and len( motion_vectors_data ) > 0: try: motion_vectors_shape = np.fromstring( motion_vectors_data[ :4 ], dtype=[("width","u2"),("height","u2")] ) motion_vectors = np.fromstring( motion_vectors_data[ 4: ], dtype=[("x","i1"),("y","i1"),("sad","u2")] ) motion_vectors.shape = ( motion_vectors_shape[ "height" ], motion_vectors_shape[ "width" ] ) #print motion_vectors.shape except TypeError: pass # Ignore exceptions which arise from invalid data return motion_vectors