# Source code for py_trees.meta

#!/usr/bin/env python
#
# License: BSD
#   https://raw.githubusercontent.com/stonier/py_trees/devel/LICENSE
#
##############################################################################
# Documentation
##############################################################################

"""
.. attention::
    This module is the least likely to remain stable in this package. It has
    only received cursory attention so far and a more thoughtful design for handling
    behaviour 'hats' might be needful at some point in the future.

Meta behaviours are created by utilising various programming techniques
pulled from a magic bag of tricks. Some of these minimise the effort to generate
a new behaviour while others provide mechanisms that greatly expand your
library of usable behaviours without having to increase the number of explicit
behaviours contained therein. The latter is achieved by providing a means for
behaviours to wear different 'hats' via python decorators.

.. image:: images/many-hats.png
   :width: 40px
   :align: center

Each function or decorator listed below includes its own example code
demonstrating its use.

**Factories**

* :func:`py_trees.meta.create_behaviour_from_function`
* :func:`py_trees.meta.create_imposter`

**Decorators (Hats)**

* :func:`py_trees.meta.condition`
* :func:`py_trees.meta.inverter`
* :func:`py_trees.meta.failure_is_running`
* :func:`py_trees.meta.failure_is_success`
* :func:`py_trees.meta.oneshot`
* :func:`py_trees.meta.running_is_failure`
* :func:`py_trees.meta.running_is_success`
* :func:`py_trees.meta.success_is_failure`
* :func:`py_trees.meta.success_is_running`
* :func:`py_trees.meta.timeout`
"""

##############################################################################
# Imports
##############################################################################

import functools
import time

from . import behaviour
from . import common
from . import composites

##############################################################################
# Utility Methods
##############################################################################


def create_behaviour_from_function(func):
    """
    Build a new behaviour class whose
    :meth:`~py_trees.behaviour.Behaviour.update` method is the given function.

    The function must accept ``self`` as its only argument and return a
    :class:`~py_trees.common.Status` value. The generated class also gets a
    :meth:`~py_trees.behaviour.Behaviour.terminate` method that clears the
    feedback message whenever the behaviour is reset to
    :data:`~py_trees.common.Status.INVALID`. All other methods are inherited
    unchanged from :class:`~py_trees.behaviour.Behaviour`.

    Args:
        func (:obj:`function`): a drop-in for the :meth:`~py_trees.behaviour.Behaviour.update` method

    Returns:
        :class:`~py_trees.behaviour.Behaviour`: a new behaviour class named
        ``func.__name__.capitalize()`` (first character upper-cased, the
        remainder lower-cased)
    """
    def terminate(self, new_status):
        # only a reset wipes the feedback, regular completion keeps it
        if new_status == common.Status.INVALID:
            self.feedback_message = ""

    generated_name = func.__name__.capitalize()
    bases = (behaviour.Behaviour,)
    members = {"update": func, "terminate": terminate}
    return type(generated_name, bases, members)
