# -*- coding: utf-8 -*-
"""
    Flask-Security
    ~~~~~~~~~~~~~~

    This module was generated by Flask-Security to give developers greater
    control over the various security mechanisms. For more information about
    using this feature see:

    TODO: Documentation URL
"""

from datetime import datetime

from flask import current_app as app, redirect, request, session, \
     render_template, jsonify, Blueprint
from flask.ext.login import login_user, logout_user
from flask.ext.principal import Identity, AnonymousIdentity, identity_changed
from werkzeug.datastructures import MultiDict
from werkzeug.local import LocalProxy

from flask_security.confirmable import confirm_by_token, reset_confirmation_token
from flask_security.decorators import login_required
from flask_security.exceptions import ConfirmationError, BadCredentialsError, \
     ResetPasswordError
from flask_security.forms import LoginForm, RegisterForm, ForgotPasswordForm, \
     ResetPasswordForm, ResendConfirmationForm
from flask_security.recoverable import reset_by_token, \
     reset_password_reset_token
from flask_security.signals import user_registered
from flask_security.tokens import generate_authentication_token
from flask_security.utils import get_url, get_post_login_redirect, do_flash, \
     get_remember_token, get_message, config_value


# Convenient references
_security = LocalProxy(lambda: app.security)

_datastore = LocalProxy(lambda: app.security.datastore)

_logger = LocalProxy(lambda: app.logger)


def _do_login(user, remember=True):
    """Log ``user`` in, update bookkeeping fields and signal the new identity.

    :param user: the user object to log in
    :param remember: when true, a remember token is stored on the user
    :return: ``False`` if ``login_user`` refused the login, ``True`` otherwise
    """
    if not login_user(user, remember):
        return False

    if user.authentication_token is None:
        user.authentication_token = generate_authentication_token(user)

    if remember:
        user.remember_token = get_remember_token(user.email, user.password)

    if _security.trackable:
        # Shift the "current" login values into the "last" slots; on the very
        # first login there is no previous value, so both get the new one.
        old_current, new_current = user.current_login_at, datetime.utcnow()
        user.last_login_at = old_current or new_current
        user.current_login_at = new_current

        old_current, new_current = user.current_login_ip, request.remote_addr
        user.last_login_ip = old_current or new_current
        user.current_login_ip = new_current

        # A missing/zero count means this is the first login, which must be
        # counted as 1 (starting at 0 would pin the counter at 0 forever).
        user.login_count = (user.login_count or 0) + 1

    _datastore._save_model(user)
    identity_changed.send(app._get_current_object(), identity=Identity(user.id))

    _logger.debug('User %s logged in' % user)
    return True


def _json_auth_ok(user):
    """Build the JSON response returned after a successful JSON login."""
    return jsonify({
        "meta": {
            "code": 200
        },
        "response": {
            "user": {
                "id": str(user.id),
                "authentication_token": user.authentication_token
            }
        }
    })


def _json_auth_error(msg):
    """Build a JSON error response with HTTP status 400 carrying ``msg``."""
    resp = jsonify({
        "meta": {
            "code": 400
        },
        "response": {
            "error": msg
        }
    })
    resp.status_code = 400
    return resp


def authenticate():
    """View function which handles an authentication request.

    Credentials are read from the JSON body when one is present, otherwise
    from the submitted form. JSON requests get a JSON response; form requests
    are redirected (with a flashed error message on failure).
    """
    form = LoginForm(MultiDict(request.json) if request.json else request.form)

    try:
        user = _security.auth_provider.authenticate(form)

        if _do_login(user, remember=form.remember.data):
            if request.json:
                return _json_auth_ok(user)

            return redirect(get_post_login_redirect())

        # _do_login returned False: report it through the same failure path
        # as bad credentials.
        raise BadCredentialsError('Account is disabled')

    except BadCredentialsError as e:
        msg = str(e)

    _logger.debug('Unsuccessful authentication attempt: %s' % msg)

    if request.json:
        return _json_auth_error(msg)

    do_flash(msg, 'error')
    return redirect(request.referrer or _security.login_manager.login_view)


def logout():
    """View function which handles a logout request."""
    # Drop the identity data kept in the session before announcing the
    # anonymous identity.
    for key in ('identity.name', 'identity.auth_type'):
        session.pop(key, None)

    identity_changed.send(app._get_current_object(),
                          identity=AnonymousIdentity())

    logout_user()
    _logger.debug('User logged out')

    # NOTE(review): 'next' comes straight from the query string and is not
    # validated here -- confirm whether an open redirect is acceptable.
    return redirect(request.args.get('next', None) or
                    _security.post_logout_view)


def register_user():
    """View function which handles a registration request."""
    form = RegisterForm(csrf_enabled=not app.testing)

    if form.validate_on_submit():
        # Create user
        u = _datastore.create_user(**form.to_dict())

        # Send confirmation instructions if necessary
        t = reset_confirmation_token(u) if _security.confirmable else None

        data = dict(user=u, confirm_token=t)
        user_registered.send(data, app=app._get_current_object())

        _logger.debug('User %s registered' % u)

        # Login the user if allowed
        if not _security.confirmable or _security.login_without_confirmation:
            _do_login(u)

        return redirect(_security.post_register_view or
                        _security.post_login_view)

    return render_template('security/registrations/new.html',
                           register_user_form=form)


