#!/usr/bin/env python
#
# License: BSD
# https://raw.githubusercontent.com/splintered-reality/py_trees/devel/LICENSE
#
##############################################################################
# Documentation
##############################################################################
"""
Composites (multi-child) types for behaviour trees.
Composites are responsible for directing the path traced through
the tree on a given tick (execution). They are the **factories**
(Sequences and Parallels) and **decision makers** (Selectors) of a behaviour
tree.
.. graphviz:: dot/composites.dot
:align: center
:caption: PyTree Composites
Composite behaviours typically manage children and apply some logic to the way
they execute and return a result, but generally don't do anything themselves.
Perform the checks or actions you need to do in the non-composite behaviours.
Most any desired functionality can be authored with a combination of these
three composites. In fact, it is precisely this feature that makes behaviour
trees attractive - it breaks down complex decision making logic to just three
primitive elements. It is possible and often desirable to extend this set with
custom composites of your own, but think carefully before you do - in almost
every case, a combination of the existing composites will serve and as a
result, you will merely compound the complexity inherent in your tree logic.
This makes it confoundingly difficult to design, introspect and debug. As
an example, design sessions often revolve around a sketched graph on a
whiteboard. When these graphs are composed of just five elements (Selectors,
Sequences, Parallels, Decorators and Behaviours), it is very easy to understand
the logic at a glance. Double the number of fundamental elements and you may as
well be back at the terminal parsing code.
.. tip:: You should never need to subclass or create new composites.
The basic operational modes of the three composites in this library are as follows:
* :class:`~py_trees.composites.Selector`: execute a child based on cascading priorities
* :class:`~py_trees.composites.Sequence`: execute children sequentially
* :class:`~py_trees.composites.Parallel`: execute children concurrently
This library does provide some flexibility in *how* each composite is implemented without
breaking the fundamental nature of each (as described above). Selectors and Sequences can
be configured with or without memory (resumes or resets if children are RUNNING) and
the results of a parallel can be configured to wait upon all children completing, succeed
on one, all or a subset thereof.
.. tip:: Follow the links in each composite's documentation to the relevant demo programs.
"""
##############################################################################
# Imports
##############################################################################
import abc
import itertools
import typing
import uuid
from . import behaviour, common
##############################################################################
# Composites
##############################################################################
class Composite(behaviour.Behaviour, abc.ABC):
    """
    The parent class to all composite behaviours.

    A composite owns an ordered list of children (``self.children``) and
    tracks the child it is currently focused on (``self.current_child``).
    Subclasses supply the traversal logic by implementing :meth:`tick`.

    Args:
        name (:obj:`str`): the composite behaviour name
        children ([:class:`~py_trees.behaviour.Behaviour`]): list of children to add
    """

    def __init__(
        self,
        name: str,
        children: typing.Optional[typing.List[behaviour.Behaviour]] = None,
    ):
        super(Composite, self).__init__(name)
        if children is not None:
            # NOTE(review): add_child() appends to self.children, so this path
            # presumably relies on Behaviour.__init__() having already created
            # the list - confirm against the base class.
            for child in children:
                self.add_child(child)
        else:
            self.children = []
        self.current_child: typing.Optional[behaviour.Behaviour] = None

    ############################################
    # Virtual
    ############################################

    @abc.abstractmethod
    def tick(self) -> typing.Iterator[behaviour.Behaviour]:
        """
        Tick the composite.

        All composite subclasses require a re-implementation
        of the tick method to provide the logic for managing multiple
        children (:meth:`~py_trees.behaviour.Behaviour.tick` merely
        provides default logic for when there are no children).
        """
        pass

    ############################################
    # Unused
    ############################################

    def update(self) -> common.Status:
        """
        Unused update method.

        Composites should direct the flow, whilst
        behaviours do the real work.

        Such flows are a consequence of how the composite
        interacts with its children. The success of
        behaviour trees depends on this logic being simple,
        well defined and limited to a few well established
        patterns - this is what ensures that visualising
        a tree enables a user to quickly grasp the
        decision making captured therein.

        For the standard patterns, this logic is limited
        to the ordering of execution and logical inferences
        on the resulting status of the composite's children.

        This is a good guideline to adhere to (i.e. don't reach
        inside children to inference on custom variables, nor
        reach out to the system your tree is attached to).

        Implementation wise, this renders the
        :meth:`~py_trees.behaviour.Behaviour.update` method redundant
        as all customisation to create a simple, well defined composite
        happens in the :meth:`~py_trees.behaviour.Behaviour.tick` method.

        Bottom line, composites do not make use of this method.
        Implementing it for subclasses of the core composites
        will not do anything.

        Returns:
            always :data:`~py_trees.common.Status.INVALID`
        """
        return common.Status.INVALID

    ############################################
    # Overrides
    ############################################

    def stop(self, new_status: common.Status = common.Status.INVALID) -> None:
        """
        Provide common stop-level functionality for all composites.

        * The current child is retained unless the new status is
          :data:`~py_trees.common.Status.INVALID`, in which case it is dropped
        * On :data:`~py_trees.common.Status.INVALID`, every child that is not
          already :data:`~py_trees.common.Status.INVALID` is stopped as well

        The latter situation can arise for some composites, but more importantly,
        will always occur when a higher priority behaviour interrupts this one.

        Args:
            new_status: behaviour will transition to this new status
        """
        # Priority interrupt handling
        if new_status == common.Status.INVALID:
            self.current_child = None
            for child in self.children:
                # stopping an already INVALID child would be redundant
                if child.status != common.Status.INVALID:
                    child.stop(new_status)
        # Regular Behaviour.stop() handling
        #   could call directly, but replicating here to avoid repeating the logger
        self.terminate(new_status)
        self.status = new_status
        # fresh generator so that the next tick starts from the top
        self.iterator = self.tick()

    def tip(self) -> typing.Optional[behaviour.Behaviour]:
        """
        Recursive function to extract the last running node of the tree.

        Returns:
            the tip of the current child if there is a current child,
            otherwise whatever the parent class' ``tip()`` returns
        """
        if self.current_child is not None:
            return self.current_child.tip()
        else:
            return super().tip()

    ############################################
    # Children
    ############################################

    def add_child(self, child: behaviour.Behaviour) -> uuid.UUID:
        """
        Add a child.

        All validation happens before the child list is touched, so a
        rejected child is never left dangling in ``self.children``.

        Args:
            child: child to add

        Raises:
            TypeError: if the child is not an instance of :class:`~py_trees.behaviour.Behaviour`
            RuntimeError: if the child already has a parent

        Returns:
            unique id of the child
        """
        if not isinstance(child, behaviour.Behaviour):
            raise TypeError(
                "children must be behaviours, but you passed in {}".format(type(child))
            )
        if child.parent is not None:
            raise RuntimeError(
                "behaviour '{}' already has parent '{}'".format(
                    child.name, child.parent.name
                )
            )
        self.children.append(child)
        child.parent = self
        return child.id

    def add_children(
        self, children: typing.List[behaviour.Behaviour]
    ) -> behaviour.Behaviour:
        """
        Append a list of children to the current list.

        Children are added one at a time via :meth:`add_child`; if one of them
        is rejected, those preceding it in the list have already been added.

        Args:
            children ([:class:`~py_trees.behaviour.Behaviour`]): list of children to add

        Returns:
            this composite (enables call chaining)
        """
        for child in children:
            self.add_child(child)
        return self

    def remove_child(self, child: behaviour.Behaviour) -> int:
        """
        Remove the child behaviour from this composite.

        A running child is stopped (:data:`~py_trees.common.Status.INVALID`)
        before it is detached.

        Args:
            child: child to delete

        Raises:
            ValueError: if the child is not in this composite's list of children

        Returns:
            index of the child that was removed
        """
        # look the child up first so that a non-member raises before any
        # state (current_child, child status) has been modified
        child_index = self.children.index(child)
        if self.current_child is not None and (self.current_child.id == child.id):
            self.current_child = None
        if child.status == common.Status.RUNNING:
            child.stop(common.Status.INVALID)
        self.children.remove(child)
        child.parent = None
        return child_index

    def remove_all_children(self) -> None:
        """Remove all children. Makes sure to stop each child if necessary."""
        self.current_child = None
        for child in self.children:
            if child.status == common.Status.RUNNING:
                child.stop(common.Status.INVALID)
            child.parent = None
        # clear in place so that every reference to this list sees it emptied
        # http://stackoverflow.com/questions/850795/clearing-python-lists
        del self.children[:]

    def replace_child(
        self, child: behaviour.Behaviour, replacement: behaviour.Behaviour
    ) -> None:
        """
        Replace the child behaviour with another.

        The replacement takes over the position of the removed child.

        Args:
            child: child to delete
            replacement: child to insert

        Raises:
            ValueError: if the child is not in this composite's list of children
        """
        self.logger.debug(
            "%s.replace_child()[%s->%s]"
            % (self.__class__.__name__, child.name, replacement.name)
        )
        # remove_child() reports the vacated slot and detaches the child's parent
        child_index = self.remove_child(child)
        self.insert_child(replacement, child_index)

    def remove_child_by_id(self, child_id: uuid.UUID) -> None:
        """
        Remove the child with the specified id.

        Args:
            child_id: unique id of the child

        Raises:
            IndexError: if the child was not found
        """
        child = next((c for c in self.children if c.id == child_id), None)
        if child is not None:
            self.remove_child(child)
        else:
            raise IndexError(
                "child was not found with the specified id [%s]" % child_id
            )

    def prepend_child(self, child: behaviour.Behaviour) -> uuid.UUID:
        """
        Prepend the child before all other children.

        Unlike :meth:`add_child`, no type or parent checks are made.

        Args:
            child: child to insert

        Returns:
            uuid.UUID: unique id of the child
        """
        self.children.insert(0, child)
        child.parent = self
        return child.id

    def insert_child(self, child: behaviour.Behaviour, index: int) -> uuid.UUID:
        """
        Insert child at the specified index.

        This simply directly calls the python list's :obj:`insert` method using
        the child and index arguments. Unlike :meth:`add_child`, no type or
        parent checks are made.

        Args:
            child (:class:`~py_trees.behaviour.Behaviour`): child to insert
            index (:obj:`int`): index to insert it at

        Returns:
            uuid.UUID: unique id of the child
        """
        self.children.insert(index, child)
        child.parent = self
        return child.id