############################################################################## # Some Machinery ##############################################################################
def create_imposter(cls):
    """
    Creates a new behaviour type impersonating (encapsulating) another
    behaviour type.

    This is primarily used to develop other decorators but can also be useful
    in itself. It takes care of the handles responsible for making the
    encapsulation work and leaves you with just the task of replacing the
    relevant modifications (usually to the
    :meth:`~py_trees.behaviour.Behaviour.update` method). The modifications can
    be made by direct replacement of methods or by inheriting and overriding
    them. See the examples below.

    Args:
        cls (:class:`~py_trees.behaviour.Behaviour`): an existing behaviour class type

    Returns:
        :class:`~py_trees.behaviour.Behaviour`: the new encapsulated behaviour class

    Examples:

        Replacing methods:

        .. code-block:: python

           def _update(self):
               self.original.tick_once()
               if self.original.status == common.Status.FAILURE:
                   return common.Status.SUCCESS
               else:
                   return self.original.status

           FailureIsSuccess = create_imposter(py_trees.behaviours.Failure)
           setattr(FailureIsSuccess, "update", _update)

        Subclassing and overriding:

        .. code-block:: python

           class FailureIsSuccess(create_imposter(py_trees.behaviours.Failure)):
               def __init__(self, *args, **kwargs):
                   super(FailureIsSuccess, self).__init__(*args, **kwargs)

               def update(self):
                   self.original.tick_once()
                   if self.original.status == common.Status.FAILURE:
                       return common.Status.SUCCESS
                   else:
                       return self.original.status
    """
    class Imposter(behaviour.Behaviour):

        def __init__(self, *args, **kwargs):
            """
            Pass on the arguments intact except for the name. That is modified
            to begin with an underscore to denote that it is internal.
            """
            # the original must exist before anything else, since attribute
            # lookups that miss on the imposter fall through to it
            self.original = cls(*args, **kwargs)
            super(Imposter, self).__init__(self.original.name)
            self.original.name = "_" + self.original.name

            # aliases to original variables/methods
            self.blackbox_level = self.original.blackbox_level
            self.children = self.original.children
            self.setup = self.original.setup
            # id is important to match for composites...the children must
            # relate to the correct parent id
            self.id = self.original.id

            if isinstance(self.original, composites.Composite):
                # monkeypatch add_child so that children added via the
                # original are parented to the imposter, not to the original
                def add_child(child):
                    assert isinstance(child, behaviour.Behaviour), "children must be behaviours, but you passed in %s" % type(child)
                    self.children.append(child)
                    child.parent = self
                    return child.id
                self.original.add_child = add_child

        def tip(self):
            """
            This function overrides :meth:`~py_trees.behaviour.Behaviour.tip`
            and provides a fused capability depending on whether the original
            behaviour is a composite or not. If it is a composite, it relies on
            the composite's return value, else it uses its own. Important not
            to use the original itself since it doesn't officially exist in a
            tree.
            """
            if isinstance(self.original, composites.Composite):
                return self.original.tip()
            else:
                return super(Imposter, self).tip()

        def tick(self):
            """
            This function overrides Behaviour.tick() and works the same way
            except that it does not stop the imposter at the end of the tick;
            the original's own tick() handles the original's state.

            There is some analysis explaining the need for this override in
            https://github.com/stonier/py_trees/issues/32

            Yields:
                the behaviours yielded by the original's tick (excluding the
                original itself), followed by a reference to this imposter
            """
            self.logger.debug("%s.tick()" % (self.__class__.__name__))
            # this only initialises the imposter
            if self.status != common.Status.RUNNING:
                self.initialise()
            # initialise() and terminate() for the original behaviour
            # will be called from inside the original's tick()
            for node in self.original.tick():
                # the original is hidden from the tree, the imposter stands in for it
                if node != self.original:
                    yield node
            new_status = self.update()
            # list() keeps the membership test a plain equality scan over the
            # members, whatever type update() happened to return
            if new_status not in list(common.Status):
                self.logger.error("A behaviour returned an invalid status, setting to INVALID [%s][%s]" % (new_status, self.name))
                new_status = common.Status.INVALID
            self.status = new_status
            yield self

        def update(self):
            """
            This is the usual method that gets replaced by the meta classes.
            By default it mirrors the original's status.
            """
            return self.original.status

        def terminate(self, new_status):
            """
            Imposter's custom implementation of termination that is called when
            higher priority interrupts occur. Note that stop/terminate are not
            called in the usual sequence of a tick, since that would double up
            on stop calls to the underlying original.
            """
            self.logger.debug("%s.terminate()[%s]" % (self.__class__.__name__, new_status))
            self.original.stop(new_status)
            self.status = self.original.status

        def __getattr__(self, name):
            """
            So we can pull extra attributes in the original above and beyond
            the behaviour attributes.

            Raises:
                AttributeError: if the attribute is missing on the original,
                    or if 'original' itself has not been set yet
            """
            # __getattr__ only runs when normal lookup fails. If 'original' is
            # the missing attribute (instance created without __init__, e.g.
            # while being copied or unpickled), delegating to self.original
            # would re-enter this method until the recursion limit is hit.
            if name == "original":
                raise AttributeError(name)
            return getattr(self.original, name)

    return Imposter
