- # Copyright (C) 2011 Google Inc. All rights reserved.
- #
- # Redistribution and use in source and binary forms, with or without
- # modification, are permitted provided that the following conditions are
- # met:
- #
- # * Redistributions of source code must retain the above copyright
- # notice, this list of conditions and the following disclaimer.
- # * Redistributions in binary form must reproduce the above
- # copyright notice, this list of conditions and the following disclaimer
- # in the documentation and/or other materials provided with the
- # distribution.
- # * Neither the name of Google Inc. nor the names of its
- # contributors may be used to endorse or promote products derived from
- # this software without specific prior written permission.
- #
- # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- import logging
- import math
- import threading
- import time
- from webkitpy.common import message_pool
- from webkitpy.layout_tests.controllers import single_test_runner
- from webkitpy.layout_tests.controllers import multiple_test_runner
- from webkitpy.layout_tests.models.test_run_results import TestRunResults
- from webkitpy.layout_tests.models import test_expectations
- from webkitpy.layout_tests.models import test_failures
- from webkitpy.layout_tests.models import test_results
- from webkitpy.tool import grammar
- _log = logging.getLogger(__name__)
- TestExpectations = test_expectations.TestExpectations
- # Export this so callers don't need to know about message pools.
- WorkerException = message_pool.WorkerException
- class TestRunInterruptedException(Exception):
- """Raised when a test run should be stopped immediately."""
- def __init__(self, reason):
- Exception.__init__(self)
- self.reason = reason
- self.msg = reason
- def __reduce__(self):
- return self.__class__, (self.reason,)
- class LayoutTestRunner(object):
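- """Runs a list of tests: shards them, manages the HTTP/WebSocket server
- lock, and farms the shards out to a pool of Worker objects via a message pool."""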
- def __init__(self, options, port, printer, results_directory, test_is_slow_fn):
- self._options = options
- self._port = port
- self._printer = printer
- self._results_directory = results_directory
- self._test_is_slow = test_is_slow_fn
- self._sharder = Sharder(self._port.split_test, self._options.max_locked_shards)
- self._filesystem = self._port.host.filesystem
- self._expectations = None
- self._test_inputs = []
- self._needs_http = None
- self._needs_websockets = None
- self._retrying = False
- self._current_run_results = None
- self._remaining_locked_shards = []
- self._has_http_lock = False
- def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, needs_http, needs_websockets, retrying):
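- """Runs all of the given test inputs (skipping tests_to_skip) across
- num_workers workers and returns a TestRunResults object summarizing the run."""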
- self._expectations = expectations
- self._test_inputs = test_inputs
- self._needs_http = needs_http
- self._needs_websockets = needs_websockets
- self._retrying = retrying
- # FIXME: rename all variables to test_run_results or some such ...
- run_results = TestRunResults(self._expectations, len(test_inputs) + len(tests_to_skip))
- self._current_run_results = run_results
- self._remaining_locked_shards = []
- self._has_http_lock = False
- self._printer.num_tests = len(test_inputs)
- self._printer.num_started = 0
- if not retrying:
- self._printer.print_expected(run_results, self._expectations.get_tests_with_result_type)
- for test_name in set(tests_to_skip):
- result = test_results.TestResult(test_name)
- result.type = test_expectations.SKIP
- run_results.add(result, expected=True, test_is_slow=self._test_is_slow(test_name))
- self._printer.write_update('Sharding tests ...')
- locked_shards, unlocked_shards = self._sharder.shard_tests(test_inputs, int(self._options.child_processes), self._options.fully_parallel)
- # FIXME: We don't have a good way to coordinate the workers so that
- # they don't try to run the shards that need a lock if we don't actually
- # have the lock. The easiest solution at the moment is to grab the
- # lock at the beginning of the run, and then run all of the locked
- # shards first. This minimizes the time spent holding the lock, but
- # means that we won't be running tests while we're waiting for the lock.
- # If this becomes a problem in practice we'll need to change this.
- all_shards = locked_shards + unlocked_shards
- self._remaining_locked_shards = locked_shards
- if locked_shards and self._options.http:
- self.start_servers_with_lock(2 * min(num_workers, len(locked_shards)))
- num_workers = min(num_workers, len(all_shards))
- self._printer.print_workers_and_shards(num_workers, len(all_shards), len(locked_shards))
- if self._options.dry_run:
- return run_results
- self._printer.write_update('Starting %s ...' % grammar.pluralize('worker', num_workers))
- try:
- with message_pool.get(self, self._worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool:
- pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)
- except TestRunInterruptedException, e:
- _log.warning(e.reason)
- run_results.interrupted = True
- except KeyboardInterrupt:
- self._printer.flush()
- self._printer.writeln('Interrupted, exiting ...')
- raise
- except Exception, e:
- _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
- raise
- finally:
- self.stop_servers_with_lock()
- return run_results
- def _worker_factory(self, worker_connection):
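- # On a retry pass, worker results are written to a 'retries' subdirectory
- # of the results directory.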
- results_directory = self._results_directory
- if self._retrying:
- self._filesystem.maybe_make_directory(self._filesystem.join(self._results_directory, 'retries'))
- results_directory = self._filesystem.join(self._results_directory, 'retries')
- return Worker(worker_connection, results_directory, self._options)
- def _mark_interrupted_tests_as_skipped(self, run_results):
- for test_input in self._test_inputs:
- if test_input.test_name not in run_results.results_by_name:
- result = test_results.TestResult(test_input.test_name, [test_failures.FailureEarlyExit()])
- # FIXME: We probably need to loop here if there are multiple iterations.
- # FIXME: Also, these results are really neither expected nor unexpected. We probably
- # need a third type of result.
- run_results.add(result, expected=False, test_is_slow=self._test_is_slow(test_input.test_name))
- def _interrupt_if_at_failure_limits(self, run_results):
- # Note: The messages in this method are constructed to match old-run-webkit-tests
- # so that existing buildbot grep rules work.
- def interrupt_if_at_failure_limit(limit, failure_count, run_results, message):
- if limit and failure_count >= limit:
- message += " %d tests run." % (run_results.expected + run_results.unexpected)
- self._mark_interrupted_tests_as_skipped(run_results)
- raise TestRunInterruptedException(message)
- interrupt_if_at_failure_limit(
- self._options.exit_after_n_failures,
- run_results.unexpected_failures,
- run_results,
- "Exiting early after %d failures." % run_results.unexpected_failures)
- interrupt_if_at_failure_limit(
- self._options.exit_after_n_crashes_or_timeouts,
- run_results.unexpected_crashes + run_results.unexpected_timeouts,
- run_results,
- # This differs from ORWT because it does not include WebProcess crashes.
- "Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))
- def _update_summary_with_result(self, run_results, result):
- if result.type == test_expectations.SKIP:
- exp_str = got_str = 'SKIP'
- expected = True
- else:
- expected = self._expectations.matches_an_expected_result(result.test_name, result.type, self._options.pixel_tests or result.reftest_type)
- exp_str = self._expectations.get_expectations_string(result.test_name)
- got_str = self._expectations.expectation_to_string(result.type)
- run_results.add(result, expected, self._test_is_slow(result.test_name))
- self._printer.print_finished_test(result, expected, exp_str, got_str)
- self._interrupt_if_at_failure_limits(run_results)
- def start_servers_with_lock(self, number_of_servers):
- self._printer.write_update('Acquiring http lock ...')
- self._port.acquire_http_lock()
- if self._needs_http:
- self._printer.write_update('Starting HTTP server ...')
- self._port.start_http_server(number_of_servers=number_of_servers)
- if self._needs_websockets:
- self._printer.write_update('Starting WebSocket server ...')
- self._port.start_websocket_server()
- self._has_http_lock = True
- def stop_servers_with_lock(self):
- if self._has_http_lock:
- if self._needs_http:
- self._printer.write_update('Stopping HTTP server ...')
- self._port.stop_http_server()
- if self._needs_websockets:
- self._printer.write_update('Stopping WebSocket server ...')
- self._port.stop_websocket_server()
- self._printer.write_update('Releasing server lock ...')
- self._port.release_http_lock()
- self._has_http_lock = False
- def handle(self, name, source, *args):
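- # Dispatches a message received from the worker pool to the matching
- # _handle_<name> method (e.g. 'finished_test' -> _handle_finished_test).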
- method = getattr(self, '_handle_' + name)
- if method:
- return method(source, *args)
- raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))
- def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
- self._printer.print_started_test(test_input.test_name)
- def _handle_finished_test_list(self, worker_name, list_name):
- def find(name, test_lists):
- for i in range(len(test_lists)):
- if test_lists[i].name == name:
- return i
- return -1
- index = find(list_name, self._remaining_locked_shards)
- if index >= 0:
- self._remaining_locked_shards.pop(index)
- if not self._remaining_locked_shards:
- self.stop_servers_with_lock()
- # After completing this test list, reset the printer's test statistics.
- self._printer.reset_test_stats()
- def _handle_finished_test(self, worker_name, result, log_messages=[]):
- self._update_summary_with_result(self._current_run_results, result)
- def _handle_update_stats(self, worker_name, difference):
- self._printer.update_test_stats(difference)
-
- def _handle_update_running_test(self, worker_name):
- self._printer.update_running_test()
- class Worker(object):
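- """Runs the tests in a single shard inside a worker process and posts
- 'started_test' / 'finished_test' messages back to the LayoutTestRunner."""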
- def __init__(self, caller, results_directory, options):
- self._caller = caller
- self._worker_number = caller.worker_number
- self._name = caller.name
- self._results_directory = results_directory
- self._options = options
- # The remaining fields are initialized in start()
- self._host = None
- self._port = None
- self._batch_size = None
- self._batch_count = None
- self._filesystem = None
- self._driver = None
- self._num_tests = 0
- def __del__(self):
- self.stop()
- def start(self):
- """This method is called when the object is starting to be used and it is safe
- for the object to create state that does not need to be pickled (usually this means
- it is called in a child process)."""
- self._host = self._caller.host
- self._filesystem = self._host.filesystem
- self._port = self._host.port_factory.get(self._options.platform, self._options)
- self._batch_count = 0
- self._batch_size = self._options.batch_size or 0
- def handle(self, name, source, test_list_name, test_inputs):
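- # Runs the tests in the given shard. When --chunk-size is greater than one,
- # tests are run in chunks via _run_test_chunks(); if a chunk finishes early
- # (e.g. after a crash), the first unfinished test is re-run singly before
- # chunked execution resumes. Otherwise each test is run individually.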
- assert name == 'test_list'
- tests_handled = 0
- chunk_size = self._options.chunk_size
- test_counter = 0
- current_test = 0
- j = 0
- size = chunk_size
- if chunk_size > 1 and len(test_inputs) >= 1:
- while test_counter < len(test_inputs):
- test_input = []
- while current_test < size and current_test < len(test_inputs):
- test_input.append(test_inputs[current_test])
- current_test += 1
- test_counter += 1
- # Prepare for the next set of tests.
- tests_handled = self._run_test_chunks(test_input, test_list_name)
- if tests_handled < len(test_input):
- # A crash was encountered while running this set of tests;
- # re-run the first test that did not complete in single-test-runner mode
- # before resuming chunked execution.
- j = (current_test - len(test_input)) + tests_handled
- _log.info("\nSwitching to single-test-runner, re-executing test %d" % (j+1))
- _log.info('\n')
- # Adjust the printer stats for the number of tests started and finished.
- self._caller.post('update_stats', (len(test_input) - int(tests_handled)))
- self._run_test(test_inputs[j], test_list_name)
- current_test = test_counter = j + 1
- _log.info("\nSwitching back to multiple-test-runner")
- if current_test == size:
- size += chunk_size
- else:
- size += chunk_size
- else:
- for test_input in test_inputs:
- self._run_test(test_input, test_list_name)
- self._caller.post('finished_test_list', test_list_name)
- def _update_test_input(self, test_input):
- if test_input.reference_files is None:
- # Lazy initialization.
- test_input.reference_files = self._port.reference_files(test_input.test_name)
- if test_input.reference_files:
- test_input.should_run_pixel_test = True
- else:
- test_input.should_run_pixel_test = self._port.should_run_as_pixel_test(test_input)
- def _run_test(self, test_input, shard_name):
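- # Runs a single test, posting 'started_test'/'finished_test' messages and
- # asking the driver to stop after every _batch_size tests (stop_when_done).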
- self._batch_count += 1
- stop_when_done = False
- if self._batch_size > 0 and self._batch_count >= self._batch_size:
- self._batch_count = 0
- stop_when_done = True
- self._update_test_input(test_input)
- test_timeout_sec = self._timeout(test_input)
- start = time.time()
- self._caller.post('started_test', test_input, test_timeout_sec)
- result = self._run_test_with_timeout(test_input, test_timeout_sec, stop_when_done)
- result.shard_name = shard_name
- result.worker_name = self._name
- result.total_run_time = time.time() - start
- result.test_number = self._num_tests
- self._num_tests += 1
- self._caller.post('finished_test', result)
- self._clean_up_after_test(test_input, result)
- def _run_test_chunks(self, test_input, shard_name):
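- # Runs a list of tests as one chunk through the driver and returns the
- # number of tests that were actually run; the caller falls back to
- # single-test mode for any tests the chunk did not complete.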
- num_test = len(test_input)
- result = []
- self._batch_count += num_test
- stop_when_done = False
- if self._batch_size > 0 and self._batch_count >= self._batch_size:
- self._batch_count = 0
- stop_when_done = True
- for test in test_input:
- self._update_test_input(test)
- test_timeout_sec = self._timeout(test_input[0]) * num_test
- start = time.time()
- for x in range(0, num_test):
- self._caller.post('started_test', test_input[x], test_timeout_sec)
- result, num_tests_run = self._run_multiple_test_with_timeout(test_input, test_timeout_sec, stop_when_done)
- # Handle the list of result objects.
- for x in range(0, num_tests_run):
- result[x].shard_name = shard_name
- result[x].worker_name = self._name
- result[x].total_run_time = time.time() - start
- result[x].test_number = self._num_tests
- self._caller.post('finished_test', result[x])
- self._clean_up_after_test(test_input[x], result[x])
- if num_tests_run < len(test_input):
- self._caller.post('update_running_test')
- self._num_tests = num_tests_run
- return num_tests_run
- def stop(self):
- _log.debug("%s cleaning up" % self._name)
- self._kill_driver()
- def _timeout(self, test_input):
- """Compute the appropriate timeout value for a test."""
- # The DumpRenderTree watchdog uses 2.5x the timeout; we want to be
- # larger than that. We also add a little more padding if we're
- # running tests in a separate thread.
- #
- # Note that we need to convert the test timeout from a
- # string value in milliseconds to a float for Python.
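- # For example, a test timeout of '35000' (35 seconds) yields a
- # 105-second driver timeout.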
- driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
- if not self._options.run_singly:
- return driver_timeout_sec
- thread_padding_sec = 1.0
- thread_timeout_sec = driver_timeout_sec + thread_padding_sec
- return thread_timeout_sec
- def _kill_driver(self):
- # Be careful about how and when we kill the driver; if driver.stop()
- # raises an exception, this routine may get re-entered via __del__.
- driver = self._driver
- self._driver = None
- if driver:
- _log.debug("%s killing driver" % self._name)
- driver.stop()
- def _run_test_with_timeout(self, test_input, timeout, stop_when_done):
- if self._options.run_singly:
- return self._run_test_in_another_thread(test_input, timeout, stop_when_done)
- return self._run_test_in_this_thread(test_input, stop_when_done)
- def _run_multiple_test_with_timeout(self, test_input, timeout, stop_when_done):
- # Runs a chunk of tests (--chunk-size=<number>) in a single driver session
- # with WebKitTestRunnerOrbis. This path is not currently supported with the
- # --run-singly option.
- if self._driver and self._driver.has_crashed():
- self._kill_driver()
- if not self._driver:
- self._driver = self._port.create_driver(self._worker_number)
- return self._run_multiple_tests(self._driver, test_input, stop_when_done)
- def _clean_up_after_test(self, test_input, result):
- test_name = test_input.test_name
- if result.failures:
- # Check and kill DumpRenderTree if we need to.
- if any([f.driver_needs_restart() for f in result.failures]):
- self._kill_driver()
- # Reset the batch count since the shell just bounced.
- self._batch_count = 0
- # Print the error message(s).
- _log.debug("%s %s failed:" % (self._name, test_name))
- for f in result.failures:
- _log.debug("%s %s" % (self._name, f.message()))
- elif result.type == test_expectations.SKIP:
- _log.debug("%s %s skipped" % (self._name, test_name))
- else:
- _log.debug("%s %s passed" % (self._name, test_name))
- def _run_test_in_another_thread(self, test_input, thread_timeout_sec, stop_when_done):
- """Run a test in a separate thread, enforcing a hard time limit.
- Since we can only detect the termination of a thread, not any internal
- state or progress, we can only run per-test timeouts when running test
- files singly.
- Args:
- test_input: Object containing the test filename and timeout
- thread_timeout_sec: time to wait before killing the driver process.
- Returns:
- A TestResult
- """
- worker = self
- driver = self._port.create_driver(self._worker_number)
- class SingleTestThread(threading.Thread):
- def __init__(self):
- threading.Thread.__init__(self)
- self.result = None
- def run(self):
- self.result = worker._run_single_test(driver, test_input, stop_when_done)
- thread = SingleTestThread()
- thread.start()
- thread.join(thread_timeout_sec)
- result = thread.result
- failures = []
- if thread.isAlive():
- # If join() returned with the thread still running, the
- # DumpRenderTree is completely hung and there's nothing
- # more we can do with it. We have to kill all the
- # DumpRenderTrees to free it up. If we're running more than
- # one DumpRenderTree thread, we'll end up killing the other
- # DumpRenderTrees too, introducing spurious crashes. We accept
- # that tradeoff in order to avoid losing the rest of this
- # thread's results.
- _log.error('Test thread hung: killing all DumpRenderTrees')
- failures = [test_failures.FailureTimeout()]
- driver.stop()
- if not result:
- result = test_results.TestResult(test_input.test_name, failures=failures, test_run_time=0)
- return result
- def _run_test_in_this_thread(self, test_input, stop_when_done):
- """Run a single test file using a shared DumpRenderTree process.
- Args:
- test_input: Object containing the test filename, uri and timeout
- Returns: a TestResult object.
- """
- if self._driver and self._driver.has_crashed():
- self._kill_driver()
- if not self._driver:
- self._driver = self._port.create_driver(self._worker_number)
- return self._run_single_test(self._driver, test_input, stop_when_done)
- def _run_single_test(self, driver, test_input, stop_when_done):
- return single_test_runner.run_single_test(self._port, self._options, self._results_directory,
- self._name, driver, test_input, stop_when_done)
- def _run_multiple_tests(self, driver, test_input, stop_when_done):
- return multiple_test_runner.run_multiple_tests(self._port, self._options, self._results_directory,
- self._name, driver, test_input, stop_when_done)
- class TestShard(object):
- """A test shard is a named list of TestInputs."""
- def __init__(self, name, test_inputs):
- self.name = name
- self.test_inputs = test_inputs
- self.requires_lock = test_inputs[0].requires_lock
- def __repr__(self):
- return "TestShard(name='%s', test_inputs=%s, requires_lock=%s'" % (self.name, self.test_inputs, self.requires_lock)
- def __eq__(self, other):
- return self.name == other.name and self.test_inputs == other.test_inputs
- class Sharder(object):
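- """Groups TestInputs into TestShards: one shard per directory by default,
- one shard per test when fully_parallel is set, or just two shards
- (locked and unlocked) when there is only one worker."""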
- def __init__(self, test_split_fn, max_locked_shards):
- self._split = test_split_fn
- self._max_locked_shards = max_locked_shards
- def shard_tests(self, test_inputs, num_workers, fully_parallel):
- """Groups tests into batches.
- This helps ensure that tests that depend on each other (aka bad tests!)
- continue to run together, as most cross-test dependencies tend to
- occur within the same directory.
- Returns:
- Two lists of TestShards. The first contains tests that must only be
- run under the server lock; the second can be run at any time.
- """
- # FIXME: Move all of the sharding logic out of manager into its
- # own class or module. Consider grouping it with the chunking logic
- # in prepare_lists as well.
- if num_workers == 1:
- return self._shard_in_two(test_inputs)
- elif fully_parallel:
- return self._shard_every_file(test_inputs)
- return self._shard_by_directory(test_inputs, num_workers)
- def _shard_in_two(self, test_inputs):
- """Returns two lists of shards, one with all the tests requiring a lock and one with the rest.
- This is used when there's only one worker, to minimize the per-shard overhead."""
- locked_inputs = []
- unlocked_inputs = []
- for test_input in test_inputs:
- if test_input.requires_lock:
- locked_inputs.append(test_input)
- else:
- unlocked_inputs.append(test_input)
- locked_shards = []
- unlocked_shards = []
- if locked_inputs:
- locked_shards = [TestShard('locked_tests', locked_inputs)]
- if unlocked_inputs:
- unlocked_shards = [TestShard('unlocked_tests', unlocked_inputs)]
- return locked_shards, unlocked_shards
- def _shard_every_file(self, test_inputs):
- """Returns two lists of shards, each shard containing a single test file.
- This mode gets maximal parallelism at the cost of much higher flakiness."""
- locked_shards = []
- unlocked_shards = []
- for test_input in test_inputs:
- # Note that we use a '.' for the shard name; the name doesn't really
- # matter, and the only other meaningful value would be the filename,
- # which would be really redundant.
- if test_input.requires_lock:
- locked_shards.append(TestShard('.', [test_input]))
- else:
- unlocked_shards.append(TestShard('.', [test_input]))
- return locked_shards, unlocked_shards
- def _shard_by_directory(self, test_inputs, num_workers):
- """Returns two lists of shards, each shard containing all the files in a directory.
- This is the default mode, and gets as much parallelism as we can while
- minimizing flakiness caused by inter-test dependencies."""
- locked_shards = []
- unlocked_shards = []
- tests_by_dir = {}
- # FIXME: Given that the tests are already sorted by directory,
- # we can probably rewrite this to be clearer and faster.
- for test_input in test_inputs:
- directory = self._split(test_input.test_name)[0]
- tests_by_dir.setdefault(directory, [])
- tests_by_dir[directory].append(test_input)
- for directory, test_inputs in tests_by_dir.iteritems():
- shard = TestShard(directory, test_inputs)
- if test_inputs[0].requires_lock:
- locked_shards.append(shard)
- else:
- unlocked_shards.append(shard)
- # Sort the shards by directory name.
- locked_shards.sort(key=lambda shard: shard.name)
- unlocked_shards.sort(key=lambda shard: shard.name)
- # Put a ceiling on the number of locked shards, so that we
- # don't hammer the servers too badly.
- # FIXME: For now, limit to one shard or set it
- # with the --max-locked-shards. After testing to make sure we
- # can handle multiple shards, we should probably do something like
- # limit this to no more than a quarter of all workers, e.g.:
- # return max(math.ceil(num_workers / 4.0), 1)
- return (self._resize_shards(locked_shards, self._max_locked_shards, 'locked_shard'),
- unlocked_shards)
- def _resize_shards(self, old_shards, max_new_shards, shard_name_prefix):
- """Takes a list of shards and redistributes the tests into no more
- than |max_new_shards| new shards."""
- # This implementation assumes that each input shard only contains tests from a
- # single directory, and that tests in each shard must remain together; as a
- # result, a given input shard is never split between output shards.
- #
- # Each output shard contains the tests from one or more input shards and
- # hence may contain tests from multiple directories.
- def divide_and_round_up(numerator, divisor):
- return int(math.ceil(float(numerator) / divisor))
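- # e.g. divide_and_round_up(10, 4) == 3.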
- def extract_and_flatten(shards):
- test_inputs = []
- for shard in shards:
- test_inputs.extend(shard.test_inputs)
- return test_inputs
- def split_at(seq, index):
- return (seq[:index], seq[index:])
- num_old_per_new = divide_and_round_up(len(old_shards), max_new_shards)
- new_shards = []
- remaining_shards = old_shards
- while remaining_shards:
- some_shards, remaining_shards = split_at(remaining_shards, num_old_per_new)
- new_shards.append(TestShard('%s_%d' % (shard_name_prefix, len(new_shards) + 1), extract_and_flatten(some_shards)))
- return new_shards
|