##############################################################################
# Selector
##############################################################################
class Selector(Composite):
    """
    Selectors are the decision makers.

    .. graphviz:: dot/selector.dot

    A selector ticks its children in order until one of them reports
    :data:`~py_trees.common.Status.RUNNING` or
    :data:`~py_trees.common.Status.SUCCESS` (the selector then adopts that
    status), or until it runs out of children, at which point the selector
    itself returns :data:`~py_trees.common.Status.FAILURE`.

    We usually refer to selecting children as a means of *choosing between priorities*.
    Each child and its subtree represent a decreasingly lower priority path.

    .. note::

        Switching from a low -> high priority branch causes a `stop(INVALID)` signal to be sent to the previously
        executing low priority branch. This signal will percolate down that child's own subtree. Behaviours
        should make sure that they catch this and *destruct* appropriately.

    .. note::

        If configured with `memory`, higher priority checks will be skipped when a child returned with
        running on the previous tick. i.e. once a priority is locked in, it will run to completion and can
        only be interrupted if the selector is interrupted by higher priorities elsewhere in the tree.

    .. seealso:: The :ref:`py-trees-demo-selector-program` program demos higher priority switching under a selector.

    Args:
        name (:obj:`str`): the composite behaviour name
        memory (:obj:`bool`): if :data:`~py_trees.common.Status.RUNNING` on the previous tick,
            resume with the :data:`~py_trees.common.Status.RUNNING` child
        children ([:class:`~py_trees.behaviour.Behaviour`]): list of children to add
    """

    def __init__(
        self,
        name: str,
        memory: bool,
        children: typing.Optional[typing.List[behaviour.Behaviour]] = None,
    ):
        super().__init__(name, children)
        self.memory = memory

    def tick(self) -> typing.Iterator[behaviour.Behaviour]:
        """
        Customise the tick behaviour for a selector.

        This implements priority-interrupt style handling amongst the selector's children.
        The selector's status is always a reflection of its children's status.

        Yields:
            :class:`~py_trees.behaviour.Behaviour`: a reference to itself or one of its children
        """
        class_name = self.__class__.__name__
        self.logger.debug("%s.tick()" % class_name)

        # (re)initialise whenever this is not a continuation of a running tick;
        # done here rather than in initialise() so that users can override
        # initialise() without having to call super()
        if self.status != common.Status.RUNNING:
            self.logger.debug("%s.tick() [!RUNNING->reset current_child]" % class_name)
            self.current_child = self.children[0] if self.children else None
            # children are not reset here, they get handled either
            #   a) prior to a remembered starting point, or
            #   b) when invalidated by a higher level priority
            self.initialise()

        # guard: a selector with no children has nothing to select
        if not self.children:
            self.current_child = None
            self.stop(common.Status.FAILURE)
            yield self
            return

        # work out where to start from
        start = 0
        if self.memory:
            assert self.current_child is not None  # must be set here, helps mypy out
            start = self.children.index(self.current_child)
            # clear out the preceding statuses - not strictly necessary, but it
            # helps visualise the case of memory vs no memory
            for skipped in itertools.islice(self.children, None, start):
                skipped.stop(common.Status.INVALID)

        # walk the priorities
        previous = self.current_child
        for child in itertools.islice(self.children, start, None):
            for node in child.tick():
                yield node
                if node is not child:
                    # a descendant of the child, keep draining the child's tick
                    continue
                selected = (
                    node.status == common.Status.RUNNING
                    or node.status == common.Status.SUCCESS
                )
                if not selected:
                    # this child did not take the priority, fall through to the next
                    continue
                self.current_child = child
                self.status = node.status
                if previous is None or previous != self.current_child:
                    # the priority changed - invalidate everything listed after
                    # the newly selected child
                    beyond_selected = False
                    for sibling in self.children:
                        if beyond_selected and sibling.status != common.Status.INVALID:
                            sibling.stop(common.Status.INVALID)
                        if sibling == self.current_child:
                            beyond_selected = True
                yield self
                return

        # every child failed - fail too, pointing at the last child that failed us
        self.status = common.Status.FAILURE
        self.current_child = self.children[-1] if self.children else None
        yield self

    def stop(self, new_status: common.Status = common.Status.INVALID) -> None:
        """
        Log the transition, then defer to the common composite stop handling.

        Args:
            new_status : the composite is transitioning to this new status
        """
        transition = f"{self.__class__.__name__}.stop()[{self.status}->{new_status}]"
        self.logger.debug(transition)
        Composite.stop(self, new_status)
