12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175 |
- # Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
- # For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt
- """Code parsing for coverage.py."""
- import ast
- import collections
- import os
- import re
- import token
- import tokenize
- from coverage import env
- from coverage.backward import range # pylint: disable=redefined-builtin
- from coverage.backward import bytes_to_ints, string_class
- from coverage.bytecode import CodeObjects
- from coverage.debug import short_stack
- from coverage.misc import contract, join_regex, new_contract, nice_pair, one_of
- from coverage.misc import NoSource, NotPython, StopEverything
- from coverage.phystokens import compile_unicode, generate_tokens, neuter_encoding_declaration
class PythonParser(object):
    """Find executable lines, excluded lines, and arcs in Python source.

    Everything is derived from static analysis of the text: the source is
    tokenized, compiled and parsed, but never executed.

    """
    @contract(text='unicode|None')
    def __init__(self, text=None, filename=None, exclude=None):
        """Create a parser for `text`, or for the file named `filename`.

        When `text` is empty or missing, the source is read from `filename`;
        an IOError while reading is re-raised as `NoSource`.  `exclude` is a
        regex: lines that match it are treated as excluded.

        """
        assert text or filename, "PythonParser needs either text or filename"
        self.filename = filename or "<code>"
        self.text = text
        if not self.text:
            from coverage.python import get_python_source
            try:
                self.text = get_python_source(self.filename)
            except IOError as exc:
                raise NoSource(
                    "No source for code: '%s': %s" % (self.filename, exc)
                )

        self.exclude = exclude

        # The source text, split into individual lines.
        self.lines = self.text.split('\n')

        # Normalized results, filled in by `parse_source`: statement line
        # numbers (exclusions removed) and excluded line numbers, both mapped
        # to the first line of their statement.
        self.statements = set()
        self.excluded = set()

        # The raw_* attributes are only used in this class, and in
        # lab/parser.py to show how this class is working.

        # Statement-starting lines, as reported by the bytecode line table.
        self.raw_statements = set()

        # Lines excluded by the exclusion regex or inside an excluded suite.
        self.raw_excluded = set()

        # Lines carrying a `class` keyword.
        self.raw_classdefs = set()

        # Lines occupied by docstrings.
        self.raw_docstrings = set()

        # Internal detail, used by lab/parser.py.
        self.show_tokens = False

        # For each line of a multi-line statement, the line the statement
        # starts on.
        self._multiline = {}

        # Created on first use: the ByteParser, the set of arcs, and the
        # message fragments for describing missing arcs.
        self._byte_parser = None
        self._all_arcs = None
        self._missing_arc_fragments = None

    @property
    def byte_parser(self):
        """The ByteParser for this source, created the first time it's needed."""
        parser = self._byte_parser
        if not parser:
            parser = self._byte_parser = ByteParser(self.text, filename=self.filename)
        return parser

    def lines_matching(self, *regexes):
        """Return the set of line numbers whose text matches any of `regexes`.

        A line counts as a match if any part of it matches; the regex does
        not have to cover the whole line.  Line numbers are 1-based.

        """
        pattern = join_regex(regexes)
        if env.PY2:
            pattern = pattern.decode("utf8")
        search = re.compile(pattern).search
        return set(
            lineno
            for lineno, line in enumerate(self.lines, start=1)
            if search(line)
        )

    def _raw_parse(self):
        """Collect the raw facts about the source's lines.

        Updates `raw_excluded`, `raw_classdefs`, `raw_docstrings`,
        `_multiline` and `raw_statements`.

        """
        # Begin with the lines that directly match the exclusion regex.
        if self.exclude:
            self.raw_excluded = self.lines_matching(self.exclude)

        # Walk the tokens to find excluded suites, docstrings, and statements
        # that span more than one line.
        depth = 0
        excluded_depth = 0
        in_excluded_suite = False
        in_excluded_decorators = False
        last_kind = token.INDENT
        stmt_start = None
        no_code = True
        first_on_line = True

        for kind, tok_text, (start_line, _), (end_line, _), src_line in generate_tokens(self.text):
            if self.show_tokens:                # pragma: debugging
                print("%10s %5s %-20r %r" % (
                    tokenize.tok_name.get(kind, kind),
                    nice_pair((start_line, end_line)), tok_text, src_line
                ))
            if kind == token.INDENT:
                depth += 1
            elif kind == token.DEDENT:
                depth -= 1
            elif kind == token.NAME:
                if tok_text == 'class':
                    # Class definitions look like branches in the bytecode,
                    # so remember the lines that have the 'class' keyword and
                    # compensate for them later.
                    self.raw_classdefs.add(start_line)
            elif kind == token.OP:
                if tok_text == ':':
                    if not in_excluded_suite and (
                        end_line in self.raw_excluded or in_excluded_decorators
                    ):
                        # An excluded suite starts here.  The colon is the
                        # trigger so that a pragma comment on the same line
                        # as the colon is honored.
                        self.raw_excluded.add(end_line)
                        excluded_depth = depth
                        in_excluded_suite = True
                        in_excluded_decorators = False
                elif tok_text == '@' and first_on_line:
                    # A decorator: once one is excluded, the following
                    # decorators are excluded as well.
                    if end_line in self.raw_excluded:
                        in_excluded_decorators = True
                    if in_excluded_decorators:
                        self.raw_excluded.add(end_line)
            elif kind == token.STRING and last_kind == token.INDENT:
                # A string that is the first thing on an indented line is a
                # docstring (a trick from trace.py in the stdlib).  This works
                # for 99.9999% of cases.  For the rest (!) see:
                # http://stackoverflow.com/questions/1769332/x/1769794#1769794
                self.raw_docstrings.update(range(start_line, end_line+1))
            elif kind == token.NEWLINE:
                if stmt_start is not None and end_line != stmt_start:
                    # The statement ended on a later line than it started on:
                    # map every line it covers back to its first line.
                    for lineno in range(stmt_start, end_line+1):
                        self._multiline[lineno] = stmt_start
                stmt_start = None
                first_on_line = True

            if tok_text.strip() and kind != tokenize.COMMENT:
                # A token with real content.
                no_code = False
                if stmt_start is None:
                    # ...and it is the first one of a statement.
                    stmt_start = start_line
                    # Leaving the excluded suite ends the exclusion.
                    if in_excluded_suite and depth <= excluded_depth:
                        in_excluded_suite = False
                    if in_excluded_suite:
                        self.raw_excluded.add(end_line)
                    first_on_line = False

            last_kind = kind

        # The bytecode tells us where the executable statements start.
        if not no_code:
            self.raw_statements.update(self.byte_parser._find_statements())

    def first_line(self, line):
        """Return the first line of the statement that contains `line`."""
        return self._multiline.get(line, line)

    def first_lines(self, lines):
        """Return the set of statement-first-lines for the line numbers in `lines`."""
        return set(map(self.first_line, lines))

    def translate_lines(self, lines):
        """Implement `FileReporter.translate_lines`."""
        return self.first_lines(lines)

    def translate_arcs(self, arcs):
        """Implement `FileReporter.translate_arcs`."""
        return [
            (self.first_line(start), self.first_line(end))
            for (start, end) in arcs
        ]

    def parse_source(self):
        """Parse the source and set the `.excluded` and `.statements` attributes.

        Both are normalized to the first line of multi-line statements.
        Tokenizing and indentation errors are re-raised as `NotPython`.

        """
        try:
            self._raw_parse()
        except (tokenize.TokenError, IndentationError) as err:
            # IndentationError has a .lineno; TokenError keeps the position
            # in its second argument.
            lineno = err.lineno if hasattr(err, "lineno") else err.args[1][0]
            raise NotPython(
                u"Couldn't parse '%s' as Python source: '%s' at line %d" % (
                    self.filename, err.args[0], lineno
                )
            )

        self.excluded = self.first_lines(self.raw_excluded)

        skipped = self.excluded | self.raw_docstrings
        candidate_starts = self.raw_statements - skipped
        self.statements = self.first_lines(candidate_starts) - skipped

    def arcs(self):
        """Return the set of possible arcs in the code, as line number pairs.

        Line numbers are normalized to the first line of multi-line
        statements.  The AST analysis runs on the first call only.

        """
        if self._all_arcs is None:
            self._analyze_ast()
        return self._all_arcs

    def _analyze_ast(self):
        """Run an AstArcAnalyzer and keep what it found.

        Sets `_all_arcs` to the normalized arcs (arcs that collapse onto a
        single line are dropped) and `_missing_arc_fragments` to the
        analyzer's message fragments.

        """
        analyzer = AstArcAnalyzer(self.text, self.raw_statements, self._multiline)
        analyzer.analyze()

        normalized = (
            (self.first_line(start), self.first_line(end))
            for start, end in analyzer.arcs
        )
        self._all_arcs = set(pair for pair in normalized if pair[0] != pair[1])

        self._missing_arc_fragments = analyzer.missing_arc_fragments

    def exit_counts(self):
        """Return a mapping from line number to the number of exits from that line.

        Arcs that start at a negative line number, or that start or end on an
        excluded line, are not counted.

        """
        counts = collections.defaultdict(int)
        for start, end in self.arcs():
            # Never report a negative line number, nor an excluded line, and
            # don't count arcs that lead to excluded lines.
            if start < 0 or start in self.excluded or end in self.excluded:
                continue
            counts[start] += 1

        # Class definitions have one extra exit, so take one off for each.
        # Only touch lines already counted: class definitions can include
        # excluded lines.
        for class_line in self.raw_classdefs:
            if class_line in counts:
                counts[class_line] -= 1

        return counts

    def missing_arc_description(self, start, end, executed_arcs=None):
        """Return an English sentence describing the missing arc `start` -> `end`.

        `executed_arcs`, if given, is consulted to recognize a one-line
        callable that was never entered.  When several message fragments are
        recorded for the arc, their sentences are joined with " or ".

        """
        if self._missing_arc_fragments is None:
            self._analyze_ast()
        fragments = self._missing_arc_fragments

        # The line we talk about stays the same even if the arc is flipped.
        actual_start = start

        never_entered_one_liner = (
            executed_arcs and
            end < 0 and end == -start and
            (end, start) not in executed_arcs and
            (end, start) in fragments
        )
        if never_entered_one_liner:
            # It's a one-line callable, and we never even started it,
            # and we have a message about not starting it.
            start, end = end, start

        sentences = []
        for smsg, emsg in fragments.get((start, end), [(None, None)]):
            if emsg is None:
                if end < 0:
                    # Perhaps this is a one-line callable: describe its entry
                    # arc instead if we know about it.
                    if (-end, end) in fragments:
                        return self.missing_arc_description(-end, end)
                    emsg = "didn't jump to the function exit"
                else:
                    emsg = "didn't jump to line {lineno}"
            emsg = emsg.format(lineno=end)

            sentence = "line {start} {emsg}".format(start=actual_start, emsg=emsg)
            if smsg is not None:
                sentence += ", because {smsg}".format(smsg=smsg.format(lineno=actual_start))

            sentences.append(sentence)

        return " or ".join(sentences)
