_methodical.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. # -*- test-case-name: automat._test.test_methodical -*-
  2. from functools import wraps
  3. from itertools import count
  4. try:
  5. # Python 3
  6. from inspect import getfullargspec as getArgsSpec
  7. except ImportError:
  8. # Python 2
  9. from inspect import getargspec as getArgsSpec
  10. import attr
  11. from ._core import Transitioner, Automaton
  12. from ._introspection import preserveName
  13. def _keywords_only(f):
  14. """
  15. Decorate a function so all its arguments must be passed by keyword.
  16. A useful utility for decorators that take arguments so that they don't
  17. accidentally get passed the thing they're decorating as their first
  18. argument.
  19. Only works for methods right now.
  20. """
  21. @wraps(f)
  22. def g(self, **kw):
  23. return f(self, **kw)
  24. return g
  25. @attr.s(frozen=True)
  26. class MethodicalState(object):
  27. """
  28. A state for a L{MethodicalMachine}.
  29. """
  30. machine = attr.ib(repr=False)
  31. method = attr.ib()
  32. serialized = attr.ib(repr=False)
  33. def upon(self, input, enter, outputs, collector=list):
  34. """
  35. Declare a state transition within the L{MethodicalMachine} associated
  36. with this L{MethodicalState}: upon the receipt of the input C{input},
  37. enter the state C{enter}, emitting each output in C{outputs}.
  38. """
  39. inputSpec = getArgsSpec(input.method)
  40. for output in outputs:
  41. outputSpec = getArgsSpec(output.method)
  42. if inputSpec != outputSpec:
  43. raise TypeError(
  44. "method {input} signature {inputSignature} "
  45. "does not match output {output} "
  46. "signature {outputSignature}".format(
  47. input=input.method.__name__,
  48. output=output.method.__name__,
  49. inputSignature=inputSpec,
  50. outputSignature=outputSpec,
  51. ))
  52. self.machine._oneTransition(self, input, enter, outputs, collector)
  53. def _name(self):
  54. return self.method.__name__
  55. def _transitionerFromInstance(oself, symbol, automaton):
  56. """
  57. Get a L{Transitioner}
  58. """
  59. transitioner = getattr(oself, symbol, None)
  60. if transitioner is None:
  61. transitioner = Transitioner(
  62. automaton,
  63. automaton.initialState,
  64. )
  65. setattr(oself, symbol, transitioner)
  66. return transitioner
  67. def _empty():
  68. pass
  69. def _docstring():
  70. """docstring"""
  71. def assertNoCode(inst, attribute, f):
  72. # The function body must be empty, i.e. "pass" or "return None", which
  73. # both yield the same bytecode: LOAD_CONST (None), RETURN_VALUE. We also
  74. # accept functions with only a docstring, which yields slightly different
  75. # bytecode, because the "None" is put in a different constant slot.
  76. # Unfortunately, this does not catch function bodies that return a
  77. # constant value, e.g. "return 1", because their code is identical to a
  78. # "return None". They differ in the contents of their constant table, but
  79. # checking that would require us to parse the bytecode, find the index
  80. # being returned, then making sure the table has a None at that index.
  81. if f.__code__.co_code not in (_empty.__code__.co_code,
  82. _docstring.__code__.co_code):
  83. raise ValueError("function body must be empty")
  84. @attr.s(cmp=False, hash=False)
  85. class MethodicalInput(object):
  86. """
  87. An input for a L{MethodicalMachine}.
  88. """
  89. automaton = attr.ib(repr=False)
  90. method = attr.ib(validator=assertNoCode)
  91. symbol = attr.ib(repr=False)
  92. collectors = attr.ib(default=attr.Factory(dict), repr=False)
  93. def __get__(self, oself, type=None):
  94. """
  95. Return a function that takes no arguments and returns values returned
  96. by output functions produced by the given L{MethodicalInput} in
  97. C{oself}'s current state.
  98. """
  99. transitioner = _transitionerFromInstance(oself, self.symbol,
  100. self.automaton)
  101. @preserveName(self.method)
  102. @wraps(self.method)
  103. def doInput(*args, **kwargs):
  104. self.method(oself, *args, **kwargs)
  105. previousState = transitioner._state
  106. (outputs, outTracer) = transitioner.transition(self)
  107. collector = self.collectors[previousState]
  108. values = []
  109. for output in outputs:
  110. if outTracer:
  111. outTracer(output._name())
  112. value = output(oself, *args, **kwargs)
  113. values.append(value)
  114. return collector(values)
  115. return doInput
  116. def _name(self):
  117. return self.method.__name__
  118. @attr.s(frozen=True)
  119. class MethodicalOutput(object):
  120. """
  121. An output for a L{MethodicalMachine}.
  122. """
  123. machine = attr.ib(repr=False)
  124. method = attr.ib()
  125. def __get__(self, oself, type=None):
  126. """
  127. Outputs are private, so raise an exception when we attempt to get one.
  128. """
  129. raise AttributeError(
  130. "{cls}.{method} is a state-machine output method; "
  131. "to produce this output, call an input method instead.".format(
  132. cls=type.__name__,
  133. method=self.method.__name__
  134. )
  135. )
  136. def __call__(self, oself, *args, **kwargs):
  137. """
  138. Call the underlying method.
  139. """
  140. return self.method(oself, *args, **kwargs)
  141. def _name(self):
  142. return self.method.__name__
  143. @attr.s(cmp=False, hash=False)
  144. class MethodicalTracer(object):
  145. automaton = attr.ib(repr=False)
  146. symbol = attr.ib(repr=False)
  147. def __get__(self, oself, type=None):
  148. transitioner = _transitionerFromInstance(oself, self.symbol,
  149. self.automaton)
  150. def setTrace(tracer):
  151. transitioner.setTrace(tracer)
  152. return setTrace
  153. counter = count()
  154. def gensym():
  155. """
  156. Create a unique Python identifier.
  157. """
  158. return "_symbol_" + str(next(counter))
  159. class MethodicalMachine(object):
  160. """
  161. A L{MethodicalMachine} is an interface to an L{Automaton} that uses methods
  162. on a class.
  163. """
  164. def __init__(self):
  165. self._automaton = Automaton()
  166. self._reducers = {}
  167. self._symbol = gensym()
  168. def __get__(self, oself, type=None):
  169. """
  170. L{MethodicalMachine} is an implementation detail for setting up
  171. class-level state; applications should never need to access it on an
  172. instance.
  173. """
  174. if oself is not None:
  175. raise AttributeError(
  176. "MethodicalMachine is an implementation detail.")
  177. return self
  178. @_keywords_only
  179. def state(self, initial=False, terminal=False,
  180. serialized=None):
  181. """
  182. Declare a state, possibly an initial state or a terminal state.
  183. This is a decorator for methods, but it will modify the method so as
  184. not to be callable any more.
  185. @param initial: is this state the initial state? Only one state on
  186. this L{MethodicalMachine} may be an initial state; more than one is
  187. an error.
  188. @type initial: L{bool}
  189. @param terminal: Is this state a terminal state, i.e. a state that the
  190. machine can end up in? (This is purely informational at this
  191. point.)
  192. @type terminal: L{bool}
  193. @param serialized: a serializable value to be used to represent this
  194. state to external systems. This value should be hashable;
  195. L{unicode} is a good type to use.
  196. @type serialized: a hashable (comparable) value
  197. """
  198. def decorator(stateMethod):
  199. state = MethodicalState(machine=self,
  200. method=stateMethod,
  201. serialized=serialized)
  202. if initial:
  203. self._automaton.initialState = state
  204. return state
  205. return decorator
  206. @_keywords_only
  207. def input(self):
  208. """
  209. Declare an input.
  210. This is a decorator for methods.
  211. """
  212. def decorator(inputMethod):
  213. return MethodicalInput(automaton=self._automaton,
  214. method=inputMethod,
  215. symbol=self._symbol)
  216. return decorator
  217. @_keywords_only
  218. def output(self):
  219. """
  220. Declare an output.
  221. This is a decorator for methods.
  222. This method will be called when the state machine transitions to this
  223. state as specified in the L{MethodicalMachine.output} method.
  224. """
  225. def decorator(outputMethod):
  226. return MethodicalOutput(machine=self, method=outputMethod)
  227. return decorator
  228. def _oneTransition(self, startState, inputToken, endState, outputTokens,
  229. collector):
  230. """
  231. See L{MethodicalState.upon}.
  232. """
  233. # FIXME: tests for all of this (some of it is wrong)
  234. # if not isinstance(startState, MethodicalState):
  235. # raise NotImplementedError("start state {} isn't a state"
  236. # .format(startState))
  237. # if not isinstance(inputToken, MethodicalInput):
  238. # raise NotImplementedError("start state {} isn't an input"
  239. # .format(inputToken))
  240. # if not isinstance(endState, MethodicalState):
  241. # raise NotImplementedError("end state {} isn't a state"
  242. # .format(startState))
  243. # for output in outputTokens:
  244. # if not isinstance(endState, MethodicalState):
  245. # raise NotImplementedError("output state {} isn't a state"
  246. # .format(endState))
  247. self._automaton.addTransition(startState, inputToken, endState,
  248. tuple(outputTokens))
  249. inputToken.collectors[startState] = collector
  250. @_keywords_only
  251. def serializer(self):
  252. """
  253. """
  254. def decorator(decoratee):
  255. @wraps(decoratee)
  256. def serialize(oself):
  257. transitioner = _transitionerFromInstance(oself, self._symbol,
  258. self._automaton)
  259. return decoratee(oself, transitioner._state.serialized)
  260. return serialize
  261. return decorator
  262. @_keywords_only
  263. def unserializer(self):
  264. """
  265. """
  266. def decorator(decoratee):
  267. @wraps(decoratee)
  268. def unserialize(oself, *args, **kwargs):
  269. state = decoratee(oself, *args, **kwargs)
  270. mapping = {}
  271. for eachState in self._automaton.states():
  272. mapping[eachState.serialized] = eachState
  273. transitioner = _transitionerFromInstance(
  274. oself, self._symbol, self._automaton)
  275. transitioner._state = mapping[state]
  276. return None # it's on purpose
  277. return unserialize
  278. return decorator
  279. @property
  280. def _setTrace(self):
  281. return MethodicalTracer(self._automaton, self._symbol)
  282. def asDigraph(self):
  283. """
  284. Generate a L{graphviz.Digraph} that represents this machine's
  285. states and transitions.
  286. @return: L{graphviz.Digraph} object; for more information, please
  287. see the documentation for
  288. U{graphviz<https://graphviz.readthedocs.io/>}
  289. """
  290. from ._visualize import makeDigraph
  291. return makeDigraph(
  292. self._automaton,
  293. stateAsString=lambda state: state.method.__name__,
  294. inputAsString=lambda input: input.method.__name__,
  295. outputAsString=lambda output: output.method.__name__,
  296. )