123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474 |
- #!/usr/bin/env python
- # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- """Linux specific tests. These are implicitly run by test_psutil.py."""
- from __future__ import division
- import contextlib
- import errno
- import fcntl
- import io
- import os
- import pprint
- import re
- import socket
- import struct
- import sys
- import tempfile
- import time
- import warnings
- try:
- from unittest import mock # py3
- except ImportError:
- import mock # requires "pip install mock"
- from test_psutil import POSIX, TOLERANCE, TRAVIS, LINUX
- from test_psutil import (skip_on_not_implemented, sh, get_test_subprocess,
- retry_before_failing, get_kernel_version, unittest,
- which, call_until)
- import psutil
- import psutil._pslinux
- from psutil._compat import PY3, u
# ioctl() request codes used by the helpers below to query network
# interfaces (values presumably mirror <linux/sockios.h> -- TODO confirm).
# Request code passed to ioctl() by get_ipv4_address().
SIOCGIFADDR = 0x8915
# Not referenced by the code visible in this module.
SIOCGIFCONF = 0x8912
# Request code passed to ioctl() by get_mac_address().
SIOCGIFHWADDR = 0x8927
def get_ipv4_address(ifname):
    """Return the IPv4 address of interface *ifname* as a dotted string.

    The name is truncated to 15 characters (and encoded to ASCII bytes
    on Python 3), packed into a 256-byte buffer and handed to the
    SIOCGIFADDR ioctl() on a temporary UDP socket.  Bytes 20-24 of the
    reply are converted with socket.inet_ntoa().
    """
    nic = ifname[:15]
    if PY3:
        nic = bytes(nic, 'ascii')
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    with contextlib.closing(sock):
        request = struct.pack('256s', nic)
        reply = fcntl.ioctl(sock.fileno(), SIOCGIFADDR, request)
        packed_ip = reply[20:24]
        return socket.inet_ntoa(packed_ip)
def get_mac_address(ifname):
    """Return the hardware address of interface *ifname*.

    The name is truncated to 15 characters (and encoded to ASCII bytes
    on Python 3), packed into a 256-byte buffer and handed to the
    SIOCGIFHWADDR ioctl() on a temporary UDP socket.  Bytes 18-24 of
    the reply are rendered as colon-separated, two-digit lowercase hex
    values, e.g. "00:1a:2b:3c:4d:5e".
    """
    nic = ifname[:15]
    if PY3:
        nic = bytes(nic, 'ascii')
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    with contextlib.closing(sock):
        request = struct.pack('256s', nic)
        reply = fcntl.ioctl(sock.fileno(), SIOCGIFHWADDR, request)
        # bytearray() yields integers on both Python 2 (str) and
        # Python 3 (bytes), so no per-version ord() shim is needed.
        octets = bytearray(reply[18:24])
        return ':'.join('%02x' % octet for octet in octets)