############################################################################## # Timeout ##############################################################################
def timeout(cls, duration=None):
    """
    A decorator that applies a timeout pattern to an existing behaviour.

    While the timeout has not been reached, the returned behaviour reports the
    same status as its encapsulated behaviour. Once the timeout is exceeded,
    the encapsulated behaviour's :meth:`~py_trees.behaviour.Behaviour.stop`
    method is called with status :data:`~py_trees.common.Status.INVALID`
    (i.e. it is cancelled) and the timeout behaviour itself returns
    :data:`~py_trees.common.Status.FAILURE`.

    Args:
        cls (:class:`~py_trees.behaviour.Behaviour`): an existing behaviour class type
            (or, in the decorator form shown below, the timeout length itself)
        duration (:obj:`float`): timeout length in seconds

    Returns:
        :class:`~py_trees.behaviour.Behaviour`: the modified behaviour class with timeout
        (or a class decorator when called with only a number)

    Raises:
        TypeError: if no duration is supplied

    Examples:

        .. code-block:: python

           @py_trees.meta.timeout(10)
           class WorkBehaviour(py_trees.behaviour.Behaviour):
               pass

        or

        .. code-block:: python

           work_with_timeout = py_trees.meta.timeout(WorkBehaviour, 10.0)(name="Work")
    """
    if duration is None:
        # decorator-factory form, @timeout(10): the single positional argument
        # is the duration and the class arrives in a second call
        if isinstance(cls, (int, float)):
            seconds = cls
            return lambda wrapped_cls: timeout(wrapped_cls, seconds)
        raise TypeError("timeout() requires a duration in seconds")

    def _timeout_init(func, duration):
        """
        Replace the default init with one that additionally stores the
        timeout duration and an (initially unset) finishing time.
        """
        @functools.wraps(func)
        def wrapped(self, *args, **kwargs):
            func(self, *args, **kwargs)
            self.duration = duration
            self.finish_time = None
        return wrapped

    def _timeout_initialise(self):
        # start the clock only if it is not already running
        if self.finish_time is None:
            self.finish_time = time.time() + self.duration

    def _timeout_update(func):
        @functools.wraps(func)
        def wrapped(self, *args, **kwargs):
            # make sure this class initialises time related functions
            # when the underlying original will initialise itself
            if self.original.status != common.Status.RUNNING:
                self.initialise()
            current_time = time.time()
            if current_time > self.finish_time:
                self.feedback_message = "timed out"
                # invalidate the original (i.e. cancel it)
                self.original.stop(common.Status.INVALID)
                return common.Status.FAILURE
            else:
                self.feedback_message = self.original.feedback_message + " [time left: %s]" % (self.finish_time - current_time)
                return self.original.status
        return wrapped

    def _timeout_terminate(func):
        @functools.wraps(func)
        def wrapped(self, new_status):
            # let the imposter's terminate run first so that an interrupt is
            # propagated to (and stops) the encapsulated behaviour
            func(self, new_status)
            if new_status != common.Status.RUNNING:
                # reset the clock so the next run gets a full duration
                self.finish_time = None
        return wrapped

    Timeout = create_imposter(cls)
    setattr(Timeout, "__init__", _timeout_init(Timeout.__init__, duration))
    setattr(Timeout, "initialise", _timeout_initialise)
    setattr(Timeout, "update", _timeout_update(Timeout.update))
    setattr(Timeout, "terminate", _timeout_terminate(Timeout.terminate))
    return Timeout
##############################################################################
# Oneshot
##############################################################################


def oneshot(cls):
    """
    A decorator that latches the first terminal result of a behaviour.

    The encapsulated behaviour is ticked as usual until it reports
    :data:`~py_trees.common.Status.SUCCESS` or
    :data:`~py_trees.common.Status.FAILURE`. That status is then stored and
    every subsequent tick simply reports the stored status without ticking
    the encapsulated behaviour again. The stored status is not cleared when
    the behaviour is reset to :data:`~py_trees.common.Status.INVALID`.

    Args:
        cls (:class:`~py_trees.behaviour.Behaviour`): an existing behaviour class type

    Returns:
        :class:`~py_trees.behaviour.Behaviour`: the modified behaviour class
    """
    terminal_statuses = (common.Status.FAILURE, common.Status.SUCCESS)

    def _oneshot_init(func):
        @functools.wraps(func)
        def wrapped(self, *args, **kwargs):
            func(self, *args, **kwargs)
            # nothing latched yet
            self.final_status = None
        return wrapped

    def _oneshot_update(func):
        @functools.wraps(func)
        def wrapped(self, *args, **kwargs):
            self.logger.debug("OneShot.wrapped_update()")
            if self.final_status:
                return self.final_status
            # latch the result the first time the original finishes
            if self.original.status in terminal_statuses:
                self.final_status = self.original.status
            return self.original.status
        return wrapped

    def _oneshot_tick(func):
        @functools.wraps(func)
        def wrapped(self, *args, **kwargs):
            if not self.final_status:
                self.logger.debug("OneShot.wrapped_tick()")
                for visited in func(self):
                    yield visited
            else:
                # already finished once: rebound with the latched status
                # instead of ticking the original again
                self.status = self.final_status
                self.logger.debug("OneShot.wrapped_tick()[rebounding]")
                yield self
        return wrapped

    def _oneshot_terminate(func):
        @functools.wraps(func)
        def wrapped(self, new_status):
            self.logger.debug("OneShot.wrapped_terminate()[{}]".format(new_status))
            # handle only the interrupt/reset case
            if new_status != common.Status.INVALID:
                return
            if self.final_status:
                self.status = new_status
            else:
                self.original.stop(new_status)
                self.status = self.original.status
        return wrapped

    OneShot = create_imposter(cls)
    OneShot.__init__ = _oneshot_init(OneShot.__init__)
    OneShot.update = _oneshot_update(OneShot.update)
    OneShot.tick = _oneshot_tick(OneShot.tick)
    OneShot.terminate = _oneshot_terminate(OneShot.terminate)
    return OneShot


