layout_test_runner.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672
  1. # Copyright (C) 2011 Google Inc. All rights reserved.
  2. #
  3. # Redistribution and use in source and binary forms, with or without
  4. # modification, are permitted provided that the following conditions are
  5. # met:
  6. #
  7. # * Redistributions of source code must retain the above copyright
  8. # notice, this list of conditions and the following disclaimer.
  9. # * Redistributions in binary form must reproduce the above
  10. # copyright notice, this list of conditions and the following disclaimer
  11. # in the documentation and/or other materials provided with the
  12. # distribution.
  13. # * Neither the name of Google Inc. nor the names of its
  14. # contributors may be used to endorse or promote products derived from
  15. # this software without specific prior written permission.
  16. #
  17. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  18. # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  19. # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  20. # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  21. # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  22. # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  23. # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  24. # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  25. # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  26. # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  27. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  28. import logging
  29. import math
  30. import threading
  31. import time
  32. from webkitpy.common import message_pool
  33. from webkitpy.layout_tests.controllers import single_test_runner
  34. from webkitpy.layout_tests.controllers import multiple_test_runner
  35. from webkitpy.layout_tests.models.test_run_results import TestRunResults
  36. from webkitpy.layout_tests.models import test_expectations
  37. from webkitpy.layout_tests.models import test_failures
  38. from webkitpy.layout_tests.models import test_results
  39. from webkitpy.tool import grammar
# Module-level logger, named after this module.
_log = logging.getLogger(__name__)

# Convenience alias for test_expectations.TestExpectations.
TestExpectations = test_expectations.TestExpectations