class ByteParser(object):
    """Read line-number information out of compiled bytecode."""

    @contract(text='unicode')
    def __init__(self, text, code=None, filename=None):
        """Wrap `code`, compiling `text` first if no code object is given.

        A SyntaxError while compiling is re-raised as `NotPython`.
        `StopEverything` is raised if the code object lacks the attributes
        this class reads.

        """
        self.text = text
        if not code:
            try:
                code = compile_unicode(text, filename, "exec")
            except SyntaxError as synerr:
                raise NotPython(
                    u"Couldn't parse '%s' as Python source: '%s' at line %d" % (
                        filename, synerr.msg, synerr.lineno
                    )
                )
        self.code = code

        # Alternative Python implementations don't always provide all the
        # attributes on code objects that we need to do the analysis.
        needed_attrs = ('co_lnotab', 'co_firstlineno')
        if not all(hasattr(self.code, attr) for attr in needed_attrs):
            raise StopEverything(                   # pragma: only jython
                "This implementation of Python doesn't support code analysis.\n"
                "Run coverage.py under another Python for this command."
            )

    def child_parsers(self):
        """Return an iterable of ByteParsers, one per code object in `self.code`.

        The code objects come from `CodeObjects(self.code)`; per the original
        notes, the iteration includes `self` as its first value.

        """
        code_objects = CodeObjects(self.code)
        return (ByteParser(self.text, code=obj) for obj in code_objects)

    def _bytes_lines(self):
        """Yield (byte offset, line number) pairs decoded from `co_lnotab`.

        `co_lnotab` is read as alternating byte increments and line
        increments.  A pair is produced only when the line number differs
        from the last one produced.

        """
        # Adapted from dis.py in the standard library.
        offset_steps = bytes_to_ints(self.code.co_lnotab[0::2])
        line_steps = bytes_to_ints(self.code.co_lnotab[1::2])

        offset = 0
        lineno = self.code.co_firstlineno
        reported = None
        for offset_step, line_step in zip(offset_steps, line_steps):
            if offset_step:
                # The bytecode position is about to move: report the line
                # reached so far, unless it was already reported.
                if lineno != reported:
                    yield (offset, lineno)
                    reported = lineno
                offset += offset_step
            lineno += line_step
        if lineno != reported:
            yield (offset, lineno)

    def _find_statements(self):
        """Yield the line numbers that start statements in `self.code`.

        Covers every parser produced by `child_parsers`.  A line number can
        be produced more than once.

        """
        for parser in self.child_parsers():
            # Only the line half of each (offset, line) pair is wanted.
            for _, lineno in parser._bytes_lines():
                yield lineno
