_linux.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. #!/usr/bin/env python
  2. # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Linux specific tests. These are implicitly run by test_psutil.py."""
  6. from __future__ import division
  7. import contextlib
  8. import errno
  9. import fcntl
  10. import io
  11. import os
  12. import pprint
  13. import re
  14. import socket
  15. import struct
  16. import sys
  17. import tempfile
  18. import time
  19. import warnings
  20. try:
  21. from unittest import mock # py3
  22. except ImportError:
  23. import mock # requires "pip install mock"
  24. from test_psutil import POSIX, TOLERANCE, TRAVIS, LINUX
  25. from test_psutil import (skip_on_not_implemented, sh, get_test_subprocess,
  26. retry_before_failing, get_kernel_version, unittest,
  27. which, call_until)
  28. import psutil
  29. import psutil._pslinux
  30. from psutil._compat import PY3, u
  31. SIOCGIFADDR = 0x8915
  32. SIOCGIFCONF = 0x8912
  33. SIOCGIFHWADDR = 0x8927
  34. def get_ipv4_address(ifname):
  35. ifname = ifname[:15]
  36. if PY3:
  37. ifname = bytes(ifname, 'ascii')
  38. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  39. with contextlib.closing(s):
  40. return socket.inet_ntoa(
  41. fcntl.ioctl(s.fileno(),
  42. SIOCGIFADDR,
  43. struct.pack('256s', ifname))[20:24])
  44. def get_mac_address(ifname):
  45. ifname = ifname[:15]
  46. if PY3:
  47. ifname = bytes(ifname, 'ascii')
  48. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  49. with contextlib.closing(s):
  50. info = fcntl.ioctl(
  51. s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname))
  52. if PY3:
  53. def ord(x):
  54. return x
  55. else:
  56. import __builtin__
  57. ord = __builtin__.ord
  58. return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1]
  59. @unittest.skipUnless(LINUX, "not a Linux system")
  60. class LinuxSpecificTestCase(unittest.TestCase):
  61. @unittest.skipIf(
  62. POSIX and not hasattr(os, 'statvfs'),
  63. reason="os.statvfs() function not available on this platform")
  64. @skip_on_not_implemented()
  65. def test_disks(self):
  66. # test psutil.disk_usage() and psutil.disk_partitions()
  67. # against "df -a"
  68. def df(path):
  69. out = sh('df -P -B 1 "%s"' % path).strip()
  70. lines = out.split('\n')
  71. lines.pop(0)
  72. line = lines.pop(0)
  73. dev, total, used, free = line.split()[:4]
  74. if dev == 'none':
  75. dev = ''
  76. total, used, free = int(total), int(used), int(free)
  77. return dev, total, used, free
  78. for part in psutil.disk_partitions(all=False):
  79. usage = psutil.disk_usage(part.mountpoint)
  80. dev, total, used, free = df(part.mountpoint)
  81. self.assertEqual(part.device, dev)
  82. self.assertEqual(usage.total, total)
  83. # 10 MB tollerance
  84. if abs(usage.free - free) > 10 * 1024 * 1024:
  85. self.fail("psutil=%s, df=%s" % (usage.free, free))
  86. if abs(usage.used - used) > 10 * 1024 * 1024:
  87. self.fail("psutil=%s, df=%s" % (usage.used, used))
  88. def test_memory_maps(self):
  89. sproc = get_test_subprocess()
  90. time.sleep(1)
  91. p = psutil.Process(sproc.pid)
  92. maps = p.memory_maps(grouped=False)
  93. pmap = sh('pmap -x %s' % p.pid).split('\n')
  94. # get rid of header
  95. del pmap[0]
  96. del pmap[0]
  97. while maps and pmap:
  98. this = maps.pop(0)
  99. other = pmap.pop(0)
  100. addr, _, rss, dirty, mode, path = other.split(None, 5)
  101. if not path.startswith('[') and not path.endswith(']'):
  102. self.assertEqual(path, os.path.basename(this.path))
  103. self.assertEqual(int(rss) * 1024, this.rss)
  104. # test only rwx chars, ignore 's' and 'p'
  105. self.assertEqual(mode[:3], this.perms[:3])
  106. def test_vmem_total(self):
  107. lines = sh('free').split('\n')[1:]
  108. total = int(lines[0].split()[1]) * 1024
  109. self.assertEqual(total, psutil.virtual_memory().total)
  110. @retry_before_failing()
  111. def test_vmem_used(self):
  112. lines = sh('free').split('\n')[1:]
  113. used = int(lines[0].split()[2]) * 1024
  114. self.assertAlmostEqual(used, psutil.virtual_memory().used,
  115. delta=TOLERANCE)
  116. @retry_before_failing()
  117. def test_vmem_free(self):
  118. lines = sh('free').split('\n')[1:]
  119. free = int(lines[0].split()[3]) * 1024
  120. self.assertAlmostEqual(free, psutil.virtual_memory().free,
  121. delta=TOLERANCE)
  122. @retry_before_failing()
  123. def test_vmem_buffers(self):
  124. lines = sh('free').split('\n')[1:]
  125. buffers = int(lines[0].split()[5]) * 1024
  126. self.assertAlmostEqual(buffers, psutil.virtual_memory().buffers,
  127. delta=TOLERANCE)
  128. @retry_before_failing()
  129. def test_vmem_cached(self):
  130. lines = sh('free').split('\n')[1:]
  131. cached = int(lines[0].split()[6]) * 1024
  132. self.assertAlmostEqual(cached, psutil.virtual_memory().cached,
  133. delta=TOLERANCE)
  134. def test_swapmem_total(self):
  135. lines = sh('free').split('\n')[1:]
  136. total = int(lines[2].split()[1]) * 1024
  137. self.assertEqual(total, psutil.swap_memory().total)
  138. @retry_before_failing()
  139. def test_swapmem_used(self):
  140. lines = sh('free').split('\n')[1:]
  141. used = int(lines[2].split()[2]) * 1024
  142. self.assertAlmostEqual(used, psutil.swap_memory().used,
  143. delta=TOLERANCE)
  144. @retry_before_failing()
  145. def test_swapmem_free(self):
  146. lines = sh('free').split('\n')[1:]
  147. free = int(lines[2].split()[3]) * 1024
  148. self.assertAlmostEqual(free, psutil.swap_memory().free,
  149. delta=TOLERANCE)
  150. @unittest.skipIf(TRAVIS, "unknown failure on travis")
  151. def test_cpu_times(self):
  152. fields = psutil.cpu_times()._fields
  153. kernel_ver = re.findall('\d+\.\d+\.\d+', os.uname()[2])[0]
  154. kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
  155. if kernel_ver_info >= (2, 6, 11):
  156. self.assertIn('steal', fields)
  157. else:
  158. self.assertNotIn('steal', fields)
  159. if kernel_ver_info >= (2, 6, 24):
  160. self.assertIn('guest', fields)
  161. else:
  162. self.assertNotIn('guest', fields)
  163. if kernel_ver_info >= (3, 2, 0):
  164. self.assertIn('guest_nice', fields)
  165. else:
  166. self.assertNotIn('guest_nice', fields)
  167. def test_net_if_addrs_ips(self):
  168. for name, addrs in psutil.net_if_addrs().items():
  169. for addr in addrs:
  170. if addr.family == psutil.AF_LINK:
  171. self.assertEqual(addr.address, get_mac_address(name))
  172. elif addr.family == socket.AF_INET:
  173. self.assertEqual(addr.address, get_ipv4_address(name))
  174. # TODO: test for AF_INET6 family
  175. @unittest.skipUnless(which('ip'), "'ip' utility not available")
  176. @unittest.skipIf(TRAVIS, "skipped on Travis")
  177. def test_net_if_names(self):
  178. out = sh("ip addr").strip()
  179. nics = psutil.net_if_addrs()
  180. found = 0
  181. for line in out.split('\n'):
  182. line = line.strip()
  183. if re.search("^\d+:", line):
  184. found += 1
  185. name = line.split(':')[1].strip()
  186. self.assertIn(name, nics.keys())
  187. self.assertEqual(len(nics), found, msg="%s\n---\n%s" % (
  188. pprint.pformat(nics), out))
  189. @unittest.skipUnless(which("nproc"), "nproc utility not available")
  190. def test_cpu_count_logical_w_nproc(self):
  191. num = int(sh("nproc --all"))
  192. self.assertEqual(psutil.cpu_count(logical=True), num)
  193. @unittest.skipUnless(which("lscpu"), "lscpu utility not available")
  194. def test_cpu_count_logical_w_lscpu(self):
  195. out = sh("lscpu -p")
  196. num = len([x for x in out.split('\n') if not x.startswith('#')])
  197. self.assertEqual(psutil.cpu_count(logical=True), num)
  198. # --- mocked tests
  199. def test_virtual_memory_mocked_warnings(self):
  200. with mock.patch('psutil._pslinux.open', create=True) as m:
  201. with warnings.catch_warnings(record=True) as ws:
  202. warnings.simplefilter("always")
  203. ret = psutil._pslinux.virtual_memory()
  204. assert m.called
  205. self.assertEqual(len(ws), 1)
  206. w = ws[0]
  207. self.assertTrue(w.filename.endswith('psutil/_pslinux.py'))
  208. self.assertIn(
  209. "'cached', 'active' and 'inactive' memory stats couldn't "
  210. "be determined", str(w.message))
  211. self.assertEqual(ret.cached, 0)
  212. self.assertEqual(ret.active, 0)
  213. self.assertEqual(ret.inactive, 0)
  214. def test_swap_memory_mocked_warnings(self):
  215. with mock.patch('psutil._pslinux.open', create=True) as m:
  216. with warnings.catch_warnings(record=True) as ws:
  217. warnings.simplefilter("always")
  218. ret = psutil._pslinux.swap_memory()
  219. assert m.called
  220. self.assertEqual(len(ws), 1)
  221. w = ws[0]
  222. self.assertTrue(w.filename.endswith('psutil/_pslinux.py'))
  223. self.assertIn(
  224. "'sin' and 'sout' swap memory stats couldn't "
  225. "be determined", str(w.message))
  226. self.assertEqual(ret.sin, 0)
  227. self.assertEqual(ret.sout, 0)
  228. def test_cpu_count_logical_mocked(self):
  229. import psutil._pslinux
  230. original = psutil._pslinux.cpu_count_logical()
  231. # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in
  232. # order to cause the parsing of /proc/cpuinfo and /proc/stat.
  233. with mock.patch(
  234. 'psutil._pslinux.os.sysconf', side_effect=ValueError) as m:
  235. self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
  236. assert m.called
  237. # Let's have open() return emtpy data and make sure None is
  238. # returned ('cause we mimick os.cpu_count()).
  239. with mock.patch('psutil._pslinux.open', create=True) as m:
  240. self.assertIsNone(psutil._pslinux.cpu_count_logical())
  241. self.assertEqual(m.call_count, 2)
  242. # /proc/stat should be the last one
  243. self.assertEqual(m.call_args[0][0], '/proc/stat')
  244. # Let's push this a bit further and make sure /proc/cpuinfo
  245. # parsing works as expected.
  246. with open('/proc/cpuinfo', 'rb') as f:
  247. cpuinfo_data = f.read()
  248. fake_file = io.BytesIO(cpuinfo_data)
  249. with mock.patch('psutil._pslinux.open',
  250. return_value=fake_file, create=True) as m:
  251. self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
  252. def test_cpu_count_physical_mocked(self):
  253. # Have open() return emtpy data and make sure None is returned
  254. # ('cause we want to mimick os.cpu_count())
  255. with mock.patch('psutil._pslinux.open', create=True) as m:
  256. self.assertIsNone(psutil._pslinux.cpu_count_physical())
  257. assert m.called
  258. def test_proc_open_files_file_gone(self):
  259. # simulates a file which gets deleted during open_files()
  260. # execution
  261. p = psutil.Process()
  262. files = p.open_files()
  263. with tempfile.NamedTemporaryFile():
  264. # give the kernel some time to see the new file
  265. call_until(p.open_files, "len(ret) != %i" % len(files))
  266. with mock.patch('psutil._pslinux.os.readlink',
  267. side_effect=OSError(errno.ENOENT, "")) as m:
  268. files = p.open_files()
  269. assert not files
  270. assert m.called
  271. # also simulate the case where os.readlink() returns EINVAL
  272. # in which case psutil is supposed to 'continue'
  273. with mock.patch('psutil._pslinux.os.readlink',
  274. side_effect=OSError(errno.EINVAL, "")) as m:
  275. self.assertEqual(p.open_files(), [])
  276. assert m.called
  277. def test_proc_terminal_mocked(self):
  278. with mock.patch('psutil._pslinux._psposix._get_terminal_map',
  279. return_value={}) as m:
  280. self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal())
  281. assert m.called
  282. def test_proc_num_ctx_switches_mocked(self):
  283. with mock.patch('psutil._pslinux.open', create=True) as m:
  284. self.assertRaises(
  285. NotImplementedError,
  286. psutil._pslinux.Process(os.getpid()).num_ctx_switches)
  287. assert m.called
  288. def test_proc_num_threads_mocked(self):
  289. with mock.patch('psutil._pslinux.open', create=True) as m:
  290. self.assertRaises(
  291. NotImplementedError,
  292. psutil._pslinux.Process(os.getpid()).num_threads)
  293. assert m.called
  294. def test_proc_ppid_mocked(self):
  295. with mock.patch('psutil._pslinux.open', create=True) as m:
  296. self.assertRaises(
  297. NotImplementedError,
  298. psutil._pslinux.Process(os.getpid()).ppid)
  299. assert m.called
  300. def test_proc_uids_mocked(self):
  301. with mock.patch('psutil._pslinux.open', create=True) as m:
  302. self.assertRaises(
  303. NotImplementedError,
  304. psutil._pslinux.Process(os.getpid()).uids)
  305. assert m.called
  306. def test_proc_gids_mocked(self):
  307. with mock.patch('psutil._pslinux.open', create=True) as m:
  308. self.assertRaises(
  309. NotImplementedError,
  310. psutil._pslinux.Process(os.getpid()).gids)
  311. assert m.called
  312. def test_proc_cmdline_mocked(self):
  313. # see: https://github.com/giampaolo/psutil/issues/639
  314. p = psutil.Process()
  315. fake_file = io.StringIO(u('foo\x00bar\x00'))
  316. with mock.patch('psutil._pslinux.open',
  317. return_value=fake_file, create=True) as m:
  318. p.cmdline() == ['foo', 'bar']
  319. assert m.called
  320. fake_file = io.StringIO(u('foo\x00bar\x00\x00'))
  321. with mock.patch('psutil._pslinux.open',
  322. return_value=fake_file, create=True) as m:
  323. p.cmdline() == ['foo', 'bar', '']
  324. assert m.called
  325. def test_proc_io_counters_mocked(self):
  326. with mock.patch('psutil._pslinux.open', create=True) as m:
  327. self.assertRaises(
  328. NotImplementedError,
  329. psutil._pslinux.Process(os.getpid()).io_counters)
  330. assert m.called
  331. def test_boot_time_mocked(self):
  332. with mock.patch('psutil._pslinux.open', create=True) as m:
  333. self.assertRaises(
  334. RuntimeError,
  335. psutil._pslinux.boot_time)
  336. assert m.called
  337. def test_users_mocked(self):
  338. # Make sure ':0' and ':0.0' (returned by C ext) are converted
  339. # to 'localhost'.
  340. with mock.patch('psutil._pslinux.cext.users',
  341. return_value=[('giampaolo', 'pts/2', ':0',
  342. 1436573184.0, True)]) as m:
  343. self.assertEqual(psutil.users()[0].host, 'localhost')
  344. assert m.called
  345. with mock.patch('psutil._pslinux.cext.users',
  346. return_value=[('giampaolo', 'pts/2', ':0.0',
  347. 1436573184.0, True)]) as m:
  348. self.assertEqual(psutil.users()[0].host, 'localhost')
  349. assert m.called
  350. # ...otherwise it should be returned as-is
  351. with mock.patch('psutil._pslinux.cext.users',
  352. return_value=[('giampaolo', 'pts/2', 'foo',
  353. 1436573184.0, True)]) as m:
  354. self.assertEqual(psutil.users()[0].host, 'foo')
  355. assert m.called
  356. def test_disk_partitions_mocked(self):
  357. # Test that ZFS partitions are returned.
  358. with open("/proc/filesystems", "r") as f:
  359. data = f.read()
  360. if 'zfs' in data:
  361. for part in psutil.disk_partitions():
  362. if part.fstype == 'zfs':
  363. break
  364. else:
  365. self.fail("couldn't find any ZFS partition")
  366. else:
  367. # No ZFS partitions on this system. Let's fake one.
  368. fake_file = io.StringIO(u("nodev\tzfs\n"))
  369. with mock.patch('psutil._pslinux.open',
  370. return_value=fake_file, create=True) as m1:
  371. with mock.patch(
  372. 'psutil._pslinux.cext.disk_partitions',
  373. return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
  374. ret = psutil.disk_partitions()
  375. assert m1.called
  376. assert m2.called
  377. assert ret
  378. self.assertEqual(ret[0].fstype, 'zfs')
  379. # --- tests for specific kernel versions
  380. @unittest.skipUnless(
  381. get_kernel_version() >= (2, 6, 36),
  382. "prlimit() not available on this Linux kernel version")
  383. def test_prlimit_availability(self):
  384. # prlimit() should be available starting from kernel 2.6.36
  385. p = psutil.Process(os.getpid())
  386. p.rlimit(psutil.RLIMIT_NOFILE)
  387. # if prlimit() is supported *at least* these constants should
  388. # be available
  389. self.assertTrue(hasattr(psutil, "RLIM_INFINITY"))
  390. self.assertTrue(hasattr(psutil, "RLIMIT_AS"))
  391. self.assertTrue(hasattr(psutil, "RLIMIT_CORE"))
  392. self.assertTrue(hasattr(psutil, "RLIMIT_CPU"))
  393. self.assertTrue(hasattr(psutil, "RLIMIT_DATA"))
  394. self.assertTrue(hasattr(psutil, "RLIMIT_FSIZE"))
  395. self.assertTrue(hasattr(psutil, "RLIMIT_LOCKS"))
  396. self.assertTrue(hasattr(psutil, "RLIMIT_MEMLOCK"))
  397. self.assertTrue(hasattr(psutil, "RLIMIT_NOFILE"))
  398. self.assertTrue(hasattr(psutil, "RLIMIT_NPROC"))
  399. self.assertTrue(hasattr(psutil, "RLIMIT_RSS"))
  400. self.assertTrue(hasattr(psutil, "RLIMIT_STACK"))
  401. @unittest.skipUnless(
  402. get_kernel_version() >= (3, 0),
  403. "prlimit constants not available on this Linux kernel version")
  404. def test_resource_consts_kernel_v(self):
  405. # more recent constants
  406. self.assertTrue(hasattr(psutil, "RLIMIT_MSGQUEUE"))
  407. self.assertTrue(hasattr(psutil, "RLIMIT_NICE"))
  408. self.assertTrue(hasattr(psutil, "RLIMIT_RTPRIO"))
  409. self.assertTrue(hasattr(psutil, "RLIMIT_RTTIME"))
  410. self.assertTrue(hasattr(psutil, "RLIMIT_SIGPENDING"))
  411. def main():
  412. test_suite = unittest.TestSuite()
  413. test_suite.addTest(unittest.makeSuite(LinuxSpecificTestCase))
  414. result = unittest.TextTestRunner(verbosity=2).run(test_suite)
  415. return result.wasSuccessful()
  416. if __name__ == '__main__':
  417. if not main():
  418. sys.exit(1)