123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408 |
- # GNU MediaGoblin -- federated, autonomous media hosting
- # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
- #
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU Affero General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Affero General Public License for more details.
- #
- # You should have received a copy of the GNU Affero General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
- import six
- from openid.consumer import consumer
- from openid.consumer.discover import DiscoveryFailure
- from openid.extensions.sreg import SRegRequest, SRegResponse
- from mediagoblin import mg_globals, messages
- from mediagoblin.db.models import User
- from mediagoblin.decorators import (auth_enabled, allow_registration,
- require_active_login)
- from mediagoblin.tools.response import redirect, render_to_response
- from mediagoblin.tools.translate import pass_to_ugettext as _
- from mediagoblin.plugins.openid import forms as auth_forms
- from mediagoblin.plugins.openid.models import OpenIDUserURL
- from mediagoblin.plugins.openid.store import SQLAlchemyOpenIDStore
- from mediagoblin.auth.tools import register_user
- def _start_verification(request, form, return_to, sreg=True):
- """
- Start OpenID Verification.
- Returns False if verification fails, otherwise, will return either a
- redirect or render_to_response object
- """
- openid_url = form.openid.data
- c = consumer.Consumer(request.session, SQLAlchemyOpenIDStore())
- # Try to discover provider
- try:
- auth_request = c.begin(openid_url)
- except DiscoveryFailure:
- # Discovery failed, return to login page
- form.openid.errors.append(
- _('Sorry, the OpenID server could not be found'))
- return False
- host = 'http://' + request.host
- if sreg:
- # Ask provider for email and nickname
- auth_request.addExtension(SRegRequest(required=['email', 'nickname']))
- # Do we even need this?
- if auth_request is None:
- form.openid.errors.append(
- _('No OpenID service was found for %s' % openid_url))
- elif auth_request.shouldSendRedirect():
- # Begin the authentication process as a HTTP redirect
- redirect_url = auth_request.redirectURL(
- host, return_to)
- return redirect(
- request, location=redirect_url)
- else:
- # Send request as POST
- form_html = auth_request.htmlMarkup(
- host, host + return_to,
- # Is this necessary?
- form_tag_attrs={'id': 'openid_message'})
- # Beware: this renders a template whose content is a form
- # and some javascript to submit it upon page load. Non-JS
- # users will have to click the form submit button to
- # initiate OpenID authentication.
- return render_to_response(
- request,
- 'mediagoblin/plugins/openid/request_form.html',
- {'html': form_html})
- return False
- def _finish_verification(request):
- """
- Complete OpenID Verification Process.
- If the verification failed, will return false, otherwise, will return
- the response
- """
- c = consumer.Consumer(request.session, SQLAlchemyOpenIDStore())
- # Check the response from the provider
- response = c.complete(request.args, request.base_url)
- if response.status == consumer.FAILURE:
- messages.add_message(
- request,
- messages.WARNING,
- _('Verification of %s failed: %s' %
- (response.getDisplayIdentifier(), response.message)))
- elif response.status == consumer.SUCCESS:
- # Verification was successfull
- return response
- elif response.status == consumer.CANCEL:
- # Verification canceled
- messages.add_message(
- request,
- messages.WARNING,
- _('Verification cancelled'))
- return False
- def _response_email(response):
- """ Gets the email from the OpenID providers response"""
- sreg_response = SRegResponse.fromSuccessResponse(response)
- if sreg_response and 'email' in sreg_response:
- return sreg_response.data['email']
- return None
- def _response_nickname(response):
- """ Gets the nickname from the OpenID providers response"""
- sreg_response = SRegResponse.fromSuccessResponse(response)
- if sreg_response and 'nickname' in sreg_response:
- return sreg_response.data['nickname']
- return None
- @auth_enabled
- def login(request):
- """OpenID Login View"""
- login_form = auth_forms.LoginForm(request.form)
- allow_registration = mg_globals.app_config["allow_registration"]
- # Can't store next in request.GET because of redirects to OpenID provider
- # Store it in the session
- next = request.GET.get('next')
- request.session['next'] = next
- login_failed = False
- if request.method == 'POST' and login_form.validate():
- return_to = request.urlgen(
- 'mediagoblin.plugins.openid.finish_login')
- success = _start_verification(request, login_form, return_to)
- if success:
- return success
- login_failed = True
- return render_to_response(
- request,
- 'mediagoblin/plugins/openid/login.html',
- {'login_form': login_form,
- 'next': request.session.get('next'),
- 'login_failed': login_failed,
- 'post_url': request.urlgen('mediagoblin.plugins.openid.login'),
- 'allow_registration': allow_registration})
- @auth_enabled
- def finish_login(request):
- """Complete OpenID Login Process"""
- response = _finish_verification(request)
- if not response:
- # Verification failed, redirect to login page.
- return redirect(request, 'mediagoblin.plugins.openid.login')
- # Verification was successfull
- query = OpenIDUserURL.query.filter_by(
- openid_url=response.identity_url,
- ).first()
- user = query.user if query else None
- if user:
- # Set up login in session
- request.session['user_id'] = six.text_type(user.id)
- request.session.save()
- if request.session.get('next'):
- return redirect(request, location=request.session.pop('next'))
- else:
- return redirect(request, "index")
- else:
- # No user, need to register
- if not mg_globals.app_config['allow_registration']:
- messages.add_message(
- request,
- messages.WARNING,
- _('Sorry, registration is disabled on this instance.'))
- return redirect(request, 'index')
- # Get email and nickname from response
- email = _response_email(response)
- username = _response_nickname(response)
- register_form = auth_forms.RegistrationForm(request.form,
- openid=response.identity_url,
- email=email,
- username=username)
- return render_to_response(
- request,
- 'mediagoblin/auth/register.html',
- {'register_form': register_form,
- 'post_url': request.urlgen('mediagoblin.plugins.openid.register')})
- @allow_registration
- @auth_enabled
- def register(request):
- """OpenID Registration View"""
- if request.method == 'GET':
- # Need to connect to openid provider before registering a user to
- # get the users openid url. If method is 'GET', then this page was
- # acessed without logging in first.
- return redirect(request, 'mediagoblin.plugins.openid.login')
- register_form = auth_forms.RegistrationForm(request.form)
- if register_form.validate():
- user = register_user(request, register_form)
- if user:
- # redirect the user to their homepage... there will be a
- # message waiting for them to verify their email
- return redirect(
- request, 'mediagoblin.user_pages.user_home',
- user=user.username)
- return render_to_response(
- request,
- 'mediagoblin/auth/register.html',
- {'register_form': register_form,
- 'post_url': request.urlgen('mediagoblin.plugins.openid.register')})
- @require_active_login
- def start_edit(request):
- """Starts the process of adding an openid url to a users account"""
- form = auth_forms.LoginForm(request.form)
- if request.method == 'POST' and form.validate():
- query = OpenIDUserURL.query.filter_by(
- openid_url=form.openid.data
- ).first()
- user = query.user if query else None
- if not user:
- return_to = request.urlgen('mediagoblin.plugins.openid.finish_edit')
- success = _start_verification(request, form, return_to, False)
- if success:
- return success
- else:
- form.openid.errors.append(
- _('Sorry, an account is already registered to that OpenID.'))
- return render_to_response(
- request,
- 'mediagoblin/plugins/openid/add.html',
- {'form': form,
- 'post_url': request.urlgen('mediagoblin.plugins.openid.edit')})
- @require_active_login
- def finish_edit(request):
- """Finishes the process of adding an openid url to a user"""
- response = _finish_verification(request)
- if not response:
- # Verification failed, redirect to add openid page.
- return redirect(request, 'mediagoblin.plugins.openid.edit')
- # Verification was successfull
- query = OpenIDUserURL.query.filter_by(
- openid_url=response.identity_url,
- ).first()
- user_exists = query.user if query else None
- if user_exists:
- # user exists with that openid url, redirect back to edit page
- messages.add_message(
- request,
- messages.WARNING,
- _('Sorry, an account is already registered to that OpenID.'))
- return redirect(request, 'mediagoblin.plugins.openid.edit')
- else:
- # Save openid to user
- user = User.query.filter_by(
- id=request.session['user_id']
- ).first()
- new_entry = OpenIDUserURL()
- new_entry.openid_url = response.identity_url
- new_entry.user_id = user.id
- new_entry.save()
- messages.add_message(
- request,
- messages.SUCCESS,
- _('Your OpenID url was saved successfully.'))
- return redirect(request, 'mediagoblin.edit.account')
- @require_active_login
- def delete_openid(request):
- """View to remove an openid from a users account"""
- form = auth_forms.LoginForm(request.form)
- if request.method == 'POST' and form.validate():
- # Check if a user has this openid
- query = OpenIDUserURL.query.filter_by(
- openid_url=form.openid.data
- )
- user = query.first().user if query.first() else None
- if user and user.id == int(request.session['user_id']):
- count = len(user.openid_urls)
- if not count > 1 and not user.pw_hash:
- # Make sure the user has a pw or another OpenID
- messages.add_message(
- request,
- messages.WARNING,
- _("You can't delete your only OpenID URL unless you"
- " have a password set"))
- elif user:
- # There is a user, but not the same user who is logged in
- form.openid.errors.append(
- _('That OpenID is not registered to this account.'))
- if not form.errors and not request.session.get('messages'):
- # Okay to continue with deleting openid
- return_to = request.urlgen(
- 'mediagoblin.plugins.openid.finish_delete')
- success = _start_verification(request, form, return_to, False)
- if success:
- return success
- return render_to_response(
- request,
- 'mediagoblin/plugins/openid/delete.html',
- {'form': form,
- 'post_url': request.urlgen('mediagoblin.plugins.openid.delete')})
- @require_active_login
- def finish_delete(request):
- """Finishes the deletion of an OpenID from an user's account"""
- response = _finish_verification(request)
- if not response:
- # Verification failed, redirect to delete openid page.
- return redirect(request, 'mediagoblin.plugins.openid.delete')
- query = OpenIDUserURL.query.filter_by(
- openid_url=response.identity_url
- )
- user = query.first().user if query.first() else None
- # Need to check this again because of generic openid urls such as google's
- if user and user.id == int(request.session['user_id']):
- count = len(user.openid_urls)
- if count > 1 or user.pw_hash:
- # User has more then one openid or also has a password.
- query.first().delete()
- messages.add_message(
- request,
- messages.SUCCESS,
- _('OpenID was successfully removed.'))
- return redirect(request, 'mediagoblin.edit.account')
- elif not count > 1:
- messages.add_message(
- request,
- messages.WARNING,
- _("You can't delete your only OpenID URL unless you have a "
- "password set"))
- return redirect(request, 'mediagoblin.plugins.openid.delete')
- else:
- messages.add_message(
- request,
- messages.WARNING,
- _('That OpenID is not registered to this account.'))
- return redirect(request, 'mediagoblin.plugins.openid.delete')
|