- #
- # AST analysis
- #
class LoopBlock(object):
    """A block-stack entry for a `for` or `while` loop."""

    @contract(start=int)
    def __init__(self, start):
        # ArcStarts of the `break` statements that leave this loop.
        self.break_exits = set()
        # Line number of the start of the loop.
        self.start = start
class FunctionBlock(object):
    """A block-stack entry for a function definition."""

    @contract(start=int, name=str)
    def __init__(self, start, name):
        # Name of the function.
        self.name = name
        # Line number where the function starts.
        self.start = start
class TryBlock(object):
    """A block-stack entry for a `try` statement."""

    @contract(handler_start='int|None', final_start='int|None')
    def __init__(self, handler_start, final_start):
        # Line number of the first "except" handler, or None without one.
        self.handler_start = handler_start
        # Line number of the "finally:" clause, or None without one.
        self.final_start = final_start

        # ArcStarts of the jumps out of the "try:" that have to pass through
        # the "finally:" clause, one independent set per kind of jump.
        for jump_kind in ("break_from", "continue_from", "return_from", "raise_from"):
            setattr(self, jump_kind, set())
class ArcStart(collections.namedtuple("Arc", ["lineno", "cause"])):
    """Where an arc starts, and why it might not have been taken.

    `lineno` is the line number the arc leaves from.

    `cause` is an optional English fragment, used as the `startmsg` in
    AstArcAnalyzer.missing_arc_fragments.  It explains why the arc was not
    executed, and should read well in a sentence like "Line 17 didn't run
    because {cause}."  It may contain "{lineno}", which is filled in with the
    line number.

    """
    def __new__(cls, lineno, cause=None):
        # Defined only to make `cause` optional.
        base = super(ArcStart, cls)
        return base.__new__(cls, lineno, cause)
# PyContract has no word for this, so define one: "ArcStarts" accepts a list
# or set in which every element is an ArcStart.
new_contract('ArcStarts', lambda seq: all(isinstance(x, ArcStart) for x in seq))

# AST dumps are switched on by a non-zero COVERAGE_AST_DUMP environment
# variable.
AST_DUMP = int(os.environ.get("COVERAGE_AST_DUMP", 0)) != 0
class NodeList(object):
    """A made-up node that just holds a sequence of real nodes.

    Used when collapsing optimized-away if-statements, to stand for the
    unconditional execution of one of the clauses.  Its line number is the
    line number of the first node it holds.

    """
    def __init__(self, body):
        first_node = body[0]
        self.lineno = first_node.lineno
        self.body = body
