PerformAction Tutorial (Python)

This tutorial is an extension of the Fleet Adapter Tutorial and will guide you to write custom actions in your fleet adapter. While RMF offers a few standard tasks, we understand that different robots may be equipped and programmed to perform different types of actions, such as cleaning, object-picking, teleoperation, and so on. By supporting custom tasks, users can trigger a custom action specified in the fleet adapter's config.yaml beforehand, and RMF would relinquish control of the robot until it is signalled that the robot has completed the custom action. You may explore the Supporting a new Task in RMF section to read more about supporting custom tasks and how you can create your own task JSON to be sent to RMF.

In this tutorial, we will refer to a simplified version of the rmf_demos_fleet_adapter to implement a Clean PerformAction capability in our fleet adapter.

1. Define the PerformAction in the fleet config.yaml

We will need to define the name of the action in the fleet configuration, so that RMF recognizes this action as performable when a task is submitted and is able to dispatch it to a fleet that can fulfil it. In our config.yaml under the rmf_fleet section, we can provide a list of performable actions for our fleet. For example, let's define clean as an action supported by this fleet:

rmf_fleet:
  actions: ["clean"]

2. Apply action execution logic inside our fleet adapter

After RMF receives a task consisting of this action and dispatches it to the right fleet, the fleet adapter's execute_action(~) callback will be triggered. The category parsed to this callback corresponds to the action name that we have previously defined, and the description consists of any details about the action that we might be interested in.

Assume that this is the task JSON submitted to RMF:

{
  "type": "dispatch_task_request",
  "request": {
    "unix_millis_earliest_start_time": start_time,
    "category": "clean",
    "description": {
      "zone": "clean_lobby"
    }
  }
}

In our example, the category provided would be clean, and the description would contain which cleaning zone this task is directing our robot to, which is clean_lobby. Hence, we will need to implement the logic in our execute_action(~):

    def execute_action(self, category: str, description: dict, execution):
        self.execution = execution

        if category == 'clean':
            self.perform_clean(description['zone'])

    def perform_clean(self, zone):
        if self.api.start_activity(self.name, 'clean', zone):
            self.node.get_logger().info(
                f'Commanding [{self.name}] to clean zone [{zone}]'
            )
        else:
            self.node.get_logger().error(
                f'Fleet manager for [{self.name}] does not know how to '
                f'clean zone [{zone}]. We will terminate the activity.'
            )
            self.execution.finished()
            self.execution = None

Since our fleet may be capable of performing multiple custom actions, we will need to conduct a check to ensure that the category received matches the robot API that we are targeting. Upon receiving a clean action, we can trigger the robot's API accordingly.

3. Implement the robot API for the custom action

This is where the start_activity(~) method inside RobotClientAPI.py comes into play. We would require it to implement the API call to the robot to start the cleaning activity. As an example, if the robot API uses REST to make calls to the robot, the implemented method may look like this:

    def start_activity(
        self,
        robot_name: str,
        activity: str,
        label: str
    ):
        ''' Request the robot to begin a process. This is specific to the robot
            and the use case. For example, load/unload a cart for Deliverybot
            or begin cleaning a zone for a cleaning robot.'''
        url = (
            self.prefix +
            f"/open-rmf/rmf_demos_fm/start_activity?robot_name={robot_name}"
        )
        # data fields: task, map_name, destination{}, data{}
        data = {'activity': activity, 'label': label}
        try:
            response = requests.post(url, timeout=self.timeout, json=data)
            response.raise_for_status()
            if self.debug:
                print(f'Response: {response.json()}')

            if response.json()['success']:
                return True

            # If we get a response with success=False, then
            return False
        except HTTPError as http_err:
            print(f'HTTP error for {robot_name} in start_activity: {http_err}')
        except Exception as err:
            print(f'Other error {robot_name} in start_activity: {err}')
        return False

4. Complete the action

Since we stored a self.execution object in our RobotAdapter, we will be notified when any execution (navigation, stop, or action) is completed as the update loop continually calls is_command_completed to check on its status.

    def update(self, state):
        activity_identifier = None
        if self.execution:
            if self.api.is_command_completed():
                self.execution.finished()
                self.execution = None
            else:
                activity_identifier = self.execution.identifier

If your implementation requires a separate callback to mark the execution as finished, you can create a new function to conduct this check and call self.execution.finished() when the action is completed.