# conversion.py (11 KB)


# Standard-library modules.
import collections
import errno
import os
import time
import tempfile
import threading
import shutil
import logging

# Project-local helpers from the lvc package.
from lvc import execute
from lvc.utils import line_reader
from lvc.video import get_thumbnail_synchronous
from lvc.widgets import get_conversion_directory

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
  14. class Conversion(object):
  15. def __init__(self, video, converter, manager, output_dir=None):
  16. self.video = video
  17. self.manager = manager
  18. if output_dir is None:
  19. output_dir = get_conversion_directory()
  20. self.output_dir = output_dir
  21. self.lines = []
  22. self.thread = None
  23. self.popen = None
  24. self.status = 'initialized'
  25. self.temp_output = None
  26. self.error = None
  27. self.started_at = None
  28. self.duration = None
  29. self.progress = None
  30. self.progress_percent = None
  31. self.create_thumbnail = False
  32. self.eta = None
  33. self.listeners = set()
  34. self.set_converter(converter)
  35. logger.info('created %r', self)
  36. def set_converter(self, converter):
  37. if self.status != 'initialized':
  38. raise RuntimeError("can't change converter after starting")
  39. self.converter = converter
  40. self.output = os.path.join(self.output_dir,
  41. converter.get_output_filename(self.video))
  42. def __repr__(self):
  43. return unicode(self)
  44. def __str__(self):
  45. return unicode(self).encode('utf8')
  46. def __unicode__(self):
  47. return u'<Conversion (%s) %r -> %r>' % (
  48. self.converter.name, self.video.filename, self.output)
  49. def listen(self, f):
  50. self.listeners.add(f)
  51. def unlisten(self, f):
  52. self.listeners.remove(f)
  53. def notify_listeners(self):
  54. self.manager.notify_queue.add(self)
  55. def run(self):
  56. logger.info('starting %r', self)
  57. try:
  58. self.temp_output = tempfile.mktemp(
  59. dir=os.path.dirname(self.output))
  60. except EnvironmentError as e:
  61. logger.exception('while creating temp file for %r',
  62. self.output)
  63. self.error = str(e)
  64. self.finalize()
  65. return
  66. logger.info('commandline: %r', ' '.join(
  67. self.get_subprocess_arguments(self.temp_output)))
  68. self.thread = threading.Thread(target=self._thread,
  69. name="Thread:%s" % (self,))
  70. self.thread.setDaemon(True)
  71. self.thread.start()
  72. def stop(self):
  73. logger.info('stopping %r', self)
  74. self.error = 'manually stopped'
  75. if self.popen is None:
  76. status = 'canceled'
  77. try:
  78. self.manager.remove(self)
  79. except ValueError:
  80. status = 'failed'
  81. logger.exception('not running and not waiting %s' % (self,))
  82. self.status = status
  83. return
  84. else:
  85. try:
  86. self.popen.kill()
  87. self.popen.wait()
  88. # set the status transition last, if we had hit an exception
  89. # then we will transition the next state to 'failed' in
  90. # finalize()
  91. self.status = 'canceled'
  92. except EnvironmentError as e:
  93. logger.exception('while stopping %s' % (self,))
  94. self.error = str(e)
  95. self.popen = None
  96. self.manager.conversion_finished(self)
  97. def _thread(self):
  98. try:
  99. commandline = self.get_subprocess_arguments(self.temp_output)
  100. self.popen = execute.Popen(commandline, bufsize=1)
  101. self.process_output()
  102. if self.popen:
  103. # if we stop the thread, we can get here after `.stop()`
  104. # finishes.
  105. self.popen.wait()
  106. except OSError as e:
  107. if e.errno == errno.ENOENT:
  108. self.error = '%r does not exist' % (
  109. self.converter.get_executable(),)
  110. else:
  111. logger.exception('OSError in %s' % (self.thread.name,))
  112. self.error = str(e)
  113. except Exception as e:
  114. logger.exception('in %s' % (self.thread.name,))
  115. self.error = str(e)
  116. if self.create_thumbnail:
  117. self.write_thumbnail_file()
  118. self.finalize()
  119. def write_thumbnail_file(self):
  120. try:
  121. self._write_thumbnail_file()
  122. except StandardError:
  123. logging.warn("Error writing thumbnail", exc_info=True)
  124. def _write_thumbnail_file(self):
  125. if self.video.audio_only:
  126. logging.warning("write_thumbnail_file: audio_only=True "
  127. "not writing thumbnail %s", self.video.filename)
  128. return
  129. output_basename = os.path.splitext(os.path.basename(self.output))[0]
  130. logging.info("td: %s ob: %s", self._get_thumbnail_dir(),
  131. output_basename)
  132. thumbnail_path = os.path.join(self._get_thumbnail_dir(),
  133. output_basename + '.png')
  134. logging.info("creating thumbnail: %s", thumbnail_path)
  135. width, height = self.converter.get_target_size(self.video)
  136. get_thumbnail_synchronous(self.video.filename, width, height,
  137. thumbnail_path)
  138. if os.path.exists(thumbnail_path):
  139. logging.info("thumbnail successful: %s", thumbnail_path)
  140. else:
  141. logging.warning("get_thumbnail_synchronous() succeeded, but the "
  142. "thumbnail file is missing!")
  143. def _get_thumbnail_dir(self):
  144. """Get the directory to store thumbnails in it.
  145. This method will create the directory if it doesn't exist
  146. """
  147. thumbnail_dir = os.path.join(self.output_dir, 'thumbnails')
  148. if not os.path.exists(thumbnail_dir):
  149. os.mkdir(thumbnail_dir)
  150. return thumbnail_dir
  151. def calc_progress_percent(self):
  152. if not self.duration:
  153. return 0.0
  154. if self.create_thumbnail:
  155. # assume that thumbnail creation takes as long as 2 seconds of
  156. # video processing
  157. effective_duration = self.duration + 2.0
  158. else:
  159. effective_duration = self.duration
  160. return self.progress / effective_duration
  161. def process_output(self):
  162. self.started_at = time.time()
  163. self.status = 'converting'
  164. # We use line_reader, rather than just iterating over the file object,
  165. # because iterating over the file object gives us all the lines when
  166. # the process ends, and we're looking for real-time updates.
  167. for line in line_reader(self.popen.stdout):
  168. self.lines.append(line) # for debugging, if needed
  169. try:
  170. status = self.converter.process_status_line(self.video, line)
  171. except StandardError:
  172. logging.warn("error in process_status_line()", exc_info=True)
  173. continue
  174. if status is None:
  175. continue
  176. updated = set()
  177. if 'finished' in status:
  178. self.error = status.get('error', None)
  179. break
  180. if 'duration' in status:
  181. updated.update(('duration', 'progress'))
  182. self.duration = float(status['duration'])
  183. if self.progress is None:
  184. self.progress = 0.0
  185. if 'progress' in status:
  186. updated.add('progress')
  187. self.progress = min(float(status['progress']),
  188. self.duration)
  189. if 'eta' in status:
  190. updated.add('eta')
  191. self.eta = float(status['eta'])
  192. if updated:
  193. self.progress_percent = self.calc_progress_percent()
  194. if 'eta' not in updated:
  195. if self.duration and 0 < self.progress_percent < 1.0:
  196. progress = self.progress_percent * 100
  197. elapsed = time.time() - self.started_at
  198. time_per_percent = elapsed / progress
  199. self.eta = float(
  200. time_per_percent * (100 - progress))
  201. else:
  202. self.eta = 0.0
  203. self.notify_listeners()
  204. def finalize(self):
  205. self.progress = self.duration
  206. self.progress_percent = 1.0
  207. self.eta = 0
  208. if self.error is None:
  209. self.status = 'staging'
  210. self.notify_listeners()
  211. try:
  212. self.converter.finalize(self.temp_output, self.output)
  213. except EnvironmentError as e:
  214. logger.exception('while trying to move %r to %r after %s',
  215. self.temp_output, self.output, self)
  216. self.error = str(e)
  217. self.status = 'failed'
  218. else:
  219. self.status = 'finished'
  220. else:
  221. if self.temp_output is not None:
  222. try:
  223. os.unlink(self.temp_output)
  224. except EnvironmentError:
  225. pass
  226. '''
  227. ignore errors removing temp files;
  228. they may not have been created
  229. '''
  230. if self.status != 'canceled':
  231. self.status = 'failed'
  232. if self.status != 'canceled':
  233. self.notify_listeners()
  234. logger.info('finished %r; status: %s', self, self.status)
  235. def get_subprocess_arguments(self, output):
  236. return ([self.converter.get_executable()] +
  237. list(self.converter.get_arguments(self.video, output)))
  238. class ConversionManager(object):
  239. def __init__(self, simultaneous=None):
  240. self.notify_queue = set()
  241. self.in_progress = set()
  242. self.waiting = collections.deque()
  243. self.simultaneous = simultaneous
  244. self.running = False
  245. self.create_thumbnails = False
  246. def get_conversion(self, video, converter, **kwargs):
  247. return Conversion(video, converter, self, **kwargs)
  248. def remove(self, conversion):
  249. self.waiting.remove(conversion)
  250. def start_conversion(self, video, converter):
  251. return self.run_conversion(self.get_conversion(video, converter))
  252. def run_conversion(self, conversion):
  253. if self.simultaneous is not None and len(self.in_progress) \
  254. >= self.simultaneous:
  255. self.waiting.append(conversion)
  256. else:
  257. self._start_conversion(conversion)
  258. self.running = True
  259. return conversion
  260. def _start_conversion(self, conversion):
  261. self.in_progress.add(conversion)
  262. conversion.create_thumbnail = self.create_thumbnails
  263. conversion.run()
  264. def check_notifications(self):
  265. if not self.running:
  266. # don't bother checking if we're not running
  267. return
  268. self.notify_queue, changed = set(), self.notify_queue
  269. for conversion in changed:
  270. if conversion.status in ('canceled', 'finished', 'failed'):
  271. self.conversion_finished(conversion)
  272. for listener in conversion.listeners:
  273. listener(conversion)
  274. def conversion_finished(self, conversion):
  275. self.in_progress.discard(conversion)
  276. while (self.waiting and self.simultaneous is not None and
  277. len(self.in_progress) < self.simultaneous):
  278. c = self.waiting.popleft()
  279. self._start_conversion(c)
  280. if not self.in_progress:
  281. self.running = False