- class AstArcAnalyzer(object):
- """Analyze source text with an AST to find executable code paths."""
@contract(text='unicode', statements=set)
def __init__(self, text, statements, multiline):
    """Parse `text` and set up the state used while finding arcs.

    `statements` is a set of statement line numbers, and `multiline` maps
    line numbers to the first line of their statement; the statements are
    stored after being mapped through `multiline`.

    """
    self.multiline = multiline
    # TODO: I think this is happening in too many places.
    self.statements = set(multiline.get(lineno, lineno) for lineno in statements)
    self.root_node = ast.parse(neuter_encoding_declaration(text))

    if AST_DUMP:                                # pragma: debugging
        # Dump the AST so that failing tests have helpful output.
        print("Statements: {0}".format(self.statements))
        print("Multiline map: {0}".format(self.multiline))
        ast_dump(self.root_node)

    # The (from, to) line number pairs found so far.
    self.arcs = set()

    # A map from arc pairs to a list of pairs of sentence fragments:
    #   { (start, end): [(startmsg, endmsg), ...], }
    #
    # For an arc from line 17, they should be usable like:
    #    "Line 17 {endmsg}, because {startmsg}"
    self.missing_arc_fragments = collections.defaultdict(list)

    # The enclosing loop/function/try blocks, innermost last.
    self.block_stack = []

    # A non-zero COVERAGE_TRACK_ARCS makes `add_arc` print each arc it adds.
    track_arcs = os.environ.get("COVERAGE_TRACK_ARCS", 0)
    self.debug = bool(int(track_arcs))
- def analyze(self):
- """Examine the AST tree from `root_node` to determine possible arcs.
- This sets the `arcs` attribute to be a set of (from, to) line number
- pairs.
- """
- for node in ast.walk(self.root_node):
- node_name = node.__class__.__name__
- code_object_handler = getattr(self, "_code_object__" + node_name, None)
- if code_object_handler is not None:
- code_object_handler(node)
- def add_arc(self, start, end, smsg=None, emsg=None):
- """Add an arc, including message fragments to use if it is missing."""
- if self.debug: # pragma: debugging
- print("\nAdding arc: ({}, {}): {!r}, {!r}".format(start, end, smsg, emsg))
- print(short_stack(limit=6))
- self.arcs.add((start, end))
- if smsg is not None or emsg is not None:
- self.missing_arc_fragments[(start, end)].append((smsg, emsg))
- def nearest_blocks(self):
- """Yield the blocks in nearest-to-farthest order."""
- return reversed(self.block_stack)
@contract(returns=int)
def line_for_node(self, node):
    """Return the line number to use for `node`.

    Node types that need special treatment have a `_line__<NodeClass>`
    method, which decides the answer.  All other nodes use their own
    `lineno`.

    """
    specialized = getattr(self, "_line__" + node.__class__.__name__, None)
    if specialized is None:
        return node.lineno
    return specialized(node)
- def _line__Assign(self, node):
- return self.line_for_node(node.value)
def _line__Dict(self, node):
    """A non-empty dict literal on Python 3.5+ uses the line of its first item."""
    # Python 3.5 changed how dict literals are made.
    if not (env.PYVERSION >= (3, 5) and node.keys):
        return node.lineno

    first_key = node.keys[0]
    if first_key is None:
        # Unpacked dict literals `{**{'a':1}}` have None as the key,
        # use the value in that case.
        return node.values[0].lineno
    return first_key.lineno
- def _line__List(self, node):
- if node.elts:
- return self.line_for_node(node.elts[0])
- else:
- return node.lineno
- def _line__Module(self, node):
- if node.body:
- return self.line_for_node(node.body[0])
- else:
- # Empty modules have no line number, they always start at 1.
- return 1
# Simple statements: control just flows on to the next node.
OK_TO_DEFAULT = set([
    "Assign", "Assert", "AugAssign", "Delete", "Exec", "Expr", "Global",
    "Import", "ImportFrom", "Nonlocal", "Pass", "Print",
])

@contract(returns='ArcStarts')
def add_arcs(self, node):
    """Add the arcs for `node`, and return its exits.

    The return value is a set of ArcStarts: the places from which control
    leaves this node for whatever comes next.  A node stands for its whole
    sub-tree, so it can have several exits::

        if something(1):
            if other(2):
                doit(3)
            else:
                doit(5)

    Line 1 has two exits here: one starting at line 3, one at line 5.

    Node types with a `_handle__<NodeClass>` method are delegated to it.
    Anything else is treated as a simple statement with a single exit.

    """
    node_name = node.__class__.__name__
    handler = getattr(self, "_handle__" + node_name, None)
    if handler is not None:
        return handler(node)

    # No handler: either it's something that's ok to default (a simple
    # statement), or it's something we overlooked. Change this 0 to 1
    # to see if it's overlooked.
    if 0:
        if node_name not in self.OK_TO_DEFAULT:
            print("*** Unhandled: {0}".format(node))

    # Default for simple statements: one exit from this node.
    only_exit = ArcStart(self.line_for_node(node))
    return set([only_exit])
