123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446 |
- #!/usr/bin/env python
- # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- """
- A test script which attempts to detect memory leaks by calling C
- functions many times and compare process memory usage before and
- after the calls. It might produce false positives.
- """
- import functools
- import gc
- import os
- import socket
- import sys
- import threading
- import time
- import psutil
- import psutil._common
- from psutil._compat import xrange, callable
- from test_psutil import (WINDOWS, POSIX, OSX, LINUX, SUNOS, BSD, TESTFN,
- RLIMIT_SUPPORT, TRAVIS)
- from test_psutil import (reap_children, supports_ipv6, safe_remove,
- get_test_subprocess)
- if sys.version_info < (2, 7):
- import unittest2 as unittest # https://pypi.python.org/pypi/unittest2
- else:
- import unittest
- LOOPS = 1000
- TOLERANCE = 4096
- SKIP_PYTHON_IMPL = True
- def skip_if_linux():
- return unittest.skipIf(LINUX and SKIP_PYTHON_IMPL,
- "not worth being tested on LINUX (pure python)")
- class Base(unittest.TestCase):
- proc = psutil.Process()
- def execute(self, function, *args, **kwargs):
- def call_many_times():
- for x in xrange(LOOPS - 1):
- self.call(function, *args, **kwargs)
- del x
- gc.collect()
- return self.get_mem()
- self.call(function, *args, **kwargs)
- self.assertEqual(gc.garbage, [])
- self.assertEqual(threading.active_count(), 1)
- # RSS comparison
- # step 1
- rss1 = call_many_times()
- # step 2
- rss2 = call_many_times()
- difference = rss2 - rss1
- if difference > TOLERANCE:
- # This doesn't necessarily mean we have a leak yet.
- # At this point we assume that after having called the
- # function so many times the memory usage is stabilized
- # and if there are no leaks it should not increase any
- # more.
- # Let's keep calling fun for 3 more seconds and fail if
- # we notice any difference.
- stop_at = time.time() + 3
- while True:
- self.call(function, *args, **kwargs)
- if time.time() >= stop_at:
- break
- del stop_at
- gc.collect()
- rss3 = self.get_mem()
- difference = rss3 - rss2
- if rss3 > rss2:
- self.fail("rss2=%s, rss3=%s, difference=%s"
- % (rss2, rss3, difference))
- def execute_w_exc(self, exc, function, *args, **kwargs):
- kwargs['_exc'] = exc
- self.execute(function, *args, **kwargs)
- def get_mem(self):
- return psutil.Process().memory_info()[0]
- def call(self, function, *args, **kwargs):
- raise NotImplementedError("must be implemented in subclass")
- class TestProcessObjectLeaks(Base):
- """Test leaks of Process class methods and properties"""
- def setUp(self):
- gc.collect()
- def tearDown(self):
- reap_children()
- def call(self, function, *args, **kwargs):
- if callable(function):
- if '_exc' in kwargs:
- exc = kwargs.pop('_exc')
- self.assertRaises(exc, function, *args, **kwargs)
- else:
- try:
- function(*args, **kwargs)
- except psutil.Error:
- pass
- else:
- meth = getattr(self.proc, function)
- if '_exc' in kwargs:
- exc = kwargs.pop('_exc')
- self.assertRaises(exc, meth, *args, **kwargs)
- else:
- try:
- meth(*args, **kwargs)
- except psutil.Error:
- pass
- @skip_if_linux()
- def test_name(self):
- self.execute('name')
- @skip_if_linux()
- def test_cmdline(self):
- self.execute('cmdline')
- @skip_if_linux()
- def test_exe(self):
- self.execute('exe')
- @skip_if_linux()
- def test_ppid(self):
- self.execute('ppid')
- @unittest.skipUnless(POSIX, "POSIX only")
- @skip_if_linux()
- def test_uids(self):
- self.execute('uids')
- @unittest.skipUnless(POSIX, "POSIX only")
- @skip_if_linux()
- def test_gids(self):
- self.execute('gids')
- @skip_if_linux()
- def test_status(self):
- self.execute('status')
- def test_nice_get(self):
- self.execute('nice')
- def test_nice_set(self):
- niceness = psutil.Process().nice()
- self.execute('nice', niceness)
- @unittest.skipUnless(hasattr(psutil.Process, 'ionice'),
- "Linux and Windows Vista only")
- def test_ionice_get(self):
- self.execute('ionice')
- @unittest.skipUnless(hasattr(psutil.Process, 'ionice'),
- "Linux and Windows Vista only")
- def test_ionice_set(self):
- if WINDOWS:
- value = psutil.Process().ionice()
- self.execute('ionice', value)
- else:
- from psutil._pslinux import cext
- self.execute('ionice', psutil.IOPRIO_CLASS_NONE)
- fun = functools.partial(cext.proc_ioprio_set, os.getpid(), -1, 0)
- self.execute_w_exc(OSError, fun)
- @unittest.skipIf(OSX or SUNOS, "feature not supported on this platform")
- @skip_if_linux()
- def test_io_counters(self):
- self.execute('io_counters')
- @unittest.skipUnless(WINDOWS, "not worth being tested on posix")
- def test_username(self):
- self.execute('username')
- @skip_if_linux()
- def test_create_time(self):
- self.execute('create_time')
- @skip_if_linux()
- def test_num_threads(self):
- self.execute('num_threads')
- @unittest.skipUnless(WINDOWS, "Windows only")
- def test_num_handles(self):
- self.execute('num_handles')
- @unittest.skipUnless(POSIX, "POSIX only")
- @skip_if_linux()
- def test_num_fds(self):
- self.execute('num_fds')
- @skip_if_linux()
- def test_threads(self):
- self.execute('threads')
- @skip_if_linux()
- def test_cpu_times(self):
- self.execute('cpu_times')
- @skip_if_linux()
- def test_memory_info(self):
- self.execute('memory_info')
- @skip_if_linux()
- def test_memory_info_ex(self):
- self.execute('memory_info_ex')
- @unittest.skipUnless(POSIX, "POSIX only")
- @skip_if_linux()
- def test_terminal(self):
- self.execute('terminal')
- @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
- "not worth being tested on POSIX (pure python)")
- def test_resume(self):
- self.execute('resume')
- @skip_if_linux()
- def test_cwd(self):
- self.execute('cwd')
- @unittest.skipUnless(WINDOWS or LINUX or BSD,
- "Windows or Linux or BSD only")
- def test_cpu_affinity_get(self):
- self.execute('cpu_affinity')
- @unittest.skipUnless(WINDOWS or LINUX or BSD,
- "Windows or Linux or BSD only")
- def test_cpu_affinity_set(self):
- affinity = psutil.Process().cpu_affinity()
- self.execute('cpu_affinity', affinity)
- if not TRAVIS:
- self.execute_w_exc(ValueError, 'cpu_affinity', [-1])
- @skip_if_linux()
- def test_open_files(self):
- safe_remove(TESTFN) # needed after UNIX socket test has run
- with open(TESTFN, 'w'):
- self.execute('open_files')
- # OSX implementation is unbelievably slow
- @unittest.skipIf(OSX, "OSX implementation is too slow")
- @skip_if_linux()
- def test_memory_maps(self):
- self.execute('memory_maps')
- @unittest.skipUnless(LINUX, "Linux only")
- @unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
- "only available on Linux >= 2.6.36")
- def test_rlimit_get(self):
- self.execute('rlimit', psutil.RLIMIT_NOFILE)
- @unittest.skipUnless(LINUX, "Linux only")
- @unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
- "only available on Linux >= 2.6.36")
- def test_rlimit_set(self):
- limit = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)
- self.execute('rlimit', psutil.RLIMIT_NOFILE, limit)
- self.execute_w_exc(OSError, 'rlimit', -1)
- @skip_if_linux()
- # Windows implementation is based on a single system-wide function
- @unittest.skipIf(WINDOWS, "tested later")
- def test_connections(self):
- def create_socket(family, type):
- sock = socket.socket(family, type)
- sock.bind(('', 0))
- if type == socket.SOCK_STREAM:
- sock.listen(1)
- return sock
- socks = []
- socks.append(create_socket(socket.AF_INET, socket.SOCK_STREAM))
- socks.append(create_socket(socket.AF_INET, socket.SOCK_DGRAM))
- if supports_ipv6():
- socks.append(create_socket(socket.AF_INET6, socket.SOCK_STREAM))
- socks.append(create_socket(socket.AF_INET6, socket.SOCK_DGRAM))
- if hasattr(socket, 'AF_UNIX'):
- safe_remove(TESTFN)
- s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- s.bind(TESTFN)
- s.listen(1)
- socks.append(s)
- kind = 'all'
- # TODO: UNIX sockets are temporarily implemented by parsing
- # 'pfiles' cmd output; we don't want that part of the code to
- # be executed.
- if SUNOS:
- kind = 'inet'
- try:
- self.execute('connections', kind=kind)
- finally:
- for s in socks:
- s.close()
- p = get_test_subprocess()
- DEAD_PROC = psutil.Process(p.pid)
- DEAD_PROC.kill()
- DEAD_PROC.wait()
- del p
- class TestProcessObjectLeaksZombie(TestProcessObjectLeaks):
- """Same as above but looks for leaks occurring when dealing with
- zombie processes raising NoSuchProcess exception.
- """
- proc = DEAD_PROC
- def call(self, *args, **kwargs):
- try:
- TestProcessObjectLeaks.call(self, *args, **kwargs)
- except psutil.NoSuchProcess:
- pass
- if not POSIX:
- def test_kill(self):
- self.execute('kill')
- def test_terminate(self):
- self.execute('terminate')
- def test_suspend(self):
- self.execute('suspend')
- def test_resume(self):
- self.execute('resume')
- def test_wait(self):
- self.execute('wait')
- class TestModuleFunctionsLeaks(Base):
- """Test leaks of psutil module functions."""
- def setUp(self):
- gc.collect()
- def call(self, function, *args, **kwargs):
- fun = getattr(psutil, function)
- fun(*args, **kwargs)
- @skip_if_linux()
- def test_cpu_count_logical(self):
- psutil.cpu_count = psutil._psplatform.cpu_count_logical
- self.execute('cpu_count')
- @skip_if_linux()
- def test_cpu_count_physical(self):
- psutil.cpu_count = psutil._psplatform.cpu_count_physical
- self.execute('cpu_count')
- @skip_if_linux()
- def test_boot_time(self):
- self.execute('boot_time')
- @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
- "not worth being tested on POSIX (pure python)")
- def test_pid_exists(self):
- self.execute('pid_exists', os.getpid())
- def test_virtual_memory(self):
- self.execute('virtual_memory')
- # TODO: remove this skip when this gets fixed
- @unittest.skipIf(SUNOS,
- "not worth being tested on SUNOS (uses a subprocess)")
- def test_swap_memory(self):
- self.execute('swap_memory')
- @skip_if_linux()
- def test_cpu_times(self):
- self.execute('cpu_times')
- @skip_if_linux()
- def test_per_cpu_times(self):
- self.execute('cpu_times', percpu=True)
- @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
- "not worth being tested on POSIX (pure python)")
- def test_disk_usage(self):
- self.execute('disk_usage', '.')
- def test_disk_partitions(self):
- self.execute('disk_partitions')
- @skip_if_linux()
- def test_net_io_counters(self):
- self.execute('net_io_counters')
- @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
- '/proc/diskstats not available on this Linux version')
- @skip_if_linux()
- def test_disk_io_counters(self):
- self.execute('disk_io_counters')
- # XXX - on Windows this produces a false positive
- @unittest.skipIf(WINDOWS, "XXX produces a false positive on Windows")
- def test_users(self):
- self.execute('users')
- @unittest.skipIf(LINUX,
- "not worth being tested on Linux (pure python)")
- def test_net_connections(self):
- self.execute('net_connections')
- def test_net_if_addrs(self):
- self.execute('net_if_addrs')
- @unittest.skipIf(TRAVIS, "EPERM on travis")
- def test_net_if_stats(self):
- self.execute('net_if_stats')
- def main():
- test_suite = unittest.TestSuite()
- tests = [TestProcessObjectLeaksZombie,
- TestProcessObjectLeaks,
- TestModuleFunctionsLeaks]
- for test in tests:
- test_suite.addTest(unittest.makeSuite(test))
- result = unittest.TextTestRunner(verbosity=2).run(test_suite)
- return result.wasSuccessful()
- if __name__ == '__main__':
- if not main():
- sys.exit(1)
|