test_windows.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841
  1. #!/usr/bin/env python3
  2. # -*- coding: UTF-8 -*
  3. # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
  4. # Use of this source code is governed by a BSD-style license that can be
  5. # found in the LICENSE file.
  6. """Windows specific tests."""
  7. import datetime
  8. import errno
  9. import glob
  10. import os
  11. import platform
  12. import re
  13. import signal
  14. import subprocess
  15. import sys
  16. import time
  17. import warnings
  18. import psutil
  19. from psutil import WINDOWS
  20. from psutil._compat import FileNotFoundError
  21. from psutil._compat import super
  22. from psutil.tests import APPVEYOR
  23. from psutil.tests import GITHUB_WHEELS
  24. from psutil.tests import HAS_BATTERY
  25. from psutil.tests import IS_64BIT
  26. from psutil.tests import mock
  27. from psutil.tests import PsutilTestCase
  28. from psutil.tests import PY3
  29. from psutil.tests import PYPY
  30. from psutil.tests import retry_on_failure
  31. from psutil.tests import sh
  32. from psutil.tests import spawn_testproc
  33. from psutil.tests import terminate
  34. from psutil.tests import TOLERANCE_DISK_USAGE
  35. from psutil.tests import unittest
  36. if WINDOWS and not PYPY:
  37. with warnings.catch_warnings():
  38. warnings.simplefilter("ignore")
  39. import win32api # requires "pip install pywin32"
  40. import win32con
  41. import win32process
  42. import wmi # requires "pip install wmi" / "make setup-dev-env"
  43. cext = psutil._psplatform.cext
  44. def wrap_exceptions(fun):
  45. def wrapper(self, *args, **kwargs):
  46. try:
  47. return fun(self, *args, **kwargs)
  48. except OSError as err:
  49. from psutil._pswindows import ACCESS_DENIED_SET
  50. if err.errno in ACCESS_DENIED_SET:
  51. raise psutil.AccessDenied(None, None)
  52. if err.errno == errno.ESRCH:
  53. raise psutil.NoSuchProcess(None, None)
  54. raise
  55. return wrapper
  56. @unittest.skipIf(not WINDOWS, "WINDOWS only")
  57. @unittest.skipIf(PYPY, "pywin32 not available on PYPY")
  58. # https://github.com/giampaolo/psutil/pull/1762#issuecomment-632892692
  59. @unittest.skipIf(GITHUB_WHEELS and (not PY3 or not IS_64BIT),
  60. "pywin32 broken on GITHUB + PY2")
  61. class WindowsTestCase(PsutilTestCase):
  62. pass
  63. # ===================================================================
  64. # System APIs
  65. # ===================================================================
  66. class TestCpuAPIs(WindowsTestCase):
  67. @unittest.skipIf('NUMBER_OF_PROCESSORS' not in os.environ,
  68. 'NUMBER_OF_PROCESSORS env var is not available')
  69. def test_cpu_count_vs_NUMBER_OF_PROCESSORS(self):
  70. # Will likely fail on many-cores systems:
  71. # https://stackoverflow.com/questions/31209256
  72. num_cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
  73. self.assertEqual(num_cpus, psutil.cpu_count())
  74. def test_cpu_count_vs_GetSystemInfo(self):
  75. # Will likely fail on many-cores systems:
  76. # https://stackoverflow.com/questions/31209256
  77. sys_value = win32api.GetSystemInfo()[5]
  78. psutil_value = psutil.cpu_count()
  79. self.assertEqual(sys_value, psutil_value)
  80. def test_cpu_count_logical_vs_wmi(self):
  81. w = wmi.WMI()
  82. proc = w.Win32_Processor()[0]
  83. self.assertEqual(psutil.cpu_count(), proc.NumberOfLogicalProcessors)
  84. def test_cpu_count_phys_vs_wmi(self):
  85. w = wmi.WMI()
  86. proc = w.Win32_Processor()[0]
  87. self.assertEqual(psutil.cpu_count(logical=False), proc.NumberOfCores)
  88. def test_cpu_count_vs_cpu_times(self):
  89. self.assertEqual(psutil.cpu_count(),
  90. len(psutil.cpu_times(percpu=True)))
  91. def test_cpu_freq(self):
  92. w = wmi.WMI()
  93. proc = w.Win32_Processor()[0]
  94. self.assertEqual(proc.CurrentClockSpeed, psutil.cpu_freq().current)
  95. self.assertEqual(proc.MaxClockSpeed, psutil.cpu_freq().max)
  96. class TestSystemAPIs(WindowsTestCase):
  97. def test_nic_names(self):
  98. out = sh('ipconfig /all')
  99. nics = psutil.net_io_counters(pernic=True).keys()
  100. for nic in nics:
  101. if "pseudo-interface" in nic.replace(' ', '-').lower():
  102. continue
  103. if nic not in out:
  104. self.fail(
  105. "%r nic wasn't found in 'ipconfig /all' output" % nic)
  106. def test_total_phymem(self):
  107. w = wmi.WMI().Win32_ComputerSystem()[0]
  108. self.assertEqual(int(w.TotalPhysicalMemory),
  109. psutil.virtual_memory().total)
  110. # @unittest.skipIf(wmi is None, "wmi module is not installed")
  111. # def test__UPTIME(self):
  112. # # _UPTIME constant is not public but it is used internally
  113. # # as value to return for pid 0 creation time.
  114. # # WMI behaves the same.
  115. # w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  116. # p = psutil.Process(0)
  117. # wmic_create = str(w.CreationDate.split('.')[0])
  118. # psutil_create = time.strftime("%Y%m%d%H%M%S",
  119. # time.localtime(p.create_time()))
  120. # Note: this test is not very reliable
  121. @unittest.skipIf(APPVEYOR, "test not relieable on appveyor")
  122. @retry_on_failure()
  123. def test_pids(self):
  124. # Note: this test might fail if the OS is starting/killing
  125. # other processes in the meantime
  126. w = wmi.WMI().Win32_Process()
  127. wmi_pids = set([x.ProcessId for x in w])
  128. psutil_pids = set(psutil.pids())
  129. self.assertEqual(wmi_pids, psutil_pids)
  130. @retry_on_failure()
  131. def test_disks(self):
  132. ps_parts = psutil.disk_partitions(all=True)
  133. wmi_parts = wmi.WMI().Win32_LogicalDisk()
  134. for ps_part in ps_parts:
  135. for wmi_part in wmi_parts:
  136. if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
  137. if not ps_part.mountpoint:
  138. # this is usually a CD-ROM with no disk inserted
  139. break
  140. if 'cdrom' in ps_part.opts:
  141. break
  142. if ps_part.mountpoint.startswith('A:'):
  143. break # floppy
  144. try:
  145. usage = psutil.disk_usage(ps_part.mountpoint)
  146. except FileNotFoundError:
  147. # usually this is the floppy
  148. break
  149. self.assertEqual(usage.total, int(wmi_part.Size))
  150. wmi_free = int(wmi_part.FreeSpace)
  151. self.assertEqual(usage.free, wmi_free)
  152. # 10 MB tollerance
  153. if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
  154. self.fail("psutil=%s, wmi=%s" % (
  155. usage.free, wmi_free))
  156. break
  157. else:
  158. self.fail("can't find partition %s" % repr(ps_part))
  159. @retry_on_failure()
  160. def test_disk_usage(self):
  161. for disk in psutil.disk_partitions():
  162. if 'cdrom' in disk.opts:
  163. continue
  164. sys_value = win32api.GetDiskFreeSpaceEx(disk.mountpoint)
  165. psutil_value = psutil.disk_usage(disk.mountpoint)
  166. self.assertAlmostEqual(sys_value[0], psutil_value.free,
  167. delta=TOLERANCE_DISK_USAGE)
  168. self.assertAlmostEqual(sys_value[1], psutil_value.total,
  169. delta=TOLERANCE_DISK_USAGE)
  170. self.assertEqual(psutil_value.used,
  171. psutil_value.total - psutil_value.free)
  172. def test_disk_partitions(self):
  173. sys_value = [
  174. x + '\\' for x in win32api.GetLogicalDriveStrings().split("\\\x00")
  175. if x and not x.startswith('A:')]
  176. psutil_value = [x.mountpoint for x in psutil.disk_partitions(all=True)
  177. if not x.mountpoint.startswith('A:')]
  178. self.assertEqual(sys_value, psutil_value)
  179. def test_net_if_stats(self):
  180. ps_names = set(cext.net_if_stats())
  181. wmi_adapters = wmi.WMI().Win32_NetworkAdapter()
  182. wmi_names = set()
  183. for wmi_adapter in wmi_adapters:
  184. wmi_names.add(wmi_adapter.Name)
  185. wmi_names.add(wmi_adapter.NetConnectionID)
  186. self.assertTrue(ps_names & wmi_names,
  187. "no common entries in %s, %s" % (ps_names, wmi_names))
  188. def test_boot_time(self):
  189. wmi_os = wmi.WMI().Win32_OperatingSystem()
  190. wmi_btime_str = wmi_os[0].LastBootUpTime.split('.')[0]
  191. wmi_btime_dt = datetime.datetime.strptime(
  192. wmi_btime_str, "%Y%m%d%H%M%S")
  193. psutil_dt = datetime.datetime.fromtimestamp(psutil.boot_time())
  194. diff = abs((wmi_btime_dt - psutil_dt).total_seconds())
  195. self.assertLessEqual(diff, 3)
  196. def test_boot_time_fluctuation(self):
  197. # https://github.com/giampaolo/psutil/issues/1007
  198. with mock.patch('psutil._pswindows.cext.boot_time', return_value=5):
  199. self.assertEqual(psutil.boot_time(), 5)
  200. with mock.patch('psutil._pswindows.cext.boot_time', return_value=4):
  201. self.assertEqual(psutil.boot_time(), 5)
  202. with mock.patch('psutil._pswindows.cext.boot_time', return_value=6):
  203. self.assertEqual(psutil.boot_time(), 5)
  204. with mock.patch('psutil._pswindows.cext.boot_time', return_value=333):
  205. self.assertEqual(psutil.boot_time(), 333)
  206. # ===================================================================
  207. # sensors_battery()
  208. # ===================================================================
  209. class TestSensorsBattery(WindowsTestCase):
  210. def test_has_battery(self):
  211. if win32api.GetPwrCapabilities()['SystemBatteriesPresent']:
  212. self.assertIsNotNone(psutil.sensors_battery())
  213. else:
  214. self.assertIsNone(psutil.sensors_battery())
  215. @unittest.skipIf(not HAS_BATTERY, "no battery")
  216. def test_percent(self):
  217. w = wmi.WMI()
  218. battery_wmi = w.query('select * from Win32_Battery')[0]
  219. battery_psutil = psutil.sensors_battery()
  220. self.assertAlmostEqual(
  221. battery_psutil.percent, battery_wmi.EstimatedChargeRemaining,
  222. delta=1)
  223. @unittest.skipIf(not HAS_BATTERY, "no battery")
  224. def test_power_plugged(self):
  225. w = wmi.WMI()
  226. battery_wmi = w.query('select * from Win32_Battery')[0]
  227. battery_psutil = psutil.sensors_battery()
  228. # Status codes:
  229. # https://msdn.microsoft.com/en-us/library/aa394074(v=vs.85).aspx
  230. self.assertEqual(battery_psutil.power_plugged,
  231. battery_wmi.BatteryStatus == 2)
  232. def test_emulate_no_battery(self):
  233. with mock.patch("psutil._pswindows.cext.sensors_battery",
  234. return_value=(0, 128, 0, 0)) as m:
  235. self.assertIsNone(psutil.sensors_battery())
  236. assert m.called
  237. def test_emulate_power_connected(self):
  238. with mock.patch("psutil._pswindows.cext.sensors_battery",
  239. return_value=(1, 0, 0, 0)) as m:
  240. self.assertEqual(psutil.sensors_battery().secsleft,
  241. psutil.POWER_TIME_UNLIMITED)
  242. assert m.called
  243. def test_emulate_power_charging(self):
  244. with mock.patch("psutil._pswindows.cext.sensors_battery",
  245. return_value=(0, 8, 0, 0)) as m:
  246. self.assertEqual(psutil.sensors_battery().secsleft,
  247. psutil.POWER_TIME_UNLIMITED)
  248. assert m.called
  249. def test_emulate_secs_left_unknown(self):
  250. with mock.patch("psutil._pswindows.cext.sensors_battery",
  251. return_value=(0, 0, 0, -1)) as m:
  252. self.assertEqual(psutil.sensors_battery().secsleft,
  253. psutil.POWER_TIME_UNKNOWN)
  254. assert m.called
  255. # ===================================================================
  256. # Process APIs
  257. # ===================================================================
  258. class TestProcess(WindowsTestCase):
  259. @classmethod
  260. def setUpClass(cls):
  261. cls.pid = spawn_testproc().pid
  262. @classmethod
  263. def tearDownClass(cls):
  264. terminate(cls.pid)
  265. def test_issue_24(self):
  266. p = psutil.Process(0)
  267. self.assertRaises(psutil.AccessDenied, p.kill)
  268. def test_special_pid(self):
  269. p = psutil.Process(4)
  270. self.assertEqual(p.name(), 'System')
  271. # use __str__ to access all common Process properties to check
  272. # that nothing strange happens
  273. str(p)
  274. p.username()
  275. self.assertTrue(p.create_time() >= 0.0)
  276. try:
  277. rss, vms = p.memory_info()[:2]
  278. except psutil.AccessDenied:
  279. # expected on Windows Vista and Windows 7
  280. if not platform.uname()[1] in ('vista', 'win-7', 'win7'):
  281. raise
  282. else:
  283. self.assertTrue(rss > 0)
  284. def test_send_signal(self):
  285. p = psutil.Process(self.pid)
  286. self.assertRaises(ValueError, p.send_signal, signal.SIGINT)
  287. def test_num_handles_increment(self):
  288. p = psutil.Process(os.getpid())
  289. before = p.num_handles()
  290. handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
  291. win32con.FALSE, os.getpid())
  292. after = p.num_handles()
  293. self.assertEqual(after, before + 1)
  294. win32api.CloseHandle(handle)
  295. self.assertEqual(p.num_handles(), before)
  296. @unittest.skipIf(not sys.version_info >= (2, 7),
  297. "CTRL_* signals not supported")
  298. def test_ctrl_signals(self):
  299. p = psutil.Process(self.spawn_testproc().pid)
  300. p.send_signal(signal.CTRL_C_EVENT)
  301. p.send_signal(signal.CTRL_BREAK_EVENT)
  302. p.kill()
  303. p.wait()
  304. self.assertRaises(psutil.NoSuchProcess,
  305. p.send_signal, signal.CTRL_C_EVENT)
  306. self.assertRaises(psutil.NoSuchProcess,
  307. p.send_signal, signal.CTRL_BREAK_EVENT)
  308. def test_username(self):
  309. self.assertEqual(psutil.Process().username(),
  310. win32api.GetUserNameEx(win32con.NameSamCompatible))
  311. def test_cmdline(self):
  312. sys_value = re.sub(' +', ' ', win32api.GetCommandLine()).strip()
  313. psutil_value = ' '.join(psutil.Process().cmdline())
  314. self.assertEqual(sys_value, psutil_value)
  315. # XXX - occasional failures
  316. # def test_cpu_times(self):
  317. # handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
  318. # win32con.FALSE, os.getpid())
  319. # self.addCleanup(win32api.CloseHandle, handle)
  320. # sys_value = win32process.GetProcessTimes(handle)
  321. # psutil_value = psutil.Process().cpu_times()
  322. # self.assertAlmostEqual(
  323. # psutil_value.user, sys_value['UserTime'] / 10000000.0,
  324. # delta=0.2)
  325. # self.assertAlmostEqual(
  326. # psutil_value.user, sys_value['KernelTime'] / 10000000.0,
  327. # delta=0.2)
  328. def test_nice(self):
  329. handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
  330. win32con.FALSE, os.getpid())
  331. self.addCleanup(win32api.CloseHandle, handle)
  332. sys_value = win32process.GetPriorityClass(handle)
  333. psutil_value = psutil.Process().nice()
  334. self.assertEqual(psutil_value, sys_value)
  335. def test_memory_info(self):
  336. handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
  337. win32con.FALSE, self.pid)
  338. self.addCleanup(win32api.CloseHandle, handle)
  339. sys_value = win32process.GetProcessMemoryInfo(handle)
  340. psutil_value = psutil.Process(self.pid).memory_info()
  341. self.assertEqual(
  342. sys_value['PeakWorkingSetSize'], psutil_value.peak_wset)
  343. self.assertEqual(
  344. sys_value['WorkingSetSize'], psutil_value.wset)
  345. self.assertEqual(
  346. sys_value['QuotaPeakPagedPoolUsage'], psutil_value.peak_paged_pool)
  347. self.assertEqual(
  348. sys_value['QuotaPagedPoolUsage'], psutil_value.paged_pool)
  349. self.assertEqual(
  350. sys_value['QuotaPeakNonPagedPoolUsage'],
  351. psutil_value.peak_nonpaged_pool)
  352. self.assertEqual(
  353. sys_value['QuotaNonPagedPoolUsage'], psutil_value.nonpaged_pool)
  354. self.assertEqual(
  355. sys_value['PagefileUsage'], psutil_value.pagefile)
  356. self.assertEqual(
  357. sys_value['PeakPagefileUsage'], psutil_value.peak_pagefile)
  358. self.assertEqual(psutil_value.rss, psutil_value.wset)
  359. self.assertEqual(psutil_value.vms, psutil_value.pagefile)
  360. def test_wait(self):
  361. handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
  362. win32con.FALSE, self.pid)
  363. self.addCleanup(win32api.CloseHandle, handle)
  364. p = psutil.Process(self.pid)
  365. p.terminate()
  366. psutil_value = p.wait()
  367. sys_value = win32process.GetExitCodeProcess(handle)
  368. self.assertEqual(psutil_value, sys_value)
  369. def test_cpu_affinity(self):
  370. def from_bitmask(x):
  371. return [i for i in range(64) if (1 << i) & x]
  372. handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
  373. win32con.FALSE, self.pid)
  374. self.addCleanup(win32api.CloseHandle, handle)
  375. sys_value = from_bitmask(
  376. win32process.GetProcessAffinityMask(handle)[0])
  377. psutil_value = psutil.Process(self.pid).cpu_affinity()
  378. self.assertEqual(psutil_value, sys_value)
  379. def test_io_counters(self):
  380. handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
  381. win32con.FALSE, os.getpid())
  382. self.addCleanup(win32api.CloseHandle, handle)
  383. sys_value = win32process.GetProcessIoCounters(handle)
  384. psutil_value = psutil.Process().io_counters()
  385. self.assertEqual(
  386. psutil_value.read_count, sys_value['ReadOperationCount'])
  387. self.assertEqual(
  388. psutil_value.write_count, sys_value['WriteOperationCount'])
  389. self.assertEqual(
  390. psutil_value.read_bytes, sys_value['ReadTransferCount'])
  391. self.assertEqual(
  392. psutil_value.write_bytes, sys_value['WriteTransferCount'])
  393. self.assertEqual(
  394. psutil_value.other_count, sys_value['OtherOperationCount'])
  395. self.assertEqual(
  396. psutil_value.other_bytes, sys_value['OtherTransferCount'])
  397. def test_num_handles(self):
  398. import ctypes
  399. import ctypes.wintypes
  400. PROCESS_QUERY_INFORMATION = 0x400
  401. handle = ctypes.windll.kernel32.OpenProcess(
  402. PROCESS_QUERY_INFORMATION, 0, self.pid)
  403. self.addCleanup(ctypes.windll.kernel32.CloseHandle, handle)
  404. hndcnt = ctypes.wintypes.DWORD()
  405. ctypes.windll.kernel32.GetProcessHandleCount(
  406. handle, ctypes.byref(hndcnt))
  407. sys_value = hndcnt.value
  408. psutil_value = psutil.Process(self.pid).num_handles()
  409. self.assertEqual(psutil_value, sys_value)
  410. def test_error_partial_copy(self):
  411. # https://github.com/giampaolo/psutil/issues/875
  412. exc = WindowsError()
  413. exc.winerror = 299
  414. with mock.patch("psutil._psplatform.cext.proc_cwd", side_effect=exc):
  415. with mock.patch("time.sleep") as m:
  416. p = psutil.Process()
  417. self.assertRaises(psutil.AccessDenied, p.cwd)
  418. self.assertGreaterEqual(m.call_count, 5)
  419. def test_exe(self):
  420. # NtQuerySystemInformation succeeds if process is gone. Make sure
  421. # it raises NSP for a non existent pid.
  422. pid = psutil.pids()[-1] + 99999
  423. proc = psutil._psplatform.Process(pid)
  424. self.assertRaises(psutil.NoSuchProcess, proc.exe)
  425. class TestProcessWMI(WindowsTestCase):
  426. """Compare Process API results with WMI."""
  427. @classmethod
  428. def setUpClass(cls):
  429. cls.pid = spawn_testproc().pid
  430. @classmethod
  431. def tearDownClass(cls):
  432. terminate(cls.pid)
  433. def test_name(self):
  434. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  435. p = psutil.Process(self.pid)
  436. self.assertEqual(p.name(), w.Caption)
  437. # This fail on github because using virtualenv for test environment
  438. @unittest.skipIf(GITHUB_WHEELS, "unreliable path on GITHUB_WHEELS")
  439. def test_exe(self):
  440. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  441. p = psutil.Process(self.pid)
  442. # Note: wmi reports the exe as a lower case string.
  443. # Being Windows paths case-insensitive we ignore that.
  444. self.assertEqual(p.exe().lower(), w.ExecutablePath.lower())
  445. def test_cmdline(self):
  446. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  447. p = psutil.Process(self.pid)
  448. self.assertEqual(' '.join(p.cmdline()),
  449. w.CommandLine.replace('"', ''))
  450. def test_username(self):
  451. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  452. p = psutil.Process(self.pid)
  453. domain, _, username = w.GetOwner()
  454. username = "%s\\%s" % (domain, username)
  455. self.assertEqual(p.username(), username)
  456. @retry_on_failure()
  457. def test_memory_rss(self):
  458. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  459. p = psutil.Process(self.pid)
  460. rss = p.memory_info().rss
  461. self.assertEqual(rss, int(w.WorkingSetSize))
  462. @retry_on_failure()
  463. def test_memory_vms(self):
  464. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  465. p = psutil.Process(self.pid)
  466. vms = p.memory_info().vms
  467. # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
  468. # ...claims that PageFileUsage is represented in Kilo
  469. # bytes but funnily enough on certain platforms bytes are
  470. # returned instead.
  471. wmi_usage = int(w.PageFileUsage)
  472. if (vms != wmi_usage) and (vms != wmi_usage * 1024):
  473. self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))
  474. def test_create_time(self):
  475. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  476. p = psutil.Process(self.pid)
  477. wmic_create = str(w.CreationDate.split('.')[0])
  478. psutil_create = time.strftime("%Y%m%d%H%M%S",
  479. time.localtime(p.create_time()))
  480. self.assertEqual(wmic_create, psutil_create)
  481. # ---
  482. @unittest.skipIf(not WINDOWS, "WINDOWS only")
  483. class TestDualProcessImplementation(PsutilTestCase):
  484. """
  485. Certain APIs on Windows have 2 internal implementations, one
  486. based on documented Windows APIs, another one based
  487. NtQuerySystemInformation() which gets called as fallback in
  488. case the first fails because of limited permission error.
  489. Here we test that the two methods return the exact same value,
  490. see:
  491. https://github.com/giampaolo/psutil/issues/304
  492. """
  493. @classmethod
  494. def setUpClass(cls):
  495. cls.pid = spawn_testproc().pid
  496. @classmethod
  497. def tearDownClass(cls):
  498. terminate(cls.pid)
  499. def test_memory_info(self):
  500. mem_1 = psutil.Process(self.pid).memory_info()
  501. with mock.patch("psutil._psplatform.cext.proc_memory_info",
  502. side_effect=OSError(errno.EPERM, "msg")) as fun:
  503. mem_2 = psutil.Process(self.pid).memory_info()
  504. self.assertEqual(len(mem_1), len(mem_2))
  505. for i in range(len(mem_1)):
  506. self.assertGreaterEqual(mem_1[i], 0)
  507. self.assertGreaterEqual(mem_2[i], 0)
  508. self.assertAlmostEqual(mem_1[i], mem_2[i], delta=512)
  509. assert fun.called
  510. def test_create_time(self):
  511. ctime = psutil.Process(self.pid).create_time()
  512. with mock.patch("psutil._psplatform.cext.proc_times",
  513. side_effect=OSError(errno.EPERM, "msg")) as fun:
  514. self.assertEqual(psutil.Process(self.pid).create_time(), ctime)
  515. assert fun.called
  516. def test_cpu_times(self):
  517. cpu_times_1 = psutil.Process(self.pid).cpu_times()
  518. with mock.patch("psutil._psplatform.cext.proc_times",
  519. side_effect=OSError(errno.EPERM, "msg")) as fun:
  520. cpu_times_2 = psutil.Process(self.pid).cpu_times()
  521. assert fun.called
  522. self.assertAlmostEqual(
  523. cpu_times_1.user, cpu_times_2.user, delta=0.01)
  524. self.assertAlmostEqual(
  525. cpu_times_1.system, cpu_times_2.system, delta=0.01)
  526. def test_io_counters(self):
  527. io_counters_1 = psutil.Process(self.pid).io_counters()
  528. with mock.patch("psutil._psplatform.cext.proc_io_counters",
  529. side_effect=OSError(errno.EPERM, "msg")) as fun:
  530. io_counters_2 = psutil.Process(self.pid).io_counters()
  531. for i in range(len(io_counters_1)):
  532. self.assertAlmostEqual(
  533. io_counters_1[i], io_counters_2[i], delta=5)
  534. assert fun.called
  535. def test_num_handles(self):
  536. num_handles = psutil.Process(self.pid).num_handles()
  537. with mock.patch("psutil._psplatform.cext.proc_num_handles",
  538. side_effect=OSError(errno.EPERM, "msg")) as fun:
  539. self.assertEqual(psutil.Process(self.pid).num_handles(),
  540. num_handles)
  541. assert fun.called
  542. def test_cmdline(self):
  543. from psutil._pswindows import convert_oserror
  544. for pid in psutil.pids():
  545. try:
  546. a = cext.proc_cmdline(pid, use_peb=True)
  547. b = cext.proc_cmdline(pid, use_peb=False)
  548. except OSError as err:
  549. err = convert_oserror(err)
  550. if not isinstance(err, (psutil.AccessDenied,
  551. psutil.NoSuchProcess)):
  552. raise
  553. else:
  554. self.assertEqual(a, b)
  555. @unittest.skipIf(not WINDOWS, "WINDOWS only")
  556. class RemoteProcessTestCase(PsutilTestCase):
  557. """Certain functions require calling ReadProcessMemory.
  558. This trivially works when called on the current process.
  559. Check that this works on other processes, especially when they
  560. have a different bitness.
  561. """
  562. @staticmethod
  563. def find_other_interpreter():
  564. # find a python interpreter that is of the opposite bitness from us
  565. code = "import sys; sys.stdout.write(str(sys.maxsize > 2**32))"
  566. # XXX: a different and probably more stable approach might be to access
  567. # the registry but accessing 64 bit paths from a 32 bit process
  568. for filename in glob.glob(r"C:\Python*\python.exe"):
  569. proc = subprocess.Popen(args=[filename, "-c", code],
  570. stdout=subprocess.PIPE,
  571. stderr=subprocess.STDOUT)
  572. output, _ = proc.communicate()
  573. proc.wait()
  574. if output == str(not IS_64BIT):
  575. return filename
  576. test_args = ["-c", "import sys; sys.stdin.read()"]
  577. def setUp(self):
  578. super().setUp()
  579. other_python = self.find_other_interpreter()
  580. if other_python is None:
  581. raise unittest.SkipTest(
  582. "could not find interpreter with opposite bitness")
  583. if IS_64BIT:
  584. self.python64 = sys.executable
  585. self.python32 = other_python
  586. else:
  587. self.python64 = other_python
  588. self.python32 = sys.executable
  589. env = os.environ.copy()
  590. env["THINK_OF_A_NUMBER"] = str(os.getpid())
  591. self.proc32 = self.spawn_testproc(
  592. [self.python32] + self.test_args,
  593. env=env,
  594. stdin=subprocess.PIPE)
  595. self.proc64 = self.spawn_testproc(
  596. [self.python64] + self.test_args,
  597. env=env,
  598. stdin=subprocess.PIPE)
  599. def tearDown(self):
  600. super().tearDown()
  601. self.proc32.communicate()
  602. self.proc64.communicate()
  603. def test_cmdline_32(self):
  604. p = psutil.Process(self.proc32.pid)
  605. self.assertEqual(len(p.cmdline()), 3)
  606. self.assertEqual(p.cmdline()[1:], self.test_args)
  607. def test_cmdline_64(self):
  608. p = psutil.Process(self.proc64.pid)
  609. self.assertEqual(len(p.cmdline()), 3)
  610. self.assertEqual(p.cmdline()[1:], self.test_args)
  611. def test_cwd_32(self):
  612. p = psutil.Process(self.proc32.pid)
  613. self.assertEqual(p.cwd(), os.getcwd())
  614. def test_cwd_64(self):
  615. p = psutil.Process(self.proc64.pid)
  616. self.assertEqual(p.cwd(), os.getcwd())
  617. def test_environ_32(self):
  618. p = psutil.Process(self.proc32.pid)
  619. e = p.environ()
  620. self.assertIn("THINK_OF_A_NUMBER", e)
  621. self.assertEquals(e["THINK_OF_A_NUMBER"], str(os.getpid()))
  622. def test_environ_64(self):
  623. p = psutil.Process(self.proc64.pid)
  624. try:
  625. p.environ()
  626. except psutil.AccessDenied:
  627. pass
  628. # ===================================================================
  629. # Windows services
  630. # ===================================================================
  631. @unittest.skipIf(not WINDOWS, "WINDOWS only")
  632. class TestServices(PsutilTestCase):
  633. def test_win_service_iter(self):
  634. valid_statuses = set([
  635. "running",
  636. "paused",
  637. "start",
  638. "pause",
  639. "continue",
  640. "stop",
  641. "stopped",
  642. ])
  643. valid_start_types = set([
  644. "automatic",
  645. "manual",
  646. "disabled",
  647. ])
  648. valid_statuses = set([
  649. "running",
  650. "paused",
  651. "start_pending",
  652. "pause_pending",
  653. "continue_pending",
  654. "stop_pending",
  655. "stopped"
  656. ])
  657. for serv in psutil.win_service_iter():
  658. data = serv.as_dict()
  659. self.assertIsInstance(data['name'], str)
  660. self.assertNotEqual(data['name'].strip(), "")
  661. self.assertIsInstance(data['display_name'], str)
  662. self.assertIsInstance(data['username'], str)
  663. self.assertIn(data['status'], valid_statuses)
  664. if data['pid'] is not None:
  665. psutil.Process(data['pid'])
  666. self.assertIsInstance(data['binpath'], str)
  667. self.assertIsInstance(data['username'], str)
  668. self.assertIsInstance(data['start_type'], str)
  669. self.assertIn(data['start_type'], valid_start_types)
  670. self.assertIn(data['status'], valid_statuses)
  671. self.assertIsInstance(data['description'], str)
  672. pid = serv.pid()
  673. if pid is not None:
  674. p = psutil.Process(pid)
  675. self.assertTrue(p.is_running())
  676. # win_service_get
  677. s = psutil.win_service_get(serv.name())
  678. # test __eq__
  679. self.assertEqual(serv, s)
  680. def test_win_service_get(self):
  681. ERROR_SERVICE_DOES_NOT_EXIST = \
  682. psutil._psplatform.cext.ERROR_SERVICE_DOES_NOT_EXIST
  683. ERROR_ACCESS_DENIED = psutil._psplatform.cext.ERROR_ACCESS_DENIED
  684. name = next(psutil.win_service_iter()).name()
  685. with self.assertRaises(psutil.NoSuchProcess) as cm:
  686. psutil.win_service_get(name + '???')
  687. self.assertEqual(cm.exception.name, name + '???')
  688. # test NoSuchProcess
  689. service = psutil.win_service_get(name)
  690. if PY3:
  691. args = (0, "msg", 0, ERROR_SERVICE_DOES_NOT_EXIST)
  692. else:
  693. args = (ERROR_SERVICE_DOES_NOT_EXIST, "msg")
  694. exc = WindowsError(*args)
  695. with mock.patch("psutil._psplatform.cext.winservice_query_status",
  696. side_effect=exc):
  697. self.assertRaises(psutil.NoSuchProcess, service.status)
  698. with mock.patch("psutil._psplatform.cext.winservice_query_config",
  699. side_effect=exc):
  700. self.assertRaises(psutil.NoSuchProcess, service.username)
  701. # test AccessDenied
  702. if PY3:
  703. args = (0, "msg", 0, ERROR_ACCESS_DENIED)
  704. else:
  705. args = (ERROR_ACCESS_DENIED, "msg")
  706. exc = WindowsError(*args)
  707. with mock.patch("psutil._psplatform.cext.winservice_query_status",
  708. side_effect=exc):
  709. self.assertRaises(psutil.AccessDenied, service.status)
  710. with mock.patch("psutil._psplatform.cext.winservice_query_config",
  711. side_effect=exc):
  712. self.assertRaises(psutil.AccessDenied, service.username)
  713. # test __str__ and __repr__
  714. self.assertIn(service.name(), str(service))
  715. self.assertIn(service.display_name(), str(service))
  716. self.assertIn(service.name(), repr(service))
  717. self.assertIn(service.display_name(), repr(service))
  718. if __name__ == '__main__':
  719. from psutil.tests.runner import run_from_name
  720. run_from_name(__file__)