| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191 |
- #!/usr/bin/env python3
- # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- """Linux specific tests."""
- from __future__ import division
- import collections
- import contextlib
- import errno
- import glob
- import io
- import os
- import re
- import shutil
- import socket
- import struct
- import textwrap
- import time
- import warnings
- import psutil
- from psutil import LINUX
- from psutil._compat import basestring
- from psutil._compat import FileNotFoundError
- from psutil._compat import PY3
- from psutil._compat import u
- from psutil.tests import call_until
- from psutil.tests import GLOBAL_TIMEOUT
- from psutil.tests import HAS_BATTERY
- from psutil.tests import HAS_CPU_FREQ
- from psutil.tests import HAS_GETLOADAVG
- from psutil.tests import HAS_RLIMIT
- from psutil.tests import mock
- from psutil.tests import PsutilTestCase
- from psutil.tests import PYPY
- from psutil.tests import reload_module
- from psutil.tests import retry_on_failure
- from psutil.tests import safe_rmpath
- from psutil.tests import sh
- from psutil.tests import skip_on_not_implemented
- from psutil.tests import ThreadTask
- from psutil.tests import TOLERANCE_DISK_USAGE
- from psutil.tests import TOLERANCE_SYS_MEM
- from psutil.tests import TRAVIS
- from psutil.tests import unittest
- from psutil.tests import which
# Absolute path of the directory containing this test module.
HERE = os.path.abspath(os.path.dirname(__file__))
# Request codes handed to fcntl.ioctl() by the network interface helpers
# of this module (get_ipv4_address(), get_mac_address(), ...).
SIOCGIFADDR = 0x8915
SIOCGIFCONF = 0x8912
SIOCGIFHWADDR = 0x8927
SIOCGIFNETMASK = 0x891b
SIOCGIFBRDADDR = 0x8919
if LINUX:
    SECTOR_SIZE = 512
# True when no /sys/class/hwmon/hwmon* entry exists on this machine.
# NOTE(review): indentation was lost in this copy of the file; this
# assignment is assumed to live at module level - confirm.
EMPTY_TEMPERATURES = not glob.glob('/sys/class/hwmon/hwmon*')
- # =====================================================================
- # --- utils
- # =====================================================================
def get_ipv4_address(ifname):
    """Return the IPv4 address of interface `ifname` as a dotted-quad
    string, as answered by the SIOCGIFADDR ioctl.
    """
    import fcntl
    # Only the first 15 characters of the name are handed to the ioctl.
    name = ifname[:15]
    if PY3:
        name = bytes(name, 'ascii')
    request = struct.pack('256s', name)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    with contextlib.closing(sock):
        reply = fcntl.ioctl(sock.fileno(), SIOCGIFADDR, request)
    # Bytes 20-23 of the reply hold the packed address.
    return socket.inet_ntoa(reply[20:24])
def get_ipv4_netmask(ifname):
    """Return the IPv4 netmask of interface `ifname` as a dotted-quad
    string, as answered by the SIOCGIFNETMASK ioctl.
    """
    import fcntl
    # Only the first 15 characters of the name are handed to the ioctl.
    name = ifname[:15]
    if PY3:
        name = bytes(name, 'ascii')
    request = struct.pack('256s', name)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    with contextlib.closing(sock):
        reply = fcntl.ioctl(sock.fileno(), SIOCGIFNETMASK, request)
    # Bytes 20-23 of the reply hold the packed netmask.
    return socket.inet_ntoa(reply[20:24])
def get_ipv4_broadcast(ifname):
    """Return the IPv4 broadcast address of interface `ifname` as a
    dotted-quad string, as answered by the SIOCGIFBRDADDR ioctl.
    """
    import fcntl
    # Only the first 15 characters of the name are handed to the ioctl.
    name = ifname[:15]
    if PY3:
        name = bytes(name, 'ascii')
    request = struct.pack('256s', name)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    with contextlib.closing(sock):
        reply = fcntl.ioctl(sock.fileno(), SIOCGIFBRDADDR, request)
    # Bytes 20-23 of the reply hold the packed broadcast address.
    return socket.inet_ntoa(reply[20:24])
def get_ipv6_address(ifname):
    """Return the IPv6 address listed for interface `ifname` in
    /proc/net/if_inet6, in the textual form produced by
    socket.inet_ntop().

    Raise ValueError if no line of the file ends with `ifname`.
    """
    with open("/proc/net/if_inet6", 'rt') as f:
        rows = [row.split() for row in f]
    match = None
    for row in rows:
        # The interface name is the last column of each line.
        if row[-1] == ifname:
            match = row
            break
    if match is None:
        raise ValueError("could not find interface %r" % ifname)
    # The first column is the address as a run of hex digits with no
    # separators: cut it in groups of 4 digits joined by ':'.
    raw = match[0]
    formatted = ":".join(
        [raw[pos:pos + 4] for pos in range(0, len(raw), 4)])
    # Round-trip through the packed form to get inet_ntop()'s notation.
    packed = socket.inet_pton(socket.AF_INET6, formatted)
    return socket.inet_ntop(socket.AF_INET6, packed)
def get_mac_address(ifname):
    """Return the hardware (MAC) address of interface `ifname` as a
    lowercase, colon-separated hex string (e.g. '00:11:22:33:44:55'),
    as answered by the SIOCGIFHWADDR ioctl.
    """
    import fcntl
    # Only the first 15 characters of the name are handed to the ioctl.
    ifname = ifname[:15]
    if PY3:
        ifname = bytes(ifname, 'ascii')
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    with contextlib.closing(s):
        info = fcntl.ioctl(
            s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname))
    # bytearray() yields integers on both Python 2 and Python 3, so there
    # is no need to shadow the ord() builtin depending on the interpreter.
    # Bytes 18-23 of the reply hold the 6 octets of the address.
    return ':'.join(['%02x' % octet for octet in bytearray(info[18:24])])
def free_swap():
    """Run 'free -b' and return swap memory's total, used and free
    values as a (total, used, free) namedtuple of integers.

    Raise ValueError if the output has no line starting with 'Swap'.
    """
    output_lines = sh('free -b', env={"LANG": "C.UTF-8"}).split('\n')
    swap_lines = [ln for ln in output_lines if ln.startswith('Swap')]
    if not swap_lines:
        raise ValueError(
            "can't find 'Swap' in 'free' output:\n%s"
            % '\n'.join(output_lines))
    # Only the first matching line is considered.
    _, total, used, free = swap_lines[0].split()
    result_type = collections.namedtuple('free', 'total used free')
    return result_type(int(total), int(used), int(free))
def free_physmem():
    """Run 'free -b' and return physical memory's total, used, free
    and shared values, plus the raw command output, as a namedtuple.

    Raise ValueError if the output has no line starting with 'Mem'.
    """
    # Note: free can have 2 different formats, invalidating 'shared'
    # and 'cached' memory which may have different positions so we
    # do not return them.
    # https://github.com/giampaolo/psutil/issues/538#issuecomment-57059946
    out = sh('free -b', env={"LANG": "C.UTF-8"})
    lines = out.split('\n')
    mem_line = next((ln for ln in lines if ln.startswith('Mem')), None)
    if mem_line is None:
        raise ValueError(
            "can't find 'Mem' in 'free' output:\n%s" % '\n'.join(lines))
    # Columns 1-4 follow the 'Mem' label.
    total, used, free, shared = [int(x) for x in mem_line.split()[1:5]]
    result_type = collections.namedtuple(
        'free', 'total used free shared output')
    return result_type(total, used, free, shared, out)
def vmstat(stat):
    """Run 'vmstat -s' and return, as an integer, the leading number of
    the first output line containing `stat`.

    Raise ValueError if no line contains `stat`.
    """
    out = sh("vmstat -s", env={"LANG": "C.UTF-8"})
    for raw in out.split("\n"):
        entry = raw.strip()
        if stat not in entry:
            continue
        # The value is whatever precedes the first space.
        value, _, _ = entry.partition(' ')
        return int(value)
    raise ValueError("can't find %r in 'vmstat' output" % stat)
def get_free_version_info():
    """Return the version printed by 'free -V' as a tuple of integers.

    Raise unittest.SkipTest if the output contains 'UNKNOWN'.
    """
    out = sh("free -V").strip()
    if 'UNKNOWN' in out:
        raise unittest.SkipTest("can't determine free version")
    # The version is the last whitespace-separated token of the output.
    version = out.split()[-1]
    return tuple([int(part) for part in version.split('.')])
@contextlib.contextmanager
def mock_open_content(for_path, content):
    """Context manager which patches the open() builtin so that opening
    `for_path` returns an in-memory file holding `content`; any other
    path is forwarded to the real open(). Yields the mock object.
    """
    real_open = open
    # A text buffer is served only on Python 3 and only for string
    # content; every other combination gets a bytes buffer.
    as_text = PY3 and isinstance(content, basestring)

    def fake_open(name, *args, **kwargs):
        if name != for_path:
            return real_open(name, *args, **kwargs)
        if as_text:
            return io.StringIO(content)
        return io.BytesIO(content)

    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(patch_point, create=True, side_effect=fake_open) as m:
        yield m
@contextlib.contextmanager
def mock_open_exception(for_path, exc):
    """Context manager which patches the open() builtin so that opening
    `for_path` raises `exc`; any other path is forwarded to the real
    open(). Yields the mock object.
    """
    real_open = open

    def fake_open(name, *args, **kwargs):
        if name != for_path:
            return real_open(name, *args, **kwargs)
        raise exc

    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(patch_point, create=True, side_effect=fake_open) as m:
        yield m
