test_osx.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. #!/usr/bin/env python3
  2. # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """MACOS specific tests."""
  6. import os
  7. import re
  8. import time
  9. import psutil
  10. from psutil import MACOS
  11. from psutil.tests import HAS_BATTERY
  12. from psutil.tests import PsutilTestCase
  13. from psutil.tests import retry_on_failure
  14. from psutil.tests import sh
  15. from psutil.tests import spawn_testproc
  16. from psutil.tests import terminate
  17. from psutil.tests import TOLERANCE_DISK_USAGE
  18. from psutil.tests import TOLERANCE_SYS_MEM
  19. from psutil.tests import unittest
  20. PAGESIZE = os.sysconf("SC_PAGE_SIZE") if MACOS else None
  21. def sysctl(cmdline):
  22. """Expects a sysctl command with an argument and parse the result
  23. returning only the value of interest.
  24. """
  25. out = sh(cmdline)
  26. result = out.split()[1]
  27. try:
  28. return int(result)
  29. except ValueError:
  30. return result
  31. def vm_stat(field):
  32. """Wrapper around 'vm_stat' cmdline utility."""
  33. out = sh('vm_stat')
  34. for line in out.split('\n'):
  35. if field in line:
  36. break
  37. else:
  38. raise ValueError("line not found")
  39. return int(re.search(r'\d+', line).group(0)) * PAGESIZE
  40. # http://code.activestate.com/recipes/578019/
  41. def human2bytes(s):
  42. SYMBOLS = {
  43. 'customary': ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y'),
  44. }
  45. init = s
  46. num = ""
  47. while s and s[0:1].isdigit() or s[0:1] == '.':
  48. num += s[0]
  49. s = s[1:]
  50. num = float(num)
  51. letter = s.strip()
  52. for name, sset in SYMBOLS.items():
  53. if letter in sset:
  54. break
  55. else:
  56. if letter == 'k':
  57. sset = SYMBOLS['customary']
  58. letter = letter.upper()
  59. else:
  60. raise ValueError("can't interpret %r" % init)
  61. prefix = {sset[0]: 1}
  62. for i, s in enumerate(sset[1:]):
  63. prefix[s] = 1 << (i + 1) * 10
  64. return int(num * prefix[letter])
  65. @unittest.skipIf(not MACOS, "MACOS only")
  66. class TestProcess(PsutilTestCase):
  67. @classmethod
  68. def setUpClass(cls):
  69. cls.pid = spawn_testproc().pid
  70. @classmethod
  71. def tearDownClass(cls):
  72. terminate(cls.pid)
  73. def test_process_create_time(self):
  74. output = sh("ps -o lstart -p %s" % self.pid)
  75. start_ps = output.replace('STARTED', '').strip()
  76. hhmmss = start_ps.split(' ')[-2]
  77. year = start_ps.split(' ')[-1]
  78. start_psutil = psutil.Process(self.pid).create_time()
  79. self.assertEqual(
  80. hhmmss,
  81. time.strftime("%H:%M:%S", time.localtime(start_psutil)))
  82. self.assertEqual(
  83. year,
  84. time.strftime("%Y", time.localtime(start_psutil)))
  85. @unittest.skipIf(not MACOS, "MACOS only")
  86. class TestSystemAPIs(PsutilTestCase):
  87. # --- disk
  88. @retry_on_failure()
  89. def test_disks(self):
  90. # test psutil.disk_usage() and psutil.disk_partitions()
  91. # against "df -a"
  92. def df(path):
  93. out = sh('df -k "%s"' % path).strip()
  94. lines = out.split('\n')
  95. lines.pop(0)
  96. line = lines.pop(0)
  97. dev, total, used, free = line.split()[:4]
  98. if dev == 'none':
  99. dev = ''
  100. total = int(total) * 1024
  101. used = int(used) * 1024
  102. free = int(free) * 1024
  103. return dev, total, used, free
  104. for part in psutil.disk_partitions(all=False):
  105. usage = psutil.disk_usage(part.mountpoint)
  106. dev, total, used, free = df(part.mountpoint)
  107. self.assertEqual(part.device, dev)
  108. self.assertEqual(usage.total, total)
  109. self.assertAlmostEqual(usage.free, free,
  110. delta=TOLERANCE_DISK_USAGE)
  111. self.assertAlmostEqual(usage.used, used,
  112. delta=TOLERANCE_DISK_USAGE)
  113. # --- cpu
  114. def test_cpu_count_logical(self):
  115. num = sysctl("sysctl hw.logicalcpu")
  116. self.assertEqual(num, psutil.cpu_count(logical=True))
  117. def test_cpu_count_physical(self):
  118. num = sysctl("sysctl hw.physicalcpu")
  119. self.assertEqual(num, psutil.cpu_count(logical=False))
  120. def test_cpu_freq(self):
  121. freq = psutil.cpu_freq()
  122. self.assertEqual(
  123. freq.current * 1000 * 1000, sysctl("sysctl hw.cpufrequency"))
  124. self.assertEqual(
  125. freq.min * 1000 * 1000, sysctl("sysctl hw.cpufrequency_min"))
  126. self.assertEqual(
  127. freq.max * 1000 * 1000, sysctl("sysctl hw.cpufrequency_max"))
  128. # --- virtual mem
  129. def test_vmem_total(self):
  130. sysctl_hwphymem = sysctl('sysctl hw.memsize')
  131. self.assertEqual(sysctl_hwphymem, psutil.virtual_memory().total)
  132. @retry_on_failure()
  133. def test_vmem_free(self):
  134. vmstat_val = vm_stat("free")
  135. psutil_val = psutil.virtual_memory().free
  136. self.assertAlmostEqual(psutil_val, vmstat_val, delta=TOLERANCE_SYS_MEM)
  137. @retry_on_failure()
  138. def test_vmem_active(self):
  139. vmstat_val = vm_stat("active")
  140. psutil_val = psutil.virtual_memory().active
  141. self.assertAlmostEqual(psutil_val, vmstat_val, delta=TOLERANCE_SYS_MEM)
  142. @retry_on_failure()
  143. def test_vmem_inactive(self):
  144. vmstat_val = vm_stat("inactive")
  145. psutil_val = psutil.virtual_memory().inactive
  146. self.assertAlmostEqual(psutil_val, vmstat_val, delta=TOLERANCE_SYS_MEM)
  147. @retry_on_failure()
  148. def test_vmem_wired(self):
  149. vmstat_val = vm_stat("wired")
  150. psutil_val = psutil.virtual_memory().wired
  151. self.assertAlmostEqual(psutil_val, vmstat_val, delta=TOLERANCE_SYS_MEM)
  152. # --- swap mem
  153. @retry_on_failure()
  154. def test_swapmem_sin(self):
  155. vmstat_val = vm_stat("Pageins")
  156. psutil_val = psutil.swap_memory().sin
  157. self.assertEqual(psutil_val, vmstat_val)
  158. @retry_on_failure()
  159. def test_swapmem_sout(self):
  160. vmstat_val = vm_stat("Pageout")
  161. psutil_val = psutil.swap_memory().sout
  162. self.assertEqual(psutil_val, vmstat_val)
  163. # Not very reliable.
  164. # def test_swapmem_total(self):
  165. # out = sh('sysctl vm.swapusage')
  166. # out = out.replace('vm.swapusage: ', '')
  167. # total, used, free = re.findall('\d+.\d+\w', out)
  168. # psutil_smem = psutil.swap_memory()
  169. # self.assertEqual(psutil_smem.total, human2bytes(total))
  170. # self.assertEqual(psutil_smem.used, human2bytes(used))
  171. # self.assertEqual(psutil_smem.free, human2bytes(free))
  172. # --- network
  173. def test_net_if_stats(self):
  174. for name, stats in psutil.net_if_stats().items():
  175. try:
  176. out = sh("ifconfig %s" % name)
  177. except RuntimeError:
  178. pass
  179. else:
  180. self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
  181. self.assertEqual(stats.mtu,
  182. int(re.findall(r'mtu (\d+)', out)[0]))
  183. # --- sensors_battery
  184. @unittest.skipIf(not HAS_BATTERY, "no battery")
  185. def test_sensors_battery(self):
  186. out = sh("pmset -g batt")
  187. percent = re.search(r"(\d+)%", out).group(1)
  188. drawing_from = re.search("Now drawing from '([^']+)'", out).group(1)
  189. power_plugged = drawing_from == "AC Power"
  190. psutil_result = psutil.sensors_battery()
  191. self.assertEqual(psutil_result.power_plugged, power_plugged)
  192. self.assertEqual(psutil_result.percent, int(percent))
  193. if __name__ == '__main__':
  194. from psutil.tests.runner import run_from_name
  195. run_from_name(__file__)