tests_management_commands.py 23 KB


  1. # Copyright 2013-2015 The Distro Tracker Developers
  2. # See the COPYRIGHT file at the top-level directory of this distribution and
  3. # at http://deb.li/DTAuthors
  4. #
  5. # This file is part of Distro Tracker. It is subject to the license terms
  6. # in the LICENSE file found in the top-level directory of this
  7. # distribution and at http://deb.li/DTLicense. No part of Distro Tracker,
  8. # including this file, may be copied, modified, propagated, or distributed
  9. # except according to the terms contained in the LICENSE file.
  10. """
  11. Tests the management commands of the :mod:`distro_tracker.mail` app.
  12. """
  13. from __future__ import unicode_literals
  14. from distro_tracker.test import TestCase
  15. from distro_tracker.mail.management.commands.tracker_control import (
  16. Command as ControlCommand)
  17. from distro_tracker.mail.management.commands.tracker_dispatch import (
  18. Command as DispatchCommand)
  19. from django.core.management import call_command
  20. from django.core.management.base import CommandError
  21. from distro_tracker.core.models import PackageName, UserEmail, EmailSettings
  22. from distro_tracker.core.models import Subscription, Keyword
  23. from distro_tracker.core.models import SourcePackageName, PseudoPackageName
  24. from django.conf import settings
  25. from django.core import mail
  26. from django.utils import six
  27. from django.utils import timezone
  28. from django.utils.encoding import force_bytes
  29. from django.utils.six.moves import mock
  30. from email.message import Message
  31. import io
  32. import json
  33. class CommandWithInputTestCase(TestCase):
  34. def build_input_message(self, text, target):
  35. msg = Message()
  36. msg['From'] = 'user@example.net'
  37. msg['To'] = '{}@{}'.format(target, settings.DISTRO_TRACKER_FQDN)
  38. msg['Delivered-To'] = msg['To']
  39. msg['Subject'] = 'Test message'
  40. msg.set_payload(text)
  41. self.input_message = msg
  42. return force_bytes(msg.as_string(), 'utf-8')
  43. def call_command(self, input_data, *args, **kwargs):
  44. cmd = self.command_class()
  45. cmd.input_file = io.BytesIO(input_data)
  46. cmd.handle(*args, **kwargs)
  47. def assert_is_input_message(self, msg):
  48. """
  49. Ensure passed message is the same as the message we have fed to the
  50. command
  51. """
  52. self.assertEqual(force_bytes(self.input_message.as_string(), 'utf-8'),
  53. force_bytes(msg.as_string(), 'utf-8'))
  54. class CommandWithOutputTestCase(TestCase):
  55. def call_command(self, *args, **kwargs):
  56. stdout = six.StringIO()
  57. stderr = six.StringIO()
  58. kwargs['stdout'] = stdout
  59. kwargs['stderr'] = stderr
  60. call_command(self.command_name, *args, **kwargs)
  61. self.out = stdout.getvalue()
  62. self.err_out = stderr.getvalue()
  63. class ControlManagementCommand(CommandWithInputTestCase):
  64. command_class = ControlCommand
  65. @mock.patch('distro_tracker.mail.control.process')
  66. def test_control_command_calls_process(self, mock_process):
  67. data = self.build_input_message('help\n', 'control')
  68. self.call_command(data)
  69. self.assertTrue(mock_process.called)
  70. self.assert_is_input_message(mock_process.call_args[0][0])
  71. def test_control_command_does_something(self):
  72. data = self.build_input_message('help\n', 'control')
  73. self.call_command(data)
  74. self.assertEqual(len(mail.outbox), 1)
  75. self.assertIn('The package tracker supports the following commands:',
  76. mail.outbox[0].message().get_payload())
  77. class DispatchManagementCommand(CommandWithInputTestCase):
  78. command_class = DispatchCommand
  79. @mock.patch('distro_tracker.mail.dispatch.process')
  80. def test_dispatch_command_with_normal_message(self, mock_process):
  81. msg = self.build_input_message('hello\n', 'dispatch+dummy')
  82. self.call_command(msg)
  83. mock_process.assert_called_with(mock.ANY, package='dummy', keyword=None)
  84. self.assert_is_input_message(mock_process.call_args[0][0])
  85. def test_dispatch_command_forwards_something(self):
  86. PackageName.objects.create(name='dummy')
  87. Subscription.objects.create_for(
  88. package_name='dummy',
  89. email='user@example.net',
  90. active=True)
  91. msg = self.build_input_message('hello\n', 'dispatch+dummy_contact')
  92. self.call_command(msg)
  93. self.assertEqual(len(mail.outbox), 1)
  94. self.assertEqual(mail.outbox[0].message()['X-Distro-Tracker-Package'],
  95. 'dummy')
  96. @mock.patch('distro_tracker.mail.dispatch.handle_bounces')
  97. def test_dispatch_command_with_bounces(self, mock_handle_bounces):
  98. msg = self.build_input_message('hello\n', 'bounces+verpdata')
  99. self.call_command(msg)
  100. mock_handle_bounces.assert_called_with(
  101. 'bounces+verpdata@{}'.format(settings.DISTRO_TRACKER_FQDN))
  102. class UnsubscribeAllManagementCommand(CommandWithOutputTestCase):
  103. command_name = 'tracker_unsubscribe_all'
  104. def setUp(self):
  105. self.packages = [
  106. PackageName.objects.create(name='dummy-package'),
  107. PackageName.objects.create(name='second-package'),
  108. ]
  109. self.user = UserEmail.objects.create(email='email-user@domain.com')
  110. self.email_settings = EmailSettings.objects.create(user_email=self.user)
  111. for package in self.packages:
  112. Subscription.objects.create(package=package,
  113. email_settings=self.email_settings)
  114. self.nosub_user = UserEmail.objects.create(email='nosub@dom.com')
  115. def assert_unsubscribed_user_response(self):
  116. for package in self.packages:
  117. self.assertIn(
  118. 'Unsubscribing {email} from {package}'.format(
  119. email=self.user.email, package=package.name),
  120. self.out)
  121. def assert_no_subscriptions_response(self):
  122. self.assertIn(
  123. 'Email {email} is not subscribed to any packages.'.format(
  124. email=self.nosub_user),
  125. self.out)
  126. def assert_user_does_not_exist_response(self, user):
  127. self.assertIn(
  128. 'Email {email} is not subscribed to any packages. '
  129. 'Bad email?'.format(
  130. email=user),
  131. self.out)
  132. def test_unsubscribe_user(self):
  133. """
  134. Tests the management command ``distro_tracker_unsubscribe_all`` when a
  135. user with subscriptions is given.
  136. """
  137. self.call_command(self.user.email)
  138. self.assert_unsubscribed_user_response()
  139. self.assertEqual(self.email_settings.subscription_set.count(), 0)
  140. def test_unsubscribe_doesnt_exist(self):
  141. """
  142. Tests the management command ``distro_tracker_unsubscribe_all`` when
  143. the given user does not exist.
  144. """
  145. self.call_command('no-exist')
  146. self.assert_user_does_not_exist_response('no-exist')
  147. def test_unsubscribe_no_subscriptions(self):
  148. """
  149. Tests the management command ``distro_tracker_unsubscribe_all`` when
  150. the given user is not subscribed to any packages.
  151. """
  152. self.call_command(self.nosub_user.email)
  153. self.assert_no_subscriptions_response()
  154. def test_unsubscribe_multiple_user(self):
  155. """
  156. Tests the management command ``distro_tracker_unsubscribe_all`` when
  157. multiple users are passed to it.
  158. """
  159. args = ['no-exist', self.nosub_user.email, self.user.email]
  160. self.call_command(*args)
  161. self.assert_unsubscribed_user_response()
  162. self.assertEqual(self.email_settings.subscription_set.count(), 0)
  163. self.assert_user_does_not_exist_response('no-exist')
  164. self.assert_no_subscriptions_response()
  165. class DumpSubscribersManagementCommandTest(CommandWithOutputTestCase):
  166. command_name = 'tracker_dump_subscribers'
  167. def setUp(self):
  168. self.packages = [
  169. PackageName.objects.create(name='package' + str(i))
  170. for i in range(5)
  171. ]
  172. self.users = [
  173. UserEmail.objects.create(email='user@domain.com'),
  174. UserEmail.objects.create(email='other-user@domain.com'),
  175. ]
  176. def assert_warning_in_output(self, text):
  177. self.assertIn('Warning: ' + text, self.err_out)
  178. def assert_package_in_output(self, package):
  179. self.assertIn('{package} => ['.format(package=package), self.out)
  180. def assert_user_list_in_output(self, users):
  181. self.assertIn('[ ' + ' '.join(str(user) for user in users) + ' ]',
  182. self.out)
  183. def test_dump_one_package(self):
  184. user = self.users[0]
  185. email_settings = EmailSettings.objects.create(user_email=user)
  186. package = self.packages[0]
  187. Subscription.objects.create(email_settings=email_settings,
  188. package=package)
  189. self.call_command()
  190. self.assert_package_in_output(package)
  191. self.assert_user_list_in_output([user])
  192. def test_dump_all_active(self):
  193. # Subscribe the users
  194. for user in self.users:
  195. email_settings = EmailSettings.objects.create(user_email=user)
  196. for package in self.packages:
  197. Subscription.objects.create(email_settings=email_settings,
  198. package=package)
  199. self.call_command()
  200. for package in self.packages:
  201. self.assert_package_in_output(package)
  202. self.assert_user_list_in_output(self.users)
  203. def test_dump_only_active(self):
  204. """
  205. Tests that only users with an active subscriptions are returned by
  206. default.
  207. """
  208. # All users have an active subscription to the first package
  209. for user in self.users:
  210. email_settings = EmailSettings.objects.create(user_email=user)
  211. Subscription.objects.create(email_settings=email_settings,
  212. package=self.packages[0])
  213. # The first user has an active subscription to the second package
  214. Subscription.objects.create(email_settings=self.users[0].emailsettings,
  215. package=self.packages[1])
  216. # Whereas the second user has an inactive subscription.
  217. Subscription.objects.create(email_settings=self.users[1].emailsettings,
  218. package=self.packages[1],
  219. active=False)
  220. self.call_command()
  221. self.assert_user_list_in_output(self.users)
  222. self.assert_user_list_in_output([self.users[0]])
  223. def test_dump_inactive(self):
  224. user = self.users[0]
  225. package = self.packages[0]
  226. email_settings = EmailSettings.objects.create(user_email=user)
  227. Subscription.objects.create(email_settings=email_settings,
  228. package=package, active=False)
  229. self.call_command(inactive=True)
  230. self.assert_package_in_output(package)
  231. self.assert_user_list_in_output([user])
  232. def test_dump_json(self):
  233. # Subscribe all the users
  234. for user in self.users:
  235. email_settings = EmailSettings.objects.create(user_email=user)
  236. for package in self.packages:
  237. Subscription.objects.create(email_settings=email_settings,
  238. package=package)
  239. self.call_command(json=True)
  240. output = json.loads(self.out)
  241. # All packages in output?
  242. for package in self.packages:
  243. self.assertIn(str(package), output)
  244. # All users in each output list?
  245. for subscribers in output.values():
  246. for user in self.users:
  247. self.assertIn(str(user), subscribers)
  248. def test_dump_udd_format(self):
  249. # Subscribe all the users
  250. for user in self.users:
  251. email_settings = EmailSettings.objects.create(user_email=user)
  252. for package in self.packages:
  253. Subscription.objects.create(email_settings=email_settings,
  254. package=package)
  255. self.call_command(udd_format=True)
  256. out_lines = self.out.splitlines()
  257. out_packages = {}
  258. for line in out_lines:
  259. package_name, subscribers = line.split('\t', 1)
  260. out_packages[package_name] = [
  261. subscriber.strip()
  262. for subscriber in subscribers.split(',')
  263. ]
  264. # All packages output
  265. for package in self.packages:
  266. self.assertIn(package.name, out_packages)
  267. # All its subscribers output?
  268. subscribers = out_packages[package.name]
  269. for user in self.users:
  270. self.assertIn(str(user), subscribers)
  271. def test_dump_package_does_not_exist(self):
  272. self.call_command('does-not-exist', verbosity=2)
  273. self.assert_warning_in_output('does-not-exist does not exist')
  274. class StatsCommandTest(CommandWithOutputTestCase):
  275. command_name = 'tracker_stats'
  276. def setUp(self):
  277. self.package_count = 5
  278. for i in range(self.package_count):
  279. SourcePackageName.objects.create(name='package' + str(i))
  280. # Add some pseudo packages in the mix
  281. PseudoPackageName.objects.create(name='pseudo')
  282. self.user_count = 2
  283. for i in range(self.user_count):
  284. UserEmail.objects.create(email='email' + str(i) + '@domain.com')
  285. # Subscribe all users to all source packages
  286. self.subscription_count = self.package_count * self.user_count
  287. for user in UserEmail.objects.all():
  288. email_settings = EmailSettings.objects.create(user_email=user)
  289. for package in SourcePackageName.objects.all():
  290. Subscription.objects.create(email_settings=email_settings,
  291. package=package)
  292. def test_legacy_output(self):
  293. self.call_command()
  294. self.assertIn('Src pkg\tSubscr.\tDate\t\tNb email', self.out)
  295. expected = '\t'.join(map(str, (
  296. self.package_count,
  297. self.subscription_count,
  298. timezone.now().strftime('%Y-%m-%d'),
  299. self.user_count,
  300. )))
  301. self.assertIn(expected, self.out)
  302. def test_json_output(self):
  303. self.call_command(json=True)
  304. output = json.loads(self.out)
  305. expected = {
  306. 'package_number': self.package_count,
  307. 'subscription_number': self.subscription_count,
  308. 'date': timezone.now().strftime('%Y-%m-%d'),
  309. 'unique_emails_number': self.user_count,
  310. }
  311. self.assertDictEqual(expected, output)
  312. class AddKeywordManagementCommandTest(TestCase):
  313. def test_simple_add(self):
  314. """
  315. Tests the management command when it is only supposed to create a new
  316. keyword.
  317. """
  318. # Sanity check - the keyword we are about to add does not already exist
  319. self.assertEqual(Keyword.objects.filter(name='new-keyword').count(), 0)
  320. call_command('tracker_add_keyword', 'new-keyword')
  321. qs = Keyword.objects.filter(name='new-keyword', default=False)
  322. self.assertEqual(qs.count(), 1)
  323. def test_simple_add_default(self):
  324. """
  325. Tests the management command when it is only supposed to create a new
  326. default keyword.
  327. """
  328. # Sanity check - the keyword we are about to add does not already exist
  329. self.assertEqual(Keyword.objects.filter(name='new-keyword').count(), 0)
  330. call_command('tracker_add_keyword', 'new-keyword',
  331. is_default_keyword=True)
  332. qs = Keyword.objects.filter(name='new-keyword', default=True)
  333. self.assertEqual(qs.count(), 1)
  334. def test_create_and_add_to_subscribers(self):
  335. """
  336. Tests the management command when the new keyword should be added to
  337. subscribers that already have another specified keyword.
  338. """
  339. existing_keyword = Keyword.objects.create(name='existing-keyword')
  340. # A user who added the existing keyword to its subscription keywords
  341. u = UserEmail.objects.create(email='subscription-user@domain.com')
  342. s = EmailSettings.objects.create(user_email=u)
  343. p = PackageName.objects.create(name='dummy-package')
  344. sub = Subscription.objects.create(email_settings=s, package=p)
  345. sub.keywords.add(existing_keyword)
  346. sub.save()
  347. # A user who added the existing keyword to its default keywords
  348. u = UserEmail.objects.create(email='defaultuser@domain.com')
  349. s = EmailSettings.objects.create(user_email=u)
  350. s.default_keywords.add(existing_keyword)
  351. s.save()
  352. # A user who does not have the existing keyword.
  353. u = UserEmail.objects.create(email='no-keyword@domain.com')
  354. s = EmailSettings.objects.create(user_email=u)
  355. # Make sure that it is so!
  356. self.assertNotIn(existing_keyword, s.default_keywords.all())
  357. # Sanity check - the keyword we want to add does not already exist
  358. self.assertEqual(Keyword.objects.filter(name='new-keyword').count(), 0)
  359. # Sanity check -- only one subscription exists
  360. self.assertEqual(Subscription.objects.count(), 1)
  361. call_command('tracker_add_keyword', 'new-keyword', 'existing-keyword')
  362. # New keyword created?
  363. keyword = Keyword.objects.filter(name='new-keyword')
  364. self.assertTrue(keyword.exists())
  365. keyword = keyword[0]
  366. # No subscriptions changed
  367. self.assertEqual(Subscription.objects.count(), 1)
  368. sub = Subscription.objects.all()[0]
  369. # Keyword added to the subscription specific keywords.
  370. self.assertIn(keyword, sub.keywords.all())
  371. # New keyword added to the user that had the existing keyword in its
  372. # default list
  373. default_user = UserEmail.objects.get(email='defaultuser@domain.com')
  374. self.assertIn(keyword,
  375. default_user.emailsettings.default_keywords.all())
  376. # Keyword not added to the default list of the user that did not have
  377. # the existing keyword
  378. u = UserEmail.objects.get(email='no-keyword@domain.com')
  379. self.assertNotIn(keyword, u.emailsettings.default_keywords.all())
  380. def test_create_and_add_to_subscribers_no_unlink(self):
  381. """
  382. Tests that adding a new keyword to subscriptions which had a particular
  383. given keyword does not cause it to become unlinked from the user's
  384. default keywords.
  385. """
  386. Keyword.objects.create(name='existing-keyword')
  387. u = UserEmail.objects.create(email='subscription-user@domain.com')
  388. s = EmailSettings.objects.create(user_email=u)
  389. p = PackageName.objects.create(name='dummy-package')
  390. sub = Subscription.objects.create(email_settings=s, package=p)
  391. call_command('tracker_add_keyword', 'new-keyword', 'existing-keyword')
  392. sub = Subscription.objects.get(email_settings=s, package=p)
  393. self.assertTrue(sub._use_user_default_keywords)
  394. def test_create_and_add_no_existing_keyword(self):
  395. """
  396. Tests that the command has no effect if the given "existing" keyword
  397. does not actually exist.
  398. """
  399. old_count = Keyword.objects.count()
  400. # Error raised
  401. with self.assertRaises(CommandError):
  402. call_command('tracker_add_keyword', 'new-keyword',
  403. 'existing-keyword')
  404. # ...and nothing changed.
  405. self.assertEqual(Keyword.objects.count(), old_count)
  406. def test_create_default_keyword_to_all_users(self):
  407. """
  408. Tests adding a default keyword adds it to all users' default keywords
  409. list.
  410. """
  411. existing_keyword = Keyword.objects.create(name='existing-keyword')
  412. # A user who added an existing keyword to its default keywords
  413. u = UserEmail.objects.create(email='defaultuser@domain.com')
  414. s = EmailSettings.objects.create(user_email=u)
  415. s.default_keywords.add(existing_keyword)
  416. s.save()
  417. # A user who does not have any keywords apart from the defaults
  418. u = UserEmail.objects.create(email='no-keyword@domain.com')
  419. s = EmailSettings.objects.create(user_email=u)
  420. # Make sure that it is so!
  421. self.assertNotIn(existing_keyword, s.default_keywords.all())
  422. # Sanity check - the keyword we want to add does not already exist
  423. self.assertEqual(Keyword.objects.filter(name='new-keyword').count(), 0)
  424. call_command('tracker_add_keyword', 'new-keyword', **{
  425. 'is_default_keyword': True
  426. })
  427. keyword = Keyword.objects.get(name='new-keyword')
  428. # This keyword is given to all users
  429. self.assertEqual(
  430. EmailSettings.objects.filter(default_keywords=keyword).count(),
  431. EmailSettings.objects.count()
  432. )
  433. def test_create_default_keyword_existing_keyword(self):
  434. """
  435. Tests adding a default keyword which should be added to all
  436. subscriptions that have a different existing keyword.
  437. """
  438. existing_keyword = Keyword.objects.create(name='existing-keyword')
  439. # A user who added the existing keyword to its subscription keywords
  440. user1 = UserEmail.objects.create(email='subscription-user@domain.com')
  441. email_settings1 = EmailSettings.objects.create(user_email=user1)
  442. p = PackageName.objects.create(name='dummy-package')
  443. sub = Subscription.objects.create(email_settings=email_settings1,
  444. package=p)
  445. sub.keywords.add(existing_keyword)
  446. sub.save()
  447. # A user who added the existing keyword to its default keywords
  448. u = UserEmail.objects.create(email='defaultuser@domain.com')
  449. s = EmailSettings.objects.create(user_email=u)
  450. s.default_keywords.add(existing_keyword)
  451. s.save()
  452. # A user who does not have the existing keyword.
  453. user2 = UserEmail.objects.create(email='no-keyword@domain.com')
  454. email_settings2 = EmailSettings.objects.create(user_email=user2)
  455. # And is subscribed to a package without having the keyword
  456. sub = Subscription.objects.create(email_settings=email_settings2,
  457. package=p)
  458. sub.keywords.add(Keyword.objects.create(name='some-other-keyword'))
  459. # Sanity check - the keyword we want to add does not already exist
  460. self.assertEqual(Keyword.objects.filter(name='new-keyword').count(), 0)
  461. call_command('tracker_add_keyword', 'new-keyword', 'existing-keyword',
  462. **{'is_default_keyword': True})
  463. new_keyword = Keyword.objects.get(name='new-keyword')
  464. # Every user has the keyword
  465. self.assertEqual(
  466. EmailSettings.objects.filter(default_keywords=new_keyword).count(),
  467. EmailSettings.objects.count()
  468. )
  469. # Subscription with the existing keyword has the new keyword
  470. sub = Subscription.objects.get(email_settings=email_settings1,
  471. package=p)
  472. self.assertIn(new_keyword, sub.keywords.all())
  473. # Subscription without the existing keyword not modified
  474. sub = Subscription.objects.get(email_settings=email_settings2,
  475. package=p)
  476. self.assertNotIn(new_keyword, sub.keywords.all())
  477. class ProcessMailTests(TestCase):
  478. """Tests for the tracker_process_mail management command"""
  479. @mock.patch('distro_tracker.mail.management.commands.tracker_process_mail.'
  480. 'MailQueue')
  481. def test_process_mail_command(self, mock_queue):
  482. """command is a simple wrapper around MailQueue.process_loop()"""
  483. call_command('tracker_process_mail')
  484. mock_queue.assert_called_with()
  485. mock_queue.return_value.process_loop.assert_called_with()