##############################################################################
# Inverter
##############################################################################
def inverter(cls):
    """
    A decorator that swaps success and failure in the result of a class's
    update function. Any other status is passed through untouched.

    Args:
        cls (:class:`~py_trees.behaviour.Behaviour`): an existing behaviour class type

    Returns:
        :class:`~py_trees.behaviour.Behaviour`: the modified behaviour class

    Examples:

        .. code-block:: python

           @inverter
           class Failure(Success):
               pass

        or

        .. code-block:: python

           failure = inverter(Success)("Failure")
    """
    def _update(func):
        @functools.wraps(func)
        def wrapped(self):
            inner_status = self.original.status
            inner_message = self.original.feedback_message
            if inner_status == common.Status.SUCCESS:
                self.feedback_message = "success -> failure [{}]".format(inner_message)
                return common.Status.FAILURE
            if inner_status == common.Status.FAILURE:
                self.feedback_message = "failure -> success [{}]".format(inner_message)
                return common.Status.SUCCESS
            # neither success nor failure: mirror the original
            self.feedback_message = inner_message
            return inner_status
        return wrapped

    Inverter = create_imposter(cls)
    Inverter.update = _update(Inverter.update)
    return Inverter
############################# # RunningIsFailure #############################
def running_is_failure(cls):
    """
    Got to be snappy! We want results...yesterday!

    The returned behaviour reports :data:`~py_trees.common.Status.FAILURE`
    whenever the encapsulated behaviour is
    :data:`~py_trees.common.Status.RUNNING`; every other status is passed
    through unchanged.

    Args:
        cls (:class:`~py_trees.behaviour.Behaviour`): an existing behaviour class type

    Returns:
        :class:`~py_trees.behaviour.Behaviour`: the modified behaviour class

    Examples:

        .. code-block:: python

           @running_is_failure
           class NeedResultsNow(Pontificating):
               pass

        or

        .. code-block:: python

           need_results_now = running_is_failure(Pontificating)("Greek Philosopher")
    """
    def _update(func):
        @functools.wraps(func)
        def wrapped(self):
            inner_status = self.original.status
            inner_message = self.original.feedback_message
            if inner_status != common.Status.RUNNING:
                self.feedback_message = inner_message
                return inner_status
            # only append the bracketed detail when there is something to show
            detail = " [%s]" % inner_message if inner_message else ""
            self.feedback_message = "running is failure" + detail
            return common.Status.FAILURE
        return wrapped

    RunningIsFailure = create_imposter(cls)
    RunningIsFailure.__name__ = running_is_failure.__name__
    RunningIsFailure.update = _update(RunningIsFailure.update)
    return RunningIsFailure
############################# # RunningIsSuccess #############################
def running_is_success(cls):
    """
    Don't hang around...

    The returned behaviour reports :data:`~py_trees.common.Status.SUCCESS`
    whenever the encapsulated behaviour is
    :data:`~py_trees.common.Status.RUNNING`; every other status is passed
    through unchanged.

    Args:
        cls (:class:`~py_trees.behaviour.Behaviour`): an existing behaviour class type

    Returns:
        :class:`~py_trees.behaviour.Behaviour`: the modified behaviour class

    Examples:

        .. code-block:: python

           @running_is_success
           class DontHangAround(Pontificating):
               pass

        or

        .. code-block:: python

           dont_hang_around = running_is_success(Pontificating)("Greek Philosopher")
    """
    def _update(func):
        @functools.wraps(func)
        def wrapped(self):
            inner_status = self.original.status
            inner_message = self.original.feedback_message
            if inner_status != common.Status.RUNNING:
                self.feedback_message = inner_message
                return inner_status
            # only append the bracketed detail when there is something to show
            detail = " [%s]" % inner_message if inner_message else ""
            self.feedback_message = "running is success" + detail
            return common.Status.SUCCESS
        return wrapped

    RunningIsSuccess = create_imposter(cls)
    RunningIsSuccess.__name__ = running_is_success.__name__
    RunningIsSuccess.update = _update(RunningIsSuccess.update)
    return RunningIsSuccess
