test_misc.py 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
  4. # Use of this source code is governed by a BSD-style license that can be
  5. # found in the LICENSE file.
  6. """
  7. Miscellaneous tests.
  8. """
  9. import ast
  10. import collections
  11. import errno
  12. import json
  13. import os
  14. import pickle
  15. import socket
  16. import stat
  17. from psutil import LINUX
  18. from psutil import POSIX
  19. from psutil import WINDOWS
  20. from psutil._common import memoize
  21. from psutil._common import memoize_when_activated
  22. from psutil._common import supports_ipv6
  23. from psutil._common import wrap_numbers
  24. from psutil._compat import PY3
  25. from psutil.tests import APPVEYOR
  26. from psutil.tests import CI_TESTING
  27. from psutil.tests import DEVNULL
  28. from psutil.tests import HAS_BATTERY
  29. from psutil.tests import HAS_MEMORY_MAPS
  30. from psutil.tests import HAS_NET_IO_COUNTERS
  31. from psutil.tests import HAS_SENSORS_BATTERY
  32. from psutil.tests import HAS_SENSORS_FANS
  33. from psutil.tests import HAS_SENSORS_TEMPERATURES
  34. from psutil.tests import import_module_by_path
  35. from psutil.tests import mock
  36. from psutil.tests import PsutilTestCase
  37. from psutil.tests import PYTHON_EXE
  38. from psutil.tests import reload_module
  39. from psutil.tests import ROOT_DIR
  40. from psutil.tests import SCRIPTS_DIR
  41. from psutil.tests import sh
  42. from psutil.tests import TRAVIS
  43. from psutil.tests import unittest
  44. import psutil
  45. import psutil.tests
