test_oauth1.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. # GNU MediaGoblin -- federated, autonomous media hosting
  2. # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  16. import pytest
  17. from six.moves.urllib.parse import parse_qs, urlparse
  18. from oauthlib.oauth1 import Client
  19. from mediagoblin import mg_globals
  20. from mediagoblin.tools import template, pluginapi
  21. from mediagoblin.tests.tools import fixture_add_user
  class TestOAuth(object):
      """Functional tests for OAuth client registration and request tokens.

      Each test talks to the application through the ``test_app`` fixture
      after ``setup`` has created a user with ``fixture_add_user`` and
      logged in.
      """

      MIME_FORM = "application/x-www-form-urlencoded"
      MIME_JSON = "application/json"

      @pytest.fixture(autouse=True)
      def setup(self, test_app):
          """Store the test app and DB handle, create a user and log in.

          Declared ``autouse`` so it does not have to be requested by the
          individual test methods.
          """
          self.test_app = test_app

          self.db = mg_globals.database

          self.pman = pluginapi.PluginManager()

          self.user_password = "AUserPassword123"
          self.user = fixture_add_user("OAuthy", self.user_password)

          self.login()

      def login(self):
          """POST the fixture user's credentials to ``/auth/login/``."""
          self.test_app.post(
              "/auth/login/", {
                  "username": self.user.username,
                  "password": self.user_password})

      def register_client(self, **kwargs):
          """Register a client with the API.

          ``kwargs`` are sent as the POST parameters of
          ``/api/client/register``. ``type`` is always forced to
          ``"client_associate"`` (a value passed by the caller is
          overwritten) and ``application_type`` defaults to ``"native"``.

          Returns the response object produced by ``test_app.post``.
          """
          kwargs["type"] = "client_associate"
          kwargs.setdefault("application_type", "native")
          return self.test_app.post("/api/client/register", kwargs)

      def test_client_client_register_limited_info(self):
          """Tests that a client can be registered with limited information."""
          response = self.register_client()

          # Check the status before reading the payload so that a failed
          # request is reported as such rather than as a lookup error.
          assert response.status_int == 200

          client_info = response.json
          client = self.db.Client.query.filter_by(
              id=client_info["client_id"]).first()

          assert client is not None

      def test_client_register_full_info(self):
          """Provides every piece of information possible to register client."""
          query = {
              "application_name": "Testificate MD",
              "application_type": "web",
              "contacts": "someone@someplace.com tuteo@tsengeo.lu",
              "logo_uri": "http://ayrel.com/utral.png",
              "redirect_uris": "http://navi-kosman.lu http://gmg-yawne-oeru.lu",
          }

          response = self.register_client(**query)

          client_info = response.json
          client = self.db.Client.query.filter_by(
              id=client_info["client_id"]).first()

          assert client is not None
          assert client.secret == client_info["client_secret"]
          assert client.application_type == query["application_type"]
          # The space-separated request values are compared against lists.
          assert client.redirect_uri == query["redirect_uris"].split()
          assert client.logo_url == query["logo_uri"]
          assert client.contacts == query["contacts"].split()

      def test_client_update(self):
          """Tests that you can update a client."""
          # First we need to register a client.
          self.register_client()

          # Now update.
          # NOTE(review): register_client() overwrites "type" with
          # "client_associate", so the "client_update" value below is never
          # sent, and neither the client_id nor the client_secret of the
          # client registered above is included. As written this performs a
          # second registration -- confirm whether a real update request
          # was intended.
          update_query = {
              "type": "client_update",
              "application_name": "neytiri",
              "contacts": "someone@someplace.com abc@cba.com",
              "logo_uri": "http://place.com/picture.png",
              "application_type": "web",
              "redirect_uris": "http://blah.gmg/whatever https://inboxen.org/",
          }

          update_response = self.register_client(**update_query)

          assert update_response.status_int == 200
          client_info = update_response.json
          client = self.db.Client.query.filter_by(
              id=client_info["client_id"]).first()

          assert client.secret == client_info["client_secret"]
          assert client.application_type == update_query["application_type"]
          assert client.application_name == update_query["application_name"]
          assert client.contacts == update_query["contacts"].split()
          assert client.logo_url == update_query["logo_uri"]
          assert client.redirect_uri == update_query["redirect_uris"].split()

      def to_authorize_headers(self, data):
          """Build an OAuth ``Authorization`` header from ``data``.

          Each item of ``data`` is rendered as ``key="value"`` and the pairs
          are joined with commas, in the mapping's iteration order. No
          escaping is applied to keys or values.

          Returns a dict whose only key is ``"Authorization"``.
          """
          pairs = ",".join(
              '{0}="{1}"'.format(key, value) for key, value in data.items())
          return {"Authorization": "OAuth " + pairs}

      def test_request_token(self):
          """Test a request for a request token."""
          client_id = self.register_client().json["client_id"]

          endpoint = "/oauth/request_token"
          request_query = {
              "oauth_consumer_key": client_id,
              "oauth_nonce": "abcdefghij",
              "oauth_timestamp": 123456789.0,
              "oauth_callback": "https://some.url/callback",
          }

          headers = self.to_authorize_headers(request_query)
          headers["Content-Type"] = self.MIME_FORM

          response = self.test_app.post(endpoint, headers=headers)

          # parse_qs maps every key to a list of values; keep the first one.
          token_info = {
              key: values[0]
              for key, values in parse_qs(response.body.decode()).items()
          }

          request_token = self.db.RequestToken.query.filter_by(
              token=token_info["oauth_token"]
          ).first()

          client = self.db.Client.query.filter_by(id=client_id).first()

          assert request_token is not None
          assert request_token.secret == token_info["oauth_token_secret"]
          assert request_token.client == client.id
          assert request_token.used is False
          assert request_token.callback == request_query["oauth_callback"]