############################# # FailureIsSuccess #############################
def failure_is_success(cls):
    """
    Be positive, always succeed.

    The returned behaviour reports :data:`~py_trees.common.Status.SUCCESS`
    whenever the encapsulated behaviour reports
    :data:`~py_trees.common.Status.FAILURE`; every other status is passed
    through unchanged.

    Args:
        cls (:class:`~py_trees.behaviour.Behaviour`): an existing behaviour class type

    Returns:
        :class:`~py_trees.behaviour.Behaviour`: the modified behaviour class

    Examples:

        .. code-block:: python

           @failure_is_success
           class MustGoOnRegardless(ActedLikeAGoon):
               pass

        or

        .. code-block:: python

           must_go_on_regardless = failure_is_success(ActedLikeAGoon)(name="Goon")
    """
    def _update(func):
        @functools.wraps(func)
        def wrapped(self):
            inner_status = self.original.status
            inner_message = self.original.feedback_message
            if inner_status != common.Status.FAILURE:
                self.feedback_message = inner_message
                return inner_status
            # only append the bracketed detail when there is something to show
            detail = " [%s]" % inner_message if inner_message else ""
            self.feedback_message = "failure is success" + detail
            return common.Status.SUCCESS
        return wrapped

    FailureIsSuccess = create_imposter(cls)
    FailureIsSuccess.__name__ = failure_is_success.__name__
    FailureIsSuccess.update = _update(FailureIsSuccess.update)
    return FailureIsSuccess
############################# # FailureIsRunning #############################
def failure_is_running(cls):
    """
    Don't stop running.

    The returned behaviour reports :data:`~py_trees.common.Status.RUNNING`
    whenever the encapsulated behaviour reports
    :data:`~py_trees.common.Status.FAILURE`; every other status is passed
    through unchanged.

    Args:
        cls (:class:`~py_trees.behaviour.Behaviour`): an existing behaviour class type

    Returns:
        :class:`~py_trees.behaviour.Behaviour`: the modified behaviour class

    Examples:

        .. code-block:: python

           @failure_is_running
           class MustGoOnRegardless(ActingLikeAGoon):
               pass

        or

        .. code-block:: python

           must_go_on_regardless = failure_is_running(ActingLikeAGoon)(name="Goon")
    """
    def _update(func):
        @functools.wraps(func)
        def wrapped(self):
            inner_status = self.original.status
            inner_message = self.original.feedback_message
            if inner_status != common.Status.FAILURE:
                self.feedback_message = inner_message
                return inner_status
            # only append the bracketed detail when there is something to show
            detail = " [%s]" % inner_message if inner_message else ""
            self.feedback_message = "failure is running" + detail
            return common.Status.RUNNING
        return wrapped

    FailureIsRunning = create_imposter(cls)
    FailureIsRunning.__name__ = failure_is_running.__name__
    FailureIsRunning.update = _update(FailureIsRunning.update)
    return FailureIsRunning
############################# # SuccessIsFailure #############################
def success_is_failure(cls):
    """
    Be depressed, always fail.

    The returned behaviour reports :data:`~py_trees.common.Status.FAILURE`
    whenever the encapsulated behaviour reports
    :data:`~py_trees.common.Status.SUCCESS`; every other status is passed
    through unchanged.

    Args:
        cls (:class:`~py_trees.behaviour.Behaviour`): an existing behaviour class type

    Returns:
        :class:`~py_trees.behaviour.Behaviour`: the modified behaviour class

    Examples:

        .. code-block:: python

           @success_is_failure
           class TheEndIsNigh(ActingLikeAGoon):
               pass

        or

        .. code-block:: python

           the_end_is_nigh = success_is_failure(ActingLikeAGoon)(name="Goon")
    """
    def _update(func):
        @functools.wraps(func)
        def wrapped(self):
            inner_status = self.original.status
            inner_message = self.original.feedback_message
            if inner_status != common.Status.SUCCESS:
                self.feedback_message = inner_message
                return inner_status
            # only append the bracketed detail when there is something to show
            detail = " [%s]" % inner_message if inner_message else ""
            self.feedback_message = "success is failure" + detail
            return common.Status.FAILURE
        return wrapped

    SuccessIsFailure = create_imposter(cls)
    SuccessIsFailure.__name__ = success_is_failure.__name__
    SuccessIsFailure.update = _update(SuccessIsFailure.update)
    return SuccessIsFailure
