tests_management.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2013 The Distro Tracker Developers
  3. # See the COPYRIGHT file at the top-level directory of this distribution and
  4. # at http://deb.li/DTAuthors
  5. #
  6. # This file is part of Distro Tracker. It is subject to the license terms
  7. # in the LICENSE file found in the top-level directory of this
  8. # distribution and at http://deb.li/DTLicense. No part of Distro Tracker,
  9. # including this file, may be copied, modified, propagated, or distributed
  10. # except according to the terms contained in the LICENSE file.
  11. """
  12. Tests for the Distro Tracker core management commands.
  13. """
  14. from __future__ import unicode_literals
  15. from django.utils.six.moves import mock
  16. from django.core.management import call_command
  17. from distro_tracker.accounts.models import User
  18. from distro_tracker.accounts.models import UserEmail
  19. from distro_tracker.core.models import EmailNews
  20. from distro_tracker.core.models import EmailSettings
  21. from distro_tracker.core.models import News
  22. from distro_tracker.core.models import SourcePackageName
  23. from distro_tracker.core.models import Subscription
  24. from distro_tracker.core.utils import message_from_bytes
  25. from distro_tracker.test import SimpleTestCase
  26. from distro_tracker.test import TestCase
  27. class RunTaskManagementCommandTest(SimpleTestCase):
  28. """
  29. Test for the :mod:`distro_tracker.core.management.commands.tracker_run_task`
  30. management command.
  31. """
  32. def run_command(self, tasks, **kwargs):
  33. call_command('tracker_run_task', *tasks, **kwargs)
  34. @mock.patch(
  35. 'distro_tracker.core.management.commands.tracker_run_task.run_task')
  36. def test_runs_all(self, mock_run_task):
  37. """
  38. Tests that the management command calls the
  39. :func:`run_task <distro_tracker.core.tasks.run_task>` function for each
  40. given task name.
  41. """
  42. self.run_command(['TaskName1', 'TaskName2'])
  43. # The run task was called only for the given commands
  44. self.assertEqual(2, mock_run_task.call_count)
  45. mock_run_task.assert_any_call('TaskName1', None)
  46. mock_run_task.assert_any_call('TaskName2', None)
  47. @mock.patch(
  48. 'distro_tracker.core.management.commands.tracker_run_task.run_task')
  49. def test_passes_force_flag(self, mock_run_task):
  50. """
  51. Tests that the management command passes the force flag to the task
  52. invocations when it is given.
  53. """
  54. self.run_command(['TaskName1'], force=True)
  55. mock_run_task.assert_called_with('TaskName1', {
  56. 'force_update': True,
  57. })
  58. @mock.patch('distro_tracker.core.tasks.import_all_tasks')
  59. @mock.patch('distro_tracker.core.management.commands.'
  60. 'tracker_run_all_tasks.run_all_tasks')
  61. class RunAllTasksTests(SimpleTestCase):
  62. """
  63. Test for the :mod:`distro_tracker.core.management.commands.tracker_run_task`
  64. management command.
  65. """
  66. def run_command(self, *args, **kwargs):
  67. call_command('tracker_run_all_tasks', *args, **kwargs)
  68. def test_runs_all(self, mock_run_all_tasks, *args, **kwargs):
  69. """
  70. Tests that the management command calls the
  71. :func:`run_task <distro_tracker.core.tasks.run_task>` function for each
  72. given task name.
  73. """
  74. self.run_command()
  75. # The run task was called only for the given commands
  76. mock_run_all_tasks.assert_called_once_with(None)
  77. def test_passes_force_flag(self, mock_run_all_tasks, *args, **kwargs):
  78. """
  79. Tests that the management command passes the force flag to the task
  80. invocations when it is given.
  81. """
  82. self.run_command(force=True)
  83. mock_run_all_tasks.assert_called_once_with({
  84. 'force_update': True,
  85. })
  86. class UpdateNewsSignaturesCommandTest(TestCase):
  87. """
  88. Tests for the
  89. :mod:`distro_tracker.core.management.commands.tracker_update_news_signatures`
  90. management command.
  91. """
  92. def setUp(self):
  93. self.package = SourcePackageName.objects.create(name='dummy-package')
  94. def test_signatures_added(self):
  95. """
  96. Tests that signatures are correctly added to the news which previously
  97. didn't have any, despite having signed content.
  98. """
  99. # Set up news based on a signed message.
  100. signed_news = []
  101. unsigned_news = []
  102. self.import_key_into_keyring('key1.pub')
  103. # The content of the test news item is found in a file
  104. file_path = self.get_test_data_path(
  105. 'signed-message-quoted-printable')
  106. with open(file_path, 'rb') as f:
  107. content = f.read()
  108. expected_name = 'PTS Tests'
  109. expected_email = 'fake-address@domain.com'
  110. # The first signed news has the same content as what is found
  111. # the signed test file.
  112. signed_news.append(EmailNews.objects.create_email_news(
  113. message=message_from_bytes(content),
  114. package=self.package))
  115. # For the second one, add some text after the signature: this
  116. # should still mean that the correct signature can be extracted!
  117. signed_news.append(EmailNews.objects.create_email_news(
  118. message=message_from_bytes(content + b'\nMore content'),
  119. package=self.package))
  120. # Set up some unsigned news.
  121. unsigned_news.append(EmailNews.objects.create_email_news(
  122. message=message_from_bytes(b'Subject: Hi\n\nPayload.'),
  123. package=self.package))
  124. # A non-email based news item
  125. unsigned_news.append(News.objects.create(
  126. package=self.package,
  127. content="Some content.."
  128. ))
  129. # Make sure that the signed news do not have associated
  130. # signature information
  131. for signed in signed_news:
  132. signed.signed_by.clear()
  133. # Run the command
  134. call_command("tracker_update_news_signatures")
  135. # The signed news items have associated signature information
  136. for signed in signed_news:
  137. self.assertEqual(1, signed.signed_by.count())
  138. signer = signed.signed_by.all()[0]
  139. # The signature is actually correct too?
  140. self.assertEqual(expected_name, signer.name)
  141. self.assertEqual(expected_email, signer.email)
  142. # The unsigned messages still do not have any signature info
  143. for unsigned in unsigned_news:
  144. self.assertEqual(0, unsigned.signed_by.count())
  145. class FixDatabaseCommandTests(TestCase):
  146. def setUp(self):
  147. self.email = 'user@example.net'
  148. self.alt_email = self.email.capitalize()
  149. self.user_email = UserEmail.objects.create(email=self.email)
  150. self.alt_user_email = UserEmail.objects.create(email=self.alt_email)
  151. self.subscribe(self.user_email, 'pkg-1')
  152. self.subscribe(self.alt_user_email, 'pkg-2')
  153. def subscribe(self, email, package):
  154. # Special subscribe method which avoids the case insensitive lookup
  155. pkg, _ = SourcePackageName.objects.get_or_create(name=package)
  156. user_email, _ = UserEmail.objects.get_or_create(
  157. email__exact=email,
  158. defaults={'email': email}
  159. )
  160. es, _ = EmailSettings.objects.get_or_create(user_email=user_email)
  161. Subscription.objects.create(package=pkg, email_settings=es)
  162. def test_management_command_drop_duplicates(self):
  163. call_command('tracker_fix_database')
  164. self.assertEqual(UserEmail.objects.filter(email=self.alt_email).count(),
  165. 0)
  166. def test_management_command_merges_users(self):
  167. user = User(main_email=self.alt_email)
  168. user.save()
  169. user.emails.add(self.alt_user_email)
  170. self.alt_user_email.refresh_from_db()
  171. self.assertIsNotNone(self.alt_user_email.user)
  172. self.user_email.refresh_from_db()
  173. self.assertIsNone(self.user_email.user)
  174. call_command('tracker_fix_database')
  175. self.user_email.refresh_from_db()
  176. self.assertEqual(self.user_email.user.pk, user.pk)
  177. def test_management_command_merges_subscriptions(self):
  178. call_command('tracker_fix_database')
  179. subscriptions = self.user_email.emailsettings.subscription_set
  180. self.assertEqual(subscriptions.filter(package__name='pkg-1').count(), 1)
  181. self.assertEqual(subscriptions.filter(package__name='pkg-2').count(), 1)
  182. def test_management_command_merges_subscriptions_ignores_existing(self):
  183. """
  184. Ensure that the subscription merge handles properly the case where
  185. the same subscription is present on both UserEmail
  186. """
  187. self.subscribe(self.user_email, 'pkg-2')
  188. call_command('tracker_fix_database')
  189. subscriptions = self.user_email.emailsettings.subscription_set
  190. self.assertEqual(subscriptions.filter(package__name='pkg-2').count(), 1)
  191. def test_management_command_no_email_settings_on_main_email(self):
  192. # Drop the EmailSettings on the main UserEmail
  193. self.user_email.emailsettings.delete()
  194. self.user_email = UserEmail.objects.get(email=self.email)
  195. with self.assertRaises(Exception):
  196. self.user_email.emailsettings
  197. call_command('tracker_fix_database')
  198. # Ensure migrations happened
  199. self.user_email = UserEmail.objects.get(email=self.email)
  200. self.assertIsNotNone(self.user_email.emailsettings)
  201. subscriptions = self.user_email.emailsettings.subscription_set
  202. self.assertEqual(subscriptions.filter(package__name='pkg-2').count(), 1)
  203. def test_management_command_no_email_settings_on_alt_email(self):
  204. # Drop the EmailSettings on the main UserEmail
  205. self.alt_user_email.emailsettings.delete()
  206. self.alt_user_email = UserEmail.objects.get(email=self.alt_email)
  207. with self.assertRaises(Exception):
  208. self.alt_user_email.emailsettings
  209. call_command('tracker_fix_database')
  210. def test_management_command_no_email_settings_at_all(self):
  211. self.user_email.emailsettings.delete()
  212. self.alt_user_email.emailsettings.delete()
  213. call_command('tracker_fix_database')