12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- """
- Miscellaneous tests.
- """
- import ast
- import collections
- import errno
- import json
- import os
- import pickle
- import socket
- import stat
- from psutil import LINUX
- from psutil import POSIX
- from psutil import WINDOWS
- from psutil._common import memoize
- from psutil._common import memoize_when_activated
- from psutil._common import supports_ipv6
- from psutil._common import wrap_numbers
- from psutil._compat import PY3
- from psutil.tests import APPVEYOR
- from psutil.tests import CI_TESTING
- from psutil.tests import DEVNULL
- from psutil.tests import HAS_BATTERY
- from psutil.tests import HAS_MEMORY_MAPS
- from psutil.tests import HAS_NET_IO_COUNTERS
- from psutil.tests import HAS_SENSORS_BATTERY
- from psutil.tests import HAS_SENSORS_FANS
- from psutil.tests import HAS_SENSORS_TEMPERATURES
- from psutil.tests import import_module_by_path
- from psutil.tests import mock
- from psutil.tests import PsutilTestCase
- from psutil.tests import PYTHON_EXE
- from psutil.tests import reload_module
- from psutil.tests import ROOT_DIR
- from psutil.tests import SCRIPTS_DIR
- from psutil.tests import sh
- from psutil.tests import TRAVIS
- from psutil.tests import unittest
- import psutil
- import psutil.tests
- # ===================================================================
- # --- Misc / generic tests.
- # ===================================================================
class TestMisc(PsutilTestCase):
    """Generic checks for psutil's top-level API and for a few helpers
    imported from psutil._common.
    """

    def test_process__repr__(self, func=repr):
        # `func` is the callable turning the Process into text: repr()
        # by default, str() when invoked from test_process__str__().
        p = psutil.Process(self.spawn_testproc().pid)
        r = func(p)
        self.assertIn("psutil.Process", r)
        self.assertIn("pid=%s" % p.pid, r)
        self.assertIn("name='%s'" % p.name(), r)
        self.assertIn("status=", r)
        self.assertNotIn("exitcode=", r)
        # once terminated and waited for, the exit code is shown
        p.terminate()
        p.wait()
        r = func(p)
        self.assertIn("status='terminated'", r)
        self.assertIn("exitcode=", r)

        # When Process.name() raises, the name must be left out of the
        # text; for some exceptions an extra marker is expected as well.
        this_pid = os.getpid()
        cases = [
            (psutil.ZombieProcess(this_pid), "status='zombie'"),
            (psutil.NoSuchProcess(this_pid), "terminated"),
            (psutil.AccessDenied(this_pid), None),
        ]
        for exc, marker in cases:
            with mock.patch.object(psutil.Process, "name", side_effect=exc):
                p = psutil.Process()
                r = func(p)
                self.assertIn("pid=%s" % p.pid, r)
                if marker is not None:
                    self.assertIn(marker, r)
                self.assertNotIn("name=", r)

    def test_process__str__(self):
        # same expectations as for repr(), but going through str()
        self.test_process__repr__(func=str)

    def test_no_such_process__repr__(self, func=repr):
        # `func` (repr() by default) produces the text being compared.
        self.assertEqual(
            func(psutil.NoSuchProcess(321)),
            "psutil.NoSuchProcess process no longer exists (pid=321)")
        self.assertEqual(
            func(psutil.NoSuchProcess(321, name='foo')),
            "psutil.NoSuchProcess process no longer exists (pid=321, "
            "name='foo')")
        # an explicit msg replaces the default text entirely
        self.assertEqual(
            func(psutil.NoSuchProcess(321, msg='foo')),
            "psutil.NoSuchProcess foo")

    def test_zombie_process__repr__(self, func=repr):
        # `func` (repr() by default) produces the text being compared.
        self.assertEqual(
            func(psutil.ZombieProcess(321)),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321)")
        self.assertEqual(
            func(psutil.ZombieProcess(321, name='foo')),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321, name='foo')")
        self.assertEqual(
            func(psutil.ZombieProcess(321, name='foo', ppid=1)),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321, name='foo', ppid=1)")
        # an explicit msg replaces the default text entirely
        self.assertEqual(
            func(psutil.ZombieProcess(321, msg='foo')),
            "psutil.ZombieProcess foo")

    def test_access_denied__repr__(self, func=repr):
        # `func` (repr() by default) produces the text being compared.
        self.assertEqual(
            func(psutil.AccessDenied(321)),
            "psutil.AccessDenied (pid=321)")
        self.assertEqual(
            func(psutil.AccessDenied(321, name='foo')),
            "psutil.AccessDenied (pid=321, name='foo')")
        # an explicit msg replaces the default text entirely
        self.assertEqual(
            func(psutil.AccessDenied(321, msg='foo')),
            "psutil.AccessDenied foo")

    def test_timeout_expired__repr__(self, func=repr):
        # `func` (repr() by default) produces the text being compared.
        self.assertEqual(
            func(psutil.TimeoutExpired(321)),
            "psutil.TimeoutExpired timeout after 321 seconds")
        self.assertEqual(
            func(psutil.TimeoutExpired(321, pid=111)),
            "psutil.TimeoutExpired timeout after 321 seconds (pid=111)")
        self.assertEqual(
            func(psutil.TimeoutExpired(321, pid=111, name='foo')),
            "psutil.TimeoutExpired timeout after 321 seconds "
            "(pid=111, name='foo')")

    def test_process__eq__(self):
        p1 = psutil.Process()
        p2 = psutil.Process()
        self.assertEqual(p1, p2)
        # tampering with the private _ident attribute makes the two
        # instances compare as different
        p2._ident = (0, 0)
        self.assertNotEqual(p1, p2)
        self.assertNotEqual(p1, 'foo')

    def test_process__hash__(self):
        # two instances for the same process collapse into one set item
        s = set([psutil.Process(), psutil.Process()])
        self.assertEqual(len(s), 1)

    def test__all__(self):
        dir_psutil = dir(psutil)
        ignored = ('long', 'tests', 'test', 'PermissionError',
                   'ProcessLookupError')
        for name in dir_psutil:
            if name in ignored or name.startswith('_'):
                continue
            try:
                # names which can be imported as modules are not checked
                __import__(name)
            except ImportError:
                if name in psutil.__all__:
                    continue
                fun = getattr(psutil, name)
                if fun is None:
                    continue
                doc = fun.__doc__
                # objects without a docstring, or whose docstring
                # mentions "deprecated", may be missing from __all__
                if doc is not None and 'deprecated' not in doc.lower():
                    self.fail('%r not in psutil.__all__' % name)

        # Import 'star' will break if __all__ is inconsistent, see:
        # https://github.com/giampaolo/psutil/issues/656
        # Can't do `from psutil import *` as it won't work on python 3
        # so we simply iterate over __all__.
        for name in psutil.__all__:
            self.assertIn(name, dir_psutil)

    def test_version(self):
        # __version__ must be the dot-joined form of version_info
        self.assertEqual('.'.join([str(x) for x in psutil.version_info]),
                         psutil.__version__)

    def test_process_as_dict_no_new_names(self):
        # See https://github.com/giampaolo/psutil/issues/813
        # An attribute set by hand on the instance must not show up in
        # the as_dict() result.
        p = psutil.Process()
        p.foo = '1'
        self.assertNotIn('foo', p.as_dict())

    def test_memoize(self):
        calls = []  # one item appended per real execution of foo()

        @memoize
        def foo(*args, **kwargs):
            "foo docstring"
            calls.append(None)
            return (args, kwargs)

        # Each signature is invoked twice; only the first invocation is
        # expected to execute the function body.
        signatures = [
            ((), {}),                 # no args
            ((1, ), {}),              # with args
            ((1, ), {'bar': 2}),      # with args + kwargs
        ]
        for ncalls, (args, kwargs) in enumerate(signatures, 1):
            for _ in range(2):
                ret = foo(*args, **kwargs)
                self.assertEqual(ret, (args, kwargs))
                self.assertEqual(len(calls), ncalls)

        # clear cache: the body is executed once more
        foo.cache_clear()
        ret = foo()
        self.assertEqual(ret, ((), {}))
        self.assertEqual(len(calls), 4)
        # docstring is preserved by the decorator
        self.assertEqual(foo.__doc__, "foo docstring")

    def test_memoize_when_activated(self):
        calls = []  # one item appended per real execution of Foo.foo()

        class Foo:

            @memoize_when_activated
            def foo(self):
                calls.append(None)

        f = Foo()
        # not activated yet: every call executes the body
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 2)

        # activate: the second call does not execute the body
        del calls[:]
        f.foo.cache_activate(f)
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 1)

        # deactivate: back to executing the body on every call
        del calls[:]
        f.foo.cache_deactivate(f)
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 2)

    def test_parse_environ_block(self):
        from psutil._common import parse_environ_block

        def k(s):
            # expected keys are upper-cased when running on Windows
            return s.upper() if WINDOWS else s

        self.assertEqual(parse_environ_block("a=1\0"),
                         {k("a"): "1"})
        self.assertEqual(parse_environ_block("a=1\0b=2\0\0"),
                         {k("a"): "1", k("b"): "2"})
        self.assertEqual(parse_environ_block("a=1\0b=\0\0"),
                         {k("a"): "1", k("b"): ""})
        # ignore everything after \0\0
        self.assertEqual(parse_environ_block("a=1\0b=2\0\0c=3\0"),
                         {k("a"): "1", k("b"): "2"})
        # ignore everything that is not an assignment
        self.assertEqual(parse_environ_block("xxx\0a=1\0"), {k("a"): "1"})
        self.assertEqual(parse_environ_block("a=1\0=b=2\0"), {k("a"): "1"})
        # do not fail if the block is incomplete
        self.assertEqual(parse_environ_block("a=1\0b=2"), {k("a"): "1"})

    def test_supports_ipv6(self):
        # supports_ipv6() exposes cache_clear(); clear it before every
        # scenario (and at the end) so that each call is a fresh one.
        self.addCleanup(supports_ipv6.cache_clear)
        if supports_ipv6():
            # socket module reporting no IPv6 support
            with mock.patch('psutil._common.socket') as s:
                s.has_ipv6 = False
                supports_ipv6.cache_clear()
                self.assertFalse(supports_ipv6())

            # socket creation failing
            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket',
                            side_effect=socket.error) as s:
                self.assertFalse(supports_ipv6())
                self.assertTrue(s.called)

            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket',
                            side_effect=socket.gaierror) as s:
                self.assertFalse(supports_ipv6())
                supports_ipv6.cache_clear()
                self.assertTrue(s.called)

            # bind() failing
            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket.bind',
                            side_effect=socket.gaierror) as s:
                self.assertFalse(supports_ipv6())
                supports_ipv6.cache_clear()
                self.assertTrue(s.called)
        else:
            # No IPv6 according to psutil: creating and binding an IPv6
            # socket by hand is expected to fail too.
            with self.assertRaises(Exception):
                sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
                try:
                    sock.bind(("::1", 0))
                finally:
                    sock.close()

    def test_isfile_strict(self):
        from psutil._common import isfile_strict

        this_file = os.path.abspath(__file__)
        self.assertTrue(isfile_strict(this_file))
        self.assertFalse(isfile_strict(os.path.dirname(this_file)))
        # EPERM and EACCES from os.stat() are propagated...
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(errno.EPERM, "foo")):
            self.assertRaises(OSError, isfile_strict, this_file)
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(errno.EACCES, "foo")):
            self.assertRaises(OSError, isfile_strict, this_file)
        # ...whereas ENOENT results in a false value
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(errno.ENOENT, "foo")):
            self.assertFalse(isfile_strict(this_file))
        with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
            self.assertFalse(isfile_strict(this_file))

    def test_serialization(self):
        def check(ret):
            # `ret` must survive a JSON dump/load and must compare equal
            # to itself after a pickle round trip.
            json.loads(json.dumps(ret))
            a = pickle.dumps(ret)
            b = pickle.loads(a)
            self.assertEqual(ret, b)

        check(psutil.Process().as_dict())
        check(psutil.virtual_memory())
        check(psutil.swap_memory())
        check(psutil.cpu_times())
        check(psutil.cpu_times_percent(interval=0))
        check(psutil.net_io_counters())
        # disk_io_counters() is left out on Linux when /proc/diskstats
        # does not exist, and on APPVEYOR.
        no_diskstats = LINUX and not os.path.exists('/proc/diskstats')
        if not no_diskstats and not APPVEYOR:
            check(psutil.disk_io_counters())
        check(psutil.disk_partitions())
        check(psutil.disk_usage(os.getcwd()))
        check(psutil.users())

    def test_setup_script(self):
        setup_py = os.path.join(ROOT_DIR, 'setup.py')
        if CI_TESTING and not os.path.exists(setup_py):
            # skipTest() raises, so nothing below runs in this case
            self.skipTest("can't find setup.py")
        module = import_module_by_path(setup_py)
        self.assertRaises(SystemExit, module.setup)
        self.assertEqual(module.get_version(), psutil.__version__)

    def test_ad_on_process_creation(self):
        # We are supposed to be able to instantiate Process also in case
        # of zombie processes or access denied.
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=psutil.AccessDenied) as meth:
            psutil.Process()
            self.assertTrue(meth.called)
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=psutil.ZombieProcess(1)) as meth:
            psutil.Process()
            self.assertTrue(meth.called)
        # any other exception type is propagated to the caller
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=ValueError) as meth:
            with self.assertRaises(ValueError):
                psutil.Process()
            self.assertTrue(meth.called)

    def test_sanity_version_check(self):
        # see: https://github.com/giampaolo/psutil/issues/564
        # Reloading psutil while the C extension advertises another
        # version must fail with an ImportError mentioning the conflict.
        with mock.patch(
                "psutil._psplatform.cext.version", return_value="0.0.0"):
            with self.assertRaises(ImportError) as cm:
                reload_module(psutil)
            self.assertIn("version conflict", str(cm.exception).lower())
