utils.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. """utilities for testing IPython kernels"""
  2. # Copyright (c) IPython Development Team.
  3. # Distributed under the terms of the Modified BSD License.
  4. from __future__ import print_function
  5. import atexit
  6. import os
  7. import sys
  8. from contextlib import contextmanager
  9. from subprocess import PIPE, STDOUT
  10. try:
  11. from queue import Empty # Py 3
  12. except ImportError:
  13. from Queue import Empty # Py 2
  14. import nose
  15. from jupyter_client import manager
  16. STARTUP_TIMEOUT = 60
  17. TIMEOUT = 15
  18. KM = None
  19. KC = None
  20. def start_new_kernel(**kwargs):
  21. """start a new kernel, and return its Manager and Client
  22. Integrates with our output capturing for tests.
  23. """
  24. try:
  25. stdout = nose.iptest_stdstreams_fileno()
  26. except AttributeError:
  27. stdout = open(os.devnull)
  28. kwargs.update(dict(stdout=stdout, stderr=STDOUT))
  29. return manager.start_new_kernel(startup_timeout=STARTUP_TIMEOUT, **kwargs)
  30. def flush_channels(kc=None):
  31. """flush any messages waiting on the queue"""
  32. from .test_message_spec import validate_message
  33. if kc is None:
  34. kc = KC
  35. for channel in (kc.shell_channel, kc.iopub_channel):
  36. while True:
  37. try:
  38. msg = channel.get_msg(block=True, timeout=0.1)
  39. except Empty:
  40. break
  41. else:
  42. validate_message(msg)
  43. def execute(code='', kc=None, **kwargs):
  44. """wrapper for doing common steps for validating an execution request"""
  45. from .test_message_spec import validate_message
  46. if kc is None:
  47. kc = KC
  48. msg_id = kc.execute(code=code, **kwargs)
  49. reply = kc.get_shell_msg(timeout=TIMEOUT)
  50. validate_message(reply, 'execute_reply', msg_id)
  51. busy = kc.get_iopub_msg(timeout=TIMEOUT)
  52. validate_message(busy, 'status', msg_id)
  53. assert busy['content']['execution_state'] == 'busy'
  54. if not kwargs.get('silent'):
  55. execute_input = kc.get_iopub_msg(timeout=TIMEOUT)
  56. validate_message(execute_input, 'execute_input', msg_id)
  57. assert execute_input['content']['code'] == code
  58. # show tracebacks if present for debugging
  59. if reply['content'].get('traceback'):
  60. print('\n'.join(reply['content']['traceback']), file=sys.stderr)
  61. return msg_id, reply['content']
  62. def start_global_kernel():
  63. """start the global kernel (if it isn't running) and return its client"""
  64. global KM, KC
  65. if KM is None:
  66. KM, KC = start_new_kernel()
  67. atexit.register(stop_global_kernel)
  68. else:
  69. flush_channels(KC)
  70. return KC
  71. @contextmanager
  72. def kernel():
  73. """Context manager for the global kernel instance
  74. Should be used for most kernel tests
  75. Returns
  76. -------
  77. kernel_client: connected KernelClient instance
  78. """
  79. yield start_global_kernel()
  80. def uses_kernel(test_f):
  81. """Decorator for tests that use the global kernel"""
  82. def wrapped_test():
  83. with kernel() as kc:
  84. test_f(kc)
  85. wrapped_test.__doc__ = test_f.__doc__
  86. wrapped_test.__name__ = test_f.__name__
  87. return wrapped_test
  88. def stop_global_kernel():
  89. """Stop the global shared kernel instance, if it exists"""
  90. global KM, KC
  91. KC.stop_channels()
  92. KC = None
  93. if KM is None:
  94. return
  95. KM.shutdown_kernel(now=True)
  96. KM = None
  97. def new_kernel(argv=None):
  98. """Context manager for a new kernel in a subprocess
  99. Should only be used for tests where the kernel must not be re-used.
  100. Returns
  101. -------
  102. kernel_client: connected KernelClient instance
  103. """
  104. stdout = getattr(nose, 'iptest_stdstreams_fileno', open(os.devnull))
  105. kwargs = dict(stdout=stdout, stderr=STDOUT)
  106. if argv is not None:
  107. kwargs['extra_arguments'] = argv
  108. return manager.run_kernel(**kwargs)
  109. def assemble_output(iopub):
  110. """assemble stdout/err from an execution"""
  111. stdout = ''
  112. stderr = ''
  113. while True:
  114. msg = iopub.get_msg(block=True, timeout=1)
  115. msg_type = msg['msg_type']
  116. content = msg['content']
  117. if msg_type == 'status' and content['execution_state'] == 'idle':
  118. # idle message signals end of output
  119. break
  120. elif msg['msg_type'] == 'stream':
  121. if content['name'] == 'stdout':
  122. stdout += content['text']
  123. elif content['name'] == 'stderr':
  124. stderr += content['text']
  125. else:
  126. raise KeyError("bad stream: %r" % content['name'])
  127. else:
  128. # other output, ignored
  129. pass
  130. return stdout, stderr
  131. def wait_for_idle(kc):
  132. while True:
  133. msg = kc.iopub_channel.get_msg(block=True, timeout=1)
  134. msg_type = msg['msg_type']
  135. content = msg['content']
  136. if msg_type == 'status' and content['execution_state'] == 'idle':
  137. break