#!/usr/bin/env python
#
# License: BSD
# https://raw.githubusercontent.com/splintered-reality/py_trees/devel/LICENSE
#
##############################################################################
# Documentation
##############################################################################
"""The core behaviour template for all py_tree behaviours."""
##############################################################################
# Imports
##############################################################################
from __future__ import annotations
import abc
import re
import typing
import uuid
from . import blackboard, common, logging
##############################################################################
# Behaviour BluePrint
##############################################################################
class Behaviour(abc.ABC):
    """A parent class for all user definable tree behaviours.

    Args:
        name: the behaviour name

    Raises:
        TypeError: if the provided name is not a string

    Attributes:
        ~py_trees.behaviour.Behaviour.id (:class:`uuid.UUID`): automagically generated unique identifier
            for the behaviour
        ~py_trees.behaviour.Behaviour.name (:obj:`str`): the behaviour name
        ~py_trees.behaviour.Behaviour.blackboards (typing.List[py_trees.blackboard.Client]): collection of attached
            blackboard clients
        ~py_trees.behaviour.Behaviour.status (:class:`~py_trees.common.Status`): the behaviour status
            (:data:`~py_trees.common.Status.INVALID`,
            :data:`~py_trees.common.Status.RUNNING`,
            :data:`~py_trees.common.Status.FAILURE`,
            :data:`~py_trees.common.Status.SUCCESS`)
        ~py_trees.behaviour.Behaviour.parent (:class:`~py_trees.behaviour.Behaviour`): a
            :class:`~py_trees.composites.Composite` instance if nested in a tree, otherwise None
        ~py_trees.behaviour.Behaviour.children ([:class:`~py_trees.behaviour.Behaviour`]): empty for regular
            behaviours, populated for composites
        ~py_trees.behaviour.Behaviour.logger (:class:`~py_trees.logging.Logger`): a simple logging mechanism
        ~py_trees.behaviour.Behaviour.feedback_message (:obj:`str`): improve debugging with a simple message
        ~py_trees.behaviour.Behaviour.blackbox_level (:class:`~py_trees.common.BlackBoxLevel`): a helper variable
            for dot graphs and runtime gui's to collapse/explode entire subtrees dependent upon the blackbox level.

    .. seealso::
        * :ref:`Skeleton Behaviour Template <skeleton-behaviour-include>`
        * :ref:`The Lifecycle Demo <py-trees-demo-behaviour-lifecycle-program>`
        * :ref:`The Action Behaviour Demo <py-trees-demo-action-behaviour-program>`
    """

    def __init__(self, name: str):
        # Validate before touching any other state so a bad name fails fast.
        if not isinstance(name, str):
            raise TypeError(
                "a behaviour name should be a string, but you passed in {}".format(
                    type(name)
                )
            )
        self.id = (
            uuid.uuid4()
        )  # used to uniquely identify this node (helps with removing children from a tree)
        self.name: str = name
        self.blackboards: typing.List[blackboard.Client] = []
        self.qualified_name = "{}/{}".format(
            self.__class__.__qualname__, self.name
        )  # convenience
        self.status = common.Status.INVALID
        # Generator handle for the current tick; stop() replaces it with a fresh one.
        self.iterator = self.tick()
        self.parent: typing.Optional[
            Behaviour
        ] = None  # will get set if a behaviour is added to a composite
        self.children: typing.List[Behaviour] = []  # only set by composite behaviours
        self.logger = logging.Logger(name)
        self.feedback_message = ""  # useful for debugging, or human readable updates, but not necessary to implement
        self.blackbox_level = common.BlackBoxLevel.NOT_A_BLACKBOX

    ############################################
    # User Customisable Callbacks
    ############################################

    def setup(self, **kwargs: typing.Any) -> None:  # noqa: B027
        """
        Set up and verify infrastructure (middleware connections, etc) is available.

        Users should override this method for any configuration and/or validation
        that is necessary prior to ticking the tree. Such construction is best
        done here rather than in __init__ since there is no guarantee at __init__
        that the infrastructure is ready or even available (e.g. you may be just
        rendering dot graphs of the trees, no robot around).

        Examples:
            * establishing a middleware connection to a sensor or driver
            * ensuring a sensor or driver is in a 'ready' state

        This method will typically be called before a tree's first tick as this gives
        the application time to check and verify that everything is in a ready state before
        executing. This is especially important given that a tree does not always tick
        every behaviour and if not checked up-front, it may be some time before
        discovering a behaviour was in a broken state.

        .. tip::
            When to use :meth:`~py_trees.behaviour.Behaviour.__init__`,
            :meth:`~py_trees.behaviour.Behaviour.setup` and when to use
            :meth:`~py_trees.behaviour.Behaviour.initialise`?

            Use :meth:`~py_trees.behaviour.Behaviour.__init__` for configuration of
            non-runtime dependencies (e.g. no middleware).

            Use :meth:`~py_trees.behaviour.Behaviour.setup` for one-offs or to get
            early signal that everything (e.g. middleware) is ready to go.

            Use :meth:`~py_trees.behaviour.Behaviour.initialise` for just-in-time
            configurations and/or checks.

            There are times when it makes sense to do all three. For example,
            pythonic variable configuration in :meth:`~py_trees.behaviour.Behaviour.__init__`,
            middleware service client creation / server existence checks in
            :meth:`~py_trees.behaviour.Behaviour.setup` and a just-in-time check
            to ensure the server is still available in :meth:`~py_trees.behaviour.Behaviour.initialise`.

        .. tip::
            Faults are notified to the user of the behaviour via exceptions.
            Choice of exception to use is left to the user.

        .. warning::
            The kwargs argument is for distributing objects at runtime to behaviours
            before ticking. For example, a simulator instance with which behaviours can
            interact with the simulator's python api, a ros2 node for setting up
            communications. Use sparingly, as this is not proof against keyword conflicts
            amongst disparate libraries of behaviours.

        Args:
            **kwargs: distribute arguments to this
               behaviour and in turn, all of its children

        Raises:
            Exception: if this behaviour has a fault in construction or configuration

        .. seealso:: :meth:`py_trees.behaviour.Behaviour.shutdown`
        """
        pass

    def initialise(self) -> None:  # noqa: B027
        """
        Execute user specified instructions prior to commencement of a new round of activity.

        Users should override this method to perform any necessary initialising/clearing/resetting
        of variables prior to a new round of activity for the behaviour.

        This method is automatically called via the :meth:`py_trees.behaviour.Behaviour.tick` method
        whenever the behaviour is not :data:`~py_trees.common.Status.RUNNING`.

        .. note:: This method can be called more than once in the lifetime of a tree!
        """
        pass

    def terminate(self, new_status: common.Status) -> None:  # noqa: B027
        """
        Execute user specified instructions when the behaviour is stopped.

        Users should override this method to clean up.
        It will be triggered when a behaviour either
        finishes execution (switching from :data:`~py_trees.common.Status.RUNNING`
        to :data:`~py_trees.common.Status.FAILURE` or :data:`~py_trees.common.Status.SUCCESS`)
        or it got interrupted by a higher priority branch (switching to
        :data:`~py_trees.common.Status.INVALID`). Remember that
        the :meth:`~py_trees.behaviour.Behaviour.initialise` method
        will handle resetting of variables before re-entry, so this method is about
        disabling resources until this behaviour's next tick. This could be an indeterminably
        long time. e.g.

        * cancel an external action that got started
        * shut down any temporary communication handles

        Args:
            new_status (:class:`~py_trees.common.Status`): the behaviour is transitioning to this new status

        .. warning:: Do not set `self.status = new_status` here, that is automatically handled
           by the :meth:`~py_trees.behaviour.Behaviour.stop` method.
           Use the argument purely for introspection purposes (e.g.
           comparing the current state in `self.status` with the state it will transition to in
           `new_status`).

        .. seealso:: :meth:`py_trees.behaviour.Behaviour.stop`
        """
        pass

    @abc.abstractmethod
    def update(self) -> common.Status:
        """
        Execute user specified instructions when the behaviour is ticked.

        Users should override this method to perform any logic required to
        arrive at a decision on the behaviour's new status. It is the primary worker function called
        by the :meth:`~py_trees.behaviour.Behaviour.tick` mechanism.

        Returns:
            the behaviour's new status :class:`~py_trees.common.Status`

        .. tip:: This method should be almost instantaneous and non-blocking

        .. seealso:: :meth:`py_trees.behaviour.Behaviour.tick`
        """
        return common.Status.INVALID

    def shutdown(self) -> None:  # noqa: B027
        """
        Destroy setup infrastructure (the antithesis of setup).

        Users should override this method for any custom destruction of infrastructure
        usually brought into being in :meth:`~py_trees.behaviour.Behaviour.setup`.

        Raises:
            Exception: of whatever flavour the child raises when errors occur on destruction

        .. seealso:: :meth:`py_trees.behaviour.Behaviour.setup`
        """
        pass

    ############################################
    # Private Methods - use inside a behaviour
    ############################################

    def attach_blackboard_client(
        self, name: typing.Optional[str] = None, namespace: typing.Optional[str] = None
    ) -> blackboard.Client:
        """
        Create and attach a blackboard to this behaviour.

        When no name is given, the client is named after the behaviour; the
        second and later unnamed clients get a ``-<count>`` suffix, where
        count is the number of clients already attached.

        Args:
            name: human-readable (not necessarily unique) name for the client
            namespace: sandbox the client to variables behind this namespace

        Returns:
            a handle to the attached blackboard client
        """
        if name is None:
            count = len(self.blackboards)
            name = self.name if (count == 0) else self.name + "-{}".format(count)
        new_blackboard = blackboard.Client(name=name, namespace=namespace)
        self.blackboards.append(new_blackboard)
        return new_blackboard

    ############################################
    # Public - lifecycle API
    ############################################

    def setup_with_descendants(self) -> None:
        """Call setup on this behaviour's descendants (children, their children, ...) and then on itself."""
        for child in self.children:
            for node in child.iterate():
                node.setup()
        self.setup()

    def tick_once(self) -> None:
        """Tick the object without iterating step-by-step over the children (i.e. without generators)."""
        # no logger necessary here...it directly relays to tick
        for _unused in self.tick():
            pass

    def tick(self) -> typing.Iterator[Behaviour]:
        """
        Tick the behaviour.

        This function is a generator that can be used by an iterator on
        an entire behaviour tree. It handles the logic for deciding when to
        call the user's :meth:`~py_trees.behaviour.Behaviour.initialise`
        and :meth:`~py_trees.behaviour.Behaviour.terminate` methods as well as making the
        actual call to the user's :meth:`~py_trees.behaviour.Behaviour.update` method that determines the
        behaviour's new status once the tick has finished. Once done, it will
        then yield itself (generator mechanism) so that it can be used as part of
        an iterator for the entire tree.

        .. code-block:: python

            for node in my_behaviour.tick():
                print("Do something")

        .. note::
            This is a generator function, it must be iterated over for the tick to execute.
            If you need a direct call,
            prefer :meth:`~py_trees.behaviour.Behaviour.tick_once` instead.

        Yields:
            a reference to itself

        .. warning::
            Users should not override this method to provide custom tick behaviour. The
            :meth:`~py_trees.behaviour.Behaviour.update` method has been provided for that purpose.
        """
        self.logger.debug("%s.tick()" % (self.__class__.__name__))

        # A behaviour that is not mid-run is starting a new round of activity.
        if self.status != common.Status.RUNNING:
            self.initialise()

        # don't set self.status yet, terminate() may need to check what the current state is first
        new_status = self.update()

        # Guard against user update() implementations returning something
        # that is not a Status member.
        if new_status not in list(common.Status):
            self.logger.error(
                "A behaviour returned an invalid status, setting to INVALID [%s][%s]"
                % (new_status, self.name)
            )
            new_status = common.Status.INVALID

        # Any status other than RUNNING ends the round; stop() calls terminate().
        if new_status != common.Status.RUNNING:
            self.stop(new_status)
        self.status = new_status
        yield self

    def iterate(self, direct_descendants: bool = False) -> typing.Iterator[Behaviour]:
        """
        Iterate over this behaviour and its children.

        This utilises python generators for looping. Descendants are yielded
        before this behaviour, which always comes last. To traverse the entire tree:

        .. code-block:: python

            for node in my_behaviour.iterate():
                print("Name: {0}".format(node.name))

        Args:
            direct_descendants (:obj:`bool`): only yield children one step away from this behaviour.

        Yields:
            :class:`~py_trees.behaviour.Behaviour`: one of its children, and finally itself
        """
        for child in self.children:
            if not direct_descendants:
                # Recurse: the child's own traversal ends with the child itself.
                yield from child.iterate()
            else:
                yield child
        yield self

    # TODO: better type refinement of 'visitor'
    def visit(self, visitor: typing.Any) -> None:
        """
        Introspect on this behaviour with a visitor.

        This is functionality that enables external introspection into the behaviour. It gets used
        by the tree manager classes to collect information as ticking traverses a tree.

        Args:
            visitor: the visiting class, must have a run(:class:`~py_trees.behaviour.Behaviour`) method.
        """
        visitor.run(self)

    def stop(self, new_status: common.Status) -> None:
        """
        Stop the behaviour with the specified status.

        This is called to bring the current round of activity for the behaviour to completion, typically
        resulting in a final status of :data:`~py_trees.common.Status.SUCCESS`,
        :data:`~py_trees.common.Status.FAILURE` or :data:`~py_trees.common.Status.INVALID`.

        Args:
            new_status: the behaviour is transitioning to this new status

        .. warning::
            Users should not override this method to provide custom termination behaviour. The
            :meth:`~py_trees.behaviour.Behaviour.terminate` method has been provided for that purpose.
        """
        self.logger.debug(
            "%s.stop(%s)"
            % (
                self.__class__.__name__,
                "%s->%s" % (self.status, new_status)
                if self.status != new_status
                else "%s" % new_status,
            )
        )
        # terminate() runs before the status is overwritten so that it can
        # still compare the outgoing self.status with new_status.
        self.terminate(new_status)
        self.status = new_status
        # Replace the generator handle with a fresh one for the next round.
        self.iterator = self.tick()

    ############################################
    # Public - introspection API
    ############################################

    def has_parent_with_name(self, name: str) -> bool:
        """
        Search this behaviour's ancestors for one with the specified name.

        The name is compiled as a regular expression and matched from the
        start of each ancestor's name (``re.match`` semantics).

        Args:
            name: name of the parent to match, can be a regular expression

        Returns:
            whether a parent was found or not
        """
        pattern = re.compile(name)
        b = self
        while b.parent is not None:
            if pattern.match(b.parent.name) is not None:
                return True
            b = b.parent
        return False

    def has_parent_with_instance_type(
        self, instance_type: "typing.Type[Behaviour]"
    ) -> bool:
        """
        Search this behaviour's ancestors for one of the specified type.

        Args:
            instance_type: instance type of the parent to match

        Returns:
            whether a parent was found or not
        """
        b = self
        while b.parent is not None:
            if isinstance(b.parent, instance_type):
                return True
            b = b.parent
        return False

    def tip(self) -> typing.Optional[Behaviour]:
        """
        Get the *tip* of this behaviour's subtree (if it has one).

        This corresponds to the deepest node that was running before the
        subtree traversal reversed direction and headed back to this node.

        Returns:
            The deepest node (behaviour) that was running before subtree traversal
            reversed direction, or None if this behaviour's status is
            :data:`~py_trees.common.Status.INVALID`.
        """
        return self if self.status != common.Status.INVALID else None
##############################################################################
# Mypy Convenience Types
##############################################################################
# Type variable bound to Behaviour: a type checker may only solve it to
# Behaviour itself or to one of its subclasses.
BehaviourSubClass = typing.TypeVar("BehaviourSubClass", bound=Behaviour)
# BehaviourUpdateMethod = typing.Callable[[BehaviourSubClass], common.Status]