test_linux.py 88 KB


  1. #!/usr/bin/env python3
  2. # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Linux specific tests."""
  6. from __future__ import division
  7. import collections
  8. import contextlib
  9. import errno
  10. import glob
  11. import io
  12. import os
  13. import re
  14. import shutil
  15. import socket
  16. import struct
  17. import textwrap
  18. import time
  19. import warnings
  20. import psutil
  21. from psutil import LINUX
  22. from psutil._compat import basestring
  23. from psutil._compat import FileNotFoundError
  24. from psutil._compat import PY3
  25. from psutil._compat import u
  26. from psutil.tests import call_until
  27. from psutil.tests import GLOBAL_TIMEOUT
  28. from psutil.tests import HAS_BATTERY
  29. from psutil.tests import HAS_CPU_FREQ
  30. from psutil.tests import HAS_GETLOADAVG
  31. from psutil.tests import HAS_RLIMIT
  32. from psutil.tests import mock
  33. from psutil.tests import PsutilTestCase
  34. from psutil.tests import PYPY
  35. from psutil.tests import reload_module
  36. from psutil.tests import retry_on_failure
  37. from psutil.tests import safe_rmpath
  38. from psutil.tests import sh
  39. from psutil.tests import skip_on_not_implemented
  40. from psutil.tests import ThreadTask
  41. from psutil.tests import TOLERANCE_DISK_USAGE
  42. from psutil.tests import TOLERANCE_SYS_MEM
  43. from psutil.tests import TRAVIS
  44. from psutil.tests import unittest
  45. from psutil.tests import which
# Absolute path of the directory containing this test module.
HERE = os.path.abspath(os.path.dirname(__file__))
# Request codes handed to fcntl.ioctl() by the network interface helpers
# in this module (address, hardware address, netmask, broadcast).
SIOCGIFADDR = 0x8915
SIOCGIFCONF = 0x8912
SIOCGIFHWADDR = 0x8927
SIOCGIFNETMASK = 0x891b
SIOCGIFBRDADDR = 0x8919
if LINUX:
    # NOTE(review): presumably the disk sector size in bytes used by the
    # disk I/O tests -- confirm against the tests that consume it.
    SECTOR_SIZE = 512
