12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831 |
- # -*- coding: utf-8 -*-
- from collections import OrderedDict, defaultdict
- import copy
- from functools import partial, reduce
- import inspect
- import logging
- from six import string_types
- from ..core import State, Machine, Transition, Event, listify, MachineError, Enum, EnumMeta, EventData
# Library modules should stay silent unless the application configures logging;
# the NullHandler swallows records when no other handler is attached.
_LOGGER = logging.getLogger(__name__)
_LOGGER.addHandler(logging.NullHandler())

# Workaround for dill issues when partials and super are used in conjunction.
# Without this alias, Python 3.0 - 3.3 will not support pickling, see
# https://github.com/pytransitions/transitions/issues/236
_super = super
def _build_state_list(state_tree, separator, prefix=None):
    """ Converts a hierarchical state tree into a (nested) list of state names.

    Every leaf of the tree becomes one name, built by joining the keys on the
    path from the root to that leaf with `separator`. A node with several
    children yields a list; a node with exactly one child collapses to that
    child's result.

    Args:
        state_tree (dict): Mapping of state name to sub tree; an empty/falsy
            value marks a leaf.
        separator (str): String used to join the path segments.
        prefix (list, optional): Path segments leading to `state_tree`.
    Returns:
        str or list: A single name if the tree has one entry, otherwise a list
            which may contain nested lists.
    Raises:
        IndexError: If `state_tree` is empty.
    """
    # A None sentinel avoids sharing one list object between calls.
    prefix = [] if prefix is None else prefix
    res = []
    for key, value in state_tree.items():
        path = prefix + [key]
        if value:
            res.append(_build_state_list(value, separator, prefix=path))
        else:
            res.append(separator.join(path))
    return res if len(res) > 1 else res[0]
def _resolve_order(state_tree):
    """ Custom breadth-first exploration of a state tree.

    The tree is walked level by level (keys of each level in reversed order)
    and the collected paths are handed back reversed, which makes sure that
    ALL children are evaluated before their parents in parallel states.

    Args:
        state_tree (dict): Mapping of state name to sub tree.
    Returns:
        iterator: Reverse iterator over paths; each path is a list of names.
    """
    visited = []
    # Each queue entry holds the path of a node and the sub tree below it.
    pending = [([], state_tree)]
    while pending:
        parent_path, subtree = pending.pop(0)
        for name in reversed(list(subtree.keys())):
            path = parent_path + [name]
            visited.append(path)
            children = subtree[name]
            if children:
                pending.append((path, children))
    return reversed(visited)
class FunctionWrapper(object):
    """ Callable node in a chain of attributes that mirrors a nested state path.
        It backs the convenience function to_<state> for nested states so that
        model.to_A.s1.C() can be called when a custom separator has been chosen.
    """

    def __init__(self, func, path):
        """
        Args:
            func (callable): Function to be called at the end of the path.
            path (sequence of str): Remaining path segments. If empty, `func` is
                assigned to this wrapper; otherwise child wrappers are created.
        """
        if not path:
            self._func = func
            return
        self.add(func, path)
        # Intermediate nodes are not callable until a function is assigned.
        self._func = None

    def add(self, func, path):
        """ Registers `func` below this wrapper, creating one child attribute
            per path segment where necessary.
        Args:
            func (callable): Function to be called at the end of the path.
            path (sequence of str): Remaining segments of the substate path.
        """
        if not path:
            self._func = func
            return
        attr_name = path[0]
        remainder = path[1:]
        # Attribute names must not start with a digit, so those get an 's' prefix.
        if attr_name[0].isdigit():
            attr_name = 's' + attr_name
        if hasattr(self, attr_name):
            getattr(self, attr_name).add(func, remainder)
        else:
            setattr(self, attr_name, FunctionWrapper(func, remainder))

    def __call__(self, *args, **kwargs):
        """ Forwards the call to the wrapped function. """
        return self._func(*args, **kwargs)