# Export this so callers don't need to know about message pools.
WorkerException = message_pool.WorkerException
  44. class TestRunInterruptedException(Exception):
  45. """Raised when a test run should be stopped immediately."""
  46. def __init__(self, reason):
  47. Exception.__init__(self)
  48. self.reason = reason
  49. self.msg = reason
  50. def __reduce__(self):
  51. return self.__class__, (self.reason,)
  52. class LayoutTestRunner(object):
  53. def __init__(self, options, port, printer, results_directory, test_is_slow_fn):
  54. self._options = options
  55. self._port = port
  56. self._printer = printer
  57. self._results_directory = results_directory
  58. self._test_is_slow = test_is_slow_fn
  59. self._sharder = Sharder(self._port.split_test, self._options.max_locked_shards)
  60. self._filesystem = self._port.host.filesystem
  61. self._expectations = None
  62. self._test_inputs = []
  63. self._needs_http = None
  64. self._needs_websockets = None
  65. self._retrying = False
  66. self._current_run_results = None
  67. self._remaining_locked_shards = []
  68. self._has_http_lock = False
  69. def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, needs_http, needs_websockets, retrying):
  70. self._expectations = expectations
  71. self._test_inputs = test_inputs
  72. self._needs_http = needs_http
  73. self._needs_websockets = needs_websockets
  74. self._retrying = retrying
  75. # FIXME: rename all variables to test_run_results or some such ...
  76. run_results = TestRunResults(self._expectations, len(test_inputs) + len(tests_to_skip))
  77. self._current_run_results = run_results
  78. self._remaining_locked_shards = []
  79. self._has_http_lock = False
  80. self._printer.num_tests = len(test_inputs)
  81. self._printer.num_started = 0
  82. if not retrying:
  83. self._printer.print_expected(run_results, self._expectations.get_tests_with_result_type)
  84. for test_name in set(tests_to_skip):
  85. result = test_results.TestResult(test_name)
  86. result.type = test_expectations.SKIP
  87. run_results.add(result, expected=True, test_is_slow=self._test_is_slow(test_name))
  88. self._printer.write_update('Sharding tests ...')
  89. locked_shards, unlocked_shards = self._sharder.shard_tests(test_inputs, int(self._options.child_processes), self._options.fully_parallel)
  90. # FIXME: We don't have a good way to coordinate the workers so that
  91. # they don't try to run the shards that need a lock if we don't actually
  92. # have the lock. The easiest solution at the moment is to grab the
  93. # lock at the beginning of the run, and then run all of the locked
  94. # shards first. This minimizes the time spent holding the lock, but
  95. # means that we won't be running tests while we're waiting for the lock.
  96. # If this becomes a problem in practice we'll need to change this.
  97. all_shards = locked_shards + unlocked_shards
  98. self._remaining_locked_shards = locked_shards
  99. if locked_shards and self._options.http:
  100. self.start_servers_with_lock(2 * min(num_workers, len(locked_shards)))
  101. num_workers = min(num_workers, len(all_shards))
  102. self._printer.print_workers_and_shards(num_workers, len(all_shards), len(locked_shards))
  103. if self._options.dry_run:
  104. return run_results
  105. self._printer.write_update('Starting %s ...' % grammar.pluralize('worker', num_workers))
  106. try:
  107. with message_pool.get(self, self._worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool:
  108. pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)
  109. except TestRunInterruptedException, e:
  110. _log.warning(e.reason)
  111. run_results.interrupted = True
  112. except KeyboardInterrupt:
  113. self._printer.flush()
  114. self._printer.writeln('Interrupted, exiting ...')
  115. raise
  116. except Exception, e:
  117. _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
  118. raise
  119. finally:
  120. self.stop_servers_with_lock()
  121. return run_results
  122. def _worker_factory(self, worker_connection):
  123. results_directory = self._results_directory
  124. if self._retrying:
  125. self._filesystem.maybe_make_directory(self._filesystem.join(self._results_directory, 'retries'))
  126. results_directory = self._filesystem.join(self._results_directory, 'retries')
  127. return Worker(worker_connection, results_directory, self._options)
  128. def _mark_interrupted_tests_as_skipped(self, run_results):
  129. for test_input in self._test_inputs:
  130. if test_input.test_name not in run_results.results_by_name:
  131. result = test_results.TestResult(test_input.test_name, [test_failures.FailureEarlyExit()])
  132. # FIXME: We probably need to loop here if there are multiple iterations.
  133. # FIXME: Also, these results are really neither expected nor unexpected. We probably
  134. # need a third type of result.
  135. run_results.add(result, expected=False, test_is_slow=self._test_is_slow(test_input.test_name))
  136. def _interrupt_if_at_failure_limits(self, run_results):
  137. # Note: The messages in this method are constructed to match old-run-webkit-tests
  138. # so that existing buildbot grep rules work.
  139. def interrupt_if_at_failure_limit(limit, failure_count, run_results, message):
  140. if limit and failure_count >= limit:
  141. message += " %d tests run." % (run_results.expected + run_results.unexpected)
  142. self._mark_interrupted_tests_as_skipped(run_results)
  143. raise TestRunInterruptedException(message)
  144. interrupt_if_at_failure_limit(
  145. self._options.exit_after_n_failures,
  146. run_results.unexpected_failures,
  147. run_results,
  148. "Exiting early after %d failures." % run_results.unexpected_failures)
  149. interrupt_if_at_failure_limit(
  150. self._options.exit_after_n_crashes_or_timeouts,
  151. run_results.unexpected_crashes + run_results.unexpected_timeouts,
  152. run_results,
  153. # This differs from ORWT because it does not include WebProcess crashes.
  154. "Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))
  155. def _update_summary_with_result(self, run_results, result):
  156. if result.type == test_expectations.SKIP:
  157. exp_str = got_str = 'SKIP'
  158. expected = True
  159. else:
  160. expected = self._expectations.matches_an_expected_result(result.test_name, result.type, self._options.pixel_tests or result.reftest_type)
  161. exp_str = self._expectations.get_expectations_string(result.test_name)
  162. got_str = self._expectations.expectation_to_string(result.type)
  163. run_results.add(result, expected, self._test_is_slow(result.test_name))
  164. self._printer.print_finished_test(result, expected, exp_str, got_str)
  165. self._interrupt_if_at_failure_limits(run_results)
  166. def start_servers_with_lock(self, number_of_servers):
  167. self._printer.write_update('Acquiring http lock ...')
  168. self._port.acquire_http_lock()
  169. if self._needs_http:
  170. self._printer.write_update('Starting HTTP server ...')
  171. self._port.start_http_server(number_of_servers=number_of_servers)
  172. if self._needs_websockets:
  173. self._printer.write_update('Starting WebSocket server ...')
  174. self._port.start_websocket_server()
  175. self._has_http_lock = True
  176. def stop_servers_with_lock(self):
  177. if self._has_http_lock:
  178. if self._needs_http:
  179. self._printer.write_update('Stopping HTTP server ...')
  180. self._port.stop_http_server()
  181. if self._needs_websockets:
  182. self._printer.write_update('Stopping WebSocket server ...')
  183. self._port.stop_websocket_server()
  184. self._printer.write_update('Releasing server lock ...')
  185. self._port.release_http_lock()
  186. self._has_http_lock = False
  187. def handle(self, name, source, *args):
  188. method = getattr(self, '_handle_' + name)
  189. if method:
  190. return method(source, *args)
  191. raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))
  192. def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
  193. self._printer.print_started_test(test_input.test_name)
  194. def _handle_finished_test_list(self, worker_name, list_name):
  195. def find(name, test_lists):
  196. for i in range(len(test_lists)):
  197. if test_lists[i].name == name:
  198. return i
  199. return -1
  200. index = find(list_name, self._remaining_locked_shards)
  201. if index >= 0:
  202. self._remaining_locked_shards.pop(index)
  203. if not self._remaining_locked_shards:
  204. self.stop_servers_with_lock()
  205. # After completing this test list, restore the finished test
  206. # counter.
  207. self._printer.reset_test_stats()
  208. def _handle_finished_test(self, worker_name, result, log_messages=[]):
  209. self._update_summary_with_result(self._current_run_results, result)
  210. def _handle_update_stats(self, worker_name, difference):
  211. self._printer.update_test_stats(difference)
  212. def _handle_update_running_test(self, worker_name):
  213. self._printer.update_running_test()
  214. class Worker(object):
  215. def __init__(self, caller, results_directory, options):
  216. self._caller = caller
  217. self._worker_number = caller.worker_number
  218. self._name = caller.name
  219. self._results_directory = results_directory
  220. self._options = options
  221. # The remaining fields are initialized in start()
  222. self._host = None
  223. self._port = None
  224. self._batch_size = None
  225. self._batch_count = None
  226. self._filesystem = None
  227. self._driver = None
  228. self._num_tests = 0
  229. def __del__(self):
  230. self.stop()
  231. def start(self):
  232. """This method is called when the object is starting to be used and it is safe
  233. for the object to create state that does not need to be pickled (usually this means
  234. it is called in a child process)."""
  235. self._host = self._caller.host
  236. self._filesystem = self._host.filesystem
  237. self._port = self._host.port_factory.get(self._options.platform, self._options)
  238. self._batch_count = 0
  239. self._batch_size = self._options.batch_size or 0
  240. def handle(self, name, source, test_list_name, test_inputs):
  241. assert name == 'test_list'
  242. tests_handled = 0
  243. chunk_size = self._options.chunk_size
  244. test_counter = 0
  245. current_test = 0
  246. j = 0
  247. size = chunk_size
  248. if chunk_size > 1 and len(test_inputs) >= 1:
  249. while test_counter < len(test_inputs):
  250. test_input = []
  251. while((current_test < size) and current_test <len(test_inputs)):
  252. test_input.append(test_inputs[current_test])
  253. current_test += 1
  254. test_counter += 1
  255. # Prepare for the next set of tests.
  256. tests_handled = self._run_test_chunks(test_input, test_list_name)
  257. if tests_handled < len(test_input):
  258. # Crash encountered in running a set of tests
  259. # Complete the rest of the list in single test runner mode
  260. j = (current_test - len(test_input)) + tests_handled
  261. _log.info("\nSwitching to single-test-runner, re-executing test %d" % (j+1))
  262. _log.info('\n')
  263. # Adjust printer stats for no of tests started and finished
  264. self._caller.post('update_stats', (len(test_input) - int(tests_handled)))
  265. self._run_test(test_inputs[j], test_list_name)
  266. current_test = test_counter = j + 1
  267. _log.info("\nSwitching back to multiple-test-runner")
  268. if current_test == size:
  269. size += chunk_size
  270. else:
  271. size += chunk_size
  272. else:
  273. for test_input in test_inputs:
  274. self._run_test(test_input, test_list_name)
  275. self._caller.post('finished_test_list', test_list_name)
  276. def _update_test_input(self, test_input):
  277. if test_input.reference_files is None:
  278. # Lazy initialization.
  279. test_input.reference_files = self._port.reference_files(test_input.test_name)
  280. if test_input.reference_files:
  281. test_input.should_run_pixel_test = True
  282. else:
  283. test_input.should_run_pixel_test = self._port.should_run_as_pixel_test(test_input)
  284. def _run_test(self, test_input, shard_name):
  285. self._batch_count += 1
  286. stop_when_done = False
  287. if self._batch_size > 0 and self._batch_count >= self._batch_size:
  288. self._batch_count = 0
  289. stop_when_done = True
  290. self._update_test_input(test_input)
  291. test_timeout_sec = self._timeout(test_input)
  292. start = time.time()
  293. self._caller.post('started_test', test_input, test_timeout_sec)
  294. result = self._run_test_with_timeout(test_input, test_timeout_sec, stop_when_done)
  295. result.shard_name = shard_name
  296. result.worker_name = self._name
  297. result.total_run_time = time.time() - start
  298. result.test_number = self._num_tests
  299. self._num_tests += 1
  300. self._caller.post('finished_test', result)
  301. self._clean_up_after_test(test_input, result)
  302. def _run_test_chunks(self, test_input, shard_name):
  303. num_test = len(test_input)
  304. result = []
  305. self._batch_count += num_test
  306. stop_when_done = False
  307. if self._batch_size > 0 and self._batch_count >= self._batch_size:
  308. self._batch_count = 0
  309. stop_when_done = True
  310. for test in test_input:
  311. self._update_test_input(test)
  312. test_timeout_sec = (self._timeout(test_input[0]))*num_test
  313. start = time.time()
  314. for x in range(0, num_test):
  315. self._caller.post('started_test', test_input[x], test_timeout_sec)
  316. result, num_tests_run = self._run_multiple_test_with_timeout(test_input, test_timeout_sec, stop_when_done)
  317. # Handle the list of result objects.
  318. for x in range(0, num_tests_run):
  319. result[x].shard_name = shard_name
  320. result[x].worker_name = self._name
  321. result[x].total_run_time = time.time() - start
  322. result[x].test_number = self._num_tests
  323. self._caller.post('finished_test', result[x])
  324. self._clean_up_after_test(test_input[x], result[x])
  325. if num_tests_run < len(test_input):
  326. self._caller.post('update_running_test')
  327. self._num_tests = num_tests_run
  328. return num_tests_run
  329. def stop(self):
  330. _log.debug("%s cleaning up" % self._name)
  331. self._kill_driver()
  332. def _timeout(self, test_input):
  333. """Compute the appropriate timeout value for a test."""
  334. # The DumpRenderTree watchdog uses 2.5x the timeout; we want to be
  335. # larger than that. We also add a little more padding if we're
  336. # running tests in a separate thread.
  337. #
  338. # Note that we need to convert the test timeout from a
  339. # string value in milliseconds to a float for Python.
  340. driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
  341. if not self._options.run_singly:
  342. return driver_timeout_sec
  343. thread_padding_sec = 1.0
  344. thread_timeout_sec = driver_timeout_sec + thread_padding_sec
  345. return thread_timeout_sec
  346. def _kill_driver(self):
  347. # Be careful about how and when we kill the driver; if driver.stop()
  348. # raises an exception, this routine may get re-entered via __del__.
  349. driver = self._driver
  350. self._driver = None
  351. if driver:
  352. _log.debug("%s killing driver" % self._name)
  353. driver.stop()
  354. def _run_test_with_timeout(self, test_input, timeout, stop_when_done):
  355. if self._options.run_singly:
  356. return self._run_test_in_another_thread(test_input, timeout, stop_when_done)
  357. return self._run_test_in_this_thread(test_input, stop_when_done)
  358. def _run_multiple_test_with_timeout(self, test_input, timeout, stop_when_done):
  359. # Handling running a chunk of tests with WebKitTestRunnerOrbis
  360. # Currently this is not supported with --run-singly option
  361. # option --chunk-size=<number>
  362. if self._driver and self._driver.has_crashed():
  363. self._kill_driver()
  364. if not self._driver:
  365. self._driver = self._port.create_driver(self._worker_number)
  366. return self._run_multiple_tests(self._driver, test_input, stop_when_done)
  367. def _clean_up_after_test(self, test_input, result):
  368. test_name = test_input.test_name
  369. if result.failures:
  370. # Check and kill DumpRenderTree if we need to.
  371. if any([f.driver_needs_restart() for f in result.failures]):
  372. self._kill_driver()
  373. # Reset the batch count since the shell just bounced.
  374. self._batch_count = 0
  375. # Print the error message(s).
  376. _log.debug("%s %s failed:" % (self._name, test_name))
  377. for f in result.failures:
  378. _log.debug("%s %s" % (self._name, f.message()))
  379. elif result.type == test_expectations.SKIP:
  380. _log.debug("%s %s skipped" % (self._name, test_name))
  381. else:
  382. _log.debug("%s %s passed" % (self._name, test_name))
  383. def _run_test_in_another_thread(self, test_input, thread_timeout_sec, stop_when_done):
  384. """Run a test in a separate thread, enforcing a hard time limit.
  385. Since we can only detect the termination of a thread, not any internal
  386. state or progress, we can only run per-test timeouts when running test
  387. files singly.
  388. Args:
  389. test_input: Object containing the test filename and timeout
  390. thread_timeout_sec: time to wait before killing the driver process.
  391. Returns:
  392. A TestResult
  393. """
  394. worker = self
  395. driver = self._port.create_driver(self._worker_number)
  396. class SingleTestThread(threading.Thread):
  397. def __init__(self):
  398. threading.Thread.__init__(self)
  399. self.result = None
  400. def run(self):
  401. self.result = worker._run_single_test(driver, test_input, stop_when_done)
  402. thread = SingleTestThread()
  403. thread.start()
  404. thread.join(thread_timeout_sec)
  405. result = thread.result
  406. failures = []
  407. if thread.isAlive():
  408. # If join() returned with the thread still running, the
  409. # DumpRenderTree is completely hung and there's nothing
  410. # more we can do with it. We have to kill all the
  411. # DumpRenderTrees to free it up. If we're running more than
  412. # one DumpRenderTree thread, we'll end up killing the other
  413. # DumpRenderTrees too, introducing spurious crashes. We accept
  414. # that tradeoff in order to avoid losing the rest of this
  415. # thread's results.
  416. _log.error('Test thread hung: killing all DumpRenderTrees')
  417. failures = [test_failures.FailureTimeout()]
  418. driver.stop()
  419. if not result:
  420. result = test_results.TestResult(test_input.test_name, failures=failures, test_run_time=0)
  421. return result
  422. def _run_test_in_this_thread(self, test_input, stop_when_done):
  423. """Run a single test file using a shared DumpRenderTree process.
  424. Args:
  425. test_input: Object containing the test filename, uri and timeout
  426. Returns: a TestResult object.
  427. """
  428. if self._driver and self._driver.has_crashed():
  429. self._kill_driver()
  430. if not self._driver:
  431. self._driver = self._port.create_driver(self._worker_number)
  432. return self._run_single_test(self._driver, test_input, stop_when_done)
  433. def _run_single_test(self, driver, test_input, stop_when_done):
  434. return single_test_runner.run_single_test(self._port, self._options, self._results_directory,
  435. self._name, driver, test_input, stop_when_done)
  436. def _run_multiple_tests(self, driver, test_input, stop_when_done):
  437. return multiple_test_runner.run_multiple_tests(self._port, self._options, self._results_directory,
  438. self._name, driver, test_input, stop_when_done)
  439. class TestShard(object):
  440. """A test shard is a named list of TestInputs."""
  441. def __init__(self, name, test_inputs):
  442. self.name = name
  443. self.test_inputs = test_inputs
  444. self.requires_lock = test_inputs[0].requires_lock
  445. def __repr__(self):
  446. return "TestShard(name='%s', test_inputs=%s, requires_lock=%s'" % (self.name, self.test_inputs, self.requires_lock)
  447. def __eq__(self, other):
  448. return self.name == other.name and self.test_inputs == other.test_inputs
  449. class Sharder(object):
  450. def __init__(self, test_split_fn, max_locked_shards):
  451. self._split = test_split_fn
  452. self._max_locked_shards = max_locked_shards
  453. def shard_tests(self, test_inputs, num_workers, fully_parallel):
  454. """Groups tests into batches.
  455. This helps ensure that tests that depend on each other (aka bad tests!)
  456. continue to run together as most cross-tests dependencies tend to
  457. occur within the same directory.
  458. Return:
  459. Two list of TestShards. The first contains tests that must only be
  460. run under the server lock, the second can be run whenever.
  461. """
  462. # FIXME: Move all of the sharding logic out of manager into its
  463. # own class or module. Consider grouping it with the chunking logic
  464. # in prepare_lists as well.
  465. if num_workers == 1:
  466. return self._shard_in_two(test_inputs)
  467. elif fully_parallel:
  468. return self._shard_every_file(test_inputs)
  469. return self._shard_by_directory(test_inputs, num_workers)
  470. def _shard_in_two(self, test_inputs):
  471. """Returns two lists of shards, one with all the tests requiring a lock and one with the rest.
  472. This is used when there's only one worker, to minimize the per-shard overhead."""
  473. locked_inputs = []
  474. unlocked_inputs = []
  475. for test_input in test_inputs:
  476. if test_input.requires_lock:
  477. locked_inputs.append(test_input)
  478. else:
  479. unlocked_inputs.append(test_input)
  480. locked_shards = []
  481. unlocked_shards = []
  482. if locked_inputs:
  483. locked_shards = [TestShard('locked_tests', locked_inputs)]
  484. if unlocked_inputs:
  485. unlocked_shards = [TestShard('unlocked_tests', unlocked_inputs)]
  486. return locked_shards, unlocked_shards
  487. def _shard_every_file(self, test_inputs):
  488. """Returns two lists of shards, each shard containing a single test file.
  489. This mode gets maximal parallelism at the cost of much higher flakiness."""
  490. locked_shards = []
  491. unlocked_shards = []
  492. for test_input in test_inputs:
  493. # Note that we use a '.' for the shard name; the name doesn't really
  494. # matter, and the only other meaningful value would be the filename,
  495. # which would be really redundant.
  496. if test_input.requires_lock:
  497. locked_shards.append(TestShard('.', [test_input]))
  498. else:
  499. unlocked_shards.append(TestShard('.', [test_input]))
  500. return locked_shards, unlocked_shards
  501. def _shard_by_directory(self, test_inputs, num_workers):
  502. """Returns two lists of shards, each shard containing all the files in a directory.
  503. This is the default mode, and gets as much parallelism as we can while
  504. minimizing flakiness caused by inter-test dependencies."""
  505. locked_shards = []
  506. unlocked_shards = []
  507. tests_by_dir = {}
  508. # FIXME: Given that the tests are already sorted by directory,
  509. # we can probably rewrite this to be clearer and faster.
  510. for test_input in test_inputs:
  511. directory = self._split(test_input.test_name)[0]
  512. tests_by_dir.setdefault(directory, [])
  513. tests_by_dir[directory].append(test_input)
  514. for directory, test_inputs in tests_by_dir.iteritems():
  515. shard = TestShard(directory, test_inputs)
  516. if test_inputs[0].requires_lock:
  517. locked_shards.append(shard)
  518. else:
  519. unlocked_shards.append(shard)
  520. # Sort the shards by directory name.
  521. locked_shards.sort(key=lambda shard: shard.name)
  522. unlocked_shards.sort(key=lambda shard: shard.name)
  523. # Put a ceiling on the number of locked shards, so that we
  524. # don't hammer the servers too badly.
  525. # FIXME: For now, limit to one shard or set it
  526. # with the --max-locked-shards. After testing to make sure we
  527. # can handle multiple shards, we should probably do something like
  528. # limit this to no more than a quarter of all workers, e.g.:
  529. # return max(math.ceil(num_workers / 4.0), 1)
  530. return (self._resize_shards(locked_shards, self._max_locked_shards, 'locked_shard'),
  531. unlocked_shards)
  532. def _resize_shards(self, old_shards, max_new_shards, shard_name_prefix):
  533. """Takes a list of shards and redistributes the tests into no more
  534. than |max_new_shards| new shards."""
  535. # This implementation assumes that each input shard only contains tests from a
  536. # single directory, and that tests in each shard must remain together; as a
  537. # result, a given input shard is never split between output shards.
  538. #
  539. # Each output shard contains the tests from one or more input shards and
  540. # hence may contain tests from multiple directories.
  541. def divide_and_round_up(numerator, divisor):
  542. return int(math.ceil(float(numerator) / divisor))
  543. def extract_and_flatten(shards):
  544. test_inputs = []
  545. for shard in shards:
  546. test_inputs.extend(shard.test_inputs)
  547. return test_inputs
  548. def split_at(seq, index):
  549. return (seq[:index], seq[index:])
  550. num_old_per_new = divide_and_round_up(len(old_shards), max_new_shards)
  551. new_shards = []
  552. remaining_shards = old_shards
  553. while remaining_shards:
  554. some_shards, remaining_shards = split_at(remaining_shards, num_old_per_new)
  555. new_shards.append(TestShard('%s_%d' % (shard_name_prefix, len(new_shards) + 1), extract_and_flatten(some_shards)))
  556. return new_shards