def send_confirmation():
    """View function which (re)sends account confirmation instructions."""
    form = ResendConfirmationForm()

    if form.validate_on_submit():
        user = _datastore.find_user(email=form.email.data)

        reset_confirmation_token(user)

        _logger.debug('%s request confirmation instructions' % user)

        msg, cat = get_message('CONFIRMATION_REQUEST', email=user.email)
        do_flash(msg, cat)

    else:
        # Flash the first error of every invalid field.
        for errors in form.errors.values():
            do_flash(errors[0], 'error')

    return render_template('security/confirmations/new.html',
                           reset_confirmation_form=form)


def confirm_account(token):
    """View function which handles an account confirmation request.

    :param token: the confirmation token taken from the URL
    """
    try:
        user = confirm_by_token(token)
        _logger.debug('%s confirmed their account' % user)

    except ConfirmationError as e:
        msg, cat = str(e), 'error'

        _logger.debug('Confirmation error: ' + msg)

        if e.user:
            # The error identifies a user: issue a fresh token and tell them
            # the previous one expired.
            reset_confirmation_token(e.user)

            msg, cat = get_message('CONFIRMATION_EXPIRED',
                                   within=_security.confirm_email_within,
                                   email=e.user.email)

        do_flash(msg, cat)

        return redirect(get_url(_security.confirm_error_view))

    # get_message returns a (message, category) pair, as every other call
    # site in this module relies on; unpack it for do_flash.
    do_flash(*get_message('ACCOUNT_CONFIRMED'))

    return redirect(_security.post_confirm_view or _security.post_login_view)


def forgot_password():
    """View function that handles a forgotten password request."""
    form = ForgotPasswordForm(csrf_enabled=not app.testing)

    if form.validate_on_submit():
        user = _datastore.find_user(**form.to_dict())

        reset_password_reset_token(user)

        _logger.debug('%s requested to reset their password' % user)

        msg, cat = get_message('PASSWORD_RESET_REQUEST', email=user.email)
        do_flash(msg, cat)

        return redirect(_security.post_forgot_view)

    else:
        _logger.debug('A reset password request was made for %s but '
                      'that email does not exist.' % form.email.data)

        # Flash the first error of every invalid field.
        for errors in form.errors.values():
            do_flash(errors[0], 'error')

    return render_template('security/passwords/new.html',
                           forgot_password_form=form)


def reset_password(token):
    """View function that handles a reset password request.

    :param token: the password reset token taken from the URL
    """
    form = ResetPasswordForm(csrf_enabled=not app.testing)

    if form.validate_on_submit():
        try:
            user = reset_by_token(token=token, **form.to_dict())
            _logger.debug('%s reset their password' % user)

        except ResetPasswordError as e:
            msg, cat = str(e), 'error'

            _logger.debug('Password reset error: ' + msg)

            if e.user:
                # The error identifies a user: issue a fresh token and tell
                # them the previous one expired.
                reset_password_reset_token(e.user)

                msg, cat = get_message('PASSWORD_RESET_EXPIRED',
                                       within=_security.reset_password_within,
                                       email=e.user.email)

            do_flash(msg, cat)

    return render_template('security/passwords/edit.html',
                           reset_password_form=form,
                           password_reset_token=token)


def create_blueprint(app, name, import_name, **kwargs):
    """Create the blueprint holding the security views.

    The authenticate and logout routes are always registered; the register,
    password recovery and confirmation routes are added only when the
    matching ``REGISTERABLE`` / ``RECOVERABLE`` / ``CONFIRMABLE`` config
    value is truthy.

    :param app: the application whose configuration supplies the URLs
    :param name: the blueprint name
    :param import_name: the blueprint import name
    :param kwargs: extra keyword arguments passed through to ``Blueprint``
    :return: the configured ``Blueprint``
    """
    bp = Blueprint(name, import_name, **kwargs)

    bp.route(config_value('AUTH_URL', app=app),
             methods=['POST'],
             endpoint='authenticate')(authenticate)

    bp.route(config_value('LOGOUT_URL', app=app),
             endpoint='logout')(login_required(logout))

    if config_value('REGISTERABLE', app=app):
        bp.route(config_value('REGISTER_URL', app=app),
                 methods=['GET', 'POST'],
                 endpoint='register')(register_user)

    if config_value('RECOVERABLE', app=app):
        bp.route(config_value('RESET_URL', app=app),
                 methods=['GET', 'POST'],
                 endpoint='forgot_password')(forgot_password)
        # reset_password(token) needs the token as a URL variable.
        bp.route(config_value('RESET_URL', app=app) + '/<token>',
                 methods=['GET', 'POST'],
                 endpoint='reset_password')(reset_password)

    if config_value('CONFIRMABLE', app=app):
        bp.route(config_value('CONFIRM_URL', app=app),
                 methods=['GET', 'POST'],
                 endpoint='send_confirmation')(send_confirmation)
        # confirm_account(token) needs the token as a URL variable.
        bp.route(config_value('CONFIRM_URL', app=app) + '/<token>',
                 methods=['GET', 'POST'],
                 endpoint='confirm_account')(confirm_account)

    return bp