# True when no /sys/class/hwmon/hwmon* entry exists on this machine.
EMPTY_TEMPERATURES = not glob.glob('/sys/class/hwmon/hwmon*')
  55. # =====================================================================
  56. # --- utils
  57. # =====================================================================
  58. def get_ipv4_address(ifname):
  59. import fcntl
  60. ifname = ifname[:15]
  61. if PY3:
  62. ifname = bytes(ifname, 'ascii')
  63. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  64. with contextlib.closing(s):
  65. return socket.inet_ntoa(
  66. fcntl.ioctl(s.fileno(),
  67. SIOCGIFADDR,
  68. struct.pack('256s', ifname))[20:24])
  69. def get_ipv4_netmask(ifname):
  70. import fcntl
  71. ifname = ifname[:15]
  72. if PY3:
  73. ifname = bytes(ifname, 'ascii')
  74. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  75. with contextlib.closing(s):
  76. return socket.inet_ntoa(
  77. fcntl.ioctl(s.fileno(),
  78. SIOCGIFNETMASK,
  79. struct.pack('256s', ifname))[20:24])
  80. def get_ipv4_broadcast(ifname):
  81. import fcntl
  82. ifname = ifname[:15]
  83. if PY3:
  84. ifname = bytes(ifname, 'ascii')
  85. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  86. with contextlib.closing(s):
  87. return socket.inet_ntoa(
  88. fcntl.ioctl(s.fileno(),
  89. SIOCGIFBRDADDR,
  90. struct.pack('256s', ifname))[20:24])
  91. def get_ipv6_address(ifname):
  92. with open("/proc/net/if_inet6", 'rt') as f:
  93. for line in f.readlines():
  94. fields = line.split()
  95. if fields[-1] == ifname:
  96. break
  97. else:
  98. raise ValueError("could not find interface %r" % ifname)
  99. unformatted = fields[0]
  100. groups = []
  101. for i in range(0, len(unformatted), 4):
  102. groups.append(unformatted[i:i + 4])
  103. formatted = ":".join(groups)
  104. packed = socket.inet_pton(socket.AF_INET6, formatted)
  105. return socket.inet_ntop(socket.AF_INET6, packed)
  106. def get_mac_address(ifname):
  107. import fcntl
  108. ifname = ifname[:15]
  109. if PY3:
  110. ifname = bytes(ifname, 'ascii')
  111. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  112. with contextlib.closing(s):
  113. info = fcntl.ioctl(
  114. s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname))
  115. if PY3:
  116. def ord(x):
  117. return x
  118. else:
  119. import __builtin__
  120. ord = __builtin__.ord
  121. return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1]
  122. def free_swap():
  123. """Parse 'free' cmd and return swap memory's s total, used and free
  124. values.
  125. """
  126. out = sh('free -b', env={"LANG": "C.UTF-8"})
  127. lines = out.split('\n')
  128. for line in lines:
  129. if line.startswith('Swap'):
  130. _, total, used, free = line.split()
  131. nt = collections.namedtuple('free', 'total used free')
  132. return nt(int(total), int(used), int(free))
  133. raise ValueError(
  134. "can't find 'Swap' in 'free' output:\n%s" % '\n'.join(lines))
  135. def free_physmem():
  136. """Parse 'free' cmd and return physical memory's total, used
  137. and free values.
  138. """
  139. # Note: free can have 2 different formats, invalidating 'shared'
  140. # and 'cached' memory which may have different positions so we
  141. # do not return them.
  142. # https://github.com/giampaolo/psutil/issues/538#issuecomment-57059946
  143. out = sh('free -b', env={"LANG": "C.UTF-8"})
  144. lines = out.split('\n')
  145. for line in lines:
  146. if line.startswith('Mem'):
  147. total, used, free, shared = \
  148. [int(x) for x in line.split()[1:5]]
  149. nt = collections.namedtuple(
  150. 'free', 'total used free shared output')
  151. return nt(total, used, free, shared, out)
  152. raise ValueError(
  153. "can't find 'Mem' in 'free' output:\n%s" % '\n'.join(lines))
  154. def vmstat(stat):
  155. out = sh("vmstat -s", env={"LANG": "C.UTF-8"})
  156. for line in out.split("\n"):
  157. line = line.strip()
  158. if stat in line:
  159. return int(line.split(' ')[0])
  160. raise ValueError("can't find %r in 'vmstat' output" % stat)
  161. def get_free_version_info():
  162. out = sh("free -V").strip()
  163. if 'UNKNOWN' in out:
  164. raise unittest.SkipTest("can't determine free version")
  165. return tuple(map(int, out.split()[-1].split('.')))
  166. @contextlib.contextmanager
  167. def mock_open_content(for_path, content):
  168. """Mock open() builtin and forces it to return a certain `content`
  169. on read() if the path being opened matches `for_path`.
  170. """
  171. def open_mock(name, *args, **kwargs):
  172. if name == for_path:
  173. if PY3:
  174. if isinstance(content, basestring):
  175. return io.StringIO(content)
  176. else:
  177. return io.BytesIO(content)
  178. else:
  179. return io.BytesIO(content)
  180. else:
  181. return orig_open(name, *args, **kwargs)
  182. orig_open = open
  183. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  184. with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
  185. yield m
  186. @contextlib.contextmanager
  187. def mock_open_exception(for_path, exc):
  188. """Mock open() builtin and raises `exc` if the path being opened
  189. matches `for_path`.
  190. """
  191. def open_mock(name, *args, **kwargs):
  192. if name == for_path:
  193. raise exc
  194. else:
  195. return orig_open(name, *args, **kwargs)
  196. orig_open = open
  197. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  198. with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
  199. yield m
  200. # =====================================================================
  201. # --- system virtual memory
  202. # =====================================================================
  203. @unittest.skipIf(not LINUX, "LINUX only")
  204. class TestSystemVirtualMemory(PsutilTestCase):
  205. def test_total(self):
  206. # free_value = free_physmem().total
  207. # psutil_value = psutil.virtual_memory().total
  208. # self.assertEqual(free_value, psutil_value)
  209. vmstat_value = vmstat('total memory') * 1024
  210. psutil_value = psutil.virtual_memory().total
  211. self.assertAlmostEqual(vmstat_value, psutil_value)
  212. @retry_on_failure()
  213. def test_used(self):
  214. # Older versions of procps used slab memory to calculate used memory.
  215. # This got changed in:
  216. # https://gitlab.com/procps-ng/procps/commit/
  217. # 05d751c4f076a2f0118b914c5e51cfbb4762ad8e
  218. if get_free_version_info() < (3, 3, 12):
  219. raise self.skipTest("old free version")
  220. free = free_physmem()
  221. free_value = free.used
  222. psutil_value = psutil.virtual_memory().used
  223. self.assertAlmostEqual(
  224. free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
  225. msg='%s %s \n%s' % (free_value, psutil_value, free.output))
  226. @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
  227. @retry_on_failure()
  228. def test_free(self):
  229. vmstat_value = vmstat('free memory') * 1024
  230. psutil_value = psutil.virtual_memory().free
  231. self.assertAlmostEqual(
  232. vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
  233. @retry_on_failure()
  234. def test_buffers(self):
  235. vmstat_value = vmstat('buffer memory') * 1024
  236. psutil_value = psutil.virtual_memory().buffers
  237. self.assertAlmostEqual(
  238. vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
  239. # https://travis-ci.org/giampaolo/psutil/jobs/226719664
  240. @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
  241. @retry_on_failure()
  242. def test_active(self):
  243. vmstat_value = vmstat('active memory') * 1024
  244. psutil_value = psutil.virtual_memory().active
  245. self.assertAlmostEqual(
  246. vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
  247. # https://travis-ci.org/giampaolo/psutil/jobs/227242952
  248. @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
  249. @retry_on_failure()
  250. def test_inactive(self):
  251. vmstat_value = vmstat('inactive memory') * 1024
  252. psutil_value = psutil.virtual_memory().inactive
  253. self.assertAlmostEqual(
  254. vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
  255. @retry_on_failure()
  256. def test_shared(self):
  257. free = free_physmem()
  258. free_value = free.shared
  259. if free_value == 0:
  260. raise unittest.SkipTest("free does not support 'shared' column")
  261. psutil_value = psutil.virtual_memory().shared
  262. self.assertAlmostEqual(
  263. free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
  264. msg='%s %s \n%s' % (free_value, psutil_value, free.output))
  265. @retry_on_failure()
  266. def test_available(self):
  267. # "free" output format has changed at some point:
  268. # https://github.com/giampaolo/psutil/issues/538#issuecomment-147192098
  269. out = sh("free -b")
  270. lines = out.split('\n')
  271. if 'available' not in lines[0]:
  272. raise unittest.SkipTest("free does not support 'available' column")
  273. else:
  274. free_value = int(lines[1].split()[-1])
  275. psutil_value = psutil.virtual_memory().available
  276. self.assertAlmostEqual(
  277. free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
  278. msg='%s %s \n%s' % (free_value, psutil_value, out))
  279. def test_warnings_on_misses(self):
  280. # Emulate a case where /proc/meminfo provides few info.
  281. # psutil is supposed to set the missing fields to 0 and
  282. # raise a warning.
  283. with mock_open_content(
  284. '/proc/meminfo',
  285. textwrap.dedent("""\
  286. Active(anon): 6145416 kB
  287. Active(file): 2950064 kB
  288. Inactive(anon): 574764 kB
  289. Inactive(file): 1567648 kB
  290. MemAvailable: -1 kB
  291. MemFree: 2057400 kB
  292. MemTotal: 16325648 kB
  293. SReclaimable: 346648 kB
  294. """).encode()) as m:
  295. with warnings.catch_warnings(record=True) as ws:
  296. warnings.simplefilter("always")
  297. ret = psutil.virtual_memory()
  298. assert m.called
  299. self.assertEqual(len(ws), 1)
  300. w = ws[0]
  301. assert w.filename.endswith('psutil/_pslinux.py')
  302. self.assertIn(
  303. "memory stats couldn't be determined", str(w.message))
  304. self.assertIn("cached", str(w.message))
  305. self.assertIn("shared", str(w.message))
  306. self.assertIn("active", str(w.message))
  307. self.assertIn("inactive", str(w.message))
  308. self.assertIn("buffers", str(w.message))
  309. self.assertIn("available", str(w.message))
  310. self.assertEqual(ret.cached, 0)
  311. self.assertEqual(ret.active, 0)
  312. self.assertEqual(ret.inactive, 0)
  313. self.assertEqual(ret.shared, 0)
  314. self.assertEqual(ret.buffers, 0)
  315. self.assertEqual(ret.available, 0)
  316. self.assertEqual(ret.slab, 0)
  317. @retry_on_failure()
  318. def test_avail_old_percent(self):
  319. # Make sure that our calculation of avail mem for old kernels
  320. # is off by max 15%.
  321. from psutil._pslinux import calculate_avail_vmem
  322. from psutil._pslinux import open_binary
  323. mems = {}
  324. with open_binary('/proc/meminfo') as f:
  325. for line in f:
  326. fields = line.split()
  327. mems[fields[0]] = int(fields[1]) * 1024
  328. a = calculate_avail_vmem(mems)
  329. if b'MemAvailable:' in mems:
  330. b = mems[b'MemAvailable:']
  331. diff_percent = abs(a - b) / a * 100
  332. self.assertLess(diff_percent, 15)
  333. def test_avail_old_comes_from_kernel(self):
  334. # Make sure "MemAvailable:" coluimn is used instead of relying
  335. # on our internal algorithm to calculate avail mem.
  336. with mock_open_content(
  337. '/proc/meminfo',
  338. textwrap.dedent("""\
  339. Active: 9444728 kB
  340. Active(anon): 6145416 kB
  341. Active(file): 2950064 kB
  342. Buffers: 287952 kB
  343. Cached: 4818144 kB
  344. Inactive(file): 1578132 kB
  345. Inactive(anon): 574764 kB
  346. Inactive(file): 1567648 kB
  347. MemAvailable: 6574984 kB
  348. MemFree: 2057400 kB
  349. MemTotal: 16325648 kB
  350. Shmem: 577588 kB
  351. SReclaimable: 346648 kB
  352. """).encode()) as m:
  353. with warnings.catch_warnings(record=True) as ws:
  354. ret = psutil.virtual_memory()
  355. assert m.called
  356. self.assertEqual(ret.available, 6574984 * 1024)
  357. w = ws[0]
  358. self.assertIn(
  359. "inactive memory stats couldn't be determined", str(w.message))
  360. def test_avail_old_missing_fields(self):
  361. # Remove Active(file), Inactive(file) and SReclaimable
  362. # from /proc/meminfo and make sure the fallback is used
  363. # (free + cached),
  364. with mock_open_content(
  365. "/proc/meminfo",
  366. textwrap.dedent("""\
  367. Active: 9444728 kB
  368. Active(anon): 6145416 kB
  369. Buffers: 287952 kB
  370. Cached: 4818144 kB
  371. Inactive(file): 1578132 kB
  372. Inactive(anon): 574764 kB
  373. MemFree: 2057400 kB
  374. MemTotal: 16325648 kB
  375. Shmem: 577588 kB
  376. """).encode()) as m:
  377. with warnings.catch_warnings(record=True) as ws:
  378. ret = psutil.virtual_memory()
  379. assert m.called
  380. self.assertEqual(ret.available, 2057400 * 1024 + 4818144 * 1024)
  381. w = ws[0]
  382. self.assertIn(
  383. "inactive memory stats couldn't be determined", str(w.message))
  384. def test_avail_old_missing_zoneinfo(self):
  385. # Remove /proc/zoneinfo file. Make sure fallback is used
  386. # (free + cached).
  387. with mock_open_content(
  388. "/proc/meminfo",
  389. textwrap.dedent("""\
  390. Active: 9444728 kB
  391. Active(anon): 6145416 kB
  392. Active(file): 2950064 kB
  393. Buffers: 287952 kB
  394. Cached: 4818144 kB
  395. Inactive(file): 1578132 kB
  396. Inactive(anon): 574764 kB
  397. Inactive(file): 1567648 kB
  398. MemFree: 2057400 kB
  399. MemTotal: 16325648 kB
  400. Shmem: 577588 kB
  401. SReclaimable: 346648 kB
  402. """).encode()):
  403. with mock_open_exception(
  404. "/proc/zoneinfo",
  405. IOError(errno.ENOENT, 'no such file or directory')):
  406. with warnings.catch_warnings(record=True) as ws:
  407. ret = psutil.virtual_memory()
  408. self.assertEqual(
  409. ret.available, 2057400 * 1024 + 4818144 * 1024)
  410. w = ws[0]
  411. self.assertIn(
  412. "inactive memory stats couldn't be determined",
  413. str(w.message))
  414. def test_virtual_memory_mocked(self):
  415. # Emulate /proc/meminfo because neither vmstat nor free return slab.
  416. def open_mock(name, *args, **kwargs):
  417. if name == '/proc/meminfo':
  418. return io.BytesIO(textwrap.dedent("""\
  419. MemTotal: 100 kB
  420. MemFree: 2 kB
  421. MemAvailable: 3 kB
  422. Buffers: 4 kB
  423. Cached: 5 kB
  424. SwapCached: 6 kB
  425. Active: 7 kB
  426. Inactive: 8 kB
  427. Active(anon): 9 kB
  428. Inactive(anon): 10 kB
  429. Active(file): 11 kB
  430. Inactive(file): 12 kB
  431. Unevictable: 13 kB
  432. Mlocked: 14 kB
  433. SwapTotal: 15 kB
  434. SwapFree: 16 kB
  435. Dirty: 17 kB
  436. Writeback: 18 kB
  437. AnonPages: 19 kB
  438. Mapped: 20 kB
  439. Shmem: 21 kB
  440. Slab: 22 kB
  441. SReclaimable: 23 kB
  442. SUnreclaim: 24 kB
  443. KernelStack: 25 kB
  444. PageTables: 26 kB
  445. NFS_Unstable: 27 kB
  446. Bounce: 28 kB
  447. WritebackTmp: 29 kB
  448. CommitLimit: 30 kB
  449. Committed_AS: 31 kB
  450. VmallocTotal: 32 kB
  451. VmallocUsed: 33 kB
  452. VmallocChunk: 34 kB
  453. HardwareCorrupted: 35 kB
  454. AnonHugePages: 36 kB
  455. ShmemHugePages: 37 kB
  456. ShmemPmdMapped: 38 kB
  457. CmaTotal: 39 kB
  458. CmaFree: 40 kB
  459. HugePages_Total: 41 kB
  460. HugePages_Free: 42 kB
  461. HugePages_Rsvd: 43 kB
  462. HugePages_Surp: 44 kB
  463. Hugepagesize: 45 kB
  464. DirectMap46k: 46 kB
  465. DirectMap47M: 47 kB
  466. DirectMap48G: 48 kB
  467. """).encode())
  468. else:
  469. return orig_open(name, *args, **kwargs)
  470. orig_open = open
  471. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  472. with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
  473. mem = psutil.virtual_memory()
  474. assert m.called
  475. self.assertEqual(mem.total, 100 * 1024)
  476. self.assertEqual(mem.free, 2 * 1024)
  477. self.assertEqual(mem.buffers, 4 * 1024)
  478. # cached mem also includes reclaimable memory
  479. self.assertEqual(mem.cached, (5 + 23) * 1024)
  480. self.assertEqual(mem.shared, 21 * 1024)
  481. self.assertEqual(mem.active, 7 * 1024)
  482. self.assertEqual(mem.inactive, 8 * 1024)
  483. self.assertEqual(mem.slab, 22 * 1024)
  484. self.assertEqual(mem.available, 3 * 1024)
  485. # =====================================================================
  486. # --- system swap memory
  487. # =====================================================================
  488. @unittest.skipIf(not LINUX, "LINUX only")
  489. class TestSystemSwapMemory(PsutilTestCase):
  490. @staticmethod
  491. def meminfo_has_swap_info():
  492. """Return True if /proc/meminfo provides swap metrics."""
  493. with open("/proc/meminfo") as f:
  494. data = f.read()
  495. return 'SwapTotal:' in data and 'SwapFree:' in data
  496. def test_total(self):
  497. free_value = free_swap().total
  498. psutil_value = psutil.swap_memory().total
  499. return self.assertAlmostEqual(
  500. free_value, psutil_value, delta=TOLERANCE_SYS_MEM)
  501. @retry_on_failure()
  502. def test_used(self):
  503. free_value = free_swap().used
  504. psutil_value = psutil.swap_memory().used
  505. return self.assertAlmostEqual(
  506. free_value, psutil_value, delta=TOLERANCE_SYS_MEM)
  507. @retry_on_failure()
  508. def test_free(self):
  509. free_value = free_swap().free
  510. psutil_value = psutil.swap_memory().free
  511. return self.assertAlmostEqual(
  512. free_value, psutil_value, delta=TOLERANCE_SYS_MEM)
  513. def test_missing_sin_sout(self):
  514. with mock.patch('psutil._common.open', create=True) as m:
  515. with warnings.catch_warnings(record=True) as ws:
  516. warnings.simplefilter("always")
  517. ret = psutil.swap_memory()
  518. assert m.called
  519. self.assertEqual(len(ws), 1)
  520. w = ws[0]
  521. assert w.filename.endswith('psutil/_pslinux.py')
  522. self.assertIn(
  523. "'sin' and 'sout' swap memory stats couldn't "
  524. "be determined", str(w.message))
  525. self.assertEqual(ret.sin, 0)
  526. self.assertEqual(ret.sout, 0)
  527. def test_no_vmstat_mocked(self):
  528. # see https://github.com/giampaolo/psutil/issues/722
  529. with mock_open_exception(
  530. "/proc/vmstat",
  531. IOError(errno.ENOENT, 'no such file or directory')) as m:
  532. with warnings.catch_warnings(record=True) as ws:
  533. warnings.simplefilter("always")
  534. ret = psutil.swap_memory()
  535. assert m.called
  536. self.assertEqual(len(ws), 1)
  537. w = ws[0]
  538. assert w.filename.endswith('psutil/_pslinux.py')
  539. self.assertIn(
  540. "'sin' and 'sout' swap memory stats couldn't "
  541. "be determined and were set to 0",
  542. str(w.message))
  543. self.assertEqual(ret.sin, 0)
  544. self.assertEqual(ret.sout, 0)
  545. def test_meminfo_against_sysinfo(self):
  546. # Make sure the content of /proc/meminfo about swap memory
  547. # matches sysinfo() syscall, see:
  548. # https://github.com/giampaolo/psutil/issues/1015
  549. if not self.meminfo_has_swap_info():
  550. return unittest.skip("/proc/meminfo has no swap metrics")
  551. with mock.patch('psutil._pslinux.cext.linux_sysinfo') as m:
  552. swap = psutil.swap_memory()
  553. assert not m.called
  554. import psutil._psutil_linux as cext
  555. _, _, _, _, total, free, unit_multiplier = cext.linux_sysinfo()
  556. total *= unit_multiplier
  557. free *= unit_multiplier
  558. self.assertEqual(swap.total, total)
  559. self.assertAlmostEqual(swap.free, free, delta=TOLERANCE_SYS_MEM)
  560. def test_emulate_meminfo_has_no_metrics(self):
  561. # Emulate a case where /proc/meminfo provides no swap metrics
  562. # in which case sysinfo() syscall is supposed to be used
  563. # as a fallback.
  564. with mock_open_content("/proc/meminfo", b"") as m:
  565. psutil.swap_memory()
  566. assert m.called
  567. # =====================================================================
  568. # --- system CPU
  569. # =====================================================================
  570. @unittest.skipIf(not LINUX, "LINUX only")
  571. class TestSystemCPUTimes(PsutilTestCase):
  572. @unittest.skipIf(TRAVIS, "unknown failure on travis")
  573. def test_fields(self):
  574. fields = psutil.cpu_times()._fields
  575. kernel_ver = re.findall(r'\d+\.\d+\.\d+', os.uname()[2])[0]
  576. kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
  577. if kernel_ver_info >= (2, 6, 11):
  578. self.assertIn('steal', fields)
  579. else:
  580. self.assertNotIn('steal', fields)
  581. if kernel_ver_info >= (2, 6, 24):
  582. self.assertIn('guest', fields)
  583. else:
  584. self.assertNotIn('guest', fields)
  585. if kernel_ver_info >= (3, 2, 0):
  586. self.assertIn('guest_nice', fields)
  587. else:
  588. self.assertNotIn('guest_nice', fields)
  589. @unittest.skipIf(not LINUX, "LINUX only")
  590. class TestSystemCPUCountLogical(PsutilTestCase):
  591. @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu/online"),
  592. "/sys/devices/system/cpu/online does not exist")
  593. def test_against_sysdev_cpu_online(self):
  594. with open("/sys/devices/system/cpu/online") as f:
  595. value = f.read().strip()
  596. if "-" in str(value):
  597. value = int(value.split('-')[1]) + 1
  598. self.assertEqual(psutil.cpu_count(), value)
  599. @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu"),
  600. "/sys/devices/system/cpu does not exist")
  601. def test_against_sysdev_cpu_num(self):
  602. ls = os.listdir("/sys/devices/system/cpu")
  603. count = len([x for x in ls if re.search(r"cpu\d+$", x) is not None])
  604. self.assertEqual(psutil.cpu_count(), count)
  605. @unittest.skipIf(not which("nproc"), "nproc utility not available")
  606. def test_against_nproc(self):
  607. num = int(sh("nproc --all"))
  608. self.assertEqual(psutil.cpu_count(logical=True), num)
  609. @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
  610. def test_against_lscpu(self):
  611. out = sh("lscpu -p")
  612. num = len([x for x in out.split('\n') if not x.startswith('#')])
  613. self.assertEqual(psutil.cpu_count(logical=True), num)
  614. def test_emulate_fallbacks(self):
  615. import psutil._pslinux
  616. original = psutil._pslinux.cpu_count_logical()
  617. # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in
  618. # order to cause the parsing of /proc/cpuinfo and /proc/stat.
  619. with mock.patch(
  620. 'psutil._pslinux.os.sysconf', side_effect=ValueError) as m:
  621. self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
  622. assert m.called
  623. # Let's have open() return emtpy data and make sure None is
  624. # returned ('cause we mimick os.cpu_count()).
  625. with mock.patch('psutil._common.open', create=True) as m:
  626. self.assertIsNone(psutil._pslinux.cpu_count_logical())
  627. self.assertEqual(m.call_count, 2)
  628. # /proc/stat should be the last one
  629. self.assertEqual(m.call_args[0][0], '/proc/stat')
  630. # Let's push this a bit further and make sure /proc/cpuinfo
  631. # parsing works as expected.
  632. with open('/proc/cpuinfo', 'rb') as f:
  633. cpuinfo_data = f.read()
  634. fake_file = io.BytesIO(cpuinfo_data)
  635. with mock.patch('psutil._common.open',
  636. return_value=fake_file, create=True) as m:
  637. self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
  638. # Finally, let's make /proc/cpuinfo return meaningless data;
  639. # this way we'll fall back on relying on /proc/stat
  640. with mock_open_content('/proc/cpuinfo', b"") as m:
  641. self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
  642. m.called
  643. @unittest.skipIf(not LINUX, "LINUX only")
  644. class TestSystemCPUCountPhysical(PsutilTestCase):
  645. @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
  646. def test_against_lscpu(self):
  647. out = sh("lscpu -p")
  648. core_ids = set()
  649. for line in out.split('\n'):
  650. if not line.startswith('#'):
  651. fields = line.split(',')
  652. core_ids.add(fields[1])
  653. self.assertEqual(psutil.cpu_count(logical=False), len(core_ids))
  654. def test_method_2(self):
  655. meth_1 = psutil._pslinux.cpu_count_physical()
  656. with mock.patch('glob.glob', return_value=[]) as m:
  657. meth_2 = psutil._pslinux.cpu_count_physical()
  658. assert m.called
  659. if meth_1 is not None:
  660. self.assertEqual(meth_1, meth_2)
  661. def test_emulate_none(self):
  662. with mock.patch('glob.glob', return_value=[]) as m1:
  663. with mock.patch('psutil._common.open', create=True) as m2:
  664. self.assertIsNone(psutil._pslinux.cpu_count_physical())
  665. assert m1.called
  666. assert m2.called
  667. @unittest.skipIf(not LINUX, "LINUX only")
  668. class TestSystemCPUFrequency(PsutilTestCase):
  669. @unittest.skipIf(TRAVIS, "fails on Travis")
  670. @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
  671. def test_emulate_use_second_file(self):
  672. # https://github.com/giampaolo/psutil/issues/981
  673. def path_exists_mock(path):
  674. if path.startswith("/sys/devices/system/cpu/cpufreq/policy"):
  675. return False
  676. else:
  677. return orig_exists(path)
  678. orig_exists = os.path.exists
  679. with mock.patch("os.path.exists", side_effect=path_exists_mock,
  680. create=True):
  681. assert psutil.cpu_freq()
  682. @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
  683. def test_emulate_use_cpuinfo(self):
  684. # Emulate a case where /sys/devices/system/cpu/cpufreq* does not
  685. # exist and /proc/cpuinfo is used instead.
  686. def path_exists_mock(path):
  687. if path.startswith('/sys/devices/system/cpu/'):
  688. return False
  689. else:
  690. if path == "/proc/cpuinfo":
  691. flags.append(None)
  692. return os_path_exists(path)
  693. flags = []
  694. os_path_exists = os.path.exists
  695. try:
  696. with mock.patch("os.path.exists", side_effect=path_exists_mock):
  697. reload_module(psutil._pslinux)
  698. ret = psutil.cpu_freq()
  699. assert ret
  700. assert flags
  701. self.assertEqual(ret.max, 0.0)
  702. self.assertEqual(ret.min, 0.0)
  703. for freq in psutil.cpu_freq(percpu=True):
  704. self.assertEqual(ret.max, 0.0)
  705. self.assertEqual(ret.min, 0.0)
  706. finally:
  707. reload_module(psutil._pslinux)
  708. reload_module(psutil)
  709. @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
  710. def test_emulate_data(self):
  711. def open_mock(name, *args, **kwargs):
  712. if (name.endswith('/scaling_cur_freq') and
  713. name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
  714. return io.BytesIO(b"500000")
  715. elif (name.endswith('/scaling_min_freq') and
  716. name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
  717. return io.BytesIO(b"600000")
  718. elif (name.endswith('/scaling_max_freq') and
  719. name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
  720. return io.BytesIO(b"700000")
  721. elif name == '/proc/cpuinfo':
  722. return io.BytesIO(b"cpu MHz : 500")
  723. else:
  724. return orig_open(name, *args, **kwargs)
  725. orig_open = open
  726. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  727. with mock.patch(patch_point, side_effect=open_mock):
  728. with mock.patch(
  729. 'os.path.exists', return_value=True):
  730. freq = psutil.cpu_freq()
  731. self.assertEqual(freq.current, 500.0)
  732. # when /proc/cpuinfo is used min and max frequencies are not
  733. # available and are set to 0.
  734. if freq.min != 0.0:
  735. self.assertEqual(freq.min, 600.0)
  736. if freq.max != 0.0:
  737. self.assertEqual(freq.max, 700.0)
  738. @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
  739. def test_emulate_multi_cpu(self):
  740. def open_mock(name, *args, **kwargs):
  741. n = name
  742. if (n.endswith('/scaling_cur_freq') and
  743. n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
  744. return io.BytesIO(b"100000")
  745. elif (n.endswith('/scaling_min_freq') and
  746. n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
  747. return io.BytesIO(b"200000")
  748. elif (n.endswith('/scaling_max_freq') and
  749. n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
  750. return io.BytesIO(b"300000")
  751. elif (n.endswith('/scaling_cur_freq') and
  752. n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
  753. return io.BytesIO(b"400000")
  754. elif (n.endswith('/scaling_min_freq') and
  755. n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
  756. return io.BytesIO(b"500000")
  757. elif (n.endswith('/scaling_max_freq') and
  758. n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
  759. return io.BytesIO(b"600000")
  760. elif name == '/proc/cpuinfo':
  761. return io.BytesIO(b"cpu MHz : 100\n"
  762. b"cpu MHz : 400")
  763. else:
  764. return orig_open(name, *args, **kwargs)
  765. orig_open = open
  766. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  767. with mock.patch(patch_point, side_effect=open_mock):
  768. with mock.patch('os.path.exists', return_value=True):
  769. with mock.patch('psutil._pslinux.cpu_count_logical',
  770. return_value=2):
  771. freq = psutil.cpu_freq(percpu=True)
  772. self.assertEqual(freq[0].current, 100.0)
  773. if freq[0].min != 0.0:
  774. self.assertEqual(freq[0].min, 200.0)
  775. if freq[0].max != 0.0:
  776. self.assertEqual(freq[0].max, 300.0)
  777. self.assertEqual(freq[1].current, 400.0)
  778. if freq[1].min != 0.0:
  779. self.assertEqual(freq[1].min, 500.0)
  780. if freq[1].max != 0.0:
  781. self.assertEqual(freq[1].max, 600.0)
  782. @unittest.skipIf(TRAVIS, "fails on Travis")
  783. @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
  784. def test_emulate_no_scaling_cur_freq_file(self):
  785. # See: https://github.com/giampaolo/psutil/issues/1071
  786. def open_mock(name, *args, **kwargs):
  787. if name.endswith('/scaling_cur_freq'):
  788. raise IOError(errno.ENOENT, "")
  789. elif name.endswith('/cpuinfo_cur_freq'):
  790. return io.BytesIO(b"200000")
  791. elif name == '/proc/cpuinfo':
  792. return io.BytesIO(b"cpu MHz : 200")
  793. else:
  794. return orig_open(name, *args, **kwargs)
  795. orig_open = open
  796. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  797. with mock.patch(patch_point, side_effect=open_mock):
  798. with mock.patch('os.path.exists', return_value=True):
  799. with mock.patch('psutil._pslinux.cpu_count_logical',
  800. return_value=1):
  801. freq = psutil.cpu_freq()
  802. self.assertEqual(freq.current, 200)
  803. @unittest.skipIf(not LINUX, "LINUX only")
  804. class TestSystemCPUStats(PsutilTestCase):
  805. @unittest.skipIf(TRAVIS, "fails on Travis")
  806. def test_ctx_switches(self):
  807. vmstat_value = vmstat("context switches")
  808. psutil_value = psutil.cpu_stats().ctx_switches
  809. self.assertAlmostEqual(vmstat_value, psutil_value, delta=500)
  810. @unittest.skipIf(TRAVIS, "fails on Travis")
  811. def test_interrupts(self):
  812. vmstat_value = vmstat("interrupts")
  813. psutil_value = psutil.cpu_stats().interrupts
  814. self.assertAlmostEqual(vmstat_value, psutil_value, delta=500)
  815. @unittest.skipIf(not LINUX, "LINUX only")
  816. class TestLoadAvg(PsutilTestCase):
  817. @unittest.skipIf(not HAS_GETLOADAVG, "not supported")
  818. def test_getloadavg(self):
  819. psutil_value = psutil.getloadavg()
  820. with open("/proc/loadavg", "r") as f:
  821. proc_value = f.read().split()
  822. self.assertAlmostEqual(float(proc_value[0]), psutil_value[0], delta=1)
  823. self.assertAlmostEqual(float(proc_value[1]), psutil_value[1], delta=1)
  824. self.assertAlmostEqual(float(proc_value[2]), psutil_value[2], delta=1)
  825. # =====================================================================
  826. # --- system network
  827. # =====================================================================
  828. @unittest.skipIf(not LINUX, "LINUX only")
  829. class TestSystemNetIfAddrs(PsutilTestCase):
  830. def test_ips(self):
  831. for name, addrs in psutil.net_if_addrs().items():
  832. for addr in addrs:
  833. if addr.family == psutil.AF_LINK:
  834. self.assertEqual(addr.address, get_mac_address(name))
  835. elif addr.family == socket.AF_INET:
  836. self.assertEqual(addr.address, get_ipv4_address(name))
  837. self.assertEqual(addr.netmask, get_ipv4_netmask(name))
  838. if addr.broadcast is not None:
  839. self.assertEqual(addr.broadcast,
  840. get_ipv4_broadcast(name))
  841. else:
  842. self.assertEqual(get_ipv4_broadcast(name), '0.0.0.0')
  843. elif addr.family == socket.AF_INET6:
  844. # IPv6 addresses can have a percent symbol at the end.
  845. # E.g. these 2 are equivalent:
  846. # "fe80::1ff:fe23:4567:890a"
  847. # "fe80::1ff:fe23:4567:890a%eth0"
  848. # That is the "zone id" portion, which usually is the name
  849. # of the network interface.
  850. address = addr.address.split('%')[0]
  851. self.assertEqual(address, get_ipv6_address(name))
  852. # XXX - not reliable when having virtual NICs installed by Docker.
  853. # @unittest.skipIf(not which('ip'), "'ip' utility not available")
  854. # @unittest.skipIf(TRAVIS, "skipped on Travis")
  855. # def test_net_if_names(self):
  856. # out = sh("ip addr").strip()
  857. # nics = [x for x in psutil.net_if_addrs().keys() if ':' not in x]
  858. # found = 0
  859. # for line in out.split('\n'):
  860. # line = line.strip()
  861. # if re.search(r"^\d+:", line):
  862. # found += 1
  863. # name = line.split(':')[1].strip()
  864. # self.assertIn(name, nics)
  865. # self.assertEqual(len(nics), found, msg="%s\n---\n%s" % (
  866. # pprint.pformat(nics), out))
  867. @unittest.skipIf(not LINUX, "LINUX only")
  868. class TestSystemNetIfStats(PsutilTestCase):
  869. def test_against_ifconfig(self):
  870. for name, stats in psutil.net_if_stats().items():
  871. try:
  872. out = sh("ifconfig %s" % name)
  873. except RuntimeError:
  874. pass
  875. else:
  876. self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
  877. self.assertEqual(stats.mtu,
  878. int(re.findall(r'(?i)MTU[: ](\d+)', out)[0]))
  879. def test_mtu(self):
  880. for name, stats in psutil.net_if_stats().items():
  881. with open("/sys/class/net/%s/mtu" % name, "rt") as f:
  882. self.assertEqual(stats.mtu, int(f.read().strip()))
  883. @unittest.skipIf(not LINUX, "LINUX only")
  884. class TestSystemNetIOCounters(PsutilTestCase):
  885. @retry_on_failure()
  886. def test_against_ifconfig(self):
  887. def ifconfig(nic):
  888. ret = {}
  889. out = sh("ifconfig %s" % name)
  890. ret['packets_recv'] = int(
  891. re.findall(r'RX packets[: ](\d+)', out)[0])
  892. ret['packets_sent'] = int(
  893. re.findall(r'TX packets[: ](\d+)', out)[0])
  894. ret['errin'] = int(re.findall(r'errors[: ](\d+)', out)[0])
  895. ret['errout'] = int(re.findall(r'errors[: ](\d+)', out)[1])
  896. ret['dropin'] = int(re.findall(r'dropped[: ](\d+)', out)[0])
  897. ret['dropout'] = int(re.findall(r'dropped[: ](\d+)', out)[1])
  898. ret['bytes_recv'] = int(
  899. re.findall(r'RX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
  900. ret['bytes_sent'] = int(
  901. re.findall(r'TX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
  902. return ret
  903. nio = psutil.net_io_counters(pernic=True, nowrap=False)
  904. for name, stats in nio.items():
  905. try:
  906. ifconfig_ret = ifconfig(name)
  907. except RuntimeError:
  908. continue
  909. self.assertAlmostEqual(
  910. stats.bytes_recv, ifconfig_ret['bytes_recv'], delta=1024 * 5)
  911. self.assertAlmostEqual(
  912. stats.bytes_sent, ifconfig_ret['bytes_sent'], delta=1024 * 5)
  913. self.assertAlmostEqual(
  914. stats.packets_recv, ifconfig_ret['packets_recv'], delta=1024)
  915. self.assertAlmostEqual(
  916. stats.packets_sent, ifconfig_ret['packets_sent'], delta=1024)
  917. self.assertAlmostEqual(
  918. stats.errin, ifconfig_ret['errin'], delta=10)
  919. self.assertAlmostEqual(
  920. stats.errout, ifconfig_ret['errout'], delta=10)
  921. self.assertAlmostEqual(
  922. stats.dropin, ifconfig_ret['dropin'], delta=10)
  923. self.assertAlmostEqual(
  924. stats.dropout, ifconfig_ret['dropout'], delta=10)
  925. @unittest.skipIf(not LINUX, "LINUX only")
  926. class TestSystemNetConnections(PsutilTestCase):
  927. @mock.patch('psutil._pslinux.socket.inet_ntop', side_effect=ValueError)
  928. @mock.patch('psutil._pslinux.supports_ipv6', return_value=False)
  929. def test_emulate_ipv6_unsupported(self, supports_ipv6, inet_ntop):
  930. # see: https://github.com/giampaolo/psutil/issues/623
  931. try:
  932. s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
  933. self.addCleanup(s.close)
  934. s.bind(("::1", 0))
  935. except socket.error:
  936. pass
  937. psutil.net_connections(kind='inet6')
  938. def test_emulate_unix(self):
  939. with mock_open_content(
  940. '/proc/net/unix',
  941. textwrap.dedent("""\
  942. 0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
  943. 0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
  944. 0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
  945. 000000000000000000000000000000000000000000000000000000
  946. """)) as m:
  947. psutil.net_connections(kind='unix')
  948. assert m.called
  949. # =====================================================================
  950. # --- system disks
  951. # =====================================================================
  952. @unittest.skipIf(not LINUX, "LINUX only")
  953. class TestSystemDiskPartitions(PsutilTestCase):
  954. @unittest.skipIf(not hasattr(os, 'statvfs'), "os.statvfs() not available")
  955. @skip_on_not_implemented()
  956. def test_against_df(self):
  957. # test psutil.disk_usage() and psutil.disk_partitions()
  958. # against "df -a"
  959. def df(path):
  960. out = sh('df -P -B 1 "%s"' % path).strip()
  961. lines = out.split('\n')
  962. lines.pop(0)
  963. line = lines.pop(0)
  964. dev, total, used, free = line.split()[:4]
  965. if dev == 'none':
  966. dev = ''
  967. total, used, free = int(total), int(used), int(free)
  968. return dev, total, used, free
  969. for part in psutil.disk_partitions(all=False):
  970. usage = psutil.disk_usage(part.mountpoint)
  971. dev, total, used, free = df(part.mountpoint)
  972. self.assertEqual(usage.total, total)
  973. self.assertAlmostEqual(usage.free, free,
  974. delta=TOLERANCE_DISK_USAGE)
  975. self.assertAlmostEqual(usage.used, used,
  976. delta=TOLERANCE_DISK_USAGE)
  977. def test_zfs_fs(self):
  978. # Test that ZFS partitions are returned.
  979. with open("/proc/filesystems", "r") as f:
  980. data = f.read()
  981. if 'zfs' in data:
  982. for part in psutil.disk_partitions():
  983. if part.fstype == 'zfs':
  984. break
  985. else:
  986. self.fail("couldn't find any ZFS partition")
  987. else:
  988. # No ZFS partitions on this system. Let's fake one.
  989. fake_file = io.StringIO(u("nodev\tzfs\n"))
  990. with mock.patch('psutil._common.open',
  991. return_value=fake_file, create=True) as m1:
  992. with mock.patch(
  993. 'psutil._pslinux.cext.disk_partitions',
  994. return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
  995. ret = psutil.disk_partitions()
  996. assert m1.called
  997. assert m2.called
  998. assert ret
  999. self.assertEqual(ret[0].fstype, 'zfs')
  1000. def test_emulate_realpath_fail(self):
  1001. # See: https://github.com/giampaolo/psutil/issues/1307
  1002. try:
  1003. with mock.patch('os.path.realpath',
  1004. return_value='/non/existent') as m:
  1005. with self.assertRaises(FileNotFoundError):
  1006. psutil.disk_partitions()
  1007. assert m.called
  1008. finally:
  1009. psutil.PROCFS_PATH = "/proc"
  1010. @unittest.skipIf(not LINUX, "LINUX only")
  1011. class TestSystemDiskIoCounters(PsutilTestCase):
  1012. def test_emulate_kernel_2_4(self):
  1013. # Tests /proc/diskstats parsing format for 2.4 kernels, see:
  1014. # https://github.com/giampaolo/psutil/issues/767
  1015. with mock_open_content(
  1016. '/proc/diskstats',
  1017. " 3 0 1 hda 2 3 4 5 6 7 8 9 10 11 12"):
  1018. with mock.patch('psutil._pslinux.is_storage_device',
  1019. return_value=True):
  1020. ret = psutil.disk_io_counters(nowrap=False)
  1021. self.assertEqual(ret.read_count, 1)
  1022. self.assertEqual(ret.read_merged_count, 2)
  1023. self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
  1024. self.assertEqual(ret.read_time, 4)
  1025. self.assertEqual(ret.write_count, 5)
  1026. self.assertEqual(ret.write_merged_count, 6)
  1027. self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
  1028. self.assertEqual(ret.write_time, 8)
  1029. self.assertEqual(ret.busy_time, 10)
  1030. def test_emulate_kernel_2_6_full(self):
  1031. # Tests /proc/diskstats parsing format for 2.6 kernels,
  1032. # lines reporting all metrics:
  1033. # https://github.com/giampaolo/psutil/issues/767
  1034. with mock_open_content(
  1035. '/proc/diskstats',
  1036. " 3 0 hda 1 2 3 4 5 6 7 8 9 10 11"):
  1037. with mock.patch('psutil._pslinux.is_storage_device',
  1038. return_value=True):
  1039. ret = psutil.disk_io_counters(nowrap=False)
  1040. self.assertEqual(ret.read_count, 1)
  1041. self.assertEqual(ret.read_merged_count, 2)
  1042. self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
  1043. self.assertEqual(ret.read_time, 4)
  1044. self.assertEqual(ret.write_count, 5)
  1045. self.assertEqual(ret.write_merged_count, 6)
  1046. self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
  1047. self.assertEqual(ret.write_time, 8)
  1048. self.assertEqual(ret.busy_time, 10)
  1049. def test_emulate_kernel_2_6_limited(self):
  1050. # Tests /proc/diskstats parsing format for 2.6 kernels,
  1051. # where one line of /proc/partitions return a limited
  1052. # amount of metrics when it bumps into a partition
  1053. # (instead of a disk). See:
  1054. # https://github.com/giampaolo/psutil/issues/767
  1055. with mock_open_content(
  1056. '/proc/diskstats',
  1057. " 3 1 hda 1 2 3 4"):
  1058. with mock.patch('psutil._pslinux.is_storage_device',
  1059. return_value=True):
  1060. ret = psutil.disk_io_counters(nowrap=False)
  1061. self.assertEqual(ret.read_count, 1)
  1062. self.assertEqual(ret.read_bytes, 2 * SECTOR_SIZE)
  1063. self.assertEqual(ret.write_count, 3)
  1064. self.assertEqual(ret.write_bytes, 4 * SECTOR_SIZE)
  1065. self.assertEqual(ret.read_merged_count, 0)
  1066. self.assertEqual(ret.read_time, 0)
  1067. self.assertEqual(ret.write_merged_count, 0)
  1068. self.assertEqual(ret.write_time, 0)
  1069. self.assertEqual(ret.busy_time, 0)
  1070. def test_emulate_include_partitions(self):
  1071. # Make sure that when perdisk=True disk partitions are returned,
  1072. # see:
  1073. # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
  1074. with mock_open_content(
  1075. '/proc/diskstats',
  1076. textwrap.dedent("""\
  1077. 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
  1078. 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
  1079. """)):
  1080. with mock.patch('psutil._pslinux.is_storage_device',
  1081. return_value=False):
  1082. ret = psutil.disk_io_counters(perdisk=True, nowrap=False)
  1083. self.assertEqual(len(ret), 2)
  1084. self.assertEqual(ret['nvme0n1'].read_count, 1)
  1085. self.assertEqual(ret['nvme0n1p1'].read_count, 1)
  1086. self.assertEqual(ret['nvme0n1'].write_count, 5)
  1087. self.assertEqual(ret['nvme0n1p1'].write_count, 5)
  1088. def test_emulate_exclude_partitions(self):
  1089. # Make sure that when perdisk=False partitions (e.g. 'sda1',
  1090. # 'nvme0n1p1') are skipped and not included in the total count.
  1091. # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
  1092. with mock_open_content(
  1093. '/proc/diskstats',
  1094. textwrap.dedent("""\
  1095. 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
  1096. 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
  1097. """)):
  1098. with mock.patch('psutil._pslinux.is_storage_device',
  1099. return_value=False):
  1100. ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
  1101. self.assertIsNone(ret)
  1102. #
  1103. def is_storage_device(name):
  1104. return name == 'nvme0n1'
  1105. with mock_open_content(
  1106. '/proc/diskstats',
  1107. textwrap.dedent("""\
  1108. 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
  1109. 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
  1110. """)):
  1111. with mock.patch('psutil._pslinux.is_storage_device',
  1112. create=True, side_effect=is_storage_device):
  1113. ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
  1114. self.assertEqual(ret.read_count, 1)
  1115. self.assertEqual(ret.write_count, 5)
  1116. def test_emulate_use_sysfs(self):
  1117. def exists(path):
  1118. if path == '/proc/diskstats':
  1119. return False
  1120. return True
  1121. wprocfs = psutil.disk_io_counters(perdisk=True)
  1122. with mock.patch('psutil._pslinux.os.path.exists',
  1123. create=True, side_effect=exists):
  1124. wsysfs = psutil.disk_io_counters(perdisk=True)
  1125. self.assertEqual(len(wprocfs), len(wsysfs))
  1126. def test_emulate_not_impl(self):
  1127. def exists(path):
  1128. return False
  1129. with mock.patch('psutil._pslinux.os.path.exists',
  1130. create=True, side_effect=exists):
  1131. self.assertRaises(NotImplementedError, psutil.disk_io_counters)
  1132. # =====================================================================
  1133. # --- misc
  1134. # =====================================================================
  1135. @unittest.skipIf(not LINUX, "LINUX only")
  1136. class TestMisc(PsutilTestCase):
  1137. def test_boot_time(self):
  1138. vmstat_value = vmstat('boot time')
  1139. psutil_value = psutil.boot_time()
  1140. self.assertEqual(int(vmstat_value), int(psutil_value))
  1141. def test_no_procfs_on_import(self):
  1142. my_procfs = self.get_testfn()
  1143. os.mkdir(my_procfs)
  1144. with open(os.path.join(my_procfs, 'stat'), 'w') as f:
  1145. f.write('cpu 0 0 0 0 0 0 0 0 0 0\n')
  1146. f.write('cpu0 0 0 0 0 0 0 0 0 0 0\n')
  1147. f.write('cpu1 0 0 0 0 0 0 0 0 0 0\n')
  1148. try:
  1149. orig_open = open
  1150. def open_mock(name, *args, **kwargs):
  1151. if name.startswith('/proc'):
  1152. raise IOError(errno.ENOENT, 'rejecting access for test')
  1153. return orig_open(name, *args, **kwargs)
  1154. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1155. with mock.patch(patch_point, side_effect=open_mock):
  1156. reload_module(psutil)
  1157. self.assertRaises(IOError, psutil.cpu_times)
  1158. self.assertRaises(IOError, psutil.cpu_times, percpu=True)
  1159. self.assertRaises(IOError, psutil.cpu_percent)
  1160. self.assertRaises(IOError, psutil.cpu_percent, percpu=True)
  1161. self.assertRaises(IOError, psutil.cpu_times_percent)
  1162. self.assertRaises(
  1163. IOError, psutil.cpu_times_percent, percpu=True)
  1164. psutil.PROCFS_PATH = my_procfs
  1165. self.assertEqual(psutil.cpu_percent(), 0)
  1166. self.assertEqual(sum(psutil.cpu_times_percent()), 0)
  1167. # since we don't know the number of CPUs at import time,
  1168. # we awkwardly say there are none until the second call
  1169. per_cpu_percent = psutil.cpu_percent(percpu=True)
  1170. self.assertEqual(sum(per_cpu_percent), 0)
  1171. # ditto awkward length
  1172. per_cpu_times_percent = psutil.cpu_times_percent(percpu=True)
  1173. self.assertEqual(sum(map(sum, per_cpu_times_percent)), 0)
  1174. # much user, very busy
  1175. with open(os.path.join(my_procfs, 'stat'), 'w') as f:
  1176. f.write('cpu 1 0 0 0 0 0 0 0 0 0\n')
  1177. f.write('cpu0 1 0 0 0 0 0 0 0 0 0\n')
  1178. f.write('cpu1 1 0 0 0 0 0 0 0 0 0\n')
  1179. self.assertNotEqual(psutil.cpu_percent(), 0)
  1180. self.assertNotEqual(
  1181. sum(psutil.cpu_percent(percpu=True)), 0)
  1182. self.assertNotEqual(sum(psutil.cpu_times_percent()), 0)
  1183. self.assertNotEqual(
  1184. sum(map(sum, psutil.cpu_times_percent(percpu=True))), 0)
  1185. finally:
  1186. shutil.rmtree(my_procfs)
  1187. reload_module(psutil)
  1188. self.assertEqual(psutil.PROCFS_PATH, '/proc')
  1189. def test_cpu_steal_decrease(self):
  1190. # Test cumulative cpu stats decrease. We should ignore this.
  1191. # See issue #1210.
  1192. with mock_open_content(
  1193. "/proc/stat",
  1194. textwrap.dedent("""\
  1195. cpu 0 0 0 0 0 0 0 1 0 0
  1196. cpu0 0 0 0 0 0 0 0 1 0 0
  1197. cpu1 0 0 0 0 0 0 0 1 0 0
  1198. """).encode()) as m:
  1199. # first call to "percent" functions should read the new stat file
  1200. # and compare to the "real" file read at import time - so the
  1201. # values are meaningless
  1202. psutil.cpu_percent()
  1203. assert m.called
  1204. psutil.cpu_percent(percpu=True)
  1205. psutil.cpu_times_percent()
  1206. psutil.cpu_times_percent(percpu=True)
  1207. with mock_open_content(
  1208. "/proc/stat",
  1209. textwrap.dedent("""\
  1210. cpu 1 0 0 0 0 0 0 0 0 0
  1211. cpu0 1 0 0 0 0 0 0 0 0 0
  1212. cpu1 1 0 0 0 0 0 0 0 0 0
  1213. """).encode()) as m:
  1214. # Increase "user" while steal goes "backwards" to zero.
  1215. cpu_percent = psutil.cpu_percent()
  1216. assert m.called
  1217. cpu_percent_percpu = psutil.cpu_percent(percpu=True)
  1218. cpu_times_percent = psutil.cpu_times_percent()
  1219. cpu_times_percent_percpu = psutil.cpu_times_percent(percpu=True)
  1220. self.assertNotEqual(cpu_percent, 0)
  1221. self.assertNotEqual(sum(cpu_percent_percpu), 0)
  1222. self.assertNotEqual(sum(cpu_times_percent), 0)
  1223. self.assertNotEqual(sum(cpu_times_percent), 100.0)
  1224. self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 0)
  1225. self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 100.0)
  1226. self.assertEqual(cpu_times_percent.steal, 0)
  1227. self.assertNotEqual(cpu_times_percent.user, 0)
  1228. def test_boot_time_mocked(self):
  1229. with mock.patch('psutil._common.open', create=True) as m:
  1230. self.assertRaises(
  1231. RuntimeError,
  1232. psutil._pslinux.boot_time)
  1233. assert m.called
  1234. def test_users_mocked(self):
  1235. # Make sure ':0' and ':0.0' (returned by C ext) are converted
  1236. # to 'localhost'.
  1237. with mock.patch('psutil._pslinux.cext.users',
  1238. return_value=[('giampaolo', 'pts/2', ':0',
  1239. 1436573184.0, True, 2)]) as m:
  1240. self.assertEqual(psutil.users()[0].host, 'localhost')
  1241. assert m.called
  1242. with mock.patch('psutil._pslinux.cext.users',
  1243. return_value=[('giampaolo', 'pts/2', ':0.0',
  1244. 1436573184.0, True, 2)]) as m:
  1245. self.assertEqual(psutil.users()[0].host, 'localhost')
  1246. assert m.called
  1247. # ...otherwise it should be returned as-is
  1248. with mock.patch('psutil._pslinux.cext.users',
  1249. return_value=[('giampaolo', 'pts/2', 'foo',
  1250. 1436573184.0, True, 2)]) as m:
  1251. self.assertEqual(psutil.users()[0].host, 'foo')
  1252. assert m.called
  1253. def test_procfs_path(self):
  1254. tdir = self.get_testfn()
  1255. os.mkdir(tdir)
  1256. try:
  1257. psutil.PROCFS_PATH = tdir
  1258. self.assertRaises(IOError, psutil.virtual_memory)
  1259. self.assertRaises(IOError, psutil.cpu_times)
  1260. self.assertRaises(IOError, psutil.cpu_times, percpu=True)
  1261. self.assertRaises(IOError, psutil.boot_time)
  1262. # self.assertRaises(IOError, psutil.pids)
  1263. self.assertRaises(IOError, psutil.net_connections)
  1264. self.assertRaises(IOError, psutil.net_io_counters)
  1265. self.assertRaises(IOError, psutil.net_if_stats)
  1266. # self.assertRaises(IOError, psutil.disk_io_counters)
  1267. self.assertRaises(IOError, psutil.disk_partitions)
  1268. self.assertRaises(psutil.NoSuchProcess, psutil.Process)
  1269. finally:
  1270. psutil.PROCFS_PATH = "/proc"
  1271. @retry_on_failure()
  1272. def test_issue_687(self):
  1273. # In case of thread ID:
  1274. # - pid_exists() is supposed to return False
  1275. # - Process(tid) is supposed to work
  1276. # - pids() should not return the TID
  1277. # See: https://github.com/giampaolo/psutil/issues/687
  1278. t = ThreadTask()
  1279. t.start()
  1280. try:
  1281. p = psutil.Process()
  1282. threads = p.threads()
  1283. self.assertEqual(len(threads), 2)
  1284. tid = sorted(threads, key=lambda x: x.id)[1].id
  1285. self.assertNotEqual(p.pid, tid)
  1286. pt = psutil.Process(tid)
  1287. pt.as_dict()
  1288. self.assertNotIn(tid, psutil.pids())
  1289. finally:
  1290. t.stop()
  1291. def test_pid_exists_no_proc_status(self):
  1292. # Internally pid_exists relies on /proc/{pid}/status.
  1293. # Emulate a case where this file is empty in which case
  1294. # psutil is supposed to fall back on using pids().
  1295. with mock_open_content("/proc/%s/status", "") as m:
  1296. assert psutil.pid_exists(os.getpid())
  1297. assert m.called
  1298. # =====================================================================
  1299. # --- sensors
  1300. # =====================================================================
  1301. @unittest.skipIf(not LINUX, "LINUX only")
  1302. @unittest.skipIf(not HAS_BATTERY, "no battery")
  1303. class TestSensorsBattery(PsutilTestCase):
  1304. @unittest.skipIf(not which("acpi"), "acpi utility not available")
  1305. def test_percent(self):
  1306. out = sh("acpi -b")
  1307. acpi_value = int(out.split(",")[1].strip().replace('%', ''))
  1308. psutil_value = psutil.sensors_battery().percent
  1309. self.assertAlmostEqual(acpi_value, psutil_value, delta=1)
  1310. def test_emulate_power_plugged(self):
  1311. # Pretend the AC power cable is connected.
  1312. def open_mock(name, *args, **kwargs):
  1313. if name.endswith("AC0/online") or name.endswith("AC/online"):
  1314. return io.BytesIO(b"1")
  1315. else:
  1316. return orig_open(name, *args, **kwargs)
  1317. orig_open = open
  1318. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1319. with mock.patch(patch_point, side_effect=open_mock) as m:
  1320. self.assertEqual(psutil.sensors_battery().power_plugged, True)
  1321. self.assertEqual(
  1322. psutil.sensors_battery().secsleft, psutil.POWER_TIME_UNLIMITED)
  1323. assert m.called
  1324. def test_emulate_power_plugged_2(self):
  1325. # Same as above but pretend /AC0/online does not exist in which
  1326. # case code relies on /status file.
  1327. def open_mock(name, *args, **kwargs):
  1328. if name.endswith("AC0/online") or name.endswith("AC/online"):
  1329. raise IOError(errno.ENOENT, "")
  1330. elif name.endswith("/status"):
  1331. return io.StringIO(u("charging"))
  1332. else:
  1333. return orig_open(name, *args, **kwargs)
  1334. orig_open = open
  1335. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1336. with mock.patch(patch_point, side_effect=open_mock) as m:
  1337. self.assertEqual(psutil.sensors_battery().power_plugged, True)
  1338. assert m.called
  1339. def test_emulate_power_not_plugged(self):
  1340. # Pretend the AC power cable is not connected.
  1341. def open_mock(name, *args, **kwargs):
  1342. if name.endswith("AC0/online") or name.endswith("AC/online"):
  1343. return io.BytesIO(b"0")
  1344. else:
  1345. return orig_open(name, *args, **kwargs)
  1346. orig_open = open
  1347. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1348. with mock.patch(patch_point, side_effect=open_mock) as m:
  1349. self.assertEqual(psutil.sensors_battery().power_plugged, False)
  1350. assert m.called
  1351. def test_emulate_power_not_plugged_2(self):
  1352. # Same as above but pretend /AC0/online does not exist in which
  1353. # case code relies on /status file.
  1354. def open_mock(name, *args, **kwargs):
  1355. if name.endswith("AC0/online") or name.endswith("AC/online"):
  1356. raise IOError(errno.ENOENT, "")
  1357. elif name.endswith("/status"):
  1358. return io.StringIO(u("discharging"))
  1359. else:
  1360. return orig_open(name, *args, **kwargs)
  1361. orig_open = open
  1362. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1363. with mock.patch(patch_point, side_effect=open_mock) as m:
  1364. self.assertEqual(psutil.sensors_battery().power_plugged, False)
  1365. assert m.called
  1366. def test_emulate_power_undetermined(self):
  1367. # Pretend we can't know whether the AC power cable not
  1368. # connected (assert fallback to False).
  1369. def open_mock(name, *args, **kwargs):
  1370. if name.startswith("/sys/class/power_supply/AC0/online") or \
  1371. name.startswith("/sys/class/power_supply/AC/online"):
  1372. raise IOError(errno.ENOENT, "")
  1373. elif name.startswith("/sys/class/power_supply/BAT0/status"):
  1374. return io.BytesIO(b"???")
  1375. else:
  1376. return orig_open(name, *args, **kwargs)
  1377. orig_open = open
  1378. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1379. with mock.patch(patch_point, side_effect=open_mock) as m:
  1380. self.assertIsNone(psutil.sensors_battery().power_plugged)
  1381. assert m.called
  1382. def test_emulate_energy_full_0(self):
  1383. # Emulate a case where energy_full files returns 0.
  1384. with mock_open_content(
  1385. "/sys/class/power_supply/BAT0/energy_full", b"0") as m:
  1386. self.assertEqual(psutil.sensors_battery().percent, 0)
  1387. assert m.called
  1388. def test_emulate_energy_full_not_avail(self):
  1389. # Emulate a case where energy_full file does not exist.
  1390. # Expected fallback on /capacity.
  1391. with mock_open_exception(
  1392. "/sys/class/power_supply/BAT0/energy_full",
  1393. IOError(errno.ENOENT, "")):
  1394. with mock_open_exception(
  1395. "/sys/class/power_supply/BAT0/charge_full",
  1396. IOError(errno.ENOENT, "")):
  1397. with mock_open_content(
  1398. "/sys/class/power_supply/BAT0/capacity", b"88"):
  1399. self.assertEqual(psutil.sensors_battery().percent, 88)
  1400. def test_emulate_no_power(self):
  1401. # Emulate a case where /AC0/online file nor /BAT0/status exist.
  1402. with mock_open_exception(
  1403. "/sys/class/power_supply/AC/online",
  1404. IOError(errno.ENOENT, "")):
  1405. with mock_open_exception(
  1406. "/sys/class/power_supply/AC0/online",
  1407. IOError(errno.ENOENT, "")):
  1408. with mock_open_exception(
  1409. "/sys/class/power_supply/BAT0/status",
  1410. IOError(errno.ENOENT, "")):
  1411. self.assertIsNone(psutil.sensors_battery().power_plugged)
  1412. @unittest.skipIf(not LINUX, "LINUX only")
  1413. class TestSensorsBatteryEmulated(PsutilTestCase):
  1414. def test_it(self):
  1415. def open_mock(name, *args, **kwargs):
  1416. if name.endswith("/energy_now"):
  1417. return io.StringIO(u("60000000"))
  1418. elif name.endswith("/power_now"):
  1419. return io.StringIO(u("0"))
  1420. elif name.endswith("/energy_full"):
  1421. return io.StringIO(u("60000001"))
  1422. else:
  1423. return orig_open(name, *args, **kwargs)
  1424. orig_open = open
  1425. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1426. with mock.patch('os.listdir', return_value=["BAT0"]) as mlistdir:
  1427. with mock.patch(patch_point, side_effect=open_mock) as mopen:
  1428. self.assertIsNotNone(psutil.sensors_battery())
  1429. assert mlistdir.called
  1430. assert mopen.called
  1431. @unittest.skipIf(not LINUX, "LINUX only")
  1432. class TestSensorsTemperatures(PsutilTestCase):
  1433. def test_emulate_class_hwmon(self):
  1434. def open_mock(name, *args, **kwargs):
  1435. if name.endswith('/name'):
  1436. return io.StringIO(u("name"))
  1437. elif name.endswith('/temp1_label'):
  1438. return io.StringIO(u("label"))
  1439. elif name.endswith('/temp1_input'):
  1440. return io.BytesIO(b"30000")
  1441. elif name.endswith('/temp1_max'):
  1442. return io.BytesIO(b"40000")
  1443. elif name.endswith('/temp1_crit'):
  1444. return io.BytesIO(b"50000")
  1445. else:
  1446. return orig_open(name, *args, **kwargs)
  1447. orig_open = open
  1448. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1449. with mock.patch(patch_point, side_effect=open_mock):
  1450. # Test case with /sys/class/hwmon
  1451. with mock.patch('glob.glob',
  1452. return_value=['/sys/class/hwmon/hwmon0/temp1']):
  1453. temp = psutil.sensors_temperatures()['name'][0]
  1454. self.assertEqual(temp.label, 'label')
  1455. self.assertEqual(temp.current, 30.0)
  1456. self.assertEqual(temp.high, 40.0)
  1457. self.assertEqual(temp.critical, 50.0)
  1458. def test_emulate_class_thermal(self):
  1459. def open_mock(name, *args, **kwargs):
  1460. if name.endswith('0_temp'):
  1461. return io.BytesIO(b"50000")
  1462. elif name.endswith('temp'):
  1463. return io.BytesIO(b"30000")
  1464. elif name.endswith('0_type'):
  1465. return io.StringIO(u("critical"))
  1466. elif name.endswith('type'):
  1467. return io.StringIO(u("name"))
  1468. else:
  1469. return orig_open(name, *args, **kwargs)
  1470. def glob_mock(path):
  1471. if path == '/sys/class/hwmon/hwmon*/temp*_*':
  1472. return []
  1473. elif path == '/sys/class/hwmon/hwmon*/device/temp*_*':
  1474. return []
  1475. elif path == '/sys/class/thermal/thermal_zone*':
  1476. return ['/sys/class/thermal/thermal_zone0']
  1477. elif path == '/sys/class/thermal/thermal_zone0/trip_point*':
  1478. return ['/sys/class/thermal/thermal_zone1/trip_point_0_type',
  1479. '/sys/class/thermal/thermal_zone1/trip_point_0_temp']
  1480. return []
  1481. orig_open = open
  1482. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1483. with mock.patch(patch_point, side_effect=open_mock):
  1484. with mock.patch('glob.glob', create=True, side_effect=glob_mock):
  1485. temp = psutil.sensors_temperatures()['name'][0]
  1486. self.assertEqual(temp.label, '')
  1487. self.assertEqual(temp.current, 30.0)
  1488. self.assertEqual(temp.high, 50.0)
  1489. self.assertEqual(temp.critical, 50.0)
  1490. @unittest.skipIf(not LINUX, "LINUX only")
  1491. class TestSensorsFans(PsutilTestCase):
  1492. def test_emulate_data(self):
  1493. def open_mock(name, *args, **kwargs):
  1494. if name.endswith('/name'):
  1495. return io.StringIO(u("name"))
  1496. elif name.endswith('/fan1_label'):
  1497. return io.StringIO(u("label"))
  1498. elif name.endswith('/fan1_input'):
  1499. return io.StringIO(u("2000"))
  1500. else:
  1501. return orig_open(name, *args, **kwargs)
  1502. orig_open = open
  1503. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1504. with mock.patch(patch_point, side_effect=open_mock):
  1505. with mock.patch('glob.glob',
  1506. return_value=['/sys/class/hwmon/hwmon2/fan1']):
  1507. fan = psutil.sensors_fans()['name'][0]
  1508. self.assertEqual(fan.label, 'label')
  1509. self.assertEqual(fan.current, 2000)
  1510. # =====================================================================
  1511. # --- test process
  1512. # =====================================================================
  1513. @unittest.skipIf(not LINUX, "LINUX only")
  1514. class TestProcess(PsutilTestCase):
  1515. @retry_on_failure()
  1516. def test_memory_full_info(self):
  1517. testfn = self.get_testfn()
  1518. src = textwrap.dedent("""
  1519. import time
  1520. with open("%s", "w") as f:
  1521. time.sleep(10)
  1522. """ % testfn)
  1523. sproc = self.pyrun(src)
  1524. call_until(lambda: os.listdir('.'), "'%s' not in ret" % testfn)
  1525. p = psutil.Process(sproc.pid)
  1526. time.sleep(.1)
  1527. mem = p.memory_full_info()
  1528. maps = p.memory_maps(grouped=False)
  1529. self.assertAlmostEqual(
  1530. mem.uss, sum([x.private_dirty + x.private_clean for x in maps]),
  1531. delta=4096)
  1532. self.assertAlmostEqual(
  1533. mem.pss, sum([x.pss for x in maps]), delta=4096)
  1534. self.assertAlmostEqual(
  1535. mem.swap, sum([x.swap for x in maps]), delta=4096)
  1536. def test_memory_full_info_mocked(self):
  1537. # See: https://github.com/giampaolo/psutil/issues/1222
  1538. with mock_open_content(
  1539. "/proc/%s/smaps" % os.getpid(),
  1540. textwrap.dedent("""\
  1541. fffff0 r-xp 00000000 00:00 0 [vsyscall]
  1542. Size: 1 kB
  1543. Rss: 2 kB
  1544. Pss: 3 kB
  1545. Shared_Clean: 4 kB
  1546. Shared_Dirty: 5 kB
  1547. Private_Clean: 6 kB
  1548. Private_Dirty: 7 kB
  1549. Referenced: 8 kB
  1550. Anonymous: 9 kB
  1551. LazyFree: 10 kB
  1552. AnonHugePages: 11 kB
  1553. ShmemPmdMapped: 12 kB
  1554. Shared_Hugetlb: 13 kB
  1555. Private_Hugetlb: 14 kB
  1556. Swap: 15 kB
  1557. SwapPss: 16 kB
  1558. KernelPageSize: 17 kB
  1559. MMUPageSize: 18 kB
  1560. Locked: 19 kB
  1561. VmFlags: rd ex
  1562. """).encode()) as m:
  1563. p = psutil.Process()
  1564. mem = p.memory_full_info()
  1565. assert m.called
  1566. self.assertEqual(mem.uss, (6 + 7 + 14) * 1024)
  1567. self.assertEqual(mem.pss, 3 * 1024)
  1568. self.assertEqual(mem.swap, 15 * 1024)
  1569. # On PYPY file descriptors are not closed fast enough.
  1570. @unittest.skipIf(PYPY, "unreliable on PYPY")
  1571. def test_open_files_mode(self):
  1572. def get_test_file(fname):
  1573. p = psutil.Process()
  1574. giveup_at = time.time() + GLOBAL_TIMEOUT
  1575. while True:
  1576. for file in p.open_files():
  1577. if file.path == os.path.abspath(fname):
  1578. return file
  1579. elif time.time() > giveup_at:
  1580. break
  1581. raise RuntimeError("timeout looking for test file")
  1582. #
  1583. testfn = self.get_testfn()
  1584. with open(testfn, "w"):
  1585. self.assertEqual(get_test_file(testfn).mode, "w")
  1586. with open(testfn, "r"):
  1587. self.assertEqual(get_test_file(testfn).mode, "r")
  1588. with open(testfn, "a"):
  1589. self.assertEqual(get_test_file(testfn).mode, "a")
  1590. #
  1591. with open(testfn, "r+"):
  1592. self.assertEqual(get_test_file(testfn).mode, "r+")
  1593. with open(testfn, "w+"):
  1594. self.assertEqual(get_test_file(testfn).mode, "r+")
  1595. with open(testfn, "a+"):
  1596. self.assertEqual(get_test_file(testfn).mode, "a+")
  1597. # note: "x" bit is not supported
  1598. if PY3:
  1599. safe_rmpath(testfn)
  1600. with open(testfn, "x"):
  1601. self.assertEqual(get_test_file(testfn).mode, "w")
  1602. safe_rmpath(testfn)
  1603. with open(testfn, "x+"):
  1604. self.assertEqual(get_test_file(testfn).mode, "r+")
  1605. def test_open_files_file_gone(self):
  1606. # simulates a file which gets deleted during open_files()
  1607. # execution
  1608. p = psutil.Process()
  1609. files = p.open_files()
  1610. with open(self.get_testfn(), 'w'):
  1611. # give the kernel some time to see the new file
  1612. call_until(p.open_files, "len(ret) != %i" % len(files))
  1613. with mock.patch('psutil._pslinux.os.readlink',
  1614. side_effect=OSError(errno.ENOENT, "")) as m:
  1615. files = p.open_files()
  1616. assert not files
  1617. assert m.called
  1618. # also simulate the case where os.readlink() returns EINVAL
  1619. # in which case psutil is supposed to 'continue'
  1620. with mock.patch('psutil._pslinux.os.readlink',
  1621. side_effect=OSError(errno.EINVAL, "")) as m:
  1622. self.assertEqual(p.open_files(), [])
  1623. assert m.called
  1624. def test_open_files_fd_gone(self):
  1625. # Simulate a case where /proc/{pid}/fdinfo/{fd} disappears
  1626. # while iterating through fds.
  1627. # https://travis-ci.org/giampaolo/psutil/jobs/225694530
  1628. p = psutil.Process()
  1629. files = p.open_files()
  1630. with open(self.get_testfn(), 'w'):
  1631. # give the kernel some time to see the new file
  1632. call_until(p.open_files, "len(ret) != %i" % len(files))
  1633. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1634. with mock.patch(patch_point,
  1635. side_effect=IOError(errno.ENOENT, "")) as m:
  1636. files = p.open_files()
  1637. assert not files
  1638. assert m.called
  1639. # --- mocked tests
  1640. def test_terminal_mocked(self):
  1641. with mock.patch('psutil._pslinux._psposix.get_terminal_map',
  1642. return_value={}) as m:
  1643. self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal())
  1644. assert m.called
  1645. # TODO: re-enable this test.
  1646. # def test_num_ctx_switches_mocked(self):
  1647. # with mock.patch('psutil._common.open', create=True) as m:
  1648. # self.assertRaises(
  1649. # NotImplementedError,
  1650. # psutil._pslinux.Process(os.getpid()).num_ctx_switches)
  1651. # assert m.called
  1652. def test_cmdline_mocked(self):
  1653. # see: https://github.com/giampaolo/psutil/issues/639
  1654. p = psutil.Process()
  1655. fake_file = io.StringIO(u('foo\x00bar\x00'))
  1656. with mock.patch('psutil._common.open',
  1657. return_value=fake_file, create=True) as m:
  1658. self.assertEqual(p.cmdline(), ['foo', 'bar'])
  1659. assert m.called
  1660. fake_file = io.StringIO(u('foo\x00bar\x00\x00'))
  1661. with mock.patch('psutil._common.open',
  1662. return_value=fake_file, create=True) as m:
  1663. self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
  1664. assert m.called
  1665. def test_cmdline_spaces_mocked(self):
  1666. # see: https://github.com/giampaolo/psutil/issues/1179
  1667. p = psutil.Process()
  1668. fake_file = io.StringIO(u('foo bar '))
  1669. with mock.patch('psutil._common.open',
  1670. return_value=fake_file, create=True) as m:
  1671. self.assertEqual(p.cmdline(), ['foo', 'bar'])
  1672. assert m.called
  1673. fake_file = io.StringIO(u('foo bar '))
  1674. with mock.patch('psutil._common.open',
  1675. return_value=fake_file, create=True) as m:
  1676. self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
  1677. assert m.called
  1678. def test_cmdline_mixed_separators(self):
  1679. # https://github.com/giampaolo/psutil/issues/
  1680. # 1179#issuecomment-552984549
  1681. p = psutil.Process()
  1682. fake_file = io.StringIO(u('foo\x20bar\x00'))
  1683. with mock.patch('psutil._common.open',
  1684. return_value=fake_file, create=True) as m:
  1685. self.assertEqual(p.cmdline(), ['foo', 'bar'])
  1686. assert m.called
  1687. def test_readlink_path_deleted_mocked(self):
  1688. with mock.patch('psutil._pslinux.os.readlink',
  1689. return_value='/home/foo (deleted)'):
  1690. self.assertEqual(psutil.Process().exe(), "/home/foo")
  1691. self.assertEqual(psutil.Process().cwd(), "/home/foo")
  1692. def test_threads_mocked(self):
  1693. # Test the case where os.listdir() returns a file (thread)
  1694. # which no longer exists by the time we open() it (race
  1695. # condition). threads() is supposed to ignore that instead
  1696. # of raising NSP.
  1697. def open_mock(name, *args, **kwargs):
  1698. if name.startswith('/proc/%s/task' % os.getpid()):
  1699. raise IOError(errno.ENOENT, "")
  1700. else:
  1701. return orig_open(name, *args, **kwargs)
  1702. orig_open = open
  1703. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1704. with mock.patch(patch_point, side_effect=open_mock) as m:
  1705. ret = psutil.Process().threads()
  1706. assert m.called
  1707. self.assertEqual(ret, [])
  1708. # ...but if it bumps into something != ENOENT we want an
  1709. # exception.
  1710. def open_mock(name, *args, **kwargs):
  1711. if name.startswith('/proc/%s/task' % os.getpid()):
  1712. raise IOError(errno.EPERM, "")
  1713. else:
  1714. return orig_open(name, *args, **kwargs)
  1715. with mock.patch(patch_point, side_effect=open_mock):
  1716. self.assertRaises(psutil.AccessDenied, psutil.Process().threads)
  1717. def test_exe_mocked(self):
  1718. with mock.patch('psutil._pslinux.readlink',
  1719. side_effect=OSError(errno.ENOENT, "")) as m1:
  1720. with mock.patch('psutil.Process.cmdline',
  1721. side_effect=psutil.AccessDenied(0, "")) as m2:
  1722. # No such file error; might be raised also if /proc/pid/exe
  1723. # path actually exists for system processes with low pids
  1724. # (about 0-20). In this case psutil is supposed to return
  1725. # an empty string.
  1726. ret = psutil.Process().exe()
  1727. assert m1.called
  1728. assert m2.called
  1729. self.assertEqual(ret, "")
  1730. # ...but if /proc/pid no longer exist we're supposed to treat
  1731. # it as an alias for zombie process
  1732. with mock.patch('psutil._pslinux.os.path.lexists',
  1733. return_value=False):
  1734. self.assertRaises(
  1735. psutil.ZombieProcess, psutil.Process().exe)
  1736. def test_issue_1014(self):
  1737. # Emulates a case where smaps file does not exist. In this case
  1738. # wrap_exception decorator should not raise NoSuchProcess.
  1739. with mock_open_exception(
  1740. '/proc/%s/smaps' % os.getpid(),
  1741. IOError(errno.ENOENT, "")) as m:
  1742. p = psutil.Process()
  1743. with self.assertRaises(FileNotFoundError):
  1744. p.memory_maps()
  1745. assert m.called
  1746. @unittest.skipIf(not HAS_RLIMIT, "not supported")
  1747. def test_rlimit_zombie(self):
  1748. # Emulate a case where rlimit() raises ENOSYS, which may
  1749. # happen in case of zombie process:
  1750. # https://travis-ci.org/giampaolo/psutil/jobs/51368273
  1751. with mock.patch("psutil._pslinux.cext.linux_prlimit",
  1752. side_effect=OSError(errno.ENOSYS, "")) as m:
  1753. p = psutil.Process()
  1754. p.name()
  1755. with self.assertRaises(psutil.ZombieProcess) as exc:
  1756. p.rlimit(psutil.RLIMIT_NOFILE)
  1757. assert m.called
  1758. self.assertEqual(exc.exception.pid, p.pid)
  1759. self.assertEqual(exc.exception.name, p.name())
  1760. def test_cwd_zombie(self):
  1761. with mock.patch("psutil._pslinux.os.readlink",
  1762. side_effect=OSError(errno.ENOENT, "")) as m:
  1763. p = psutil.Process()
  1764. p.name()
  1765. with self.assertRaises(psutil.ZombieProcess) as exc:
  1766. p.cwd()
  1767. assert m.called
  1768. self.assertEqual(exc.exception.pid, p.pid)
  1769. self.assertEqual(exc.exception.name, p.name())
  1770. def test_stat_file_parsing(self):
  1771. from psutil._pslinux import CLOCK_TICKS
  1772. args = [
  1773. "0", # pid
  1774. "(cat)", # name
  1775. "Z", # status
  1776. "1", # ppid
  1777. "0", # pgrp
  1778. "0", # session
  1779. "0", # tty
  1780. "0", # tpgid
  1781. "0", # flags
  1782. "0", # minflt
  1783. "0", # cminflt
  1784. "0", # majflt
  1785. "0", # cmajflt
  1786. "2", # utime
  1787. "3", # stime
  1788. "4", # cutime
  1789. "5", # cstime
  1790. "0", # priority
  1791. "0", # nice
  1792. "0", # num_threads
  1793. "0", # itrealvalue
  1794. "6", # starttime
  1795. "0", # vsize
  1796. "0", # rss
  1797. "0", # rsslim
  1798. "0", # startcode
  1799. "0", # endcode
  1800. "0", # startstack
  1801. "0", # kstkesp
  1802. "0", # kstkeip
  1803. "0", # signal
  1804. "0", # blocked
  1805. "0", # sigignore
  1806. "0", # sigcatch
  1807. "0", # wchan
  1808. "0", # nswap
  1809. "0", # cnswap
  1810. "0", # exit_signal
  1811. "6", # processor
  1812. "0", # rt priority
  1813. "0", # policy
  1814. "7", # delayacct_blkio_ticks
  1815. ]
  1816. content = " ".join(args).encode()
  1817. with mock_open_content('/proc/%s/stat' % os.getpid(), content):
  1818. p = psutil.Process()
  1819. self.assertEqual(p.name(), 'cat')
  1820. self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
  1821. self.assertEqual(p.ppid(), 1)
  1822. self.assertEqual(
  1823. p.create_time(), 6 / CLOCK_TICKS + psutil.boot_time())
  1824. cpu = p.cpu_times()
  1825. self.assertEqual(cpu.user, 2 / CLOCK_TICKS)
  1826. self.assertEqual(cpu.system, 3 / CLOCK_TICKS)
  1827. self.assertEqual(cpu.children_user, 4 / CLOCK_TICKS)
  1828. self.assertEqual(cpu.children_system, 5 / CLOCK_TICKS)
  1829. self.assertEqual(cpu.iowait, 7 / CLOCK_TICKS)
  1830. self.assertEqual(p.cpu_num(), 6)
  1831. def test_status_file_parsing(self):
  1832. with mock_open_content(
  1833. '/proc/%s/status' % os.getpid(),
  1834. textwrap.dedent("""\
  1835. Uid:\t1000\t1001\t1002\t1003
  1836. Gid:\t1004\t1005\t1006\t1007
  1837. Threads:\t66
  1838. Cpus_allowed:\tf
  1839. Cpus_allowed_list:\t0-7
  1840. voluntary_ctxt_switches:\t12
  1841. nonvoluntary_ctxt_switches:\t13""").encode()):
  1842. p = psutil.Process()
  1843. self.assertEqual(p.num_ctx_switches().voluntary, 12)
  1844. self.assertEqual(p.num_ctx_switches().involuntary, 13)
  1845. self.assertEqual(p.num_threads(), 66)
  1846. uids = p.uids()
  1847. self.assertEqual(uids.real, 1000)
  1848. self.assertEqual(uids.effective, 1001)
  1849. self.assertEqual(uids.saved, 1002)
  1850. gids = p.gids()
  1851. self.assertEqual(gids.real, 1004)
  1852. self.assertEqual(gids.effective, 1005)
  1853. self.assertEqual(gids.saved, 1006)
  1854. self.assertEqual(p._proc._get_eligible_cpus(), list(range(0, 8)))
  1855. @unittest.skipIf(not LINUX, "LINUX only")
  1856. class TestProcessAgainstStatus(PsutilTestCase):
  1857. """/proc/pid/stat and /proc/pid/status have many values in common.
  1858. Whenever possible, psutil uses /proc/pid/stat (it's faster).
  1859. For all those cases we check that the value found in
  1860. /proc/pid/stat (by psutil) matches the one found in
  1861. /proc/pid/status.
  1862. """
  1863. @classmethod
  1864. def setUpClass(cls):
  1865. cls.proc = psutil.Process()
  1866. def read_status_file(self, linestart):
  1867. with psutil._psplatform.open_text(
  1868. '/proc/%s/status' % self.proc.pid) as f:
  1869. for line in f:
  1870. line = line.strip()
  1871. if line.startswith(linestart):
  1872. value = line.partition('\t')[2]
  1873. try:
  1874. return int(value)
  1875. except ValueError:
  1876. return value
  1877. raise ValueError("can't find %r" % linestart)
  1878. def test_name(self):
  1879. value = self.read_status_file("Name:")
  1880. self.assertEqual(self.proc.name(), value)
  1881. def test_status(self):
  1882. value = self.read_status_file("State:")
  1883. value = value[value.find('(') + 1:value.rfind(')')]
  1884. value = value.replace(' ', '-')
  1885. self.assertEqual(self.proc.status(), value)
  1886. def test_ppid(self):
  1887. value = self.read_status_file("PPid:")
  1888. self.assertEqual(self.proc.ppid(), value)
  1889. def test_num_threads(self):
  1890. value = self.read_status_file("Threads:")
  1891. self.assertEqual(self.proc.num_threads(), value)
  1892. def test_uids(self):
  1893. value = self.read_status_file("Uid:")
  1894. value = tuple(map(int, value.split()[1:4]))
  1895. self.assertEqual(self.proc.uids(), value)
  1896. def test_gids(self):
  1897. value = self.read_status_file("Gid:")
  1898. value = tuple(map(int, value.split()[1:4]))
  1899. self.assertEqual(self.proc.gids(), value)
  1900. @retry_on_failure()
  1901. def test_num_ctx_switches(self):
  1902. value = self.read_status_file("voluntary_ctxt_switches:")
  1903. self.assertEqual(self.proc.num_ctx_switches().voluntary, value)
  1904. value = self.read_status_file("nonvoluntary_ctxt_switches:")
  1905. self.assertEqual(self.proc.num_ctx_switches().involuntary, value)
  1906. def test_cpu_affinity(self):
  1907. value = self.read_status_file("Cpus_allowed_list:")
  1908. if '-' in str(value):
  1909. min_, max_ = map(int, value.split('-'))
  1910. self.assertEqual(
  1911. self.proc.cpu_affinity(), list(range(min_, max_ + 1)))
  1912. def test_cpu_affinity_eligible_cpus(self):
  1913. value = self.read_status_file("Cpus_allowed_list:")
  1914. with mock.patch("psutil._pslinux.per_cpu_times") as m:
  1915. self.proc._proc._get_eligible_cpus()
  1916. if '-' in str(value):
  1917. assert not m.called
  1918. else:
  1919. assert m.called
  1920. # =====================================================================
  1921. # --- test utils
  1922. # =====================================================================
  1923. @unittest.skipIf(not LINUX, "LINUX only")
  1924. class TestUtils(PsutilTestCase):
  1925. def test_readlink(self):
  1926. with mock.patch("os.readlink", return_value="foo (deleted)") as m:
  1927. self.assertEqual(psutil._psplatform.readlink("bar"), "foo")
  1928. assert m.called
  1929. def test_cat(self):
  1930. testfn = self.get_testfn()
  1931. with open(testfn, "wt") as f:
  1932. f.write("foo ")
  1933. self.assertEqual(psutil._psplatform.cat(testfn, binary=False), "foo")
  1934. self.assertEqual(psutil._psplatform.cat(testfn, binary=True), b"foo")
  1935. self.assertEqual(
  1936. psutil._psplatform.cat(testfn + '??', fallback="bar"), "bar")
  1937. if __name__ == '__main__':
  1938. from psutil.tests.runner import run_from_name
  1939. run_from_name(__file__)