############################# # SuccessIsRunning #############################
def success_is_running(cls):
    """
    It never ends...

    The returned behaviour reports :data:`~py_trees.common.Status.RUNNING`
    whenever the encapsulated behaviour reports
    :data:`~py_trees.common.Status.SUCCESS`; every other status is passed
    through unchanged.

    Args:
        cls (:class:`~py_trees.behaviour.Behaviour`): an existing behaviour class type

    Returns:
        :class:`~py_trees.behaviour.Behaviour`: the modified behaviour class

    Examples:

        .. code-block:: python

           @success_is_running
           class TheEndIsStillNotNigh(ActingLikeAGoon):
               pass

        or

        .. code-block:: python

           the_end_is_still_not_nigh = success_is_running(ActingLikeAGoon)(name="Goon")
    """
    def _update(func):
        @functools.wraps(func)
        def wrapped(self):
            if self.original.status == common.Status.SUCCESS:
                # append the original's feedback only when there is some, so an
                # empty message does not produce a dangling "[]" (same
                # formatting as the other *_is_* hats in this module)
                self.feedback_message = "success is running" + (" [%s]" % self.original.feedback_message if self.original.feedback_message else "")
                return common.Status.RUNNING
            else:
                self.feedback_message = self.original.feedback_message
                return self.original.status
        return wrapped

    SuccessIsRunning = create_imposter(cls)
    setattr(SuccessIsRunning, "__name__", success_is_running.__name__)
    setattr(SuccessIsRunning, "update", _update(SuccessIsRunning.update))
    return SuccessIsRunning
############################# # Condition #############################
def condition(cls, status=None):
    """
    Encapsulates a behaviour and waits for its status to flip to the
    desired state. This behaviour will tick with
    :data:`~py_trees.common.Status.RUNNING` while waiting and
    :data:`~py_trees.common.Status.SUCCESS` when the flip occurs.

    If the desired state is :data:`~py_trees.common.Status.RUNNING`, the
    encapsulated behaviour is stopped as soon as it is seen running.

    Args:
        cls (:class:`~py_trees.behaviour.Behaviour`): an existing behaviour class type
            (or, in the decorator form shown below, the status itself)
        status (:class:`~py_trees.common.Status`): the desired status to watch for

    Returns:
        :class:`~py_trees.behaviour.Behaviour`: the modified behaviour class
        (or a class decorator when called with only a status)

    Raises:
        TypeError: if no status to watch for is supplied

    Examples:

        .. code-block:: python

           @condition(py_trees.common.Status.RUNNING)
           class HangingAbout(WillStartSoon):
               pass

        or

        .. code-block:: python

           hanging_about = condition(WillStartSoon, py_trees.common.Status.RUNNING)(name="Hanging About")
    """
    if status is None:
        # decorator-factory form, @condition(Status.X): the single positional
        # argument is the status and the class arrives in a second call
        if isinstance(cls, common.Status):
            watched = cls
            return lambda wrapped_cls: condition(wrapped_cls, watched)
        raise TypeError("condition() requires the status to watch for")

    def _init(func, status):
        """
        Replace the default init with one which also records the desired
        status to watch for on the instance.
        """
        @functools.wraps(func)
        def wrapped(self, *args, **kwargs):
            func(self, *args, **kwargs)
            self.succeed_status = status
        return wrapped

    def _update(func):
        @functools.wraps(func)
        def wrapped(self):
            self.logger.debug("%s.update()" % self.__class__.__name__)
            self.feedback_message = "'{0}' has status {1}, waiting for {2}".format(self.original.name, self.original.status, self.succeed_status)
            if self.original.status == self.succeed_status:
                # the condition is met; an original that is still running is
                # stopped since this behaviour is about to finish
                if self.original.status == common.Status.RUNNING:
                    self.original.stop()
                return common.Status.SUCCESS
            else:
                return common.Status.RUNNING
        return wrapped

    Condition = create_imposter(cls)
    setattr(Condition, "__name__", "Condition<{0}>".format(cls.__name__))
    setattr(Condition, "__init__", _init(Condition.__init__, status))
    setattr(Condition, "update", _update(Condition.update))
    return Condition