- # ===================================================================
- # --- Tests for wrap_numbers() function.
- # ===================================================================
# Three-field namedtuple used by the tests below as the per-key value of
# the dicts passed to wrap_numbers().
nt = collections.namedtuple('foo', 'a b c')


class TestWrapNumbers(PsutilTestCase):
    """Tests for wrap_numbers(), all run against an empty cache."""

    def setUp(self):
        wrap_numbers.cache_clear()

    def tearDown(self):
        wrap_numbers.cache_clear()

    def test_first_call(self):
        # the very first call returns its input as is
        stats = {'disk1': nt(5, 5, 5)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)

    def test_input_hasnt_changed(self):
        stats = {'disk1': nt(5, 5, 5)}
        for _ in range(2):
            self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)

    def test_increase_but_no_wrap(self):
        # values which only grow (or stay the same) come back unchanged
        samples = [(5, 5, 5), (10, 15, 20), (20, 25, 30), (20, 25, 30)]
        for values in samples:
            stats = {'disk1': nt(*values)}
            self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)

    def test_wrap(self):
        # (values passed in, values expected back), fed in this order
        steps = [
            # let's say 100 is the threshold
            (nt(100, 100, 100), nt(100, 100, 100)),
            # first wrap restarts from 10
            (nt(100, 100, 10), nt(100, 100, 110)),
            # then it remains the same
            (nt(100, 100, 10), nt(100, 100, 110)),
            # then it goes up
            (nt(100, 100, 90), nt(100, 100, 190)),
            # then it wraps again
            (nt(100, 100, 20), nt(100, 100, 210)),
            # and remains the same
            (nt(100, 100, 20), nt(100, 100, 210)),
            # now wrap another num
            (nt(50, 100, 20), nt(150, 100, 210)),
            # and again
            (nt(40, 100, 20), nt(190, 100, 210)),
            # keep it the same
            (nt(40, 100, 20), nt(190, 100, 210)),
        ]
        for given, expected in steps:
            self.assertEqual(wrap_numbers({'disk1': given}, 'disk_io'),
                             {'disk1': expected})

    def test_changing_keys(self):
        # Emulate a case where the second call to disk_io()
        # (or whatever) provides a new disk, then the new disk
        # disappears on the third call.
        snapshots = [
            {'disk1': nt(5, 5, 5)},
            {'disk1': nt(5, 5, 5),
             'disk2': nt(7, 7, 7)},
            {'disk1': nt(8, 8, 8)},
        ]
        for stats in snapshots:
            self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)

    def test_changing_keys_w_wrap(self):
        both = {'disk1': nt(50, 50, 50),
                'disk2': nt(100, 100, 100)}
        wrapped_in = {'disk1': nt(50, 50, 50),
                      'disk2': nt(100, 100, 10)}
        wrapped_out = {'disk1': nt(50, 50, 50),
                       'disk2': nt(100, 100, 110)}
        only_disk1 = {'disk1': nt(50, 50, 50)}

        self.assertEqual(wrap_numbers(dict(both), 'disk_io'), both)
        # disk 2 wraps
        self.assertEqual(wrap_numbers(dict(wrapped_in), 'disk_io'),
                         wrapped_out)
        # disk 2 disappears
        self.assertEqual(wrap_numbers(dict(only_disk1), 'disk_io'),
                         only_disk1)
        # then it appears again; the old wrap is supposed to be
        # gone.
        self.assertEqual(wrap_numbers(dict(both), 'disk_io'), both)
        # remains the same
        self.assertEqual(wrap_numbers(dict(both), 'disk_io'), both)
        # and then wraps again
        self.assertEqual(wrap_numbers(dict(wrapped_in), 'disk_io'),
                         wrapped_out)

    def test_real_data(self):
        # plain tuples are used here instead of `nt` instances
        before = {
            'nvme0n1': (300, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
            'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
            'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
            'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348),
        }
        for _ in range(2):
            self.assertEqual(wrap_numbers(before, 'disk_io'), before)
        # same data except for the first number of 'nvme0n1', which
        # decreases from 300 to 100
        after = {
            'nvme0n1': (100, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
            'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
            'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
            'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348),
        }
        result = wrap_numbers(after, 'disk_io')
        self.assertEqual(result['nvme0n1'][0], 400)

    # --- cache tests

    def test_cache_first_call(self):
        stats = {'disk1': nt(5, 5, 5)}
        wrap_numbers(stats, 'disk_io')
        info = wrap_numbers.cache_info()
        # item 0 maps the name to the dict passed in; the other two
        # items hold an empty dict for that name
        self.assertEqual(info[0], {'disk_io': stats})
        self.assertEqual(info[1], {'disk_io': {}})
        self.assertEqual(info[2], {'disk_io': {}})

    def test_cache_call_twice(self):
        wrap_numbers({'disk1': nt(5, 5, 5)}, 'disk_io')
        latest = {'disk1': nt(10, 10, 10)}
        wrap_numbers(latest, 'disk_io')
        info = wrap_numbers.cache_info()
        self.assertEqual(info[0], {'disk_io': latest})
        # after the second call item 1 has one (key, index) entry per
        # number, all of them 0
        self.assertEqual(
            info[1],
            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
        self.assertEqual(info[2], {'disk_io': {}})

    def test_cache_wrap(self):
        def feed(values):
            # Pass a single-disk dict to wrap_numbers() and make sure
            # item 0 of the cache is that very dict; return the cache.
            stats = {'disk1': nt(*values)}
            wrap_numbers(stats, 'disk_io')
            info = wrap_numbers.cache_info()
            self.assertEqual(info[0], {'disk_io': stats})
            return info

        def check_after_first_wrap(info):
            self.assertEqual(
                info[1],
                {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0,
                             ('disk1', 2): 100}})
            self.assertEqual(info[2],
                             {'disk_io': {'disk1': set([('disk1', 2)])}})

        # let's say 100 is the threshold
        wrap_numbers({'disk1': nt(100, 100, 100)}, 'disk_io')
        # first wrap restarts from 10
        check_after_first_wrap(feed((100, 100, 10)))
        # then it remains the same
        check_after_first_wrap(feed((100, 100, 10)))
        # then it goes up
        check_after_first_wrap(feed((100, 100, 90)))
        # then it wraps again
        info = feed((100, 100, 20))
        self.assertEqual(
            info[1],
            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 190}})
        self.assertEqual(info[2], {'disk_io': {'disk1': set([('disk1', 2)])}})

    def test_cache_changing_keys(self):
        wrap_numbers({'disk1': nt(5, 5, 5)}, 'disk_io')
        latest = {'disk1': nt(5, 5, 5),
                  'disk2': nt(7, 7, 7)}
        wrap_numbers(latest, 'disk_io')
        info = wrap_numbers.cache_info()
        self.assertEqual(info[0], {'disk_io': latest})
        # item 1 only has entries for 'disk1', the key which was
        # already there on the first call
        self.assertEqual(
            info[1],
            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
        self.assertEqual(info[2], {'disk_io': {}})

    def test_cache_clear(self):
        stats = {'disk1': nt(5, 5, 5)}
        for _ in range(2):
            wrap_numbers(stats, 'disk_io')
        wrap_numbers.cache_clear('disk_io')
        self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {}))
        # clearing an already cleared or an unknown name is accepted
        for name in ('disk_io', '?!?'):
            wrap_numbers.cache_clear(name)

    @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
    def test_cache_clear_public_apis(self):
        if not psutil.disk_io_counters() or not psutil.net_io_counters():
            self.skipTest("no disks or NICs available")
        psutil.disk_io_counters()
        psutil.net_io_counters()
        # both public functions have an entry in every cache item
        for item in wrap_numbers.cache_info():
            self.assertIn('psutil.disk_io_counters', item)
            self.assertIn('psutil.net_io_counters', item)

        # clearing one function's cache leaves the other one in place
        psutil.disk_io_counters.cache_clear()
        for item in wrap_numbers.cache_info():
            self.assertIn('psutil.net_io_counters', item)
            self.assertNotIn('psutil.disk_io_counters', item)

        psutil.net_io_counters.cache_clear()
        self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {}))
