_posix.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. #!/usr/bin/env python
  2. # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """POSIX specific tests. These are implicitly run by test_psutil.py."""
  6. import datetime
  7. import os
  8. import subprocess
  9. import sys
  10. import time
  11. import psutil
  12. from psutil._compat import PY3, callable
  13. from test_psutil import LINUX, SUNOS, OSX, BSD, PYTHON, POSIX, TRAVIS
  14. from test_psutil import (get_test_subprocess, skip_on_access_denied,
  15. retry_before_failing, reap_children, sh, unittest,
  16. get_kernel_version, wait_for_pid)
  17. def ps(cmd):
  18. """Expects a ps command with a -o argument and parse the result
  19. returning only the value of interest.
  20. """
  21. if not LINUX:
  22. cmd = cmd.replace(" --no-headers ", " ")
  23. if SUNOS:
  24. cmd = cmd.replace("-o command", "-o comm")
  25. cmd = cmd.replace("-o start", "-o stime")
  26. p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE)
  27. output = p.communicate()[0].strip()
  28. if PY3:
  29. output = str(output, sys.stdout.encoding)
  30. if not LINUX:
  31. output = output.split('\n')[1].strip()
  32. try:
  33. return int(output)
  34. except ValueError:
  35. return output
  36. @unittest.skipUnless(POSIX, "not a POSIX system")
  37. class PosixSpecificTestCase(unittest.TestCase):
  38. """Compare psutil results against 'ps' command line utility."""
  39. @classmethod
  40. def setUpClass(cls):
  41. cls.pid = get_test_subprocess([PYTHON, "-E", "-O"],
  42. stdin=subprocess.PIPE).pid
  43. wait_for_pid(cls.pid)
  44. @classmethod
  45. def tearDownClass(cls):
  46. reap_children()
  47. # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps
  48. def test_process_parent_pid(self):
  49. ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid)
  50. ppid_psutil = psutil.Process(self.pid).ppid()
  51. self.assertEqual(ppid_ps, ppid_psutil)
  52. def test_process_uid(self):
  53. uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid)
  54. uid_psutil = psutil.Process(self.pid).uids().real
  55. self.assertEqual(uid_ps, uid_psutil)
  56. def test_process_gid(self):
  57. gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid)
  58. gid_psutil = psutil.Process(self.pid).gids().real
  59. self.assertEqual(gid_ps, gid_psutil)
  60. def test_process_username(self):
  61. username_ps = ps("ps --no-headers -o user -p %s" % self.pid)
  62. username_psutil = psutil.Process(self.pid).username()
  63. self.assertEqual(username_ps, username_psutil)
  64. @skip_on_access_denied()
  65. @retry_before_failing()
  66. def test_process_rss_memory(self):
  67. # give python interpreter some time to properly initialize
  68. # so that the results are the same
  69. time.sleep(0.1)
  70. rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid)
  71. rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
  72. self.assertEqual(rss_ps, rss_psutil)
  73. @skip_on_access_denied()
  74. @retry_before_failing()
  75. def test_process_vsz_memory(self):
  76. # give python interpreter some time to properly initialize
  77. # so that the results are the same
  78. time.sleep(0.1)
  79. vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid)
  80. vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
  81. self.assertEqual(vsz_ps, vsz_psutil)
  82. def test_process_name(self):
  83. # use command + arg since "comm" keyword not supported on all platforms
  84. name_ps = ps("ps --no-headers -o command -p %s" % (
  85. self.pid)).split(' ')[0]
  86. # remove path if there is any, from the command
  87. name_ps = os.path.basename(name_ps).lower()
  88. name_psutil = psutil.Process(self.pid).name().lower()
  89. self.assertEqual(name_ps, name_psutil)
  90. @unittest.skipIf(OSX or BSD,
  91. 'ps -o start not available')
  92. def test_process_create_time(self):
  93. time_ps = ps("ps --no-headers -o start -p %s" % self.pid).split(' ')[0]
  94. time_psutil = psutil.Process(self.pid).create_time()
  95. time_psutil_tstamp = datetime.datetime.fromtimestamp(
  96. time_psutil).strftime("%H:%M:%S")
  97. # sometimes ps shows the time rounded up instead of down, so we check
  98. # for both possible values
  99. round_time_psutil = round(time_psutil)
  100. round_time_psutil_tstamp = datetime.datetime.fromtimestamp(
  101. round_time_psutil).strftime("%H:%M:%S")
  102. self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp])
  103. def test_process_exe(self):
  104. ps_pathname = ps("ps --no-headers -o command -p %s" %
  105. self.pid).split(' ')[0]
  106. psutil_pathname = psutil.Process(self.pid).exe()
  107. try:
  108. self.assertEqual(ps_pathname, psutil_pathname)
  109. except AssertionError:
  110. # certain platforms such as BSD are more accurate returning:
  111. # "/usr/local/bin/python2.7"
  112. # ...instead of:
  113. # "/usr/local/bin/python"
  114. # We do not want to consider this difference in accuracy
  115. # an error.
  116. adjusted_ps_pathname = ps_pathname[:len(ps_pathname)]
  117. self.assertEqual(ps_pathname, adjusted_ps_pathname)
  118. def test_process_cmdline(self):
  119. ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid)
  120. psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline())
  121. if SUNOS:
  122. # ps on Solaris only shows the first part of the cmdline
  123. psutil_cmdline = psutil_cmdline.split(" ")[0]
  124. self.assertEqual(ps_cmdline, psutil_cmdline)
  125. @retry_before_failing()
  126. def test_pids(self):
  127. # Note: this test might fail if the OS is starting/killing
  128. # other processes in the meantime
  129. if SUNOS:
  130. cmd = ["ps", "ax"]
  131. else:
  132. cmd = ["ps", "ax", "-o", "pid"]
  133. p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
  134. output = p.communicate()[0].strip()
  135. if PY3:
  136. output = str(output, sys.stdout.encoding)
  137. pids_ps = []
  138. for line in output.split('\n')[1:]:
  139. if line:
  140. pid = int(line.split()[0].strip())
  141. pids_ps.append(pid)
  142. # remove ps subprocess pid which is supposed to be dead in meantime
  143. pids_ps.remove(p.pid)
  144. pids_psutil = psutil.pids()
  145. pids_ps.sort()
  146. pids_psutil.sort()
  147. # on OSX ps doesn't show pid 0
  148. if OSX and 0 not in pids_ps:
  149. pids_ps.insert(0, 0)
  150. if pids_ps != pids_psutil:
  151. difference = [x for x in pids_psutil if x not in pids_ps] + \
  152. [x for x in pids_ps if x not in pids_psutil]
  153. self.fail("difference: " + str(difference))
  154. # for some reason ifconfig -a does not report all interfaces
  155. # returned by psutil
  156. @unittest.skipIf(SUNOS, "test not reliable on SUNOS")
  157. @unittest.skipIf(TRAVIS, "test not reliable on Travis")
  158. def test_nic_names(self):
  159. p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE)
  160. output = p.communicate()[0].strip()
  161. if PY3:
  162. output = str(output, sys.stdout.encoding)
  163. for nic in psutil.net_io_counters(pernic=True).keys():
  164. for line in output.split():
  165. if line.startswith(nic):
  166. break
  167. else:
  168. self.fail(
  169. "couldn't find %s nic in 'ifconfig -a' output\n%s" % (
  170. nic, output))
  171. @retry_before_failing()
  172. def test_users(self):
  173. out = sh("who")
  174. lines = out.split('\n')
  175. users = [x.split()[0] for x in lines]
  176. self.assertEqual(len(users), len(psutil.users()))
  177. terminals = [x.split()[1] for x in lines]
  178. for u in psutil.users():
  179. self.assertTrue(u.name in users, u.name)
  180. self.assertTrue(u.terminal in terminals, u.terminal)
  181. def test_fds_open(self):
  182. # Note: this fails from time to time; I'm keen on thinking
  183. # it doesn't mean something is broken
  184. def call(p, attr):
  185. args = ()
  186. attr = getattr(p, name, None)
  187. if attr is not None and callable(attr):
  188. if name == 'rlimit':
  189. args = (psutil.RLIMIT_NOFILE,)
  190. attr(*args)
  191. else:
  192. attr
  193. p = psutil.Process(os.getpid())
  194. failures = []
  195. ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice',
  196. 'send_signal', 'wait', 'children', 'as_dict']
  197. if LINUX and get_kernel_version() < (2, 6, 36):
  198. ignored_names.append('rlimit')
  199. if LINUX and get_kernel_version() < (2, 6, 23):
  200. ignored_names.append('num_ctx_switches')
  201. for name in dir(psutil.Process):
  202. if (name.startswith('_') or name in ignored_names):
  203. continue
  204. else:
  205. try:
  206. num1 = p.num_fds()
  207. for x in range(2):
  208. call(p, name)
  209. num2 = p.num_fds()
  210. except psutil.AccessDenied:
  211. pass
  212. else:
  213. if abs(num2 - num1) > 1:
  214. fail = "failure while processing Process.%s method " \
  215. "(before=%s, after=%s)" % (name, num1, num2)
  216. failures.append(fail)
  217. if failures:
  218. self.fail('\n' + '\n'.join(failures))
  219. def main():
  220. test_suite = unittest.TestSuite()
  221. test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase))
  222. result = unittest.TextTestRunner(verbosity=2).run(test_suite)
  223. return result.wasSuccessful()
  224. if __name__ == '__main__':
  225. if not main():
  226. sys.exit(1)