test_connect.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. """Tests for kernel connection utilities"""
  2. # Copyright (c) Jupyter Development Team.
  3. # Distributed under the terms of the Modified BSD License.
  4. import json
  5. import os
  6. import re
  7. import stat
  8. import tempfile
  9. import shutil
  10. from traitlets.config import Config
  11. from jupyter_core.application import JupyterApp
  12. from jupyter_core.paths import jupyter_runtime_dir
  13. from ipython_genutils.tempdir import TemporaryDirectory, TemporaryWorkingDirectory
  14. from ipython_genutils.py3compat import str_to_bytes
  15. from jupyter_client import connect, KernelClient
  16. from jupyter_client.consoleapp import JupyterConsoleApp
  17. from jupyter_client.session import Session
  18. from jupyter_client.connect import secure_write
  19. class DummyConsoleApp(JupyterApp, JupyterConsoleApp):
  20. def initialize(self, argv=[]):
  21. JupyterApp.initialize(self, argv=argv)
  22. self.init_connection_file()
  23. class DummyConfigurable(connect.ConnectionFileMixin):
  24. def initialize(self):
  25. pass
  26. sample_info = dict(ip='1.2.3.4', transport='ipc',
  27. shell_port=1, hb_port=2, iopub_port=3, stdin_port=4, control_port=5,
  28. key=b'abc123', signature_scheme='hmac-md5', kernel_name='python'
  29. )
  30. sample_info_kn = dict(ip='1.2.3.4', transport='ipc',
  31. shell_port=1, hb_port=2, iopub_port=3, stdin_port=4, control_port=5,
  32. key=b'abc123', signature_scheme='hmac-md5', kernel_name='test'
  33. )
  34. def test_write_connection_file():
  35. with TemporaryDirectory() as d:
  36. cf = os.path.join(d, 'kernel.json')
  37. connect.write_connection_file(cf, **sample_info)
  38. assert os.path.exists(cf)
  39. with open(cf, 'r') as f:
  40. info = json.load(f)
  41. info['key'] = str_to_bytes(info['key'])
  42. assert info == sample_info
  43. def test_load_connection_file_session():
  44. """test load_connection_file() after """
  45. session = Session()
  46. app = DummyConsoleApp(session=Session())
  47. app.initialize(argv=[])
  48. session = app.session
  49. with TemporaryDirectory() as d:
  50. cf = os.path.join(d, 'kernel.json')
  51. connect.write_connection_file(cf, **sample_info)
  52. app.connection_file = cf
  53. app.load_connection_file()
  54. assert session.key == sample_info['key']
  55. assert session.signature_scheme == sample_info['signature_scheme']
  56. def test_load_connection_file_session_with_kn():
  57. """test load_connection_file() after """
  58. session = Session()
  59. app = DummyConsoleApp(session=Session())
  60. app.initialize(argv=[])
  61. session = app.session
  62. with TemporaryDirectory() as d:
  63. cf = os.path.join(d, 'kernel.json')
  64. connect.write_connection_file(cf, **sample_info_kn)
  65. app.connection_file = cf
  66. app.load_connection_file()
  67. assert session.key == sample_info_kn['key']
  68. assert session.signature_scheme == sample_info_kn['signature_scheme']
  69. def test_app_load_connection_file():
  70. """test `ipython console --existing` loads a connection file"""
  71. with TemporaryDirectory() as d:
  72. cf = os.path.join(d, 'kernel.json')
  73. connect.write_connection_file(cf, **sample_info)
  74. app = DummyConsoleApp(connection_file=cf)
  75. app.initialize(argv=[])
  76. for attr, expected in sample_info.items():
  77. if attr in ('key', 'signature_scheme'):
  78. continue
  79. value = getattr(app, attr)
  80. assert value == expected, "app.%s = %s != %s" % (attr, value, expected)
  81. def test_load_connection_info():
  82. client = KernelClient()
  83. info = {
  84. 'control_port': 53702,
  85. 'hb_port': 53705,
  86. 'iopub_port': 53703,
  87. 'ip': '0.0.0.0',
  88. 'key': 'secret',
  89. 'shell_port': 53700,
  90. 'signature_scheme': 'hmac-sha256',
  91. 'stdin_port': 53701,
  92. 'transport': 'tcp',
  93. }
  94. client.load_connection_info(info)
  95. assert client.control_port == info['control_port']
  96. assert client.session.key.decode('ascii') == info['key']
  97. assert client.ip == info['ip']
  98. def test_find_connection_file():
  99. with TemporaryDirectory() as d:
  100. cf = 'kernel.json'
  101. app = DummyConsoleApp(runtime_dir=d, connection_file=cf)
  102. app.initialize()
  103. security_dir = app.runtime_dir
  104. profile_cf = os.path.join(security_dir, cf)
  105. with open(profile_cf, 'w') as f:
  106. f.write("{}")
  107. for query in (
  108. 'kernel.json',
  109. 'kern*',
  110. '*ernel*',
  111. 'k*',
  112. ):
  113. assert connect.find_connection_file(query, path=security_dir) == profile_cf
  114. def test_find_connection_file_local():
  115. with TemporaryWorkingDirectory() as d:
  116. cf = 'test.json'
  117. abs_cf = os.path.abspath(cf)
  118. with open(cf, 'w') as f:
  119. f.write('{}')
  120. for query in (
  121. 'test.json',
  122. 'test',
  123. abs_cf,
  124. os.path.join('.', 'test.json'),
  125. ):
  126. assert connect.find_connection_file(query, path=['.', jupyter_runtime_dir()]) == abs_cf
  127. def test_find_connection_file_relative():
  128. with TemporaryWorkingDirectory() as d:
  129. cf = 'test.json'
  130. os.mkdir('subdir')
  131. cf = os.path.join('subdir', 'test.json')
  132. abs_cf = os.path.abspath(cf)
  133. with open(cf, 'w') as f:
  134. f.write('{}')
  135. for query in (
  136. os.path.join('.', 'subdir', 'test.json'),
  137. os.path.join('subdir', 'test.json'),
  138. abs_cf,
  139. ):
  140. assert connect.find_connection_file(query, path=['.', jupyter_runtime_dir()]) == abs_cf
  141. def test_find_connection_file_abspath():
  142. with TemporaryDirectory() as d:
  143. cf = 'absolute.json'
  144. abs_cf = os.path.abspath(cf)
  145. with open(cf, 'w') as f:
  146. f.write('{}')
  147. assert connect.find_connection_file(abs_cf, path=jupyter_runtime_dir()) == abs_cf
  148. os.remove(abs_cf)
  149. def test_mixin_record_random_ports():
  150. with TemporaryDirectory() as d:
  151. dc = DummyConfigurable(data_dir=d, kernel_name='via-tcp', transport='tcp')
  152. dc.write_connection_file()
  153. assert dc._connection_file_written
  154. assert os.path.exists(dc.connection_file)
  155. assert dc._random_port_names == connect.port_names
  156. def test_mixin_cleanup_random_ports():
  157. with TemporaryDirectory() as d:
  158. dc = DummyConfigurable(data_dir=d, kernel_name='via-tcp', transport='tcp')
  159. dc.write_connection_file()
  160. filename = dc.connection_file
  161. dc.cleanup_random_ports()
  162. assert not os.path.exists(filename)
  163. for name in dc._random_port_names:
  164. assert getattr(dc, name) == 0
  165. def test_secure_write():
  166. def fetch_win32_permissions(filename):
  167. '''Extracts file permissions on windows using icacls'''
  168. role_permissions = {}
  169. for index, line in enumerate(os.popen("icacls %s" % filename).read().splitlines()):
  170. if index == 0:
  171. line = line.split(filename)[-1].strip().lower()
  172. match = re.match(r'\s*([^:]+):\(([^\)]*)\)', line)
  173. if match:
  174. usergroup, permissions = match.groups()
  175. usergroup = usergroup.lower().split('\\')[-1]
  176. permissions = set(p.lower() for p in permissions.split(','))
  177. role_permissions[usergroup] = permissions
  178. elif not line.strip():
  179. break
  180. return role_permissions
  181. def check_user_only_permissions(fname):
  182. # Windows has it's own permissions ACL patterns
  183. if os.name == 'nt':
  184. import win32api
  185. username = win32api.GetUserName().lower()
  186. permissions = fetch_win32_permissions(fname)
  187. assert username in permissions
  188. assert permissions[username] == set(['r', 'w'])
  189. assert 'administrators' in permissions
  190. assert permissions['administrators'] == set(['f'])
  191. assert 'everyone' not in permissions
  192. assert len(permissions) == 2
  193. else:
  194. mode = os.stat(fname).st_mode
  195. assert '0600' == oct(stat.S_IMODE(mode)).replace('0o', '0')
  196. directory = tempfile.mkdtemp()
  197. fname = os.path.join(directory, 'check_perms')
  198. try:
  199. with secure_write(fname) as f:
  200. f.write('test 1')
  201. check_user_only_permissions(fname)
  202. with open(fname, 'r') as f:
  203. assert f.read() == 'test 1'
  204. # Try changing file permissions ahead of time
  205. os.chmod(fname, 0o755)
  206. with secure_write(fname) as f:
  207. f.write('test 2')
  208. check_user_only_permissions(fname)
  209. with open(fname, 'r') as f:
  210. assert f.read() == 'test 2'
  211. finally:
  212. shutil.rmtree(directory)