##############################################################################
# Sequence
##############################################################################
class Sequence(Composite):
    """
    Sequences are the factory lines of behaviour trees.

    .. graphviz:: dot/sequence.dot

    A sequence will progressively tick over each of its children so long as
    each child returns :data:`~py_trees.common.Status.SUCCESS`. If any child returns
    :data:`~py_trees.common.Status.FAILURE` or :data:`~py_trees.common.Status.RUNNING`
    the sequence will halt and the parent will adopt
    the result of this child. If it reaches the last child, it returns with
    that result regardless.

    .. note::

        The sequence halts once it engages with a child that is RUNNING, remaining
        behaviours are not ticked.

    .. note::

        If configured with `memory` and a child returned with running on the previous tick, it will
        proceed directly to the running behaviour, skipping any and all preceding behaviours. With memory
        is useful for moving through a long running series of tasks. Without memory is useful if you
        want conditional guards in place preceding the work that you always want checked off.

    .. seealso:: The :ref:`py-trees-demo-sequence-program` program demos a simple sequence in action.

    Args:
        name: the composite behaviour name
        memory: if :data:`~py_trees.common.Status.RUNNING` on the previous tick,
            resume with the :data:`~py_trees.common.Status.RUNNING` child
        children: list of children to add
    """

    def __init__(
        self,
        name: str,
        memory: bool,
        children: typing.Optional[typing.List[behaviour.Behaviour]] = None,
    ):
        super(Sequence, self).__init__(name, children)
        self.memory = memory

    def tick(self) -> typing.Iterator[behaviour.Behaviour]:
        """
        Tick over the children.

        Yields:
            :class:`~py_trees.behaviour.Behaviour`: a reference to itself or one of its children
        """
        self.logger.debug("%s.tick()" % self.__class__.__name__)

        # initialise
        index = 0
        if self.status != common.Status.RUNNING:
            # fresh start: point at the first child and reset all children
            self.current_child = self.children[0] if self.children else None
            for child in self.children:
                if child.status != common.Status.INVALID:
                    child.stop(common.Status.INVALID)
            self.initialise()  # user specific initialisation
        elif self.memory:
            # RUNNING with memory: resume at the child that was running
            assert self.current_child is not None  # must be set here, helps mypy out
            index = self.children.index(self.current_child)
        else:
            # RUNNING without memory: re-check every child from the beginning
            self.current_child = self.children[0] if self.children else None

        # nothing to do
        if not self.children:
            self.current_child = None
            self.stop(common.Status.SUCCESS)
            yield self
            return

        # actual work
        for child in itertools.islice(self.children, index, None):
            for node in child.tick():
                yield node
                if node is child and node.status != common.Status.SUCCESS:
                    # the child is RUNNING or has FAILED - adopt its status and halt
                    self.status = node.status
                    if not self.memory:
                        # invalidate the remainder of the sequence
                        # i.e. kill dangling runners
                        for remaining in itertools.islice(
                            self.children, index + 1, None
                        ):
                            if remaining.status != common.Status.INVALID:
                                remaining.stop(common.Status.INVALID)
                    yield self
                    return
            try:
                # advance if there is a 'next' sibling
                self.current_child = self.children[index + 1]
                index += 1
            except IndexError:
                # that was the last child, current_child stays pointing at it
                pass

        # every child from the starting point onwards succeeded
        self.stop(common.Status.SUCCESS)
        yield self

    def stop(self, new_status: common.Status = common.Status.INVALID) -> None:
        """
        Log the transition, then defer to the common composite stop handling.

        Args:
            new_status : the composite is transitioning to this new status
        """
        self.logger.debug(
            f"{self.__class__.__name__}.stop()[{self.status}->{new_status}]"
        )
        Composite.stop(self, new_status)
