123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- # GNU MediaGoblin -- federated, autonomous media hosting
- # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
- #
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU Affero General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Affero General Public License for more details.
- #
- # You should have received a copy of the GNU Affero General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
- import datetime
- import urllib
- import six
- from oauthlib.oauth1.rfc5849.utils import UNICODE_ASCII_CHARACTER_SET
- from oauthlib.oauth1 import (RequestTokenEndpoint, AuthorizationEndpoint,
- AccessTokenEndpoint)
- from mediagoblin.decorators import require_active_login
- from mediagoblin.tools.translate import pass_to_ugettext
- from mediagoblin.meddleware.csrf import csrf_exempt
- from mediagoblin.tools.request import decode_request
- from mediagoblin.tools.response import (render_to_response, redirect,
- json_response, render_400,
- form_response)
- from mediagoblin.tools.crypto import random_string
- from mediagoblin.tools.validator import validate_email, validate_url
- from mediagoblin.oauth.forms import AuthorizeForm
- from mediagoblin.oauth.oauth import GMGRequestValidator, GMGRequest
- from mediagoblin.oauth.tools.request import decode_authorization_header
- from mediagoblin.oauth.tools.forms import WTFormData
- from mediagoblin.db.models import NonceTimestamp, Client, RequestToken
- # possible client types
- CLIENT_TYPES = ["web", "native"] # currently what pump supports
- @csrf_exempt
- def client_register(request):
- """ Endpoint for client registration """
- try:
- data = decode_request(request)
- except ValueError:
- error = "Could not decode data."
- return json_response({"error": error}, status=400)
- if data is "":
- error = "Unknown Content-Type"
- return json_response({"error": error}, status=400)
- if "type" not in data:
- error = "No registration type provided."
- return json_response({"error": error}, status=400)
- if data.get("application_type", None) not in CLIENT_TYPES:
- error = "Unknown application_type."
- return json_response({"error": error}, status=400)
- client_type = data["type"]
- if client_type == "client_update":
- # updating a client
- if "client_id" not in data:
- error = "client_id is requried to update."
- return json_response({"error": error}, status=400)
- elif "client_secret" not in data:
- error = "client_secret is required to update."
- return json_response({"error": error}, status=400)
- client = Client.query.filter_by(
- id=data["client_id"],
- secret=data["client_secret"]
- ).first()
- if client is None:
- error = "Unauthorized."
- return json_response({"error": error}, status=403)
- client.application_name = data.get(
- "application_name",
- client.application_name
- )
- client.application_type = data.get(
- "application_type",
- client.application_type
- )
- app_name = ("application_type", client.application_name)
- if app_name in CLIENT_TYPES:
- client.application_name = app_name
- elif client_type == "client_associate":
- # registering
- if "client_id" in data:
- error = "Only set client_id for update."
- return json_response({"error": error}, status=400)
- elif "access_token" in data:
- error = "access_token not needed for registration."
- return json_response({"error": error}, status=400)
- elif "client_secret" in data:
- error = "Only set client_secret for update."
- return json_response({"error": error}, status=400)
- # generate the client_id and client_secret
- client_id = random_string(22, UNICODE_ASCII_CHARACTER_SET)
- client_secret = random_string(43, UNICODE_ASCII_CHARACTER_SET)
- expirey = 0 # for now, lets not have it expire
- expirey_db = None if expirey == 0 else expirey
- application_type = data["application_type"]
- # save it
- client = Client(
- id=client_id,
- secret=client_secret,
- expirey=expirey_db,
- application_type=application_type,
- )
- else:
- error = "Invalid registration type"
- return json_response({"error": error}, status=400)
- logo_uri = data.get("logo_uri", client.logo_url)
- if logo_uri is not None and not validate_url(logo_uri):
- error = "Logo URI {0} is not a valid URI.".format(logo_uri)
- return json_response(
- {"error": error},
- status=400
- )
- else:
- client.logo_url = logo_uri
- client.application_name = data.get("application_name", None)
- contacts = data.get("contacts", None)
- if contacts is not None:
- if not isinstance(contacts, six.text_type):
- error = "Contacts must be a string of space-seporated email addresses."
- return json_response({"error": error}, status=400)
- contacts = contacts.split()
- for contact in contacts:
- if not validate_email(contact):
- # not a valid email
- error = "Email {0} is not a valid email.".format(contact)
- return json_response({"error": error}, status=400)
- client.contacts = contacts
- redirect_uris = data.get("redirect_uris", None)
- if redirect_uris is not None:
- if not isinstance(redirect_uris, six.text_type):
- error = "redirect_uris must be space-seporated URLs."
- return json_response({"error": error}, status=400)
- redirect_uris = redirect_uris.split()
- for uri in redirect_uris:
- if not validate_url(uri):
- # not a valid uri
- error = "URI {0} is not a valid URI".format(uri)
- return json_response({"error": error}, status=400)
- client.redirect_uri = redirect_uris
- client.save()
- expirey = 0 if client.expirey is None else client.expirey
- return json_response(
- {
- "client_id": client.id,
- "client_secret": client.secret,
- "expires_at": expirey,
- })
- @csrf_exempt
- def request_token(request):
- """ Returns request token """
- try:
- data = decode_request(request)
- except ValueError:
- error = "Could not decode data."
- return json_response({"error": error}, status=400)
- if not data and request.headers:
- data = request.headers
- data = dict(data) # mutableifying
- authorization = decode_authorization_header(data)
- if authorization == dict() or u"oauth_consumer_key" not in authorization:
- error = "Missing required parameter."
- return json_response({"error": error}, status=400)
- # check the client_id
- client_id = authorization[u"oauth_consumer_key"]
- client = Client.query.filter_by(id=client_id).first()
- if client == None:
- # client_id is invalid
- error = "Invalid client_id"
- return json_response({"error": error}, status=400)
- # make request token and return to client
- request_validator = GMGRequestValidator(authorization)
- rv = RequestTokenEndpoint(request_validator)
- tokens = rv.create_request_token(request, authorization)
- # store the nonce & timestamp before we return back
- nonce = authorization[u"oauth_nonce"]
- timestamp = authorization[u"oauth_timestamp"]
- timestamp = datetime.datetime.fromtimestamp(float(timestamp))
- nc = NonceTimestamp(nonce=nonce, timestamp=timestamp)
- nc.save()
- return form_response(tokens)
- @require_active_login
- def authorize(request):
- """ Displays a page for user to authorize """
- if request.method == "POST":
- return authorize_finish(request)
- _ = pass_to_ugettext
- token = request.args.get("oauth_token", None)
- if token is None:
- # no token supplied, display a html 400 this time
- err_msg = _("Must provide an oauth_token.")
- return render_400(request, err_msg=err_msg)
- oauth_request = RequestToken.query.filter_by(token=token).first()
- if oauth_request is None:
- err_msg = _("No request token found.")
- return render_400(request, err_msg)
- if oauth_request.used:
- return authorize_finish(request)
- if oauth_request.verifier is None:
- orequest = GMGRequest(request)
- orequest.resource_owner_key = token
- request_validator = GMGRequestValidator()
- auth_endpoint = AuthorizationEndpoint(request_validator)
- verifier = auth_endpoint.create_verifier(orequest, {})
- oauth_request.verifier = verifier["oauth_verifier"]
- oauth_request.user = request.user.id
- oauth_request.save()
- # find client & build context
- client = Client.query.filter_by(id=oauth_request.client).first()
- authorize_form = AuthorizeForm(WTFormData({
- "oauth_token": oauth_request.token,
- "oauth_verifier": oauth_request.verifier
- }))
- context = {
- "user": request.user,
- "oauth_request": oauth_request,
- "client": client,
- "authorize_form": authorize_form,
- }
- # AuthorizationEndpoint
- return render_to_response(
- request,
- "mediagoblin/api/authorize.html",
- context
- )
- def authorize_finish(request):
- """ Finishes the authorize """
- _ = pass_to_ugettext
- token = request.form["oauth_token"]
- verifier = request.form["oauth_verifier"]
- oauth_request = RequestToken.query.filter_by(token=token, verifier=verifier)
- oauth_request = oauth_request.first()
- if oauth_request is None:
- # invalid token or verifier
- err_msg = _("No request token found.")
- return render_400(request, err_msg)
- oauth_request.used = True
- oauth_request.updated = datetime.datetime.now()
- oauth_request.save()
- if oauth_request.callback == "oob":
- # out of bounds
- context = {"oauth_request": oauth_request}
- return render_to_response(
- request,
- "mediagoblin/api/oob.html",
- context
- )
- # okay we need to redirect them then!
- querystring = "?oauth_token={0}&oauth_verifier={1}".format(
- oauth_request.token,
- oauth_request.verifier
- )
- # It's come from the OAuth headers so it'll be encoded.
- redirect_url = urllib.unquote(oauth_request.callback)
- return redirect(
- request,
- querystring=querystring,
- location=redirect_url
- )
- @csrf_exempt
- def access_token(request):
- """ Provides an access token based on a valid verifier and request token """
- data = request.headers
- parsed_tokens = decode_authorization_header(data)
- if parsed_tokens == dict() or "oauth_token" not in parsed_tokens:
- error = "Missing required parameter."
- return json_response({"error": error}, status=400)
- request.resource_owner_key = parsed_tokens["oauth_consumer_key"]
- request.oauth_token = parsed_tokens["oauth_token"]
- request_validator = GMGRequestValidator(data)
- av = AccessTokenEndpoint(request_validator)
- tokens = av.create_access_token(request, {})
- return form_response(tokens)
|