- # =====================================================================
- # --- system virtual memory
- # =====================================================================
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemVirtualMemory(PsutilTestCase):
    """Check psutil.virtual_memory() against the 'free' and 'vmstat'
    command line tools and against mocked /proc/meminfo contents.
    """

    def test_total(self):
        # The vmstat figure is scaled by 1024 before being compared.
        vmstat_value = vmstat('total memory') * 1024
        psutil_value = psutil.virtual_memory().total
        self.assertAlmostEqual(vmstat_value, psutil_value)

    @retry_on_failure()
    def test_used(self):
        # Older versions of procps used slab memory to calculate used memory.
        # This got changed in:
        # https://gitlab.com/procps-ng/procps/commit/
        #     05d751c4f076a2f0118b914c5e51cfbb4762ad8e
        if get_free_version_info() < (3, 3, 12):
            raise unittest.SkipTest("old free version")
        free = free_physmem()
        free_value = free.used
        psutil_value = psutil.virtual_memory().used
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
            msg='%s %s \n%s' % (free_value, psutil_value, free.output))

    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    @retry_on_failure()
    def test_free(self):
        vmstat_value = vmstat('free memory') * 1024
        psutil_value = psutil.virtual_memory().free
        self.assertAlmostEqual(
            vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_buffers(self):
        vmstat_value = vmstat('buffer memory') * 1024
        psutil_value = psutil.virtual_memory().buffers
        self.assertAlmostEqual(
            vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    # https://travis-ci.org/giampaolo/psutil/jobs/226719664
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    @retry_on_failure()
    def test_active(self):
        vmstat_value = vmstat('active memory') * 1024
        psutil_value = psutil.virtual_memory().active
        self.assertAlmostEqual(
            vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    # https://travis-ci.org/giampaolo/psutil/jobs/227242952
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    @retry_on_failure()
    def test_inactive(self):
        vmstat_value = vmstat('inactive memory') * 1024
        psutil_value = psutil.virtual_memory().inactive
        self.assertAlmostEqual(
            vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_shared(self):
        free = free_physmem()
        free_value = free.shared
        if free_value == 0:
            raise unittest.SkipTest("free does not support 'shared' column")
        psutil_value = psutil.virtual_memory().shared
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
            msg='%s %s \n%s' % (free_value, psutil_value, free.output))

    @retry_on_failure()
    def test_available(self):
        # "free" output format has changed at some point:
        # https://github.com/giampaolo/psutil/issues/538#issuecomment-147192098
        out = sh("free -b")
        lines = out.split('\n')
        if 'available' not in lines[0]:
            raise unittest.SkipTest("free does not support 'available' column")
        # The value compared is the last column of the second line.
        free_value = int(lines[1].split()[-1])
        psutil_value = psutil.virtual_memory().available
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
            msg='%s %s \n%s' % (free_value, psutil_value, out))

    def test_warnings_on_misses(self):
        # Emulate a case where /proc/meminfo provides few info.
        # psutil is supposed to set the missing fields to 0 and
        # raise a warning.
        with mock_open_content(
            '/proc/meminfo',
            textwrap.dedent("""\
                Active(anon): 6145416 kB
                Active(file): 2950064 kB
                Inactive(anon): 574764 kB
                Inactive(file): 1567648 kB
                MemAvailable: -1 kB
                MemFree: 2057400 kB
                MemTotal: 16325648 kB
                SReclaimable: 346648 kB
                """).encode()) as m:
            with warnings.catch_warnings(record=True) as ws:
                warnings.simplefilter("always")
                ret = psutil.virtual_memory()
                assert m.called
                self.assertEqual(len(ws), 1)
                w = ws[0]
                assert w.filename.endswith('psutil/_pslinux.py')
                self.assertIn(
                    "memory stats couldn't be determined", str(w.message))
                self.assertIn("cached", str(w.message))
                self.assertIn("shared", str(w.message))
                self.assertIn("active", str(w.message))
                self.assertIn("inactive", str(w.message))
                self.assertIn("buffers", str(w.message))
                self.assertIn("available", str(w.message))
                self.assertEqual(ret.cached, 0)
                self.assertEqual(ret.active, 0)
                self.assertEqual(ret.inactive, 0)
                self.assertEqual(ret.shared, 0)
                self.assertEqual(ret.buffers, 0)
                self.assertEqual(ret.available, 0)
                self.assertEqual(ret.slab, 0)

    @retry_on_failure()
    def test_avail_old_percent(self):
        # Make sure that our calculation of avail mem for old kernels
        # is off by max 15%.
        from psutil._pslinux import calculate_avail_vmem
        from psutil._pslinux import open_binary
        mems = {}
        with open_binary('/proc/meminfo') as f:
            for line in f:
                fields = line.split()
                mems[fields[0]] = int(fields[1]) * 1024
        a = calculate_avail_vmem(mems)
        # Nothing to compare against if the kernel does not provide
        # the "MemAvailable:" entry.
        if b'MemAvailable:' in mems:
            b = mems[b'MemAvailable:']
            diff_percent = abs(a - b) / a * 100
            self.assertLess(diff_percent, 15)

    def test_avail_old_comes_from_kernel(self):
        # Make sure "MemAvailable:" column is used instead of relying
        # on our internal algorithm to calculate avail mem.
        with mock_open_content(
            '/proc/meminfo',
            textwrap.dedent("""\
                Active: 9444728 kB
                Active(anon): 6145416 kB
                Active(file): 2950064 kB
                Buffers: 287952 kB
                Cached: 4818144 kB
                Inactive(file): 1578132 kB
                Inactive(anon): 574764 kB
                Inactive(file): 1567648 kB
                MemAvailable: 6574984 kB
                MemFree: 2057400 kB
                MemTotal: 16325648 kB
                Shmem: 577588 kB
                SReclaimable: 346648 kB
                """).encode()) as m:
            with warnings.catch_warnings(record=True) as ws:
                # Always record the warning, even if the same one was
                # already emitted earlier from the same location.
                warnings.simplefilter("always")
                ret = psutil.virtual_memory()
            assert m.called
            self.assertEqual(ret.available, 6574984 * 1024)
            # The mocked content has no plain "Inactive:" entry.
            w = ws[0]
            self.assertIn(
                "inactive memory stats couldn't be determined", str(w.message))

    def test_avail_old_missing_fields(self):
        # Remove Active(file), Inactive(file) and SReclaimable
        # from /proc/meminfo and make sure the fallback is used
        # (free + cached).
        with mock_open_content(
            "/proc/meminfo",
            textwrap.dedent("""\
                Active: 9444728 kB
                Active(anon): 6145416 kB
                Buffers: 287952 kB
                Cached: 4818144 kB
                Inactive(file): 1578132 kB
                Inactive(anon): 574764 kB
                MemFree: 2057400 kB
                MemTotal: 16325648 kB
                Shmem: 577588 kB
                """).encode()) as m:
            with warnings.catch_warnings(record=True) as ws:
                # Always record the warning, even if the same one was
                # already emitted earlier from the same location.
                warnings.simplefilter("always")
                ret = psutil.virtual_memory()
            assert m.called
            self.assertEqual(ret.available, 2057400 * 1024 + 4818144 * 1024)
            w = ws[0]
            self.assertIn(
                "inactive memory stats couldn't be determined", str(w.message))

    def test_avail_old_missing_zoneinfo(self):
        # Remove /proc/zoneinfo file. Make sure fallback is used
        # (free + cached).
        with mock_open_content(
            "/proc/meminfo",
            textwrap.dedent("""\
                Active: 9444728 kB
                Active(anon): 6145416 kB
                Active(file): 2950064 kB
                Buffers: 287952 kB
                Cached: 4818144 kB
                Inactive(file): 1578132 kB
                Inactive(anon): 574764 kB
                Inactive(file): 1567648 kB
                MemFree: 2057400 kB
                MemTotal: 16325648 kB
                Shmem: 577588 kB
                SReclaimable: 346648 kB
                """).encode()):
            with mock_open_exception(
                    "/proc/zoneinfo",
                    IOError(errno.ENOENT, 'no such file or directory')):
                with warnings.catch_warnings(record=True) as ws:
                    # Always record the warning, even if the same one was
                    # already emitted earlier from the same location.
                    warnings.simplefilter("always")
                    ret = psutil.virtual_memory()
                    self.assertEqual(
                        ret.available, 2057400 * 1024 + 4818144 * 1024)
                    w = ws[0]
                    self.assertIn(
                        "inactive memory stats couldn't be determined",
                        str(w.message))

    def test_virtual_memory_mocked(self):
        # Emulate /proc/meminfo because neither vmstat nor free return slab.
        with mock_open_content(
            '/proc/meminfo',
            textwrap.dedent("""\
                MemTotal: 100 kB
                MemFree: 2 kB
                MemAvailable: 3 kB
                Buffers: 4 kB
                Cached: 5 kB
                SwapCached: 6 kB
                Active: 7 kB
                Inactive: 8 kB
                Active(anon): 9 kB
                Inactive(anon): 10 kB
                Active(file): 11 kB
                Inactive(file): 12 kB
                Unevictable: 13 kB
                Mlocked: 14 kB
                SwapTotal: 15 kB
                SwapFree: 16 kB
                Dirty: 17 kB
                Writeback: 18 kB
                AnonPages: 19 kB
                Mapped: 20 kB
                Shmem: 21 kB
                Slab: 22 kB
                SReclaimable: 23 kB
                SUnreclaim: 24 kB
                KernelStack: 25 kB
                PageTables: 26 kB
                NFS_Unstable: 27 kB
                Bounce: 28 kB
                WritebackTmp: 29 kB
                CommitLimit: 30 kB
                Committed_AS: 31 kB
                VmallocTotal: 32 kB
                VmallocUsed: 33 kB
                VmallocChunk: 34 kB
                HardwareCorrupted: 35 kB
                AnonHugePages: 36 kB
                ShmemHugePages: 37 kB
                ShmemPmdMapped: 38 kB
                CmaTotal: 39 kB
                CmaFree: 40 kB
                HugePages_Total: 41 kB
                HugePages_Free: 42 kB
                HugePages_Rsvd: 43 kB
                HugePages_Surp: 44 kB
                Hugepagesize: 45 kB
                DirectMap46k: 46 kB
                DirectMap47M: 47 kB
                DirectMap48G: 48 kB
                """).encode()) as m:
            mem = psutil.virtual_memory()
            assert m.called
            self.assertEqual(mem.total, 100 * 1024)
            self.assertEqual(mem.free, 2 * 1024)
            self.assertEqual(mem.buffers, 4 * 1024)
            # cached mem also includes reclaimable memory
            self.assertEqual(mem.cached, (5 + 23) * 1024)
            self.assertEqual(mem.shared, 21 * 1024)
            self.assertEqual(mem.active, 7 * 1024)
            self.assertEqual(mem.inactive, 8 * 1024)
            self.assertEqual(mem.slab, 22 * 1024)
            self.assertEqual(mem.available, 3 * 1024)
- # =====================================================================
- # --- system swap memory
- # =====================================================================
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemSwapMemory(PsutilTestCase):
    """Check psutil.swap_memory() against the 'free' command line tool,
    the sysinfo() syscall wrapper and mocked /proc files.
    """

    @staticmethod
    def meminfo_has_swap_info():
        """Return True if /proc/meminfo provides swap metrics."""
        with open("/proc/meminfo") as f:
            data = f.read()
        return 'SwapTotal:' in data and 'SwapFree:' in data

    def test_total(self):
        free_value = free_swap().total
        psutil_value = psutil.swap_memory().total
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_used(self):
        free_value = free_swap().used
        psutil_value = psutil.swap_memory().used
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_free(self):
        free_value = free_swap().free
        psutil_value = psutil.swap_memory().free
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    def test_missing_sin_sout(self):
        # Replace psutil._common.open with a bare mock and expect a
        # single warning plus 'sin' and 'sout' set to 0.
        with mock.patch('psutil._common.open', create=True) as m:
            with warnings.catch_warnings(record=True) as ws:
                warnings.simplefilter("always")
                ret = psutil.swap_memory()
                assert m.called
                self.assertEqual(len(ws), 1)
                w = ws[0]
                assert w.filename.endswith('psutil/_pslinux.py')
                self.assertIn(
                    "'sin' and 'sout' swap memory stats couldn't "
                    "be determined", str(w.message))
                self.assertEqual(ret.sin, 0)
                self.assertEqual(ret.sout, 0)

    def test_no_vmstat_mocked(self):
        # see https://github.com/giampaolo/psutil/issues/722
        with mock_open_exception(
                "/proc/vmstat",
                IOError(errno.ENOENT, 'no such file or directory')) as m:
            with warnings.catch_warnings(record=True) as ws:
                warnings.simplefilter("always")
                ret = psutil.swap_memory()
                assert m.called
                self.assertEqual(len(ws), 1)
                w = ws[0]
                assert w.filename.endswith('psutil/_pslinux.py')
                self.assertIn(
                    "'sin' and 'sout' swap memory stats couldn't "
                    "be determined and were set to 0",
                    str(w.message))
                self.assertEqual(ret.sin, 0)
                self.assertEqual(ret.sout, 0)

    def test_meminfo_against_sysinfo(self):
        # Make sure the content of /proc/meminfo about swap memory
        # matches sysinfo() syscall, see:
        # https://github.com/giampaolo/psutil/issues/1015
        if not self.meminfo_has_swap_info():
            # unittest.skip() only builds a decorator: returning it would
            # make the test pass silently instead of being reported as
            # skipped, so raise SkipTest explicitly.
            raise unittest.SkipTest("/proc/meminfo has no swap metrics")
        with mock.patch('psutil._pslinux.cext.linux_sysinfo') as m:
            swap = psutil.swap_memory()
        assert not m.called
        # Call the real function only after the patch has been removed.
        import psutil._psutil_linux as cext
        _, _, _, _, total, free, unit_multiplier = cext.linux_sysinfo()
        total *= unit_multiplier
        free *= unit_multiplier
        self.assertEqual(swap.total, total)
        self.assertAlmostEqual(swap.free, free, delta=TOLERANCE_SYS_MEM)

    def test_emulate_meminfo_has_no_metrics(self):
        # Emulate a case where /proc/meminfo provides no swap metrics
        # in which case sysinfo() syscall is supposed to be used
        # as a fallback.
        with mock_open_content("/proc/meminfo", b"") as m:
            psutil.swap_memory()
            assert m.called
- # =====================================================================
- # --- system CPU
- # =====================================================================
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemCPUTimes(PsutilTestCase):
    """Check which fields psutil.cpu_times() exposes depending on the
    version of the running kernel.
    """

    @unittest.skipIf(TRAVIS, "unknown failure on travis")
    def test_fields(self):
        fields = psutil.cpu_times()._fields
        # First "X.Y.Z" looking token of the kernel release string.
        release = os.uname()[2]
        kernel_ver = re.findall(r'\d+\.\d+\.\d+', release)[0]
        kernel_ver_info = tuple([int(x) for x in kernel_ver.split('.')])
        # (field name, first kernel version expected to provide it)
        expectations = (
            ('steal', (2, 6, 11)),
            ('guest', (2, 6, 24)),
            ('guest_nice', (3, 2, 0)),
        )
        for name, since in expectations:
            if kernel_ver_info >= since:
                self.assertIn(name, fields)
            else:
                self.assertNotIn(name, fields)
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemCPUCountLogical(PsutilTestCase):
    """Check the number of logical CPUs reported by psutil against
    /sys, the 'nproc' and 'lscpu' utilities and emulated fallbacks.
    """

    @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu/online"),
                     "/sys/devices/system/cpu/online does not exist")
    def test_against_sysdev_cpu_online(self):
        with open("/sys/devices/system/cpu/online") as f:
            value = f.read().strip()
        # Only a "first-last" range is handled; any other content is
        # not checked.
        if "-" in str(value):
            value = int(value.split('-')[1]) + 1
            self.assertEqual(psutil.cpu_count(), value)

    @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu"),
                     "/sys/devices/system/cpu does not exist")
    def test_against_sysdev_cpu_num(self):
        ls = os.listdir("/sys/devices/system/cpu")
        count = len([x for x in ls if re.search(r"cpu\d+$", x) is not None])
        self.assertEqual(psutil.cpu_count(), count)

    @unittest.skipIf(not which("nproc"), "nproc utility not available")
    def test_against_nproc(self):
        num = int(sh("nproc --all"))
        self.assertEqual(psutil.cpu_count(logical=True), num)

    @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
    def test_against_lscpu(self):
        out = sh("lscpu -p")
        # Lines starting with '#' are not counted.
        num = len([x for x in out.split('\n') if not x.startswith('#')])
        self.assertEqual(psutil.cpu_count(logical=True), num)

    def test_emulate_fallbacks(self):
        import psutil._pslinux
        original = psutil._pslinux.cpu_count_logical()
        # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in
        # order to cause the parsing of /proc/cpuinfo and /proc/stat.
        with mock.patch(
                'psutil._pslinux.os.sysconf', side_effect=ValueError) as m:
            self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
            assert m.called

            # Let's have open() return empty data and make sure None is
            # returned ('cause we mimic os.cpu_count()).
            with mock.patch('psutil._common.open', create=True) as m:
                self.assertIsNone(psutil._pslinux.cpu_count_logical())
                self.assertEqual(m.call_count, 2)
                # /proc/stat should be the last one
                self.assertEqual(m.call_args[0][0], '/proc/stat')

            # Let's push this a bit further and make sure /proc/cpuinfo
            # parsing works as expected.
            with open('/proc/cpuinfo', 'rb') as f:
                cpuinfo_data = f.read()
            fake_file = io.BytesIO(cpuinfo_data)
            with mock.patch('psutil._common.open',
                            return_value=fake_file, create=True):
                self.assertEqual(psutil._pslinux.cpu_count_logical(), original)

            # Finally, let's make /proc/cpuinfo return meaningless data;
            # this way we'll fall back on relying on /proc/stat
            with mock_open_content('/proc/cpuinfo', b"") as m:
                self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
                # A bare "m.called" expression checks nothing: assert it.
                assert m.called
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemCPUCountPhysical(PsutilTestCase):
    """Check the number of physical cores reported by psutil against
    the 'lscpu' utility and with glob.glob() / open() mocked out.
    """

    @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
    def test_against_lscpu(self):
        out = sh("lscpu -p")
        # Collect the distinct values of the second comma-separated
        # column, ignoring lines starting with '#'.
        core_ids = set(
            row.split(',')[1]
            for row in out.split('\n')
            if not row.startswith('#'))
        self.assertEqual(psutil.cpu_count(logical=False), len(core_ids))

    def test_method_2(self):
        # Compare the regular result with the one obtained while
        # glob.glob() returns nothing.
        expected = psutil._pslinux.cpu_count_physical()
        with mock.patch('glob.glob', return_value=[]) as m:
            fallback = psutil._pslinux.cpu_count_physical()
            assert m.called
        if expected is not None:
            self.assertEqual(expected, fallback)

    def test_emulate_none(self):
        # With glob.glob() returning nothing and psutil._common.open
        # replaced by a bare mock, None is expected.
        with mock.patch('glob.glob', return_value=[]) as glob_mock:
            with mock.patch('psutil._common.open', create=True) as open_mock:
                self.assertIsNone(psutil._pslinux.cpu_count_physical())
        assert glob_mock.called
        assert open_mock.called