# ===================================================================
# --- Misc / generic tests.
# ===================================================================
  49. class TestMisc(PsutilTestCase):
  50. def test_process__repr__(self, func=repr):
  51. p = psutil.Process(self.spawn_testproc().pid)
  52. r = func(p)
  53. self.assertIn("psutil.Process", r)
  54. self.assertIn("pid=%s" % p.pid, r)
  55. self.assertIn("name='%s'" % p.name(), r)
  56. self.assertIn("status=", r)
  57. self.assertNotIn("exitcode=", r)
  58. p.terminate()
  59. p.wait()
  60. r = func(p)
  61. self.assertIn("status='terminated'", r)
  62. self.assertIn("exitcode=", r)
  63. with mock.patch.object(psutil.Process, "name",
  64. side_effect=psutil.ZombieProcess(os.getpid())):
  65. p = psutil.Process()
  66. r = func(p)
  67. self.assertIn("pid=%s" % p.pid, r)
  68. self.assertIn("status='zombie'", r)
  69. self.assertNotIn("name=", r)
  70. with mock.patch.object(psutil.Process, "name",
  71. side_effect=psutil.NoSuchProcess(os.getpid())):
  72. p = psutil.Process()
  73. r = func(p)
  74. self.assertIn("pid=%s" % p.pid, r)
  75. self.assertIn("terminated", r)
  76. self.assertNotIn("name=", r)
  77. with mock.patch.object(psutil.Process, "name",
  78. side_effect=psutil.AccessDenied(os.getpid())):
  79. p = psutil.Process()
  80. r = func(p)
  81. self.assertIn("pid=%s" % p.pid, r)
  82. self.assertNotIn("name=", r)
  83. def test_process__str__(self):
  84. self.test_process__repr__(func=str)
  85. def test_no_such_process__repr__(self, func=repr):
  86. self.assertEqual(
  87. repr(psutil.NoSuchProcess(321)),
  88. "psutil.NoSuchProcess process no longer exists (pid=321)")
  89. self.assertEqual(
  90. repr(psutil.NoSuchProcess(321, name='foo')),
  91. "psutil.NoSuchProcess process no longer exists (pid=321, "
  92. "name='foo')")
  93. self.assertEqual(
  94. repr(psutil.NoSuchProcess(321, msg='foo')),
  95. "psutil.NoSuchProcess foo")
  96. def test_zombie_process__repr__(self, func=repr):
  97. self.assertEqual(
  98. repr(psutil.ZombieProcess(321)),
  99. "psutil.ZombieProcess process still exists but it's a zombie "
  100. "(pid=321)")
  101. self.assertEqual(
  102. repr(psutil.ZombieProcess(321, name='foo')),
  103. "psutil.ZombieProcess process still exists but it's a zombie "
  104. "(pid=321, name='foo')")
  105. self.assertEqual(
  106. repr(psutil.ZombieProcess(321, name='foo', ppid=1)),
  107. "psutil.ZombieProcess process still exists but it's a zombie "
  108. "(pid=321, name='foo', ppid=1)")
  109. self.assertEqual(
  110. repr(psutil.ZombieProcess(321, msg='foo')),
  111. "psutil.ZombieProcess foo")
  112. def test_access_denied__repr__(self, func=repr):
  113. self.assertEqual(
  114. repr(psutil.AccessDenied(321)),
  115. "psutil.AccessDenied (pid=321)")
  116. self.assertEqual(
  117. repr(psutil.AccessDenied(321, name='foo')),
  118. "psutil.AccessDenied (pid=321, name='foo')")
  119. self.assertEqual(
  120. repr(psutil.AccessDenied(321, msg='foo')),
  121. "psutil.AccessDenied foo")
  122. def test_timeout_expired__repr__(self, func=repr):
  123. self.assertEqual(
  124. repr(psutil.TimeoutExpired(321)),
  125. "psutil.TimeoutExpired timeout after 321 seconds")
  126. self.assertEqual(
  127. repr(psutil.TimeoutExpired(321, pid=111)),
  128. "psutil.TimeoutExpired timeout after 321 seconds (pid=111)")
  129. self.assertEqual(
  130. repr(psutil.TimeoutExpired(321, pid=111, name='foo')),
  131. "psutil.TimeoutExpired timeout after 321 seconds "
  132. "(pid=111, name='foo')")
  133. def test_process__eq__(self):
  134. p1 = psutil.Process()
  135. p2 = psutil.Process()
  136. self.assertEqual(p1, p2)
  137. p2._ident = (0, 0)
  138. self.assertNotEqual(p1, p2)
  139. self.assertNotEqual(p1, 'foo')
  140. def test_process__hash__(self):
  141. s = set([psutil.Process(), psutil.Process()])
  142. self.assertEqual(len(s), 1)
  143. def test__all__(self):
  144. dir_psutil = dir(psutil)
  145. for name in dir_psutil:
  146. if name in ('long', 'tests', 'test', 'PermissionError',
  147. 'ProcessLookupError'):
  148. continue
  149. if not name.startswith('_'):
  150. try:
  151. __import__(name)
  152. except ImportError:
  153. if name not in psutil.__all__:
  154. fun = getattr(psutil, name)
  155. if fun is None:
  156. continue
  157. if (fun.__doc__ is not None and
  158. 'deprecated' not in fun.__doc__.lower()):
  159. self.fail('%r not in psutil.__all__' % name)
  160. # Import 'star' will break if __all__ is inconsistent, see:
  161. # https://github.com/giampaolo/psutil/issues/656
  162. # Can't do `from psutil import *` as it won't work on python 3
  163. # so we simply iterate over __all__.
  164. for name in psutil.__all__:
  165. self.assertIn(name, dir_psutil)
  166. def test_version(self):
  167. self.assertEqual('.'.join([str(x) for x in psutil.version_info]),
  168. psutil.__version__)
  169. def test_process_as_dict_no_new_names(self):
  170. # See https://github.com/giampaolo/psutil/issues/813
  171. p = psutil.Process()
  172. p.foo = '1'
  173. self.assertNotIn('foo', p.as_dict())
  174. def test_memoize(self):
  175. @memoize
  176. def foo(*args, **kwargs):
  177. "foo docstring"
  178. calls.append(None)
  179. return (args, kwargs)
  180. calls = []
  181. # no args
  182. for x in range(2):
  183. ret = foo()
  184. expected = ((), {})
  185. self.assertEqual(ret, expected)
  186. self.assertEqual(len(calls), 1)
  187. # with args
  188. for x in range(2):
  189. ret = foo(1)
  190. expected = ((1, ), {})
  191. self.assertEqual(ret, expected)
  192. self.assertEqual(len(calls), 2)
  193. # with args + kwargs
  194. for x in range(2):
  195. ret = foo(1, bar=2)
  196. expected = ((1, ), {'bar': 2})
  197. self.assertEqual(ret, expected)
  198. self.assertEqual(len(calls), 3)
  199. # clear cache
  200. foo.cache_clear()
  201. ret = foo()
  202. expected = ((), {})
  203. self.assertEqual(ret, expected)
  204. self.assertEqual(len(calls), 4)
  205. # docstring
  206. self.assertEqual(foo.__doc__, "foo docstring")
  207. def test_memoize_when_activated(self):
  208. class Foo:
  209. @memoize_when_activated
  210. def foo(self):
  211. calls.append(None)
  212. f = Foo()
  213. calls = []
  214. f.foo()
  215. f.foo()
  216. self.assertEqual(len(calls), 2)
  217. # activate
  218. calls = []
  219. f.foo.cache_activate(f)
  220. f.foo()
  221. f.foo()
  222. self.assertEqual(len(calls), 1)
  223. # deactivate
  224. calls = []
  225. f.foo.cache_deactivate(f)
  226. f.foo()
  227. f.foo()
  228. self.assertEqual(len(calls), 2)
  229. def test_parse_environ_block(self):
  230. from psutil._common import parse_environ_block
  231. def k(s):
  232. return s.upper() if WINDOWS else s
  233. self.assertEqual(parse_environ_block("a=1\0"),
  234. {k("a"): "1"})
  235. self.assertEqual(parse_environ_block("a=1\0b=2\0\0"),
  236. {k("a"): "1", k("b"): "2"})
  237. self.assertEqual(parse_environ_block("a=1\0b=\0\0"),
  238. {k("a"): "1", k("b"): ""})
  239. # ignore everything after \0\0
  240. self.assertEqual(parse_environ_block("a=1\0b=2\0\0c=3\0"),
  241. {k("a"): "1", k("b"): "2"})
  242. # ignore everything that is not an assignment
  243. self.assertEqual(parse_environ_block("xxx\0a=1\0"), {k("a"): "1"})
  244. self.assertEqual(parse_environ_block("a=1\0=b=2\0"), {k("a"): "1"})
  245. # do not fail if the block is incomplete
  246. self.assertEqual(parse_environ_block("a=1\0b=2"), {k("a"): "1"})
  247. def test_supports_ipv6(self):
  248. self.addCleanup(supports_ipv6.cache_clear)
  249. if supports_ipv6():
  250. with mock.patch('psutil._common.socket') as s:
  251. s.has_ipv6 = False
  252. supports_ipv6.cache_clear()
  253. assert not supports_ipv6()
  254. supports_ipv6.cache_clear()
  255. with mock.patch('psutil._common.socket.socket',
  256. side_effect=socket.error) as s:
  257. assert not supports_ipv6()
  258. assert s.called
  259. supports_ipv6.cache_clear()
  260. with mock.patch('psutil._common.socket.socket',
  261. side_effect=socket.gaierror) as s:
  262. assert not supports_ipv6()
  263. supports_ipv6.cache_clear()
  264. assert s.called
  265. supports_ipv6.cache_clear()
  266. with mock.patch('psutil._common.socket.socket.bind',
  267. side_effect=socket.gaierror) as s:
  268. assert not supports_ipv6()
  269. supports_ipv6.cache_clear()
  270. assert s.called
  271. else:
  272. with self.assertRaises(Exception):
  273. sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
  274. try:
  275. sock.bind(("::1", 0))
  276. finally:
  277. sock.close()
  278. def test_isfile_strict(self):
  279. from psutil._common import isfile_strict
  280. this_file = os.path.abspath(__file__)
  281. assert isfile_strict(this_file)
  282. assert not isfile_strict(os.path.dirname(this_file))
  283. with mock.patch('psutil._common.os.stat',
  284. side_effect=OSError(errno.EPERM, "foo")):
  285. self.assertRaises(OSError, isfile_strict, this_file)
  286. with mock.patch('psutil._common.os.stat',
  287. side_effect=OSError(errno.EACCES, "foo")):
  288. self.assertRaises(OSError, isfile_strict, this_file)
  289. with mock.patch('psutil._common.os.stat',
  290. side_effect=OSError(errno.ENOENT, "foo")):
  291. assert not isfile_strict(this_file)
  292. with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
  293. assert not isfile_strict(this_file)
  294. def test_serialization(self):
  295. def check(ret):
  296. if json is not None:
  297. json.loads(json.dumps(ret))
  298. a = pickle.dumps(ret)
  299. b = pickle.loads(a)
  300. self.assertEqual(ret, b)
  301. check(psutil.Process().as_dict())
  302. check(psutil.virtual_memory())
  303. check(psutil.swap_memory())
  304. check(psutil.cpu_times())
  305. check(psutil.cpu_times_percent(interval=0))
  306. check(psutil.net_io_counters())
  307. if LINUX and not os.path.exists('/proc/diskstats'):
  308. pass
  309. else:
  310. if not APPVEYOR:
  311. check(psutil.disk_io_counters())
  312. check(psutil.disk_partitions())
  313. check(psutil.disk_usage(os.getcwd()))
  314. check(psutil.users())
  315. def test_setup_script(self):
  316. setup_py = os.path.join(ROOT_DIR, 'setup.py')
  317. if CI_TESTING and not os.path.exists(setup_py):
  318. return self.skipTest("can't find setup.py")
  319. module = import_module_by_path(setup_py)
  320. self.assertRaises(SystemExit, module.setup)
  321. self.assertEqual(module.get_version(), psutil.__version__)
  322. def test_ad_on_process_creation(self):
  323. # We are supposed to be able to instantiate Process also in case
  324. # of zombie processes or access denied.
  325. with mock.patch.object(psutil.Process, 'create_time',
  326. side_effect=psutil.AccessDenied) as meth:
  327. psutil.Process()
  328. assert meth.called
  329. with mock.patch.object(psutil.Process, 'create_time',
  330. side_effect=psutil.ZombieProcess(1)) as meth:
  331. psutil.Process()
  332. assert meth.called
  333. with mock.patch.object(psutil.Process, 'create_time',
  334. side_effect=ValueError) as meth:
  335. with self.assertRaises(ValueError):
  336. psutil.Process()
  337. assert meth.called
  338. def test_sanity_version_check(self):
  339. # see: https://github.com/giampaolo/psutil/issues/564
  340. with mock.patch(
  341. "psutil._psplatform.cext.version", return_value="0.0.0"):
  342. with self.assertRaises(ImportError) as cm:
  343. reload_module(psutil)
  344. self.assertIn("version conflict", str(cm.exception).lower())
