test_misc.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. # GNU MediaGoblin -- federated, autonomous media hosting
  2. # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  16. import pytz
  17. import datetime
  18. from werkzeug.datastructures import FileStorage
  19. from .resources import GOOD_JPG
  20. from mediagoblin.db.base import Session
  21. from mediagoblin.media_types import sniff_media
  22. from mediagoblin.submit.lib import new_upload_entry
  23. from mediagoblin.submit.task import collect_garbage
  24. from mediagoblin.db.models import User, MediaEntry, MediaComment
  25. from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry
  26. def test_404_for_non_existent(test_app):
  27. res = test_app.get('/does-not-exist/', expect_errors=True)
  28. assert res.status_int == 404
  29. def test_user_deletes_other_comments(test_app):
  30. user_a = fixture_add_user(u"chris_a")
  31. user_b = fixture_add_user(u"chris_b")
  32. media_a = fixture_media_entry(uploader=user_a.id, save=False,
  33. expunge=False, fake_upload=False)
  34. media_b = fixture_media_entry(uploader=user_b.id, save=False,
  35. expunge=False, fake_upload=False)
  36. Session.add(media_a)
  37. Session.add(media_b)
  38. Session.flush()
  39. # Create all 4 possible comments:
  40. for u_id in (user_a.id, user_b.id):
  41. for m_id in (media_a.id, media_b.id):
  42. cmt = MediaComment()
  43. cmt.media_entry = m_id
  44. cmt.author = u_id
  45. cmt.content = u"Some Comment"
  46. Session.add(cmt)
  47. Session.flush()
  48. usr_cnt1 = User.query.count()
  49. med_cnt1 = MediaEntry.query.count()
  50. cmt_cnt1 = MediaComment.query.count()
  51. User.query.get(user_a.id).delete(commit=False)
  52. usr_cnt2 = User.query.count()
  53. med_cnt2 = MediaEntry.query.count()
  54. cmt_cnt2 = MediaComment.query.count()
  55. # One user deleted
  56. assert usr_cnt2 == usr_cnt1 - 1
  57. # One media gone
  58. assert med_cnt2 == med_cnt1 - 1
  59. # Three of four comments gone.
  60. assert cmt_cnt2 == cmt_cnt1 - 3
  61. User.query.get(user_b.id).delete()
  62. usr_cnt2 = User.query.count()
  63. med_cnt2 = MediaEntry.query.count()
  64. cmt_cnt2 = MediaComment.query.count()
  65. # All users gone
  66. assert usr_cnt2 == usr_cnt1 - 2
  67. # All media gone
  68. assert med_cnt2 == med_cnt1 - 2
  69. # All comments gone
  70. assert cmt_cnt2 == cmt_cnt1 - 4
  71. def test_media_deletes_broken_attachment(test_app):
  72. user_a = fixture_add_user(u"chris_a")
  73. media = fixture_media_entry(uploader=user_a.id, save=False, expunge=False)
  74. media.attachment_files.append(dict(
  75. name=u"some name",
  76. filepath=[u"does", u"not", u"exist"],
  77. ))
  78. Session.add(media)
  79. Session.flush()
  80. MediaEntry.query.get(media.id).delete()
  81. User.query.get(user_a.id).delete()
  82. def test_garbage_collection_task(test_app):
  83. """ Test old media entry are removed by GC task """
  84. user = fixture_add_user()
  85. # Create a media entry that's unprocessed and over an hour old.
  86. entry_id = 72
  87. now = datetime.datetime.now(pytz.UTC)
  88. file_data = FileStorage(
  89. stream=open(GOOD_JPG, "rb"),
  90. filename="mah_test.jpg",
  91. content_type="image/jpeg"
  92. )
  93. # Find media manager
  94. media_type, media_manager = sniff_media(file_data, "mah_test.jpg")
  95. entry = new_upload_entry(user)
  96. entry.id = entry_id
  97. entry.title = "Mah Image"
  98. entry.slug = "slugy-slug-slug"
  99. entry.media_type = 'image'
  100. entry.created = now - datetime.timedelta(days=2)
  101. entry.save()
  102. # Validate the model exists
  103. assert MediaEntry.query.filter_by(id=entry_id).first() is not None
  104. # Call the garbage collection task
  105. collect_garbage()
  106. # Now validate the image has been deleted
  107. assert MediaEntry.query.filter_by(id=entry_id).first() is None