##############################################################################
# Parallel
##############################################################################
class Parallel(Composite):
    """
    Parallels enable a kind of spooky at-a-distance concurrency.

    .. graphviz:: dot/parallel.dot

    A parallel ticks every child every time the parallel is itself ticked.
    The parallelism however, is merely conceptual. The children have actually been
    sequentially ticked, but from both the tree and the parallel's purview, all
    children have been ticked at once.

    The parallelism too, is not true in the sense that it kicks off multiple threads
    or processes to do work. Some behaviours *may* kick off threads or processes
    in the background, or connect to existing threads/processes. The behaviour itself
    however, merely monitors these and is itself ensconced in a py_tree which only ever
    ticks in a single-threaded operation.

    * Parallels will return :data:`~py_trees.common.Status.FAILURE` if any
      child returns :py:data:`~py_trees.common.Status.FAILURE`
    * Parallels with policy :class:`~py_trees.common.ParallelPolicy.SuccessOnAll`
      only returns :py:data:`~py_trees.common.Status.SUCCESS` if **all** children
      return :py:data:`~py_trees.common.Status.SUCCESS`
    * Parallels with policy :class:`~py_trees.common.ParallelPolicy.SuccessOnOne`
      return :py:data:`~py_trees.common.Status.SUCCESS` if **at least one** child
      returns :py:data:`~py_trees.common.Status.SUCCESS` and others are
      :py:data:`~py_trees.common.Status.RUNNING`
    * Parallels with policy :class:`~py_trees.common.ParallelPolicy.SuccessOnSelected`
      only returns :py:data:`~py_trees.common.Status.SUCCESS` if a **specified subset**
      of children return :py:data:`~py_trees.common.Status.SUCCESS`

    Policies :class:`~py_trees.common.ParallelPolicy.SuccessOnAll` and
    :class:`~py_trees.common.ParallelPolicy.SuccessOnSelected` may be configured to be
    *synchronised* in which case children that tick with
    :data:`~py_trees.common.Status.SUCCESS` will be skipped on subsequent ticks until
    the policy criteria is met, or one of the children returns
    status :data:`~py_trees.common.Status.FAILURE`.

    Parallels with policy :class:`~py_trees.common.ParallelPolicy.SuccessOnSelected` will
    check in both the :meth:`~py_trees.behaviour.Behaviour.setup` and
    :meth:`~py_trees.behaviour.Behaviour.tick` methods to verify the
    selected set of children is actually a subset of the children of this parallel.

    .. seealso::
       * :ref:`Context Switching Demo <py-trees-demo-context-switching-program>`
    """

    def __init__(
        self,
        name: str,
        policy: common.ParallelPolicy.Base,
        children: typing.Optional[typing.List[behaviour.Behaviour]] = None,
    ):
        """
        Initialise the behaviour with name, policy and a list of children.

        Args:
            name: the composite behaviour name
            policy: policy for deciding success or otherwise (required, there is no default)
            children: list of children to add
        """
        super(Parallel, self).__init__(name, children)
        self.policy = policy

    def setup(self, **kwargs: int) -> None:
        """
        Detect before ticking whether the policy configuration is invalid.

        Args:
            **kwargs (:obj:`dict`): accepted for interface compatibility,
                not used by this method itself

        Raises:
            RuntimeError: if the parallel's policy configuration is invalid
        """
        self.logger.debug("%s.setup()" % (self.__class__.__name__))
        self.validate_policy_configuration()

    def tick(self) -> typing.Iterator[behaviour.Behaviour]:
        """
        Tick over the children.

        Yields:
            :class:`~py_trees.behaviour.Behaviour`: a reference to itself or one of its children

        Raises:
            RuntimeError: if the policy configuration was invalid or the
                policy type is not one of the recognised policies
        """
        self.logger.debug("%s.tick()" % self.__class__.__name__)
        self.validate_policy_configuration()

        # reset
        if self.status != common.Status.RUNNING:
            self.logger.debug("%s.tick(): re-initialising" % self.__class__.__name__)
            for child in self.children:
                # reset the children, this ensures old SUCCESS/FAILURE status flags
                # don't break the synchronisation logic below
                if child.status != common.Status.INVALID:
                    child.stop(common.Status.INVALID)
            self.current_child = None
            # subclass (user) handling
            self.initialise()

        # nothing to do
        if not self.children:
            self.current_child = None
            self.stop(common.Status.SUCCESS)
            yield self
            return

        # process them all first
        for child in self.children:
            if self.policy.synchronise and child.status == common.Status.SUCCESS:
                # synchronised: this child is done, wait for the others
                continue
            for node in child.tick():
                yield node

        # determine new status
        new_status = common.Status.RUNNING
        self.current_child = self.children[-1]
        # a failure anywhere trumps every policy; report the first failed child
        failed_child = next(
            (c for c in self.children if c.status == common.Status.FAILURE), None
        )
        if failed_child is not None:
            self.current_child = failed_child
            new_status = common.Status.FAILURE
        elif type(self.policy) is common.ParallelPolicy.SuccessOnAll:
            if all(c.status == common.Status.SUCCESS for c in self.children):
                new_status = common.Status.SUCCESS
                self.current_child = self.children[-1]
        elif type(self.policy) is common.ParallelPolicy.SuccessOnOne:
            successful = [
                child
                for child in self.children
                if child.status == common.Status.SUCCESS
            ]
            if successful:
                new_status = common.Status.SUCCESS
                self.current_child = successful[-1]
        elif type(self.policy) is common.ParallelPolicy.SuccessOnSelected:
            if all(c.status == common.Status.SUCCESS for c in self.policy.children):
                new_status = common.Status.SUCCESS
                self.current_child = self.policy.children[-1]
        else:
            raise RuntimeError(
                "this parallel has been configured with an unrecognised policy [{}]".format(
                    type(self.policy)
                )
            )

        # this parallel may have children that are still running
        # so if the parallel itself has reached a final status, then
        # these running children need to be terminated so they don't dangle
        if new_status != common.Status.RUNNING:
            self.stop(new_status)
        self.status = new_status
        yield self

    def stop(self, new_status: common.Status = common.Status.INVALID) -> None:
        """
        Ensure that any running children are stopped.

        Args:
            new_status : the composite is transitioning to this new status
        """
        self.logger.debug(
            f"{self.__class__.__name__}.stop()[{self.status}->{new_status}]"
        )
        # clean up dangling (running) children
        for child in self.children:
            if child.status == common.Status.RUNNING:
                # this unfortunately knocks out its running status for introspection
                # but logically is the correct thing to do, see #132.
                child.stop(common.Status.INVALID)
        Composite.stop(self, new_status)

    def validate_policy_configuration(self) -> None:
        """
        Validate the currently stored policy.

        Policy configuration can be invalid if:

        * Policy is SuccessOnSelected and no behaviours have been specified
        * Policy is SuccessOnSelected and behaviours that are not children exist

        Any other policy type passes this validation untouched.

        Raises:
            RuntimeError: if policy configuration was invalid
        """
        if type(self.policy) is common.ParallelPolicy.SuccessOnSelected:
            if not self.policy.children:
                error_message = (
                    "policy SuccessOnSelected requires a non-empty "
                    "selection of children [{}]".format(self.name)
                )
                self.logger.error(error_message)
                raise RuntimeError(error_message)
            missing_children_names = [
                child.name
                for child in self.policy.children
                if child not in self.children
            ]
            if missing_children_names:
                error_message = (
                    "policy SuccessOnSelected has selected behaviours that are "
                    "not children of this parallel {}[{}]"
                    "".format(missing_children_names, self.name)
                )
                self.logger.error(error_message)
                raise RuntimeError(error_message)