- @unittest.skipIf(not LINUX, "LINUX only")
- class TestSystemCPUFrequency(PsutilTestCase):
@unittest.skipIf(TRAVIS, "fails on Travis")
@unittest.skipIf(not HAS_CPU_FREQ, "not supported")
def test_emulate_use_second_file(self):
    # https://github.com/giampaolo/psutil/issues/981
    # Pretend that no /sys/devices/system/cpu/cpufreq/policy* path
    # exists and make sure cpu_freq() still returns something.
    real_exists = os.path.exists
    policy_prefix = "/sys/devices/system/cpu/cpufreq/policy"

    def fake_exists(path):
        if not path.startswith(policy_prefix):
            return real_exists(path)
        return False

    with mock.patch("os.path.exists", side_effect=fake_exists,
                    create=True):
        assert psutil.cpu_freq()
@unittest.skipIf(not HAS_CPU_FREQ, "not supported")
def test_emulate_use_cpuinfo(self):
    # Emulate a case where /sys/devices/system/cpu/cpufreq* does not
    # exist and /proc/cpuinfo is used instead.
    def path_exists_mock(path):
        if path.startswith('/sys/devices/system/cpu/'):
            return False
        else:
            # Record that /proc/cpuinfo was probed.
            if path == "/proc/cpuinfo":
                flags.append(None)
            return os_path_exists(path)

    flags = []
    os_path_exists = os.path.exists
    try:
        with mock.patch("os.path.exists", side_effect=path_exists_mock):
            # Reload the module while os.path.exists() is patched.
            reload_module(psutil._pslinux)
            ret = psutil.cpu_freq()
            assert ret
            assert flags
            self.assertEqual(ret.max, 0.0)
            self.assertEqual(ret.min, 0.0)
            # Check every per-CPU entry (not the aggregated result
            # already verified above).
            for freq in psutil.cpu_freq(percpu=True):
                self.assertEqual(freq.max, 0.0)
                self.assertEqual(freq.min, 0.0)
    finally:
        # Restore the modules to their unpatched state.
        reload_module(psutil._pslinux)
        reload_module(psutil)
