test_conversion.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. # -*- coding: utf-8 -*-
  2. import json
  3. import os.path
  4. import shutil
  5. import sys
  6. import tempfile
  7. import time
  8. from lvc import video
  9. from lvc import converter
  10. from lvc import conversion
  11. import base
  class FakeConverterInfo(converter.ConverterInfo):
      """Converter description used by the tests in this module.

      Instead of a real conversion tool it points at the running Python
      interpreter and the ``testdata/fake_converter.py`` script that lives
      next to this file.
      """

      extension = 'fake'

      def get_executable(self):
          """Return the path of the Python interpreter running the tests."""
          return sys.executable

      def get_arguments(self, video, output):
          """Return the argument list passed to the executable.

          The list is ``-u`` (unbuffered interpreter output), the path of
          ``fake_converter.py`` under the ``testdata`` directory beside
          this file, ``video.filename`` and ``output``.
          """
          return ['-u', os.path.join(
              os.path.dirname(__file__), 'testdata', 'fake_converter.py'),
              video.filename, output]

      def process_status_line(self, video, line):
          """Parse ``line`` as JSON and return the decoded value.

          ``video`` is accepted but not used here.
          """
          return json.loads(line)
  class ConversionManagerTest(base.Test):
      """Tests for ``conversion.ConversionManager`` driven by
      ``FakeConverterInfo``.

      Each test copies a sample file into a fresh temporary directory,
      starts one or more conversions and polls the manager until it
      reports that nothing is running (or a timeout expires).
      """

      def setUp(self):
          """Create the fake converter, a manager, a scratch directory and
          an empty list that collects change notifications."""
          base.Test.setUp(self)
          self.converter = FakeConverterInfo('Fake')
          self.manager = conversion.ConversionManager()
          self.temp_dir = tempfile.mkdtemp()
          self.changes = []

      def tearDown(self):
          """Run the base tear-down, then remove the scratch directory."""
          base.Test.tearDown(self)
          # ignore_errors: a failed cleanup should not mask the test result.
          shutil.rmtree(self.temp_dir, ignore_errors=True)

      def changed(self, conversion):
          """Listener callback: record a snapshot of the conversion's
          status, duration, progress and eta in ``self.changes``."""
          self.changes.append(
              {'status': conversion.status,
               'duration': conversion.duration,
               'progress': conversion.progress,
               'eta': conversion.eta
               })

      def spin(self, timeout):
          """Poll the manager every 0.1s until it stops running or
          ``timeout`` seconds have elapsed."""
          finish_by = time.time() + timeout
          while time.time() < finish_by and self.manager.running:
              self.manager.check_notifications()
              time.sleep(0.1)

      def start_conversion(self, filename, timeout=3):
          """Convert ``filename`` with the fake converter and wait for it.

          Registers ``self.changed`` as a listener, waits up to ``timeout``
          seconds, then asserts that the manager is idle and that the
          conversion's temporary output no longer exists.

          Returns the conversion object created by the manager.
          """
          vf = video.VideoFile(filename)
          c = self.manager.start_conversion(vf, self.converter)
          # XXX HACK: for test harness change the output directory to be the
          # same as the input file (so we can nuke in one go)
          c.output_dir = os.path.dirname(filename)
          c.output = os.path.join(c.output_dir,
                                  self.converter.get_output_filename(vf))
          c.listen(self.changed)
          self.assertTrue(self.manager.running)
          self.assertTrue(c in self.manager.in_progress)
          self.spin(timeout)
          self.assertFalse(self.manager.running)
          self.assertFalse(os.path.exists(c.temp_output))
          return c

      def test_initial(self):
          """A new manager has empty queues and is not running."""
          self.assertEqual(self.manager.notify_queue, set())
          self.assertEqual(self.manager.in_progress, set())
          self.assertFalse(self.manager.running)

      def test_conversion(self):
          """A successful conversion finishes, writes its output and emits
          the expected sequence of change notifications."""
          filename = os.path.join(self.temp_dir, 'webm-0.webm')
          shutil.copyfile(os.path.join(self.testdata_dir, 'webm-0.webm'),
                          filename)
          c = self.start_conversion(filename)
          self.assertEqual(c.status, 'finished')
          self.assertEqual(c.progress, c.duration)
          self.assertEqual(c.progress_percent, 1.0)
          self.assertTrue(os.path.exists(c.output))
          # open() in a with-block closes the handle deterministically; the
          # old file() builtin leaked it and does not exist on Python 3.
          with open(c.output) as output_file:
              self.assertEqual(output_file.read(), 'blank')
          self.assertEqual(self.changes, [
              {'status': 'converting', 'duration': 5.0, 'eta': 5.0,
               'progress': 0.0},
              {'status': 'converting', 'duration': 5.0, 'eta': 4.0,
               'progress': 1.0},
              {'status': 'converting', 'duration': 5.0, 'eta': 3.0,
               'progress': 2.0},
              {'status': 'converting', 'duration': 5.0, 'eta': 2.0,
               'progress': 3.0},
              {'status': 'converting', 'duration': 5.0, 'eta': 1.0,
               'progress': 4.0},
              {'status': 'finished', 'duration': 5.0, 'eta': 0.0,
               'progress': 5.0}
          ])

      def test_conversion_with_error(self):
          """An input named ``error.webm`` yields a failed conversion with
          the error text 'test error' and no output file."""
          filename = os.path.join(self.temp_dir, 'error.webm')
          shutil.copyfile(os.path.join(self.testdata_dir, 'webm-0.webm'),
                          filename)
          c = self.start_conversion(filename)
          self.assertFalse(os.path.exists(c.output))
          self.assertEqual(c.status, 'failed')
          self.assertEqual(c.error, 'test error')

      def test_conversion_with_missing_executable(self):
          """A converter whose executable path does not exist fails with a
          '<path> does not exist' error and produces no output."""
          missing = sys.executable + '.does-not-exist'
          # Override on the instance only, so other tests are unaffected.
          self.converter.get_executable = lambda: missing
          filename = os.path.join(self.temp_dir, 'webm-0.webm')
          shutil.copyfile(os.path.join(self.testdata_dir, 'webm-0.webm'),
                          filename)
          c = self.start_conversion(filename)
          self.assertEqual(c.status, 'failed')
          self.assertEqual(c.error, '%r does not exist' % missing)
          self.assertFalse(os.path.exists(c.output))

      def test_multiple_simultaneous_conversions(self):
          """Two conversions started back to back are both in progress and
          both finish within the polling window."""
          filename = os.path.join(self.temp_dir, 'webm-0.webm')
          shutil.copyfile(os.path.join(self.testdata_dir, 'webm-0.webm'),
                          filename)
          shutil.copyfile(os.path.join(self.testdata_dir, 'webm-0.webm'),
                          filename + '2')
          vf = video.VideoFile(filename)
          vf2 = video.VideoFile(filename + '2')
          c = self.manager.start_conversion(vf, self.converter)
          c2 = self.manager.start_conversion(vf2, self.converter)
          self.assertEqual(len(self.manager.in_progress), 2)
          # if they're linear, it should take < 5s
          # NOTE(review): presumably the 3s budget below is only enough when
          # both conversions run at the same time -- confirm against the
          # timing of testdata/fake_converter.py.
          self.spin(3)
          self.assertEqual(c.status, 'finished')
          self.assertEqual(c2.status, 'finished')

      def test_limit_simultaneous_conversions(self):
          """With ``simultaneous`` set to 1 the second conversion waits,
          and both still finish within the polling window."""
          self.manager.simultaneous = 1
          filename = os.path.join(self.temp_dir, 'webm-0.webm')
          shutil.copyfile(os.path.join(self.testdata_dir, 'webm-0.webm'),
                          filename)
          shutil.copyfile(os.path.join(self.testdata_dir, 'webm-0.webm'),
                          filename + '2')
          vf = video.VideoFile(filename)
          vf2 = video.VideoFile(filename + '2')
          c = self.manager.start_conversion(vf, self.converter)
          c2 = self.manager.start_conversion(vf2, self.converter)
          self.assertEqual(len(self.manager.in_progress), 1)
          self.assertEqual(len(self.manager.waiting), 1)
          self.spin(5)
          self.assertEqual(c.status, 'finished')
          self.assertEqual(c2.status, 'finished')

      def test_unicode_characters(self):
          """Inputs with non-ASCII and quote characters in their names
          convert successfully and keep the name in the output path."""
          for filename in (
                  u'"TAKE2\'s" REHEARSAL човен поўны вуграмі',
                  u'ところで早け',
                  u'Lputefartøavål'):
              full_filename = os.path.join(self.temp_dir, filename)
              shutil.copyfile(
                  os.path.join(self.testdata_dir, 'webm-0.webm'),
                  full_filename)
              c = self.start_conversion(full_filename)
              self.assertEqual(c.status, 'finished')
              self.assertTrue(filename in c.output, '%r not in %r' % (
                  filename, c.output))

      def test_overwrite(self):
          """Converting the same input twice succeeds both times and
          targets the same output path."""
          filename = os.path.join(self.temp_dir, 'webm-0.webm')
          shutil.copyfile(os.path.join(self.testdata_dir, 'webm-0.webm'),
                          filename)
          c = self.start_conversion(filename)
          self.assertEqual(c.status, 'finished')
          c2 = self.start_conversion(filename)
          self.assertEqual(c2.status, 'finished')
          self.assertEqual(c2.output, c.output)

      def test_stop(self):
          """Stopping a running conversion marks it 'canceled' with the
          error text 'manually stopped'."""
          filename = os.path.join(self.temp_dir, 'webm-0.webm')
          shutil.copyfile(os.path.join(self.testdata_dir, 'webm-0.webm'),
                          filename)
          vf = video.VideoFile(filename)
          c = self.manager.start_conversion(vf, self.converter)
          # Give the conversion a moment to get going before stopping it.
          time.sleep(0.5)
          c.stop()
          self.spin(1)
          self.assertEqual(c.status, 'canceled')
          self.assertEqual(c.error, 'manually stopped')