#!/usr/bin/env python
#
# License: BSD
# https://raw.githubusercontent.com/splintered-reality/py_trees/devel/LICENSE
#
##############################################################################
# Documentation
##############################################################################
"""
XML parser for the BehaviorTree format.
.. note::
The parser is experimental and its API may change between releases.
This module provides a parser for the BehaviorTree XML format, used to construct behaviour trees with key remapping
and subtree instantiation.
Overview
--------
The parser recursively builds a behaviour tree from an XML file, using a remapping table to track key assignments
and substitutions. The remapping table is a dictionary that maps keys (referenced in curly braces in the XML,
e.g. ``{key}``) to either their absolute paths (e.g. ``/some/key``) or to other keys
(e.g. ``{other_key}``), which are then further resolved to absolute paths.
In the end, all keys in the tree map to absolute paths which can be used to address the value in a map,
blackboard, or similar structure.
Remapping table example::
{
"absolute_value": "/absolute/path",
"curly_reference": "{absolute_value}",
}
Keys are resolved recursively until an absolute path is found. This allows flexible wiring of data flow between
nodes and subtrees.
Subtree templates and instantiation
-----------------------------------
Subtrees are defined as ``<BehaviorTree ID="...">`` elements in the XML. These act as templates, which can be
instantiated elsewhere in the tree using a ``<SubTree>`` tag. When a subtree is instantiated, the parser:
- Makes a local remapping table by applying any remappings specified in the ``<SubTree>`` tag.
- Recursively parses the referenced subtree template, using the updated remapping table and a new namespace.
Example subtree template:
.. code-block:: xml
<BehaviorTree ID="MySubtree">
<Sequence>
<Reader name="MyReader" input="{input_key}" />
<Writer name="MyInternalWriter" output="{transfer_key}" />
<Reader name="MyInternalReader" input="{transfer_key}" />
</Sequence>
</BehaviorTree>
Example main tree instantiating a subtree:
.. code-block:: xml
<BehaviorTree ID="MainTree">
<Writer output="{some_key}" name="WriterMain" />
<SubTree ID="MySubtree" name="Subtree1" input_key="{some_key}"/>
</BehaviorTree>
This example will map ``{some_key}`` to ``/some_key`` in the remapping table (the root namespace is ``/``),
and when the ``MySubtree`` is instantiated, it will create a new namespace ``/Subtree1`` where:
- ``{input_key}`` resolves to ``/some_key``
- ``{transfer_key}`` (which is not remapped in the ``SubTree`` tag) resolves to
``/Subtree1/transfer_key`` --- so a new key is created for the subtree.
A good documentation of remapping and subtrees can be found on the
`BT.CPP documentation <https://www.behaviortree.dev/docs/tutorial-basics/tutorial_06_subtree_ports>`_.
Parsing walkthrough
-------------------
Given the above example, parsing proceeds as follows:
1. The parser starts at the ``MainTree`` with an empty remapping table and namespace ``/``.
2. It encounters the ``Writer`` node, which uses ``{some_key}``. Since this key is new, it is mapped to
``/some_key`` in the remapping table.
3. The ``SubTree`` node is encountered. The parser:
- Copies the current remapping table.
- Adds ``input_key -> {some_key}`` to the remapping table for the subtree.
- Sets the namespace to ``/Subtree1``.
- Recursively parses the ``MySubtree`` template.
4. Inside ``MySubtree``:
- The ``Reader`` node uses ``{input_key}``, which resolves (via remapping) to ``/some_key``.
- The ``Writer`` node uses ``{transfer_key}``, which is new, so it is mapped to ``/Subtree1/transfer_key``.
- The second ``Reader`` node uses ``{transfer_key}``, which now resolves to ``/Subtree1/transfer_key``.
Key concepts
------------
- **Remapping table**: Tracks how keys in curly braces are resolved to absolute paths or to other keys.
- **Namespace**: Each subtree instantiation gets its own namespace, ensuring keys are scoped and do not collide.
- **Subtree instantiation**: Subtrees are templates; instantiating them is like copy-pasting their structure,
but with remapped keys and a new namespace.
For more details, see the code and the accompanying tests in ``test_ports_xml_parser.py``.
"""
import functools
import inspect
import os
import re
import uuid
import xml.etree.ElementTree as ET
from copy import deepcopy
from typing import Any
import py_trees
from py_trees.ports import CONST_PREFIX, DOT_REPLACEMENT, PortsMixin
from py_trees.ports_utils import (
apply_type_hints,
generate_node_name,
NOOP_LOGGER,
PortsLogger,
)
# Helper: parse curly-brace keys
CURLY_PATTERN = re.compile(r"^{(.+)}$")
# All composite or decorator tags which can have children and have ports (case-insensitive)
# Migration note: this hard-coded set is a known limitation.
# A follow-up should replace it with dynamic PortsMixin detection.
PARENT_NODES_WITH_PORTS_TAGS = {
"ifelse",
"repeat",
"retry",
"caseswitch",
"ifdataavailable",
"ifnodataavailable",
}
# All composite or decorator tags which can have children (case-insensitive)
PARENT_NODES_TAGS = set(PARENT_NODES_WITH_PORTS_TAGS) | {
"sequence",
"selector",
"fallback",
"parallel",
}
[docs]def build_bt_index(root: ET.Element) -> dict[str, ET.Element]:
"""Return ``{ID: element}`` for every ``<BehaviorTree>`` child of *root*."""
return {bt.attrib["ID"]: bt for bt in root.findall("BehaviorTree")}
[docs]def is_key(value: str) -> re.Match | None:
"""Return a regex match if *value* is a ``{key}`` reference."""
open_braces = value.count("{")
close_braces = value.count("}")
if open_braces == 0 and close_braces == 0:
return None
if open_braces != 1 or close_braces != 1:
raise ValueError(f"Malformed key reference '{value}'")
match = CURLY_PATTERN.match(value)
if not match:
raise ValueError(f"Malformed key reference '{value}'")
return match
[docs]def get_key_name(value: str) -> str:
"""Extract the key name from a ``{key}`` reference string."""
match = is_key(value)
assert match, f"Key '{value}' is not a valid key"
return str(match.group(1))
[docs]def resolve_key_remapping(key: str, remapping_table: dict[str, str]) -> str:
"""
Recursively resolve a key through the remapping table until it is not a curly-brace key.
Cyclic remappings are explicitly checked and will raise a RuntimeError if detected.
Args:
key: Input key. Expected to be either a curly-brace key or an absolute key.
remapping_table: Maps logical keys to absolute keys.
Returns:
str: The resolved absolute key.
Raises:
ValueError: If the key can't be properly resolved.
RuntimeError: If a cyclic remapping is detected.
"""
visited = set()
while True:
match = is_key(key)
if not match:
# If not a curly-brace key, it must be an absolute key. Check that this is the case (raise an error if not)
# and then return the key.
if not key.startswith("/"):
raise ValueError(f"Key '{key}' is not an absolute key")
return key
inner = match.group(1)
if inner in visited:
raise RuntimeError(f"Cyclic remapping detected for key: {key}")
visited.add(inner)
if inner in remapping_table:
key = remapping_table[inner]
else:
# If not found, this is an error. If there is a remapping to a key specified,
# then that key must be in the remapping table - reason: a key, e.g. `{my_key}`,
# will resolve to an absolute path like /subtree1/my_key (the absolute path which includes
# the subtree namespace must be in the remapping table, otherwise the information on the subtree
# namespace is lost).
raise ValueError(f"Key '{inner}' not found in remapping table")
[docs]def resolve_direct_value_remapping(key: str, remapping_table: dict[str, str]) -> str:
"""
Obtain a direct value from the remapping table.
Args:
key: The direct value.
remapping_table: A mapping of prefixed keys to their resolved values.
Returns:
The resolved value from the remapping table.
Raises:
ValueError: If the prefixed key is not found in the remapping table or is of invalid format.
"""
if key.startswith("/"):
raise ValueError(f"Key '{key}' is of invalid format")
encoded_key = key.replace(".", DOT_REPLACEMENT)
key = f"{CONST_PREFIX}{encoded_key}"
if key in remapping_table:
key = remapping_table[key]
else:
# If not found, this is an error.
raise ValueError(f"Key '{key}' not found in remapping table")
return key
[docs]def get_absolute_reference(value: str, subtree_namespace: str) -> str:
"""
Get the absolute path of a value (e.g. a key) by prepending the namespace.
Args:
value (str): The value to make absolute.
subtree_namespace (str): The current subtree namespace.
Returns:
str: The absolute reference to the value.
"""
return subtree_namespace.rstrip("/") + "/" + value
[docs]def get_class_from_init_lookup(
class_name: str, init_lookup: dict
) -> type["PortsMixin"]:
"""
Get the class from the init_lookup dictionary, ensuring it is a subclass of :class:`PortsMixin`.
Args:
class_name (str): The name of the class to look up.
init_lookup (dict): A dictionary mapping class names to class constructors or partial callables,
e.g. ``{"Producer": Producer, "Consumer": partial(Consumer, name=name)}``.
Returns:
The class (subclass of :class:`PortsMixin`).
Raises:
KeyError: If the class name is not found in the init_lookup.
TypeError: If the entry is not a class or partial callable,
or if the class is not a subclass of :class:`PortsMixin`.
"""
if class_name not in init_lookup:
raise KeyError(
f"Class name '{class_name}' not found in init_lookup: {init_lookup}"
)
entry = init_lookup[class_name]
# Case 1: Direct class reference
if inspect.isclass(entry):
cls = entry
elif isinstance(entry, functools.partial) and inspect.isclass(entry.func):
cls = entry.func
else:
raise TypeError(f"Unsupported entry type for '{class_name}': {type(entry)}")
# Validate subclass
if not issubclass(cls, PortsMixin):
raise TypeError(f"{cls.__name__} is not a subclass of PortsMixin")
return cls
[docs]def parse_behaviour_tree_xml(
xml_file: str,
main_tree_id: str | None = None,
init_lookup: dict | None = None,
logger: PortsLogger | None = None,
search_paths: list[str] | None = None,
) -> py_trees.behaviour.Behaviour:
"""
Parse the XML file and build the behavior tree.
Supports simple top-level imports via:
<Import src="other.xml"/>
<Include file="other.xml"/>
The import pre-pass inlines all <BehaviorTree> elements from the referenced XMLs
into the current document. If any imported BehaviorTree ID already exists, a
ValueError is raised.
Args:
xml_file (str): Path to the main XML file.
main_tree_id (str | None): ID of the tree to execute; if None, read from 'main_tree_to_execute'.
init_lookup (dict): Mapping from tag -> constructor/partial for PortsMixin nodes (required).
logger (PortsLogger | None): Optional logger (NoOp if None).
search_paths (list[str] | None): Optional extra directories to resolve imports.
Returns:
The root py_trees.behaviour.Behaviour for the requested tree.
Raises:
ValueError: If init_lookup is missing or the main BehaviorTree ID is not found.
FileNotFoundError / RuntimeError: From the import pre-pass if relevant.
"""
if logger is None:
logger = NOOP_LOGGER
if init_lookup is None:
raise ValueError("init_lookup dictionary must be provided")
xml_tree = ET.parse(xml_file)
root = xml_tree.getroot()
# Load and inline any imports *before* building the index
_inline_imports_into_root(
root=root,
current_file=os.path.abspath(xml_file),
logger=logger,
search_paths=search_paths,
)
if main_tree_id is None:
main_tree_id = root.attrib.get("main_tree_to_execute")
# Pretty-print the whole of the XML tree after imports
logger.debug(ET.tostring(root, encoding="unicode"))
bt_index = build_bt_index(root)
if main_tree_id not in bt_index:
raise ValueError(f"BehaviorTree with ID '{main_tree_id}' not found.")
logger.debug(f"[DEBUG] Starting parse of main tree ID='{main_tree_id}'")
bt_elem = bt_index[main_tree_id]
tree = build_tree_from_xml(
bt_elem,
remapping_table={},
init_lookup=init_lookup,
bt_index=bt_index,
logger=logger,
subtree_namespace="/",
parent_names_str="",
)
# Consistency check: traverse the whole tree and check if any nodes have duplicate names.
seen_names = set()
for node in tree.iterate():
logger.debug(f"Node in tree: {node.name} ({node.__class__.__name__})")
if node.name in seen_names:
raise ValueError(
f"Duplicate node name found: {node.name}. Mitigate by assigning explicit names to parent tags."
)
seen_names.add(node.name)
return tree
[docs]def add_new_key_to_remapping_table(
value: str, remapping_table: dict[str, str], subtree_namespace: str
) -> None:
"""
Add the value to the remapping table, **if** it is a key itself.
If we encounter a remapping in a ``SubTree`` or ``PortsMixin``-derived tag,
e.g. `remapped_key={other_key}` and the *value* (i.e. {other_key}) is a key itself,
and this key is *not* yet in the remapping table, then it means that in the
current subtree namespace, we have encountered this key for the first time.
Example:
```
<SubTree ID="subtree1" in="{other_key}"/>
```
The first time we parse a tag which has a key in the assigned value, and the key (i.e. `{other_key}`)
is not yet in the remapping table, it needs to be added, within the scope of the subtree namespace.
Args:
value (str): The value of the remapping, e.g. `{other_key}` in the statement `remapped_key={other_key}`.
remapping_table (dict[str, str]): The remapping table.
subtree_namespace (str): The current subtree namespace.
"""
if is_key(value):
key_name = get_key_name(value)
# If it's not a key, then it must be a direct value.
else:
encoded_value = value.replace(".", DOT_REPLACEMENT)
key_name = f"{CONST_PREFIX}{encoded_value}"
if key_name not in remapping_table:
scoped_key = get_absolute_reference(key_name, subtree_namespace)
remapping_table[key_name] = scoped_key
[docs]def build_subtree_remapping(
elem: ET.Element,
remapping_table: dict[str, str],
parent_namespace: str,
logger: PortsLogger = NOOP_LOGGER,
) -> dict[str, str]:
"""
Process the <SubTree> XML element.
The remapping table is updated to include the remappings from the <subtreeplus> or <subtree> element.
The subtree is then instantiated with the new remapping table.
Args:
elem: The <SubTree> (or <SubTreePlus>) XML element.
remapping_table: Parent remapping table (logical name -> absolute key).
parent_namespace: Absolute namespace of the parent tree.
logger: Optional logger.
Returns:
dict[str, str]: The local remapping for this subtree's ports.
Raises:
ValueError: If a referenced key is missing or a value can't be resolved.
RuntimeError: If cyclic remapping is detected during resolution.
"""
# Add new keys that appear in the values of an attribute of this SubTree element to the remapping table.
# Example: `<SubTree ID="subtree1" in="{other_key}" />` - if `other_key` is a key, and it has been
# encountered during parsing for the first time, it will be added to the remapping table with its absolute path
# within the parent tree namespace (e.g. `/other_key`), so that it can be resolved when parsing the subtree.
# We need to do that in order for the resolving (call to `resolve_*_remapping()` below) of the keys to work
# correctly.
for k, v in elem.attrib.items():
if k in ("ID", "name"):
continue
logger.debug(f"Checking to add new key for SubTree attribute: {k} -> {v}")
add_new_key_to_remapping_table(
v, remapping_table=remapping_table, subtree_namespace=parent_namespace
)
logger.debug(f"Updated remapping table: {remapping_table}")
# Build the new remapping table for this subtree. We build a new remapping table because we need to ensure that
# *only* the keys that are explicitly remapped in the <SubTree> element are included. The parent remapping table
# may already include keys with the same name that are to be treated as local keys in the subtree namespace.
new_remapping = {}
# Add keys from this SubTree element to the local remapping table.
# If the value is a key and it is already in the remapping table, we resolve it to its absolute path.
# Example: `<SubTree ID="subtree1" in="{other_key}" />` - we need resolve {other_key} to its absolute
# path (using the parent remapping) and then add `in -> resolved({other_key})` to the new remapping table.
for k, v in elem.attrib.items():
if k in ("ID", "name"):
continue
logger.debug(f"Processing SubTree attribute: {k} -> {v}")
if is_key(v):
if get_key_name(v) not in remapping_table:
raise ValueError(f"Key {v} not found in remapping table")
# If the value is a key and it is in the remapping table,
# we resolve it to its absolute path.
v = resolve_key_remapping(v, remapping_table)
logger.debug(
f"[Key] Adding remapping from parent remapping table: {k} -> {v}"
)
else:
# If the value is not a key, it is a direct value which is input in the subtree,
# for example `<SubTree ID="subtree1" in="500" />`.
v = resolve_direct_value_remapping(v, remapping_table)
logger.debug(
f"[Direct value] Adding remapping from parent remapping table: {k} -> {v}"
)
new_remapping[k] = v
logger.debug(f"Subtree '{elem.attrib['ID']}' new remapping table: {new_remapping}")
return new_remapping
[docs]def build_port_remappings(
elem: ET.Element,
class_: type[PortsMixin],
remapping_table: dict[str, str],
subtree_namespace: str,
logger: PortsLogger = NOOP_LOGGER,
) -> dict[str, str]:
"""
Build ``{port_name -> absolute_key}`` for any :class:`PortsMixin` node from XML attributes.
Mirrors the logic used for :class:`PortsMixin` leaves:
- Attributes must correspond to declared input/output ports
(otherwise ``NotImplementedError``).
- Each attribute value may be a ``"{key}"`` reference or a direct value; both are resolved
to absolute keys via the remapping table (adding entries as needed).
- If the natural in-namespace key (``/{ns}/{port}``) differs from the resolved absolute key,
a remapping is recorded.
"""
# Build port remappings for this PortsMixin node.
# This is a dictionary that maps port names to their absolute keys.
port_remappings = {} # {port: remap_to_absolute_key}
for attrib_key, attrib_value in elem.attrib.items():
if attrib_key == "name":
continue
if (
attrib_key not in class_.input_ports()
and attrib_key not in class_.output_ports()
):
logger.debug(
f"Attribute '{attrib_key}' is not defined in class '{class_.__name__}'. "
f"Treating as additional parameter."
)
continue
if is_key(attrib_value):
# If the value is a key, and we haven't yet encountered it, we need to add it to the remapping table,
# so that it can be resolved to its absolute path.
add_new_key_to_remapping_table(
attrib_value,
remapping_table=remapping_table,
subtree_namespace=subtree_namespace,
)
# Resolve the value to its absolute path.
absolute_key = resolve_key_remapping(attrib_value, remapping_table)
else:
# If the value is not a key, it is a direct value which is input in the node.
# We need to add a new blackboard key in this namespace.
# Then we need to add the key to the remapping table.
add_new_key_to_remapping_table(
attrib_value,
remapping_table=remapping_table,
subtree_namespace=subtree_namespace,
)
absolute_key = resolve_direct_value_remapping(attrib_value, remapping_table)
# Always record the remapping if an explicit attribute is provided. Even if the
# resolved key matches the natural namespace location, the explicit declaration
# means the user intends to share that port, and the PortsMixin will use the
# provided absolute key instead of the node-scoped default.
port_remappings[attrib_key] = absolute_key
return port_remappings
[docs]def instantiate_ports_node(
elem: ET.Element,
init_lookup: dict,
remapping_table: dict[str, str],
subtree_namespace: str,
logger: PortsLogger = NOOP_LOGGER,
constructor_kwargs: dict | None = None,
parent_names_str: str = "",
) -> PortsMixin:
"""
Instantiate any PortsMixin-based node (leaf or composite).
- looks up the class via `init_lookup` (validated with `get_class_from_init_lookup`)
- builds port remappings from `elem.attrib`
- constructs the instance (using `name` attribute or class_name)
- calls `setup_ports(...)`
Args:
elem: The XML element to parse.
init_lookup (dict): Mapping from class names (str) to callables (constructors or partials)
that return ``PortsMixin``-derived instances.
remapping_table (dict): Mapping from keys (str) to absolute keys (str).
subtree_namespace (str): The namespace for this subtree.
logger: Optional logger-like object.
constructor_kwargs (dict | None): Additional keyword arguments to pass to the constructor.
parent_names_str (str): Dot-separated string of parent names for logging context and generating node names.
Returns:
PortsMixin: the fully initialised node.
"""
portsmixin_name = elem.tag
# Get the class definition from init_lookup - needed to check the ports.
try:
cls = get_class_from_init_lookup(portsmixin_name, init_lookup)
except KeyError:
raise NotImplementedError(
f"Class name '{portsmixin_name}' not found in init_lookup.Supporting other types is still TODO."
)
port_remappings = build_port_remappings(
elem=elem,
class_=cls,
remapping_table=remapping_table,
subtree_namespace=subtree_namespace,
logger=logger,
)
instance_name = generate_node_name(
explicit_name=elem.attrib.get("name", None),
general_name=portsmixin_name,
prefix=parent_names_str,
)
logger.debug(
f"PortsMixin node '{instance_name}' in namespace {subtree_namespace} remappings: {port_remappings}"
)
if portsmixin_name not in init_lookup:
raise ValueError(
f"PortsMixin class '{portsmixin_name}' not found in init_lookup table"
)
constructor_kwargs = constructor_kwargs or {} # create constructor_kwargs
for attrib_key, attrib_value in elem.attrib.items():
if attrib_key == "name":
continue
if attrib_key not in cls.input_ports() and attrib_key not in cls.output_ports():
if is_key(attrib_value):
raise ValueError(
f"'{portsmixin_name}'(name='{instance_name}'): Port remappings are "
f"not supported for non-port attributes ('{attrib_key}'='{attrib_value}'). "
)
# Consistency check: if constructor_kwargs already has this key, and it's conflicting,
# print a warning.
if constructor_kwargs and attrib_key in constructor_kwargs:
existing_entry = constructor_kwargs[attrib_key]
if existing_entry != attrib_value:
logger.warning(
f"Conflicting values for attribute '{attrib_key}': "
f"{existing_entry} (existing) vs {attrib_value} (new). "
f"Using {attrib_value}."
)
logger.debug(
f"Attribute '{attrib_key}' is not defined in class '{cls.__name__}'. Treating as additional parameter."
)
constructor_kwargs[attrib_key] = attrib_value
ctor_callable = init_lookup[portsmixin_name]
# Try to convert the constructor arguments to the correct type.
ignore_keys = {"child", "children", "behaviour_class_name"}
constructor_kwargs, success = apply_type_hints(
ctor_callable, constructor_kwargs, logger=logger, ignore=ignore_keys
)
if not success:
logger.warning(
"Failed to apply type hints to constructor arguments. See error log. Proceeding, but leaving "
"the conversion to the constructors."
)
try:
# Pass the behaviour_class_name (the tag/registry name) to the constructor
node: PortsMixin = init_lookup[portsmixin_name](
name=instance_name,
behaviour_class_name=portsmixin_name,
**constructor_kwargs,
)
except Exception as e: # Catch everything that may go wrong in the constructor
# TODO: make XMLParserError to be more specific
raise ValueError(
f"Failed to instantiate '{portsmixin_name}' with args {constructor_kwargs} "
f"(check: does the instantiated class have kwargs in __init__?): {e}"
) from e
node.setup_ports(
port_remappings=port_remappings,
subtree_namespace=subtree_namespace,
logger=logger,
)
return node
[docs]def build_tree_from_xml(
elem: ET.Element,
remapping_table: dict[str, str],
init_lookup: dict,
bt_index: dict,
logger: PortsLogger = NOOP_LOGGER,
subtree_namespace: str = "/",
parent_names_str: str = "",
) -> py_trees.behaviour.Behaviour:
"""
Recursively build the tree from XML element.
Args:
elem: XML element
remapping_table (dict[str, str]): Remapping table.
init_lookup (dict): Mapping from class names (str) to callables (constructors or partials)
that return ``PortsMixin``-derived instances.
bt_index (dict[str, BehaviorTree]): dictionary {ID: BehaviorTree element} for subtree lookup.
subtree_namespace (str): current blackboard namespace.
logger: Optional logger-like object.
parent_names_str (str): Dot-separated string of parent names for logging context and generating node names.
Returns:
py_trees.behaviour.Behaviour
Raises:
NotImplementedError: For unknown composite tags or unsupported ports.
ValueError: For missing trees or unsupported tags.
AssertionError: If a <BehaviorTree> does not have exactly one child.
"""
tag = elem.tag.lower()
logger.debug(
f"Processing tag: '{elem.tag}' with attributes {elem.attrib}. Remapping table: {remapping_table}"
)
# Composite/Decorator nodes which can have children:
if tag in PARENT_NODES_TAGS:
node_name = generate_node_name(
explicit_name=elem.attrib.get("name", None),
general_name=elem.tag,
prefix=parent_names_str,
)
logger.debug(
f"Entering composite node: {tag} (given name {node_name} with parent names {parent_names_str}). "
"Build children first, so we can pass them to the constructor."
)
children = []
for child in elem:
# Use no general name fallback for the parent_names_str to pass to the children.
# Otherwise, the generated node names will get too long and not very readable.
# It also means that we lose the guarantee of unique names, but that can be avoided by
# assigning an explicit name to the parent tags.
concise_parent_names_str = generate_node_name(
explicit_name=elem.attrib.get("name", None),
general_name="",
prefix=parent_names_str,
no_uuid=True,
)
child_node = build_tree_from_xml(
child,
remapping_table,
init_lookup,
bt_index,
logger=logger,
subtree_namespace=subtree_namespace,
parent_names_str=concise_parent_names_str,
)
if not isinstance(child_node, py_trees.behaviour.Behaviour):
raise TypeError(
f"Child node of type {type(child_node).__name__} is not a valid py_trees Behavior."
)
children.append(child_node)
memory = elem.attrib.get("memory", "true").lower() == "true"
node: py_trees.behaviour.Behaviour
if tag == "sequence":
node = py_trees.composites.Sequence(
name=node_name, memory=memory, children=children
)
elif tag == "selector" or tag == "fallback":
node = py_trees.composites.Selector(
name=node_name, memory=memory, children=children
)
elif tag == "parallel":
policy = elem.attrib.get("policy", "success_on_one")
mapping = {
"success_on_all": py_trees.common.ParallelPolicy.SuccessOnAll,
"success_on_one": py_trees.common.ParallelPolicy.SuccessOnOne,
"success_on_selected": py_trees.common.ParallelPolicy.SuccessOnSelected,
}
node = py_trees.composites.Parallel(
name=node_name,
policy=mapping.get(
policy, py_trees.common.ParallelPolicy.SuccessOnAll
)(),
children=children,
)
elif tag in PARENT_NODES_WITH_PORTS_TAGS:
# Instantiate ports-enabled composite
constructor_kwargs: dict[str, Any] = {}
cls = get_class_from_init_lookup(elem.tag, init_lookup)
if issubclass(cls, py_trees.decorators.Decorator):
if not len(children) == 1:
raise ValueError(
f"Decorator '{elem.tag}' must have exactly one child, but got {len(children)}."
)
constructor_kwargs["child"] = children[0] if children else None
else:
constructor_kwargs["children"] = children
node = instantiate_ports_node(
elem=elem,
init_lookup=init_lookup,
remapping_table=remapping_table,
subtree_namespace=subtree_namespace,
logger=logger,
constructor_kwargs=constructor_kwargs,
parent_names_str=parent_names_str,
)
else:
raise NotImplementedError(f"Unknown composite tag: {tag}")
if not isinstance(node, py_trees.composites.Composite) and not isinstance(
node, py_trees.decorators.Decorator
):
raise TypeError(
f"XML tag '{elem.tag}' did not instantiate a Composite; got {type(node).__name__}"
)
return node
elif tag == "behaviortree":
# This branch is reached when the parser encounters a <BehaviorTree> tag during recursion.
# There are two main cases:
# 1. At the top level, the parser starts with the <BehaviorTree> whose ID matches main_tree_to_execute.
# 2. When recursing into a subtree, the parser finds the referenced <BehaviorTree> by ID and enters here.
# In both cases, the <BehaviorTree> tag is a container for the actual tree structure (usually a Sequence,
# Selector, etc.).
# The parser expects the <BehaviorTree> to have a single child (the root control node of the tree).
# This is because the subtree is to be used like a whole XML, which has one root node.
# It recurses into that child, passing along the current remapping table and other context.
# This is NOT a subtree instantiation (which is handled by the 'subtreeplus'/'subtree' branch),
# but simply the entry point for parsing the structure of a tree or subtree definition.
logger.debug(f"Entering <BehaviorTree> ID='{elem.attrib.get('ID', '')}'")
child_elems = list(elem)
assert len(child_elems) == 1, (
f"<BehaviorTree ID='{elem.attrib.get('ID', '')}'> must have exactly one child (the root node), "
f"but found {len(child_elems)} children."
)
return build_tree_from_xml(
child_elems[0],
remapping_table,
init_lookup,
bt_index,
logger=logger,
subtree_namespace=subtree_namespace,
parent_names_str=parent_names_str,
)
elif tag in ("subtreeplus", "subtree"):
# This branch is reached when the parser encounters a <subtreeplus> or <subtree> tag during recursion.
# These tags are used to instantiate subtrees.
# It recurses into that child, passing along the current remapping table and the subtree namespace.
# The remapping table is updated to include the remappings from the <subtreeplus> or <subtree> element.
# The subtree is then instantiated with the new remapping table.
logger.debug(
f"Instantiating subtree '{elem.attrib['ID']}' with remapping table BEFORE: {remapping_table}"
)
subtree_id = elem.attrib["ID"]
subtree_name = elem.attrib.get("name", str(uuid.uuid4()))
if subtree_id not in bt_index:
raise ValueError(f"Subtree ID '{subtree_id}' not found in XML.")
subtree_elem = bt_index[subtree_id]
# Build the new namespace by prepending the subtree ID
new_namespace = get_absolute_reference(subtree_name, subtree_namespace)
# Build the new remapping table for this subtree.
new_remapping = build_subtree_remapping(
elem, remapping_table, subtree_namespace, logger
)
# Recursively build the subtree with the new remapping table
return build_tree_from_xml(
subtree_elem,
new_remapping,
init_lookup,
bt_index,
logger=logger,
subtree_namespace=new_namespace,
parent_names_str=((parent_names_str + ".") if parent_names_str else "")
+ subtree_name,
)
elif elem.tag in init_lookup:
# Leaf PortsMixin node (any PortsMixin + Behaviour combination).
logger.debug(f"Creating PortsMixin leaf node for tag {elem.tag}.")
node = instantiate_ports_node(
elem=elem,
init_lookup=init_lookup,
remapping_table=remapping_table,
subtree_namespace=subtree_namespace,
logger=logger,
parent_names_str=parent_names_str,
)
if not (
isinstance(node, PortsMixin)
and isinstance(node, py_trees.behaviour.Behaviour)
):
raise TypeError(
f"XML tag '{elem.tag}' did not instantiate a PortsMixin + "
f"py_trees.behaviour.Behaviour; got {type(node).__name__}"
)
return node
else:
logger.error(f"Unsupported tag encountered: {elem.tag}")
raise ValueError(
f"Unsupported tag '{elem.tag}' encountered in XML. "
"This is not a known composite, subtree, or PortsMixin node."
)
def _collect_bt_ids(root: ET.Element) -> set[str]:
"""
Collect IDs of all <BehaviorTree> elements in an XML root.
Args:
root: XML element that contains zero or more <BehaviorTree> children.
Returns:
A set of all BehaviorTree IDs found under 'root'.
Raises:
KeyError: If any <BehaviorTree> is missing the 'ID' attribute.
"""
return {bt.attrib["ID"] for bt in root.findall("BehaviorTree")}
def _resolve_import_path(
src: str, base_dir: str, search_paths: list[str] | None
) -> str:
"""
Resolve import path to an absolute file on disk.
Resolution order:
1) If 'src' is absolute and exists, use it.
2) Relative to 'base_dir' (the directory of the including XML).
3) Each directory in 'search_paths' (if provided).
Args:
src: Path from the <Import src="..."> or <Include file="..."> attribute.
base_dir: Directory of the XML that contains the import.
search_paths: Optional additional directories to search.
Returns:
Absolute, existing path to the imported XML.
Raises:
FileNotFoundError: If no candidate path exists.
"""
candidates = []
if os.path.isabs(src):
candidates.append(src)
candidates.append(os.path.join(base_dir, src))
if search_paths:
candidates.extend(os.path.join(p, src) for p in search_paths)
for c in candidates:
if os.path.exists(c):
return os.path.abspath(c)
raise FileNotFoundError(f"Import not found for '{src}'. Tried: {candidates}")
def _inline_imports_into_root(
root: ET.Element,
current_file: str,
logger: PortsLogger,
search_paths: list[str] | None,
visited: set[str] | None = None,
) -> None:
"""
Inline top-level <Import/> / <Include/> directives into 'root' in-place.
This performs a pre-processing step so that the rest of the parser can
remain unchanged. It:
- Resolves each import path.
- Prevents import cycles.
- Parses the imported XML and recursively inlines imports there as well.
- Appends *all* <BehaviorTree> elements from the imported XML to 'root'.
- Raises if any imported BehaviorTree ID already exists in 'root'.
- Removes the <Import/> / <Include/> element after inlining.
Notes:
- Only top-level imports (direct children of 'root') are supported.
- No renaming/prefixing: ID collisions are treated as errors.
Args:
root: The root element of the currently loaded XML.
current_file: Absolute path of the XML file backing 'root'.
logger: Logger-like object or None for debug messages.
search_paths: Optional extra directories to search for import files.
visited: Set of absolute file paths already processed (cycle guard).
Raises:
ValueError: On BehaviorTree ID collisions or malformed imports.
FileNotFoundError: If an import target cannot be found.
"""
if visited is None:
visited = set()
base_dir = os.path.dirname(os.path.abspath(current_file))
try:
existing_ids = _collect_bt_ids(root)
except KeyError as e:
raise ValueError(f"BehaviorTree missing ID in '{current_file}'") from e
# Only handle top-level children named Import/Include (case-insensitive).
imports = [
child for child in list(root) if child.tag.lower() in ("import", "include")
]
logger.debug(f"Found {len(imports)} import(s) in '{current_file}'")
for imp in imports:
logger.debug(f"Processing import directive '{imp.tag}': {imp.attrib}")
# Support either src= or file=
src = imp.attrib.get("src") or imp.attrib.get("file")
if not src:
raise ValueError(f"<{imp.tag}> requires 'src' or 'file' attribute")
# Resolve path relative to the including file (or search_paths)
resolved = _resolve_import_path(src, base_dir, search_paths)
# Cycle detection
if resolved in visited:
# Don't re-add the same file
continue
visited.add(resolved)
logger.debug(f"Inlining {imp.tag} from '{resolved}'")
# Parse the imported file and inline its imports first (depth-first)
imported_tree = ET.parse(resolved)
imported_root = imported_tree.getroot()
_inline_imports_into_root(
imported_root, resolved, logger, search_paths, visited
)
logger.debug(
f"Imported XML: '{ET.tostring(imported_root, encoding='unicode')}'"
)
# Append all BehaviorTrees from imported file, but forbid ID collisions
for bt in imported_root.findall("BehaviorTree"):
logger.debug(f"Processing ID={bt.attrib.get('ID')}")
bt_id = bt.attrib.get("ID")
if not bt_id:
raise ValueError(f"Imported BehaviorTree missing ID in '{resolved}'")
if bt_id in existing_ids:
# Simple mode: collisions are hard errors
raise ValueError(
f"BehaviorTree ID collision: '{bt_id}' already exists while importing '{resolved}'"
)
# Deep-copy to detach from the imported tree and append into 'root'
root.append(deepcopy(bt))
existing_ids.add(bt_id)
logger.debug(f"Imported BehaviorTree ID='{bt_id}' from '{resolved}'")
# Remove the import directive after successful inlining
root.remove(imp)