test_system.py 35 KB


  1. #!/usr/bin/env python3
  2. # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Tests for system APIS."""
  6. import contextlib
  7. import datetime
  8. import errno
  9. import os
  10. import pprint
  11. import shutil
  12. import signal
  13. import socket
  14. import sys
  15. import time
  16. import psutil
  17. from psutil import AIX
  18. from psutil import BSD
  19. from psutil import FREEBSD
  20. from psutil import LINUX
  21. from psutil import MACOS
  22. from psutil import NETBSD
  23. from psutil import OPENBSD
  24. from psutil import POSIX
  25. from psutil import SUNOS
  26. from psutil import WINDOWS
  27. from psutil._compat import FileNotFoundError
  28. from psutil._compat import long
  29. from psutil.tests import ASCII_FS
  30. from psutil.tests import check_net_address
  31. from psutil.tests import CI_TESTING
  32. from psutil.tests import DEVNULL
  33. from psutil.tests import enum
  34. from psutil.tests import GLOBAL_TIMEOUT
  35. from psutil.tests import HAS_BATTERY
  36. from psutil.tests import HAS_CPU_FREQ
  37. from psutil.tests import HAS_GETLOADAVG
  38. from psutil.tests import HAS_NET_IO_COUNTERS
  39. from psutil.tests import HAS_SENSORS_BATTERY
  40. from psutil.tests import HAS_SENSORS_FANS
  41. from psutil.tests import HAS_SENSORS_TEMPERATURES
  42. from psutil.tests import IS_64BIT
  43. from psutil.tests import mock
  44. from psutil.tests import PsutilTestCase
  45. from psutil.tests import PYPY
  46. from psutil.tests import retry_on_failure
  47. from psutil.tests import TRAVIS
  48. from psutil.tests import GITHUB_WHEELS
  49. from psutil.tests import UNICODE_SUFFIX
  50. from psutil.tests import unittest
  51. # ===================================================================
  52. # --- System-related API tests
  53. # ===================================================================
  54. class TestProcessAPIs(PsutilTestCase):
  55. def test_process_iter(self):
  56. self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()])
  57. sproc = self.spawn_testproc()
  58. self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()])
  59. p = psutil.Process(sproc.pid)
  60. p.kill()
  61. p.wait()
  62. self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()])
  63. with mock.patch('psutil.Process',
  64. side_effect=psutil.NoSuchProcess(os.getpid())):
  65. self.assertEqual(list(psutil.process_iter()), [])
  66. with mock.patch('psutil.Process',
  67. side_effect=psutil.AccessDenied(os.getpid())):
  68. with self.assertRaises(psutil.AccessDenied):
  69. list(psutil.process_iter())
  70. def test_prcess_iter_w_attrs(self):
  71. for p in psutil.process_iter(attrs=['pid']):
  72. self.assertEqual(list(p.info.keys()), ['pid'])
  73. with self.assertRaises(ValueError):
  74. list(psutil.process_iter(attrs=['foo']))
  75. with mock.patch("psutil._psplatform.Process.cpu_times",
  76. side_effect=psutil.AccessDenied(0, "")) as m:
  77. for p in psutil.process_iter(attrs=["pid", "cpu_times"]):
  78. self.assertIsNone(p.info['cpu_times'])
  79. self.assertGreaterEqual(p.info['pid'], 0)
  80. assert m.called
  81. with mock.patch("psutil._psplatform.Process.cpu_times",
  82. side_effect=psutil.AccessDenied(0, "")) as m:
  83. flag = object()
  84. for p in psutil.process_iter(
  85. attrs=["pid", "cpu_times"], ad_value=flag):
  86. self.assertIs(p.info['cpu_times'], flag)
  87. self.assertGreaterEqual(p.info['pid'], 0)
  88. assert m.called
  89. @unittest.skipIf(PYPY and WINDOWS,
  90. "spawn_testproc() unreliable on PYPY + WINDOWS")
  91. def test_wait_procs(self):
  92. def callback(p):
  93. pids.append(p.pid)
  94. pids = []
  95. sproc1 = self.spawn_testproc()
  96. sproc2 = self.spawn_testproc()
  97. sproc3 = self.spawn_testproc()
  98. procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
  99. self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
  100. self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)
  101. t = time.time()
  102. gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback)
  103. self.assertLess(time.time() - t, 0.5)
  104. self.assertEqual(gone, [])
  105. self.assertEqual(len(alive), 3)
  106. self.assertEqual(pids, [])
  107. for p in alive:
  108. self.assertFalse(hasattr(p, 'returncode'))
  109. @retry_on_failure(30)
  110. def test(procs, callback):
  111. gone, alive = psutil.wait_procs(procs, timeout=0.03,
  112. callback=callback)
  113. self.assertEqual(len(gone), 1)
  114. self.assertEqual(len(alive), 2)
  115. return gone, alive
  116. sproc3.terminate()
  117. gone, alive = test(procs, callback)
  118. self.assertIn(sproc3.pid, [x.pid for x in gone])
  119. if POSIX:
  120. self.assertEqual(gone.pop().returncode, -signal.SIGTERM)
  121. else:
  122. self.assertEqual(gone.pop().returncode, 1)
  123. self.assertEqual(pids, [sproc3.pid])
  124. for p in alive:
  125. self.assertFalse(hasattr(p, 'returncode'))
  126. @retry_on_failure(30)
  127. def test(procs, callback):
  128. gone, alive = psutil.wait_procs(procs, timeout=0.03,
  129. callback=callback)
  130. self.assertEqual(len(gone), 3)
  131. self.assertEqual(len(alive), 0)
  132. return gone, alive
  133. sproc1.terminate()
  134. sproc2.terminate()
  135. gone, alive = test(procs, callback)
  136. self.assertEqual(set(pids), set([sproc1.pid, sproc2.pid, sproc3.pid]))
  137. for p in gone:
  138. self.assertTrue(hasattr(p, 'returncode'))
  139. @unittest.skipIf(PYPY and WINDOWS,
  140. "spawn_testproc() unreliable on PYPY + WINDOWS")
  141. def test_wait_procs_no_timeout(self):
  142. sproc1 = self.spawn_testproc()
  143. sproc2 = self.spawn_testproc()
  144. sproc3 = self.spawn_testproc()
  145. procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
  146. for p in procs:
  147. p.terminate()
  148. gone, alive = psutil.wait_procs(procs)
  149. def test_pid_exists(self):
  150. sproc = self.spawn_testproc()
  151. self.assertTrue(psutil.pid_exists(sproc.pid))
  152. p = psutil.Process(sproc.pid)
  153. p.kill()
  154. p.wait()
  155. self.assertFalse(psutil.pid_exists(sproc.pid))
  156. self.assertFalse(psutil.pid_exists(-1))
  157. self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())
  158. def test_pid_exists_2(self):
  159. pids = psutil.pids()
  160. for pid in pids:
  161. try:
  162. assert psutil.pid_exists(pid)
  163. except AssertionError:
  164. # in case the process disappeared in meantime fail only
  165. # if it is no longer in psutil.pids()
  166. time.sleep(.1)
  167. if pid in psutil.pids():
  168. self.fail(pid)
  169. pids = range(max(pids) + 5000, max(pids) + 6000)
  170. for pid in pids:
  171. self.assertFalse(psutil.pid_exists(pid), msg=pid)
  172. class TestMiscAPIs(PsutilTestCase):
  173. def test_boot_time(self):
  174. bt = psutil.boot_time()
  175. self.assertIsInstance(bt, float)
  176. self.assertGreater(bt, 0)
  177. self.assertLess(bt, time.time())
  178. @unittest.skipIf(CI_TESTING and not psutil.users(), "unreliable on CI")
  179. def test_users(self):
  180. users = psutil.users()
  181. self.assertNotEqual(users, [])
  182. for user in users:
  183. assert user.name, user
  184. self.assertIsInstance(user.name, str)
  185. self.assertIsInstance(user.terminal, (str, type(None)))
  186. if user.host is not None:
  187. self.assertIsInstance(user.host, (str, type(None)))
  188. user.terminal
  189. user.host
  190. assert user.started > 0.0, user
  191. datetime.datetime.fromtimestamp(user.started)
  192. if WINDOWS or OPENBSD:
  193. self.assertIsNone(user.pid)
  194. else:
  195. psutil.Process(user.pid)
  196. @unittest.skipIf(not POSIX, 'POSIX only')
  197. def test_PAGESIZE(self):
  198. # pagesize is used internally to perform different calculations
  199. # and it's determined by using SC_PAGE_SIZE; make sure
  200. # getpagesize() returns the same value.
  201. import resource
  202. self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize())
  203. def test_test(self):
  204. # test for psutil.test() function
  205. stdout = sys.stdout
  206. sys.stdout = DEVNULL
  207. try:
  208. psutil.test()
  209. finally:
  210. sys.stdout = stdout
  211. def test_os_constants(self):
  212. names = ["POSIX", "WINDOWS", "LINUX", "MACOS", "FREEBSD", "OPENBSD",
  213. "NETBSD", "BSD", "SUNOS"]
  214. for name in names:
  215. self.assertIsInstance(getattr(psutil, name), bool, msg=name)
  216. if os.name == 'posix':
  217. assert psutil.POSIX
  218. assert not psutil.WINDOWS
  219. names.remove("POSIX")
  220. if "linux" in sys.platform.lower():
  221. assert psutil.LINUX
  222. names.remove("LINUX")
  223. elif "bsd" in sys.platform.lower():
  224. assert psutil.BSD
  225. self.assertEqual([psutil.FREEBSD, psutil.OPENBSD,
  226. psutil.NETBSD].count(True), 1)
  227. names.remove("BSD")
  228. names.remove("FREEBSD")
  229. names.remove("OPENBSD")
  230. names.remove("NETBSD")
  231. elif "sunos" in sys.platform.lower() or \
  232. "solaris" in sys.platform.lower():
  233. assert psutil.SUNOS
  234. names.remove("SUNOS")
  235. elif "darwin" in sys.platform.lower():
  236. assert psutil.MACOS
  237. names.remove("MACOS")
  238. else:
  239. assert psutil.WINDOWS
  240. assert not psutil.POSIX
  241. names.remove("WINDOWS")
  242. # assert all other constants are set to False
  243. for name in names:
  244. self.assertIs(getattr(psutil, name), False, msg=name)
  245. class TestMemoryAPIs(PsutilTestCase):
  246. def test_virtual_memory(self):
  247. mem = psutil.virtual_memory()
  248. assert mem.total > 0, mem
  249. assert mem.available > 0, mem
  250. assert 0 <= mem.percent <= 100, mem
  251. assert mem.used > 0, mem
  252. assert mem.free >= 0, mem
  253. for name in mem._fields:
  254. value = getattr(mem, name)
  255. if name != 'percent':
  256. self.assertIsInstance(value, (int, long))
  257. if name != 'total':
  258. if not value >= 0:
  259. self.fail("%r < 0 (%s)" % (name, value))
  260. if value > mem.total:
  261. self.fail("%r > total (total=%s, %s=%s)"
  262. % (name, mem.total, name, value))
  263. def test_swap_memory(self):
  264. mem = psutil.swap_memory()
  265. self.assertEqual(
  266. mem._fields, ('total', 'used', 'free', 'percent', 'sin', 'sout'))
  267. assert mem.total >= 0, mem
  268. assert mem.used >= 0, mem
  269. if mem.total > 0:
  270. # likely a system with no swap partition
  271. assert mem.free > 0, mem
  272. else:
  273. assert mem.free == 0, mem
  274. assert 0 <= mem.percent <= 100, mem
  275. assert mem.sin >= 0, mem
  276. assert mem.sout >= 0, mem
  277. class TestCpuAPIs(PsutilTestCase):
  278. def test_cpu_count_logical(self):
  279. logical = psutil.cpu_count()
  280. self.assertIsNotNone(logical)
  281. self.assertEqual(logical, len(psutil.cpu_times(percpu=True)))
  282. self.assertGreaterEqual(logical, 1)
  283. #
  284. if os.path.exists("/proc/cpuinfo"):
  285. with open("/proc/cpuinfo") as fd:
  286. cpuinfo_data = fd.read()
  287. if "physical id" not in cpuinfo_data:
  288. raise unittest.SkipTest("cpuinfo doesn't include physical id")
  289. def test_cpu_count_physical(self):
  290. logical = psutil.cpu_count()
  291. physical = psutil.cpu_count(logical=False)
  292. if physical is None:
  293. raise self.skipTest("physical cpu_count() is None")
  294. if WINDOWS and sys.getwindowsversion()[:2] <= (6, 1): # <= Vista
  295. self.assertIsNone(physical)
  296. else:
  297. self.assertGreaterEqual(physical, 1)
  298. self.assertGreaterEqual(logical, physical)
  299. def test_cpu_count_none(self):
  300. # https://github.com/giampaolo/psutil/issues/1085
  301. for val in (-1, 0, None):
  302. with mock.patch('psutil._psplatform.cpu_count_logical',
  303. return_value=val) as m:
  304. self.assertIsNone(psutil.cpu_count())
  305. assert m.called
  306. with mock.patch('psutil._psplatform.cpu_count_physical',
  307. return_value=val) as m:
  308. self.assertIsNone(psutil.cpu_count(logical=False))
  309. assert m.called
  310. def test_cpu_times(self):
  311. # Check type, value >= 0, str().
  312. total = 0
  313. times = psutil.cpu_times()
  314. sum(times)
  315. for cp_time in times:
  316. self.assertIsInstance(cp_time, float)
  317. self.assertGreaterEqual(cp_time, 0.0)
  318. total += cp_time
  319. self.assertEqual(total, sum(times))
  320. str(times)
  321. # CPU times are always supposed to increase over time
  322. # or at least remain the same and that's because time
  323. # cannot go backwards.
  324. # Surprisingly sometimes this might not be the case (at
  325. # least on Windows and Linux), see:
  326. # https://github.com/giampaolo/psutil/issues/392
  327. # https://github.com/giampaolo/psutil/issues/645
  328. # if not WINDOWS:
  329. # last = psutil.cpu_times()
  330. # for x in range(100):
  331. # new = psutil.cpu_times()
  332. # for field in new._fields:
  333. # new_t = getattr(new, field)
  334. # last_t = getattr(last, field)
  335. # self.assertGreaterEqual(new_t, last_t,
  336. # msg="%s %s" % (new_t, last_t))
  337. # last = new
  338. def test_cpu_times_time_increases(self):
  339. # Make sure time increases between calls.
  340. t1 = sum(psutil.cpu_times())
  341. stop_at = time.time() + GLOBAL_TIMEOUT
  342. while time.time() < stop_at:
  343. t2 = sum(psutil.cpu_times())
  344. if t2 > t1:
  345. return
  346. self.fail("time remained the same")
  347. def test_per_cpu_times(self):
  348. # Check type, value >= 0, str().
  349. for times in psutil.cpu_times(percpu=True):
  350. total = 0
  351. sum(times)
  352. for cp_time in times:
  353. self.assertIsInstance(cp_time, float)
  354. self.assertGreaterEqual(cp_time, 0.0)
  355. total += cp_time
  356. self.assertEqual(total, sum(times))
  357. str(times)
  358. self.assertEqual(len(psutil.cpu_times(percpu=True)[0]),
  359. len(psutil.cpu_times(percpu=False)))
  360. # Note: in theory CPU times are always supposed to increase over
  361. # time or remain the same but never go backwards. In practice
  362. # sometimes this is not the case.
  363. # This issue seemd to be afflict Windows:
  364. # https://github.com/giampaolo/psutil/issues/392
  365. # ...but it turns out also Linux (rarely) behaves the same.
  366. # last = psutil.cpu_times(percpu=True)
  367. # for x in range(100):
  368. # new = psutil.cpu_times(percpu=True)
  369. # for index in range(len(new)):
  370. # newcpu = new[index]
  371. # lastcpu = last[index]
  372. # for field in newcpu._fields:
  373. # new_t = getattr(newcpu, field)
  374. # last_t = getattr(lastcpu, field)
  375. # self.assertGreaterEqual(
  376. # new_t, last_t, msg="%s %s" % (lastcpu, newcpu))
  377. # last = new
  378. def test_per_cpu_times_2(self):
  379. # Simulate some work load then make sure time have increased
  380. # between calls.
  381. tot1 = psutil.cpu_times(percpu=True)
  382. giveup_at = time.time() + GLOBAL_TIMEOUT
  383. while True:
  384. if time.time() >= giveup_at:
  385. return self.fail("timeout")
  386. tot2 = psutil.cpu_times(percpu=True)
  387. for t1, t2 in zip(tot1, tot2):
  388. t1, t2 = psutil._cpu_busy_time(t1), psutil._cpu_busy_time(t2)
  389. difference = t2 - t1
  390. if difference >= 0.05:
  391. return
  392. def test_cpu_times_comparison(self):
  393. # Make sure the sum of all per cpu times is almost equal to
  394. # base "one cpu" times.
  395. base = psutil.cpu_times()
  396. per_cpu = psutil.cpu_times(percpu=True)
  397. summed_values = base._make([sum(num) for num in zip(*per_cpu)])
  398. for field in base._fields:
  399. self.assertAlmostEqual(
  400. getattr(base, field), getattr(summed_values, field), delta=1)
  401. def _test_cpu_percent(self, percent, last_ret, new_ret):
  402. try:
  403. self.assertIsInstance(percent, float)
  404. self.assertGreaterEqual(percent, 0.0)
  405. self.assertIsNot(percent, -0.0)
  406. self.assertLessEqual(percent, 100.0 * psutil.cpu_count())
  407. except AssertionError as err:
  408. raise AssertionError("\n%s\nlast=%s\nnew=%s" % (
  409. err, pprint.pformat(last_ret), pprint.pformat(new_ret)))
  410. def test_cpu_percent(self):
  411. last = psutil.cpu_percent(interval=0.001)
  412. for x in range(100):
  413. new = psutil.cpu_percent(interval=None)
  414. self._test_cpu_percent(new, last, new)
  415. last = new
  416. with self.assertRaises(ValueError):
  417. psutil.cpu_percent(interval=-1)
  418. def test_per_cpu_percent(self):
  419. last = psutil.cpu_percent(interval=0.001, percpu=True)
  420. self.assertEqual(len(last), psutil.cpu_count())
  421. for x in range(100):
  422. new = psutil.cpu_percent(interval=None, percpu=True)
  423. for percent in new:
  424. self._test_cpu_percent(percent, last, new)
  425. last = new
  426. with self.assertRaises(ValueError):
  427. psutil.cpu_percent(interval=-1, percpu=True)
  428. def test_cpu_times_percent(self):
  429. last = psutil.cpu_times_percent(interval=0.001)
  430. for x in range(100):
  431. new = psutil.cpu_times_percent(interval=None)
  432. for percent in new:
  433. self._test_cpu_percent(percent, last, new)
  434. self._test_cpu_percent(sum(new), last, new)
  435. last = new
  436. with self.assertRaises(ValueError):
  437. psutil.cpu_times_percent(interval=-1)
  438. def test_per_cpu_times_percent(self):
  439. last = psutil.cpu_times_percent(interval=0.001, percpu=True)
  440. self.assertEqual(len(last), psutil.cpu_count())
  441. for x in range(100):
  442. new = psutil.cpu_times_percent(interval=None, percpu=True)
  443. for cpu in new:
  444. for percent in cpu:
  445. self._test_cpu_percent(percent, last, new)
  446. self._test_cpu_percent(sum(cpu), last, new)
  447. last = new
  448. def test_per_cpu_times_percent_negative(self):
  449. # see: https://github.com/giampaolo/psutil/issues/645
  450. psutil.cpu_times_percent(percpu=True)
  451. zero_times = [x._make([0 for x in range(len(x._fields))])
  452. for x in psutil.cpu_times(percpu=True)]
  453. with mock.patch('psutil.cpu_times', return_value=zero_times):
  454. for cpu in psutil.cpu_times_percent(percpu=True):
  455. for percent in cpu:
  456. self._test_cpu_percent(percent, None, None)
  457. def test_cpu_stats(self):
  458. # Tested more extensively in per-platform test modules.
  459. infos = psutil.cpu_stats()
  460. self.assertEqual(
  461. infos._fields,
  462. ('ctx_switches', 'interrupts', 'soft_interrupts', 'syscalls'))
  463. for name in infos._fields:
  464. value = getattr(infos, name)
  465. self.assertGreaterEqual(value, 0)
  466. # on AIX, ctx_switches is always 0
  467. if not AIX and name in ('ctx_switches', 'interrupts'):
  468. self.assertGreater(value, 0)
  469. @unittest.skipIf(not HAS_CPU_FREQ, "not suported")
  470. def test_cpu_freq(self):
  471. def check_ls(ls):
  472. for nt in ls:
  473. self.assertEqual(nt._fields, ('current', 'min', 'max'))
  474. if nt.max != 0.0:
  475. self.assertLessEqual(nt.current, nt.max)
  476. for name in nt._fields:
  477. value = getattr(nt, name)
  478. self.assertIsInstance(value, (int, long, float))
  479. self.assertGreaterEqual(value, 0)
  480. ls = psutil.cpu_freq(percpu=True)
  481. if TRAVIS and not ls:
  482. raise self.skipTest("skipped on Travis")
  483. if FREEBSD and not ls:
  484. raise self.skipTest("returns empty list on FreeBSD")
  485. assert ls, ls
  486. check_ls([psutil.cpu_freq(percpu=False)])
  487. if LINUX:
  488. self.assertEqual(len(ls), psutil.cpu_count())
  489. @unittest.skipIf(not HAS_GETLOADAVG, "not supported")
  490. def test_getloadavg(self):
  491. loadavg = psutil.getloadavg()
  492. self.assertEqual(len(loadavg), 3)
  493. for load in loadavg:
  494. self.assertIsInstance(load, float)
  495. self.assertGreaterEqual(load, 0.0)
  496. class TestDiskAPIs(PsutilTestCase):
  497. @unittest.skipIf(PYPY and not IS_64BIT, "unreliable on PYPY32 + 32BIT")
  498. def test_disk_usage(self):
  499. usage = psutil.disk_usage(os.getcwd())
  500. self.assertEqual(usage._fields, ('total', 'used', 'free', 'percent'))
  501. assert usage.total > 0, usage
  502. assert usage.used > 0, usage
  503. assert usage.free > 0, usage
  504. assert usage.total > usage.used, usage
  505. assert usage.total > usage.free, usage
  506. assert 0 <= usage.percent <= 100, usage.percent
  507. if hasattr(shutil, 'disk_usage'):
  508. # py >= 3.3, see: http://bugs.python.org/issue12442
  509. shutil_usage = shutil.disk_usage(os.getcwd())
  510. tolerance = 5 * 1024 * 1024 # 5MB
  511. self.assertEqual(usage.total, shutil_usage.total)
  512. self.assertAlmostEqual(usage.free, shutil_usage.free,
  513. delta=tolerance)
  514. self.assertAlmostEqual(usage.used, shutil_usage.used,
  515. delta=tolerance)
  516. # if path does not exist OSError ENOENT is expected across
  517. # all platforms
  518. fname = self.get_testfn()
  519. with self.assertRaises(FileNotFoundError):
  520. psutil.disk_usage(fname)
  521. @unittest.skipIf(not ASCII_FS, "not an ASCII fs")
  522. def test_disk_usage_unicode(self):
  523. # See: https://github.com/giampaolo/psutil/issues/416
  524. with self.assertRaises(UnicodeEncodeError):
  525. psutil.disk_usage(UNICODE_SUFFIX)
  526. def test_disk_usage_bytes(self):
  527. psutil.disk_usage(b'.')
  528. def test_disk_partitions(self):
  529. # all = False
  530. ls = psutil.disk_partitions(all=False)
  531. # on travis we get:
  532. # self.assertEqual(p.cpu_affinity(), [n])
  533. # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0]
  534. self.assertTrue(ls, msg=ls)
  535. for disk in ls:
  536. self.assertIsInstance(disk.device, str)
  537. self.assertIsInstance(disk.mountpoint, str)
  538. self.assertIsInstance(disk.fstype, str)
  539. self.assertIsInstance(disk.opts, str)
  540. if WINDOWS and 'cdrom' in disk.opts:
  541. continue
  542. if not POSIX:
  543. assert os.path.exists(disk.device), disk
  544. else:
  545. # we cannot make any assumption about this, see:
  546. # http://goo.gl/p9c43
  547. disk.device
  548. # on modern systems mount points can also be files
  549. assert os.path.exists(disk.mountpoint), disk
  550. assert disk.fstype, disk
  551. # all = True
  552. ls = psutil.disk_partitions(all=True)
  553. self.assertTrue(ls, msg=ls)
  554. for disk in psutil.disk_partitions(all=True):
  555. if not WINDOWS and disk.mountpoint:
  556. try:
  557. os.stat(disk.mountpoint)
  558. except OSError as err:
  559. if (GITHUB_WHEELS or TRAVIS) and \
  560. MACOS and err.errno == errno.EIO:
  561. continue
  562. # http://mail.python.org/pipermail/python-dev/
  563. # 2012-June/120787.html
  564. if err.errno not in (errno.EPERM, errno.EACCES):
  565. raise
  566. else:
  567. assert os.path.exists(disk.mountpoint), disk
  568. self.assertIsInstance(disk.fstype, str)
  569. self.assertIsInstance(disk.opts, str)
  570. def find_mount_point(path):
  571. path = os.path.abspath(path)
  572. while not os.path.ismount(path):
  573. path = os.path.dirname(path)
  574. return path.lower()
  575. mount = find_mount_point(__file__)
  576. mounts = [x.mountpoint.lower() for x in
  577. psutil.disk_partitions(all=True) if x.mountpoint]
  578. self.assertIn(mount, mounts)
  579. psutil.disk_usage(mount)
  580. @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
  581. '/proc/diskstats not available on this linux version')
  582. @unittest.skipIf(CI_TESTING and not psutil.disk_io_counters(),
  583. "unreliable on CI") # no visible disks
  584. def test_disk_io_counters(self):
  585. def check_ntuple(nt):
  586. self.assertEqual(nt[0], nt.read_count)
  587. self.assertEqual(nt[1], nt.write_count)
  588. self.assertEqual(nt[2], nt.read_bytes)
  589. self.assertEqual(nt[3], nt.write_bytes)
  590. if not (OPENBSD or NETBSD):
  591. self.assertEqual(nt[4], nt.read_time)
  592. self.assertEqual(nt[5], nt.write_time)
  593. if LINUX:
  594. self.assertEqual(nt[6], nt.read_merged_count)
  595. self.assertEqual(nt[7], nt.write_merged_count)
  596. self.assertEqual(nt[8], nt.busy_time)
  597. elif FREEBSD:
  598. self.assertEqual(nt[6], nt.busy_time)
  599. for name in nt._fields:
  600. assert getattr(nt, name) >= 0, nt
  601. ret = psutil.disk_io_counters(perdisk=False)
  602. assert ret is not None, "no disks on this system?"
  603. check_ntuple(ret)
  604. ret = psutil.disk_io_counters(perdisk=True)
  605. # make sure there are no duplicates
  606. self.assertEqual(len(ret), len(set(ret)))
  607. for key in ret:
  608. assert key, key
  609. check_ntuple(ret[key])
  610. def test_disk_io_counters_no_disks(self):
  611. # Emulate a case where no disks are installed, see:
  612. # https://github.com/giampaolo/psutil/issues/1062
  613. with mock.patch('psutil._psplatform.disk_io_counters',
  614. return_value={}) as m:
  615. self.assertIsNone(psutil.disk_io_counters(perdisk=False))
  616. self.assertEqual(psutil.disk_io_counters(perdisk=True), {})
  617. assert m.called
  618. class TestNetAPIs(PsutilTestCase):
  619. @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
  620. def test_net_io_counters(self):
  621. def check_ntuple(nt):
  622. self.assertEqual(nt[0], nt.bytes_sent)
  623. self.assertEqual(nt[1], nt.bytes_recv)
  624. self.assertEqual(nt[2], nt.packets_sent)
  625. self.assertEqual(nt[3], nt.packets_recv)
  626. self.assertEqual(nt[4], nt.errin)
  627. self.assertEqual(nt[5], nt.errout)
  628. self.assertEqual(nt[6], nt.dropin)
  629. self.assertEqual(nt[7], nt.dropout)
  630. assert nt.bytes_sent >= 0, nt
  631. assert nt.bytes_recv >= 0, nt
  632. assert nt.packets_sent >= 0, nt
  633. assert nt.packets_recv >= 0, nt
  634. assert nt.errin >= 0, nt
  635. assert nt.errout >= 0, nt
  636. assert nt.dropin >= 0, nt
  637. assert nt.dropout >= 0, nt
  638. ret = psutil.net_io_counters(pernic=False)
  639. check_ntuple(ret)
  640. ret = psutil.net_io_counters(pernic=True)
  641. self.assertNotEqual(ret, [])
  642. for key in ret:
  643. self.assertTrue(key)
  644. self.assertIsInstance(key, str)
  645. check_ntuple(ret[key])
  646. @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
  647. def test_net_io_counters_no_nics(self):
  648. # Emulate a case where no NICs are installed, see:
  649. # https://github.com/giampaolo/psutil/issues/1062
  650. with mock.patch('psutil._psplatform.net_io_counters',
  651. return_value={}) as m:
  652. self.assertIsNone(psutil.net_io_counters(pernic=False))
  653. self.assertEqual(psutil.net_io_counters(pernic=True), {})
  654. assert m.called
  655. def test_net_if_addrs(self):
  656. nics = psutil.net_if_addrs()
  657. assert nics, nics
  658. nic_stats = psutil.net_if_stats()
  659. # Not reliable on all platforms (net_if_addrs() reports more
  660. # interfaces).
  661. # self.assertEqual(sorted(nics.keys()),
  662. # sorted(psutil.net_io_counters(pernic=True).keys()))
  663. families = set([socket.AF_INET, socket.AF_INET6, psutil.AF_LINK])
  664. for nic, addrs in nics.items():
  665. self.assertIsInstance(nic, str)
  666. self.assertEqual(len(set(addrs)), len(addrs))
  667. for addr in addrs:
  668. self.assertIsInstance(addr.family, int)
  669. self.assertIsInstance(addr.address, str)
  670. self.assertIsInstance(addr.netmask, (str, type(None)))
  671. self.assertIsInstance(addr.broadcast, (str, type(None)))
  672. self.assertIn(addr.family, families)
  673. if sys.version_info >= (3, 4) and not PYPY:
  674. self.assertIsInstance(addr.family, enum.IntEnum)
  675. if nic_stats[nic].isup:
  676. # Do not test binding to addresses of interfaces
  677. # that are down
  678. if addr.family == socket.AF_INET:
  679. s = socket.socket(addr.family)
  680. with contextlib.closing(s):
  681. s.bind((addr.address, 0))
  682. elif addr.family == socket.AF_INET6:
  683. info = socket.getaddrinfo(
  684. addr.address, 0, socket.AF_INET6,
  685. socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
  686. af, socktype, proto, canonname, sa = info
  687. s = socket.socket(af, socktype, proto)
  688. with contextlib.closing(s):
  689. s.bind(sa)
  690. for ip in (addr.address, addr.netmask, addr.broadcast,
  691. addr.ptp):
  692. if ip is not None:
  693. # TODO: skip AF_INET6 for now because I get:
  694. # AddressValueError: Only hex digits permitted in
  695. # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
  696. if addr.family != socket.AF_INET6:
  697. check_net_address(ip, addr.family)
  698. # broadcast and ptp addresses are mutually exclusive
  699. if addr.broadcast:
  700. self.assertIsNone(addr.ptp)
  701. elif addr.ptp:
  702. self.assertIsNone(addr.broadcast)
  703. if BSD or MACOS or SUNOS:
  704. if hasattr(socket, "AF_LINK"):
  705. self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
  706. elif LINUX:
  707. self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
  708. elif WINDOWS:
  709. self.assertEqual(psutil.AF_LINK, -1)
  710. def test_net_if_addrs_mac_null_bytes(self):
  711. # Simulate that the underlying C function returns an incomplete
  712. # MAC address. psutil is supposed to fill it with null bytes.
  713. # https://github.com/giampaolo/psutil/issues/786
  714. if POSIX:
  715. ret = [('em1', psutil.AF_LINK, '06:3d:29', None, None, None)]
  716. else:
  717. ret = [('em1', -1, '06-3d-29', None, None, None)]
  718. with mock.patch('psutil._psplatform.net_if_addrs',
  719. return_value=ret) as m:
  720. addr = psutil.net_if_addrs()['em1'][0]
  721. assert m.called
  722. if POSIX:
  723. self.assertEqual(addr.address, '06:3d:29:00:00:00')
  724. else:
  725. self.assertEqual(addr.address, '06-3d-29-00-00-00')
  726. @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") # raises EPERM
  727. def test_net_if_stats(self):
  728. nics = psutil.net_if_stats()
  729. assert nics, nics
  730. all_duplexes = (psutil.NIC_DUPLEX_FULL,
  731. psutil.NIC_DUPLEX_HALF,
  732. psutil.NIC_DUPLEX_UNKNOWN)
  733. for name, stats in nics.items():
  734. self.assertIsInstance(name, str)
  735. isup, duplex, speed, mtu = stats
  736. self.assertIsInstance(isup, bool)
  737. self.assertIn(duplex, all_duplexes)
  738. self.assertIn(duplex, all_duplexes)
  739. self.assertGreaterEqual(speed, 0)
  740. self.assertGreaterEqual(mtu, 0)
  741. @unittest.skipIf(not (LINUX or BSD or MACOS),
  742. "LINUX or BSD or MACOS specific")
  743. def test_net_if_stats_enodev(self):
  744. # See: https://github.com/giampaolo/psutil/issues/1279
  745. with mock.patch('psutil._psutil_posix.net_if_mtu',
  746. side_effect=OSError(errno.ENODEV, "")) as m:
  747. ret = psutil.net_if_stats()
  748. self.assertEqual(ret, {})
  749. assert m.called
  750. class TestSensorsAPIs(PsutilTestCase):
  751. @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
  752. def test_sensors_temperatures(self):
  753. temps = psutil.sensors_temperatures()
  754. for name, entries in temps.items():
  755. self.assertIsInstance(name, str)
  756. for entry in entries:
  757. self.assertIsInstance(entry.label, str)
  758. if entry.current is not None:
  759. self.assertGreaterEqual(entry.current, 0)
  760. if entry.high is not None:
  761. self.assertGreaterEqual(entry.high, 0)
  762. if entry.critical is not None:
  763. self.assertGreaterEqual(entry.critical, 0)
  764. @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
  765. def test_sensors_temperatures_fahreneit(self):
  766. d = {'coretemp': [('label', 50.0, 60.0, 70.0)]}
  767. with mock.patch("psutil._psplatform.sensors_temperatures",
  768. return_value=d) as m:
  769. temps = psutil.sensors_temperatures(
  770. fahrenheit=True)['coretemp'][0]
  771. assert m.called
  772. self.assertEqual(temps.current, 122.0)
  773. self.assertEqual(temps.high, 140.0)
  774. self.assertEqual(temps.critical, 158.0)
  775. @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
  776. @unittest.skipIf(not HAS_BATTERY, "no battery")
  777. def test_sensors_battery(self):
  778. ret = psutil.sensors_battery()
  779. self.assertGreaterEqual(ret.percent, 0)
  780. self.assertLessEqual(ret.percent, 100)
  781. if ret.secsleft not in (psutil.POWER_TIME_UNKNOWN,
  782. psutil.POWER_TIME_UNLIMITED):
  783. self.assertGreaterEqual(ret.secsleft, 0)
  784. else:
  785. if ret.secsleft == psutil.POWER_TIME_UNLIMITED:
  786. self.assertTrue(ret.power_plugged)
  787. self.assertIsInstance(ret.power_plugged, bool)
  788. @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
  789. def test_sensors_fans(self):
  790. fans = psutil.sensors_fans()
  791. for name, entries in fans.items():
  792. self.assertIsInstance(name, str)
  793. for entry in entries:
  794. self.assertIsInstance(entry.label, str)
  795. self.assertIsInstance(entry.current, (int, long))
  796. self.assertGreaterEqual(entry.current, 0)
  797. if __name__ == '__main__':
  798. from psutil.tests.runner import run_from_name
  799. run_from_name(__file__)