class NestedEvent(Event):
    """ An event type to work with nested states.
        This subclass is NOT compatible with simple Machine instances.
    """

    def trigger(self, _model, _machine, *args, **kwargs):
        """ Serially execute all transitions that match the current state,
        halting as soon as one successfully completes. NOTE: This should only
        be called by HierarchicalMachine instances.
        Args:
            _model (object): model object the event is triggered on
            _machine (HierarchicalMachine): Since NestedEvents can be used in multiple
                machine instances, this one is used to determine the current state separator.
            args and kwargs: Optional positional or named arguments that will
                be passed onto the EventData object, enabling arbitrary state
                information to be passed on to downstream triggered functions.
        Returns: boolean indicating whether or not a transition was
            successfully executed (True if successful, False if not).
        """
        bound_trigger = partial(self._trigger, _model, _machine, *args, **kwargs)
        # pylint: disable=protected-access
        # noinspection PyProtectedMember
        # Machine._process should not be called somewhere else. That's why it should not be exposed
        # to Machine users.
        return _machine._process(bound_trigger)

    def _trigger(self, _model, _machine, *args, **kwargs):
        """ Tries the event on every active state below the machine's current scope,
            children before parents. Once a state handled the event successfully,
            the state and all of its ancestors are skipped.
        Returns: result of the last processed state or None if no state matched.
        """
        separator = _machine.state_cls.separator
        state_tree = _machine._build_state_tree(getattr(_model, _machine.model_attribute), separator)
        # descend to the sub tree of the scope the machine is currently in
        state_tree = reduce(dict.get, _machine.get_global_name(join=False), state_tree)
        handled = []
        result = None
        for state_path in _resolve_order(state_tree):
            state_name = separator.join(state_path)
            if state_name in handled or state_name not in self.transitions:
                continue
            state = _machine.get_state(state_name)
            event_data = EventData(state, self, _machine, _model, args=args, kwargs=kwargs)
            event_data.source_name = state_name
            event_data.source_path = copy.copy(state_path)
            result = self._process(event_data)
            if result:
                # mark the state itself and every ancestor as done
                for depth in range(len(state_path), 0, -1):
                    handled.append(separator.join(state_path[:depth]))
        return result

    def _process(self, event_data):
        """ Runs the machine's prepare callbacks, executes the transitions registered for
            the source state until one succeeds and finally runs the finalize callbacks.
            Exceptions are stored in event_data.error and re-raised.
        Returns: value of event_data.result
        """
        machine = event_data.machine
        machine.callbacks(machine.prepare_event, event_data)
        _LOGGER.debug("%sExecuted machine preparation callbacks before conditions.", machine.name)

        try:
            for candidate in self.transitions[event_data.source_name]:
                event_data.transition = candidate
                if candidate.execute(event_data):
                    event_data.result = True
                    break
        except Exception as err:
            event_data.error = err
            raise
        finally:
            machine.callbacks(machine.finalize_event, event_data)
            _LOGGER.debug("%sExecuted machine finalize callbacks", machine.name)
        return event_data.result
class NestedState(State):
    """ A state which allows substates.
    Attributes:
        states (OrderedDict): Substates of the current state, keyed by name.
        events (dict): Events defined for the nested state.
        initial (str): Name of a child which should be entered when the state is entered.
    """

    separator = '_'
    u""" Separator between the names of parent and child states. In case '_' is required for
        naming state, this value can be set to other values such as '.' or even unicode characters
        such as '↦' (limited to Python 3 though).
    """

    def __init__(self, name, on_enter=None, on_exit=None, ignore_invalid_triggers=None, initial=None):
        _super(NestedState, self).__init__(name=name, on_enter=on_enter, on_exit=on_exit,
                                           ignore_invalid_triggers=ignore_invalid_triggers)
        self.initial = initial
        self.events = {}
        self.states = OrderedDict()
        # Path of parent names; only set while scoped_enter/scoped_exit is running.
        self._scope = []

    def add_substate(self, state):
        """ Adds a state as a substate.
        Args:
            state (NestedState): State to add to the current state.
        """
        self.add_substates(state)

    def add_substates(self, states):
        """ Adds a list of states to the current state.
        Args:
            states (list): List of states to add to the current state.
        """
        for state in listify(states):
            self.states[state.name] = state

    def scoped_enter(self, event_data, scope=None):
        """ Enters the state while `name` reports the fully qualified state name.
        Args:
            event_data (EventData): Passed on to `enter`.
            scope (list, optional): Names of the parent states.
        """
        self._scope = [] if scope is None else scope
        try:
            self.enter(event_data)
        finally:
            # Reset even when a callback raises; otherwise `name` stays prefixed.
            self._scope = []

    def scoped_exit(self, event_data, scope=None):
        """ Exits the state while `name` reports the fully qualified state name.
        Args:
            event_data (EventData): Passed on to `exit`.
            scope (list, optional): Names of the parent states.
        """
        self._scope = [] if scope is None else scope
        try:
            self.exit(event_data)
        finally:
            # Reset even when a callback raises; otherwise `name` stays prefixed.
            self._scope = []

    @property
    def name(self):
        """ The state's name, prefixed with the current scope (if any) and joined by `separator`. """
        return self.separator.join(self._scope + [_super(NestedState, self).name])