- # ===================================================================
- # --- Example script tests
- # ===================================================================
@unittest.skipIf(not os.path.exists(SCRIPTS_DIR),
                 "can't locate scripts directory")
class TestScripts(PsutilTestCase):
    """Tests for scripts in the "scripts" directory."""

    @staticmethod
    def assert_stdout(exe, *args, **kwargs):
        # Run script `exe` (a file name relative to SCRIPTS_DIR) with
        # PYTHON_EXE, passing `args` on the command line and `kwargs`
        # to sh(). Return the stripped output, which must not be empty.
        # A RuntimeError whose text mentions 'AccessDenied' is tolerated
        # and its text returned instead.
        script = os.path.join(SCRIPTS_DIR, exe)
        cmd = [PYTHON_EXE, '%s' % script]
        cmd.extend(args)
        try:
            out = sh(cmd, **kwargs).strip()
        except RuntimeError as err:
            msg = str(err)
            if 'AccessDenied' not in msg:
                raise
            return msg
        assert out, out
        return out

    @staticmethod
    def assert_syntax(exe, args=None):
        # Only parse the script's source, without executing it.
        # `args` is accepted but not used.
        path = os.path.join(SCRIPTS_DIR, exe)
        # the explicit encoding is passed on Python 3 only
        open_kwargs = {'encoding': 'utf8'} if PY3 else {}
        with open(path, 'rt', **open_kwargs) as fobj:
            source = fobj.read()
        ast.parse(source)

    def test_coverage(self):
        # make sure all example scripts have a test method defined
        available = dir(self)
        for fname in os.listdir(SCRIPTS_DIR):
            if not fname.endswith('.py'):
                continue
            wanted = 'test_' + os.path.splitext(fname)[0]
            if wanted not in available:
                self.fail('no test defined for %r script'
                          % os.path.join(SCRIPTS_DIR, fname))

    @unittest.skipIf(not POSIX, "POSIX only")
    def test_executable(self):
        # every script must have the "executable by owner" bit set
        for fname in os.listdir(SCRIPTS_DIR):
            if not fname.endswith('.py'):
                continue
            path = os.path.join(SCRIPTS_DIR, fname)
            mode = os.stat(path)[stat.ST_MODE]
            if not (stat.S_IXUSR & mode):
                self.fail('%r is not executable' % path)

    # --- scripts which are run and must print something

    def test_disk_usage(self):
        self.assert_stdout('disk_usage.py')

    def test_free(self):
        self.assert_stdout('free.py')

    def test_meminfo(self):
        self.assert_stdout('meminfo.py')

    def test_procinfo(self):
        # the script is given the PID of this very process
        this_pid = str(os.getpid())
        self.assert_stdout('procinfo.py', this_pid)

    @unittest.skipIf(CI_TESTING and not psutil.users(), "no users")
    def test_who(self):
        self.assert_stdout('who.py')

    def test_ps(self):
        self.assert_stdout('ps.py')

    def test_pstree(self):
        self.assert_stdout('pstree.py')

    def test_netstat(self):
        self.assert_stdout('netstat.py')

    # permission denied on travis
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    def test_ifconfig(self):
        self.assert_stdout('ifconfig.py')

    @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
    def test_pmap(self):
        # the script is given the PID of this very process
        this_pid = str(os.getpid())
        self.assert_stdout('pmap.py', this_pid)

    def test_procsmem(self):
        fields = psutil.Process().memory_full_info()._fields
        if 'uss' not in fields:
            # skipTest() raises, so nothing below runs in this case
            self.skipTest("not supported")
        self.assert_stdout('procsmem.py', stderr=DEVNULL)

    # --- scripts which are only parsed, not executed

    def test_killall(self):
        self.assert_syntax('killall.py')

    def test_nettop(self):
        self.assert_syntax('nettop.py')

    def test_top(self):
        self.assert_syntax('top.py')

    def test_iotop(self):
        self.assert_syntax('iotop.py')

    def test_pidof(self):
        # asked for the name of this process, the script must list
        # the PID of this process in its output
        own_name = psutil.Process().name()
        output = self.assert_stdout('pidof.py', own_name)
        self.assertIn(str(os.getpid()), output)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_winservices(self):
        self.assert_stdout('winservices.py')

    def test_cpu_distribution(self):
        self.assert_syntax('cpu_distribution.py')

    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    def test_temperatures(self):
        readings = psutil.sensors_temperatures()
        if not readings:
            self.skipTest("no temperatures")
        self.assert_stdout('temperatures.py')

    @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    def test_fans(self):
        readings = psutil.sensors_fans()
        if not readings:
            self.skipTest("no fans")
        self.assert_stdout('fans.py')

    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_battery(self):
        self.assert_stdout('battery.py')

    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_sensors(self):
        self.assert_stdout('sensors.py')
if __name__ == '__main__':
    # Executed as a script: hand this file over to run_from_name()
    # from psutil.tests.runner.
    from psutil.tests.runner import run_from_name as _run_from_name
    _run_from_name(__file__)
|