@one_of("from_start, prev_starts")
@contract(returns='ArcStarts')
def add_body_arcs(self, body, from_start=None, prev_starts=None):
    """Add the arcs for `body`, the statements of a compound statement.

    Flow into the body comes either from `from_start`, a single ArcStart,
    or from `prev_starts`, a set of ArcStarts.  Give only one of the two.

    Returns a set of ArcStarts: the exits from the body.  If nothing in the
    body remains, the incoming starts are returned unchanged.

    """
    if prev_starts is None:
        prev_starts = set([from_start])
    for stmt in body:
        stmt_line = self.line_for_node(stmt)
        if self.multiline.get(stmt_line, stmt_line) not in self.statements:
            # This statement is not among the known statements: look inside
            # it for a part that remains, or skip it altogether.
            stmt = self.find_non_missing_node(stmt)
            if stmt is None:
                continue
            stmt_line = self.line_for_node(stmt)
        # Everything that led here flows into this statement, and its exits
        # lead to the next one.
        for incoming in prev_starts:
            self.add_arc(incoming.lineno, stmt_line, incoming.cause)
        prev_starts = self.add_arcs(stmt)
    return prev_starts
- def find_non_missing_node(self, node):
- """Search `node` looking for a child that has not been optimized away.
- This might return the node you started with, or it will work recursively
- to find a child node in self.statements.
- Returns a node, or None if none of the node remains.
- """
- # This repeats work just done in add_body_arcs, but this duplication
- # means we can avoid a function call in the 99.9999% case of not
- # optimizing away statements.
- lineno = self.line_for_node(node)
- first_line = self.multiline.get(lineno, lineno)
- if first_line in self.statements:
- return node
- missing_fn = getattr(self, "_missing__" + node.__class__.__name__, None)
- if missing_fn:
- node = missing_fn(node)
- else:
- node = None
- return node
def _missing__If(self, node):
    """Return what remains of an `if` whose own line is missing."""
    # If the if-node is missing, then one of its children might still be
    # here, but not both.  Try the body first, then the else clause, each
    # wrapped in a NodeList so the clause is treated as a single node.
    from_body = self.find_non_missing_node(NodeList(node.body))
    if from_body:
        return from_body
    if not node.orelse:
        return None
    return self.find_non_missing_node(NodeList(node.orelse))
def _missing__NodeList(self, node):
    """Return what remains of a NodeList whose children may be missing."""
    # A NodeList might be a mixture of missing and present nodes.  Keep
    # what is left of each child, in order.
    remains = (self.find_non_missing_node(child) for child in node.body)
    present = [child for child in remains if child is not None]

    # Return the simplest representation of the present children.
    if not present:
        return None
    if len(present) == 1:
        return present[0]
    return NodeList(present)
def is_constant_expr(self, node):
    """Is this a compile-time constant?

    Returns "Num" for NameConstant and Num nodes, "Name" for a Name node
    whose id is True, False, None or __debug__, and None for anything else.

    """
    node_name = node.__class__.__name__
    if node_name in ("NameConstant", "Num"):
        return "Num"
    if node_name == "Name" and node.id in ("True", "False", "None", "__debug__"):
        return "Name"
    return None
- # In the fullness of time, these might be good tests to write:
- # while EXPR:
- # while False:
- # listcomps hidden deep in other expressions
- # listcomps hidden in lists: x = [[i for i in range(10)]]
- # nested function definitions
- # Exit processing: process_*_exits
- #
- # These functions process the four kinds of jump exits: break, continue,
- # raise, and return. To figure out where an exit goes, we have to look at
- # the block stack context. For example, a break will jump to the nearest
- # enclosing loop block, or the nearest enclosing finally block, whichever
- # is nearer.
@contract(exits='ArcStarts')
def process_break_exits(self, exits):
    """Add arcs due to jumps from `exits` being breaks.

    The nearest enclosing loop, or try block with a `finally` clause,
    whichever is nearer, collects the exits.

    """
    for block in self.nearest_blocks():
        if isinstance(block, LoopBlock):
            collector = block.break_exits
        elif isinstance(block, TryBlock) and block.final_start is not None:
            collector = block.break_from
        else:
            continue
        collector.update(exits)
        return
@contract(exits='ArcStarts')
def process_continue_exits(self, exits):
    """Add arcs due to jumps from `exits` being continues.

    If the nearest relevant block is a loop, each exit gets an arc back to
    the start of the loop.  If it is a try block with a `finally` clause,
    the exits are kept on that block instead.

    """
    for block in self.nearest_blocks():
        if isinstance(block, LoopBlock):
            loop_start = block.start
            for xit in exits:
                self.add_arc(xit.lineno, loop_start, xit.cause)
            return
        if isinstance(block, TryBlock) and block.final_start is not None:
            block.continue_from.update(exits)
            return
@contract(exits='ArcStarts')
def process_raise_exits(self, exits):
    """Add arcs due to jumps from `exits` being raises.

    The nearest block that can receive the exception decides where the
    exits go: a try block's first handler, a try block's `finally` clause,
    or the exit of the enclosing function.

    """
    for block in self.nearest_blocks():
        if isinstance(block, TryBlock):
            if block.handler_start is not None:
                for xit in exits:
                    self.add_arc(xit.lineno, block.handler_start, xit.cause)
                return
            if block.final_start is not None:
                block.raise_from.update(exits)
                return
            # A try block with neither of them: keep looking outward.
        elif isinstance(block, FunctionBlock):
            emsg = "didn't except from function '{0}'".format(block.name)
            for xit in exits:
                self.add_arc(xit.lineno, -block.start, xit.cause, emsg)
            return
@contract(exits='ArcStarts')
def process_return_exits(self, exits):
    """Add arcs due to jumps from `exits` being returns.

    A try block with a `finally` clause keeps the exits for later.  An
    enclosing function gets an arc from each exit to its exit, recorded as
    the negative of the function's starting line.

    """
    for block in self.nearest_blocks():
        if isinstance(block, TryBlock) and block.final_start is not None:
            block.return_from.update(exits)
            return
        if isinstance(block, FunctionBlock):
            emsg = "didn't return from function '{0}'".format(block.name)
            for xit in exits:
                self.add_arc(xit.lineno, -block.start, xit.cause, emsg)
            return