class NestedTransition(Transition):
    """ A transition which handles entering and leaving nested states.
    Attributes:
        source (str): Source state of the transition.
        dest (str): Destination state of the transition.
        prepare (list): Callbacks executed before conditions checks.
        conditions (list): Callbacks evaluated to determine if
            the transition should be executed.
        before (list): Callbacks executed before the transition is executed
            but only if condition checks have been successful.
        after (list): Callbacks executed after the transition is executed
            but only if condition checks have been successful.
    """

    def _resolve_transition(self, event_data):
        """ Works out which states have to be exited and entered for this transition.
        Args:
            event_data (EventData): Event data; `source_path` is consumed (common
                leading segments shared with the destination are popped).
        Returns:
            tuple: (state_tree, exit_partials, enter_partials) where state_tree is the
                model's state tree after the transition and the partials are the bound
                scoped_exit/scoped_enter calls in execution order.
        """
        machine = event_data.machine
        dst_name_path = machine.get_local_name(self.dest, join=False)
        # called for its side effect only: get_state raises if dest is not registered
        _ = machine.get_state(dst_name_path)
        model_states = listify(getattr(event_data.model, machine.model_attribute))
        state_tree = machine._build_state_tree(model_states, machine.state_cls.separator)

        scope = machine.get_global_name(join=False)
        src_name_path = event_data.source_path
        if src_name_path == dst_name_path:
            root = src_name_path[:-1]  # exit and enter the same state
            # The parents are already covered by root; keep only the state itself so that
            # _enter_nested does not look up the parent inside its own scope.
            dst_name_path = dst_name_path[-1:]
        else:
            # strip the common ancestors of source and destination
            root = []
            while dst_name_path and src_name_path and src_name_path[0] == dst_name_path[0]:
                root.append(src_name_path.pop(0))
                dst_name_path.pop(0)

        scoped_tree = reduce(dict.get, scope + root, state_tree)
        exit_partials = []
        if src_name_path:
            for state_name in _resolve_order(scoped_tree):
                cb = partial(machine.get_state(root + state_name).scoped_exit,
                             event_data,
                             scope + root + state_name[:-1])
                exit_partials.append(cb)
        if dst_name_path:
            new_states, enter_partials = self._enter_nested(root, dst_name_path, scope + root, event_data)
        else:
            new_states, enter_partials = {}, []

        # Replace the first entry below the common root with the newly entered states.
        # Nothing is changed when the scoped tree is empty.
        for key in scoped_tree:
            del scoped_tree[key]
            for new_key, value in new_states.items():
                scoped_tree[new_key] = value
            break

        return state_tree, exit_partials, enter_partials

    def _change_state(self, event_data):
        """ Executes exit callbacks, updates the model state and executes enter callbacks. """
        state_tree, exit_partials, enter_partials = self._resolve_transition(event_data)
        for func in exit_partials:
            func()
        self._update_model(event_data, state_tree)
        for func in enter_partials:
            func()

    def _enter_nested(self, root, dest, prefix_path, event_data):
        """ Recursively collects the states to be entered.
        Args:
            root (list): Names of states to scope into before entering anything (consumed).
            dest (list): Remaining path of the destination state (consumed).
            prefix_path (list): Names of the parents of the state entered next.
            event_data (EventData): Event data bound to the enter callbacks.
        Returns:
            tuple: (new_states, enter_partials) with new_states being the tree of
                entered states and enter_partials the bound scoped_enter calls.
        """
        if root:
            state_name = root.pop(0)
            with event_data.machine(state_name):
                return self._enter_nested(root, dest, prefix_path, event_data)
        elif dest:
            new_states = OrderedDict()
            state_name = dest.pop(0)
            with event_data.machine(state_name):
                new_states[state_name], new_enter = self._enter_nested([], dest, prefix_path + [state_name],
                                                                       event_data)
                # machine.scoped is the state just scoped into
                enter_partials = [partial(event_data.machine.scoped.scoped_enter, event_data, prefix_path)] + new_enter
            return new_states, enter_partials
        elif event_data.machine.scoped.initial:
            # destination reached: follow the chain of initial states breadth-first
            new_states = OrderedDict()
            enter_partials = []
            q = []
            prefix = prefix_path
            scoped_tree = new_states
            initial_states = [event_data.machine.scoped.states[i] for i in listify(event_data.machine.scoped.initial)]
            while True:
                event_data.scope = prefix
                for state in initial_states:
                    enter_partials.append(partial(state.scoped_enter, event_data, prefix))
                    scoped_tree[state.name] = OrderedDict()
                    if state.initial:
                        q.append((scoped_tree[state.name], prefix + [state.name],
                                  [state.states[i] for i in listify(state.initial)]))
                if not q:
                    break
                scoped_tree, prefix, initial_states = q.pop(0)
            return new_states, enter_partials
        else:
            return {}, []

    @staticmethod
    def _update_model(event_data, tree):
        """ Assigns the states described by `tree` to the model and updates event_data.state. """
        model_states = _build_state_list(tree, event_data.machine.state_cls.separator)
        with event_data.machine():
            event_data.machine.set_state(model_states, event_data.model)
            states = event_data.machine.get_states(listify(model_states))
            event_data.state = states[0] if len(states) == 1 else states

    # Prevent deep copying of callback lists since these include either references to callable or
    # strings. Deep copying a method reference would lead to the creation of an entire new (model) object
    # (see https://github.com/pytransitions/transitions/issues/248)
    def __deepcopy__(self, memo):
        cls = self.__class__
        result = cls.__new__(cls)
        memo[id(self)] = result
        for key, value in self.__dict__.items():
            if key in cls.dynamic_methods:
                setattr(result, key, copy.copy(value))
            else:
                setattr(result, key, copy.deepcopy(value, memo))
        return result
