Source code for py_trees.visitors

#!/usr/bin/env python
#
# License: BSD
#   https://raw.githubusercontent.com/splintered-reality/py_trees/devel/LICENSE
#
##############################################################################
# Documentation
##############################################################################

"""
Visiting rights to behaviours.

Visitors are entities that can be passed to a tree implementation
(e.g. :class:`~py_trees.trees.BehaviourTree`) and used to either visit
each and every behaviour in the tree, or visit behaviours as the tree is
traversed in an executing tick. At each behaviour, the visitor
runs its own method on the behaviour to do as it wishes - logging, introspecting, etc.

.. warning:: Visitors should not modify the behaviours they visit.
"""

##############################################################################
# Imports
##############################################################################

import typing
import uuid

from . import behaviour, blackboard, common, display

##############################################################################
# Visitors
##############################################################################


[docs]class VisitorBase(object): """ Parent template for visitor types. Visitors are primarily designed to work with :class:`~py_trees.trees.BehaviourTree` but they can be used in the same way for other tree custodian implementations. Args: full: flag to indicate whether it should be used to visit only traversed nodes or the entire tree Attributes: full: flag to indicate whether it should be used to visit only traversed nodes or the entire tree """ def __init__(self, full: bool = False): self.full: bool = full
[docs] def initialise(self) -> None: """Override if any resetting of variables needs to be performed between ticks (i.e. visitations).""" pass
[docs] def finalise(self) -> None: """Override if any work needs to be performed after ticks (i.e. showing data).""" pass
[docs] def run(self, behaviour: behaviour.Behaviour) -> None: """ Converse with the behaviour. This method gets run as each behaviour is ticked. Override it to perform some activity - e.g. introspect the behaviour to store/process logging data for visualisations. Args: behaviour (:class:`~py_trees.behaviour.Behaviour`): behaviour that is ticking """ pass
[docs]class DebugVisitor(VisitorBase): """ Picks up and logs feedback messages and the behaviour's status. Logging is done with the behaviour's logger. """ def __init__(self) -> None: super(DebugVisitor, self).__init__(full=False)
[docs] def run(self, behaviour: behaviour.Behaviour) -> None: """ Log behaviour information on the debug channel. Args: behaviour: behaviour being visited. """ if behaviour.feedback_message: behaviour.logger.debug( "%s.run() [%s][%s]" % ( self.__class__.__name__, behaviour.feedback_message, behaviour.status, ) ) else: behaviour.logger.debug( "%s.run() [%s]" % (self.__class__.__name__, behaviour.status) )
[docs]class SnapshotVisitor(VisitorBase): """ Creates a snapshot of the tree state (behaviour status' only). Visits the ticked part of a tree, checking off the status against the set of status results recorded in the previous tick. If there has been a change, it flags it. This is useful for determining when to trigger, e.g. logging. Attributes: changed (Bool): flagged if there is a difference in the visited path or :class:`~py_trees.common.Status` of any behaviour on the path visited (dict): dictionary of behaviour id (uuid.UUID) and status (:class:`~py_trees.common.Status`) pairs from the current tick previously_visited (dict): dictionary of behaviour id (uuid.UUID) and status (:class:`~py_trees.common.Status`) pairs from the previous tick running_nodes([uuid.UUID]): list of id's for behaviours which were traversed in the current tick previously_running_nodes([uuid.UUID]): list of id's for behaviours which were traversed in the last tick visited_blackboard_ids(typing.Set[uuid.UUID]): blackboard client id's on the visited path visited_blackboard_keys(typing.Set[str]): blackboard variable keys on the visited path .. seealso:: The :ref:`py-trees-demo-logging-program` program demonstrates use of this visitor to trigger logging of a tree serialisation. """ def __init__(self) -> None: super().__init__(full=False) self.changed = False self.visited: typing.Dict[uuid.UUID, common.Status] = {} self.previously_visited: typing.Dict[uuid.UUID, common.Status] = {} self.visited_blackboard_keys: typing.Set[str] = set() self.visited_blackboard_client_ids: typing.Set[uuid.UUID] = set()
[docs] def initialise(self) -> None: """Store the last snapshot for comparison with the next incoming snapshot. This should get called before a tree ticks. """ self.changed = False self.previously_visited = self.visited self.visited = {} self.visited_blackboard_keys = set() self.visited_blackboard_client_ids = set()
[docs] def run(self, behaviour: behaviour.Behaviour) -> None: """ Catch the id, status and store it. Additionally add it to the running list if it is :data:`~py_trees.common.Status.RUNNING`. Args: behaviour: behaviour that is ticking """ # behaviour status self.visited[behaviour.id] = behaviour.status try: if self.visited[behaviour.id] != self.previously_visited[behaviour.id]: self.changed = True except KeyError: self.changed = True # blackboards for b in behaviour.blackboards: self.visited_blackboard_client_ids.add(b.id()) self.visited_blackboard_keys = ( self.visited_blackboard_keys | b.read | b.write | b.exclusive )
[docs]class DisplaySnapshotVisitor(SnapshotVisitor): """ Visit the tree, capturing the visited path, it's changes since the last tick. Additionally print the snapshot to console. Args: display_only_visited_behaviours: useful for cropping the unvisited part of a large tree display_blackboard: print to the console the relevant part of the blackboard associated with behaviours on the visited path display_activity_stream: print to the console a log of the activity on the blackboard over the last tick """ def __init__( self, display_only_visited_behaviours: bool = False, display_blackboard: bool = False, display_activity_stream: bool = False, ): super().__init__() self.display_only_visited_behaviours = display_only_visited_behaviours self.display_blackboard = display_blackboard self.display_activity_stream = display_activity_stream if self.display_activity_stream: blackboard.Blackboard.enable_activity_stream()
[docs] def initialise(self) -> None: """Reset and initialise all variables.""" self.root: typing.Optional[behaviour.Behaviour] = None super().initialise() if self.display_activity_stream: if blackboard.Blackboard.activity_stream is not None: blackboard.Blackboard.activity_stream.clear()
[docs] def run(self, behaviour: behaviour.Behaviour) -> None: """ Track the root of the tree and run :class:`~py_trees.visitors.SnapshotVisitor`. Args: behaviour: behaviour being visited. """ self.root = behaviour # last behaviour visited will always be the root super().run(behaviour)
[docs] def finalise(self) -> None: """Print a summary on stdout after all behaviours have been visited.""" if self.root is not None: print( "\n" + display.unicode_tree( root=self.root, show_only_visited=self.display_only_visited_behaviours, show_status=False, visited=self.visited, previously_visited=self.previously_visited, ) ) if self.display_blackboard: print(display.unicode_blackboard(key_filter=self.visited_blackboard_keys)) if self.display_activity_stream: print(display.unicode_blackboard_activity_stream())