@unittest.skipUnless(LINUX, "not a Linux system")
class LinuxSpecificTestCase(unittest.TestCase):
    """Linux-only tests.

    The first group compares psutil results against command line tools
    (df, pmap, free, ip, nproc, lscpu); the second group mocks parts of
    psutil._pslinux to exercise fallback / error code paths; the last
    group checks features tied to specific kernel versions.
    """

    # --- helpers

    @staticmethod
    def _free_value(row, column):
        """Return one numeric cell of `free` output multiplied by 1024.

        *row* indexes the lines following the header line and *column*
        indexes the whitespace-separated fields of that line (field 0
        being the row label).  The multiplication presumably converts
        the kilobytes printed by `free` into bytes.
        """
        rows = sh('free').split('\n')[1:]
        return int(rows[row].split()[column]) * 1024

    def _assert_proc_method_raises(self, method_name,
                                   exc=NotImplementedError):
        """Check that a _pslinux.Process method raises *exc* when the
        open() used by psutil._pslinux is replaced by a mock.
        """
        with mock.patch('psutil._pslinux.open', create=True) as m:
            proc = psutil._pslinux.Process(os.getpid())
            self.assertRaises(exc, getattr(proc, method_name))
            self.assertTrue(m.called)

    # --- tests against command line tools

    @unittest.skipIf(
        POSIX and not hasattr(os, 'statvfs'),
        reason="os.statvfs() function not available on this platform")
    @skip_on_not_implemented()
    def test_disks(self):
        """Compare disk_usage() and disk_partitions() against "df"."""
        tolerance = 10 * 1024 * 1024  # 10 MB

        def df(path):
            # "df -P -B 1": the second line of the output carries the
            # figures for the requested path.
            out = sh('df -P -B 1 "%s"' % path).strip()
            line = out.split('\n')[1]
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            return dev, int(total), int(used), int(free)

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            if abs(usage.free - free) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.used, used))

    def test_memory_maps(self):
        """Compare Process.memory_maps() against "pmap -x"."""
        sproc = get_test_subprocess()
        time.sleep(1)
        p = psutil.Process(sproc.pid)
        maps = p.memory_maps(grouped=False)
        # the first two lines of pmap output are a header
        pmap = sh('pmap -x %s' % p.pid).split('\n')[2:]
        # entries are compared pairwise until either list is exhausted
        for this, other in zip(maps, pmap):
            addr, _, rss, dirty, mode, path = other.split(None, 5)
            if not path.startswith('[') and not path.endswith(']'):
                self.assertEqual(path, os.path.basename(this.path))
            self.assertEqual(int(rss) * 1024, this.rss)
            # test only rwx chars, ignore 's' and 'p'
            self.assertEqual(mode[:3], this.perms[:3])

    def test_vmem_total(self):
        """virtual_memory().total must match `free` exactly."""
        total = self._free_value(0, 1)
        self.assertEqual(total, psutil.virtual_memory().total)

    @retry_before_failing()
    def test_vmem_used(self):
        """virtual_memory().used must match `free` within TOLERANCE."""
        used = self._free_value(0, 2)
        self.assertAlmostEqual(used, psutil.virtual_memory().used,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_free(self):
        """virtual_memory().free must match `free` within TOLERANCE."""
        free = self._free_value(0, 3)
        self.assertAlmostEqual(free, psutil.virtual_memory().free,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_buffers(self):
        """virtual_memory().buffers must match `free` within TOLERANCE."""
        buffers = self._free_value(0, 5)
        self.assertAlmostEqual(buffers, psutil.virtual_memory().buffers,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_cached(self):
        """virtual_memory().cached must match `free` within TOLERANCE."""
        cached = self._free_value(0, 6)
        self.assertAlmostEqual(cached, psutil.virtual_memory().cached,
                               delta=TOLERANCE)

    def test_swapmem_total(self):
        """swap_memory().total must match `free` exactly."""
        total = self._free_value(2, 1)
        self.assertEqual(total, psutil.swap_memory().total)

    @retry_before_failing()
    def test_swapmem_used(self):
        """swap_memory().used must match `free` within TOLERANCE."""
        used = self._free_value(2, 2)
        self.assertAlmostEqual(used, psutil.swap_memory().used,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_swapmem_free(self):
        """swap_memory().free must match `free` within TOLERANCE."""
        free = self._free_value(2, 3)
        self.assertAlmostEqual(free, psutil.swap_memory().free,
                               delta=TOLERANCE)

    @unittest.skipIf(TRAVIS, "unknown failure on travis")
    def test_cpu_times(self):
        """cpu_times() fields must reflect the running kernel version."""
        fields = psutil.cpu_times()._fields
        kernel_ver = re.findall(r'\d+\.\d+\.\d+', os.uname()[2])[0]
        kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
        # (field name, first kernel version expected to expose it)
        expectations = (
            ('steal', (2, 6, 11)),
            ('guest', (2, 6, 24)),
            ('guest_nice', (3, 2, 0)),
        )
        for field, min_version in expectations:
            if kernel_ver_info >= min_version:
                self.assertIn(field, fields)
            else:
                self.assertNotIn(field, fields)

    def test_net_if_addrs_ips(self):
        """Compare net_if_addrs() MAC / IPv4 addresses against ioctl()."""
        for name, addrs in psutil.net_if_addrs().items():
            for addr in addrs:
                if addr.family == psutil.AF_LINK:
                    self.assertEqual(addr.address, get_mac_address(name))
                elif addr.family == socket.AF_INET:
                    self.assertEqual(addr.address, get_ipv4_address(name))
                # TODO: test for AF_INET6 family

    @unittest.skipUnless(which('ip'), "'ip' utility not available")
    @unittest.skipIf(TRAVIS, "skipped on Travis")
    def test_net_if_names(self):
        """Compare net_if_addrs() NIC names against "ip addr"."""
        out = sh("ip addr").strip()
        nics = psutil.net_if_addrs()
        found = 0
        for line in out.split('\n'):
            line = line.strip()
            # NIC entries start with "<index>:"
            if re.search(r"^\d+:", line):
                found += 1
                name = line.split(':')[1].strip()
                self.assertIn(name, nics)
        self.assertEqual(len(nics), found, msg="%s\n---\n%s" % (
            pprint.pformat(nics), out))

    @unittest.skipUnless(which("nproc"), "nproc utility not available")
    def test_cpu_count_logical_w_nproc(self):
        """Compare cpu_count(logical=True) against "nproc --all"."""
        num = int(sh("nproc --all"))
        self.assertEqual(psutil.cpu_count(logical=True), num)

    @unittest.skipUnless(which("lscpu"), "lscpu utility not available")
    def test_cpu_count_logical_w_lscpu(self):
        """Compare cpu_count(logical=True) against "lscpu -p"."""
        out = sh("lscpu -p")
        # every line not starting with '#' is counted as one CPU
        num = sum(1 for x in out.split('\n') if not x.startswith('#'))
        self.assertEqual(psutil.cpu_count(logical=True), num)

    # --- mocked tests

    def test_virtual_memory_mocked_warnings(self):
        """virtual_memory() must warn and zero out the stats it cannot
        determine when open() is mocked.
        """
        with mock.patch('psutil._pslinux.open', create=True) as m:
            with warnings.catch_warnings(record=True) as ws:
                warnings.simplefilter("always")
                ret = psutil._pslinux.virtual_memory()
                self.assertTrue(m.called)
                self.assertEqual(len(ws), 1)
                w = ws[0]
                self.assertTrue(w.filename.endswith('psutil/_pslinux.py'))
                self.assertIn(
                    "'cached', 'active' and 'inactive' memory stats couldn't "
                    "be determined", str(w.message))
                self.assertEqual(ret.cached, 0)
                self.assertEqual(ret.active, 0)
                self.assertEqual(ret.inactive, 0)

    def test_swap_memory_mocked_warnings(self):
        """swap_memory() must warn and zero out 'sin' / 'sout' when
        open() is mocked.
        """
        with mock.patch('psutil._pslinux.open', create=True) as m:
            with warnings.catch_warnings(record=True) as ws:
                warnings.simplefilter("always")
                ret = psutil._pslinux.swap_memory()
                self.assertTrue(m.called)
                self.assertEqual(len(ws), 1)
                w = ws[0]
                self.assertTrue(w.filename.endswith('psutil/_pslinux.py'))
                self.assertIn(
                    "'sin' and 'sout' swap memory stats couldn't "
                    "be determined", str(w.message))
                self.assertEqual(ret.sin, 0)
                self.assertEqual(ret.sout, 0)

    def test_cpu_count_logical_mocked(self):
        """Exercise the fallback code paths of cpu_count_logical()."""
        original = psutil._pslinux.cpu_count_logical()
        # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in
        # order to cause the parsing of /proc/cpuinfo and /proc/stat.
        with mock.patch(
                'psutil._pslinux.os.sysconf',
                side_effect=ValueError) as m_sysconf:
            self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
            self.assertTrue(m_sysconf.called)

            # Let's have open() return empty data and make sure None is
            # returned ('cause we mimic os.cpu_count()).
            with mock.patch('psutil._pslinux.open', create=True) as m_open:
                self.assertIsNone(psutil._pslinux.cpu_count_logical())
                self.assertEqual(m_open.call_count, 2)
                # /proc/stat should be the last one
                self.assertEqual(m_open.call_args[0][0], '/proc/stat')

            # Let's push this a bit further and make sure /proc/cpuinfo
            # parsing works as expected.
            with open('/proc/cpuinfo', 'rb') as f:
                cpuinfo_data = f.read()
            fake_file = io.BytesIO(cpuinfo_data)
            with mock.patch('psutil._pslinux.open',
                            return_value=fake_file, create=True):
                self.assertEqual(
                    psutil._pslinux.cpu_count_logical(), original)

    def test_cpu_count_physical_mocked(self):
        """cpu_count_physical() must return None on empty data."""
        # Have open() return empty data and make sure None is returned
        # ('cause we want to mimic os.cpu_count())
        with mock.patch('psutil._pslinux.open', create=True) as m:
            self.assertIsNone(psutil._pslinux.cpu_count_physical())
            self.assertTrue(m.called)

    def test_proc_open_files_file_gone(self):
        """Simulate a file which gets deleted during open_files()
        execution.
        """
        p = psutil.Process()
        files = p.open_files()
        with tempfile.NamedTemporaryFile():
            # give the kernel some time to see the new file
            call_until(p.open_files, "len(ret) != %i" % len(files))
            with mock.patch('psutil._pslinux.os.readlink',
                            side_effect=OSError(errno.ENOENT, "")) as m:
                files = p.open_files()
                self.assertFalse(files)
                self.assertTrue(m.called)
            # also simulate the case where os.readlink() returns EINVAL
            # in which case psutil is supposed to 'continue'
            with mock.patch('psutil._pslinux.os.readlink',
                            side_effect=OSError(errno.EINVAL, "")) as m:
                self.assertEqual(p.open_files(), [])
                self.assertTrue(m.called)

    def test_proc_terminal_mocked(self):
        """Process.terminal() must return None if the terminal map is
        empty.
        """
        with mock.patch('psutil._pslinux._psposix._get_terminal_map',
                        return_value={}) as m:
            self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal())
            self.assertTrue(m.called)

    def test_proc_num_ctx_switches_mocked(self):
        """num_ctx_switches() must raise NotImplementedError."""
        self._assert_proc_method_raises('num_ctx_switches')

    def test_proc_num_threads_mocked(self):
        """num_threads() must raise NotImplementedError."""
        self._assert_proc_method_raises('num_threads')

    def test_proc_ppid_mocked(self):
        """ppid() must raise NotImplementedError."""
        self._assert_proc_method_raises('ppid')

    def test_proc_uids_mocked(self):
        """uids() must raise NotImplementedError."""
        self._assert_proc_method_raises('uids')

    def test_proc_gids_mocked(self):
        """gids() must raise NotImplementedError."""
        self._assert_proc_method_raises('gids')

    def test_proc_cmdline_mocked(self):
        """Check how Process.cmdline() splits NUL-separated data."""
        # see: https://github.com/giampaolo/psutil/issues/639
        p = psutil.Process()
        cases = (
            ('foo\x00bar\x00', ['foo', 'bar']),
            ('foo\x00bar\x00\x00', ['foo', 'bar', '']),
        )
        for raw, expected in cases:
            fake_file = io.StringIO(u(raw))
            with mock.patch('psutil._pslinux.open',
                            return_value=fake_file, create=True) as m:
                # The result used to be compared with a bare "==" whose
                # value was discarded, so nothing was actually checked.
                self.assertEqual(p.cmdline(), expected)
                self.assertTrue(m.called)

    def test_proc_io_counters_mocked(self):
        """io_counters() must raise NotImplementedError."""
        self._assert_proc_method_raises('io_counters')

    def test_boot_time_mocked(self):
        """boot_time() must raise RuntimeError when open() is mocked."""
        with mock.patch('psutil._pslinux.open', create=True) as m:
            self.assertRaises(
                RuntimeError,
                psutil._pslinux.boot_time)
            self.assertTrue(m.called)

    def test_users_mocked(self):
        """Check how users() translates the host reported by the C ext."""
        # Make sure ':0' and ':0.0' (returned by C ext) are converted
        # to 'localhost'; anything else should be returned as-is.
        cases = (
            (':0', 'localhost'),
            (':0.0', 'localhost'),
            ('foo', 'foo'),
        )
        for raw_host, expected in cases:
            entry = ('giampaolo', 'pts/2', raw_host, 1436573184.0, True)
            with mock.patch('psutil._pslinux.cext.users',
                            return_value=[entry]) as m:
                self.assertEqual(psutil.users()[0].host, expected)
                self.assertTrue(m.called)

    def test_disk_partitions_mocked(self):
        """Test that ZFS partitions are returned."""
        with open("/proc/filesystems", "r") as f:
            data = f.read()
        if 'zfs' in data:
            fstypes = [part.fstype for part in psutil.disk_partitions()]
            if 'zfs' not in fstypes:
                self.fail("couldn't find any ZFS partition")
        else:
            # No ZFS partitions on this system. Let's fake one.
            fake_file = io.StringIO(u("nodev\tzfs\n"))
            with mock.patch('psutil._pslinux.open',
                            return_value=fake_file, create=True) as m1:
                with mock.patch(
                        'psutil._pslinux.cext.disk_partitions',
                        return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
                    ret = psutil.disk_partitions()
                    self.assertTrue(m1.called)
                    self.assertTrue(m2.called)
                    self.assertTrue(ret)
                    self.assertEqual(ret[0].fstype, 'zfs')

    # --- tests for specific kernel versions

    @unittest.skipUnless(
        get_kernel_version() >= (2, 6, 36),
        "prlimit() not available on this Linux kernel version")
    def test_prlimit_availability(self):
        """prlimit() should be available starting from kernel 2.6.36."""
        p = psutil.Process(os.getpid())
        p.rlimit(psutil.RLIMIT_NOFILE)
        # if prlimit() is supported *at least* these constants should
        # be available
        names = (
            "RLIM_INFINITY", "RLIMIT_AS", "RLIMIT_CORE", "RLIMIT_CPU",
            "RLIMIT_DATA", "RLIMIT_FSIZE", "RLIMIT_LOCKS", "RLIMIT_MEMLOCK",
            "RLIMIT_NOFILE", "RLIMIT_NPROC", "RLIMIT_RSS", "RLIMIT_STACK",
        )
        for name in names:
            self.assertTrue(hasattr(psutil, name), msg=name)

    @unittest.skipUnless(
        get_kernel_version() >= (3, 0),
        "prlimit constants not available on this Linux kernel version")
    def test_resource_consts_kernel_v(self):
        """More recent RLIMIT_* constants must be exposed by psutil."""
        names = (
            "RLIMIT_MSGQUEUE", "RLIMIT_NICE", "RLIMIT_RTPRIO",
            "RLIMIT_RTTIME", "RLIMIT_SIGPENDING",
        )
        for name in names:
            self.assertTrue(hasattr(psutil, name), msg=name)
def main():
    """Run LinuxSpecificTestCase with a verbose text runner.

    Returns True if every test passed, False otherwise.
    """
    test_suite = unittest.TestSuite()
    # unittest.makeSuite() is deprecated (and removed in Python 3.13);
    # the default loader provides the same collection logic.
    test_suite.addTest(
        unittest.defaultTestLoader.loadTestsFromTestCase(
            LinuxSpecificTestCase))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()


if __name__ == '__main__':
    # exit with a non-zero status if any test failed
    if not main():
        sys.exit(1)
|