class HierarchicalMachine(Machine):
    """ Extends transitions.core.Machine by capabilities to handle nested states.
        A hierarchical machine REQUIRES NestedStates (or any subclass of it) to operate.

        The machine can be scoped into a state with `with machine(state_name):`. While
        scoped, `states` and `events` refer to the collections of that state.
    """

    state_cls = NestedState
    transition_cls = NestedTransition
    event_cls = NestedEvent

    def __init__(self, *args, **kwargs):
        # Stack of (scoped, states, events) tuples saved by __enter__.
        self._stack = []
        self.scoped = self
        # Scope selected by __call__ and consumed by the following __enter__.
        self._next_scope = None
        _super(HierarchicalMachine, self).__init__(*args, **kwargs)

    def __call__(self, to_scope=None):
        """ Selects the scope that the next `with` block will enter.
        Args:
            to_scope (str, Enum or None): Name (only the first path segment is used) or
                enum of a child state of the current scope. None selects the outermost
                scope saved on the stack, or the machine itself if the stack is empty.
        Returns:
            HierarchicalMachine: self, to be used as a context manager.
        """
        if isinstance(to_scope, string_types):
            state_name = to_scope.split(self.state_cls.separator)[0]
            state = self.states[state_name]
            to_scope = (state, state.states, state.events)
        elif isinstance(to_scope, Enum):
            state = self.states[to_scope.name]
            to_scope = (state, state.states, state.events)
        elif to_scope is None:
            if self._stack:
                to_scope = self._stack[0]
            else:
                to_scope = (self, self.states, self.events)
        self._next_scope = to_scope

        return self

    def __enter__(self):
        self._stack.append((self.scoped, self.states, self.events))
        self.scoped, self.states, self.events = self._next_scope
        self._next_scope = None

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.scoped, self.states, self.events = self._stack.pop()

    def add_model(self, model, initial=None):
        """ Extends transitions.core.Machine.add_model by applying a custom 'to' function to
            the added model.
        """
        models = [mod if mod != 'self' else self for mod in listify(model)]
        _super(HierarchicalMachine, self).add_model(models, initial=initial)
        initial_name = getattr(models[0], self.model_attribute)
        if hasattr(initial_name, 'name'):
            initial_name = initial_name.name
        # expand the initial state to the initial children it implies
        initial_states = self._resolve_initial(models, initial_name.split(self.state_cls.separator))
        for mod in models:
            self.set_state(initial_states, mod)
            if hasattr(mod, 'to'):
                _LOGGER.warning("%sModel already has a 'to'-method. It will NOT "
                                "be overwritten by NestedMachine", self.name)
            else:
                to_func = partial(self.to_state, mod)
                setattr(mod, 'to', to_func)

    def add_ordered_transitions(self, states=None, trigger='next_state',
                                loop=True, loop_includes_initial=True,
                                conditions=None, unless=None, before=None,
                                after=None, prepare=None, **kwargs):
        """ Same as the parent implementation but defaults `states` to all nested state names. """
        if states is None:
            states = self.get_nested_state_names()
        _super(HierarchicalMachine, self).add_ordered_transitions(states=states, trigger=trigger, loop=loop,
                                                                  loop_includes_initial=loop_includes_initial,
                                                                  conditions=conditions,
                                                                  unless=unless, before=before, after=after,
                                                                  prepare=prepare, **kwargs)

    def add_states(self, states, on_enter=None, on_exit=None, ignore_invalid_triggers=None, **kwargs):
        """ Adds new state(s) to the current scope.
        Args:
            states (list, str, dict, Enum, NestedState or Machine): State(s) to add. Strings may
                contain the separator to address nested states; dicts may contain the keys
                'children', 'parallel', 'transitions' and 'remap' next to the state arguments.
            on_enter, on_exit, ignore_invalid_triggers: Used when parent states of a
                separator-joined string have to be created.
            **kwargs: 'remap' (dict) names states which must be skipped because they are
                remapped by the parent; remaining kwargs are passed on.
        Raises:
            ValueError: If a state already exists or has an unsupported type.
        """
        remap = kwargs.pop('remap', None)
        for state in listify(states):
            if isinstance(state, Enum) and isinstance(state.value, EnumMeta):
                state = {'name': state.name, 'children': state.value}

            if isinstance(state, string_types):
                # skip only this state; the remaining states still have to be added
                if remap is not None and state in remap:
                    continue
                domains = state.split(self.state_cls.separator, 1)
                if len(domains) > 1:
                    try:
                        self.get_state(domains[0])
                    except ValueError:
                        self.add_state(domains[0], on_enter=on_enter, on_exit=on_exit,
                                       ignore_invalid_triggers=ignore_invalid_triggers, **kwargs)
                    with self(domains[0]):
                        self.add_states(domains[1], on_enter=on_enter, on_exit=on_exit,
                                        ignore_invalid_triggers=ignore_invalid_triggers, **kwargs)
                else:
                    if state in self.states:
                        raise ValueError("State {0} cannot be added since it already exists.".format(state))
                    new_state = self._create_state(state)
                    self.states[new_state.name] = new_state
                    self._init_state(new_state)
            elif isinstance(state, Enum):
                if remap is not None and state.name in remap:
                    continue
                new_state = self._create_state(state)
                if state.name in self.states:
                    raise ValueError("State {0} cannot be added since it already exists.".format(state.name))
                self.states[new_state.name] = new_state
                self._init_state(new_state)
            elif isinstance(state, dict):
                if remap is not None and state['name'] in remap:
                    continue
                state = state.copy()  # prevent messing with the initially passed dict
                # The state's own remap applies to its children only; it must not replace
                # the remap of the current call which is still needed for later states.
                state_remap = state.pop('remap', None)
                state_children = state.pop('children', [])
                state_parallel = state.pop('parallel', [])
                transitions = state.pop('transitions', [])
                new_state = self._create_state(**state)
                self.states[new_state.name] = new_state
                self._init_state(new_state)
                remapped_transitions = []
                with self(new_state.name):
                    if state_parallel:
                        self.add_states(state_parallel, remap=state_remap, **kwargs)
                        new_state.initial = [s if isinstance(s, string_types) else s['name'] for s in state_parallel]
                    else:
                        self.add_states(state_children, remap=state_remap, **kwargs)
                    if state_remap is not None:
                        drop_event = []
                        # work on copies so the events of the embedded machine stay untouched
                        for evt in list(self.events.values()):
                            self.events[evt.name] = copy.copy(evt)
                        for trigger, event in self.events.items():
                            drop_source = []
                            event.transitions = copy.deepcopy(event.transitions)
                            for source_name, trans_source in event.transitions.items():
                                if source_name in state_remap:
                                    drop_source.append(source_name)
                                    continue
                                drop_trans = []
                                for trans in trans_source:
                                    if trans.dest in state_remap:
                                        conditions, unless = [], []
                                        for cond in trans.conditions:
                                            # split a list in two lists based on the accessors (cond.target) truth value
                                            (unless, conditions)[cond.target].append(cond.func)
                                        remapped_transitions.append({
                                            'trigger': trigger,
                                            'source': new_state.name + self.state_cls.separator + trans.source,
                                            'dest': state_remap[trans.dest],
                                            'conditions': conditions,
                                            'unless': unless,
                                            'prepare': trans.prepare,
                                            'before': trans.before,
                                            'after': trans.after})
                                        drop_trans.append(trans)
                                for t in drop_trans:
                                    trans_source.remove(t)
                                if not trans_source:
                                    drop_source.append(source_name)
                            for s in drop_source:
                                del event.transitions[s]
                            if not event.transitions:
                                drop_event.append(trigger)
                        for e in drop_event:
                            del self.events[e]
                    if transitions:
                        self.add_transitions(transitions)
                # remapped transitions leave the new state and belong to the enclosing scope
                self.add_transitions(remapped_transitions)
            elif isinstance(state, NestedState):
                if state.name in self.states:
                    raise ValueError("State {0} cannot be added since it already exists.".format(state.name))
                self.states[state.name] = state
                self._init_state(state)
            elif isinstance(state, Machine):
                # remap is keyed by state names, so compare names and not state objects
                new_states = [s for s in state.states.values() if remap is None or s.name not in remap]
                self.add_states(new_states)
                for ev in state.events.values():
                    self.events[ev.name] = ev
                if self.scoped.initial is None:
                    self.scoped.initial = state.initial
            else:
                raise ValueError("Cannot add state of type {0}.".format(type(state).__name__))

    def add_transition(self, trigger, source, dest, conditions=None,
                       unless=None, before=None, after=None, prepare=None, **kwargs):
        """ Resolves enum sources/destinations to their separator-joined path and forwards
            everything to the parent implementation.
        """
        if source != self.wildcard_all:
            source = [self.state_cls.separator.join(self._get_enum_path(s)) if isinstance(s, Enum) else s
                      for s in listify(source)]
        if dest != self.wildcard_same:
            dest = self.state_cls.separator.join(self._get_enum_path(dest)) if isinstance(dest, Enum) else dest
        _super(HierarchicalMachine, self).add_transition(trigger, source, dest, conditions,
                                                         unless, before, after, prepare, **kwargs)

    def get_global_name(self, state=None, join=True):
        """ Returns the name of the current scope (optionally extended by a local state)
            relative to the machine root.
        Args:
            state (str or state-like, optional): State of the current scope to append.
            join (bool): Return a separator-joined string if True, a list of names otherwise.
        Raises:
            ValueError: If `state` is not part of the current scope.
        """
        local_stack = [s[0] for s in self._stack] + [self.scoped]
        # everything after the last occurrence of the machine itself is a nested scope
        local_stack_start = len(local_stack) - local_stack[::-1].index(self)
        domains = [s.name for s in local_stack[local_stack_start:]]
        if state:
            state_name = state.name if hasattr(state, 'name') else state
            if state_name in self.states:
                domains.append(state_name)
            else:
                raise ValueError("State '{0}' not found in local states.".format(state))
        return self.state_cls.separator.join(domains) if join else domains

    def get_local_name(self, state_name, join=True):
        """ Splits `state_name` at the separator.
        Args:
            state_name (str): Name of a state.
            join (bool): Return the joined string if True, the list of segments otherwise.
        """
        path = state_name.split(self.state_cls.separator)
        return self.state_cls.separator.join(path) if join else path

    def get_nested_state_names(self):
        """ Returns the global names of all states below the current scope, parents first. """
        ordered_states = []
        for state in self.states.values():
            ordered_states.append(self.get_global_name(state))
            with self(state.name):
                ordered_states.extend(self.get_nested_state_names())
        return ordered_states

    def get_nested_triggers(self, dest=None):
        """ Collects the triggers of the current scope and of all nested scopes.
        Args:
            dest (str, optional): If given, the triggers of the current scope are
                resolved via the parent's get_triggers for that state name.
        """
        if dest:
            triggers = _super(HierarchicalMachine, self).get_triggers(dest)
        else:
            triggers = list(self.events.keys())
        for state in self.states.values():
            with self(state.name):
                triggers.extend(self.get_nested_triggers())
        return triggers

    def get_state(self, state, hint=None):
        """ Return the State instance with the passed name.
        Args:
            state (str, Enum or list): Name, enum or path of the state.
            hint (list, optional): Full path of the state; used internally to retry the
                lookup from the machine root.
        Raises:
            ValueError: If the state is not registered.
        """
        if isinstance(state, Enum):
            state = self._get_enum_path(state)
        elif isinstance(state, string_types):
            state = state.split(self.state_cls.separator)
        if not hint:
            # copy since the path is consumed during the lookup
            state = copy.copy(state)
            hint = copy.copy(state)
        if len(state) > 1:
            child = state.pop(0)
            try:
                with self(child):
                    return self.get_state(state, hint)
            except (KeyError, ValueError):
                # not found relative to the current scope; retry from the root
                try:
                    with self():
                        state = self
                        for elem in hint:
                            state = state.states[elem]
                        return state
                except KeyError:
                    raise ValueError("State '%s' is not a registered state."
                                     % self.state_cls.separator.join(hint))
        elif state[0] not in self.states:
            # report the joined path instead of the repr of a list
            raise ValueError("State '%s' is not a registered state." % self.state_cls.separator.join(hint))
        return self.states[state[0]]

    def get_states(self, states):
        """ Resolves a (nested) list of state names to a (nested) list of states. """
        res = []
        for state in states:
            if isinstance(state, list):
                res.append(self.get_states(state))
            else:
                res.append(self.get_state(state))
        return res

    def get_triggers(self, *args):
        """ Extends transitions.core.Machine.get_triggers to also include parent state triggers. """
        # add parents to state set
        triggers = []
        with self():
            for state_name in args:
                state_path = state_name.split(self.state_cls.separator)
                root = state_path[0]
                while state_path:
                    triggers.extend(_super(HierarchicalMachine, self).get_triggers(
                        self.state_cls.separator.join(state_path)))
                    with self(root):
                        triggers.extend(self.get_nested_triggers(self.state_cls.separator.join(state_path)))
                    state_path.pop()
        return triggers

    def is_state(self, state_name, model, allow_substates=False):
        """ Checks whether the model is in the passed state.
        Args:
            state_name (str or Enum): Name or value of the state to check.
            model (object): The model to be checked.
            allow_substates (bool): Also return True if the model is in a substate of
                `state_name`.
        """
        current_name = getattr(model, self.model_attribute)
        if allow_substates:
            prefix = state_name.name if hasattr(state_name, 'name') else state_name
            # Require a separator after the prefix so that 'AB' does not count as substate of 'A'.
            return current_name == prefix or current_name.startswith(prefix + self.state_cls.separator)
        return current_name == state_name

    def on_enter(self, state_name, callback):
        """ Helper function to add callbacks to states in case a custom state separator is used.
        Args:
            state_name (str): Name of the state
            callback (str or callable): Function to be called. Strings will be resolved to model functions.
        """
        self.get_state(state_name).add_callback('enter', callback)

    def on_exit(self, state_name, callback):
        """ Helper function to add callbacks to states in case a custom state separator is used.
        Args:
            state_name (str): Name of the state
            callback (str or callable): Function to be called. Strings will be resolved to model functions.
        """
        self.get_state(state_name).add_callback('exit', callback)

    def set_state(self, states, model=None):
        """ Set the current state.
        Args:
            states (list of str or Enum or State): value of state(s) to be set
            model (optional[object]): targeted model; if not set, all models will be set to 'state'
        """
        values = [self._set_state(value) for value in listify(states)]
        models = self.models if model is None else listify(model)
        for mod in models:
            setattr(mod, self.model_attribute, values if len(values) > 1 else values[0])

    def to_state(self, model, state_name, *args, **kwargs):
        """ Helper function to add go to states in case a custom state separator is used.
        Args:
            model (class): The model that should be used.
            state_name (str): Name of the destination state.
        Raises:
            MachineError: If the model is currently in a parallel state.
        """
        current_state = getattr(model, self.model_attribute)
        if isinstance(current_state, list):
            raise MachineError("Cannot use 'to_state' from parallel state")

        event = EventData(self.get_state(current_state), Event('to', self), self,
                          model, args=args, kwargs=kwargs)
        event.source_name = current_state
        event.source_path = current_state.split(self.state_cls.separator)
        self._create_transition(current_state, state_name).execute(event)

    def trigger_event(self, _model, _trigger, *args, **kwargs):
        """ Processes events recursively and forwards arguments if suitable events are found.
        This function is usually bound to models with model and trigger arguments already
        resolved as a partial. Execution will halt when a nested transition has been executed
        successfully.
        Args:
            _model (object): targeted model
            _trigger (str): event name
            *args: positional parameters passed to the event and its callbacks
            **kwargs: keyword arguments passed to the event and its callbacks
        Returns:
            bool: whether a transition has been executed successfully
        Raises:
            MachineError: When no suitable transition could be found and ignore_invalid_trigger
                          is not True. Note that a transition which is not executed due to conditions
                          is still considered valid.
        """
        with self():
            res = self._trigger_event(_model, _trigger, None, *args, **kwargs)
        return self._check_event_result(res, _model, _trigger)

    def _add_model_to_state(self, state, model):
        """ Adds the is_<state> check, auto-detected enter/exit callbacks and the triggers of
            `state` (and, recursively, of its substates) to the model.
        """
        name = self.get_global_name(state)
        if self.state_cls.separator == '_' or self.state_cls.separator not in name:
            value = state.value if isinstance(state.value, Enum) else name
            self._checked_assignment(model, 'is_%s' % name, partial(self.is_state, value, model))
            # Add dynamic method callbacks (enter/exit) if there are existing bound methods in the model
            # except if they are already mentioned in 'on_enter/exit' of the defined state
            for callback in self.state_cls.dynamic_methods:
                method = "{0}_{1}".format(callback, name)
                if hasattr(model, method) and inspect.ismethod(getattr(model, method)) and \
                        method not in getattr(state, callback):
                    state.add_callback(callback[3:], method)
        with self(state.name):
            for event in self.events.values():
                if not hasattr(model, event.name):
                    self._add_trigger_to_model(event.name, model)
            for a_state in self.states.values():
                self._add_model_to_state(a_state, model)

    def _add_trigger_to_model(self, trigger, model):
        """ Binds `trigger` to the model; to_<state> triggers become FunctionWrapper chains
            when a custom separator is used.
        """
        trig_func = partial(self.trigger_event, model, trigger)
        # FunctionWrappers are only necessary if a custom separator is used
        if trigger.startswith('to_') and self.state_cls.separator != '_':
            path = trigger[3:].split(self.state_cls.separator)
            if hasattr(model, 'to_' + path[0]):
                # add path to existing function wrapper
                getattr(model, 'to_' + path[0]).add(trig_func, path[1:])
            else:
                # create a new function wrapper
                self._checked_assignment(model, 'to_' + path[0], FunctionWrapper(trig_func, path[1:]))
        else:
            self._checked_assignment(model, trigger, trig_func)

    # converts a list of current states into a hierarchical state tree
    def _build_state_tree(self, model_states, separator, tree=None):
        """ Builds a tree (nested OrderedDicts) from one state or a (nested) list of states.
        Args:
            model_states (str, Enum or list): Current state(s) of a model.
            separator (str): Separator used to split state names.
            tree (OrderedDict, optional): Tree to be extended; a new one is created if None.
        Returns:
            OrderedDict: The (extended) tree.
        """
        tree = tree if tree is not None else OrderedDict()
        if isinstance(model_states, list):
            for state in model_states:
                _ = self._build_state_tree(state, separator, tree)
        else:
            tmp = tree
            if isinstance(model_states, (Enum, EnumMeta)):
                with self():
                    path = self._get_enum_path(model_states)
            else:
                path = model_states.split(separator)
            for elem in path:
                tmp = tmp.setdefault(elem.name if hasattr(elem, 'name') else elem, OrderedDict())
        return tree

    def _get_enum_path(self, enum_state, prefix=None):
        """ Searches the current scope and its substates (depth-first) for the state whose
            value is `enum_state`.
        Returns:
            list: Names leading to the state or an empty list if it was not found.
        """
        prefix = [] if prefix is None else prefix
        if enum_state.name in self.states and self.states[enum_state.name].value == enum_state:
            return prefix + [enum_state.name]
        for name in self.states:
            with self(name):
                res = self._get_enum_path(enum_state, prefix=prefix + [name])
                if res:
                    return res
        return []

    def _check_event_result(self, res, model, trigger):
        """ Converts a None result (no transition found) into False or a MachineError,
            depending on the ignore_invalid_triggers setting of state and machine.
        """
        if res is None:
            state_name = getattr(model, self.model_attribute)
            msg = "%sCan't trigger event %s from state %s!" % (self.name, trigger, state_name)
            state = self.get_state(state_name)
            ignore = state.ignore_invalid_triggers if state.ignore_invalid_triggers is not None \
                else self.ignore_invalid_triggers
            if ignore:
                _LOGGER.warning(msg)
                res = False
            else:
                raise MachineError(msg)
        return res

    def _get_trigger(self, model, trigger_name, *args, **kwargs):
        """Convenience function added to the model to trigger events by name.
        Args:
            model (object): Model with assigned event trigger.
            trigger_name (str): Name of the trigger to be called.
            *args: Variable length argument list which is passed to the triggered event.
            **kwargs: Arbitrary keyword arguments which is passed to the triggered event.
        Returns:
            bool: True if a transitions has been conducted or the trigger event has been queued.
        Raises:
            AttributeError: If triggering the event raises a MachineError.
        """
        try:
            return self.trigger_event(model, trigger_name, *args, **kwargs)
        except MachineError:
            raise AttributeError("Do not know event named '%s'." % trigger_name)

    def _has_state(self, state, raise_error=False):
        """ Checks the current scope and, recursively, all nested scopes for a state.
        Args:
            state (NestedState): state to be tested
            raise_error (bool): whether ValueError should be raised when the state
                                is not registered
       Returns:
            bool: Whether state is registered in the machine
        Raises:
            ValueError: When raise_error is True and state is not registered
        """
        found = _super(HierarchicalMachine, self)._has_state(state)
        if not found:
            for a_state in self.states:
                with self(a_state):
                    if self._has_state(state):
                        return True
        if not found and raise_error:
            msg = 'State %s has not been added to the machine' % (state.name if hasattr(state, 'name') else state)
            raise ValueError(msg)
        return found

    def _init_state(self, state):
        """ Registers `state` with all models, adds auto transitions if enabled and
            initialises its substates.
        """
        for model in self.models:
            self._add_model_to_state(state, model)
        if self.auto_transitions:
            state_name = self.get_global_name(state.name)
            parent = state_name.split(self.state_cls.separator, 1)
            with self():
                for a_state in self.get_nested_state_names():
                    if a_state == parent[0]:
                        self.add_transition('to_%s' % state_name, self.wildcard_all, state_name)
                    elif len(parent) == 1:
                        self.add_transition('to_%s' % a_state, state_name, a_state)
        with self(state.name):
            for substate in self.states.values():
                self._init_state(substate)

    def _resolve_initial(self, models, state_name_path, prefix=None):
        """ Follows `state_name_path` and then the chain of initial states.
        Args:
            models (list): Not used by the lookup itself; passed on unchanged.
            state_name_path (list): Remaining path segments (consumed).
            prefix (list, optional): Names of the states already scoped into.
        Returns:
            str or list: Name of the resolved state or a list of names for parallel states.
        """
        prefix = [] if prefix is None else prefix
        if state_name_path:
            state_name = state_name_path.pop(0)
            with self(state_name):
                return self._resolve_initial(models, state_name_path, prefix=prefix + [state_name])
        if self.scoped.initial:
            entered_states = []
            for initial_state_name in listify(self.scoped.initial):
                with self(initial_state_name):
                    entered_states.append(self._resolve_initial(models, [], prefix=prefix + [self.scoped.name]))
            return entered_states if len(entered_states) > 1 else entered_states[0]
        return self.state_cls.separator.join(prefix)

    def _set_state(self, state_name):
        """ Maps (nested lists of) state names to the values stored on the model: the enum
            value for enum states, the passed name otherwise.
        """
        if isinstance(state_name, list):
            return [self._set_state(value) for value in state_name]
        a_state = self.get_state(state_name)
        return a_state.value if isinstance(a_state.value, Enum) else state_name

    def _trigger_event(self, _model, _trigger, _state_tree, *args, **kwargs):
        """ Triggers the event on the active states, children first; a parent is only
            tried if none of its children executed a transition.
        Returns:
            bool or None: None if no state knows the event, otherwise whether any
                transition has been executed.
        """
        if _state_tree is None:
            _state_tree = self._build_state_tree(listify(getattr(_model, self.model_attribute)),
                                                 self.state_cls.separator)
        res = {}
        for key, value in _state_tree.items():
            if value:
                with self(key):
                    res[key] = self._trigger_event(_model, _trigger, value, *args, **kwargs)
            if not res.get(key, None) and _trigger in self.events:
                res[key] = self.events[_trigger].trigger(_model, self, *args, **kwargs)
        return None if not res or all(v is None for v in res.values()) else any(res.values())
|