test_psutil.py 108 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
  4. # Use of this source code is governed by a BSD-style license that can be
  5. # found in the LICENSE file.
  6. """
  7. psutil test suite. Run it with:
  8. $ make test
  9. If you're on Python < 2.7 unittest2 module must be installed first:
  10. https://pypi.python.org/pypi/unittest2
  11. """
  12. from __future__ import division
  13. import ast
  14. import atexit
  15. import collections
  16. import contextlib
  17. import datetime
  18. import errno
  19. import functools
  20. import imp
  21. import json
  22. import os
  23. import pickle
  24. import pprint
  25. import re
  26. import select
  27. import shutil
  28. import signal
  29. import socket
  30. import stat
  31. import subprocess
  32. import sys
  33. import tempfile
  34. import textwrap
  35. import threading
  36. import time
  37. import traceback
  38. import types
  39. import warnings
  40. from socket import AF_INET, SOCK_STREAM, SOCK_DGRAM
  41. try:
  42. import ipaddress # python >= 3.3
  43. except ImportError:
  44. ipaddress = None
  45. try:
  46. from unittest import mock # py3
  47. except ImportError:
  48. import mock # requires "pip install mock"
  49. import psutil
  50. from psutil._compat import PY3, callable, long, unicode
  51. if sys.version_info < (2, 7):
  52. import unittest2 as unittest # https://pypi.python.org/pypi/unittest2
  53. else:
  54. import unittest
  55. if sys.version_info >= (3, 4):
  56. import enum
  57. else:
  58. enum = None
  59. # ===================================================================
  60. # --- Constants
  61. # ===================================================================
  62. # conf for retry_before_failing() decorator
  63. NO_RETRIES = 10
  64. # bytes tolerance for OS memory related tests
  65. TOLERANCE = 500 * 1024 # 500KB
  66. # the timeout used in functions which have to wait
  67. GLOBAL_TIMEOUT = 3
  68. AF_INET6 = getattr(socket, "AF_INET6")
  69. AF_UNIX = getattr(socket, "AF_UNIX", None)
  70. PYTHON = os.path.realpath(sys.executable)
  71. DEVNULL = open(os.devnull, 'r+')
  72. TESTFN = os.path.join(os.getcwd(), "$testfile")
  73. TESTFN_UNICODE = TESTFN + "ƒőő"
  74. TESTFILE_PREFIX = 'psutil-test-suite-'
  75. if not PY3:
  76. try:
  77. TESTFN_UNICODE = unicode(TESTFN_UNICODE, sys.getfilesystemencoding())
  78. except UnicodeDecodeError:
  79. TESTFN_UNICODE = TESTFN + "???"
  80. EXAMPLES_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
  81. '..', 'examples'))
  82. POSIX = os.name == 'posix'
  83. WINDOWS = os.name == 'nt'
  84. if WINDOWS:
  85. WIN_VISTA = (6, 0, 0)
  86. LINUX = sys.platform.startswith("linux")
  87. OSX = sys.platform.startswith("darwin")
  88. BSD = sys.platform.startswith("freebsd")
  89. SUNOS = sys.platform.startswith("sunos")
  90. VALID_PROC_STATUSES = [getattr(psutil, x) for x in dir(psutil)
  91. if x.startswith('STATUS_')]
  92. # whether we're running this test suite on Travis (https://travis-ci.org/)
  93. TRAVIS = bool(os.environ.get('TRAVIS'))
  94. # whether we're running this test suite on Appveyor for Windows
  95. # (http://www.appveyor.com/)
  96. APPVEYOR = bool(os.environ.get('APPVEYOR'))
  97. if TRAVIS or 'tox' in sys.argv[0]:
  98. import ipaddress
  99. if TRAVIS or APPVEYOR:
  100. GLOBAL_TIMEOUT = GLOBAL_TIMEOUT * 4
  101. # ===================================================================
  102. # --- Utility functions
  103. # ===================================================================
  104. def cleanup():
  105. reap_children(search_all=True)
  106. safe_remove(TESTFN)
  107. try:
  108. safe_rmdir(TESTFN_UNICODE)
  109. except UnicodeEncodeError:
  110. pass
  111. for path in _testfiles:
  112. safe_remove(path)
  113. atexit.register(cleanup)
  114. atexit.register(lambda: DEVNULL.close())
  115. _subprocesses_started = set()
  116. def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL,
  117. stdin=DEVNULL, wait=False):
  118. """Return a subprocess.Popen object to use in tests.
  119. By default stdout and stderr are redirected to /dev/null and the
  120. python interpreter is used as test process.
  121. If 'wait' is True attemps to make sure the process is in a
  122. reasonably initialized state.
  123. """
  124. if cmd is None:
  125. pyline = ""
  126. if wait:
  127. pyline += "open(r'%s', 'w'); " % TESTFN
  128. pyline += "import time; time.sleep(60);"
  129. cmd_ = [PYTHON, "-c", pyline]
  130. else:
  131. cmd_ = cmd
  132. sproc = subprocess.Popen(cmd_, stdout=stdout, stderr=stderr, stdin=stdin)
  133. if wait:
  134. if cmd is None:
  135. stop_at = time.time() + 3
  136. while stop_at > time.time():
  137. if os.path.exists(TESTFN):
  138. break
  139. time.sleep(0.001)
  140. else:
  141. warn("couldn't make sure test file was actually created")
  142. else:
  143. wait_for_pid(sproc.pid)
  144. _subprocesses_started.add(psutil.Process(sproc.pid))
  145. return sproc
  146. _testfiles = []
  147. def pyrun(src):
  148. """Run python code 'src' in a separate interpreter.
  149. Return interpreter subprocess.
  150. """
  151. if PY3:
  152. src = bytes(src, 'ascii')
  153. with tempfile.NamedTemporaryFile(
  154. prefix=TESTFILE_PREFIX, delete=False) as f:
  155. _testfiles.append(f.name)
  156. f.write(src)
  157. f.flush()
  158. subp = get_test_subprocess([PYTHON, f.name], stdout=None,
  159. stderr=None)
  160. wait_for_pid(subp.pid)
  161. return subp
  162. def warn(msg):
  163. """Raise a warning msg."""
  164. warnings.warn(msg, UserWarning)
  165. def sh(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
  166. """run cmd in a subprocess and return its output.
  167. raises RuntimeError on error.
  168. """
  169. p = subprocess.Popen(cmdline, shell=True, stdout=stdout, stderr=stderr)
  170. stdout, stderr = p.communicate()
  171. if p.returncode != 0:
  172. raise RuntimeError(stderr)
  173. if stderr:
  174. warn(stderr)
  175. if PY3:
  176. stdout = str(stdout, sys.stdout.encoding)
  177. return stdout.strip()
  178. def which(program):
  179. """Same as UNIX which command. Return None on command not found."""
  180. def is_exe(fpath):
  181. return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
  182. fpath, fname = os.path.split(program)
  183. if fpath:
  184. if is_exe(program):
  185. return program
  186. else:
  187. for path in os.environ["PATH"].split(os.pathsep):
  188. exe_file = os.path.join(path, program)
  189. if is_exe(exe_file):
  190. return exe_file
  191. return None
  192. if POSIX:
  193. def get_kernel_version():
  194. """Return a tuple such as (2, 6, 36)."""
  195. s = ""
  196. uname = os.uname()[2]
  197. for c in uname:
  198. if c.isdigit() or c == '.':
  199. s += c
  200. else:
  201. break
  202. if not s:
  203. raise ValueError("can't parse %r" % uname)
  204. minor = 0
  205. micro = 0
  206. nums = s.split('.')
  207. major = int(nums[0])
  208. if len(nums) >= 2:
  209. minor = int(nums[1])
  210. if len(nums) >= 3:
  211. micro = int(nums[2])
  212. return (major, minor, micro)
  213. if LINUX:
  214. RLIMIT_SUPPORT = get_kernel_version() >= (2, 6, 36)
  215. else:
  216. RLIMIT_SUPPORT = False
  217. def wait_for_pid(pid, timeout=GLOBAL_TIMEOUT):
  218. """Wait for pid to show up in the process list then return.
  219. Used in the test suite to give time the sub process to initialize.
  220. """
  221. raise_at = time.time() + timeout
  222. while True:
  223. if pid in psutil.pids():
  224. # give it one more iteration to allow full initialization
  225. time.sleep(0.01)
  226. return
  227. time.sleep(0.0001)
  228. if time.time() >= raise_at:
  229. raise RuntimeError("Timed out")
  230. def wait_for_file(fname, timeout=GLOBAL_TIMEOUT, delete_file=True):
  231. """Wait for a file to be written on disk."""
  232. stop_at = time.time() + 3
  233. while time.time() < stop_at:
  234. try:
  235. with open(fname, "r") as f:
  236. data = f.read()
  237. if not data:
  238. continue
  239. if delete_file:
  240. os.remove(fname)
  241. return data
  242. except IOError:
  243. time.sleep(0.001)
  244. raise RuntimeError("timed out (couldn't read file)")
  245. def reap_children(search_all=False):
  246. """Kill any subprocess started by this test suite and ensure that
  247. no zombies stick around to hog resources and create problems when
  248. looking for refleaks.
  249. """
  250. global _subprocesses_started
  251. procs = _subprocesses_started.copy()
  252. if search_all:
  253. this_process = psutil.Process()
  254. for p in this_process.children(recursive=True):
  255. procs.add(p)
  256. for p in procs:
  257. try:
  258. p.terminate()
  259. except psutil.NoSuchProcess:
  260. pass
  261. gone, alive = psutil.wait_procs(procs, timeout=GLOBAL_TIMEOUT)
  262. for p in alive:
  263. warn("couldn't terminate process %s" % p)
  264. try:
  265. p.kill()
  266. except psutil.NoSuchProcess:
  267. pass
  268. _, alive = psutil.wait_procs(alive, timeout=GLOBAL_TIMEOUT)
  269. if alive:
  270. warn("couldn't not kill processes %s" % str(alive))
  271. _subprocesses_started = set(alive)
  272. def check_ip_address(addr, family):
  273. """Attempts to check IP address's validity."""
  274. if enum and PY3:
  275. assert isinstance(family, enum.IntEnum), family
  276. if family == AF_INET:
  277. octs = [int(x) for x in addr.split('.')]
  278. assert len(octs) == 4, addr
  279. for num in octs:
  280. assert 0 <= num <= 255, addr
  281. if ipaddress:
  282. if not PY3:
  283. addr = unicode(addr)
  284. ipaddress.IPv4Address(addr)
  285. elif family == AF_INET6:
  286. assert isinstance(addr, str), addr
  287. if ipaddress:
  288. if not PY3:
  289. addr = unicode(addr)
  290. ipaddress.IPv6Address(addr)
  291. elif family == psutil.AF_LINK:
  292. assert re.match('([a-fA-F0-9]{2}[:|\-]?){6}', addr) is not None, addr
  293. else:
  294. raise ValueError("unknown family %r", family)
  295. def check_connection_ntuple(conn):
  296. """Check validity of a connection namedtuple."""
  297. valid_conn_states = [getattr(psutil, x) for x in dir(psutil) if
  298. x.startswith('CONN_')]
  299. assert conn[0] == conn.fd
  300. assert conn[1] == conn.family
  301. assert conn[2] == conn.type
  302. assert conn[3] == conn.laddr
  303. assert conn[4] == conn.raddr
  304. assert conn[5] == conn.status
  305. assert conn.type in (SOCK_STREAM, SOCK_DGRAM), repr(conn.type)
  306. assert conn.family in (AF_INET, AF_INET6, AF_UNIX), repr(conn.family)
  307. assert conn.status in valid_conn_states, conn.status
  308. # check IP address and port sanity
  309. for addr in (conn.laddr, conn.raddr):
  310. if not addr:
  311. continue
  312. if conn.family in (AF_INET, AF_INET6):
  313. assert isinstance(addr, tuple), addr
  314. ip, port = addr
  315. assert isinstance(port, int), port
  316. assert 0 <= port <= 65535, port
  317. check_ip_address(ip, conn.family)
  318. elif conn.family == AF_UNIX:
  319. assert isinstance(addr, (str, None)), addr
  320. else:
  321. raise ValueError("unknown family %r", conn.family)
  322. if conn.family in (AF_INET, AF_INET6):
  323. # actually try to bind the local socket; ignore IPv6
  324. # sockets as their address might be represented as
  325. # an IPv4-mapped-address (e.g. "::127.0.0.1")
  326. # and that's rejected by bind()
  327. if conn.family == AF_INET:
  328. s = socket.socket(conn.family, conn.type)
  329. with contextlib.closing(s):
  330. try:
  331. s.bind((conn.laddr[0], 0))
  332. except socket.error as err:
  333. if err.errno != errno.EADDRNOTAVAIL:
  334. raise
  335. elif conn.family == AF_UNIX:
  336. assert not conn.raddr, repr(conn.raddr)
  337. assert conn.status == psutil.CONN_NONE, conn.status
  338. if getattr(conn, 'fd', -1) != -1:
  339. assert conn.fd > 0, conn
  340. if hasattr(socket, 'fromfd') and not WINDOWS:
  341. try:
  342. dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
  343. except (socket.error, OSError) as err:
  344. if err.args[0] != errno.EBADF:
  345. raise
  346. else:
  347. with contextlib.closing(dupsock):
  348. assert dupsock.family == conn.family
  349. assert dupsock.type == conn.type
  350. def safe_remove(file):
  351. "Convenience function for removing temporary test files"
  352. try:
  353. os.remove(file)
  354. except OSError as err:
  355. if err.errno != errno.ENOENT:
  356. # file is being used by another process
  357. if WINDOWS and isinstance(err, WindowsError) and err.errno == 13:
  358. return
  359. raise
  360. def safe_rmdir(dir):
  361. "Convenience function for removing temporary test directories"
  362. try:
  363. os.rmdir(dir)
  364. except OSError as err:
  365. if err.errno != errno.ENOENT:
  366. raise
  367. def call_until(fun, expr, timeout=GLOBAL_TIMEOUT):
  368. """Keep calling function for timeout secs and exit if eval()
  369. expression is True.
  370. """
  371. stop_at = time.time() + timeout
  372. while time.time() < stop_at:
  373. ret = fun()
  374. if eval(expr):
  375. return ret
  376. time.sleep(0.001)
  377. raise RuntimeError('timed out (ret=%r)' % ret)
  378. def retry_before_failing(ntimes=None):
  379. """Decorator which runs a test function and retries N times before
  380. actually failing.
  381. """
  382. def decorator(fun):
  383. @functools.wraps(fun)
  384. def wrapper(*args, **kwargs):
  385. for x in range(ntimes or NO_RETRIES):
  386. try:
  387. return fun(*args, **kwargs)
  388. except AssertionError:
  389. pass
  390. raise
  391. return wrapper
  392. return decorator
  393. def skip_on_access_denied(only_if=None):
  394. """Decorator to Ignore AccessDenied exceptions."""
  395. def decorator(fun):
  396. @functools.wraps(fun)
  397. def wrapper(*args, **kwargs):
  398. try:
  399. return fun(*args, **kwargs)
  400. except psutil.AccessDenied:
  401. if only_if is not None:
  402. if not only_if:
  403. raise
  404. msg = "%r was skipped because it raised AccessDenied" \
  405. % fun.__name__
  406. raise unittest.SkipTest(msg)
  407. return wrapper
  408. return decorator
  409. def skip_on_not_implemented(only_if=None):
  410. """Decorator to Ignore NotImplementedError exceptions."""
  411. def decorator(fun):
  412. @functools.wraps(fun)
  413. def wrapper(*args, **kwargs):
  414. try:
  415. return fun(*args, **kwargs)
  416. except NotImplementedError:
  417. if only_if is not None:
  418. if not only_if:
  419. raise
  420. msg = "%r was skipped because it raised NotImplementedError" \
  421. % fun.__name__
  422. raise unittest.SkipTest(msg)
  423. return wrapper
  424. return decorator
  425. def supports_ipv6():
  426. """Return True if IPv6 is supported on this platform."""
  427. if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"):
  428. return False
  429. sock = None
  430. try:
  431. sock = socket.socket(AF_INET6, SOCK_STREAM)
  432. sock.bind(("::1", 0))
  433. except (socket.error, socket.gaierror):
  434. return False
  435. else:
  436. return True
  437. finally:
  438. if sock is not None:
  439. sock.close()
  440. if WINDOWS:
  441. def get_winver():
  442. wv = sys.getwindowsversion()
  443. if hasattr(wv, 'service_pack_major'): # python >= 2.7
  444. sp = wv.service_pack_major or 0
  445. else:
  446. r = re.search("\s\d$", wv[4])
  447. if r:
  448. sp = int(r.group(0))
  449. else:
  450. sp = 0
  451. return (wv[0], wv[1], sp)
  452. class ThreadTask(threading.Thread):
  453. """A thread object used for running process thread tests."""
  454. def __init__(self):
  455. threading.Thread.__init__(self)
  456. self._running = False
  457. self._interval = None
  458. self._flag = threading.Event()
  459. def __repr__(self):
  460. name = self.__class__.__name__
  461. return '<%s running=%s at %#x>' % (name, self._running, id(self))
  462. def start(self, interval=0.001):
  463. """Start thread and keep it running until an explicit
  464. stop() request. Polls for shutdown every 'timeout' seconds.
  465. """
  466. if self._running:
  467. raise ValueError("already started")
  468. self._interval = interval
  469. threading.Thread.start(self)
  470. self._flag.wait()
  471. def run(self):
  472. self._running = True
  473. self._flag.set()
  474. while self._running:
  475. time.sleep(self._interval)
  476. def stop(self):
  477. """Stop thread execution and and waits until it is stopped."""
  478. if not self._running:
  479. raise ValueError("already stopped")
  480. self._running = False
  481. self.join()
  482. # ===================================================================
  483. # --- System-related API tests
  484. # ===================================================================
  485. class TestSystemAPIs(unittest.TestCase):
  486. """Tests for system-related APIs."""
  487. def setUp(self):
  488. safe_remove(TESTFN)
  489. def tearDown(self):
  490. reap_children()
  491. def test_process_iter(self):
  492. self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()])
  493. sproc = get_test_subprocess()
  494. self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()])
  495. p = psutil.Process(sproc.pid)
  496. p.kill()
  497. p.wait()
  498. self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()])
  499. def test_wait_procs(self):
  500. def callback(p):
  501. l.append(p.pid)
  502. l = []
  503. sproc1 = get_test_subprocess()
  504. sproc2 = get_test_subprocess()
  505. sproc3 = get_test_subprocess()
  506. procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
  507. self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
  508. self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)
  509. t = time.time()
  510. gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback)
  511. self.assertLess(time.time() - t, 0.5)
  512. self.assertEqual(gone, [])
  513. self.assertEqual(len(alive), 3)
  514. self.assertEqual(l, [])
  515. for p in alive:
  516. self.assertFalse(hasattr(p, 'returncode'))
  517. @retry_before_failing(30)
  518. def test(procs, callback):
  519. gone, alive = psutil.wait_procs(procs, timeout=0.03,
  520. callback=callback)
  521. self.assertEqual(len(gone), 1)
  522. self.assertEqual(len(alive), 2)
  523. return gone, alive
  524. sproc3.terminate()
  525. gone, alive = test(procs, callback)
  526. self.assertIn(sproc3.pid, [x.pid for x in gone])
  527. if POSIX:
  528. self.assertEqual(gone.pop().returncode, signal.SIGTERM)
  529. else:
  530. self.assertEqual(gone.pop().returncode, 1)
  531. self.assertEqual(l, [sproc3.pid])
  532. for p in alive:
  533. self.assertFalse(hasattr(p, 'returncode'))
  534. @retry_before_failing(30)
  535. def test(procs, callback):
  536. gone, alive = psutil.wait_procs(procs, timeout=0.03,
  537. callback=callback)
  538. self.assertEqual(len(gone), 3)
  539. self.assertEqual(len(alive), 0)
  540. return gone, alive
  541. sproc1.terminate()
  542. sproc2.terminate()
  543. gone, alive = test(procs, callback)
  544. self.assertEqual(set(l), set([sproc1.pid, sproc2.pid, sproc3.pid]))
  545. for p in gone:
  546. self.assertTrue(hasattr(p, 'returncode'))
  547. def test_wait_procs_no_timeout(self):
  548. sproc1 = get_test_subprocess()
  549. sproc2 = get_test_subprocess()
  550. sproc3 = get_test_subprocess()
  551. procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
  552. for p in procs:
  553. p.terminate()
  554. gone, alive = psutil.wait_procs(procs)
  555. def test_boot_time(self):
  556. bt = psutil.boot_time()
  557. self.assertIsInstance(bt, float)
  558. self.assertGreater(bt, 0)
  559. self.assertLess(bt, time.time())
  560. @unittest.skipUnless(POSIX, 'posix only')
  561. def test_PAGESIZE(self):
  562. # pagesize is used internally to perform different calculations
  563. # and it's determined by using SC_PAGE_SIZE; make sure
  564. # getpagesize() returns the same value.
  565. import resource
  566. self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize())
  567. def test_virtual_memory(self):
  568. mem = psutil.virtual_memory()
  569. assert mem.total > 0, mem
  570. assert mem.available > 0, mem
  571. assert 0 <= mem.percent <= 100, mem
  572. assert mem.used > 0, mem
  573. assert mem.free >= 0, mem
  574. for name in mem._fields:
  575. value = getattr(mem, name)
  576. if name != 'percent':
  577. self.assertIsInstance(value, (int, long))
  578. if name != 'total':
  579. if not value >= 0:
  580. self.fail("%r < 0 (%s)" % (name, value))
  581. if value > mem.total:
  582. self.fail("%r > total (total=%s, %s=%s)"
  583. % (name, mem.total, name, value))
  584. def test_swap_memory(self):
  585. mem = psutil.swap_memory()
  586. assert mem.total >= 0, mem
  587. assert mem.used >= 0, mem
  588. if mem.total > 0:
  589. # likely a system with no swap partition
  590. assert mem.free > 0, mem
  591. else:
  592. assert mem.free == 0, mem
  593. assert 0 <= mem.percent <= 100, mem
  594. assert mem.sin >= 0, mem
  595. assert mem.sout >= 0, mem
  596. def test_pid_exists(self):
  597. sproc = get_test_subprocess(wait=True)
  598. self.assertTrue(psutil.pid_exists(sproc.pid))
  599. p = psutil.Process(sproc.pid)
  600. p.kill()
  601. p.wait()
  602. self.assertFalse(psutil.pid_exists(sproc.pid))
  603. self.assertFalse(psutil.pid_exists(-1))
  604. self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())
  605. # pid 0
  606. psutil.pid_exists(0) == 0 in psutil.pids()
  607. def test_pid_exists_2(self):
  608. reap_children()
  609. pids = psutil.pids()
  610. for pid in pids:
  611. try:
  612. assert psutil.pid_exists(pid)
  613. except AssertionError:
  614. # in case the process disappeared in meantime fail only
  615. # if it is no longer in psutil.pids()
  616. time.sleep(.1)
  617. if pid in psutil.pids():
  618. self.fail(pid)
  619. pids = range(max(pids) + 5000, max(pids) + 6000)
  620. for pid in pids:
  621. self.assertFalse(psutil.pid_exists(pid), msg=pid)
  622. def test_pids(self):
  623. plist = [x.pid for x in psutil.process_iter()]
  624. pidlist = psutil.pids()
  625. self.assertEqual(plist.sort(), pidlist.sort())
  626. # make sure every pid is unique
  627. self.assertEqual(len(pidlist), len(set(pidlist)))
  628. def test_test(self):
  629. # test for psutil.test() function
  630. stdout = sys.stdout
  631. sys.stdout = DEVNULL
  632. try:
  633. psutil.test()
  634. finally:
  635. sys.stdout = stdout
  636. def test_cpu_count(self):
  637. logical = psutil.cpu_count()
  638. self.assertEqual(logical, len(psutil.cpu_times(percpu=True)))
  639. self.assertGreaterEqual(logical, 1)
  640. #
  641. if LINUX:
  642. with open("/proc/cpuinfo") as fd:
  643. cpuinfo_data = fd.read()
  644. if "physical id" not in cpuinfo_data:
  645. raise unittest.SkipTest("cpuinfo doesn't include physical id")
  646. physical = psutil.cpu_count(logical=False)
  647. self.assertGreaterEqual(physical, 1)
  648. self.assertGreaterEqual(logical, physical)
  649. def test_sys_cpu_times(self):
  650. total = 0
  651. times = psutil.cpu_times()
  652. sum(times)
  653. for cp_time in times:
  654. self.assertIsInstance(cp_time, float)
  655. self.assertGreaterEqual(cp_time, 0.0)
  656. total += cp_time
  657. self.assertEqual(total, sum(times))
  658. str(times)
  659. if not WINDOWS:
  660. # CPU times are always supposed to increase over time or
  661. # remain the same but never go backwards, see:
  662. # https://github.com/giampaolo/psutil/issues/392
  663. last = psutil.cpu_times()
  664. for x in range(100):
  665. new = psutil.cpu_times()
  666. for field in new._fields:
  667. new_t = getattr(new, field)
  668. last_t = getattr(last, field)
  669. self.assertGreaterEqual(new_t, last_t,
  670. msg="%s %s" % (new_t, last_t))
  671. last = new
  672. def test_sys_cpu_times2(self):
  673. t1 = sum(psutil.cpu_times())
  674. time.sleep(0.1)
  675. t2 = sum(psutil.cpu_times())
  676. difference = t2 - t1
  677. if not difference >= 0.05:
  678. self.fail("difference %s" % difference)
  679. def test_sys_per_cpu_times(self):
  680. for times in psutil.cpu_times(percpu=True):
  681. total = 0
  682. sum(times)
  683. for cp_time in times:
  684. self.assertIsInstance(cp_time, float)
  685. self.assertGreaterEqual(cp_time, 0.0)
  686. total += cp_time
  687. self.assertEqual(total, sum(times))
  688. str(times)
  689. self.assertEqual(len(psutil.cpu_times(percpu=True)[0]),
  690. len(psutil.cpu_times(percpu=False)))
  691. # Note: in theory CPU times are always supposed to increase over
  692. # time or remain the same but never go backwards. In practice
  693. # sometimes this is not the case.
  694. # This issue seemd to be afflict Windows:
  695. # https://github.com/giampaolo/psutil/issues/392
  696. # ...but it turns out also Linux (rarely) behaves the same.
  697. # last = psutil.cpu_times(percpu=True)
  698. # for x in range(100):
  699. # new = psutil.cpu_times(percpu=True)
  700. # for index in range(len(new)):
  701. # newcpu = new[index]
  702. # lastcpu = last[index]
  703. # for field in newcpu._fields:
  704. # new_t = getattr(newcpu, field)
  705. # last_t = getattr(lastcpu, field)
  706. # self.assertGreaterEqual(
  707. # new_t, last_t, msg="%s %s" % (lastcpu, newcpu))
  708. # last = new
  709. def test_sys_per_cpu_times_2(self):
  710. tot1 = psutil.cpu_times(percpu=True)
  711. stop_at = time.time() + 0.1
  712. while True:
  713. if time.time() >= stop_at:
  714. break
  715. tot2 = psutil.cpu_times(percpu=True)
  716. for t1, t2 in zip(tot1, tot2):
  717. t1, t2 = sum(t1), sum(t2)
  718. difference = t2 - t1
  719. if difference >= 0.05:
  720. return
  721. self.fail()
  722. def _test_cpu_percent(self, percent, last_ret, new_ret):
  723. try:
  724. self.assertIsInstance(percent, float)
  725. self.assertGreaterEqual(percent, 0.0)
  726. self.assertIsNot(percent, -0.0)
  727. self.assertLessEqual(percent, 100.0 * psutil.cpu_count())
  728. except AssertionError as err:
  729. raise AssertionError("\n%s\nlast=%s\nnew=%s" % (
  730. err, pprint.pformat(last_ret), pprint.pformat(new_ret)))
  731. def test_sys_cpu_percent(self):
  732. last = psutil.cpu_percent(interval=0.001)
  733. for x in range(100):
  734. new = psutil.cpu_percent(interval=None)
  735. self._test_cpu_percent(new, last, new)
  736. last = new
  737. def test_sys_per_cpu_percent(self):
  738. last = psutil.cpu_percent(interval=0.001, percpu=True)
  739. self.assertEqual(len(last), psutil.cpu_count())
  740. for x in range(100):
  741. new = psutil.cpu_percent(interval=None, percpu=True)
  742. for percent in new:
  743. self._test_cpu_percent(percent, last, new)
  744. last = new
  745. def test_sys_cpu_times_percent(self):
  746. last = psutil.cpu_times_percent(interval=0.001)
  747. for x in range(100):
  748. new = psutil.cpu_times_percent(interval=None)
  749. for percent in new:
  750. self._test_cpu_percent(percent, last, new)
  751. self._test_cpu_percent(sum(new), last, new)
  752. last = new
  753. def test_sys_per_cpu_times_percent(self):
  754. last = psutil.cpu_times_percent(interval=0.001, percpu=True)
  755. self.assertEqual(len(last), psutil.cpu_count())
  756. for x in range(100):
  757. new = psutil.cpu_times_percent(interval=None, percpu=True)
  758. for cpu in new:
  759. for percent in cpu:
  760. self._test_cpu_percent(percent, last, new)
  761. self._test_cpu_percent(sum(cpu), last, new)
  762. last = new
  763. def test_sys_per_cpu_times_percent_negative(self):
  764. # see: https://github.com/giampaolo/psutil/issues/645
  765. psutil.cpu_times_percent(percpu=True)
  766. zero_times = [x._make([0 for x in range(len(x._fields))])
  767. for x in psutil.cpu_times(percpu=True)]
  768. with mock.patch('psutil.cpu_times', return_value=zero_times):
  769. for cpu in psutil.cpu_times_percent(percpu=True):
  770. for percent in cpu:
  771. self._test_cpu_percent(percent, None, None)
  772. @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
  773. "os.statvfs() function not available on this platform")
  774. def test_disk_usage(self):
  775. usage = psutil.disk_usage(os.getcwd())
  776. assert usage.total > 0, usage
  777. assert usage.used > 0, usage
  778. assert usage.free > 0, usage
  779. assert usage.total > usage.used, usage
  780. assert usage.total > usage.free, usage
  781. assert 0 <= usage.percent <= 100, usage.percent
  782. if hasattr(shutil, 'disk_usage'):
  783. # py >= 3.3, see: http://bugs.python.org/issue12442
  784. shutil_usage = shutil.disk_usage(os.getcwd())
  785. tolerance = 5 * 1024 * 1024 # 5MB
  786. self.assertEqual(usage.total, shutil_usage.total)
  787. self.assertAlmostEqual(usage.free, shutil_usage.free,
  788. delta=tolerance)
  789. self.assertAlmostEqual(usage.used, shutil_usage.used,
  790. delta=tolerance)
  791. # if path does not exist OSError ENOENT is expected across
  792. # all platforms
  793. fname = tempfile.mktemp()
  794. try:
  795. psutil.disk_usage(fname)
  796. except OSError as err:
  797. if err.args[0] != errno.ENOENT:
  798. raise
  799. else:
  800. self.fail("OSError not raised")
  801. @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
  802. "os.statvfs() function not available on this platform")
  803. def test_disk_usage_unicode(self):
  804. # see: https://github.com/giampaolo/psutil/issues/416
  805. # XXX this test is not really reliable as it always fails on
  806. # Python 3.X (2.X is fine)
  807. try:
  808. safe_rmdir(TESTFN_UNICODE)
  809. os.mkdir(TESTFN_UNICODE)
  810. psutil.disk_usage(TESTFN_UNICODE)
  811. safe_rmdir(TESTFN_UNICODE)
  812. except UnicodeEncodeError:
  813. pass
  814. @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
  815. "os.statvfs() function not available on this platform")
  816. @unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis")
  817. def test_disk_partitions(self):
  818. # all = False
  819. ls = psutil.disk_partitions(all=False)
  820. # on travis we get:
  821. # self.assertEqual(p.cpu_affinity(), [n])
  822. # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0]
  823. self.assertTrue(ls, msg=ls)
  824. for disk in ls:
  825. if WINDOWS and 'cdrom' in disk.opts:
  826. continue
  827. if not POSIX:
  828. assert os.path.exists(disk.device), disk
  829. else:
  830. # we cannot make any assumption about this, see:
  831. # http://goo.gl/p9c43
  832. disk.device
  833. if SUNOS:
  834. # on solaris apparently mount points can also be files
  835. assert os.path.exists(disk.mountpoint), disk
  836. else:
  837. assert os.path.isdir(disk.mountpoint), disk
  838. assert disk.fstype, disk
  839. self.assertIsInstance(disk.opts, str)
  840. # all = True
  841. ls = psutil.disk_partitions(all=True)
  842. self.assertTrue(ls, msg=ls)
  843. for disk in psutil.disk_partitions(all=True):
  844. if not WINDOWS:
  845. try:
  846. os.stat(disk.mountpoint)
  847. except OSError as err:
  848. # http://mail.python.org/pipermail/python-dev/
  849. # 2012-June/120787.html
  850. if err.errno not in (errno.EPERM, errno.EACCES):
  851. raise
  852. else:
  853. if SUNOS:
  854. # on solaris apparently mount points can also be files
  855. assert os.path.exists(disk.mountpoint), disk
  856. else:
  857. assert os.path.isdir(disk.mountpoint), disk
  858. self.assertIsInstance(disk.fstype, str)
  859. self.assertIsInstance(disk.opts, str)
  860. def find_mount_point(path):
  861. path = os.path.abspath(path)
  862. while not os.path.ismount(path):
  863. path = os.path.dirname(path)
  864. return path
  865. mount = find_mount_point(__file__)
  866. mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)]
  867. self.assertIn(mount, mounts)
  868. psutil.disk_usage(mount)
  869. @skip_on_access_denied()
  870. def test_net_connections(self):
  871. def check(cons, families, types_):
  872. for conn in cons:
  873. self.assertIn(conn.family, families, msg=conn)
  874. if conn.family != getattr(socket, 'AF_UNIX', object()):
  875. self.assertIn(conn.type, types_, msg=conn)
  876. from psutil._common import conn_tmap
  877. for kind, groups in conn_tmap.items():
  878. if SUNOS and kind == 'unix':
  879. continue
  880. families, types_ = groups
  881. cons = psutil.net_connections(kind)
  882. self.assertEqual(len(cons), len(set(cons)))
  883. check(cons, families, types_)
  884. def test_net_io_counters(self):
  885. def check_ntuple(nt):
  886. self.assertEqual(nt[0], nt.bytes_sent)
  887. self.assertEqual(nt[1], nt.bytes_recv)
  888. self.assertEqual(nt[2], nt.packets_sent)
  889. self.assertEqual(nt[3], nt.packets_recv)
  890. self.assertEqual(nt[4], nt.errin)
  891. self.assertEqual(nt[5], nt.errout)
  892. self.assertEqual(nt[6], nt.dropin)
  893. self.assertEqual(nt[7], nt.dropout)
  894. assert nt.bytes_sent >= 0, nt
  895. assert nt.bytes_recv >= 0, nt
  896. assert nt.packets_sent >= 0, nt
  897. assert nt.packets_recv >= 0, nt
  898. assert nt.errin >= 0, nt
  899. assert nt.errout >= 0, nt
  900. assert nt.dropin >= 0, nt
  901. assert nt.dropout >= 0, nt
  902. ret = psutil.net_io_counters(pernic=False)
  903. check_ntuple(ret)
  904. ret = psutil.net_io_counters(pernic=True)
  905. self.assertNotEqual(ret, [])
  906. for key in ret:
  907. self.assertTrue(key)
  908. check_ntuple(ret[key])
  909. def test_net_if_addrs(self):
  910. nics = psutil.net_if_addrs()
  911. assert nics, nics
  912. # Not reliable on all platforms (net_if_addrs() reports more
  913. # interfaces).
  914. # self.assertEqual(sorted(nics.keys()),
  915. # sorted(psutil.net_io_counters(pernic=True).keys()))
  916. families = set([socket.AF_INET, AF_INET6, psutil.AF_LINK])
  917. for nic, addrs in nics.items():
  918. self.assertEqual(len(set(addrs)), len(addrs))
  919. for addr in addrs:
  920. self.assertIsInstance(addr.family, int)
  921. self.assertIsInstance(addr.address, str)
  922. self.assertIsInstance(addr.netmask, (str, type(None)))
  923. self.assertIsInstance(addr.broadcast, (str, type(None)))
  924. self.assertIn(addr.family, families)
  925. if sys.version_info >= (3, 4):
  926. self.assertIsInstance(addr.family, enum.IntEnum)
  927. if addr.family == socket.AF_INET:
  928. s = socket.socket(addr.family)
  929. with contextlib.closing(s):
  930. s.bind((addr.address, 0))
  931. elif addr.family == socket.AF_INET6:
  932. info = socket.getaddrinfo(
  933. addr.address, 0, socket.AF_INET6, socket.SOCK_STREAM,
  934. 0, socket.AI_PASSIVE)[0]
  935. af, socktype, proto, canonname, sa = info
  936. s = socket.socket(af, socktype, proto)
  937. with contextlib.closing(s):
  938. s.bind(sa)
  939. for ip in (addr.address, addr.netmask, addr.broadcast):
  940. if ip is not None:
  941. # TODO: skip AF_INET6 for now because I get:
  942. # AddressValueError: Only hex digits permitted in
  943. # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
  944. if addr.family != AF_INET6:
  945. check_ip_address(ip, addr.family)
  946. if BSD or OSX or SUNOS:
  947. if hasattr(socket, "AF_LINK"):
  948. self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
  949. elif LINUX:
  950. self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
  951. elif WINDOWS:
  952. self.assertEqual(psutil.AF_LINK, -1)
  953. @unittest.skipIf(TRAVIS, "EPERM on travis")
  954. def test_net_if_stats(self):
  955. nics = psutil.net_if_stats()
  956. assert nics, nics
  957. all_duplexes = (psutil.NIC_DUPLEX_FULL,
  958. psutil.NIC_DUPLEX_HALF,
  959. psutil.NIC_DUPLEX_UNKNOWN)
  960. for nic, stats in nics.items():
  961. isup, duplex, speed, mtu = stats
  962. self.assertIsInstance(isup, bool)
  963. self.assertIn(duplex, all_duplexes)
  964. self.assertIn(duplex, all_duplexes)
  965. self.assertGreaterEqual(speed, 0)
  966. self.assertGreaterEqual(mtu, 0)
  967. @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
  968. '/proc/diskstats not available on this linux version')
  969. @unittest.skipIf(APPVEYOR,
  970. "can't find any physical disk on Appveyor")
  971. def test_disk_io_counters(self):
  972. def check_ntuple(nt):
  973. self.assertEqual(nt[0], nt.read_count)
  974. self.assertEqual(nt[1], nt.write_count)
  975. self.assertEqual(nt[2], nt.read_bytes)
  976. self.assertEqual(nt[3], nt.write_bytes)
  977. self.assertEqual(nt[4], nt.read_time)
  978. self.assertEqual(nt[5], nt.write_time)
  979. assert nt.read_count >= 0, nt
  980. assert nt.write_count >= 0, nt
  981. assert nt.read_bytes >= 0, nt
  982. assert nt.write_bytes >= 0, nt
  983. assert nt.read_time >= 0, nt
  984. assert nt.write_time >= 0, nt
  985. ret = psutil.disk_io_counters(perdisk=False)
  986. check_ntuple(ret)
  987. ret = psutil.disk_io_counters(perdisk=True)
  988. # make sure there are no duplicates
  989. self.assertEqual(len(ret), len(set(ret)))
  990. for key in ret:
  991. assert key, key
  992. check_ntuple(ret[key])
  993. if LINUX and key[-1].isdigit():
  994. # if 'sda1' is listed 'sda' shouldn't, see:
  995. # https://github.com/giampaolo/psutil/issues/338
  996. while key[-1].isdigit():
  997. key = key[:-1]
  998. self.assertNotIn(key, ret.keys())
  999. def test_users(self):
  1000. users = psutil.users()
  1001. if not APPVEYOR:
  1002. self.assertNotEqual(users, [])
  1003. for user in users:
  1004. assert user.name, user
  1005. user.terminal
  1006. user.host
  1007. assert user.started > 0.0, user
  1008. datetime.datetime.fromtimestamp(user.started)
  1009. # ===================================================================
  1010. # --- psutil.Process class tests
  1011. # ===================================================================
  1012. class TestProcess(unittest.TestCase):
  1013. """Tests for psutil.Process class."""
  1014. def setUp(self):
  1015. safe_remove(TESTFN)
  1016. def tearDown(self):
  1017. reap_children()
  1018. def test_pid(self):
  1019. self.assertEqual(psutil.Process().pid, os.getpid())
  1020. sproc = get_test_subprocess()
  1021. self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid)
  1022. def test_kill(self):
  1023. sproc = get_test_subprocess(wait=True)
  1024. test_pid = sproc.pid
  1025. p = psutil.Process(test_pid)
  1026. p.kill()
  1027. sig = p.wait()
  1028. self.assertFalse(psutil.pid_exists(test_pid))
  1029. if POSIX:
  1030. self.assertEqual(sig, signal.SIGKILL)
  1031. def test_terminate(self):
  1032. sproc = get_test_subprocess(wait=True)
  1033. test_pid = sproc.pid
  1034. p = psutil.Process(test_pid)
  1035. p.terminate()
  1036. sig = p.wait()
  1037. self.assertFalse(psutil.pid_exists(test_pid))
  1038. if POSIX:
  1039. self.assertEqual(sig, signal.SIGTERM)
  1040. def test_send_signal(self):
  1041. sig = signal.SIGKILL if POSIX else signal.SIGTERM
  1042. sproc = get_test_subprocess()
  1043. p = psutil.Process(sproc.pid)
  1044. p.send_signal(sig)
  1045. exit_sig = p.wait()
  1046. self.assertFalse(psutil.pid_exists(p.pid))
  1047. if POSIX:
  1048. self.assertEqual(exit_sig, sig)
  1049. #
  1050. sproc = get_test_subprocess()
  1051. p = psutil.Process(sproc.pid)
  1052. p.send_signal(sig)
  1053. with mock.patch('psutil.os.kill',
  1054. side_effect=OSError(errno.ESRCH, "")) as fun:
  1055. with self.assertRaises(psutil.NoSuchProcess):
  1056. p.send_signal(sig)
  1057. assert fun.called
  1058. #
  1059. sproc = get_test_subprocess()
  1060. p = psutil.Process(sproc.pid)
  1061. p.send_signal(sig)
  1062. with mock.patch('psutil.os.kill',
  1063. side_effect=OSError(errno.EPERM, "")) as fun:
  1064. with self.assertRaises(psutil.AccessDenied):
  1065. p.send_signal(sig)
  1066. assert fun.called
  1067. def test_wait(self):
  1068. # check exit code signal
  1069. sproc = get_test_subprocess()
  1070. p = psutil.Process(sproc.pid)
  1071. p.kill()
  1072. code = p.wait()
  1073. if POSIX:
  1074. self.assertEqual(code, signal.SIGKILL)
  1075. else:
  1076. self.assertEqual(code, 0)
  1077. self.assertFalse(p.is_running())
  1078. sproc = get_test_subprocess()
  1079. p = psutil.Process(sproc.pid)
  1080. p.terminate()
  1081. code = p.wait()
  1082. if POSIX:
  1083. self.assertEqual(code, signal.SIGTERM)
  1084. else:
  1085. self.assertEqual(code, 0)
  1086. self.assertFalse(p.is_running())
  1087. # check sys.exit() code
  1088. code = "import time, sys; time.sleep(0.01); sys.exit(5);"
  1089. sproc = get_test_subprocess([PYTHON, "-c", code])
  1090. p = psutil.Process(sproc.pid)
  1091. self.assertEqual(p.wait(), 5)
  1092. self.assertFalse(p.is_running())
  1093. # Test wait() issued twice.
  1094. # It is not supposed to raise NSP when the process is gone.
  1095. # On UNIX this should return None, on Windows it should keep
  1096. # returning the exit code.
  1097. sproc = get_test_subprocess([PYTHON, "-c", code])
  1098. p = psutil.Process(sproc.pid)
  1099. self.assertEqual(p.wait(), 5)
  1100. self.assertIn(p.wait(), (5, None))
  1101. # test timeout
  1102. sproc = get_test_subprocess()
  1103. p = psutil.Process(sproc.pid)
  1104. p.name()
  1105. self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
  1106. # timeout < 0 not allowed
  1107. self.assertRaises(ValueError, p.wait, -1)
  1108. # XXX why is this skipped on Windows?
  1109. @unittest.skipUnless(POSIX, 'skipped on Windows')
  1110. def test_wait_non_children(self):
  1111. # test wait() against processes which are not our children
  1112. code = "import sys;"
  1113. code += "from subprocess import Popen, PIPE;"
  1114. code += "cmd = ['%s', '-c', 'import time; time.sleep(60)'];" % PYTHON
  1115. code += "sp = Popen(cmd, stdout=PIPE);"
  1116. code += "sys.stdout.write(str(sp.pid));"
  1117. sproc = get_test_subprocess([PYTHON, "-c", code],
  1118. stdout=subprocess.PIPE)
  1119. grandson_pid = int(sproc.stdout.read())
  1120. grandson_proc = psutil.Process(grandson_pid)
  1121. try:
  1122. self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01)
  1123. grandson_proc.kill()
  1124. ret = grandson_proc.wait()
  1125. self.assertEqual(ret, None)
  1126. finally:
  1127. if grandson_proc.is_running():
  1128. grandson_proc.kill()
  1129. grandson_proc.wait()
  1130. def test_wait_timeout_0(self):
  1131. sproc = get_test_subprocess()
  1132. p = psutil.Process(sproc.pid)
  1133. self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
  1134. p.kill()
  1135. stop_at = time.time() + 2
  1136. while True:
  1137. try:
  1138. code = p.wait(0)
  1139. except psutil.TimeoutExpired:
  1140. if time.time() >= stop_at:
  1141. raise
  1142. else:
  1143. break
  1144. if POSIX:
  1145. self.assertEqual(code, signal.SIGKILL)
  1146. else:
  1147. self.assertEqual(code, 0)
  1148. self.assertFalse(p.is_running())
  1149. def test_cpu_percent(self):
  1150. p = psutil.Process()
  1151. p.cpu_percent(interval=0.001)
  1152. p.cpu_percent(interval=0.001)
  1153. for x in range(100):
  1154. percent = p.cpu_percent(interval=None)
  1155. self.assertIsInstance(percent, float)
  1156. self.assertGreaterEqual(percent, 0.0)
  1157. if not POSIX:
  1158. self.assertLessEqual(percent, 100.0)
  1159. else:
  1160. self.assertGreaterEqual(percent, 0.0)
  1161. def test_cpu_times(self):
  1162. times = psutil.Process().cpu_times()
  1163. assert (times.user > 0.0) or (times.system > 0.0), times
  1164. # make sure returned values can be pretty printed with strftime
  1165. time.strftime("%H:%M:%S", time.localtime(times.user))
  1166. time.strftime("%H:%M:%S", time.localtime(times.system))
  1167. # Test Process.cpu_times() against os.times()
  1168. # os.times() is broken on Python 2.6
  1169. # http://bugs.python.org/issue1040026
  1170. # XXX fails on OSX: not sure if it's for os.times(). We should
  1171. # try this with Python 2.7 and re-enable the test.
  1172. @unittest.skipUnless(sys.version_info > (2, 6, 1) and not OSX,
  1173. 'os.times() is not reliable on this Python version')
  1174. def test_cpu_times2(self):
  1175. user_time, kernel_time = psutil.Process().cpu_times()
  1176. utime, ktime = os.times()[:2]
  1177. # Use os.times()[:2] as base values to compare our results
  1178. # using a tolerance of +/- 0.1 seconds.
  1179. # It will fail if the difference between the values is > 0.1s.
  1180. if (max([user_time, utime]) - min([user_time, utime])) > 0.1:
  1181. self.fail("expected: %s, found: %s" % (utime, user_time))
  1182. if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1:
  1183. self.fail("expected: %s, found: %s" % (ktime, kernel_time))
  1184. def test_create_time(self):
  1185. sproc = get_test_subprocess(wait=True)
  1186. now = time.time()
  1187. p = psutil.Process(sproc.pid)
  1188. create_time = p.create_time()
  1189. # Use time.time() as base value to compare our result using a
  1190. # tolerance of +/- 1 second.
  1191. # It will fail if the difference between the values is > 2s.
  1192. difference = abs(create_time - now)
  1193. if difference > 2:
  1194. self.fail("expected: %s, found: %s, difference: %s"
  1195. % (now, create_time, difference))
  1196. # make sure returned value can be pretty printed with strftime
  1197. time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time()))
  1198. @unittest.skipIf(WINDOWS, 'Windows only')
  1199. def test_terminal(self):
  1200. terminal = psutil.Process().terminal()
  1201. if sys.stdin.isatty():
  1202. self.assertEqual(terminal, sh('tty'))
  1203. else:
  1204. assert terminal, repr(terminal)
  1205. @unittest.skipUnless(LINUX or BSD or WINDOWS,
  1206. 'not available on this platform')
  1207. @skip_on_not_implemented(only_if=LINUX)
  1208. def test_io_counters(self):
  1209. p = psutil.Process()
  1210. # test reads
  1211. io1 = p.io_counters()
  1212. with open(PYTHON, 'rb') as f:
  1213. f.read()
  1214. io2 = p.io_counters()
  1215. if not BSD:
  1216. assert io2.read_count > io1.read_count, (io1, io2)
  1217. self.assertEqual(io2.write_count, io1.write_count)
  1218. assert io2.read_bytes >= io1.read_bytes, (io1, io2)
  1219. assert io2.write_bytes >= io1.write_bytes, (io1, io2)
  1220. # test writes
  1221. io1 = p.io_counters()
  1222. with tempfile.TemporaryFile(prefix=TESTFILE_PREFIX) as f:
  1223. if PY3:
  1224. f.write(bytes("x" * 1000000, 'ascii'))
  1225. else:
  1226. f.write("x" * 1000000)
  1227. io2 = p.io_counters()
  1228. assert io2.write_count >= io1.write_count, (io1, io2)
  1229. assert io2.write_bytes >= io1.write_bytes, (io1, io2)
  1230. assert io2.read_count >= io1.read_count, (io1, io2)
  1231. assert io2.read_bytes >= io1.read_bytes, (io1, io2)
  1232. @unittest.skipUnless(LINUX or (WINDOWS and get_winver() >= WIN_VISTA),
  1233. 'Linux and Windows Vista only')
  1234. @unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis")
  1235. def test_ionice(self):
  1236. if LINUX:
  1237. from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT,
  1238. IOPRIO_CLASS_BE, IOPRIO_CLASS_IDLE)
  1239. self.assertEqual(IOPRIO_CLASS_NONE, 0)
  1240. self.assertEqual(IOPRIO_CLASS_RT, 1)
  1241. self.assertEqual(IOPRIO_CLASS_BE, 2)
  1242. self.assertEqual(IOPRIO_CLASS_IDLE, 3)
  1243. p = psutil.Process()
  1244. try:
  1245. p.ionice(2)
  1246. ioclass, value = p.ionice()
  1247. if enum is not None:
  1248. self.assertIsInstance(ioclass, enum.IntEnum)
  1249. self.assertEqual(ioclass, 2)
  1250. self.assertEqual(value, 4)
  1251. #
  1252. p.ionice(3)
  1253. ioclass, value = p.ionice()
  1254. self.assertEqual(ioclass, 3)
  1255. self.assertEqual(value, 0)
  1256. #
  1257. p.ionice(2, 0)
  1258. ioclass, value = p.ionice()
  1259. self.assertEqual(ioclass, 2)
  1260. self.assertEqual(value, 0)
  1261. p.ionice(2, 7)
  1262. ioclass, value = p.ionice()
  1263. self.assertEqual(ioclass, 2)
  1264. self.assertEqual(value, 7)
  1265. #
  1266. self.assertRaises(ValueError, p.ionice, 2, 10)
  1267. self.assertRaises(ValueError, p.ionice, 2, -1)
  1268. self.assertRaises(ValueError, p.ionice, 4)
  1269. self.assertRaises(TypeError, p.ionice, 2, "foo")
  1270. self.assertRaisesRegexp(
  1271. ValueError, "can't specify value with IOPRIO_CLASS_NONE",
  1272. p.ionice, psutil.IOPRIO_CLASS_NONE, 1)
  1273. self.assertRaisesRegexp(
  1274. ValueError, "can't specify value with IOPRIO_CLASS_IDLE",
  1275. p.ionice, psutil.IOPRIO_CLASS_IDLE, 1)
  1276. self.assertRaisesRegexp(
  1277. ValueError, "'ioclass' argument must be specified",
  1278. p.ionice, value=1)
  1279. finally:
  1280. p.ionice(IOPRIO_CLASS_NONE)
  1281. else:
  1282. p = psutil.Process()
  1283. original = p.ionice()
  1284. self.assertIsInstance(original, int)
  1285. try:
  1286. value = 0 # very low
  1287. if original == value:
  1288. value = 1 # low
  1289. p.ionice(value)
  1290. self.assertEqual(p.ionice(), value)
  1291. finally:
  1292. p.ionice(original)
  1293. #
  1294. self.assertRaises(ValueError, p.ionice, 3)
  1295. self.assertRaises(TypeError, p.ionice, 2, 1)
  1296. @unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
  1297. "only available on Linux >= 2.6.36")
  1298. def test_rlimit_get(self):
  1299. import resource
  1300. p = psutil.Process(os.getpid())
  1301. names = [x for x in dir(psutil) if x.startswith('RLIMIT')]
  1302. assert names, names
  1303. for name in names:
  1304. value = getattr(psutil, name)
  1305. self.assertGreaterEqual(value, 0)
  1306. if name in dir(resource):
  1307. self.assertEqual(value, getattr(resource, name))
  1308. self.assertEqual(p.rlimit(value), resource.getrlimit(value))
  1309. else:
  1310. ret = p.rlimit(value)
  1311. self.assertEqual(len(ret), 2)
  1312. self.assertGreaterEqual(ret[0], -1)
  1313. self.assertGreaterEqual(ret[1], -1)
  1314. @unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
  1315. "only available on Linux >= 2.6.36")
  1316. def test_rlimit_set(self):
  1317. sproc = get_test_subprocess()
  1318. p = psutil.Process(sproc.pid)
  1319. p.rlimit(psutil.RLIMIT_NOFILE, (5, 5))
  1320. self.assertEqual(p.rlimit(psutil.RLIMIT_NOFILE), (5, 5))
  1321. # If pid is 0 prlimit() applies to the calling process and
  1322. # we don't want that.
  1323. with self.assertRaises(ValueError):
  1324. psutil._psplatform.Process(0).rlimit(0)
  1325. with self.assertRaises(ValueError):
  1326. p.rlimit(psutil.RLIMIT_NOFILE, (5, 5, 5))
  1327. def test_num_threads(self):
  1328. # on certain platforms such as Linux we might test for exact
  1329. # thread number, since we always have with 1 thread per process,
  1330. # but this does not apply across all platforms (OSX, Windows)
  1331. p = psutil.Process()
  1332. step1 = p.num_threads()
  1333. thread = ThreadTask()
  1334. thread.start()
  1335. try:
  1336. step2 = p.num_threads()
  1337. self.assertEqual(step2, step1 + 1)
  1338. thread.stop()
  1339. finally:
  1340. if thread._running:
  1341. thread.stop()
  1342. @unittest.skipUnless(WINDOWS, 'Windows only')
  1343. def test_num_handles(self):
  1344. # a better test is done later into test/_windows.py
  1345. p = psutil.Process()
  1346. self.assertGreater(p.num_handles(), 0)
  1347. def test_threads(self):
  1348. p = psutil.Process()
  1349. step1 = p.threads()
  1350. thread = ThreadTask()
  1351. thread.start()
  1352. try:
  1353. step2 = p.threads()
  1354. self.assertEqual(len(step2), len(step1) + 1)
  1355. # on Linux, first thread id is supposed to be this process
  1356. if LINUX:
  1357. self.assertEqual(step2[0].id, os.getpid())
  1358. athread = step2[0]
  1359. # test named tuple
  1360. self.assertEqual(athread.id, athread[0])
  1361. self.assertEqual(athread.user_time, athread[1])
  1362. self.assertEqual(athread.system_time, athread[2])
  1363. # test num threads
  1364. thread.stop()
  1365. finally:
  1366. if thread._running:
  1367. thread.stop()
  1368. def test_memory_info(self):
  1369. p = psutil.Process()
  1370. # step 1 - get a base value to compare our results
  1371. rss1, vms1 = p.memory_info()
  1372. percent1 = p.memory_percent()
  1373. self.assertGreater(rss1, 0)
  1374. self.assertGreater(vms1, 0)
  1375. # step 2 - allocate some memory
  1376. memarr = [None] * 1500000
  1377. rss2, vms2 = p.memory_info()
  1378. percent2 = p.memory_percent()
  1379. # make sure that the memory usage bumped up
  1380. self.assertGreater(rss2, rss1)
  1381. self.assertGreaterEqual(vms2, vms1) # vms might be equal
  1382. self.assertGreater(percent2, percent1)
  1383. del memarr
  1384. # def test_memory_info_ex(self):
  1385. # # tested later in fetch all test suite
  1386. def test_memory_maps(self):
  1387. p = psutil.Process()
  1388. maps = p.memory_maps()
  1389. paths = [x for x in maps]
  1390. self.assertEqual(len(paths), len(set(paths)))
  1391. ext_maps = p.memory_maps(grouped=False)
  1392. for nt in maps:
  1393. if not nt.path.startswith('['):
  1394. assert os.path.isabs(nt.path), nt.path
  1395. if POSIX:
  1396. assert os.path.exists(nt.path), nt.path
  1397. else:
  1398. # XXX - On Windows we have this strange behavior with
  1399. # 64 bit dlls: they are visible via explorer but cannot
  1400. # be accessed via os.stat() (wtf?).
  1401. if '64' not in os.path.basename(nt.path):
  1402. assert os.path.exists(nt.path), nt.path
  1403. for nt in ext_maps:
  1404. for fname in nt._fields:
  1405. value = getattr(nt, fname)
  1406. if fname == 'path':
  1407. continue
  1408. elif fname in ('addr', 'perms'):
  1409. assert value, value
  1410. else:
  1411. self.assertIsInstance(value, (int, long))
  1412. assert value >= 0, value
  1413. def test_memory_percent(self):
  1414. p = psutil.Process()
  1415. self.assertGreater(p.memory_percent(), 0.0)
  1416. def test_is_running(self):
  1417. sproc = get_test_subprocess(wait=True)
  1418. p = psutil.Process(sproc.pid)
  1419. assert p.is_running()
  1420. assert p.is_running()
  1421. p.kill()
  1422. p.wait()
  1423. assert not p.is_running()
  1424. assert not p.is_running()
  1425. def test_exe(self):
  1426. sproc = get_test_subprocess(wait=True)
  1427. exe = psutil.Process(sproc.pid).exe()
  1428. try:
  1429. self.assertEqual(exe, PYTHON)
  1430. except AssertionError:
  1431. if WINDOWS and len(exe) == len(PYTHON):
  1432. # on Windows we don't care about case sensitivity
  1433. self.assertEqual(exe.lower(), PYTHON.lower())
  1434. else:
  1435. # certain platforms such as BSD are more accurate returning:
  1436. # "/usr/local/bin/python2.7"
  1437. # ...instead of:
  1438. # "/usr/local/bin/python"
  1439. # We do not want to consider this difference in accuracy
  1440. # an error.
  1441. ver = "%s.%s" % (sys.version_info[0], sys.version_info[1])
  1442. self.assertEqual(exe.replace(ver, ''), PYTHON.replace(ver, ''))
  1443. def test_cmdline(self):
  1444. cmdline = [PYTHON, "-c", "import time; time.sleep(60)"]
  1445. sproc = get_test_subprocess(cmdline, wait=True)
  1446. self.assertEqual(' '.join(psutil.Process(sproc.pid).cmdline()),
  1447. ' '.join(cmdline))
  1448. def test_name(self):
  1449. sproc = get_test_subprocess(PYTHON, wait=True)
  1450. name = psutil.Process(sproc.pid).name().lower()
  1451. pyexe = os.path.basename(os.path.realpath(sys.executable)).lower()
  1452. assert pyexe.startswith(name), (pyexe, name)
  1453. @unittest.skipUnless(POSIX, "posix only")
  1454. # TODO: add support for other compilers
  1455. @unittest.skipUnless(which("gcc"), "gcc not available")
  1456. def test_prog_w_funky_name(self):
  1457. # Test that name(), exe() and cmdline() correctly handle programs
  1458. # with funky chars such as spaces and ")", see:
  1459. # https://github.com/giampaolo/psutil/issues/628
  1460. funky_name = "/tmp/foo bar )"
  1461. _, c_file = tempfile.mkstemp(prefix='psutil-', suffix='.c', dir="/tmp")
  1462. self.addCleanup(lambda: safe_remove(c_file))
  1463. self.addCleanup(lambda: safe_remove(funky_name))
  1464. with open(c_file, "w") as f:
  1465. f.write("void main() { pause(); }")
  1466. subprocess.check_call(["gcc", c_file, "-o", funky_name])
  1467. sproc = get_test_subprocess(
  1468. [funky_name, "arg1", "arg2", "", "arg3", ""])
  1469. p = psutil.Process(sproc.pid)
  1470. # ...in order to try to prevent occasional failures on travis
  1471. wait_for_pid(p.pid)
  1472. self.assertEqual(p.name(), "foo bar )")
  1473. self.assertEqual(p.exe(), "/tmp/foo bar )")
  1474. self.assertEqual(
  1475. p.cmdline(), ["/tmp/foo bar )", "arg1", "arg2", "", "arg3", ""])
  1476. @unittest.skipUnless(POSIX, 'posix only')
  1477. def test_uids(self):
  1478. p = psutil.Process()
  1479. real, effective, saved = p.uids()
  1480. # os.getuid() refers to "real" uid
  1481. self.assertEqual(real, os.getuid())
  1482. # os.geteuid() refers to "effective" uid
  1483. self.assertEqual(effective, os.geteuid())
  1484. # no such thing as os.getsuid() ("saved" uid), but starting
  1485. # from python 2.7 we have os.getresuid()[2]
  1486. if hasattr(os, "getresuid"):
  1487. self.assertEqual(saved, os.getresuid()[2])
  1488. @unittest.skipUnless(POSIX, 'posix only')
  1489. def test_gids(self):
  1490. p = psutil.Process()
  1491. real, effective, saved = p.gids()
  1492. # os.getuid() refers to "real" uid
  1493. self.assertEqual(real, os.getgid())
  1494. # os.geteuid() refers to "effective" uid
  1495. self.assertEqual(effective, os.getegid())
  1496. # no such thing as os.getsuid() ("saved" uid), but starting
  1497. # from python 2.7 we have os.getresgid()[2]
  1498. if hasattr(os, "getresuid"):
  1499. self.assertEqual(saved, os.getresgid()[2])
  1500. def test_nice(self):
  1501. p = psutil.Process()
  1502. self.assertRaises(TypeError, p.nice, "str")
  1503. if WINDOWS:
  1504. try:
  1505. init = p.nice()
  1506. if sys.version_info > (3, 4):
  1507. self.assertIsInstance(init, enum.IntEnum)
  1508. else:
  1509. self.assertIsInstance(init, int)
  1510. self.assertEqual(init, psutil.NORMAL_PRIORITY_CLASS)
  1511. p.nice(psutil.HIGH_PRIORITY_CLASS)
  1512. self.assertEqual(p.nice(), psutil.HIGH_PRIORITY_CLASS)
  1513. p.nice(psutil.NORMAL_PRIORITY_CLASS)
  1514. self.assertEqual(p.nice(), psutil.NORMAL_PRIORITY_CLASS)
  1515. finally:
  1516. p.nice(psutil.NORMAL_PRIORITY_CLASS)
  1517. else:
  1518. try:
  1519. first_nice = p.nice()
  1520. p.nice(1)
  1521. self.assertEqual(p.nice(), 1)
  1522. # going back to previous nice value raises
  1523. # AccessDenied on OSX
  1524. if not OSX:
  1525. p.nice(0)
  1526. self.assertEqual(p.nice(), 0)
  1527. except psutil.AccessDenied:
  1528. pass
  1529. finally:
  1530. try:
  1531. p.nice(first_nice)
  1532. except psutil.AccessDenied:
  1533. pass
  1534. def test_status(self):
  1535. p = psutil.Process()
  1536. self.assertEqual(p.status(), psutil.STATUS_RUNNING)
  1537. def test_username(self):
  1538. sproc = get_test_subprocess()
  1539. p = psutil.Process(sproc.pid)
  1540. if POSIX:
  1541. import pwd
  1542. self.assertEqual(p.username(), pwd.getpwuid(os.getuid()).pw_name)
  1543. with mock.patch("psutil.pwd.getpwuid",
  1544. side_effect=KeyError) as fun:
  1545. p.username() == str(p.uids().real)
  1546. assert fun.called
  1547. elif WINDOWS and 'USERNAME' in os.environ:
  1548. expected_username = os.environ['USERNAME']
  1549. expected_domain = os.environ['USERDOMAIN']
  1550. domain, username = p.username().split('\\')
  1551. self.assertEqual(domain, expected_domain)
  1552. self.assertEqual(username, expected_username)
  1553. else:
  1554. p.username()
  1555. def test_cwd(self):
  1556. sproc = get_test_subprocess(wait=True)
  1557. p = psutil.Process(sproc.pid)
  1558. self.assertEqual(p.cwd(), os.getcwd())
  1559. def test_cwd_2(self):
  1560. cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(60)"]
  1561. sproc = get_test_subprocess(cmd, wait=True)
  1562. p = psutil.Process(sproc.pid)
  1563. call_until(p.cwd, "ret == os.path.dirname(os.getcwd())")
  1564. @unittest.skipUnless(WINDOWS or LINUX or BSD,
  1565. 'not available on this platform')
  1566. @unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis")
  1567. def test_cpu_affinity(self):
  1568. p = psutil.Process()
  1569. initial = p.cpu_affinity()
  1570. if hasattr(os, "sched_getaffinity"):
  1571. self.assertEqual(initial, list(os.sched_getaffinity(p.pid)))
  1572. self.assertEqual(len(initial), len(set(initial)))
  1573. all_cpus = list(range(len(psutil.cpu_percent(percpu=True))))
  1574. # setting on travis doesn't seem to work (always return all
  1575. # CPUs on get):
  1576. # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, ... != [0]
  1577. for n in all_cpus:
  1578. p.cpu_affinity([n])
  1579. self.assertEqual(p.cpu_affinity(), [n])
  1580. if hasattr(os, "sched_getaffinity"):
  1581. self.assertEqual(p.cpu_affinity(),
  1582. list(os.sched_getaffinity(p.pid)))
  1583. #
  1584. p.cpu_affinity(all_cpus)
  1585. self.assertEqual(p.cpu_affinity(), all_cpus)
  1586. if hasattr(os, "sched_getaffinity"):
  1587. self.assertEqual(p.cpu_affinity(),
  1588. list(os.sched_getaffinity(p.pid)))
  1589. #
  1590. self.assertRaises(TypeError, p.cpu_affinity, 1)
  1591. p.cpu_affinity(initial)
  1592. # it should work with all iterables, not only lists
  1593. p.cpu_affinity(set(all_cpus))
  1594. p.cpu_affinity(tuple(all_cpus))
  1595. invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10]
  1596. self.assertRaises(ValueError, p.cpu_affinity, invalid_cpu)
  1597. self.assertRaises(ValueError, p.cpu_affinity, range(10000, 11000))
  1598. self.assertRaises(TypeError, p.cpu_affinity, [0, "1"])
  1599. # TODO
  1600. @unittest.skipIf(BSD, "broken on BSD, see #595")
  1601. @unittest.skipIf(APPVEYOR,
  1602. "can't find any process file on Appveyor")
  1603. def test_open_files(self):
  1604. # current process
  1605. p = psutil.Process()
  1606. files = p.open_files()
  1607. self.assertFalse(TESTFN in files)
  1608. with open(TESTFN, 'w'):
  1609. # give the kernel some time to see the new file
  1610. call_until(p.open_files, "len(ret) != %i" % len(files))
  1611. filenames = [x.path for x in p.open_files()]
  1612. self.assertIn(TESTFN, filenames)
  1613. for file in filenames:
  1614. assert os.path.isfile(file), file
  1615. # another process
  1616. cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % TESTFN
  1617. sproc = get_test_subprocess([PYTHON, "-c", cmdline], wait=True)
  1618. p = psutil.Process(sproc.pid)
  1619. for x in range(100):
  1620. filenames = [x.path for x in p.open_files()]
  1621. if TESTFN in filenames:
  1622. break
  1623. time.sleep(.01)
  1624. else:
  1625. self.assertIn(TESTFN, filenames)
  1626. for file in filenames:
  1627. assert os.path.isfile(file), file
  1628. # TODO
  1629. @unittest.skipIf(BSD, "broken on BSD, see #595")
  1630. @unittest.skipIf(APPVEYOR,
  1631. "can't find any process file on Appveyor")
  1632. def test_open_files2(self):
  1633. # test fd and path fields
  1634. with open(TESTFN, 'w') as fileobj:
  1635. p = psutil.Process()
  1636. for path, fd in p.open_files():
  1637. if path == fileobj.name or fd == fileobj.fileno():
  1638. break
  1639. else:
  1640. self.fail("no file found; files=%s" % repr(p.open_files()))
  1641. self.assertEqual(path, fileobj.name)
  1642. if WINDOWS:
  1643. self.assertEqual(fd, -1)
  1644. else:
  1645. self.assertEqual(fd, fileobj.fileno())
  1646. # test positions
  1647. ntuple = p.open_files()[0]
  1648. self.assertEqual(ntuple[0], ntuple.path)
  1649. self.assertEqual(ntuple[1], ntuple.fd)
  1650. # test file is gone
  1651. self.assertTrue(fileobj.name not in p.open_files())
  1652. def compare_proc_sys_cons(self, pid, proc_cons):
  1653. from psutil._common import pconn
  1654. sys_cons = []
  1655. for c in psutil.net_connections(kind='all'):
  1656. if c.pid == pid:
  1657. sys_cons.append(pconn(*c[:-1]))
  1658. if BSD:
  1659. # on BSD all fds are set to -1
  1660. proc_cons = [pconn(*[-1] + list(x[1:])) for x in proc_cons]
  1661. self.assertEqual(sorted(proc_cons), sorted(sys_cons))
  1662. @skip_on_access_denied(only_if=OSX)
  1663. def test_connections(self):
  1664. def check_conn(proc, conn, family, type, laddr, raddr, status, kinds):
  1665. all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4",
  1666. "tcp6", "udp", "udp4", "udp6")
  1667. check_connection_ntuple(conn)
  1668. self.assertEqual(conn.family, family)
  1669. self.assertEqual(conn.type, type)
  1670. self.assertEqual(conn.laddr, laddr)
  1671. self.assertEqual(conn.raddr, raddr)
  1672. self.assertEqual(conn.status, status)
  1673. for kind in all_kinds:
  1674. cons = proc.connections(kind=kind)
  1675. if kind in kinds:
  1676. self.assertNotEqual(cons, [])
  1677. else:
  1678. self.assertEqual(cons, [])
  1679. # compare against system-wide connections
  1680. # XXX Solaris can't retrieve system-wide UNIX
  1681. # sockets.
  1682. if not SUNOS:
  1683. self.compare_proc_sys_cons(proc.pid, [conn])
  1684. tcp_template = textwrap.dedent("""
  1685. import socket, time
  1686. s = socket.socket($family, socket.SOCK_STREAM)
  1687. s.bind(('$addr', 0))
  1688. s.listen(1)
  1689. with open('$testfn', 'w') as f:
  1690. f.write(str(s.getsockname()[:2]))
  1691. time.sleep(60)
  1692. """)
  1693. udp_template = textwrap.dedent("""
  1694. import socket, time
  1695. s = socket.socket($family, socket.SOCK_DGRAM)
  1696. s.bind(('$addr', 0))
  1697. with open('$testfn', 'w') as f:
  1698. f.write(str(s.getsockname()[:2]))
  1699. time.sleep(60)
  1700. """)
  1701. from string import Template
  1702. testfile = os.path.basename(TESTFN)
  1703. tcp4_template = Template(tcp_template).substitute(
  1704. family=int(AF_INET), addr="127.0.0.1", testfn=testfile)
  1705. udp4_template = Template(udp_template).substitute(
  1706. family=int(AF_INET), addr="127.0.0.1", testfn=testfile)
  1707. tcp6_template = Template(tcp_template).substitute(
  1708. family=int(AF_INET6), addr="::1", testfn=testfile)
  1709. udp6_template = Template(udp_template).substitute(
  1710. family=int(AF_INET6), addr="::1", testfn=testfile)
  1711. # launch various subprocess instantiating a socket of various
  1712. # families and types to enrich psutil results
  1713. tcp4_proc = pyrun(tcp4_template)
  1714. tcp4_addr = eval(wait_for_file(testfile))
  1715. udp4_proc = pyrun(udp4_template)
  1716. udp4_addr = eval(wait_for_file(testfile))
  1717. if supports_ipv6():
  1718. tcp6_proc = pyrun(tcp6_template)
  1719. tcp6_addr = eval(wait_for_file(testfile))
  1720. udp6_proc = pyrun(udp6_template)
  1721. udp6_addr = eval(wait_for_file(testfile))
  1722. else:
  1723. tcp6_proc = None
  1724. udp6_proc = None
  1725. tcp6_addr = None
  1726. udp6_addr = None
  1727. for p in psutil.Process().children():
  1728. cons = p.connections()
  1729. self.assertEqual(len(cons), 1)
  1730. for conn in cons:
  1731. # TCP v4
  1732. if p.pid == tcp4_proc.pid:
  1733. check_conn(p, conn, AF_INET, SOCK_STREAM, tcp4_addr, (),
  1734. psutil.CONN_LISTEN,
  1735. ("all", "inet", "inet4", "tcp", "tcp4"))
  1736. # UDP v4
  1737. elif p.pid == udp4_proc.pid:
  1738. check_conn(p, conn, AF_INET, SOCK_DGRAM, udp4_addr, (),
  1739. psutil.CONN_NONE,
  1740. ("all", "inet", "inet4", "udp", "udp4"))
  1741. # TCP v6
  1742. elif p.pid == getattr(tcp6_proc, "pid", None):
  1743. check_conn(p, conn, AF_INET6, SOCK_STREAM, tcp6_addr, (),
  1744. psutil.CONN_LISTEN,
  1745. ("all", "inet", "inet6", "tcp", "tcp6"))
  1746. # UDP v6
  1747. elif p.pid == getattr(udp6_proc, "pid", None):
  1748. check_conn(p, conn, AF_INET6, SOCK_DGRAM, udp6_addr, (),
  1749. psutil.CONN_NONE,
  1750. ("all", "inet", "inet6", "udp", "udp6"))
  1751. @unittest.skipUnless(hasattr(socket, 'AF_UNIX'),
  1752. 'AF_UNIX is not supported')
  1753. @skip_on_access_denied(only_if=OSX)
  1754. def test_connections_unix(self):
  1755. def check(type):
  1756. safe_remove(TESTFN)
  1757. sock = socket.socket(AF_UNIX, type)
  1758. with contextlib.closing(sock):
  1759. sock.bind(TESTFN)
  1760. cons = psutil.Process().connections(kind='unix')
  1761. conn = cons[0]
  1762. check_connection_ntuple(conn)
  1763. if conn.fd != -1: # != sunos and windows
  1764. self.assertEqual(conn.fd, sock.fileno())
  1765. self.assertEqual(conn.family, AF_UNIX)
  1766. self.assertEqual(conn.type, type)
  1767. self.assertEqual(conn.laddr, TESTFN)
  1768. if not SUNOS:
  1769. # XXX Solaris can't retrieve system-wide UNIX
  1770. # sockets.
  1771. self.compare_proc_sys_cons(os.getpid(), cons)
  1772. check(SOCK_STREAM)
  1773. check(SOCK_DGRAM)
  1774. @unittest.skipUnless(hasattr(socket, "fromfd"),
  1775. 'socket.fromfd() is not availble')
  1776. @unittest.skipIf(WINDOWS or SUNOS,
  1777. 'connection fd not available on this platform')
  1778. def test_connection_fromfd(self):
  1779. with contextlib.closing(socket.socket()) as sock:
  1780. sock.bind(('localhost', 0))
  1781. sock.listen(1)
  1782. p = psutil.Process()
  1783. for conn in p.connections():
  1784. if conn.fd == sock.fileno():
  1785. break
  1786. else:
  1787. self.fail("couldn't find socket fd")
  1788. dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
  1789. with contextlib.closing(dupsock):
  1790. self.assertEqual(dupsock.getsockname(), conn.laddr)
  1791. self.assertNotEqual(sock.fileno(), dupsock.fileno())
  1792. def test_connection_constants(self):
  1793. ints = []
  1794. strs = []
  1795. for name in dir(psutil):
  1796. if name.startswith('CONN_'):
  1797. num = getattr(psutil, name)
  1798. str_ = str(num)
  1799. assert str_.isupper(), str_
  1800. assert str_ not in strs, str_
  1801. assert num not in ints, num
  1802. ints.append(num)
  1803. strs.append(str_)
  1804. if SUNOS:
  1805. psutil.CONN_IDLE
  1806. psutil.CONN_BOUND
  1807. if WINDOWS:
  1808. psutil.CONN_DELETE_TCB
  1809. @unittest.skipUnless(POSIX, 'posix only')
  1810. def test_num_fds(self):
  1811. p = psutil.Process()
  1812. start = p.num_fds()
  1813. file = open(TESTFN, 'w')
  1814. self.addCleanup(file.close)
  1815. self.assertEqual(p.num_fds(), start + 1)
  1816. sock = socket.socket()
  1817. self.addCleanup(sock.close)
  1818. self.assertEqual(p.num_fds(), start + 2)
  1819. file.close()
  1820. sock.close()
  1821. self.assertEqual(p.num_fds(), start)
  1822. @skip_on_not_implemented(only_if=LINUX)
  1823. def test_num_ctx_switches(self):
  1824. p = psutil.Process()
  1825. before = sum(p.num_ctx_switches())
  1826. for x in range(500000):
  1827. after = sum(p.num_ctx_switches())
  1828. if after > before:
  1829. return
  1830. self.fail("num ctx switches still the same after 50.000 iterations")
  1831. def test_parent_ppid(self):
  1832. this_parent = os.getpid()
  1833. sproc = get_test_subprocess()
  1834. p = psutil.Process(sproc.pid)
  1835. self.assertEqual(p.ppid(), this_parent)
  1836. self.assertEqual(p.parent().pid, this_parent)
  1837. # no other process is supposed to have us as parent
  1838. for p in psutil.process_iter():
  1839. if p.pid == sproc.pid:
  1840. continue
  1841. self.assertTrue(p.ppid() != this_parent)
  1842. def test_children(self):
  1843. p = psutil.Process()
  1844. self.assertEqual(p.children(), [])
  1845. self.assertEqual(p.children(recursive=True), [])
  1846. sproc = get_test_subprocess()
  1847. children1 = p.children()
  1848. children2 = p.children(recursive=True)
  1849. for children in (children1, children2):
  1850. self.assertEqual(len(children), 1)
  1851. self.assertEqual(children[0].pid, sproc.pid)
  1852. self.assertEqual(children[0].ppid(), os.getpid())
  1853. def test_children_recursive(self):
  1854. # here we create a subprocess which creates another one as in:
  1855. # A (parent) -> B (child) -> C (grandchild)
  1856. s = "import subprocess, os, sys, time;"
  1857. s += "PYTHON = os.path.realpath(sys.executable);"
  1858. s += "cmd = [PYTHON, '-c', 'import time; time.sleep(60);'];"
  1859. s += "subprocess.Popen(cmd);"
  1860. s += "time.sleep(60);"
  1861. get_test_subprocess(cmd=[PYTHON, "-c", s])
  1862. p = psutil.Process()
  1863. self.assertEqual(len(p.children(recursive=False)), 1)
  1864. # give the grandchild some time to start
  1865. stop_at = time.time() + GLOBAL_TIMEOUT
  1866. while time.time() < stop_at:
  1867. children = p.children(recursive=True)
  1868. if len(children) > 1:
  1869. break
  1870. self.assertEqual(len(children), 2)
  1871. self.assertEqual(children[0].ppid(), os.getpid())
  1872. self.assertEqual(children[1].ppid(), children[0].pid)
  1873. def test_children_duplicates(self):
  1874. # find the process which has the highest number of children
  1875. table = collections.defaultdict(int)
  1876. for p in psutil.process_iter():
  1877. try:
  1878. table[p.ppid()] += 1
  1879. except psutil.Error:
  1880. pass
  1881. # this is the one, now let's make sure there are no duplicates
  1882. pid = sorted(table.items(), key=lambda x: x[1])[-1][0]
  1883. p = psutil.Process(pid)
  1884. try:
  1885. c = p.children(recursive=True)
  1886. except psutil.AccessDenied: # windows
  1887. pass
  1888. else:
  1889. self.assertEqual(len(c), len(set(c)))
  1890. def test_suspend_resume(self):
  1891. sproc = get_test_subprocess(wait=True)
  1892. p = psutil.Process(sproc.pid)
  1893. p.suspend()
  1894. for x in range(100):
  1895. if p.status() == psutil.STATUS_STOPPED:
  1896. break
  1897. time.sleep(0.01)
  1898. p.resume()
  1899. self.assertNotEqual(p.status(), psutil.STATUS_STOPPED)
  1900. def test_invalid_pid(self):
  1901. self.assertRaises(TypeError, psutil.Process, "1")
  1902. self.assertRaises(ValueError, psutil.Process, -1)
  1903. def test_as_dict(self):
  1904. p = psutil.Process()
  1905. d = p.as_dict(attrs=['exe', 'name'])
  1906. self.assertEqual(sorted(d.keys()), ['exe', 'name'])
  1907. p = psutil.Process(min(psutil.pids()))
  1908. d = p.as_dict(attrs=['connections'], ad_value='foo')
  1909. if not isinstance(d['connections'], list):
  1910. self.assertEqual(d['connections'], 'foo')
  1911. def test_halfway_terminated_process(self):
  1912. # Test that NoSuchProcess exception gets raised in case the
  1913. # process dies after we create the Process object.
  1914. # Example:
  1915. # >>> proc = Process(1234)
  1916. # >>> time.sleep(2) # time-consuming task, process dies in meantime
  1917. # >>> proc.name()
  1918. # Refers to Issue #15
  1919. sproc = get_test_subprocess()
  1920. p = psutil.Process(sproc.pid)
  1921. p.terminate()
  1922. p.wait()
  1923. if WINDOWS:
  1924. wait_for_pid(p.pid)
  1925. self.assertFalse(p.is_running())
  1926. self.assertFalse(p.pid in psutil.pids())
  1927. excluded_names = ['pid', 'is_running', 'wait', 'create_time']
  1928. if LINUX and not RLIMIT_SUPPORT:
  1929. excluded_names.append('rlimit')
  1930. for name in dir(p):
  1931. if (name.startswith('_') or
  1932. name in excluded_names):
  1933. continue
  1934. try:
  1935. meth = getattr(p, name)
  1936. # get/set methods
  1937. if name == 'nice':
  1938. if POSIX:
  1939. ret = meth(1)
  1940. else:
  1941. ret = meth(psutil.NORMAL_PRIORITY_CLASS)
  1942. elif name == 'ionice':
  1943. ret = meth()
  1944. ret = meth(2)
  1945. elif name == 'rlimit':
  1946. ret = meth(psutil.RLIMIT_NOFILE)
  1947. ret = meth(psutil.RLIMIT_NOFILE, (5, 5))
  1948. elif name == 'cpu_affinity':
  1949. ret = meth()
  1950. ret = meth([0])
  1951. elif name == 'send_signal':
  1952. ret = meth(signal.SIGTERM)
  1953. else:
  1954. ret = meth()
  1955. except psutil.ZombieProcess:
  1956. self.fail("ZombieProcess for %r was not supposed to happen" %
  1957. name)
  1958. except psutil.NoSuchProcess:
  1959. pass
  1960. except NotImplementedError:
  1961. pass
  1962. else:
  1963. self.fail(
  1964. "NoSuchProcess exception not raised for %r, retval=%s" % (
  1965. name, ret))
  1966. @unittest.skipUnless(POSIX, 'posix only')
  1967. def test_zombie_process(self):
  1968. def succeed_or_zombie_p_exc(fun, *args, **kwargs):
  1969. try:
  1970. fun(*args, **kwargs)
  1971. except (psutil.ZombieProcess, psutil.AccessDenied):
  1972. pass
  1973. # Note: in this test we'll be creating two sub processes.
  1974. # Both of them are supposed to be freed / killed by
  1975. # reap_children() as they are attributable to 'us'
  1976. # (os.getpid()) via children(recursive=True).
  1977. src = textwrap.dedent("""\
  1978. import os, sys, time, socket, contextlib
  1979. child_pid = os.fork()
  1980. if child_pid > 0:
  1981. time.sleep(3000)
  1982. else:
  1983. # this is the zombie process
  1984. s = socket.socket(socket.AF_UNIX)
  1985. with contextlib.closing(s):
  1986. s.connect('%s')
  1987. if sys.version_info < (3, ):
  1988. pid = str(os.getpid())
  1989. else:
  1990. pid = bytes(str(os.getpid()), 'ascii')
  1991. s.sendall(pid)
  1992. """ % TESTFN)
  1993. with contextlib.closing(socket.socket(socket.AF_UNIX)) as sock:
  1994. try:
  1995. sock.settimeout(GLOBAL_TIMEOUT)
  1996. sock.bind(TESTFN)
  1997. sock.listen(1)
  1998. pyrun(src)
  1999. conn, _ = sock.accept()
  2000. select.select([conn.fileno()], [], [], GLOBAL_TIMEOUT)
  2001. zpid = int(conn.recv(1024))
  2002. zproc = psutil.Process(zpid)
  2003. call_until(lambda: zproc.status(),
  2004. "ret == psutil.STATUS_ZOMBIE")
  2005. # A zombie process should always be instantiable
  2006. zproc = psutil.Process(zpid)
  2007. # ...and at least its status always be querable
  2008. self.assertEqual(zproc.status(), psutil.STATUS_ZOMBIE)
  2009. # ...and it should be considered 'running'
  2010. self.assertTrue(zproc.is_running())
  2011. # ...and as_dict() shouldn't crash
  2012. zproc.as_dict()
  2013. if hasattr(zproc, "rlimit"):
  2014. succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE)
  2015. succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE,
  2016. (5, 5))
  2017. # set methods
  2018. succeed_or_zombie_p_exc(zproc.parent)
  2019. if hasattr(zproc, 'cpu_affinity'):
  2020. succeed_or_zombie_p_exc(zproc.cpu_affinity, [0])
  2021. succeed_or_zombie_p_exc(zproc.nice, 0)
  2022. if hasattr(zproc, 'ionice'):
  2023. if LINUX:
  2024. succeed_or_zombie_p_exc(zproc.ionice, 2, 0)
  2025. else:
  2026. succeed_or_zombie_p_exc(zproc.ionice, 0) # Windows
  2027. if hasattr(zproc, 'rlimit'):
  2028. succeed_or_zombie_p_exc(zproc.rlimit,
  2029. psutil.RLIMIT_NOFILE, (5, 5))
  2030. succeed_or_zombie_p_exc(zproc.suspend)
  2031. succeed_or_zombie_p_exc(zproc.resume)
  2032. succeed_or_zombie_p_exc(zproc.terminate)
  2033. succeed_or_zombie_p_exc(zproc.kill)
  2034. # ...its parent should 'see' it
  2035. # edit: not true on BSD and OSX
  2036. # descendants = [x.pid for x in psutil.Process().children(
  2037. # recursive=True)]
  2038. # self.assertIn(zpid, descendants)
  2039. # XXX should we also assume ppid be usable? Note: this
  2040. # would be an important use case as the only way to get
  2041. # rid of a zombie is to kill its parent.
  2042. # self.assertEqual(zpid.ppid(), os.getpid())
  2043. # ...and all other APIs should be able to deal with it
  2044. self.assertTrue(psutil.pid_exists(zpid))
  2045. self.assertIn(zpid, psutil.pids())
  2046. self.assertIn(zpid, [x.pid for x in psutil.process_iter()])
  2047. psutil._pmap = {}
  2048. self.assertIn(zpid, [x.pid for x in psutil.process_iter()])
  2049. finally:
  2050. reap_children(search_all=True)
  2051. def test_pid_0(self):
  2052. # Process(0) is supposed to work on all platforms except Linux
  2053. if 0 not in psutil.pids():
  2054. self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
  2055. return
  2056. p = psutil.Process(0)
  2057. self.assertTrue(p.name())
  2058. if POSIX:
  2059. try:
  2060. self.assertEqual(p.uids().real, 0)
  2061. self.assertEqual(p.gids().real, 0)
  2062. except psutil.AccessDenied:
  2063. pass
  2064. self.assertRaisesRegexp(
  2065. ValueError, "preventing sending signal to process with PID 0",
  2066. p.send_signal, signal.SIGTERM)
  2067. self.assertIn(p.ppid(), (0, 1))
  2068. # self.assertEqual(p.exe(), "")
  2069. p.cmdline()
  2070. try:
  2071. p.num_threads()
  2072. except psutil.AccessDenied:
  2073. pass
  2074. try:
  2075. p.memory_info()
  2076. except psutil.AccessDenied:
  2077. pass
  2078. try:
  2079. if POSIX:
  2080. self.assertEqual(p.username(), 'root')
  2081. elif WINDOWS:
  2082. self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM')
  2083. else:
  2084. p.username()
  2085. except psutil.AccessDenied:
  2086. pass
  2087. self.assertIn(0, psutil.pids())
  2088. self.assertTrue(psutil.pid_exists(0))
  2089. def test_Popen(self):
  2090. # Popen class test
  2091. # XXX this test causes a ResourceWarning on Python 3 because
  2092. # psutil.__subproc instance doesn't get propertly freed.
  2093. # Not sure what to do though.
  2094. cmd = [PYTHON, "-c", "import time; time.sleep(60);"]
  2095. proc = psutil.Popen(cmd, stdout=subprocess.PIPE,
  2096. stderr=subprocess.PIPE)
  2097. try:
  2098. proc.name()
  2099. proc.stdin
  2100. self.assertTrue(hasattr(proc, 'name'))
  2101. self.assertTrue(hasattr(proc, 'stdin'))
  2102. self.assertTrue(dir(proc))
  2103. self.assertRaises(AttributeError, getattr, proc, 'foo')
  2104. finally:
  2105. proc.kill()
  2106. proc.wait()
  2107. self.assertIsNotNone(proc.returncode)
  2108. # ===================================================================
  2109. # --- Featch all processes test
  2110. # ===================================================================
  2111. class TestFetchAllProcesses(unittest.TestCase):
  2112. """Test which iterates over all running processes and performs
  2113. some sanity checks against Process API's returned values.
  2114. """
  2115. def setUp(self):
  2116. if POSIX:
  2117. import pwd
  2118. pall = pwd.getpwall()
  2119. self._uids = set([x.pw_uid for x in pall])
  2120. self._usernames = set([x.pw_name for x in pall])
  2121. def test_fetch_all(self):
  2122. valid_procs = 0
  2123. excluded_names = set([
  2124. 'send_signal', 'suspend', 'resume', 'terminate', 'kill', 'wait',
  2125. 'as_dict', 'cpu_percent', 'parent', 'children', 'pid'])
  2126. if LINUX and not RLIMIT_SUPPORT:
  2127. excluded_names.add('rlimit')
  2128. attrs = []
  2129. for name in dir(psutil.Process):
  2130. if name.startswith("_"):
  2131. continue
  2132. if name in excluded_names:
  2133. continue
  2134. attrs.append(name)
  2135. default = object()
  2136. failures = []
  2137. for name in attrs:
  2138. for p in psutil.process_iter():
  2139. ret = default
  2140. try:
  2141. try:
  2142. args = ()
  2143. attr = getattr(p, name, None)
  2144. if attr is not None and callable(attr):
  2145. if name == 'rlimit':
  2146. args = (psutil.RLIMIT_NOFILE,)
  2147. ret = attr(*args)
  2148. else:
  2149. ret = attr
  2150. valid_procs += 1
  2151. except NotImplementedError:
  2152. msg = "%r was skipped because not implemented" % (
  2153. self.__class__.__name__ + '.test_' + name)
  2154. warn(msg)
  2155. except (psutil.NoSuchProcess, psutil.AccessDenied) as err:
  2156. self.assertEqual(err.pid, p.pid)
  2157. if err.name:
  2158. # make sure exception's name attr is set
  2159. # with the actual process name
  2160. self.assertEqual(err.name, p.name())
  2161. self.assertTrue(str(err))
  2162. self.assertTrue(err.msg)
  2163. else:
  2164. if ret not in (0, 0.0, [], None, ''):
  2165. assert ret, ret
  2166. meth = getattr(self, name)
  2167. meth(ret)
  2168. except Exception as err:
  2169. s = '\n' + '=' * 70 + '\n'
  2170. s += "FAIL: test_%s (proc=%s" % (name, p)
  2171. if ret != default:
  2172. s += ", ret=%s)" % repr(ret)
  2173. s += ')\n'
  2174. s += '-' * 70
  2175. s += "\n%s" % traceback.format_exc()
  2176. s = "\n".join((" " * 4) + i for i in s.splitlines())
  2177. failures.append(s)
  2178. break
  2179. if failures:
  2180. self.fail(''.join(failures))
  2181. # we should always have a non-empty list, not including PID 0 etc.
  2182. # special cases.
  2183. self.assertTrue(valid_procs > 0)
  2184. def cmdline(self, ret):
  2185. pass
  2186. def exe(self, ret):
  2187. if not ret:
  2188. self.assertEqual(ret, '')
  2189. else:
  2190. assert os.path.isabs(ret), ret
  2191. # Note: os.stat() may return False even if the file is there
  2192. # hence we skip the test, see:
  2193. # http://stackoverflow.com/questions/3112546/os-path-exists-lies
  2194. if POSIX and os.path.isfile(ret):
  2195. if hasattr(os, 'access') and hasattr(os, "X_OK"):
  2196. # XXX may fail on OSX
  2197. self.assertTrue(os.access(ret, os.X_OK))
  2198. def ppid(self, ret):
  2199. self.assertTrue(ret >= 0)
  2200. def name(self, ret):
  2201. self.assertIsInstance(ret, (str, unicode))
  2202. self.assertTrue(ret)
  2203. def create_time(self, ret):
  2204. self.assertTrue(ret > 0)
  2205. # this can't be taken for granted on all platforms
  2206. # self.assertGreaterEqual(ret, psutil.boot_time())
  2207. # make sure returned value can be pretty printed
  2208. # with strftime
  2209. time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret))
  2210. def uids(self, ret):
  2211. for uid in ret:
  2212. self.assertTrue(uid >= 0)
  2213. self.assertIn(uid, self._uids)
  2214. def gids(self, ret):
  2215. # note: testing all gids as above seems not to be reliable for
  2216. # gid == 30 (nodoby); not sure why.
  2217. for gid in ret:
  2218. self.assertTrue(gid >= 0)
  2219. # self.assertIn(uid, self.gids
  2220. def username(self, ret):
  2221. self.assertTrue(ret)
  2222. if POSIX:
  2223. self.assertIn(ret, self._usernames)
  2224. def status(self, ret):
  2225. self.assertTrue(ret != "")
  2226. self.assertTrue(ret != '?')
  2227. self.assertIn(ret, VALID_PROC_STATUSES)
  2228. def io_counters(self, ret):
  2229. for field in ret:
  2230. if field != -1:
  2231. self.assertTrue(field >= 0)
  2232. def ionice(self, ret):
  2233. if LINUX:
  2234. self.assertTrue(ret.ioclass >= 0)
  2235. self.assertTrue(ret.value >= 0)
  2236. else:
  2237. self.assertTrue(ret >= 0)
  2238. self.assertIn(ret, (0, 1, 2))
  2239. def num_threads(self, ret):
  2240. self.assertTrue(ret >= 1)
  2241. def threads(self, ret):
  2242. for t in ret:
  2243. self.assertTrue(t.id >= 0)
  2244. self.assertTrue(t.user_time >= 0)
  2245. self.assertTrue(t.system_time >= 0)
  2246. def cpu_times(self, ret):
  2247. self.assertTrue(ret.user >= 0)
  2248. self.assertTrue(ret.system >= 0)
  2249. def memory_info(self, ret):
  2250. self.assertTrue(ret.rss >= 0)
  2251. self.assertTrue(ret.vms >= 0)
  2252. def memory_info_ex(self, ret):
  2253. for name in ret._fields:
  2254. self.assertTrue(getattr(ret, name) >= 0)
  2255. if POSIX and ret.vms != 0:
  2256. # VMS is always supposed to be the highest
  2257. for name in ret._fields:
  2258. if name != 'vms':
  2259. value = getattr(ret, name)
  2260. assert ret.vms > value, ret
  2261. elif WINDOWS:
  2262. assert ret.peak_wset >= ret.wset, ret
  2263. assert ret.peak_paged_pool >= ret.paged_pool, ret
  2264. assert ret.peak_nonpaged_pool >= ret.nonpaged_pool, ret
  2265. assert ret.peak_pagefile >= ret.pagefile, ret
  2266. def open_files(self, ret):
  2267. for f in ret:
  2268. if WINDOWS:
  2269. assert f.fd == -1, f
  2270. else:
  2271. self.assertIsInstance(f.fd, int)
  2272. assert os.path.isabs(f.path), f
  2273. assert os.path.isfile(f.path), f
  2274. def num_fds(self, ret):
  2275. self.assertTrue(ret >= 0)
  2276. def connections(self, ret):
  2277. self.assertEqual(len(ret), len(set(ret)))
  2278. for conn in ret:
  2279. check_connection_ntuple(conn)
  2280. def cwd(self, ret):
  2281. if ret is not None: # BSD may return None
  2282. assert os.path.isabs(ret), ret
  2283. try:
  2284. st = os.stat(ret)
  2285. except OSError as err:
  2286. # directory has been removed in mean time
  2287. if err.errno != errno.ENOENT:
  2288. raise
  2289. else:
  2290. self.assertTrue(stat.S_ISDIR(st.st_mode))
  2291. def memory_percent(self, ret):
  2292. assert 0 <= ret <= 100, ret
  2293. def is_running(self, ret):
  2294. self.assertTrue(ret)
  2295. def cpu_affinity(self, ret):
  2296. assert ret != [], ret
  2297. def terminal(self, ret):
  2298. if ret is not None:
  2299. assert os.path.isabs(ret), ret
  2300. assert os.path.exists(ret), ret
  2301. def memory_maps(self, ret):
  2302. for nt in ret:
  2303. for fname in nt._fields:
  2304. value = getattr(nt, fname)
  2305. if fname == 'path':
  2306. if not value.startswith('['):
  2307. assert os.path.isabs(nt.path), nt.path
  2308. # commented as on Linux we might get
  2309. # '/foo/bar (deleted)'
  2310. # assert os.path.exists(nt.path), nt.path
  2311. elif fname in ('addr', 'perms'):
  2312. self.assertTrue(value)
  2313. else:
  2314. self.assertIsInstance(value, (int, long))
  2315. assert value >= 0, value
  2316. def num_handles(self, ret):
  2317. if WINDOWS:
  2318. self.assertGreaterEqual(ret, 0)
  2319. else:
  2320. self.assertGreaterEqual(ret, 0)
  2321. def nice(self, ret):
  2322. if POSIX:
  2323. assert -20 <= ret <= 20, ret
  2324. else:
  2325. priorities = [getattr(psutil, x) for x in dir(psutil)
  2326. if x.endswith('_PRIORITY_CLASS')]
  2327. self.assertIn(ret, priorities)
  2328. def num_ctx_switches(self, ret):
  2329. self.assertTrue(ret.voluntary >= 0)
  2330. self.assertTrue(ret.involuntary >= 0)
  2331. def rlimit(self, ret):
  2332. self.assertEqual(len(ret), 2)
  2333. self.assertGreaterEqual(ret[0], -1)
  2334. self.assertGreaterEqual(ret[1], -1)
  2335. # ===================================================================
  2336. # --- Limited user tests
  2337. # ===================================================================
  2338. @unittest.skipUnless(POSIX, "UNIX only")
  2339. @unittest.skipUnless(hasattr(os, 'getuid') and os.getuid() == 0,
  2340. "super user privileges are required")
  2341. class LimitedUserTestCase(TestProcess):
  2342. """Repeat the previous tests by using a limited user.
  2343. Executed only on UNIX and only if the user who run the test script
  2344. is root.
  2345. """
  2346. # the uid/gid the test suite runs under
  2347. if hasattr(os, 'getuid'):
  2348. PROCESS_UID = os.getuid()
  2349. PROCESS_GID = os.getgid()
  2350. def __init__(self, *args, **kwargs):
  2351. TestProcess.__init__(self, *args, **kwargs)
  2352. # re-define all existent test methods in order to
  2353. # ignore AccessDenied exceptions
  2354. for attr in [x for x in dir(self) if x.startswith('test')]:
  2355. meth = getattr(self, attr)
  2356. def test_(self):
  2357. try:
  2358. meth()
  2359. except psutil.AccessDenied:
  2360. pass
  2361. setattr(self, attr, types.MethodType(test_, self))
  2362. def setUp(self):
  2363. safe_remove(TESTFN)
  2364. TestProcess.setUp(self)
  2365. os.setegid(1000)
  2366. os.seteuid(1000)
  2367. def tearDown(self):
  2368. os.setegid(self.PROCESS_UID)
  2369. os.seteuid(self.PROCESS_GID)
  2370. TestProcess.tearDown(self)
  2371. def test_nice(self):
  2372. try:
  2373. psutil.Process().nice(-1)
  2374. except psutil.AccessDenied:
  2375. pass
  2376. else:
  2377. self.fail("exception not raised")
  2378. def test_zombie_process(self):
  2379. # causes problems if test test suite is run as root
  2380. pass
  2381. # ===================================================================
  2382. # --- Misc tests
  2383. # ===================================================================
  2384. class TestMisc(unittest.TestCase):
  2385. """Misc / generic tests."""
  2386. def test_process__repr__(self, func=repr):
  2387. p = psutil.Process()
  2388. r = func(p)
  2389. self.assertIn("psutil.Process", r)
  2390. self.assertIn("pid=%s" % p.pid, r)
  2391. self.assertIn("name=", r)
  2392. self.assertIn(p.name(), r)
  2393. with mock.patch.object(psutil.Process, "name",
  2394. side_effect=psutil.ZombieProcess(os.getpid())):
  2395. p = psutil.Process()
  2396. r = func(p)
  2397. self.assertIn("pid=%s" % p.pid, r)
  2398. self.assertIn("zombie", r)
  2399. self.assertNotIn("name=", r)
  2400. with mock.patch.object(psutil.Process, "name",
  2401. side_effect=psutil.NoSuchProcess(os.getpid())):
  2402. p = psutil.Process()
  2403. r = func(p)
  2404. self.assertIn("pid=%s" % p.pid, r)
  2405. self.assertIn("terminated", r)
  2406. self.assertNotIn("name=", r)
  2407. def test_process__str__(self):
  2408. self.test_process__repr__(func=str)
  2409. def test_no_such_process__repr__(self, func=repr):
  2410. self.assertEqual(
  2411. repr(psutil.NoSuchProcess(321)),
  2412. "psutil.NoSuchProcess process no longer exists (pid=321)")
  2413. self.assertEqual(
  2414. repr(psutil.NoSuchProcess(321, name='foo')),
  2415. "psutil.NoSuchProcess process no longer exists (pid=321, "
  2416. "name='foo')")
  2417. self.assertEqual(
  2418. repr(psutil.NoSuchProcess(321, msg='foo')),
  2419. "psutil.NoSuchProcess foo")
  2420. def test_zombie_process__repr__(self, func=repr):
  2421. self.assertEqual(
  2422. repr(psutil.ZombieProcess(321)),
  2423. "psutil.ZombieProcess process still exists but it's a zombie "
  2424. "(pid=321)")
  2425. self.assertEqual(
  2426. repr(psutil.ZombieProcess(321, name='foo')),
  2427. "psutil.ZombieProcess process still exists but it's a zombie "
  2428. "(pid=321, name='foo')")
  2429. self.assertEqual(
  2430. repr(psutil.ZombieProcess(321, name='foo', ppid=1)),
  2431. "psutil.ZombieProcess process still exists but it's a zombie "
  2432. "(pid=321, name='foo', ppid=1)")
  2433. self.assertEqual(
  2434. repr(psutil.ZombieProcess(321, msg='foo')),
  2435. "psutil.ZombieProcess foo")
  2436. def test_access_denied__repr__(self, func=repr):
  2437. self.assertEqual(
  2438. repr(psutil.AccessDenied(321)),
  2439. "psutil.AccessDenied (pid=321)")
  2440. self.assertEqual(
  2441. repr(psutil.AccessDenied(321, name='foo')),
  2442. "psutil.AccessDenied (pid=321, name='foo')")
  2443. self.assertEqual(
  2444. repr(psutil.AccessDenied(321, msg='foo')),
  2445. "psutil.AccessDenied foo")
  2446. def test_timeout_expired__repr__(self, func=repr):
  2447. self.assertEqual(
  2448. repr(psutil.TimeoutExpired(321)),
  2449. "psutil.TimeoutExpired timeout after 321 seconds")
  2450. self.assertEqual(
  2451. repr(psutil.TimeoutExpired(321, pid=111)),
  2452. "psutil.TimeoutExpired timeout after 321 seconds (pid=111)")
  2453. self.assertEqual(
  2454. repr(psutil.TimeoutExpired(321, pid=111, name='foo')),
  2455. "psutil.TimeoutExpired timeout after 321 seconds "
  2456. "(pid=111, name='foo')")
  2457. def test_process__eq__(self):
  2458. p1 = psutil.Process()
  2459. p2 = psutil.Process()
  2460. self.assertEqual(p1, p2)
  2461. p2._ident = (0, 0)
  2462. self.assertNotEqual(p1, p2)
  2463. self.assertNotEqual(p1, 'foo')
  2464. def test_process__hash__(self):
  2465. s = set([psutil.Process(), psutil.Process()])
  2466. self.assertEqual(len(s), 1)
  2467. def test__all__(self):
  2468. dir_psutil = dir(psutil)
  2469. for name in dir_psutil:
  2470. if name in ('callable', 'error', 'namedtuple',
  2471. 'long', 'test', 'NUM_CPUS', 'BOOT_TIME',
  2472. 'TOTAL_PHYMEM'):
  2473. continue
  2474. if not name.startswith('_'):
  2475. try:
  2476. __import__(name)
  2477. except ImportError:
  2478. if name not in psutil.__all__:
  2479. fun = getattr(psutil, name)
  2480. if fun is None:
  2481. continue
  2482. if (fun.__doc__ is not None and
  2483. 'deprecated' not in fun.__doc__.lower()):
  2484. self.fail('%r not in psutil.__all__' % name)
  2485. # Import 'star' will break if __all__ is inconsistent, see:
  2486. # https://github.com/giampaolo/psutil/issues/656
  2487. # Can't do `from psutil import *` as it won't work on python 3
  2488. # so we simply iterate over __all__.
  2489. for name in psutil.__all__:
  2490. self.assertIn(name, dir_psutil)
  2491. def test_version(self):
  2492. self.assertEqual('.'.join([str(x) for x in psutil.version_info]),
  2493. psutil.__version__)
  2494. def test_memoize(self):
  2495. from psutil._common import memoize
  2496. @memoize
  2497. def foo(*args, **kwargs):
  2498. "foo docstring"
  2499. calls.append(None)
  2500. return (args, kwargs)
  2501. calls = []
  2502. # no args
  2503. for x in range(2):
  2504. ret = foo()
  2505. expected = ((), {})
  2506. self.assertEqual(ret, expected)
  2507. self.assertEqual(len(calls), 1)
  2508. # with args
  2509. for x in range(2):
  2510. ret = foo(1)
  2511. expected = ((1, ), {})
  2512. self.assertEqual(ret, expected)
  2513. self.assertEqual(len(calls), 2)
  2514. # with args + kwargs
  2515. for x in range(2):
  2516. ret = foo(1, bar=2)
  2517. expected = ((1, ), {'bar': 2})
  2518. self.assertEqual(ret, expected)
  2519. self.assertEqual(len(calls), 3)
  2520. # clear cache
  2521. foo.cache_clear()
  2522. ret = foo()
  2523. expected = ((), {})
  2524. self.assertEqual(ret, expected)
  2525. self.assertEqual(len(calls), 4)
  2526. # docstring
  2527. self.assertEqual(foo.__doc__, "foo docstring")
  2528. def test_isfile_strict(self):
  2529. from psutil._common import isfile_strict
  2530. this_file = os.path.abspath(__file__)
  2531. assert isfile_strict(this_file)
  2532. assert not isfile_strict(os.path.dirname(this_file))
  2533. with mock.patch('psutil._common.os.stat',
  2534. side_effect=OSError(errno.EPERM, "foo")):
  2535. self.assertRaises(OSError, isfile_strict, this_file)
  2536. with mock.patch('psutil._common.os.stat',
  2537. side_effect=OSError(errno.EACCES, "foo")):
  2538. self.assertRaises(OSError, isfile_strict, this_file)
  2539. with mock.patch('psutil._common.os.stat',
  2540. side_effect=OSError(errno.EINVAL, "foo")):
  2541. assert not isfile_strict(this_file)
  2542. with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
  2543. assert not isfile_strict(this_file)
  2544. def test_serialization(self):
  2545. def check(ret):
  2546. if json is not None:
  2547. json.loads(json.dumps(ret))
  2548. a = pickle.dumps(ret)
  2549. b = pickle.loads(a)
  2550. self.assertEqual(ret, b)
  2551. check(psutil.Process().as_dict())
  2552. check(psutil.virtual_memory())
  2553. check(psutil.swap_memory())
  2554. check(psutil.cpu_times())
  2555. check(psutil.cpu_times_percent(interval=0))
  2556. check(psutil.net_io_counters())
  2557. if LINUX and not os.path.exists('/proc/diskstats'):
  2558. pass
  2559. else:
  2560. if not APPVEYOR:
  2561. check(psutil.disk_io_counters())
  2562. check(psutil.disk_partitions())
  2563. check(psutil.disk_usage(os.getcwd()))
  2564. check(psutil.users())
  2565. def test_setup_script(self):
  2566. here = os.path.abspath(os.path.dirname(__file__))
  2567. setup_py = os.path.realpath(os.path.join(here, '..', 'setup.py'))
  2568. module = imp.load_source('setup', setup_py)
  2569. self.assertRaises(SystemExit, module.setup)
  2570. self.assertEqual(module.get_version(), psutil.__version__)
  2571. def test_ad_on_process_creation(self):
  2572. # We are supposed to be able to instantiate Process also in case
  2573. # of zombie processes or access denied.
  2574. with mock.patch.object(psutil.Process, 'create_time',
  2575. side_effect=psutil.AccessDenied) as meth:
  2576. psutil.Process()
  2577. assert meth.called
  2578. with mock.patch.object(psutil.Process, 'create_time',
  2579. side_effect=psutil.ZombieProcess(1)) as meth:
  2580. psutil.Process()
  2581. assert meth.called
  2582. with mock.patch.object(psutil.Process, 'create_time',
  2583. side_effect=ValueError) as meth:
  2584. with self.assertRaises(ValueError):
  2585. psutil.Process()
  2586. assert meth.called
  2587. # ===================================================================
  2588. # --- Example script tests
  2589. # ===================================================================
  2590. class TestExampleScripts(unittest.TestCase):
  2591. """Tests for scripts in the examples directory."""
  2592. def assert_stdout(self, exe, args=None):
  2593. exe = os.path.join(EXAMPLES_DIR, exe)
  2594. if args:
  2595. exe = exe + ' ' + args
  2596. try:
  2597. out = sh(sys.executable + ' ' + exe).strip()
  2598. except RuntimeError as err:
  2599. if 'AccessDenied' in str(err):
  2600. return str(err)
  2601. else:
  2602. raise
  2603. assert out, out
  2604. return out
  2605. def assert_syntax(self, exe, args=None):
  2606. exe = os.path.join(EXAMPLES_DIR, exe)
  2607. with open(exe, 'r') as f:
  2608. src = f.read()
  2609. ast.parse(src)
  2610. def test_check_presence(self):
  2611. # make sure all example scripts have a test method defined
  2612. meths = dir(self)
  2613. for name in os.listdir(EXAMPLES_DIR):
  2614. if name.endswith('.py'):
  2615. if 'test_' + os.path.splitext(name)[0] not in meths:
  2616. # self.assert_stdout(name)
  2617. self.fail('no test defined for %r script'
  2618. % os.path.join(EXAMPLES_DIR, name))
  2619. def test_disk_usage(self):
  2620. self.assert_stdout('disk_usage.py')
  2621. def test_free(self):
  2622. self.assert_stdout('free.py')
  2623. def test_meminfo(self):
  2624. self.assert_stdout('meminfo.py')
  2625. def test_process_detail(self):
  2626. self.assert_stdout('process_detail.py')
  2627. @unittest.skipIf(APPVEYOR, "can't find users on Appveyor")
  2628. def test_who(self):
  2629. self.assert_stdout('who.py')
  2630. def test_ps(self):
  2631. self.assert_stdout('ps.py')
  2632. def test_pstree(self):
  2633. self.assert_stdout('pstree.py')
  2634. def test_netstat(self):
  2635. self.assert_stdout('netstat.py')
  2636. @unittest.skipIf(TRAVIS, "permission denied on travis")
  2637. def test_ifconfig(self):
  2638. self.assert_stdout('ifconfig.py')
  2639. def test_pmap(self):
  2640. self.assert_stdout('pmap.py', args=str(os.getpid()))
  2641. @unittest.skipIf(ast is None,
  2642. 'ast module not available on this python version')
  2643. def test_killall(self):
  2644. self.assert_syntax('killall.py')
  2645. @unittest.skipIf(ast is None,
  2646. 'ast module not available on this python version')
  2647. def test_nettop(self):
  2648. self.assert_syntax('nettop.py')
  2649. @unittest.skipIf(ast is None,
  2650. 'ast module not available on this python version')
  2651. def test_top(self):
  2652. self.assert_syntax('top.py')
  2653. @unittest.skipIf(ast is None,
  2654. 'ast module not available on this python version')
  2655. def test_iotop(self):
  2656. self.assert_syntax('iotop.py')
  2657. def test_pidof(self):
  2658. output = self.assert_stdout('pidof.py %s' % psutil.Process().name())
  2659. self.assertIn(str(os.getpid()), output)
  2660. def main():
  2661. tests = []
  2662. test_suite = unittest.TestSuite()
  2663. tests.append(TestSystemAPIs)
  2664. tests.append(TestProcess)
  2665. tests.append(TestFetchAllProcesses)
  2666. tests.append(TestMisc)
  2667. tests.append(TestExampleScripts)
  2668. tests.append(LimitedUserTestCase)
  2669. if POSIX:
  2670. from _posix import PosixSpecificTestCase
  2671. tests.append(PosixSpecificTestCase)
  2672. # import the specific platform test suite
  2673. stc = None
  2674. if LINUX:
  2675. from _linux import LinuxSpecificTestCase as stc
  2676. elif WINDOWS:
  2677. from _windows import WindowsSpecificTestCase as stc
  2678. from _windows import TestDualProcessImplementation
  2679. tests.append(TestDualProcessImplementation)
  2680. elif OSX:
  2681. from _osx import OSXSpecificTestCase as stc
  2682. elif BSD:
  2683. from _bsd import BSDSpecificTestCase as stc
  2684. elif SUNOS:
  2685. from _sunos import SunOSSpecificTestCase as stc
  2686. if stc is not None:
  2687. tests.append(stc)
  2688. for test_class in tests:
  2689. test_suite.addTest(unittest.makeSuite(test_class))
  2690. result = unittest.TextTestRunner(verbosity=2).run(test_suite)
  2691. return result.wasSuccessful()
  2692. if __name__ == '__main__':
  2693. if not main():
  2694. sys.exit(1)