server_process.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. # Copyright (C) 2010 Google Inc. All rights reserved.
  2. #
  3. # Redistribution and use in source and binary forms, with or without
  4. # modification, are permitted provided that the following conditions are
  5. # met:
  6. #
  7. # * Redistributions of source code must retain the above copyright
  8. # notice, this list of conditions and the following disclaimer.
  9. # * Redistributions in binary form must reproduce the above
  10. # copyright notice, this list of conditions and the following disclaimer
  11. # in the documentation and/or other materials provided with the
  12. # distribution.
  13. # * Neither the Google name nor the names of its
  14. # contributors may be used to endorse or promote products derived from
  15. # this software without specific prior written permission.
  16. #
  17. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  18. # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  19. # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  20. # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  21. # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  22. # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  23. # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  24. # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  25. # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  26. # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  27. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  28. """Package that implements the ServerProcess wrapper class"""
  29. import errno
  30. import logging
  31. import signal
  32. import sys
  33. import time
  34. # Note that although win32 python does provide an implementation of
  35. # the win32 select API, it only works on sockets, and not on the named pipes
  36. # used by subprocess, so we have to use the native APIs directly.
  37. if sys.platform == 'win32':
  38. import msvcrt
  39. import win32pipe
  40. import win32file
  41. else:
  42. import fcntl
  43. import os
  44. import select
  45. from webkitpy.common.system.executive import ScriptError
  46. _log = logging.getLogger(__name__)
  47. class ServerProcess(object):
  48. """This class provides a wrapper around a subprocess that
  49. implements a simple request/response usage model. The primary benefit
  50. is that reading responses takes a deadline, so that we don't ever block
  51. indefinitely. The class also handles transparently restarting processes
  52. as necessary to keep issuing commands."""
  53. def __init__(self, port_obj, name, cmd, env=None, universal_newlines=False, treat_no_data_as_crash=False):
  54. self._port = port_obj
  55. self._name = name # Should be the command name (e.g. DumpRenderTree, ImageDiff)
  56. self._cmd = cmd
  57. self._env = env
  58. # Set if the process outputs non-standard newlines like '\r\n' or '\r'.
  59. # Don't set if there will be binary data or the data must be ASCII encoded.
  60. self._universal_newlines = universal_newlines
  61. self._treat_no_data_as_crash = treat_no_data_as_crash
  62. self._host = self._port.host
  63. self._pid = None
  64. self._reset()
  65. # See comment in imports for why we need the win32 APIs and can't just use select.
  66. # FIXME: there should be a way to get win32 vs. cygwin from platforminfo.
  67. self._use_win32_apis = sys.platform == 'win32'
  68. def name(self):
  69. return self._name
  70. def pid(self):
  71. return self._pid
  72. def _reset(self):
  73. if getattr(self, '_proc', None):
  74. if self._proc.stdin:
  75. self._proc.stdin.close()
  76. self._proc.stdin = None
  77. if self._proc.stdout:
  78. self._proc.stdout.close()
  79. self._proc.stdout = None
  80. if self._proc.stderr:
  81. self._proc.stderr.close()
  82. self._proc.stderr = None
  83. self._proc = None
  84. self._output = str() # bytesarray() once we require Python 2.6
  85. self._error = str() # bytesarray() once we require Python 2.6
  86. self._crashed = False
  87. self.timed_out = False
  88. def process_name(self):
  89. return self._name
  90. def _start(self):
  91. if self._proc:
  92. raise ValueError("%s already running" % self._name)
  93. self._reset()
  94. # close_fds is a workaround for http://bugs.python.org/issue2320
  95. close_fds = not self._host.platform.is_win()
  96. self._proc = self._host.executive.popen(self._cmd, stdin=self._host.executive.PIPE,
  97. stdout=self._host.executive.PIPE,
  98. stderr=self._host.executive.PIPE,
  99. close_fds=close_fds,
  100. env=self._env,
  101. universal_newlines=self._universal_newlines)
  102. self._pid = self._proc.pid
  103. self._port.find_system_pid(self.name(), self._pid)
  104. fd = self._proc.stdout.fileno()
  105. if not self._use_win32_apis:
  106. fl = fcntl.fcntl(fd, fcntl.F_GETFL)
  107. fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
  108. fd = self._proc.stderr.fileno()
  109. fl = fcntl.fcntl(fd, fcntl.F_GETFL)
  110. fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
  111. def _handle_possible_interrupt(self):
  112. """This routine checks to see if the process crashed or exited
  113. because of a keyboard interrupt and raises KeyboardInterrupt
  114. accordingly."""
  115. # FIXME: Linux and Mac set the returncode to -signal.SIGINT if a
  116. # subprocess is killed with a ctrl^C. Previous comments in this
  117. # routine said that supposedly Windows returns 0xc000001d, but that's not what
  118. # -1073741510 evaluates to. Figure out what the right value is
  119. # for win32 and cygwin here ...
  120. if self._proc.returncode in (-1073741510, -signal.SIGINT):
  121. raise KeyboardInterrupt
  122. def poll(self):
  123. """Check to see if the underlying process is running; returns None
  124. if it still is (wrapper around subprocess.poll)."""
  125. if self._proc:
  126. return self._proc.poll()
  127. return None
  128. def write(self, bytes):
  129. """Write a request to the subprocess. The subprocess is (re-)start()'ed
  130. if is not already running."""
  131. if not self._proc:
  132. self._start()
  133. try:
  134. self._proc.stdin.write(bytes)
  135. except IOError, e:
  136. self.stop(0.0)
  137. # stop() calls _reset(), so we have to set crashed to True after calling stop().
  138. self._crashed = True
  139. def _pop_stdout_line_if_ready(self):
  140. index_after_newline = self._output.find('\n') + 1
  141. if index_after_newline > 0:
  142. return self._pop_output_bytes(index_after_newline)
  143. return None
  144. def _pop_stderr_line_if_ready(self):
  145. index_after_newline = self._error.find('\n') + 1
  146. if index_after_newline > 0:
  147. return self._pop_error_bytes(index_after_newline)
  148. return None
  149. def pop_all_buffered_stderr(self):
  150. return self._pop_error_bytes(len(self._error))
  151. def read_stdout_line(self, deadline):
  152. return self._read(deadline, self._pop_stdout_line_if_ready)
  153. def read_stderr_line(self, deadline):
  154. return self._read(deadline, self._pop_stderr_line_if_ready)
  155. def read_either_stdout_or_stderr_line(self, deadline):
  156. def retrieve_bytes_from_buffers():
  157. stdout_line = self._pop_stdout_line_if_ready()
  158. if stdout_line:
  159. return stdout_line, None
  160. stderr_line = self._pop_stderr_line_if_ready()
  161. if stderr_line:
  162. return None, stderr_line
  163. return None # Instructs the caller to keep waiting.
  164. return_value = self._read(deadline, retrieve_bytes_from_buffers)
  165. # FIXME: This is a bit of a hack around the fact that _read normally only returns one value, but this caller wants it to return two.
  166. if return_value is None:
  167. return None, None
  168. return return_value
  169. def read_stdout(self, deadline, size):
  170. if size <= 0:
  171. raise ValueError('ServerProcess.read() called with a non-positive size: %d ' % size)
  172. def retrieve_bytes_from_stdout_buffer():
  173. if len(self._output) >= size:
  174. return self._pop_output_bytes(size)
  175. return None
  176. return self._read(deadline, retrieve_bytes_from_stdout_buffer)
  177. def _log(self, message):
  178. # This is a bit of a hack, but we first log a blank line to avoid
  179. # messing up the master process's output.
  180. _log.info('')
  181. _log.info(message)
  182. def _handle_timeout(self):
  183. self.timed_out = True
  184. self._port.sample_process(self._name, self._proc.pid)
  185. def _split_string_after_index(self, string, index):
  186. return string[:index], string[index:]
  187. def _pop_output_bytes(self, bytes_count):
  188. output, self._output = self._split_string_after_index(self._output, bytes_count)
  189. return output
  190. def _pop_error_bytes(self, bytes_count):
  191. output, self._error = self._split_string_after_index(self._error, bytes_count)
  192. return output
  193. def _wait_for_data_and_update_buffers_using_select(self, deadline, stopping=False):
  194. if self._proc.stdout.closed or self._proc.stderr.closed:
  195. # If the process crashed and is using FIFOs, like Chromium Android, the
  196. # stdout and stderr pipes will be closed.
  197. return
  198. out_fd = self._proc.stdout.fileno()
  199. err_fd = self._proc.stderr.fileno()
  200. select_fds = (out_fd, err_fd)
  201. try:
  202. read_fds, _, _ = select.select(select_fds, [], select_fds, max(deadline - time.time(), 0))
  203. except select.error, e:
  204. # We can ignore EINVAL since it's likely the process just crashed and we'll
  205. # figure that out the next time through the loop in _read().
  206. if e.args[0] == errno.EINVAL:
  207. return
  208. raise
  209. try:
  210. # Note that we may get no data during read() even though
  211. # select says we got something; see the select() man page
  212. # on linux. I don't know if this happens on Mac OS and
  213. # other Unixen as well, but we don't bother special-casing
  214. # Linux because it's relatively harmless either way.
  215. if out_fd in read_fds:
  216. data = self._proc.stdout.read()
  217. if not data and not stopping and (self._treat_no_data_as_crash or self._proc.poll()):
  218. self._crashed = True
  219. self._output += data
  220. if err_fd in read_fds:
  221. data = self._proc.stderr.read()
  222. if not data and not stopping and (self._treat_no_data_as_crash or self._proc.poll()):
  223. self._crashed = True
  224. self._error += data
  225. except IOError, e:
  226. # We can ignore the IOErrors because we will detect if the subporcess crashed
  227. # the next time through the loop in _read()
  228. pass
  229. def _wait_for_data_and_update_buffers_using_win32_apis(self, deadline):
  230. # See http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subprocess-use-on-win/
  231. # and http://docs.activestate.com/activepython/2.6/pywin32/modules.html
  232. # for documentation on all of these win32-specific modules.
  233. now = time.time()
  234. out_fh = msvcrt.get_osfhandle(self._proc.stdout.fileno())
  235. err_fh = msvcrt.get_osfhandle(self._proc.stderr.fileno())
  236. while (self._proc.poll() is None) and (now < deadline):
  237. output = self._non_blocking_read_win32(out_fh)
  238. error = self._non_blocking_read_win32(err_fh)
  239. if output or error:
  240. if output:
  241. self._output += output
  242. if error:
  243. self._error += error
  244. return
  245. time.sleep(0.01)
  246. now = time.time()
  247. return
  248. def _non_blocking_read_win32(self, handle):
  249. try:
  250. _, avail, _ = win32pipe.PeekNamedPipe(handle, 0)
  251. if avail > 0:
  252. _, buf = win32file.ReadFile(handle, avail, None)
  253. return buf
  254. except Exception, e:
  255. if e[0] not in (109, errno.ESHUTDOWN): # 109 == win32 ERROR_BROKEN_PIPE
  256. raise
  257. return None
  258. def has_crashed(self):
  259. if not self._crashed and self.poll():
  260. self._crashed = True
  261. self._handle_possible_interrupt()
  262. return self._crashed
  263. # This read function is a bit oddly-designed, as it polls both stdout and stderr, yet
  264. # only reads/returns from one of them (buffering both in local self._output/self._error).
  265. # It might be cleaner to pass in the file descriptor to poll instead.
  266. def _read(self, deadline, fetch_bytes_from_buffers_callback):
  267. while True:
  268. if self.has_crashed():
  269. return None
  270. if time.time() > deadline:
  271. self._handle_timeout()
  272. return None
  273. bytes = fetch_bytes_from_buffers_callback()
  274. if bytes is not None:
  275. return bytes
  276. if self._use_win32_apis:
  277. self._wait_for_data_and_update_buffers_using_win32_apis(deadline)
  278. else:
  279. self._wait_for_data_and_update_buffers_using_select(deadline)
  280. def start(self):
  281. if not self._proc:
  282. self._start()
  283. def stop(self, timeout_secs=3.0):
  284. if not self._proc:
  285. return (None, None)
  286. # Only bother to check for leaks or stderr if the process is still running.
  287. if self.poll() is None:
  288. self._port.check_for_leaks(self.name(), self.pid())
  289. now = time.time()
  290. if self._proc.stdin:
  291. self._proc.stdin.close()
  292. self._proc.stdin = None
  293. killed = False
  294. if timeout_secs:
  295. deadline = now + timeout_secs
  296. while self._proc.poll() is None and time.time() < deadline:
  297. time.sleep(0.01)
  298. if self._proc.poll() is None:
  299. _log.warning('stopping %s(pid %d) timed out, killing it' % (self._name, self._proc.pid))
  300. if self._proc.poll() is None:
  301. self._kill()
  302. killed = True
  303. _log.debug('killed pid %d' % self._proc.pid)
  304. # read any remaining data on the pipes and return it.
  305. if not killed:
  306. if self._use_win32_apis:
  307. self._wait_for_data_and_update_buffers_using_win32_apis(now)
  308. else:
  309. self._wait_for_data_and_update_buffers_using_select(now, stopping=True)
  310. out, err = self._output, self._error
  311. self._reset()
  312. return (out, err)
  313. def kill(self):
  314. self.stop(0.0)
  315. def _kill(self):
  316. self._host.executive.kill_process(self._proc.pid)
  317. if self._proc.poll() is not None:
  318. self._proc.wait()
  319. def replace_outputs(self, stdout, stderr):
  320. assert self._proc
  321. if stdout:
  322. self._proc.stdout.close()
  323. self._proc.stdout = stdout
  324. if stderr:
  325. self._proc.stderr.close()
  326. self._proc.stderr = stderr