# ===================================================================
# --- Tests for wrap_numbers() function.
# ===================================================================
  348. nt = collections.namedtuple('foo', 'a b c')
  349. class TestWrapNumbers(PsutilTestCase):
  350. def setUp(self):
  351. wrap_numbers.cache_clear()
  352. tearDown = setUp
  353. def test_first_call(self):
  354. input = {'disk1': nt(5, 5, 5)}
  355. self.assertEqual(wrap_numbers(input, 'disk_io'), input)
  356. def test_input_hasnt_changed(self):
  357. input = {'disk1': nt(5, 5, 5)}
  358. self.assertEqual(wrap_numbers(input, 'disk_io'), input)
  359. self.assertEqual(wrap_numbers(input, 'disk_io'), input)
  360. def test_increase_but_no_wrap(self):
  361. input = {'disk1': nt(5, 5, 5)}
  362. self.assertEqual(wrap_numbers(input, 'disk_io'), input)
  363. input = {'disk1': nt(10, 15, 20)}
  364. self.assertEqual(wrap_numbers(input, 'disk_io'), input)
  365. input = {'disk1': nt(20, 25, 30)}
  366. self.assertEqual(wrap_numbers(input, 'disk_io'), input)
  367. input = {'disk1': nt(20, 25, 30)}
  368. self.assertEqual(wrap_numbers(input, 'disk_io'), input)
  369. def test_wrap(self):
  370. # let's say 100 is the threshold
  371. input = {'disk1': nt(100, 100, 100)}
  372. self.assertEqual(wrap_numbers(input, 'disk_io'), input)
  373. # first wrap restarts from 10
  374. input = {'disk1': nt(100, 100, 10)}
  375. self.assertEqual(wrap_numbers(input, 'disk_io'),
  376. {'disk1': nt(100, 100, 110)})
  377. # then it remains the same
  378. input = {'disk1': nt(100, 100, 10)}
  379. self.assertEqual(wrap_numbers(input, 'disk_io'),
  380. {'disk1': nt(100, 100, 110)})
  381. # then it goes up
  382. input = {'disk1': nt(100, 100, 90)}
  383. self.assertEqual(wrap_numbers(input, 'disk_io'),
  384. {'disk1': nt(100, 100, 190)})
  385. # then it wraps again
  386. input = {'disk1': nt(100, 100, 20)}
  387. self.assertEqual(wrap_numbers(input, 'disk_io'),
  388. {'disk1': nt(100, 100, 210)})
  389. # and remains the same
  390. input = {'disk1': nt(100, 100, 20)}
  391. self.assertEqual(wrap_numbers(input, 'disk_io'),
  392. {'disk1': nt(100, 100, 210)})
  393. # now wrap another num
  394. input = {'disk1': nt(50, 100, 20)}
  395. self.assertEqual(wrap_numbers(input, 'disk_io'),
  396. {'disk1': nt(150, 100, 210)})
  397. # and again
  398. input = {'disk1': nt(40, 100, 20)}
  399. self.assertEqual(wrap_numbers(input, 'disk_io'),
  400. {'disk1': nt(190, 100, 210)})
  401. # keep it the same
  402. input = {'disk1': nt(40, 100, 20)}
  403. self.assertEqual(wrap_numbers(input, 'disk_io'),
  404. {'disk1': nt(190, 100, 210)})
  405. def test_changing_keys(self):
  406. # Emulate a case where the second call to disk_io()
  407. # (or whatever) provides a new disk, then the new disk
  408. # disappears on the third call.
  409. input = {'disk1': nt(5, 5, 5)}
  410. self.assertEqual(wrap_numbers(input, 'disk_io'), input)
  411. input = {'disk1': nt(5, 5, 5),
  412. 'disk2': nt(7, 7, 7)}
  413. self.assertEqual(wrap_numbers(input, 'disk_io'), input)
  414. input = {'disk1': nt(8, 8, 8)}
  415. self.assertEqual(wrap_numbers(input, 'disk_io'), input)
  416. def test_changing_keys_w_wrap(self):
  417. input = {'disk1': nt(50, 50, 50),
  418. 'disk2': nt(100, 100, 100)}
  419. self.assertEqual(wrap_numbers(input, 'disk_io'), input)
  420. # disk 2 wraps
  421. input = {'disk1': nt(50, 50, 50),
  422. 'disk2': nt(100, 100, 10)}
  423. self.assertEqual(wrap_numbers(input, 'disk_io'),
  424. {'disk1': nt(50, 50, 50),
  425. 'disk2': nt(100, 100, 110)})
  426. # disk 2 disappears
  427. input = {'disk1': nt(50, 50, 50)}
  428. self.assertEqual(wrap_numbers(input, 'disk_io'), input)
  429. # then it appears again; the old wrap is supposed to be
  430. # gone.
  431. input = {'disk1': nt(50, 50, 50),
  432. 'disk2': nt(100, 100, 100)}
  433. self.assertEqual(wrap_numbers(input, 'disk_io'), input)
  434. # remains the same
  435. input = {'disk1': nt(50, 50, 50),
  436. 'disk2': nt(100, 100, 100)}
  437. self.assertEqual(wrap_numbers(input, 'disk_io'), input)
  438. # and then wraps again
  439. input = {'disk1': nt(50, 50, 50),
  440. 'disk2': nt(100, 100, 10)}
  441. self.assertEqual(wrap_numbers(input, 'disk_io'),
  442. {'disk1': nt(50, 50, 50),
  443. 'disk2': nt(100, 100, 110)})
  444. def test_real_data(self):
  445. d = {'nvme0n1': (300, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
  446. 'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
  447. 'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
  448. 'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)}
  449. self.assertEqual(wrap_numbers(d, 'disk_io'), d)
  450. self.assertEqual(wrap_numbers(d, 'disk_io'), d)
  451. # decrease this ↓
  452. d = {'nvme0n1': (100, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
  453. 'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
  454. 'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
  455. 'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)}
  456. out = wrap_numbers(d, 'disk_io')
  457. self.assertEqual(out['nvme0n1'][0], 400)
  458. # --- cache tests
  459. def test_cache_first_call(self):
  460. input = {'disk1': nt(5, 5, 5)}
  461. wrap_numbers(input, 'disk_io')
  462. cache = wrap_numbers.cache_info()
  463. self.assertEqual(cache[0], {'disk_io': input})
  464. self.assertEqual(cache[1], {'disk_io': {}})
  465. self.assertEqual(cache[2], {'disk_io': {}})
  466. def test_cache_call_twice(self):
  467. input = {'disk1': nt(5, 5, 5)}
  468. wrap_numbers(input, 'disk_io')
  469. input = {'disk1': nt(10, 10, 10)}
  470. wrap_numbers(input, 'disk_io')
  471. cache = wrap_numbers.cache_info()
  472. self.assertEqual(cache[0], {'disk_io': input})
  473. self.assertEqual(
  474. cache[1],
  475. {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
  476. self.assertEqual(cache[2], {'disk_io': {}})
  477. def test_cache_wrap(self):
  478. # let's say 100 is the threshold
  479. input = {'disk1': nt(100, 100, 100)}
  480. wrap_numbers(input, 'disk_io')
  481. # first wrap restarts from 10
  482. input = {'disk1': nt(100, 100, 10)}
  483. wrap_numbers(input, 'disk_io')
  484. cache = wrap_numbers.cache_info()
  485. self.assertEqual(cache[0], {'disk_io': input})
  486. self.assertEqual(
  487. cache[1],
  488. {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 100}})
  489. self.assertEqual(cache[2], {'disk_io': {'disk1': set([('disk1', 2)])}})
  490. def assert_():
  491. cache = wrap_numbers.cache_info()
  492. self.assertEqual(
  493. cache[1],
  494. {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0,
  495. ('disk1', 2): 100}})
  496. self.assertEqual(cache[2],
  497. {'disk_io': {'disk1': set([('disk1', 2)])}})
  498. # then it remains the same
  499. input = {'disk1': nt(100, 100, 10)}
  500. wrap_numbers(input, 'disk_io')
  501. cache = wrap_numbers.cache_info()
  502. self.assertEqual(cache[0], {'disk_io': input})
  503. assert_()
  504. # then it goes up
  505. input = {'disk1': nt(100, 100, 90)}
  506. wrap_numbers(input, 'disk_io')
  507. cache = wrap_numbers.cache_info()
  508. self.assertEqual(cache[0], {'disk_io': input})
  509. assert_()
  510. # then it wraps again
  511. input = {'disk1': nt(100, 100, 20)}
  512. wrap_numbers(input, 'disk_io')
  513. cache = wrap_numbers.cache_info()
  514. self.assertEqual(cache[0], {'disk_io': input})
  515. self.assertEqual(
  516. cache[1],
  517. {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 190}})
  518. self.assertEqual(cache[2], {'disk_io': {'disk1': set([('disk1', 2)])}})
  519. def test_cache_changing_keys(self):
  520. input = {'disk1': nt(5, 5, 5)}
  521. wrap_numbers(input, 'disk_io')
  522. input = {'disk1': nt(5, 5, 5),
  523. 'disk2': nt(7, 7, 7)}
  524. wrap_numbers(input, 'disk_io')
  525. cache = wrap_numbers.cache_info()
  526. self.assertEqual(cache[0], {'disk_io': input})
  527. self.assertEqual(
  528. cache[1],
  529. {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
  530. self.assertEqual(cache[2], {'disk_io': {}})
  531. def test_cache_clear(self):
  532. input = {'disk1': nt(5, 5, 5)}
  533. wrap_numbers(input, 'disk_io')
  534. wrap_numbers(input, 'disk_io')
  535. wrap_numbers.cache_clear('disk_io')
  536. self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {}))
  537. wrap_numbers.cache_clear('disk_io')
  538. wrap_numbers.cache_clear('?!?')
  539. @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
  540. def test_cache_clear_public_apis(self):
  541. if not psutil.disk_io_counters() or not psutil.net_io_counters():
  542. return self.skipTest("no disks or NICs available")
  543. psutil.disk_io_counters()
  544. psutil.net_io_counters()
  545. caches = wrap_numbers.cache_info()
  546. for cache in caches:
  547. self.assertIn('psutil.disk_io_counters', cache)
  548. self.assertIn('psutil.net_io_counters', cache)
  549. psutil.disk_io_counters.cache_clear()
  550. caches = wrap_numbers.cache_info()
  551. for cache in caches:
  552. self.assertIn('psutil.net_io_counters', cache)
  553. self.assertNotIn('psutil.disk_io_counters', cache)
  554. psutil.net_io_counters.cache_clear()
  555. caches = wrap_numbers.cache_info()
  556. self.assertEqual(caches, ({}, {}, {}))
# ===================================================================
# --- Example script tests
# ===================================================================
  560. @unittest.skipIf(not os.path.exists(SCRIPTS_DIR),
  561. "can't locate scripts directory")
  562. class TestScripts(PsutilTestCase):
  563. """Tests for scripts in the "scripts" directory."""
  564. @staticmethod
  565. def assert_stdout(exe, *args, **kwargs):
  566. exe = '%s' % os.path.join(SCRIPTS_DIR, exe)
  567. cmd = [PYTHON_EXE, exe]
  568. for arg in args:
  569. cmd.append(arg)
  570. try:
  571. out = sh(cmd, **kwargs).strip()
  572. except RuntimeError as err:
  573. if 'AccessDenied' in str(err):
  574. return str(err)
  575. else:
  576. raise
  577. assert out, out
  578. return out
  579. @staticmethod
  580. def assert_syntax(exe, args=None):
  581. exe = os.path.join(SCRIPTS_DIR, exe)
  582. if PY3:
  583. f = open(exe, 'rt', encoding='utf8')
  584. else:
  585. f = open(exe, 'rt')
  586. with f:
  587. src = f.read()
  588. ast.parse(src)
  589. def test_coverage(self):
  590. # make sure all example scripts have a test method defined
  591. meths = dir(self)
  592. for name in os.listdir(SCRIPTS_DIR):
  593. if name.endswith('.py'):
  594. if 'test_' + os.path.splitext(name)[0] not in meths:
  595. # self.assert_stdout(name)
  596. self.fail('no test defined for %r script'
  597. % os.path.join(SCRIPTS_DIR, name))
  598. @unittest.skipIf(not POSIX, "POSIX only")
  599. def test_executable(self):
  600. for name in os.listdir(SCRIPTS_DIR):
  601. if name.endswith('.py'):
  602. path = os.path.join(SCRIPTS_DIR, name)
  603. if not stat.S_IXUSR & os.stat(path)[stat.ST_MODE]:
  604. self.fail('%r is not executable' % path)
  605. def test_disk_usage(self):
  606. self.assert_stdout('disk_usage.py')
  607. def test_free(self):
  608. self.assert_stdout('free.py')
  609. def test_meminfo(self):
  610. self.assert_stdout('meminfo.py')
  611. def test_procinfo(self):
  612. self.assert_stdout('procinfo.py', str(os.getpid()))
  613. @unittest.skipIf(CI_TESTING and not psutil.users(), "no users")
  614. def test_who(self):
  615. self.assert_stdout('who.py')
  616. def test_ps(self):
  617. self.assert_stdout('ps.py')
  618. def test_pstree(self):
  619. self.assert_stdout('pstree.py')
  620. def test_netstat(self):
  621. self.assert_stdout('netstat.py')
  622. # permission denied on travis
  623. @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
  624. def test_ifconfig(self):
  625. self.assert_stdout('ifconfig.py')
  626. @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
  627. def test_pmap(self):
  628. self.assert_stdout('pmap.py', str(os.getpid()))
  629. def test_procsmem(self):
  630. if 'uss' not in psutil.Process().memory_full_info()._fields:
  631. raise self.skipTest("not supported")
  632. self.assert_stdout('procsmem.py', stderr=DEVNULL)
  633. def test_killall(self):
  634. self.assert_syntax('killall.py')
  635. def test_nettop(self):
  636. self.assert_syntax('nettop.py')
  637. def test_top(self):
  638. self.assert_syntax('top.py')
  639. def test_iotop(self):
  640. self.assert_syntax('iotop.py')
  641. def test_pidof(self):
  642. output = self.assert_stdout('pidof.py', psutil.Process().name())
  643. self.assertIn(str(os.getpid()), output)
  644. @unittest.skipIf(not WINDOWS, "WINDOWS only")
  645. def test_winservices(self):
  646. self.assert_stdout('winservices.py')
  647. def test_cpu_distribution(self):
  648. self.assert_syntax('cpu_distribution.py')
  649. @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
  650. @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
  651. def test_temperatures(self):
  652. if not psutil.sensors_temperatures():
  653. self.skipTest("no temperatures")
  654. self.assert_stdout('temperatures.py')
  655. @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
  656. @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
  657. def test_fans(self):
  658. if not psutil.sensors_fans():
  659. self.skipTest("no fans")
  660. self.assert_stdout('fans.py')
  661. @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
  662. @unittest.skipIf(not HAS_BATTERY, "no battery")
  663. def test_battery(self):
  664. self.assert_stdout('battery.py')
  665. @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
  666. @unittest.skipIf(not HAS_BATTERY, "no battery")
  667. def test_sensors(self):
  668. self.assert_stdout('sensors.py')
  669. if __name__ == '__main__':
  670. from psutil.tests.runner import run_from_name
  671. run_from_name(__file__)