- @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
- def test_emulate_data(self):
- def open_mock(name, *args, **kwargs):
- if (name.endswith('/scaling_cur_freq') and
- name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
- return io.BytesIO(b"500000")
- elif (name.endswith('/scaling_min_freq') and
- name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
- return io.BytesIO(b"600000")
- elif (name.endswith('/scaling_max_freq') and
- name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
- return io.BytesIO(b"700000")
- elif name == '/proc/cpuinfo':
- return io.BytesIO(b"cpu MHz : 500")
- else:
- return orig_open(name, *args, **kwargs)
- orig_open = open
- patch_point = 'builtins.open' if PY3 else '__builtin__.open'
- with mock.patch(patch_point, side_effect=open_mock):
- with mock.patch(
- 'os.path.exists', return_value=True):
- freq = psutil.cpu_freq()
- self.assertEqual(freq.current, 500.0)
- # when /proc/cpuinfo is used min and max frequencies are not
- # available and are set to 0.
- if freq.min != 0.0:
- self.assertEqual(freq.min, 600.0)
- if freq.max != 0.0:
- self.assertEqual(freq.max, 700.0)
- @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
- def test_emulate_multi_cpu(self):
- def open_mock(name, *args, **kwargs):
- n = name
- if (n.endswith('/scaling_cur_freq') and
- n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
- return io.BytesIO(b"100000")
- elif (n.endswith('/scaling_min_freq') and
- n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
- return io.BytesIO(b"200000")
- elif (n.endswith('/scaling_max_freq') and
- n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
- return io.BytesIO(b"300000")
- elif (n.endswith('/scaling_cur_freq') and
- n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
- return io.BytesIO(b"400000")
- elif (n.endswith('/scaling_min_freq') and
- n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
- return io.BytesIO(b"500000")
- elif (n.endswith('/scaling_max_freq') and
- n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
- return io.BytesIO(b"600000")
- elif name == '/proc/cpuinfo':
- return io.BytesIO(b"cpu MHz : 100\n"
- b"cpu MHz : 400")
- else:
- return orig_open(name, *args, **kwargs)
- orig_open = open
- patch_point = 'builtins.open' if PY3 else '__builtin__.open'
- with mock.patch(patch_point, side_effect=open_mock):
- with mock.patch('os.path.exists', return_value=True):
- with mock.patch('psutil._pslinux.cpu_count_logical',
- return_value=2):
- freq = psutil.cpu_freq(percpu=True)
- self.assertEqual(freq[0].current, 100.0)
- if freq[0].min != 0.0:
- self.assertEqual(freq[0].min, 200.0)
- if freq[0].max != 0.0:
- self.assertEqual(freq[0].max, 300.0)
- self.assertEqual(freq[1].current, 400.0)
- if freq[1].min != 0.0:
- self.assertEqual(freq[1].min, 500.0)
- if freq[1].max != 0.0:
- self.assertEqual(freq[1].max, 600.0)
- @unittest.skipIf(TRAVIS, "fails on Travis")
- @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
- def test_emulate_no_scaling_cur_freq_file(self):
- # See: https://github.com/giampaolo/psutil/issues/1071
- def open_mock(name, *args, **kwargs):
- if name.endswith('/scaling_cur_freq'):
- raise IOError(errno.ENOENT, "")
- elif name.endswith('/cpuinfo_cur_freq'):
- return io.BytesIO(b"200000")
- elif name == '/proc/cpuinfo':
- return io.BytesIO(b"cpu MHz : 200")
- else:
- return orig_open(name, *args, **kwargs)
- orig_open = open
- patch_point = 'builtins.open' if PY3 else '__builtin__.open'
- with mock.patch(patch_point, side_effect=open_mock):
- with mock.patch('os.path.exists', return_value=True):
- with mock.patch('psutil._pslinux.cpu_count_logical',
- return_value=1):
- freq = psutil.cpu_freq()
- self.assertEqual(freq.current, 200)
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemCPUStats(PsutilTestCase):
    """Compares psutil.cpu_stats() fields with values from vmstat()."""

    @unittest.skipIf(TRAVIS, "fails on Travis")
    def test_ctx_switches(self):
        # The two readings may differ by at most 500.
        self.assertAlmostEqual(
            vmstat("context switches"),
            psutil.cpu_stats().ctx_switches,
            delta=500)

    @unittest.skipIf(TRAVIS, "fails on Travis")
    def test_interrupts(self):
        # The two readings may differ by at most 500.
        self.assertAlmostEqual(
            vmstat("interrupts"),
            psutil.cpu_stats().interrupts,
            delta=500)
@unittest.skipIf(not LINUX, "LINUX only")
class TestLoadAvg(PsutilTestCase):
    """Compares psutil.getloadavg() with the content of /proc/loadavg."""

    @unittest.skipIf(not HAS_GETLOADAVG, "not supported")
    def test_getloadavg(self):
        reported = psutil.getloadavg()
        with open("/proc/loadavg", "r") as f:
            fields = f.read().split()
        # The first three fields of the file are compared, position by
        # position, with the values returned by psutil.
        for idx in range(3):
            self.assertAlmostEqual(
                float(fields[idx]), reported[idx], delta=1)
- # =====================================================================
- # --- system network
- # =====================================================================
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemNetIfAddrs(PsutilTestCase):
    """Cross-checks psutil.net_if_addrs() against the get_*() helpers."""

    def test_ips(self):
        for nic, nic_addrs in psutil.net_if_addrs().items():
            for entry in nic_addrs:
                family = entry.family
                if family == psutil.AF_LINK:
                    self.assertEqual(entry.address, get_mac_address(nic))
                elif family == socket.AF_INET:
                    self.assertEqual(entry.address, get_ipv4_address(nic))
                    self.assertEqual(entry.netmask, get_ipv4_netmask(nic))
                    if entry.broadcast is None:
                        # No broadcast from psutil: the helper is expected
                        # to report the all-zeros address.
                        self.assertEqual(
                            get_ipv4_broadcast(nic), '0.0.0.0')
                    else:
                        self.assertEqual(
                            entry.broadcast, get_ipv4_broadcast(nic))
                elif family == socket.AF_INET6:
                    # An IPv6 address may carry a trailing "%<zone id>"
                    # part (usually the NIC name), e.g. these two are
                    # equivalent:
                    #   "fe80::1ff:fe23:4567:890a"
                    #   "fe80::1ff:fe23:4567:890a%eth0"
                    # Only the part before the percent sign is compared.
                    bare_address = entry.address.partition('%')[0]
                    self.assertEqual(bare_address, get_ipv6_address(nic))

    # XXX - not reliable when having virtual NICs installed by Docker.
    # @unittest.skipIf(not which('ip'), "'ip' utility not available")
    # @unittest.skipIf(TRAVIS, "skipped on Travis")
    # def test_net_if_names(self):
    #     out = sh("ip addr").strip()
    #     nics = [x for x in psutil.net_if_addrs().keys() if ':' not in x]
    #     found = 0
    #     for line in out.split('\n'):
    #         line = line.strip()
    #         if re.search(r"^\d+:", line):
    #             found += 1
    #             name = line.split(':')[1].strip()
    #             self.assertIn(name, nics)
    #     self.assertEqual(len(nics), found, msg="%s\n---\n%s" % (
    #         pprint.pformat(nics), out))
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemNetIfStats(PsutilTestCase):
    """Cross-checks psutil.net_if_stats() with ifconfig and sysfs."""

    def test_against_ifconfig(self):
        for nic, nic_stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % nic)
            except RuntimeError:
                # Skip the NICs for which the command fails.
                continue
            self.assertEqual(nic_stats.isup, 'RUNNING' in out, msg=out)
            mtu_matches = re.findall(r'(?i)MTU[: ](\d+)', out)
            self.assertEqual(nic_stats.mtu, int(mtu_matches[0]))

    def test_mtu(self):
        for nic, nic_stats in psutil.net_if_stats().items():
            with open("/sys/class/net/%s/mtu" % nic, "rt") as f:
                sysfs_mtu = int(f.read().strip())
            self.assertEqual(nic_stats.mtu, sysfs_mtu)
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemNetIOCounters(PsutilTestCase):
    """Cross-checks psutil.net_io_counters() with the ifconfig output."""

    @retry_on_failure()
    def test_against_ifconfig(self):
        def ifconfig(nic):
            # Return a dict of counters parsed from "ifconfig <nic>".
            # The helper now queries its own `nic` argument instead of
            # relying on the enclosing loop variable.
            out = sh("ifconfig %s" % nic)
            # In both lists the first match is used for the incoming
            # counter and the second one for the outgoing counter.
            errors = re.findall(r'errors[: ](\d+)', out)
            dropped = re.findall(r'dropped[: ](\d+)', out)
            ret = {}
            ret['packets_recv'] = int(
                re.findall(r'RX packets[: ](\d+)', out)[0])
            ret['packets_sent'] = int(
                re.findall(r'TX packets[: ](\d+)', out)[0])
            ret['errin'] = int(errors[0])
            ret['errout'] = int(errors[1])
            ret['dropin'] = int(dropped[0])
            ret['dropout'] = int(dropped[1])
            ret['bytes_recv'] = int(
                re.findall(r'RX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
            ret['bytes_sent'] = int(
                re.findall(r'TX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
            return ret

        # (field name, tolerated absolute difference)
        tolerances = (
            ('bytes_recv', 1024 * 5),
            ('bytes_sent', 1024 * 5),
            ('packets_recv', 1024),
            ('packets_sent', 1024),
            ('errin', 10),
            ('errout', 10),
            ('dropin', 10),
            ('dropout', 10),
        )

        nio = psutil.net_io_counters(pernic=True, nowrap=False)
        for name, stats in nio.items():
            try:
                ifconfig_ret = ifconfig(name)
            except RuntimeError:
                # Skip the NICs for which the command fails.
                continue
            for field, delta in tolerances:
                self.assertAlmostEqual(
                    getattr(stats, field), ifconfig_ret[field], delta=delta)
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemNetConnections(PsutilTestCase):
    """Tests for psutil.net_connections() under emulated conditions."""

    @mock.patch('psutil._pslinux.socket.inet_ntop', side_effect=ValueError)
    @mock.patch('psutil._pslinux.supports_ipv6', return_value=False)
    def test_emulate_ipv6_unsupported(self, supports_ipv6, inet_ntop):
        # see: https://github.com/giampaolo/psutil/issues/623
        # Try to have an IPv6 socket bound on the loopback address; a
        # socket error at any step is ignored.
        try:
            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            self.addCleanup(sock.close)
            sock.bind(("::1", 0))
        except socket.error:
            pass
        # Only needs to complete without raising.
        psutil.net_connections(kind='inet6')

    def test_emulate_unix(self):
        # Three regular-looking rows followed by a row made of zeros only.
        content = textwrap.dedent("""\
            0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
            0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
            0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
            000000000000000000000000000000000000000000000000000000
            """)
        with mock_open_content('/proc/net/unix', content) as open_mock:
            psutil.net_connections(kind='unix')
            assert open_mock.called
- # =====================================================================
- # --- system disks
- # =====================================================================
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemDiskPartitions(PsutilTestCase):
    """Tests for psutil.disk_partitions() and psutil.disk_usage()."""

    @unittest.skipIf(not hasattr(os, 'statvfs'), "os.statvfs() not available")
    @skip_on_not_implemented()
    def test_against_df(self):
        # Compare psutil.disk_usage() of every partition returned by
        # psutil.disk_partitions() with the figures printed by "df".
        def df(path):
            # The second output line (the first one is discarded) holds
            # the figures; its first four columns are returned.
            out = sh('df -P -B 1 "%s"' % path).strip()
            data_row = out.split('\n')[1]
            dev, total, used, free = data_row.split()[:4]
            if dev == 'none':
                dev = ''
            return dev, int(total), int(used), int(free)

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            _, df_total, df_used, df_free = df(part.mountpoint)
            self.assertEqual(usage.total, df_total)
            self.assertAlmostEqual(
                usage.free, df_free, delta=TOLERANCE_DISK_USAGE)
            self.assertAlmostEqual(
                usage.used, df_used, delta=TOLERANCE_DISK_USAGE)

    def test_zfs_fs(self):
        # Test that ZFS partitions are returned.
        with open("/proc/filesystems", "r") as f:
            filesystems = f.read()

        if 'zfs' in filesystems:
            # At least one of the returned partitions must be a ZFS one.
            fstypes = [part.fstype for part in psutil.disk_partitions()]
            if 'zfs' not in fstypes:
                self.fail("couldn't find any ZFS partition")
            return

        # No ZFS partitions on this system. Let's fake one.
        fake_file = io.StringIO(u("nodev\tzfs\n"))
        with mock.patch('psutil._common.open',
                        return_value=fake_file, create=True) as open_mock, \
                mock.patch(
                    'psutil._pslinux.cext.disk_partitions',
                    return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]
                ) as cext_mock:
            partitions = psutil.disk_partitions()
            assert open_mock.called
            assert cext_mock.called
            assert partitions
            self.assertEqual(partitions[0].fstype, 'zfs')

    def test_emulate_realpath_fail(self):
        # See: https://github.com/giampaolo/psutil/issues/1307
        # os.path.realpath() is made to return a non-existent path.
        try:
            with mock.patch('os.path.realpath',
                            return_value='/non/existent') as realpath_mock:
                self.assertRaises(FileNotFoundError, psutil.disk_partitions)
                assert realpath_mock.called
        finally:
            psutil.PROCFS_PATH = "/proc"
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemDiskIoCounters(PsutilTestCase):
    """Tests for psutil.disk_io_counters() fed with emulated files."""

    def test_emulate_kernel_2_4(self):
        # Tests /proc/diskstats parsing format for 2.4 kernels, see:
        # https://github.com/giampaolo/psutil/issues/767
        diskstats = " 3 0 1 hda 2 3 4 5 6 7 8 9 10 11 12"
        expected = (
            ('read_count', 1),
            ('read_merged_count', 2),
            ('read_bytes', 3 * SECTOR_SIZE),
            ('read_time', 4),
            ('write_count', 5),
            ('write_merged_count', 6),
            ('write_bytes', 7 * SECTOR_SIZE),
            ('write_time', 8),
            ('busy_time', 10),
        )
        with mock_open_content('/proc/diskstats', diskstats), \
                mock.patch('psutil._pslinux.is_storage_device',
                           return_value=True):
            counters = psutil.disk_io_counters(nowrap=False)
            for field, value in expected:
                self.assertEqual(getattr(counters, field), value)

    def test_emulate_kernel_2_6_full(self):
        # Tests /proc/diskstats parsing format for 2.6 kernels,
        # lines reporting all metrics:
        # https://github.com/giampaolo/psutil/issues/767
        diskstats = " 3 0 hda 1 2 3 4 5 6 7 8 9 10 11"
        expected = (
            ('read_count', 1),
            ('read_merged_count', 2),
            ('read_bytes', 3 * SECTOR_SIZE),
            ('read_time', 4),
            ('write_count', 5),
            ('write_merged_count', 6),
            ('write_bytes', 7 * SECTOR_SIZE),
            ('write_time', 8),
            ('busy_time', 10),
        )
        with mock_open_content('/proc/diskstats', diskstats), \
                mock.patch('psutil._pslinux.is_storage_device',
                           return_value=True):
            counters = psutil.disk_io_counters(nowrap=False)
            for field, value in expected:
                self.assertEqual(getattr(counters, field), value)

    def test_emulate_kernel_2_6_limited(self):
        # Tests /proc/diskstats parsing format for 2.6 kernels,
        # where one line of /proc/partitions return a limited
        # amount of metrics when it bumps into a partition
        # (instead of a disk). See:
        # https://github.com/giampaolo/psutil/issues/767
        diskstats = " 3 1 hda 1 2 3 4"
        expected = (
            ('read_count', 1),
            ('read_bytes', 2 * SECTOR_SIZE),
            ('write_count', 3),
            ('write_bytes', 4 * SECTOR_SIZE),
            # Every other field is expected to be zero.
            ('read_merged_count', 0),
            ('read_time', 0),
            ('write_merged_count', 0),
            ('write_time', 0),
            ('busy_time', 0),
        )
        with mock_open_content('/proc/diskstats', diskstats), \
                mock.patch('psutil._pslinux.is_storage_device',
                           return_value=True):
            counters = psutil.disk_io_counters(nowrap=False)
            for field, value in expected:
                self.assertEqual(getattr(counters, field), value)

    def test_emulate_include_partitions(self):
        # Make sure that when perdisk=True disk partitions are returned,
        # see:
        # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
        with mock_open_content(
                '/proc/diskstats',
                textwrap.dedent("""\
                    3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
                    3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
                    """)), \
                mock.patch('psutil._pslinux.is_storage_device',
                           return_value=False):
            per_disk = psutil.disk_io_counters(perdisk=True, nowrap=False)
            self.assertEqual(len(per_disk), 2)
            self.assertEqual(per_disk['nvme0n1'].read_count, 1)
            self.assertEqual(per_disk['nvme0n1p1'].read_count, 1)
            self.assertEqual(per_disk['nvme0n1'].write_count, 5)
            self.assertEqual(per_disk['nvme0n1p1'].write_count, 5)

    def test_emulate_exclude_partitions(self):
        # Make sure that when perdisk=False partitions (e.g. 'sda1',
        # 'nvme0n1p1') are skipped and not included in the total count.
        # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842

        # First round: nothing is reported as a storage device.
        with mock_open_content(
                '/proc/diskstats',
                textwrap.dedent("""\
                    3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
                    3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
                    """)), \
                mock.patch('psutil._pslinux.is_storage_device',
                           return_value=False):
            total = psutil.disk_io_counters(perdisk=False, nowrap=False)
            self.assertIsNone(total)

        # Second round: only 'nvme0n1' is reported as a storage device.
        def is_storage_device(name):
            return name == 'nvme0n1'

        with mock_open_content(
                '/proc/diskstats',
                textwrap.dedent("""\
                    3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
                    3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
                    """)), \
                mock.patch('psutil._pslinux.is_storage_device',
                           create=True, side_effect=is_storage_device):
            total = psutil.disk_io_counters(perdisk=False, nowrap=False)
            self.assertEqual(total.read_count, 1)
            self.assertEqual(total.write_count, 5)

    def test_emulate_use_sysfs(self):
        # Only /proc/diskstats is reported as missing; the number of
        # entries must match the one obtained without the patch.
        def exists(path):
            return path != '/proc/diskstats'

        from_procfs = psutil.disk_io_counters(perdisk=True)
        with mock.patch('psutil._pslinux.os.path.exists',
                        create=True, side_effect=exists):
            from_sysfs = psutil.disk_io_counters(perdisk=True)
        self.assertEqual(len(from_procfs), len(from_sysfs))

    def test_emulate_not_impl(self):
        # Every path is reported as missing.
        with mock.patch('psutil._pslinux.os.path.exists',
                        create=True, side_effect=lambda path: False):
            self.assertRaises(NotImplementedError, psutil.disk_io_counters)
- # =====================================================================
- # --- misc
- # =====================================================================
@unittest.skipIf(not LINUX, "LINUX only")
class TestMisc(PsutilTestCase):
    """Miscellaneous Linux-only tests."""

    def test_boot_time(self):
        # Compare the integer parts of the two readings.
        vmstat_value = vmstat('boot time')
        psutil_value = psutil.boot_time()
        self.assertEqual(int(vmstat_value), int(psutil_value))

    def test_no_procfs_on_import(self):
        # Reload psutil while every open() on a path starting with
        # "/proc" fails, then point psutil at a private procfs directory
        # holding a hand-written "stat" file.
        my_procfs = self.get_testfn()
        os.mkdir(my_procfs)

        with open(os.path.join(my_procfs, 'stat'), 'w') as f:
            f.write('cpu 0 0 0 0 0 0 0 0 0 0\n')
            f.write('cpu0 0 0 0 0 0 0 0 0 0 0\n')
            f.write('cpu1 0 0 0 0 0 0 0 0 0 0\n')

        try:
            orig_open = open

            def open_mock(name, *args, **kwargs):
                if name.startswith('/proc'):
                    raise IOError(errno.ENOENT, 'rejecting access for test')
                return orig_open(name, *args, **kwargs)

            patch_point = 'builtins.open' if PY3 else '__builtin__.open'
            with mock.patch(patch_point, side_effect=open_mock):
                reload_module(psutil)

                # With the default procfs path all CPU functions fail.
                self.assertRaises(IOError, psutil.cpu_times)
                self.assertRaises(IOError, psutil.cpu_times, percpu=True)
                self.assertRaises(IOError, psutil.cpu_percent)
                self.assertRaises(IOError, psutil.cpu_percent, percpu=True)
                self.assertRaises(IOError, psutil.cpu_times_percent)
                self.assertRaises(
                    IOError, psutil.cpu_times_percent, percpu=True)

                psutil.PROCFS_PATH = my_procfs

                self.assertEqual(psutil.cpu_percent(), 0)
                self.assertEqual(sum(psutil.cpu_times_percent()), 0)

                # since we don't know the number of CPUs at import time,
                # we awkwardly say there are none until the second call
                per_cpu_percent = psutil.cpu_percent(percpu=True)
                self.assertEqual(sum(per_cpu_percent), 0)

                # ditto awkward length
                per_cpu_times_percent = psutil.cpu_times_percent(percpu=True)
                self.assertEqual(sum(map(sum, per_cpu_times_percent)), 0)

                # much user, very busy
                with open(os.path.join(my_procfs, 'stat'), 'w') as f:
                    f.write('cpu 1 0 0 0 0 0 0 0 0 0\n')
                    f.write('cpu0 1 0 0 0 0 0 0 0 0 0\n')
                    f.write('cpu1 1 0 0 0 0 0 0 0 0 0\n')

                self.assertNotEqual(psutil.cpu_percent(), 0)
                self.assertNotEqual(
                    sum(psutil.cpu_percent(percpu=True)), 0)
                self.assertNotEqual(sum(psutil.cpu_times_percent()), 0)
                self.assertNotEqual(
                    sum(map(sum, psutil.cpu_times_percent(percpu=True))), 0)
        finally:
            shutil.rmtree(my_procfs)
            reload_module(psutil)

        self.assertEqual(psutil.PROCFS_PATH, '/proc')

    def test_cpu_steal_decrease(self):
        # Test cumulative cpu stats decrease. We should ignore this.
        # See issue #1210.
        with mock_open_content(
                "/proc/stat",
                textwrap.dedent("""\
                    cpu 0 0 0 0 0 0 0 1 0 0
                    cpu0 0 0 0 0 0 0 0 1 0 0
                    cpu1 0 0 0 0 0 0 0 1 0 0
                    """).encode()) as m:
            # first call to "percent" functions should read the new stat file
            # and compare to the "real" file read at import time - so the
            # values are meaningless
            psutil.cpu_percent()
            assert m.called
            psutil.cpu_percent(percpu=True)
            psutil.cpu_times_percent()
            psutil.cpu_times_percent(percpu=True)

        with mock_open_content(
                "/proc/stat",
                textwrap.dedent("""\
                    cpu 1 0 0 0 0 0 0 0 0 0
                    cpu0 1 0 0 0 0 0 0 0 0 0
                    cpu1 1 0 0 0 0 0 0 0 0 0
                    """).encode()) as m:
            # Increase "user" while steal goes "backwards" to zero.
            cpu_percent = psutil.cpu_percent()
            assert m.called
            cpu_percent_percpu = psutil.cpu_percent(percpu=True)
            cpu_times_percent = psutil.cpu_times_percent()
            cpu_times_percent_percpu = psutil.cpu_times_percent(percpu=True)
            self.assertNotEqual(cpu_percent, 0)
            self.assertNotEqual(sum(cpu_percent_percpu), 0)
            self.assertNotEqual(sum(cpu_times_percent), 0)
            self.assertNotEqual(sum(cpu_times_percent), 100.0)
            self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 0)
            self.assertNotEqual(
                sum(map(sum, cpu_times_percent_percpu)), 100.0)
            self.assertEqual(cpu_times_percent.steal, 0)
            self.assertNotEqual(cpu_times_percent.user, 0)

    def test_boot_time_mocked(self):
        # With psutil._common.open replaced by a mock, boot_time() is
        # expected to raise RuntimeError.
        with mock.patch('psutil._common.open', create=True) as m:
            self.assertRaises(
                RuntimeError,
                psutil._pslinux.boot_time)
            assert m.called

    def test_users_mocked(self):
        # Make sure ':0' and ':0.0' (returned by C ext) are converted
        # to 'localhost'; any other host is expected to be returned as-is.
        cases = ((':0', 'localhost'), (':0.0', 'localhost'), ('foo', 'foo'))
        for raw_host, expected_host in cases:
            with mock.patch('psutil._pslinux.cext.users',
                            return_value=[('giampaolo', 'pts/2', raw_host,
                                           1436573184.0, True, 2)]) as m:
                self.assertEqual(psutil.users()[0].host, expected_host)
                assert m.called

    def test_procfs_path(self):
        # Point psutil at an empty directory: the functions listed below
        # are expected to fail instead of returning data.
        tdir = self.get_testfn()
        os.mkdir(tdir)
        try:
            psutil.PROCFS_PATH = tdir
            self.assertRaises(IOError, psutil.virtual_memory)
            self.assertRaises(IOError, psutil.cpu_times)
            self.assertRaises(IOError, psutil.cpu_times, percpu=True)
            self.assertRaises(IOError, psutil.boot_time)
            # self.assertRaises(IOError, psutil.pids)
            self.assertRaises(IOError, psutil.net_connections)
            self.assertRaises(IOError, psutil.net_io_counters)
            self.assertRaises(IOError, psutil.net_if_stats)
            # self.assertRaises(IOError, psutil.disk_io_counters)
            self.assertRaises(IOError, psutil.disk_partitions)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            psutil.PROCFS_PATH = "/proc"

    @retry_on_failure()
    def test_issue_687(self):
        # In case of thread ID:
        # - pid_exists() is supposed to return False
        # - Process(tid) is supposed to work
        # - pids() should not return the TID
        # See: https://github.com/giampaolo/psutil/issues/687
        t = ThreadTask()
        t.start()
        try:
            p = psutil.Process()
            threads = p.threads()
            self.assertEqual(len(threads), 2)
            # Of the two thread IDs, take the higher one.
            tid = sorted(threads, key=lambda x: x.id)[1].id
            self.assertNotEqual(p.pid, tid)
            pt = psutil.Process(tid)
            pt.as_dict()
            self.assertNotIn(tid, psutil.pids())
        finally:
            t.stop()

    def test_pid_exists_no_proc_status(self):
        # Internally pid_exists relies on /proc/{pid}/status.
        # Emulate a case where this file is empty in which case
        # psutil is supposed to fall back on using pids().
        # The PID has to be interpolated into the path: with the literal
        # "%s" placeholder the mocked path never matches the real file.
        status_path = "/proc/%s/status" % os.getpid()
        with mock_open_content(status_path, "") as m:
            assert psutil.pid_exists(os.getpid())
            assert m.called
- # =====================================================================
- # --- sensors
- # =====================================================================
@unittest.skipIf(not LINUX, "LINUX only")
@unittest.skipIf(not HAS_BATTERY, "no battery")
class TestSensorsBattery(PsutilTestCase):
    """Tests for psutil.sensors_battery()."""

    @unittest.skipIf(not which("acpi"), "acpi utility not available")
    def test_percent(self):
        # The second comma-separated field of "acpi -b", without the
        # percent sign, is compared with psutil's figure.
        percent_field = sh("acpi -b").split(",")[1]
        expected = int(percent_field.strip().replace('%', ''))
        actual = psutil.sensors_battery().percent
        self.assertAlmostEqual(expected, actual, delta=1)

    def test_emulate_power_plugged(self):
        # Pretend the AC power cable is connected.
        real_open = open
        ac_online = ("AC0/online", "AC/online")

        def fake_open(name, *args, **kwargs):
            if name.endswith(ac_online):
                return io.BytesIO(b"1")
            return real_open(name, *args, **kwargs)

        target = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(target, side_effect=fake_open) as patched:
            self.assertEqual(psutil.sensors_battery().power_plugged, True)
            self.assertEqual(
                psutil.sensors_battery().secsleft,
                psutil.POWER_TIME_UNLIMITED)
            assert patched.called

    def test_emulate_power_plugged_2(self):
        # Same as above but pretend /AC0/online does not exist in which
        # case code relies on /status file.
        real_open = open
        ac_online = ("AC0/online", "AC/online")

        def fake_open(name, *args, **kwargs):
            if name.endswith(ac_online):
                raise IOError(errno.ENOENT, "")
            if name.endswith("/status"):
                return io.StringIO(u("charging"))
            return real_open(name, *args, **kwargs)

        target = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(target, side_effect=fake_open) as patched:
            self.assertEqual(psutil.sensors_battery().power_plugged, True)
            assert patched.called

    def test_emulate_power_not_plugged(self):
        # Pretend the AC power cable is not connected.
        real_open = open
        ac_online = ("AC0/online", "AC/online")

        def fake_open(name, *args, **kwargs):
            if name.endswith(ac_online):
                return io.BytesIO(b"0")
            return real_open(name, *args, **kwargs)

        target = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(target, side_effect=fake_open) as patched:
            self.assertEqual(psutil.sensors_battery().power_plugged, False)
            assert patched.called

    def test_emulate_power_not_plugged_2(self):
        # Same as above but pretend /AC0/online does not exist in which
        # case code relies on /status file.
        real_open = open
        ac_online = ("AC0/online", "AC/online")

        def fake_open(name, *args, **kwargs):
            if name.endswith(ac_online):
                raise IOError(errno.ENOENT, "")
            if name.endswith("/status"):
                return io.StringIO(u("discharging"))
            return real_open(name, *args, **kwargs)

        target = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(target, side_effect=fake_open) as patched:
            self.assertEqual(psutil.sensors_battery().power_plugged, False)
            assert patched.called

    def test_emulate_power_undetermined(self):
        # Pretend we can't know whether the AC power cable is connected:
        # the "online" files are missing and the status file holds an
        # unrecognized value. power_plugged is expected to be None.
        real_open = open
        ac_online = ("/sys/class/power_supply/AC0/online",
                     "/sys/class/power_supply/AC/online")

        def fake_open(name, *args, **kwargs):
            if name.startswith(ac_online):
                raise IOError(errno.ENOENT, "")
            if name.startswith("/sys/class/power_supply/BAT0/status"):
                return io.BytesIO(b"???")
            return real_open(name, *args, **kwargs)

        target = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(target, side_effect=fake_open) as patched:
            self.assertIsNone(psutil.sensors_battery().power_plugged)
            assert patched.called

    def test_emulate_energy_full_0(self):
        # Emulate a case where energy_full files returns 0.
        energy_full = "/sys/class/power_supply/BAT0/energy_full"
        with mock_open_content(energy_full, b"0") as patched:
            self.assertEqual(psutil.sensors_battery().percent, 0)
            assert patched.called

    def test_emulate_energy_full_not_avail(self):
        # Emulate a case where energy_full file does not exist.
        # Expected fallback on /capacity.
        with mock_open_exception(
                "/sys/class/power_supply/BAT0/energy_full",
                IOError(errno.ENOENT, "")), \
                mock_open_exception(
                    "/sys/class/power_supply/BAT0/charge_full",
                    IOError(errno.ENOENT, "")), \
                mock_open_content(
                    "/sys/class/power_supply/BAT0/capacity", b"88"):
            self.assertEqual(psutil.sensors_battery().percent, 88)

    def test_emulate_no_power(self):
        # Emulate a case where /AC0/online file nor /BAT0/status exist.
        with mock_open_exception(
                "/sys/class/power_supply/AC/online",
                IOError(errno.ENOENT, "")), \
                mock_open_exception(
                    "/sys/class/power_supply/AC0/online",
                    IOError(errno.ENOENT, "")), \
                mock_open_exception(
                    "/sys/class/power_supply/BAT0/status",
                    IOError(errno.ENOENT, "")):
            self.assertIsNone(psutil.sensors_battery().power_plugged)
@unittest.skipIf(not LINUX, "LINUX only")
class TestSensorsBatteryEmulated(PsutilTestCase):
    """Runs psutil.sensors_battery() against a fully emulated battery."""

    def test_it(self):
        # (path suffix, text served for it); checked in this order.
        fake_files = (
            ("/energy_now", "60000000"),
            ("/power_now", "0"),
            ("/energy_full", "60000001"),
        )
        real_open = open

        def fake_open(name, *args, **kwargs):
            for suffix, text in fake_files:
                if name.endswith(suffix):
                    return io.StringIO(u(text))
            return real_open(name, *args, **kwargs)

        target = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch('os.listdir', return_value=["BAT0"]) as mlistdir, \
                mock.patch(target, side_effect=fake_open) as mopen:
            self.assertIsNotNone(psutil.sensors_battery())
        assert mlistdir.called
        assert mopen.called
@unittest.skipIf(not LINUX, "LINUX only")
class TestSensorsTemperatures(PsutilTestCase):
    """Tests for psutil.sensors_temperatures() with emulated files."""

    def test_emulate_class_hwmon(self):
        # (path suffix, stream class, payload); checked in this order.
        fake_files = (
            ('/name', io.StringIO, u("name")),
            ('/temp1_label', io.StringIO, u("label")),
            ('/temp1_input', io.BytesIO, b"30000"),
            ('/temp1_max', io.BytesIO, b"40000"),
            ('/temp1_crit', io.BytesIO, b"50000"),
        )
        real_open = open

        def fake_open(name, *args, **kwargs):
            for suffix, stream_cls, payload in fake_files:
                if name.endswith(suffix):
                    return stream_cls(payload)
            return real_open(name, *args, **kwargs)

        target = 'builtins.open' if PY3 else '__builtin__.open'
        # Test case with /sys/class/hwmon
        with mock.patch(target, side_effect=fake_open), \
                mock.patch('glob.glob',
                           return_value=['/sys/class/hwmon/hwmon0/temp1']):
            entry = psutil.sensors_temperatures()['name'][0]
            self.assertEqual(entry.label, 'label')
            self.assertEqual(entry.current, 30.0)
            self.assertEqual(entry.high, 40.0)
            self.assertEqual(entry.critical, 50.0)

    def test_emulate_class_thermal(self):
        # (path suffix, stream class, payload). Order matters: the
        # "0_*" suffixes must be tried before the shorter ones.
        fake_files = (
            ('0_temp', io.BytesIO, b"50000"),
            ('temp', io.BytesIO, b"30000"),
            ('0_type', io.StringIO, u("critical")),
            ('type', io.StringIO, u("name")),
        )
        # Results served for each glob pattern; anything else is empty.
        glob_results = {
            '/sys/class/hwmon/hwmon*/temp*_*': (),
            '/sys/class/hwmon/hwmon*/device/temp*_*': (),
            '/sys/class/thermal/thermal_zone*': (
                '/sys/class/thermal/thermal_zone0',
            ),
            '/sys/class/thermal/thermal_zone0/trip_point*': (
                '/sys/class/thermal/thermal_zone1/trip_point_0_type',
                '/sys/class/thermal/thermal_zone1/trip_point_0_temp',
            ),
        }
        real_open = open

        def fake_open(name, *args, **kwargs):
            for suffix, stream_cls, payload in fake_files:
                if name.endswith(suffix):
                    return stream_cls(payload)
            return real_open(name, *args, **kwargs)

        def fake_glob(path):
            # A new list is built on every call.
            return list(glob_results.get(path, ()))

        target = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(target, side_effect=fake_open), \
                mock.patch('glob.glob', create=True, side_effect=fake_glob):
            entry = psutil.sensors_temperatures()['name'][0]
            self.assertEqual(entry.label, '')
            self.assertEqual(entry.current, 30.0)
            self.assertEqual(entry.high, 50.0)
            self.assertEqual(entry.critical, 50.0)
@unittest.skipIf(not LINUX, "LINUX only")
class TestSensorsFans(PsutilTestCase):
    """Tests for psutil.sensors_fans() with emulated files."""

    def test_emulate_data(self):
        # (path suffix, text served for it); checked in this order.
        fake_files = (
            ('/name', "name"),
            ('/fan1_label', "label"),
            ('/fan1_input', "2000"),
        )
        real_open = open

        def fake_open(name, *args, **kwargs):
            for suffix, text in fake_files:
                if name.endswith(suffix):
                    return io.StringIO(u(text))
            return real_open(name, *args, **kwargs)

        target = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(target, side_effect=fake_open), \
                mock.patch('glob.glob',
                           return_value=['/sys/class/hwmon/hwmon2/fan1']):
            entry = psutil.sensors_fans()['name'][0]
            self.assertEqual(entry.label, 'label')
            self.assertEqual(entry.current, 2000)
- # =====================================================================
- # --- test process
- # =====================================================================
@unittest.skipIf(not LINUX, "LINUX only")
class TestProcess(PsutilTestCase):
    """Linux-specific tests for psutil.Process.

    Most tests replace pieces of /proc (or the functions reading it)
    with mocks and check how psutil reacts to the emulated content.
    """

    @retry_on_failure()
    def test_memory_full_info(self):
        # Compare memory_full_info() of a child process against the
        # sums computed from its (ungrouped) memory_maps().
        testfn = self.get_testfn()
        # The body of the "with" must stay indented, otherwise the
        # child interpreter fails with an IndentationError.
        src = textwrap.dedent("""
            import time
            with open("%s", "w") as f:
                time.sleep(10)
            """ % testfn)
        sproc = self.pyrun(src)
        # NOTE(review): the condition reads "testfn is absent from the
        # cwd listing" -- confirm this is the intended wait condition.
        call_until(lambda: os.listdir('.'), "'%s' not in ret" % testfn)
        p = psutil.Process(sproc.pid)
        time.sleep(.1)
        mem = p.memory_full_info()
        maps = p.memory_maps(grouped=False)
        self.assertAlmostEqual(
            mem.uss, sum([x.private_dirty + x.private_clean for x in maps]),
            delta=4096)
        self.assertAlmostEqual(
            mem.pss, sum([x.pss for x in maps]), delta=4096)
        self.assertAlmostEqual(
            mem.swap, sum([x.swap for x in maps]), delta=4096)

    def test_memory_full_info_mocked(self):
        # See: https://github.com/giampaolo/psutil/issues/1222
        with mock_open_content(
            "/proc/%s/smaps" % os.getpid(),
            textwrap.dedent("""\
                fffff0 r-xp 00000000 00:00 0 [vsyscall]
                Size: 1 kB
                Rss: 2 kB
                Pss: 3 kB
                Shared_Clean: 4 kB
                Shared_Dirty: 5 kB
                Private_Clean: 6 kB
                Private_Dirty: 7 kB
                Referenced: 8 kB
                Anonymous: 9 kB
                LazyFree: 10 kB
                AnonHugePages: 11 kB
                ShmemPmdMapped: 12 kB
                Shared_Hugetlb: 13 kB
                Private_Hugetlb: 14 kB
                Swap: 15 kB
                SwapPss: 16 kB
                KernelPageSize: 17 kB
                MMUPageSize: 18 kB
                Locked: 19 kB
                VmFlags: rd ex
                """).encode()) as m:
            p = psutil.Process()
            mem = p.memory_full_info()
            assert m.called
            # USS is expected to be the sum of the "Private_*" fields
            # (Private_Clean + Private_Dirty + Private_Hugetlb).
            self.assertEqual(mem.uss, (6 + 7 + 14) * 1024)
            self.assertEqual(mem.pss, 3 * 1024)
            self.assertEqual(mem.swap, 15 * 1024)

    # On PYPY file descriptors are not closed fast enough.
    @unittest.skipIf(PYPY, "unreliable on PYPY")
    def test_open_files_mode(self):
        def get_test_file(fname):
            # Poll open_files() until the entry for `fname` shows up;
            # raise RuntimeError once GLOBAL_TIMEOUT seconds elapsed.
            p = psutil.Process()
            target = os.path.abspath(fname)
            giveup_at = time.time() + GLOBAL_TIMEOUT
            while True:
                for entry in p.open_files():
                    if entry.path == target:
                        return entry
                # The deadline is checked in the outer loop: a "break"
                # inside the "for" would only restart the scan and the
                # timeout would never trigger.
                if time.time() > giveup_at:
                    raise RuntimeError("timeout looking for test file")

        #
        testfn = self.get_testfn()
        with open(testfn, "w"):
            self.assertEqual(get_test_file(testfn).mode, "w")
        with open(testfn, "r"):
            self.assertEqual(get_test_file(testfn).mode, "r")
        with open(testfn, "a"):
            self.assertEqual(get_test_file(testfn).mode, "a")
        #
        with open(testfn, "r+"):
            self.assertEqual(get_test_file(testfn).mode, "r+")
        with open(testfn, "w+"):
            self.assertEqual(get_test_file(testfn).mode, "r+")
        with open(testfn, "a+"):
            self.assertEqual(get_test_file(testfn).mode, "a+")
        # note: "x" bit is not supported
        if PY3:
            safe_rmpath(testfn)
            with open(testfn, "x"):
                self.assertEqual(get_test_file(testfn).mode, "w")
            safe_rmpath(testfn)
            with open(testfn, "x+"):
                self.assertEqual(get_test_file(testfn).mode, "r+")

    def test_open_files_file_gone(self):
        # simulates a file which gets deleted during open_files()
        # execution
        p = psutil.Process()
        files = p.open_files()
        with open(self.get_testfn(), 'w'):
            # give the kernel some time to see the new file
            call_until(p.open_files, "len(ret) != %i" % len(files))
            with mock.patch('psutil._pslinux.os.readlink',
                            side_effect=OSError(errno.ENOENT, "")) as m:
                files = p.open_files()
                assert not files
                assert m.called
            # also simulate the case where os.readlink() returns EINVAL
            # in which case psutil is supposed to 'continue'
            with mock.patch('psutil._pslinux.os.readlink',
                            side_effect=OSError(errno.EINVAL, "")) as m:
                self.assertEqual(p.open_files(), [])
                assert m.called

    def test_open_files_fd_gone(self):
        # Simulate a case where /proc/{pid}/fdinfo/{fd} disappears
        # while iterating through fds.
        # https://travis-ci.org/giampaolo/psutil/jobs/225694530
        p = psutil.Process()
        files = p.open_files()
        with open(self.get_testfn(), 'w'):
            # give the kernel some time to see the new file
            call_until(p.open_files, "len(ret) != %i" % len(files))
            patch_point = 'builtins.open' if PY3 else '__builtin__.open'
            with mock.patch(patch_point,
                            side_effect=IOError(errno.ENOENT, "")) as m:
                files = p.open_files()
                assert not files
                assert m.called

    # --- mocked tests

    def test_terminal_mocked(self):
        # With an empty terminal map terminal() is expected to be None.
        with mock.patch('psutil._pslinux._psposix.get_terminal_map',
                        return_value={}) as m:
            self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal())
            assert m.called

    # TODO: re-enable this test.
    # def test_num_ctx_switches_mocked(self):
    #     with mock.patch('psutil._common.open', create=True) as m:
    #         self.assertRaises(
    #             NotImplementedError,
    #             psutil._pslinux.Process(os.getpid()).num_ctx_switches)
    #         assert m.called

    def test_cmdline_mocked(self):
        # see: https://github.com/giampaolo/psutil/issues/639
        p = psutil.Process()
        fake_file = io.StringIO(u('foo\x00bar\x00'))
        with mock.patch('psutil._common.open',
                        return_value=fake_file, create=True) as m:
            self.assertEqual(p.cmdline(), ['foo', 'bar'])
            assert m.called
        # a doubled terminator yields a trailing empty argument
        fake_file = io.StringIO(u('foo\x00bar\x00\x00'))
        with mock.patch('psutil._common.open',
                        return_value=fake_file, create=True) as m:
            self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
            assert m.called

    def test_cmdline_spaces_mocked(self):
        # see: https://github.com/giampaolo/psutil/issues/1179
        p = psutil.Process()
        fake_file = io.StringIO(u('foo bar '))
        with mock.patch('psutil._common.open',
                        return_value=fake_file, create=True) as m:
            self.assertEqual(p.cmdline(), ['foo', 'bar'])
            assert m.called
        # Two trailing spaces (spelled as escapes so they cannot be
        # collapsed by accident): mirrors the '\x00\x00' case of
        # test_cmdline_mocked and yields a trailing empty argument.
        fake_file = io.StringIO(u('foo bar\x20\x20'))
        with mock.patch('psutil._common.open',
                        return_value=fake_file, create=True) as m:
            self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
            assert m.called

    def test_cmdline_mixed_separators(self):
        # https://github.com/giampaolo/psutil/issues/
        #     1179#issuecomment-552984549
        p = psutil.Process()
        fake_file = io.StringIO(u('foo\x20bar\x00'))
        with mock.patch('psutil._common.open',
                        return_value=fake_file, create=True) as m:
            self.assertEqual(p.cmdline(), ['foo', 'bar'])
            assert m.called

    def test_readlink_path_deleted_mocked(self):
        # The " (deleted)" suffix is expected to be stripped from both
        # exe() and cwd().
        with mock.patch('psutil._pslinux.os.readlink',
                        return_value='/home/foo (deleted)'):
            self.assertEqual(psutil.Process().exe(), "/home/foo")
            self.assertEqual(psutil.Process().cwd(), "/home/foo")

    def test_threads_mocked(self):
        # Test the case where os.listdir() returns a file (thread)
        # which no longer exists by the time we open() it (race
        # condition). threads() is supposed to ignore that instead
        # of raising NSP.
        def open_mock(name, *args, **kwargs):
            if name.startswith('/proc/%s/task' % os.getpid()):
                raise IOError(errno.ENOENT, "")
            else:
                return orig_open(name, *args, **kwargs)

        orig_open = open
        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(patch_point, side_effect=open_mock) as m:
            ret = psutil.Process().threads()
            assert m.called
            self.assertEqual(ret, [])

        # ...but if it bumps into something != ENOENT we want an
        # exception.
        def open_mock(name, *args, **kwargs):
            if name.startswith('/proc/%s/task' % os.getpid()):
                raise IOError(errno.EPERM, "")
            else:
                return orig_open(name, *args, **kwargs)

        with mock.patch(patch_point, side_effect=open_mock):
            self.assertRaises(psutil.AccessDenied, psutil.Process().threads)

    def test_exe_mocked(self):
        with mock.patch('psutil._pslinux.readlink',
                        side_effect=OSError(errno.ENOENT, "")) as m1:
            with mock.patch('psutil.Process.cmdline',
                            side_effect=psutil.AccessDenied(0, "")) as m2:
                # No such file error; might be raised also if /proc/pid/exe
                # path actually exists for system processes with low pids
                # (about 0-20). In this case psutil is supposed to return
                # an empty string.
                ret = psutil.Process().exe()
                assert m1.called
                assert m2.called
                self.assertEqual(ret, "")

            # ...but if /proc/pid no longer exist we're supposed to treat
            # it as an alias for zombie process
            with mock.patch('psutil._pslinux.os.path.lexists',
                            return_value=False):
                self.assertRaises(
                    psutil.ZombieProcess, psutil.Process().exe)

    def test_issue_1014(self):
        # Emulates a case where smaps file does not exist. In this case
        # wrap_exception decorator should not raise NoSuchProcess.
        with mock_open_exception(
                '/proc/%s/smaps' % os.getpid(),
                IOError(errno.ENOENT, "")) as m:
            p = psutil.Process()
            with self.assertRaises(FileNotFoundError):
                p.memory_maps()
            assert m.called

    @unittest.skipIf(not HAS_RLIMIT, "not supported")
    def test_rlimit_zombie(self):
        # Emulate a case where rlimit() raises ENOSYS, which may
        # happen in case of zombie process:
        # https://travis-ci.org/giampaolo/psutil/jobs/51368273
        with mock.patch("psutil._pslinux.cext.linux_prlimit",
                        side_effect=OSError(errno.ENOSYS, "")) as m:
            p = psutil.Process()
            p.name()
            with self.assertRaises(psutil.ZombieProcess) as exc:
                p.rlimit(psutil.RLIMIT_NOFILE)
            assert m.called
        self.assertEqual(exc.exception.pid, p.pid)
        self.assertEqual(exc.exception.name, p.name())

    def test_cwd_zombie(self):
        # cwd() is expected to raise ZombieProcess when os.readlink()
        # fails with ENOENT.
        with mock.patch("psutil._pslinux.os.readlink",
                        side_effect=OSError(errno.ENOENT, "")) as m:
            p = psutil.Process()
            p.name()
            with self.assertRaises(psutil.ZombieProcess) as exc:
                p.cwd()
            assert m.called
        self.assertEqual(exc.exception.pid, p.pid)
        self.assertEqual(exc.exception.name, p.name())

    def test_stat_file_parsing(self):
        # Feed a hand-crafted /proc/pid/stat line and check every
        # value psutil derives from it.
        from psutil._pslinux import CLOCK_TICKS

        args = [
            "0",      # pid
            "(cat)",  # name
            "Z",      # status
            "1",      # ppid
            "0",      # pgrp
            "0",      # session
            "0",      # tty
            "0",      # tpgid
            "0",      # flags
            "0",      # minflt
            "0",      # cminflt
            "0",      # majflt
            "0",      # cmajflt
            "2",      # utime
            "3",      # stime
            "4",      # cutime
            "5",      # cstime
            "0",      # priority
            "0",      # nice
            "0",      # num_threads
            "0",      # itrealvalue
            "6",      # starttime
            "0",      # vsize
            "0",      # rss
            "0",      # rsslim
            "0",      # startcode
            "0",      # endcode
            "0",      # startstack
            "0",      # kstkesp
            "0",      # kstkeip
            "0",      # signal
            "0",      # blocked
            "0",      # sigignore
            "0",      # sigcatch
            "0",      # wchan
            "0",      # nswap
            "0",      # cnswap
            "0",      # exit_signal
            "6",      # processor
            "0",      # rt priority
            "0",      # policy
            "7",      # delayacct_blkio_ticks
        ]
        content = " ".join(args).encode()
        with mock_open_content('/proc/%s/stat' % os.getpid(), content):
            p = psutil.Process()
            self.assertEqual(p.name(), 'cat')
            self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
            self.assertEqual(p.ppid(), 1)
            self.assertEqual(
                p.create_time(), 6 / CLOCK_TICKS + psutil.boot_time())
            cpu = p.cpu_times()
            self.assertEqual(cpu.user, 2 / CLOCK_TICKS)
            self.assertEqual(cpu.system, 3 / CLOCK_TICKS)
            self.assertEqual(cpu.children_user, 4 / CLOCK_TICKS)
            self.assertEqual(cpu.children_system, 5 / CLOCK_TICKS)
            self.assertEqual(cpu.iowait, 7 / CLOCK_TICKS)
            self.assertEqual(p.cpu_num(), 6)

    def test_status_file_parsing(self):
        # Feed a hand-crafted /proc/pid/status file and check every
        # value psutil derives from it.
        with mock_open_content(
            '/proc/%s/status' % os.getpid(),
            textwrap.dedent("""\
                Uid:\t1000\t1001\t1002\t1003
                Gid:\t1004\t1005\t1006\t1007
                Threads:\t66
                Cpus_allowed:\tf
                Cpus_allowed_list:\t0-7
                voluntary_ctxt_switches:\t12
                nonvoluntary_ctxt_switches:\t13""").encode()):
            p = psutil.Process()
            self.assertEqual(p.num_ctx_switches().voluntary, 12)
            self.assertEqual(p.num_ctx_switches().involuntary, 13)
            self.assertEqual(p.num_threads(), 66)
            uids = p.uids()
            self.assertEqual(uids.real, 1000)
            self.assertEqual(uids.effective, 1001)
            self.assertEqual(uids.saved, 1002)
            gids = p.gids()
            self.assertEqual(gids.real, 1004)
            self.assertEqual(gids.effective, 1005)
            self.assertEqual(gids.saved, 1006)
            self.assertEqual(p._proc._get_eligible_cpus(), list(range(0, 8)))
@unittest.skipIf(not LINUX, "LINUX only")
class TestProcessAgainstStatus(PsutilTestCase):
    """/proc/pid/stat and /proc/pid/status have many values in common.
    Whenever possible, psutil uses /proc/pid/stat (it's faster).
    For all those cases we check that the value found in
    /proc/pid/stat (by psutil) matches the one found in
    /proc/pid/status.
    """

    @classmethod
    def setUpClass(cls):
        # All tests inspect the current process.
        cls.proc = psutil.Process()

    def read_status_file(self, linestart):
        """Return the value of the first /proc/pid/status line which
        starts with `linestart`.

        The value is whatever follows the first tab of that line; it
        is returned as an int when it converts cleanly, else as a
        string. ValueError is raised if no line matches.
        """
        with psutil._psplatform.open_text(
                '/proc/%s/status' % self.proc.pid) as f:
            for line in f:
                line = line.strip()
                if line.startswith(linestart):
                    value = line.partition('\t')[2]
                    try:
                        return int(value)
                    except ValueError:
                        return value
            raise ValueError("can't find %r" % linestart)

    def test_name(self):
        value = self.read_status_file("Name:")
        self.assertEqual(self.proc.name(), value)

    def test_status(self):
        value = self.read_status_file("State:")
        # keep only the text between the parentheses and replace
        # spaces with dashes
        value = value[value.find('(') + 1:value.rfind(')')]
        value = value.replace(' ', '-')
        self.assertEqual(self.proc.status(), value)

    def test_ppid(self):
        value = self.read_status_file("PPid:")
        self.assertEqual(self.proc.ppid(), value)

    def test_num_threads(self):
        value = self.read_status_file("Threads:")
        self.assertEqual(self.proc.num_threads(), value)

    def test_uids(self):
        value = self.read_status_file("Uid:")
        # read_status_file() already dropped the "Uid:" label, hence
        # the first three fields are real, effective and saved; the
        # fourth one (filesystem UID) is not reported by uids().
        value = tuple(map(int, value.split()[:3]))
        self.assertEqual(self.proc.uids(), value)

    def test_gids(self):
        value = self.read_status_file("Gid:")
        # Same layout as "Uid:": real, effective, saved, filesystem.
        value = tuple(map(int, value.split()[:3]))
        self.assertEqual(self.proc.gids(), value)

    @retry_on_failure()
    def test_num_ctx_switches(self):
        value = self.read_status_file("voluntary_ctxt_switches:")
        self.assertEqual(self.proc.num_ctx_switches().voluntary, value)
        value = self.read_status_file("nonvoluntary_ctxt_switches:")
        self.assertEqual(self.proc.num_ctx_switches().involuntary, value)

    def test_cpu_affinity(self):
        value = self.read_status_file("Cpus_allowed_list:")
        # Only the "min-max" form is checked; any other form is
        # silently skipped.
        if '-' in str(value):
            min_, max_ = map(int, value.split('-'))
            self.assertEqual(
                self.proc.cpu_affinity(), list(range(min_, max_ + 1)))

    def test_cpu_affinity_eligible_cpus(self):
        value = self.read_status_file("Cpus_allowed_list:")
        with mock.patch("psutil._pslinux.per_cpu_times") as m:
            self.proc._proc._get_eligible_cpus()
        # per_cpu_times() is expected to be used only when the status
        # file does not provide a "min-max" range.
        if '-' in str(value):
            assert not m.called
        else:
            assert m.called
- # =====================================================================
- # --- test utils
- # =====================================================================
@unittest.skipIf(not LINUX, "LINUX only")
class TestUtils(PsutilTestCase):
    """Tests for the helper functions of psutil._psplatform."""

    def test_readlink(self):
        # The " (deleted)" suffix returned by os.readlink() is
        # expected to be stripped.
        patcher = mock.patch("os.readlink", return_value="foo (deleted)")
        with patcher as m:
            resolved = psutil._psplatform.readlink("bar")
            self.assertEqual(resolved, "foo")
            assert m.called

    def test_cat(self):
        path = self.get_testfn()
        with open(path, "wt") as f:
            f.write("foo ")
        cat = psutil._psplatform.cat
        # text and binary reads both come back without the trailing
        # space
        self.assertEqual(cat(path, binary=False), "foo")
        self.assertEqual(cat(path, binary=True), b"foo")
        # path + '??' is presumably nonexistent: the fallback value is
        # expected instead of an error
        missing = path + '??'
        self.assertEqual(cat(missing, fallback="bar"), "bar")
if __name__ == '__main__':
    # Executed as a script: hand this file over to psutil's test runner.
    from psutil.tests.runner import run_from_name
    run_from_name(__file__)
|