structs.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. from .compat import collections_abc
  2. class DirectedGraph(object):
  3. """A graph structure with directed edges."""
  4. def __init__(self):
  5. self._vertices = set()
  6. self._forwards = {} # <key> -> Set[<key>]
  7. self._backwards = {} # <key> -> Set[<key>]
  8. def __iter__(self):
  9. return iter(self._vertices)
  10. def __len__(self):
  11. return len(self._vertices)
  12. def __contains__(self, key):
  13. return key in self._vertices
  14. def copy(self):
  15. """Return a shallow copy of this graph."""
  16. other = DirectedGraph()
  17. other._vertices = set(self._vertices)
  18. other._forwards = {k: set(v) for k, v in self._forwards.items()}
  19. other._backwards = {k: set(v) for k, v in self._backwards.items()}
  20. return other
  21. def add(self, key):
  22. """Add a new vertex to the graph."""
  23. if key in self._vertices:
  24. raise ValueError("vertex exists")
  25. self._vertices.add(key)
  26. self._forwards[key] = set()
  27. self._backwards[key] = set()
  28. def remove(self, key):
  29. """Remove a vertex from the graph, disconnecting all edges from/to it."""
  30. self._vertices.remove(key)
  31. for f in self._forwards.pop(key):
  32. self._backwards[f].remove(key)
  33. for t in self._backwards.pop(key):
  34. self._forwards[t].remove(key)
  35. def connected(self, f, t):
  36. return f in self._backwards[t] and t in self._forwards[f]
  37. def connect(self, f, t):
  38. """Connect two existing vertices.
  39. Nothing happens if the vertices are already connected.
  40. """
  41. if t not in self._vertices:
  42. raise KeyError(t)
  43. self._forwards[f].add(t)
  44. self._backwards[t].add(f)
  45. def iter_edges(self):
  46. for f, children in self._forwards.items():
  47. for t in children:
  48. yield f, t
  49. def iter_children(self, key):
  50. return iter(self._forwards[key])
  51. def iter_parents(self, key):
  52. return iter(self._backwards[key])
  53. class _FactoryIterableView(object):
  54. """Wrap an iterator factory returned by `find_matches()`.
  55. Calling `iter()` on this class would invoke the underlying iterator
  56. factory, making it a "collection with ordering" that can be iterated
  57. through multiple times, but lacks random access methods presented in
  58. built-in Python sequence types.
  59. """
  60. def __init__(self, factory):
  61. self._factory = factory
  62. def __repr__(self):
  63. return "{}({})".format(type(self).__name__, list(self._factory()))
  64. def __bool__(self):
  65. try:
  66. next(self._factory())
  67. except StopIteration:
  68. return False
  69. return True
  70. __nonzero__ = __bool__ # XXX: Python 2.
  71. def __iter__(self):
  72. return self._factory()
  73. def for_preference(self):
  74. """Provide an candidate iterable for `get_preference()`"""
  75. return self._factory()
  76. def excluding(self, candidates):
  77. """Create a new instance excluding specified candidates."""
  78. def factory():
  79. return (c for c in self._factory() if c not in candidates)
  80. return type(self)(factory)
  81. class _SequenceIterableView(object):
  82. """Wrap an iterable returned by find_matches().
  83. This is essentially just a proxy to the underlying sequence that provides
  84. the same interface as `_FactoryIterableView`.
  85. """
  86. def __init__(self, sequence):
  87. self._sequence = sequence
  88. def __repr__(self):
  89. return "{}({})".format(type(self).__name__, self._sequence)
  90. def __bool__(self):
  91. return bool(self._sequence)
  92. __nonzero__ = __bool__ # XXX: Python 2.
  93. def __iter__(self):
  94. return iter(self._sequence)
  95. def __len__(self):
  96. return len(self._sequence)
  97. def for_preference(self):
  98. """Provide an candidate iterable for `get_preference()`"""
  99. return self._sequence
  100. def excluding(self, candidates):
  101. """Create a new instance excluding specified candidates."""
  102. return type(self)([c for c in self._sequence if c not in candidates])
  103. def build_iter_view(matches):
  104. """Build an iterable view from the value returned by `find_matches()`."""
  105. if callable(matches):
  106. return _FactoryIterableView(matches)
  107. if not isinstance(matches, collections_abc.Sequence):
  108. matches = list(matches)
  109. return _SequenceIterableView(matches)