tests_utils.py 54 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2013-2016 The Distro Tracker Developers
  3. # See the COPYRIGHT file at the top-level directory of this distribution and
  4. # at http://deb.li/DTAuthors
  5. #
  6. # This file is part of Distro Tracker. It is subject to the license terms
  7. # in the LICENSE file found in the top-level directory of this
  8. # distribution and at http://deb.li/DTLicense. No part of Distro Tracker,
  9. # including this file, may be copied, modified, propagated, or distributed
  10. # except according to the terms contained in the LICENSE file.
  11. """
  12. Tests for the Distro Tracker core utils.
  13. """
  14. from __future__ import unicode_literals
  15. import datetime
  16. from email import encoders
  17. from email.header import Header
  18. from email.mime.multipart import MIMEMultipart
  19. from email.mime.base import MIMEBase
  20. import os
  21. import time
  22. import tempfile
  23. from debian import deb822
  24. from django.core import mail
  25. from django.test.utils import override_settings
  26. from django.utils import six
  27. from django.utils.http import http_date
  28. from django.utils.functional import curry
  29. from django.utils.six.moves import mock
  30. from distro_tracker.core.models import Repository
  31. from distro_tracker.core.utils import verp
  32. from distro_tracker.core.utils import message_from_bytes
  33. from distro_tracker.core.utils import now
  34. from distro_tracker.core.utils import SpaceDelimitedTextField
  35. from distro_tracker.core.utils import PrettyPrintList
  36. from distro_tracker.core.utils import verify_signature
  37. from distro_tracker.core.utils.packages import AptCache
  38. from distro_tracker.core.utils.packages import extract_vcs_information
  39. from distro_tracker.core.utils.packages import extract_dsc_file_name
  40. from distro_tracker.core.utils.packages import package_hashdir
  41. from distro_tracker.core.utils.datastructures import DAG, InvalidDAGException
  42. from distro_tracker.core.utils.email_messages import CustomEmailMessage
  43. from distro_tracker.core.utils.email_messages import decode_header
  44. from distro_tracker.core.utils.email_messages import (
  45. name_and_address_from_string,
  46. names_and_addresses_from_string)
  47. from distro_tracker.core.utils.email_messages import unfold_header
  48. from distro_tracker.core.utils.linkify import linkify
  49. from distro_tracker.core.utils.linkify import LinkifyDebianBugLinks
  50. from distro_tracker.core.utils.linkify import LinkifyUbuntuBugLinks
  51. from distro_tracker.core.utils.linkify import LinkifyHttpLinks
  52. from distro_tracker.core.utils.linkify import LinkifyCVELinks
  53. from distro_tracker.core.utils.http import HttpCache
  54. from distro_tracker.core.utils.http import get_resource_content
  55. from distro_tracker.test import TestCase, SimpleTestCase
  56. from distro_tracker.test.utils import set_mock_response
  57. from distro_tracker.test.utils import make_temp_directory
  58. class VerpModuleTest(SimpleTestCase):
  59. """
  60. Tests for the ``distro_tracker.core.utils.verp`` module.
  61. """
  62. def test_encode(self):
  63. """
  64. Tests for the encode method.
  65. """
  66. self.assertEqual(
  67. verp.encode('itny-out@domain.com', 'node42!ann@old.example.com'),
  68. 'itny-out-node42+21ann=old.example.com@domain.com')
  69. self.assertEqual(
  70. verp.encode('itny-out@domain.com', 'tom@old.example.com'),
  71. 'itny-out-tom=old.example.com@domain.com')
  72. self.assertEqual(
  73. verp.encode('itny-out@domain.com', 'dave+priority@new.example.com'),
  74. 'itny-out-dave+2Bpriority=new.example.com@domain.com')
  75. self.assertEqual(
  76. verp.encode('bounce@dom.com', 'user+!%-:@[]+@other.com'),
  77. 'bounce-user+2B+21+25+2D+3A+40+5B+5D+2B=other.com@dom.com')
  78. def test_decode(self):
  79. """
  80. Tests the decode method.
  81. """
  82. self.assertEqual(
  83. verp.decode('itny-out-dave+2Bpriority=new.example.com@domain.com'),
  84. ('itny-out@domain.com', 'dave+priority@new.example.com'))
  85. self.assertEqual(
  86. verp.decode('itny-out-node42+21ann=old.example.com@domain.com'),
  87. ('itny-out@domain.com', 'node42!ann@old.example.com'))
  88. self.assertEqual(
  89. verp.decode('bounce-addr+2B40=dom.com@asdf.com'),
  90. ('bounce@asdf.com', 'addr+40@dom.com'))
  91. self.assertEqual(
  92. verp.decode(
  93. 'bounce-user+2B+21+25+2D+3A+40+5B+5D+2B=other.com@dom.com'),
  94. ('bounce@dom.com', 'user+!%-:@[]+@other.com'))
  95. def test_decode_lowercase_code(self):
  96. """Encoding of special characters with lowercase should work"""
  97. self.assertEqual(
  98. verp.decode(
  99. 'bounce-user+2b+2d+3a=other.com@dom.com'),
  100. ('bounce@dom.com', 'user+-:@other.com'))
  101. def test_invariant_encode_decode(self):
  102. """
  103. Tests that decoding an encoded address returns the original pair.
  104. """
  105. from_email, to_email = 'bounce@domain.com', 'user@other.com'
  106. self.assertEqual(
  107. verp.decode(verp.encode(from_email, to_email)),
  108. (from_email, to_email))
  109. @override_settings(EMAIL_BACKEND='django.core.mail.backends.smtp.EmailBackend')
  110. class CustomMessageFromBytesTest(TestCase):
  111. """
  112. Tests the ``distro_tracker.core.utils.message_from_bytes`` function.
  113. """
  114. def setUp(self):
  115. self.message_bytes = b"""MIME-Version: 1.0
  116. Content-Type: text/plain; charset="utf-8"
  117. Content-Disposition: inline
  118. Content-Transfer-Encoding: 8bit
  119. """
  120. self.body = "üßščć한글ᥡ╥ສए"
  121. self.message_bytes = self.message_bytes + self.body.encode('utf-8')
  122. def get_mock_connection(self):
  123. """
  124. Helper method returning a mock SMTP connection object.
  125. """
  126. import smtplib
  127. return mock.create_autospec(smtplib.SMTP, return_value={})
  128. def test_as_string_returns_bytes(self):
  129. """
  130. Tests that the as_string message returns bytes.
  131. """
  132. message = message_from_bytes(self.message_bytes)
  133. self.assertEqual(self.message_bytes, message.as_string())
  134. self.assertTrue(isinstance(message.as_string(), six.binary_type))
  135. def test_get_payload_decode_idempotent(self):
  136. """
  137. Tests that the get_payload method returns bytes which can be decoded
  138. using the message's encoding and that they are identical to the
  139. ones given to the function in the first place.
  140. """
  141. message = message_from_bytes(self.message_bytes)
  142. self.assertEqual(self.body,
  143. message.get_payload(decode=True).decode('utf-8'))
  144. def test_integrate_with_django(self):
  145. """
  146. Tests that the message obtained by the message_from_bytes function can
  147. be sent out using the Django email API.
  148. In the same time, this test makes sure that Django keeps using
  149. the as_string method as expected.
  150. """
  151. from django.core.mail import get_connection
  152. backend = get_connection()
  153. # Replace the backend's SMTP connection with a mock.
  154. mock_connection = self.get_mock_connection()
  155. backend.connection = mock_connection
  156. # Send the message over the backend
  157. message = message_from_bytes(self.message_bytes)
  158. custom_message = CustomEmailMessage(
  159. msg=message,
  160. from_email='from@domain.com',
  161. to=['to@domain.com'])
  162. backend.send_messages([custom_message])
  163. backend.close()
  164. # The backend sent the mail over SMTP & it is not corrupted
  165. mock_connection.sendmail.assert_called_with(
  166. 'from@domain.com',
  167. ['to@domain.com'],
  168. mock.ANY)
  169. self.assertEqual(
  170. mock_connection.sendmail.call_args[0][2].replace(b"\r\n", b"\n"),
  171. message.as_string())
  172. class EmailUtilsTest(SimpleTestCase):
  173. def test_name_and_address_from_string(self):
  174. """
  175. Tests retrieving a name and address from a string which contains
  176. unquoted commas.
  177. """
  178. self.assertDictEqual(
  179. name_and_address_from_string(
  180. 'John H. Robinson, IV <jaqque@debian.org>'),
  181. {'name': 'John H. Robinson, IV', 'email': 'jaqque@debian.org'}
  182. )
  183. self.assertDictEqual(
  184. name_and_address_from_string('email@domain.com'),
  185. {'name': '', 'email': 'email@domain.com'}
  186. )
  187. self.assertDictEqual(
  188. name_and_address_from_string('Name <email@domain.com>'),
  189. {'name': 'Name', 'email': 'email@domain.com'}
  190. )
  191. self.assertIsNone(name_and_address_from_string(''))
  192. def test_names_and_addresses_from_string(self):
  193. """
  194. Tests extracting names and emails from a string containing a list of
  195. them.
  196. """
  197. self.assertSequenceEqual(
  198. names_and_addresses_from_string(
  199. 'John H. Robinson, IV <jaqque@debian.org>, '
  200. 'Name <email@domain.com>'
  201. ), [
  202. {'name': 'John H. Robinson, IV', 'email': 'jaqque@debian.org'},
  203. {'name': 'Name', 'email': 'email@domain.com'}
  204. ]
  205. )
  206. self.assertSequenceEqual(
  207. names_and_addresses_from_string(
  208. 'John H. Robinson, IV <jaqque@debian.org>, '
  209. 'email@domain.com'
  210. ), [
  211. {'name': 'John H. Robinson, IV', 'email': 'jaqque@debian.org'},
  212. {'name': '', 'email': 'email@domain.com'}
  213. ]
  214. )
  215. self.assertSequenceEqual(names_and_addresses_from_string(''), [])
  216. def test_unfold_header(self):
  217. test_values = {
  218. 'a\n b': 'a b',
  219. 'a\r\n b': 'a b',
  220. 'a\n\tb': 'a\tb',
  221. 'a\r\n\tb\n c\n\td': 'a\tb c\td',
  222. 'a\n\t bc\n d': 'a\t bc d',
  223. }
  224. for folded, unfolded in test_values.items():
  225. self.assertEqual(unfold_header(folded), unfolded)
  226. def test_unfold_header_with_none_value(self):
  227. self.assertIsNone(unfold_header(None))
  228. class CustomEmailMessageTest(TestCase):
  229. """
  230. Tests the ``CustomEmailMessage`` class.
  231. """
  232. def create_multipart(self):
  233. """
  234. Helper method creates a multipart message.
  235. """
  236. msg = MIMEMultipart()
  237. msg.attach(self.prepare_part(b'data'))
  238. return msg
  239. def prepare_part(self, data):
  240. part = MIMEBase('application', 'octet-stream')
  241. part.set_payload(data)
  242. encoders.encode_base64(part)
  243. return part
  244. def test_sent_message_same_as_original(self):
  245. """
  246. Tests that an ``email.message.Message`` instance sent by using the
  247. ``CustomEmailMessage`` class is the same as the original message.
  248. """
  249. msg = self.create_multipart()
  250. custom_message = CustomEmailMessage(msg=msg, to=['recipient'])
  251. custom_message.send()
  252. self.assertEqual(msg.as_string(), mail.outbox[0].message().as_string())
  253. def test_attachment_included(self):
  254. """
  255. Tests that an attachment included in the ``CustomEmailMessage``
  256. instance is sent with the rest of the message.
  257. """
  258. msg = self.create_multipart()
  259. attachment = self.prepare_part(b'new_data')
  260. msg.attach(attachment)
  261. custom_message = CustomEmailMessage(msg=msg, to=['recipient'])
  262. custom_message.send()
  263. self.assertIn(attachment, mail.outbox[0].message().get_payload())
  264. class DAGTests(SimpleTestCase):
  265. """
  266. Tests for the `DAG` class.
  267. """
  268. def test_add_nodes(self):
  269. """
  270. Tests adding nodes to a DAG.
  271. """
  272. g = DAG()
  273. # A single node
  274. g.add_node(1)
  275. self.assertEqual(len(g.all_nodes), 1)
  276. self.assertEqual(g.all_nodes[0], 1)
  277. # Another one
  278. g.add_node(2)
  279. self.assertEqual(len(g.all_nodes), 2)
  280. self.assertIn(2, g.all_nodes)
  281. # When adding a same node again, nothing changes.
  282. g.add_node(1)
  283. self.assertEqual(len(g.all_nodes), 2)
  284. def test_add_edge(self):
  285. """
  286. Tests adding edges to a DAG.
  287. """
  288. g = DAG()
  289. g.add_node(1)
  290. g.add_node(2)
  291. g.add_edge(1, 2)
  292. self.assertEqual(len(g.dependent_nodes(1)), 1)
  293. self.assertIn(2, g.dependent_nodes(1))
  294. # In-degrees updated
  295. self.assertEqual(g.in_degree[g.nodes_map[1].id], 0)
  296. self.assertEqual(g.in_degree[g.nodes_map[2].id], 1)
  297. g.add_node(3)
  298. g.add_edge(1, 3)
  299. self.assertEqual(len(g.dependent_nodes(1)), 2)
  300. self.assertIn(3, g.dependent_nodes(1))
  301. # In-degrees updated
  302. self.assertEqual(g.in_degree[g.nodes_map[1].id], 0)
  303. self.assertEqual(g.in_degree[g.nodes_map[3].id], 1)
  304. g.add_edge(2, 3)
  305. self.assertEqual(len(g.dependent_nodes(2)), 1)
  306. self.assertIn(3, g.dependent_nodes(2))
  307. # In-degrees updated
  308. self.assertEqual(g.in_degree[g.nodes_map[3].id], 2)
  309. # Add a same edge again - nothing changed?
  310. g.add_edge(1, 3)
  311. self.assertEqual(len(g.dependent_nodes(1)), 2)
  312. # Add an edge resulting in a cycle
  313. with self.assertRaises(InvalidDAGException):
  314. g.add_edge(3, 1)
  315. def test_remove_node(self):
  316. """
  317. Tests removing a node from the graph.
  318. """
  319. g = DAG()
  320. g.add_node(1)
  321. g.add_node(2)
  322. g.add_node(3)
  323. g.add_edge(1, 2)
  324. g.add_edge(1, 3)
  325. g.add_edge(2, 3)
  326. g.remove_node(3)
  327. self.assertNotIn(3, g.all_nodes)
  328. self.assertEqual(len(g.dependent_nodes(1)), 1)
  329. self.assertIn(2, g.dependent_nodes(1))
  330. self.assertEqual(len(g.dependent_nodes(2)), 0)
  331. g.remove_node(1)
  332. self.assertEqual(g.in_degree[g.nodes_map[2].id], 0)
  333. def test_find_no_dependency_node(self):
  334. """
  335. Tests that the DAG correctly returns nodes with no dependencies.
  336. """
  337. g = DAG()
  338. g.add_node(1)
  339. g.add_node(2)
  340. g.add_node(3)
  341. g.add_edge(1, 2)
  342. g.add_edge(2, 3)
  343. self.assertEqual(g._get_node_with_no_dependencies().original, 1)
  344. g = DAG()
  345. g.add_node(1)
  346. g.add_node(2)
  347. g.add_node(3)
  348. g.add_edge(3, 2)
  349. g.add_edge(2, 1)
  350. self.assertEqual(g._get_node_with_no_dependencies().original, 3)
  351. g = DAG()
  352. g.add_node(1)
  353. self.assertEqual(g._get_node_with_no_dependencies().original, 1)
  354. def test_topsort_simple(self):
  355. """
  356. Tests the topological sort of the DAG class.
  357. """
  358. g = DAG()
  359. g.add_node(1)
  360. g.add_node(2)
  361. g.add_node(3)
  362. g.add_edge(1, 2)
  363. g.add_edge(2, 3)
  364. topsort = list(g.topsort_nodes())
  365. self.assertSequenceEqual([1, 2, 3], topsort)
  366. def test_topsort_no_dependencies(self):
  367. """
  368. Tests the toplogical sort of the DAG class when the given DAG has no
  369. dependencies between the nodes.
  370. """
  371. g = DAG()
  372. g.add_node(1)
  373. g.add_node(2)
  374. g.add_node(3)
  375. topsort = list(g.topsort_nodes())
  376. nodes = [1, 2, 3]
  377. # The order in this case cannot be mandated, only that all the nodes
  378. # are in the output
  379. for node in nodes:
  380. self.assertIn(node, topsort)
  381. def test_topsort_complex(self):
  382. """
  383. Tests the toplogical sort when a more complex graph is given.
  384. """
  385. g = DAG()
  386. nodes = list(range(13))
  387. for node in nodes:
  388. g.add_node(node)
  389. edges = (
  390. (0, 1),
  391. (0, 2),
  392. (0, 3),
  393. (0, 5),
  394. (0, 6),
  395. (2, 3),
  396. (3, 4),
  397. (3, 5),
  398. (4, 9),
  399. (6, 4),
  400. (6, 9),
  401. (7, 6),
  402. (8, 7),
  403. (9, 10),
  404. (9, 11),
  405. (9, 12),
  406. (11, 12),
  407. )
  408. for edge in edges:
  409. g.add_edge(*edge)
  410. topsort = list(g.topsort_nodes())
  411. # Make sure all nodes are found in the toplogical sort
  412. for node in nodes:
  413. self.assertIn(node, topsort)
  414. # Make sure that all dependent nodes are found after the nodes they
  415. # depend on.
  416. # Invariant: for each edge (n1, n2) position(n2) in the topological
  417. # sort must be strictly greater than the position(n1).
  418. for node1, node2 in edges:
  419. self.assertTrue(topsort.index(node2) > topsort.index(node1))
  420. def test_topsort_string_nodes(self):
  421. """
  422. Tests the toplogical sort when strings are used for node objects.
  423. """
  424. g = DAG()
  425. nodes = ['shirt', 'pants', 'tie', 'belt', 'shoes', 'socks', 'pants']
  426. for node in nodes:
  427. g.add_node(node)
  428. edges = (
  429. ('shirt', 'tie'),
  430. ('shirt', 'belt'),
  431. ('belt', 'tie'),
  432. ('pants', 'tie'),
  433. ('pants', 'belt'),
  434. ('pants', 'shoes'),
  435. ('pants', 'shirt'),
  436. ('socks', 'shoes'),
  437. ('socks', 'pants'),
  438. )
  439. for edge in edges:
  440. g.add_edge(*edge)
  441. topsort = list(g.topsort_nodes())
  442. for node in nodes:
  443. self.assertIn(node, topsort)
  444. for node1, node2 in edges:
  445. self.assertTrue(topsort.index(node2) > topsort.index(node1))
  446. def test_nodes_reachable_from(self):
  447. """
  448. Tests finding all nodes reachable from a single node.
  449. """
  450. # Simple situation first.
  451. g = DAG()
  452. g.add_node(1)
  453. g.add_node(2)
  454. g.add_node(3)
  455. g.add_edge(1, 2)
  456. g.add_edge(2, 3)
  457. self.assertEqual(len(g.nodes_reachable_from(1)), 2)
  458. self.assertIn(2, g.nodes_reachable_from(1))
  459. self.assertIn(3, g.nodes_reachable_from(1))
  460. self.assertEqual(len(g.nodes_reachable_from(2)), 1)
  461. self.assertIn(3, g.nodes_reachable_from(1))
  462. # No nodes reachable from the given node
  463. g = DAG()
  464. g.add_node(1)
  465. g.add_node(2)
  466. g.add_node(3)
  467. g.add_edge(2, 3)
  468. self.assertEqual(len(g.nodes_reachable_from(1)), 0)
  469. # More complex graph
  470. g = DAG()
  471. g.add_node(1)
  472. g.add_node(2)
  473. g.add_node(3)
  474. g.add_node(4)
  475. g.add_node(5)
  476. g.add_edge(1, 3)
  477. g.add_edge(2, 4)
  478. g.add_edge(2, 5)
  479. g.add_edge(4, 5)
  480. g.add_edge(5, 3)
  481. self.assertEqual(len(g.nodes_reachable_from(2)), 3)
  482. for node in range(3, 6):
  483. self.assertIn(node, g.nodes_reachable_from(2))
  484. self.assertEqual(len(g.nodes_reachable_from(1)), 1)
  485. self.assertIn(3, g.nodes_reachable_from(1))
  486. class PrettyPrintListTest(SimpleTestCase):
  487. """
  488. Tests for the PrettyPrintList class.
  489. """
  490. def test_string_output(self):
  491. """
  492. Tests the output of a PrettyPrintList.
  493. """
  494. l = PrettyPrintList(['a', 'b', 'abe', 'q'])
  495. self.assertEqual(str(l), 'a b abe q')
  496. l = PrettyPrintList()
  497. self.assertEqual(str(l), '')
  498. l = PrettyPrintList([0, 'a', 1])
  499. self.assertEqual(str(l), '0 a 1')
  500. def test_list_methods_accessible(self):
  501. """
  502. Tests that list methods are accessible to the PrettyPrintList object.
  503. """
  504. l = PrettyPrintList()
  505. l.append('a')
  506. self.assertEqual(str(l), 'a')
  507. l.extend(['q', 'w'])
  508. self.assertEqual(str(l), 'a q w')
  509. l.pop()
  510. self.assertEqual(str(l), 'a q')
  511. # len works?
  512. self.assertEqual(len(l), 2)
  513. # Iterable?
  514. self.assertSequenceEqual(l, ['a', 'q'])
  515. # Indexable?
  516. self.assertEqual(l[0], 'a')
  517. # Comparable?
  518. l2 = PrettyPrintList(['a', 'q'])
  519. self.assertTrue(l == l2)
  520. l3 = PrettyPrintList()
  521. self.assertFalse(l == l3)
  522. # Comparable to plain lists?
  523. self.assertTrue(l == ['a', 'q'])
  524. self.assertFalse(l == ['a'])
  525. class SpaceDelimitedTextFieldTest(SimpleTestCase):
  526. """
  527. Tests the SpaceDelimitedTextField class.
  528. """
  529. def setUp(self):
  530. self.field = SpaceDelimitedTextField()
  531. def test_list_to_field(self):
  532. self.assertEqual(
  533. self.field.get_db_prep_value(PrettyPrintList(['a', 'b', 3])),
  534. 'a b 3'
  535. )
  536. self.assertEqual(
  537. self.field.get_db_prep_value(PrettyPrintList()),
  538. ''
  539. )
  540. def test_field_to_list(self):
  541. self.assertEqual(
  542. self.field.to_python('a b 3'),
  543. PrettyPrintList(['a', 'b', '3'])
  544. )
  545. self.assertEqual(
  546. self.field.to_python(''),
  547. PrettyPrintList()
  548. )
  549. def test_sane_inverse(self):
  550. l = PrettyPrintList(['a', 'b', 'c'])
  551. self.assertEqual(
  552. self.field.to_python(self.field.get_db_prep_value(l)),
  553. l
  554. )
  555. class PackageUtilsTests(SimpleTestCase):
  556. """
  557. Tests the distro_tracker.core.utils.packages utlity functions.
  558. """
  559. def test_get_vcs(self):
  560. browser_url = 'http://other-url.com'
  561. vcs_url = 'git://url.com'
  562. d = {
  563. 'Vcs-Git': vcs_url,
  564. 'Vcs-Browser': browser_url,
  565. }
  566. self.assertDictEqual(
  567. {
  568. 'type': 'git',
  569. 'browser': browser_url,
  570. 'url': vcs_url,
  571. },
  572. extract_vcs_information(d)
  573. )
  574. # Browser not found
  575. d = {
  576. 'Vcs-Git': vcs_url,
  577. }
  578. self.assertDictEqual(
  579. {
  580. 'type': 'git',
  581. 'url': vcs_url,
  582. },
  583. extract_vcs_information(d)
  584. )
  585. # A VCS type longer than three letters
  586. d = {
  587. 'Vcs-Darcs': vcs_url,
  588. }
  589. self.assertDictEqual(
  590. {
  591. 'type': 'darcs',
  592. 'url': vcs_url,
  593. },
  594. extract_vcs_information(d)
  595. )
  596. # Empty dict
  597. self.assertDictEqual({}, extract_vcs_information({}))
  598. # No vcs information in the dict
  599. self.assertDictEqual({}, extract_vcs_information({
  600. 'stuff': 'that does not',
  601. 'have': 'anything to do',
  602. 'with': 'vcs'
  603. }))
  604. def test_package_hash_dir(self):
  605. self.assertEqual(package_hashdir("dpkg"), "d")
  606. self.assertEqual(package_hashdir("lua"), "l")
  607. self.assertEqual(package_hashdir("lib"), "lib")
  608. self.assertEqual(package_hashdir("libc6"), "libc")
  609. self.assertEqual(package_hashdir("lib+fancy"), "lib+")
  610. self.assertEqual(package_hashdir(""), "")
  611. self.assertEqual(package_hashdir(None), None)
  612. def test_extract_dsc_file_name(self):
  613. stanza = deb822.Sources(
  614. """Package: curl
  615. Binary: curl
  616. Version: 7.26.0
  617. Maintainer: Maintainer <maintainer@domain.com>
  618. Architecture: any
  619. Standards-Version: 3.9.3
  620. Format: 3.0 (quilt)
  621. Files:
  622. {} 2531 dummy-package_7.26.0.dsc
  623. {} 3073624 dummy-package_7.26.0.orig.tar.gz
  624. {} 33360 dummy-package_7.26.0-1+wheezy3.debian.tar.gz
  625. Checksums-Sha1:
  626. {} 2531 dummy-package_7.26.0.dsc
  627. {} 3073624 dummy-package_7.26.0.orig.tar.gz
  628. {} 33360 dummy-package_7.26.0-1+wheezy3.debian.tar.gz
  629. Checksums-Sha256:
  630. {} 2531 dummy-package_7.26.0.dsc
  631. {} 3073624 dummy-package_7.26.0.orig.tar.gz
  632. {} 33360 dummy-package_7.26.0-1+wheezy3.debian.tar.gz
  633. Directory: pool/updates/main/c/curl
  634. Priority: source
  635. Section: libs
  636. """.format(
  637. '602b2a11624744e2e92353f5e76ad7e6', # noqa
  638. '3fa4d5236f2a36ca5c3af6715e837691', # noqa
  639. '2972826d5b1ebadace83f236e946b33f', # noqa
  640. '50fd8c0de138e80903443927365565151291338c', # noqa
  641. '66e1fd0312f62374b96fe02e644f66202fd6324b', # noqa
  642. 'a0f16b381d3ac3e02de307dced481eaf01b3ead1', # noqa
  643. 'daf4c6c8ad485f98cc6ad684b5de30d7d07e45e521a1a6caf148406f7c9993cd', # noqa
  644. '79ccce9edb8aee17d20ad4d75e1f83a789f8c2e71e68f468e1bf8abf8933193f', # noqa
  645. '335bf9f847e68df71dc0b9bd14863c6a8951198af3ac19fc67b8817835fd0e17', # noqa
  646. )) # noqa
  647. self.assertEqual(
  648. 'dummy-package_7.26.0.dsc',
  649. extract_dsc_file_name(stanza)
  650. )
  651. # No input given
  652. self.assertIsNone(extract_dsc_file_name({}))
  653. # No files entry...
  654. self.assertIsNone(extract_dsc_file_name({
  655. 'package': 'name',
  656. 'version': 'version'
  657. }))
  658. class HttpCacheTest(SimpleTestCase):
  659. def set_mock_response(self, mock_requests, headers=None, status_code=200):
  660. set_mock_response(
  661. mock_requests,
  662. text=self.response_content.decode('utf-8'),
  663. headers=headers,
  664. status_code=status_code)
  665. def setUp(self):
  666. # Set up a cache directory to use in the tests
  667. self.cache_directory = tempfile.mkdtemp(suffix='test-cache')
  668. # Set up a simple response content
  669. self.response_content = 'Simple response'
  670. self.response_content = self.response_content.encode('utf-8')
  671. def tearDown(self):
  672. # Remove the test directory
  673. import shutil
  674. shutil.rmtree(self.cache_directory)
  675. def test_parse_cache_control_header(self):
  676. """
  677. Tests the utility function for parsing a Cache-Control header into a
  678. dict.
  679. """
  680. from distro_tracker.core.utils.http import parse_cache_control_header
  681. header = 'must-revalidate, max-age=3600'
  682. d = parse_cache_control_header(header)
  683. self.assertIn('must-revalidate', d)
  684. self.assertIn('max-age', d)
  685. self.assertEqual(d['max-age'], '3600')
  686. header = 'max-age=0, private'
  687. d = parse_cache_control_header(header)
  688. self.assertIn('private', d)
  689. self.assertIn('max-age', d)
  690. self.assertEqual(d['max-age'], '0')
  691. @mock.patch('distro_tracker.core.utils.http.requests')
  692. def test_update_cache_new_item(self, mock_requests):
  693. """
  694. Tests the simple case of updating the cache with a new URL's response.
  695. """
  696. headers = {
  697. 'Connection': 'Keep-Alive',
  698. 'Content-Type': 'text/plain',
  699. }
  700. self.set_mock_response(mock_requests, headers=headers)
  701. cache = HttpCache(self.cache_directory)
  702. url = 'http://example.com'
  703. # The URL cannot be found in the cache at this point
  704. self.assertFalse(url in cache)
  705. response, updated = cache.update(url)
  706. # The returned response is correct
  707. self.assertEqual(self.response_content, response.content)
  708. self.assertEqual(200, response.status_code)
  709. # The return value indicates the cache has been updated
  710. self.assertTrue(updated)
  711. # The URL is now found in the cache
  712. self.assertTrue(url in cache)
  713. # The content is accessible through the cache
  714. self.assertEqual(self.response_content, cache.get_content(url))
  715. # The returned headers are accessible through the cache
  716. cached_headers = cache.get_headers(url)
  717. for key, value in headers.items():
  718. self.assertIn(key, cached_headers)
  719. self.assertEqual(value, cached_headers[key])
  720. @mock.patch('distro_tracker.core.utils.http.requests')
  721. def test_cache_not_expired(self, mock_requests):
  722. """
  723. Tests that the cache knows a response is not expired based on its
  724. Cache-Control header.
  725. """
  726. self.set_mock_response(mock_requests, headers={
  727. 'Cache-Control': 'must-revalidate, max-age=3600',
  728. })
  729. cache = HttpCache(self.cache_directory)
  730. url = 'http://example.com'
  731. cache.update(url)
  732. self.assertTrue(url in cache)
  733. self.assertFalse(cache.is_expired(url))
  734. @mock.patch('distro_tracker.core.utils.http.requests')
  735. def test_cache_expired(self, mock_requests):
  736. """
  737. Tests that the cache knows when an entry with a stale Cache-Control
  738. header is expired.
  739. """
  740. self.set_mock_response(mock_requests, headers={
  741. 'Cache-Control': 'must-revalidate, max-age=0',
  742. })
  743. cache = HttpCache(self.cache_directory)
  744. url = 'http://example.com'
  745. cache.update(url)
  746. self.assertTrue(url in cache)
  747. self.assertTrue(cache.is_expired(url))
  748. @mock.patch('distro_tracker.core.utils.http.requests')
  749. def test_cache_conditional_get_last_modified(self, mock_requests):
  750. """
  751. Tests that the cache performs a conditional GET request when asked to
  752. update the response for a URL with a Last-Modified header.
  753. """
  754. last_modified = http_date(time.time())
  755. self.set_mock_response(mock_requests, headers={
  756. 'Last-Modified': last_modified
  757. })
  758. cache = HttpCache(self.cache_directory)
  759. url = 'http://example.com'
  760. cache.update(url)
  761. self.response_content = b''
  762. self.set_mock_response(mock_requests, status_code=304)
  763. # Run the update again
  764. response, updated = cache.update(url)
  765. self.assertFalse(updated)
  766. mock_requests.get.assert_called_with(
  767. url, verify=False, allow_redirects=True,
  768. headers={'If-Modified-Since': last_modified})
  769. # The actual server's response is returned
  770. self.assertEqual(response.status_code, 304)
  771. @mock.patch('distro_tracker.core.utils.http.requests')
  772. def test_cache_conditional_get_last_modified_expired(self, mock_requests):
  773. """
  774. Tests that the cache performs a conditional GET request when asked to
  775. update the response for a URL with a Last-Modified header, which has
  776. since expired.
  777. """
  778. last_modified = http_date(time.time() - 3600)
  779. self.set_mock_response(mock_requests, headers={
  780. 'Last-Modified': last_modified
  781. })
  782. cache = HttpCache(self.cache_directory)
  783. url = 'http://example.com'
  784. cache.update(url)
  785. # Set a new Last-Modified and content value
  786. new_last_modified = http_date(time.time())
  787. self.response_content = b'Response'
  788. self.set_mock_response(mock_requests, headers={
  789. 'Last-Modified': new_last_modified
  790. })
  791. # Run the update again
  792. response, updated = cache.update(url)
  793. self.assertTrue(updated)
  794. self.assertEqual(200, response.status_code)
  795. # The new content is found in the cache
  796. self.assertEqual(self.response_content, cache.get_content(url))
  797. # The new Last-Modified is found in the headers cache
  798. self.assertEqual(
  799. new_last_modified,
  800. cache.get_headers(url)['Last-Modified']
  801. )
  802. @mock.patch('distro_tracker.core.utils.http.requests')
  803. def test_cache_expires_header(self, mock_requests):
  804. """
  805. Tests that the cache knows that a cached response is not expired based
  806. on its Expires header.
  807. """
  808. expires = http_date(time.time() + 3600)
  809. self.set_mock_response(mock_requests, headers={
  810. 'Expires': expires
  811. })
  812. cache = HttpCache(self.cache_directory)
  813. url = 'http://example.com'
  814. cache.update(url)
  815. self.assertFalse(cache.is_expired(url))
  816. @mock.patch('distro_tracker.core.utils.http.requests')
  817. def test_cache_expires_header_expired(self, mock_requests):
  818. """
  819. Tests that the cache knows that a cached response is expired based
  820. on its Expires header.
  821. """
  822. expires = http_date(time.time() - 3600)
  823. self.set_mock_response(mock_requests, headers={
  824. 'Expires': expires
  825. })
  826. cache = HttpCache(self.cache_directory)
  827. url = 'http://example.com'
  828. cache.update(url)
  829. self.assertTrue(cache.is_expired(url))
  830. @mock.patch('distro_tracker.core.utils.http.requests')
  831. def test_cache_remove_url(self, mock_requests):
  832. """
  833. Tests removing a cached response.
  834. """
  835. self.set_mock_response(mock_requests)
  836. cache = HttpCache(self.cache_directory)
  837. url = 'http://example.com'
  838. cache.update(url)
  839. # Sanity check - the url is cached
  840. self.assertTrue(url in cache)
  841. cache.remove(url)
  842. self.assertFalse(url in cache)
  843. @mock.patch('distro_tracker.core.utils.http.requests')
  844. def test_conditional_get_etag(self, mock_requests):
  845. """
  846. Tests that the cache performs a conditional GET request when asked to
  847. update the response for a URL with an ETag header
  848. """
  849. etag = '"466010a-11bf9-4e17efa8afb81"'
  850. self.set_mock_response(mock_requests, headers={
  851. 'ETag': etag,
  852. })
  853. cache = HttpCache(self.cache_directory)
  854. url = 'http://example.com'
  855. cache.update(url)
  856. self.response_content = b''
  857. self.set_mock_response(mock_requests, status_code=304)
  858. # Run the update again
  859. response, updated = cache.update(url)
  860. self.assertFalse(updated)
  861. mock_requests.get.assert_called_with(
  862. url, verify=False, allow_redirects=True,
  863. headers={'If-None-Match': etag, })
  864. # The actual server's response is returned
  865. self.assertEqual(response.status_code, 304)
  866. @mock.patch('distro_tracker.core.utils.http.requests')
  867. def test_conditional_get_etag_expired(self, mock_requests):
  868. """
  869. Tests that the cache performs a conditional GET request when asked to
  870. update the response for a URL with an ETag header, which has since
  871. expired.
  872. """
  873. etag = '"466010a-11bf9-4e17efa8afb81"'
  874. self.set_mock_response(mock_requests, headers={
  875. 'ETag': etag,
  876. })
  877. cache = HttpCache(self.cache_directory)
  878. url = 'http://example.com'
  879. cache.update(url)
  880. # Set a new ETag and content value
  881. new_etag = '"57ngfhty11bf9-9t831116kn1qw1'
  882. self.response_content = b'Response'
  883. self.set_mock_response(mock_requests, headers={
  884. 'ETag': new_etag
  885. })
  886. # Run the update again
  887. response, updated = cache.update(url)
  888. self.assertTrue(updated)
  889. self.assertEqual(200, response.status_code)
  890. # The new content is found in the cache
  891. self.assertEqual(self.response_content, cache.get_content(url))
  892. # The new Last-Modified is found in the headers cache
  893. self.assertEqual(
  894. new_etag,
  895. cache.get_headers(url)['ETag']
  896. )
  897. @mock.patch('distro_tracker.core.utils.http.requests')
  898. def test_conditional_force_unconditional_get(self, mock_requests):
  899. """
  900. Tests that the users can force the cache to perform an unconditional
  901. GET when updating a cached resource.
  902. """
  903. last_modified = http_date(time.time())
  904. self.set_mock_response(mock_requests, headers={
  905. 'Last-Modified': last_modified
  906. })
  907. cache = HttpCache(self.cache_directory)
  908. url = 'http://example.com'
  909. cache.update(url)
  910. # Run the update again
  911. response, updated = cache.update(url, force=True)
  912. # Make sure that we ask for a non-cached version
  913. mock_requests.get.assert_called_with(
  914. url, verify=False, allow_redirects=True,
  915. headers={'Cache-Control': 'no-cache'})
  916. self.assertTrue(updated)
  917. def test_get_resource_content_utlity_function_cached(self):
  918. """
  919. Tests the :func:`distro_tracker.core.utils.http.get_resource_content`
  920. utility function when the resource is cached in the given cache
  921. instance.
  922. """
  923. mock_cache = mock.create_autospec(HttpCache)
  924. mock_cache.is_expired.return_value = False
  925. expected_content = b"Some content"
  926. mock_cache.get_content.return_value = expected_content
  927. url = 'http://some.url.com'
  928. content = get_resource_content(url, mock_cache)
  929. # The expected content is retrieved
  930. self.assertEqual(content, expected_content)
  931. # The function did not update the cache
  932. self.assertFalse(mock_cache.update.called)
  933. def test_get_resource_content_utility_function_not_cached(self):
  934. """
  935. Tests the :func:`distro_tracker.core.utils.http.get_resource_content`
  936. utility function when the resource is not cached in the given cache
  937. instance.
  938. """
  939. mock_cache = mock.create_autospec(HttpCache)
  940. mock_cache.is_expired.return_value = True
  941. expected_content = b"Some content"
  942. mock_cache.get_content.return_value = expected_content
  943. url = 'http://some.url.com'
  944. content = get_resource_content(url, mock_cache)
  945. self.assertEqual(content, expected_content)
  946. # The function updated the cache
  947. mock_cache.update.assert_called_once_with(url)
  948. class VerifySignatureTest(SimpleTestCase):
  949. """
  950. Tests the :func:`distro_tracker.core.utils.verify_signature` function.
  951. """
  952. def setUp(self):
  953. self.TEST_KEYRING_DIRECTORY = tempfile.mkdtemp(suffix='-test-keyring')
  954. def tearDown(self):
  955. import shutil
  956. shutil.rmtree(self.TEST_KEYRING_DIRECTORY)
  957. def test_signed_message(self):
  958. """
  959. Tests extracting the signature from a correctly signed message when the
  960. signer is found in the keyring.
  961. """
  962. self.import_key_into_keyring('key1.pub')
  963. file_path = self.get_test_data_path('signed-message')
  964. expected = [
  965. ('PTS Tests', 'fake-address@domain.com')
  966. ]
  967. with open(file_path, 'rb') as f:
  968. self.assertEqual(expected, verify_signature(f.read()))
  969. def test_signed_message_unknown_key(self):
  970. """
  971. Tests extracting the signature from a correctly signed message when the
  972. signer is not found in the keyring.
  973. """
  974. file_path = self.get_test_data_path('signed-message')
  975. with open(file_path, 'rb') as f:
  976. self.assertSequenceEqual([], verify_signature(f.read()))
  977. def test_incorrect_signature(self):
  978. """
  979. Tests extracting signature information when the signature itself is
  980. wrong.
  981. """
  982. with self.settings(
  983. DISTRO_TRACKER_KEYRING_DIRECTORY=self.TEST_KEYRING_DIRECTORY):
  984. self.assertIsNone(verify_signature(b"This is not a signature"))
  985. def test_utf8_content(self):
  986. """
  987. Tests extracting the signature from a message passed as unicode text
  988. instead of bytes.
  989. """
  990. self.import_key_into_keyring('key1.pub')
  991. file_path = self.get_test_data_path('signed-message')
  992. expected = [
  993. ('PTS Tests', 'fake-address@domain.com')
  994. ]
  995. with open(file_path, 'rb') as f:
  996. content = f.read().decode('utf-8')
  997. self.assertEqual(expected, verify_signature(content))
  998. class DecodeHeaderTest(SimpleTestCase):
  999. """
  1000. Tests for :func:`distro_tracker.core.utils.email_messages.decode_header`.
  1001. """
  1002. def test_decode_header_iso(self):
  1003. """
  1004. Single part iso-8859-1 encoded text.
  1005. """
  1006. h = Header(b'M\xfcnchen', 'iso-8859-1')
  1007. header_text = decode_header(h)
  1008. self.assertEqual('München', header_text)
  1009. def test_decode_header_utf8(self):
  1010. """
  1011. Single part utf-8 encoded text.
  1012. """
  1013. h = Header(b'M\xc3\xbcnchen', 'utf-8')
  1014. header_text = decode_header(h)
  1015. self.assertEqual('München', header_text)
  1016. def test_decode_header_multipart(self):
  1017. """
  1018. Two part header: iso-8859-1 and utf-8
  1019. """
  1020. h = Header(b'M\xfcnchen', 'iso-8859-1')
  1021. h.append(b' M\xc3\xbcnchen', 'utf-8')
  1022. header_text = decode_header(h)
  1023. self.assertEqual('München München', header_text)
  1024. def test_decode_header_none(self):
  1025. self.assertIsNone(decode_header(None))
  1026. class AptCacheTests(TestCase):
  1027. """
  1028. Tests for :class:`distro_tracker.core.utils.packages.AptCache`.
  1029. """
  1030. @staticmethod
  1031. def stub_acquire(source_records, dest_dir, debian_dir_only, content):
  1032. # Create a file in the destination directory
  1033. file_name = 'temp'
  1034. file_path = os.path.join(dest_dir, file_name)
  1035. # Create a file of the given size
  1036. with open(file_path, 'wb') as f:
  1037. f.write(content)
  1038. return None, 'ekrem'
  1039. def create_cache(self):
  1040. """
  1041. Helper method which creates an
  1042. :class:`distro_tracker.core.utils.packages.AptCache` instance which is
  1043. used for testing. Some of its methods are replaced by mocks and stubs to
  1044. avoid HTTP calls.
  1045. """
  1046. self.cache = AptCache()
  1047. self.cache._get_apt_source_records = mock.MagicMock()
  1048. self.cache._get_format = mock.MagicMock(return_value='1.0')
  1049. self.cache._extract_dpkg_source = mock.MagicMock()
  1050. self.cached_files = []
  1051. self.cache._get_all_cached_files = mock.MagicMock(
  1052. return_value=self.cached_files)
  1053. self.cache._match_index_file_to_repository = mock.MagicMock()
  1054. def set_stub_acquire_content(self, content):
  1055. """
  1056. Helper method which sets the content of a file which is created by the
  1057. cache instance when retrieve_source is called.
  1058. """
  1059. self.cache._apt_acquire_package = mock.MagicMock(side_effect=curry(
  1060. AptCacheTests.stub_acquire, content=content))
  1061. def set_stub_cached_files_for_repository(self, repository, files):
  1062. """
  1063. Helper method adds the given list of files to the stub list of cached
  1064. files for a given repository.
  1065. :param repository: The repository to which these files are associated.
  1066. :type repository: :class:`Repository
  1067. <distro_tracker.core.models.Repository>`
  1068. :param files: List of cached file names. The function uses the list to
  1069. build the stub by prefixing the names with expected repository
  1070. identifiers.
  1071. """
  1072. # Build the prefix from the repository's URI and suite
  1073. base_uri = repository.uri.rstrip('/')
  1074. if base_uri.startswith('http://'):
  1075. base_uri = base_uri[7:]
  1076. prefix = base_uri + '/' + repository.suite + '/'
  1077. prefix = prefix.replace('/', '_')
  1078. for file_name in files:
  1079. self.cached_files.append(prefix + file_name)
  1080. self.cache._match_index_file_to_repository.return_value = repository
  1081. def assert_cache_size_equal(self, size):
  1082. self.assertEqual(size, self.cache.cache_size)
  1083. def test_cache_size_increase_after_acquire(self):
  1084. """
  1085. Tests that the cache correctly increases its size after acquiring new
  1086. files.
  1087. """
  1088. with make_temp_directory('-dtracker-cache') as cache_directory:
  1089. with self.settings(
  1090. DISTRO_TRACKER_CACHE_DIRECTORY=cache_directory,
  1091. DISTRO_TRACKER_APT_CACHE_MAX_SIZE=10):
  1092. self.create_cache()
  1093. # Sanity check: old size is 0 as nothing was ever cached in the
  1094. # brand new directory
  1095. self.assert_cache_size_equal(0)
  1096. content = b'a' * 5 # 5 bytes
  1097. self.set_stub_acquire_content(content)
  1098. self.cache.retrieve_source('dummy-package', '1.0.0')
  1099. self.assert_cache_size_equal(5)
  1100. def test_cache_multiple_insert_no_remove(self):
  1101. """
  1102. Tests that the cache does not remove packages unless the size limit is
  1103. exceeded.
  1104. """
  1105. with make_temp_directory('-dtracker-cache') as cache_directory:
  1106. with self.settings(
  1107. DISTRO_TRACKER_CACHE_DIRECTORY=cache_directory,
  1108. DISTRO_TRACKER_APT_CACHE_MAX_SIZE=10):
  1109. self.create_cache()
  1110. # Sanity check: old size is 0 as nothing was ever cached in the
  1111. # brand new directory
  1112. self.assert_cache_size_equal(0)
  1113. content = b'a' * 5 # 5 bytes
  1114. self.set_stub_acquire_content(content)
  1115. # Add one file.
  1116. self.cache.retrieve_source('dummy-package', '1.0.0')
  1117. self.assert_cache_size_equal(5)
  1118. # Same content in another file
  1119. self.set_stub_acquire_content(content)
  1120. self.cache.retrieve_source('package', '1.0.0')
  1121. # Both files are now saved.
  1122. self.assert_cache_size_equal(10)
  1123. def test_clear_cache(self):
  1124. """
  1125. Tests that the cache removes packages when it exceeds its allocated
  1126. size.
  1127. """
  1128. with make_temp_directory('-dtracker-cache') as cache_directory:
  1129. with self.settings(
  1130. DISTRO_TRACKER_CACHE_DIRECTORY=cache_directory,
  1131. DISTRO_TRACKER_APT_CACHE_MAX_SIZE=10):
  1132. self.create_cache()
  1133. # Sanity check: old size is 0 as nothing was ever cached in the
  1134. # brand new directory
  1135. self.assert_cache_size_equal(0)
  1136. initial_content = b'a' * 11
  1137. self.set_stub_acquire_content(initial_content)
  1138. # Set initial source content
  1139. self.cache.retrieve_source('dummy-package', '1.0.0')
  1140. self.assert_cache_size_equal(11)
  1141. content = b'a' * 7
  1142. self.set_stub_acquire_content(content)
  1143. self.cache.retrieve_source('package', '1.0.0')
  1144. # Only the second content is found in the package
  1145. self.assert_cache_size_equal(7)
  1146. def test_get_sources_for_repository(self):
  1147. """
  1148. Tests that the cache correctly returns a list of cached Sources files
  1149. for a given repository.
  1150. """
  1151. with make_temp_directory('-dtracker-cache') as cache_directory:
  1152. with self.settings(DISTRO_TRACKER_CACHE_DIRECTORY=cache_directory):
  1153. self.create_cache()
  1154. repository = Repository.objects.create(
  1155. name='stable',
  1156. shorthand='stable',
  1157. uri='http://httpredir.debian.org/debian/dists',
  1158. suite='stable')
  1159. expected_source_files = [
  1160. 'main_source_Sources',
  1161. 'contrib_source_Sources',
  1162. ]
  1163. files = expected_source_files + [
  1164. 'Release',
  1165. 'main_binary-amd64_Packages',
  1166. ]
  1167. self.set_stub_cached_files_for_repository(repository, files)
  1168. sources = \
  1169. self.cache.get_sources_files_for_repository(repository)
  1170. self.assertEqual(len(expected_source_files), len(sources))
  1171. for expected_source, returned_source in zip(
  1172. expected_source_files, sources):
  1173. self.assertTrue(returned_source.endswith(expected_source))
  1174. def test_get_packages_for_repository(self):
  1175. """
  1176. Tests that the cache correctly returns a list of cached Packages files
  1177. for a given repository.
  1178. """
  1179. with make_temp_directory('-dtracker-cache') as cache_directory:
  1180. with self.settings(DISTRO_TRACKER_CACHE_DIRECTORY=cache_directory):
  1181. self.create_cache()
  1182. repository = Repository.objects.create(
  1183. name='stable',
  1184. shorthand='stable',
  1185. uri='http://httpredir.debian.org/debian/dists',
  1186. suite='stable')
  1187. expected_packages_files = [
  1188. 'main_binary-amd64_Packages',
  1189. 'main_binary-i386_Packages',
  1190. ]
  1191. files = expected_packages_files + [
  1192. 'Release',
  1193. 'main_source_Sources',
  1194. ]
  1195. self.set_stub_cached_files_for_repository(repository, files)
  1196. packages = \
  1197. self.cache.get_packages_files_for_repository(repository)
  1198. self.assertEqual(len(expected_packages_files), len(packages))
  1199. for expected, returned in zip(
  1200. expected_packages_files, packages):
  1201. self.assertTrue(returned.endswith(expected))
  1202. class LinkifyTests(TestCase):
  1203. """
  1204. Tests for :func:`distro_tracker.core.utils.linkify`.
  1205. """
  1206. sample_url = "http://www.example.com/foo/"
  1207. https_url = "https://www.example.com.foo/"
  1208. @staticmethod
  1209. def link(url):
  1210. return '<a href="{url}">{url}</a>'.format(url=url)
  1211. @staticmethod
  1212. def debian_bug(bug, baseurl='https://bugs.debian.org/'):
  1213. bugno = bug[1:] if bug[0] == '#' else bug
  1214. return '<a href="{}{}">{}</a>'.format(baseurl, bugno, bug)
  1215. @classmethod
  1216. def lp_bug(cls, bug):
  1217. return cls.debian_bug(bug, 'https://bugs.launchpad.net/bugs/')
  1218. @staticmethod
  1219. def cve_link(cve,
  1220. baseurl='https://cve.mitre.org/cgi-bin/cvename.cgi?name='):
  1221. return '<a href="{}{}">{}</a>'.format(baseurl, cve, cve)
  1222. def setUp(self):
  1223. self.data = {
  1224. 'LinkifyHttpLinks': {
  1225. 'simple': (self.sample_url, self.link(self.sample_url)),
  1226. 'https': (self.https_url, self.link(self.https_url)),
  1227. # Default case, link in text
  1228. 'intext': ('see ' + self.sample_url + ' for example',
  1229. 'see ' + self.link(self.sample_url) +
  1230. ' for example'),
  1231. # Existing HTML links are not re-processed
  1232. 'htmllink': (self.link(self.sample_url),
  1233. self.link(self.sample_url)),
  1234. # Ensure xhttp:// is not recognized as a link
  1235. 'badlink': ('x' + self.sample_url, 'x' + self.sample_url)
  1236. },
  1237. 'LinkifyDebianBugLinks': {
  1238. 'simple': ('closes: ' + '1234', 'closes: ' +
  1239. self.debian_bug(bug='1234')),
  1240. 'withsharp': ('Closes: ' + '#1234', 'Closes: ' +
  1241. self.debian_bug(bug='#1234')),
  1242. 'intext': ('see closes: ' + '#1234' +
  1243. 'for informations',
  1244. 'see closes: ' + self.debian_bug(bug='#1234') +
  1245. 'for informations'),
  1246. 'multipleintext': ('see Closes: ' + '1234, 5678,\n9123' +
  1247. 'or closes: ' + '456' + 'for example',
  1248. 'see Closes: ' +
  1249. self.debian_bug(bug='1234') + ', ' +
  1250. self.debian_bug(bug='5678') + ',\n' +
  1251. self.debian_bug(bug='9123') + 'or ' +
  1252. 'closes: ' +
  1253. self.debian_bug(bug='456') + 'for example'),
  1254. # Case of a Closes field on its single line (space-separated)
  1255. 'closesfield': ('\nCloses: 123 456\n',
  1256. '\nCloses: ' + self.debian_bug('123') + ' ' +
  1257. self.debian_bug('456') + '\n'),
  1258. 'txtbeforefield': ('\nFinally Closes: 123 456\n',
  1259. '\nFinally Closes: ' +
  1260. self.debian_bug('123') + ' 456\n'),
  1261. 'txtafterfield': ('\nCloses: 123 456 foobar\n',
  1262. '\nCloses: ' + self.debian_bug('123') +
  1263. ' 456 foobar\n'),
  1264. },
  1265. 'LinkifyUbuntuBugLinks': {
  1266. 'simple': ('lp: ' + '1234', 'lp: ' +
  1267. self.lp_bug(bug='1234')),
  1268. 'withsharp': ('Lp: ' + '#1234', 'Lp: ' +
  1269. self.lp_bug(bug='#1234')),
  1270. 'intext': ('see lp: ' + '#1234' +
  1271. 'for informations',
  1272. 'see lp: ' + self.lp_bug('#1234') +
  1273. 'for informations'),
  1274. 'multipleintext': ('see lp: ' + '1234, 5678,\n9123' +
  1275. 'or lp: ' + '456' + 'for example',
  1276. 'see lp: ' +
  1277. self.lp_bug(bug='1234') + ', ' +
  1278. self.lp_bug(bug='5678') + ',\n' +
  1279. self.lp_bug(bug='9123') + 'or lp: ' +
  1280. self.lp_bug(bug='456') + 'for example')
  1281. },
  1282. 'LinkifyCVELinks': {
  1283. 'oldformat': ('CVE-2012-1234',
  1284. self.cve_link('CVE-2012-1234')),
  1285. 'newformat': ('CVE-2014-1234567',
  1286. self.cve_link('CVE-2014-1234567')),
  1287. 'intext': ('see ' + 'cve-2014-67890' + ' for informations',
  1288. 'see ' + self.cve_link('cve-2014-67890') +
  1289. ' for informations'),
  1290. 'notinurl': ('foo.debian.org/CVE-2014-1234',
  1291. 'foo.debian.org/CVE-2014-1234'),
  1292. },
  1293. }
  1294. def _test_linkify_class(self, cls):
  1295. linkifier = cls()
  1296. for name, data in self.data[cls.__name__].items():
  1297. output = linkifier.linkify(data[0])
  1298. self.assertEqual(output, data[1],
  1299. '{} failed with "{}" test data'.format(
  1300. cls.__name__, name))
  1301. def test_linkify_http(self):
  1302. """Test the linkifyHttpLinks class"""
  1303. self._test_linkify_class(LinkifyHttpLinks)
  1304. def test_linkify_debian_bug(self):
  1305. """Test the linkifyDebianbug class"""
  1306. self._test_linkify_class(LinkifyDebianBugLinks)
  1307. def test_linkify_ubuntu_bug(self):
  1308. """Test the linkifyUbuntubug class"""
  1309. self._test_linkify_class(LinkifyUbuntuBugLinks)
  1310. def test_linkify_CVE_links(self):
  1311. """Test the LinkifyCVELinks class"""
  1312. self._test_linkify_class(LinkifyCVELinks)
  1313. @override_settings(
  1314. DISTRO_TRACKER_CVE_URL='https://security-tracker.debian.org/tracker/')
  1315. def test_linkify_CVE_links_custom_url(self):
  1316. """Test LinkifyCVELinks with a custom DISTRO_TRACKER_CVE_URL"""
  1317. url = 'https://security-tracker.debian.org/tracker/'
  1318. # Replace the URL in the expected data
  1319. for key, content in self.data['LinkifyCVELinks'].items():
  1320. self.data['LinkifyCVELinks'][key] = (
  1321. content[0],
  1322. content[1].replace(
  1323. "https://cve.mitre.org/cgi-bin/cvename.cgi?name=", url)
  1324. )
  1325. self._test_linkify_class(LinkifyCVELinks)
  1326. def test_linkify(self):
  1327. """Test the linkify function as a combination of all the individual
  1328. tests."""
  1329. text = ''
  1330. expected = ''
  1331. for linkifier_test_data in self.data.values():
  1332. for before, after in linkifier_test_data.values():
  1333. text += before + '\n'
  1334. expected += after + '\n'
  1335. linkify_output = linkify(text)
  1336. self.assertEqual(linkify_output, expected)
  1337. class UtilsTests(TestCase):
  1338. def test_now(self):
  1339. """Ensure distro_tracker.core.utils.now() exists"""
  1340. self.assertIsInstance(now(), datetime.datetime)