- # Handlers: _handle__*
- #
- # Each handler deals with a specific AST node type, dispatched from
- # add_arcs. Each deals with a particular kind of node type, and returns
- # the set of exits from that node. These functions mirror the Python
- # semantics of each syntactic construct. See the docstring for add_arcs to
- # understand the concept of exits from a node.
@contract(returns='ArcStarts')
def _handle__Break(self, node):
    """Route a `break` through the block stack; it has no exits of its own."""
    cause = "the break on line {lineno} wasn't executed"
    self.process_break_exits([ArcStart(self.line_for_node(node), cause=cause)])
    return set()
@contract(returns='ArcStarts')
def _handle_decorated(self, node):
    """Add arcs for things that can be decorated (classes and functions).

    Each decorator on a new line gets an arc from the previous line, and so
    does each known statement line between the last decorator and the start
    of the body.  The single exit is the last line reached this way.

    """
    current = self.line_for_node(node)
    if node.decorator_list:
        for decorator in node.decorator_list:
            decorator_line = self.line_for_node(decorator)
            if decorator_line != current:
                self.add_arc(current, decorator_line)
                current = decorator_line
        # The definition line may have been missed, but we should have it
        # in `self.statements`.  For some constructs, `line_for_node` is
        # not what we'd think of as the first line in the statement, so map
        # it to the first one.
        first_body_line = self.line_for_node(node.body[0])
        first_body_line = self.multiline.get(first_body_line, first_body_line)
        for candidate in range(current+1, first_body_line):
            if candidate in self.statements:
                self.add_arc(current, candidate)
                current = candidate
    # The body is handled in collect_arcs.
    return set([ArcStart(current)])

_handle__ClassDef = _handle_decorated
@contract(returns='ArcStarts')
def _handle__Continue(self, node):
    """Route a `continue` through the block stack; it has no exits of its own."""
    cause = "the continue on line {lineno} wasn't executed"
    self.process_continue_exits([ArcStart(self.line_for_node(node), cause=cause)])
    return set()
@contract(returns='ArcStarts')
def _handle__For(self, node):
    """Add the arcs for a `for` loop, and return its exits.

    The exits are the loop's breaks, plus either the exits of the `else`
    clause or, without one, the loop line itself.

    """
    loop_line = self.line_for_node(node.iter)
    self.block_stack.append(LoopBlock(start=loop_line))
    never_started = ArcStart(loop_line, cause="the loop on line {lineno} never started")
    body_exits = self.add_body_arcs(node.body, from_start=never_started)
    # Any exit from the body will go back to the top of the loop.
    for xit in body_exits:
        self.add_arc(xit.lineno, loop_line, xit.cause)
    loop_block = self.block_stack.pop()

    # Breaks collected while the loop was on the stack leave the loop.
    exits = loop_block.break_exits
    didnt_complete = ArcStart(loop_line, cause="the loop on line {lineno} didn't complete")
    if node.orelse:
        exits |= self.add_body_arcs(node.orelse, from_start=didnt_complete)
    else:
        # No else clause: exit from the for line.
        exits.add(didnt_complete)
    return exits

_handle__AsyncFor = _handle__For

_handle__FunctionDef = _handle_decorated
_handle__AsyncFunctionDef = _handle_decorated
@contract(returns='ArcStarts')
def _handle__If(self, node):
    """Add the arcs for an `if`, and return the exits of both branches."""
    test_line = self.line_for_node(node.test)
    never_true = ArcStart(test_line, cause="the condition on line {lineno} was never true")
    exits = self.add_body_arcs(node.body, from_start=never_true)
    never_false = ArcStart(test_line, cause="the condition on line {lineno} was never false")
    exits |= self.add_body_arcs(node.orelse, from_start=never_false)
    return exits
@contract(returns='ArcStarts')
def _handle__NodeList(self, node):
    """Add the arcs for the nodes held by a NodeList, and return their exits."""
    entry = ArcStart(self.line_for_node(node))
    return self.add_body_arcs(node.body, from_start=entry)
@contract(returns='ArcStarts')
def _handle__Raise(self, node):
    """Route a `raise` through the block stack."""
    cause = "the raise on line {lineno} wasn't executed"
    self.process_raise_exits([ArcStart(self.line_for_node(node), cause=cause)])
    # `raise` statement jumps away, no exits from here.
    return set()
@contract(returns='ArcStarts')
def _handle__Return(self, node):
    """Route a `return` through the block stack."""
    cause = "the return on line {lineno} wasn't executed"
    self.process_return_exits([ArcStart(self.line_for_node(node), cause=cause)])
    # `return` statement jumps away, no exits from here.
    return set()
