_osx.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. #!/usr/bin/env python
  2. # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """OSX specific tests. These are implicitly run by test_psutil.py."""
  6. import os
  7. import re
  8. import subprocess
  9. import sys
  10. import time
  11. import psutil
  12. from psutil._compat import PY3
  13. from test_psutil import (TOLERANCE, OSX, sh, get_test_subprocess,
  14. reap_children, retry_before_failing, unittest)
# Memory page size as reported by sysconf(SC_PAGE_SIZE); vm_stat() below
# multiplies the number it parses by this value.
PAGESIZE = os.sysconf("SC_PAGE_SIZE")
  16. def sysctl(cmdline):
  17. """Expects a sysctl command with an argument and parse the result
  18. returning only the value of interest.
  19. """
  20. p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE)
  21. result = p.communicate()[0].strip().split()[1]
  22. if PY3:
  23. result = str(result, sys.stdout.encoding)
  24. try:
  25. return int(result)
  26. except ValueError:
  27. return result
  28. def vm_stat(field):
  29. """Wrapper around 'vm_stat' cmdline utility."""
  30. out = sh('vm_stat')
  31. for line in out.split('\n'):
  32. if field in line:
  33. break
  34. else:
  35. raise ValueError("line not found")
  36. return int(re.search('\d+', line).group(0)) * PAGESIZE
  37. @unittest.skipUnless(OSX, "not an OSX system")
  38. class OSXSpecificTestCase(unittest.TestCase):
  39. @classmethod
  40. def setUpClass(cls):
  41. cls.pid = get_test_subprocess().pid
  42. @classmethod
  43. def tearDownClass(cls):
  44. reap_children()
  45. def test_process_create_time(self):
  46. cmdline = "ps -o lstart -p %s" % self.pid
  47. p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE)
  48. output = p.communicate()[0]
  49. if PY3:
  50. output = str(output, sys.stdout.encoding)
  51. start_ps = output.replace('STARTED', '').strip()
  52. start_psutil = psutil.Process(self.pid).create_time()
  53. start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
  54. time.localtime(start_psutil))
  55. self.assertEqual(start_ps, start_psutil)
  56. def test_disks(self):
  57. # test psutil.disk_usage() and psutil.disk_partitions()
  58. # against "df -a"
  59. def df(path):
  60. out = sh('df -k "%s"' % path).strip()
  61. lines = out.split('\n')
  62. lines.pop(0)
  63. line = lines.pop(0)
  64. dev, total, used, free = line.split()[:4]
  65. if dev == 'none':
  66. dev = ''
  67. total = int(total) * 1024
  68. used = int(used) * 1024
  69. free = int(free) * 1024
  70. return dev, total, used, free
  71. for part in psutil.disk_partitions(all=False):
  72. usage = psutil.disk_usage(part.mountpoint)
  73. dev, total, used, free = df(part.mountpoint)
  74. self.assertEqual(part.device, dev)
  75. self.assertEqual(usage.total, total)
  76. # 10 MB tollerance
  77. if abs(usage.free - free) > 10 * 1024 * 1024:
  78. self.fail("psutil=%s, df=%s" % usage.free, free)
  79. if abs(usage.used - used) > 10 * 1024 * 1024:
  80. self.fail("psutil=%s, df=%s" % usage.used, used)
  81. # --- virtual mem
  82. def test_vmem_total(self):
  83. sysctl_hwphymem = sysctl('sysctl hw.memsize')
  84. self.assertEqual(sysctl_hwphymem, psutil.virtual_memory().total)
  85. @retry_before_failing()
  86. def test_vmem_free(self):
  87. num = vm_stat("free")
  88. self.assertAlmostEqual(psutil.virtual_memory().free, num,
  89. delta=TOLERANCE)
  90. @retry_before_failing()
  91. def test_vmem_active(self):
  92. num = vm_stat("active")
  93. self.assertAlmostEqual(psutil.virtual_memory().active, num,
  94. delta=TOLERANCE)
  95. @retry_before_failing()
  96. def test_vmem_inactive(self):
  97. num = vm_stat("inactive")
  98. self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
  99. delta=TOLERANCE)
  100. @retry_before_failing()
  101. def test_vmem_wired(self):
  102. num = vm_stat("wired")
  103. self.assertAlmostEqual(psutil.virtual_memory().wired, num,
  104. delta=TOLERANCE)
  105. # --- swap mem
  106. def test_swapmem_sin(self):
  107. num = vm_stat("Pageins")
  108. self.assertEqual(psutil.swap_memory().sin, num)
  109. def test_swapmem_sout(self):
  110. num = vm_stat("Pageouts")
  111. self.assertEqual(psutil.swap_memory().sout, num)
  112. def test_swapmem_total(self):
  113. tot1 = psutil.swap_memory().total
  114. tot2 = 0
  115. # OSX uses multiple cache files:
  116. # http://en.wikipedia.org/wiki/Paging#OS_X
  117. for name in os.listdir("/var/vm/"):
  118. file = os.path.join("/var/vm", name)
  119. if os.path.isfile(file):
  120. tot2 += os.path.getsize(file)
  121. self.assertEqual(tot1, tot2)
  122. def main():
  123. test_suite = unittest.TestSuite()
  124. test_suite.addTest(unittest.makeSuite(OSXSpecificTestCase))
  125. result = unittest.TextTestRunner(verbosity=2).run(test_suite)
  126. return result.wasSuccessful()
  127. if __name__ == '__main__':
  128. if not main():
  129. sys.exit(1)