pipe.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. # taken from https://github.com/Bogdanp/threadop
  4. import ast
  5. import inspect
  6. from textwrap import dedent
  7. class _PipeOpTransformer(ast.NodeTransformer):
  8. def visit_FunctionDef(self, node):
  9. node.decorator_list = [
  10. decorator for decorator in node.decorator_list
  11. if not isinstance(decorator, ast.Name) or decorator.id != "enable_pipeOp"
  12. ]
  13. self.generic_visit(node)
  14. return node
  15. def visit_BinOp(self, node):
  16. self.generic_visit(node)
  17. if isinstance(node.op, ast.BitOr):
  18. if not isinstance(node.right, ast.Call):
  19. raise RuntimeError("The RHS of a | must be a call.")
  20. node.right.args.insert(0, node.left)
  21. return node.right
  22. return node
  23. def enable_pipeOp(fn):
  24. """Transform all occurrences of the right shift operator by moving
  25. the left-hand expression into the right first argument position of
  26. the right-hand expression.
  27. For example, it turns::
  28. 42 | add(2) | multiply(10) | print()
  29. into::
  30. print(multiply(add(42, 2), 10))
  31. Limitations:
  32. * The right-hand side *must* be a function call.
  33. * Requires access to functions' source code.
  34. """
  35. transformer = _PipeOpTransformer()
  36. tree = transformer.visit(ast.parse(dedent(inspect.getsource(fn))))
  37. code = compile(tree, inspect.getfile(fn), "exec")
  38. scope = {}
  39. exec(code, fn.__globals__, scope)
  40. return scope[fn.__name__]