@contract(returns='ArcStarts')
def _handle__Try(self, node):
    """Add the arcs for a `try` statement and return its exits.

    A `TryBlock` sits on the block stack while the `try` body is analyzed.
    With a `finally` clause the block stays on the stack (no longer
    handling exceptions) until the handlers and `else` clause are done, so
    the `break`/`continue`/`raise`/`return` starts it has collected can be
    routed through the `finally` clause. Without one, the block is popped
    as soon as the body is done.
    """
    handler_start = self.line_for_node(node.handlers[0]) if node.handlers else None
    final_start = self.line_for_node(node.finalbody[0]) if node.finalbody else None

    try_block = TryBlock(handler_start, final_start)
    self.block_stack.append(try_block)

    body_entry = ArcStart(self.line_for_node(node))
    exits = self.add_body_arcs(node.body, from_start=body_entry)

    # We're done with the `try` body, so this block no longer handles
    # exceptions. We keep the block so the `finally` clause can pick up
    # flows from the handlers and `else` clause.
    if node.finalbody:
        try_block.handler_start = None
        if node.handlers:
            # If there are `except` clauses, then raises in the try body
            # will already jump to them.  Start this set over for raises in
            # `except` and `else`.
            try_block.raise_from = set()
    else:
        self.block_stack.pop()

    handler_exits = set()
    previous_handler_line = None
    for handler in node.handlers or ():
        handler_line = self.line_for_node(handler)
        # Each `except` clause is reachable from the one before it.
        if previous_handler_line is not None:
            self.add_arc(previous_handler_line, handler_line)
        previous_handler_line = handler_line
        not_caught = ArcStart(
            handler_line,
            cause="the exception caught by line {lineno} didn't happen",
        )
        handler_exits |= self.add_body_arcs(handler.body, from_start=not_caught)

    if node.orelse:
        exits = self.add_body_arcs(node.orelse, prev_starts=exits)

    exits |= handler_exits

    if node.finalbody:
        self.block_stack.pop()
        # The jumps collected by the block, each paired with the routine
        # that must be told where that jump continues after `finally`.
        jumps = (
            (try_block.break_from, self.process_break_exits),
            (try_block.continue_from, self.process_continue_exits),
            (try_block.raise_from, self.process_raise_exits),
            (try_block.return_from, self.process_return_exits),
        )

        # You can get to the `finally` clause from the exits of the body or
        # `else` clause, or from a `break`, `continue`, `raise` or `return`.
        final_from = set(exits)
        for jump_starts, _ in jumps:
            final_from |= jump_starts

        final_exits = self.add_body_arcs(node.finalbody, prev_starts=final_from)

        for jump_starts, process_exits in jumps:
            if jump_starts:
                process_exits(self._combine_finally_starts(jump_starts, final_exits))

        if exits:
            # The finally clause's exits are only exits for the try block
            # as a whole if the try block had some exits to begin with.
            exits = final_exits

    return exits
@contract(starts='ArcStarts', exits='ArcStarts', returns='ArcStarts')
def _combine_finally_starts(self, starts, exits):
    """Helper for building the cause of `finally` branches.

    "finally" clauses might not execute their exits, and the causes could
    be due to a failure to execute any of the exits in the try block. So
    the formatted causes of `starts` (in sorted order, joined with " or ")
    replace the cause of every entry in `exits`.

    Returns a new set of `ArcStart`s with the line numbers of `exits`.
    """
    described = [
        start.cause.format(lineno=start.lineno)
        for start in sorted(starts)
        if start.cause is not None
    ]
    combined_cause = " or ".join(described)
    return {ArcStart(xit.lineno, combined_cause) for xit in exits}
@contract(returns='ArcStarts')
def _handle__TryExcept(self, node):
    """Analyze a Python 2.7 `TryExcept` node as a general `Try` node."""
    # Python 2.7 uses separate TryExcept and TryFinally nodes. A TryExcept
    # means there was no `finally`, so give the node an empty `finalbody`
    # and let the general handler do the work.
    node.finalbody = []
    return self._handle__Try(node)
@contract(returns='ArcStarts')
def _handle__TryFinally(self, node):
    """Analyze a Python 2.7 `TryFinally` node as a general `Try` node.

    Python 2.7 uses separate TryExcept and TryFinally nodes. If a TryExcept
    starting on the same line is nested directly inside, its body, handlers
    and `else` clause are merged into `node`. Otherwise the missing fields
    are faked as empty lists.
    """
    node.handlers, node.orelse = [], []
    inner = node.body[0]
    wraps_try_except = (
        inner.__class__.__name__ == "TryExcept" and node.lineno == inner.lineno
    )
    if wraps_try_except:
        assert len(node.body) == 1
        node.body, node.handlers, node.orelse = inner.body, inner.handlers, inner.orelse
    return self._handle__Try(node)
@contract(returns='ArcStarts')
def _handle__While(self, node):
    """Add the arcs for a `while` loop and return the loop's exits.

    Body exits jump back to the top of the loop. That is normally the line
    of the test, but for some constant tests (see the condition below) it
    is the first line of the body instead. A constant test also means the
    loop cannot be left from its test line.
    """
    constant_test = self.is_constant_expr(node.test)
    test_line = self.line_for_node(node.test)
    loop_top = test_line
    if constant_test and (env.PY3 or constant_test == "Num"):
        loop_top = self.line_for_node(node.body[0])

    self.block_stack.append(LoopBlock(start=loop_top))
    never_true = ArcStart(test_line, cause="the condition on line {lineno} was never true")
    for body_exit in self.add_body_arcs(node.body, from_start=never_true):
        self.add_arc(body_exit.lineno, loop_top, body_exit.cause)

    # Start from a copy of the exits the loop block collected for `break`.
    exits = set(self.block_stack.pop().break_exits)
    never_false = ArcStart(test_line, cause="the condition on line {lineno} was never false")
    if node.orelse:
        exits |= self.add_body_arcs(node.orelse, from_start=never_false)
    elif not constant_test:
        # No `else` clause: you can exit from the start.
        exits.add(never_false)
    return exits
@contract(returns='ArcStarts')
def _handle__With(self, node):
    """Add the arcs for a `with` statement; its exits are those of its body."""
    entry = ArcStart(self.line_for_node(node))
    return self.add_body_arcs(node.body, from_start=entry)

# `async with` statements are analyzed exactly like plain `with` statements.
_handle__AsyncWith = _handle__With
def _code_object__Module(self, node):
    """Add the entry and exit arcs for a module's code object.

    The negated start line stands for both the entry into and the exit from
    the module. An empty module just gets an arc in and an arc out.
    """
    start = self.line_for_node(node)
    if not node.body:
        # Empty module.
        self.add_arc(-start, start)
        self.add_arc(start, -start)
        return

    module_exits = self.add_body_arcs(node.body, from_start=ArcStart(-start))
    for module_exit in module_exits:
        self.add_arc(module_exit.lineno, -start, module_exit.cause, "didn't exit the module")
def _code_object__FunctionDef(self, node):
    """Add the arcs for a function's code object.

    A `FunctionBlock` is on the block stack while the body is analyzed, and
    the exits of the body are processed as `return` exits before the block
    is popped again.
    """
    def_line = self.line_for_node(node)
    self.block_stack.append(FunctionBlock(start=def_line, name=node.name))
    # The negated start line stands for the entry into the function.
    body_exits = self.add_body_arcs(node.body, from_start=ArcStart(-def_line))
    self.process_return_exits(body_exits)
    self.block_stack.pop()

# `async def` functions are analyzed exactly like plain functions.
_code_object__AsyncFunctionDef = _code_object__FunctionDef
def _code_object__ClassDef(self, node):
    """Add the arcs for a class body's code object.

    The negated start line stands for both the entry into and the exit from
    the class body.
    """
    class_line = self.line_for_node(node)
    self.add_arc(-class_line, class_line)
    for body_exit in self.add_body_arcs(node.body, from_start=ArcStart(class_line)):
        missing_exit = "didn't exit the body of class '{0}'".format(node.name)
        self.add_arc(body_exit.lineno, -class_line, body_exit.cause, missing_exit)
def _make_oneline_code_method(noun):     # pylint: disable=no-self-argument
    """Make a `_code_object__*` method for a one-line callable.

    `noun` names the kind of callable and is used in the messages attached
    to the arcs. The returned method adds an arc into the callable's line
    and an arc back out of it.
    """
    def _code_object__oneline_callable(self, node):
        line = self.line_for_node(node)
        not_run = "didn't run the {0} on line {1}".format(noun, line)
        not_finished = "didn't finish the {0} on line {1}".format(noun, line)
        self.add_arc(-line, line, None, not_run)
        self.add_arc(line, -line, None, not_finished)
    return _code_object__oneline_callable
# Handlers for the one-line callables. They differ only in the noun used in
# the messages attached to their arcs.
_code_object__Lambda = _make_oneline_code_method("lambda")
_code_object__GeneratorExp = _make_oneline_code_method("generator expression")
_code_object__DictComp = _make_oneline_code_method("dictionary comprehension")
_code_object__SetComp = _make_oneline_code_method("set comprehension")
if env.PY3:
    # NOTE(review): presumably list comprehensions only have their own code
    # object on Python 3 -- confirm.
    _code_object__ListComp = _make_oneline_code_method("list comprehension")
if AST_DUMP:           # pragma: debugging
    # Code only used when dumping the AST for debugging.

    # Field names that are left out of the dump.
    SKIP_DUMP_FIELDS = ["ctx"]

    def _is_simple_value(value):
        """Is `value` simple enough to be displayed on a single line?

        Strings, ints, floats, None and empty containers are simple.
        """
        if isinstance(value, (string_class, int, float)):
            return True
        return value in [None, [], (), {}, set()]

    def ast_dump(node, depth=0):
        """Dump the AST for `node`.

        This recursively walks the AST, printing a readable version.
        `depth` is the number of spaces to indent this node by.

        """
        indent = " " * depth
        type_name = node.__class__.__name__
        if not isinstance(node, ast.AST):
            # Not an AST node: show its type and repr on one line.
            print("{0}<{1} {2!r}>".format(indent, type_name, node))
            return

        lineno = getattr(node, "lineno", None)
        linemark = "" if lineno is None else " @ {0}".format(lineno)
        head = "{0}<{1}{2}".format(indent, type_name, linemark)

        named_fields = [
            (name, value)
            for name, value in ast.iter_fields(node)
            if name not in SKIP_DUMP_FIELDS
        ]
        if not named_fields:
            print("{0}>".format(head))
            return
        if len(named_fields) == 1 and _is_simple_value(named_fields[0][1]):
            # A single simple field fits on the same line as the head.
            field_name, value = named_fields[0]
            print("{0} {1}: {2!r}>".format(head, field_name, value))
            return

        print(head)
        if 0:
            # Disabled debugging aid: flip the condition to show each
            # node class's base classes.
            print("{0}# mro: {1}".format(
                indent, ", ".join(c.__name__ for c in node.__class__.__mro__[1:]),
            ))
        next_indent = indent + " "
        for field_name, value in named_fields:
            prefix = "{0}{1}:".format(next_indent, field_name)
            if _is_simple_value(value):
                print("{0} {1!r}".format(prefix, value))
            elif isinstance(value, list):
                print("{0} [".format(prefix))
                for item in value:
                    ast_dump(item, depth + 8)
                print("{0}]".format(next_indent))
            else:
                print(prefix)
                ast_dump(value, depth + 8)

        print("{0}>".format(indent))
|