Starting a large refactor and adding confirmation abilities

This commit is contained in:
Matt Wright
2012-05-11 13:23:42 -04:00
parent 1ae711279e
commit 2b587f7047
17 changed files with 633 additions and 528 deletions
+19 -12
View File
@@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-
# a little trick so you can run:
# $ python example/app.py
# from the root of the security project
@@ -7,6 +9,7 @@ sys.path.pop(0)
sys.path.insert(0, os.getcwd())
from flask import Flask, render_template, current_app
from flask.ext.mail import Mail
from flask.ext.mongoengine import MongoEngine
from flask.ext.sqlalchemy import SQLAlchemy
from flask.ext.security import Security, LoginForm, login_required, \
@@ -21,12 +24,12 @@ def create_roles():
def create_users():
for u in (('matt', 'matt@lp.com', 'password', ['admin'], True),
('joe', 'joe@lp.com', 'password', ['editor'], True),
('jill', 'jill@lp.com', 'password', ['author'], True),
('tiya', 'tiya@lp.com', 'password', [], False)):
current_app.security.datastore.create_user(username=u[0], email=u[1],
password=u[2], roles=u[3], active=u[4])
for u in (('matt@lp.com', 'password', ['admin'], True),
('joe@lp.com', 'password', ['editor'], True),
('jill@lp.com', 'password', ['author'], True),
('tiya@lp.com', 'password', [], False)):
current_app.security.datastore.create_user(
email=u[0], password=u[1], roles=u[2], active=u[3])
def populate_data():
@@ -43,6 +46,8 @@ def create_app(auth_config):
for key, value in auth_config.items():
app.config[key] = value
app.mail = Mail(app)
@app.route('/')
def index():
return render_template('index.html', content='Home Page')
@@ -69,6 +74,10 @@ def create_app(auth_config):
def post_logout():
return render_template('index.html', content='Post Logout')
@app.route('/post_register')
def post_register():
return render_template('index.html', content='Post Register')
@app.route('/admin')
@roles_required('admin')
def admin():
@@ -99,14 +108,11 @@ def create_sqlalchemy_app(auth_config=None):
class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(255), unique=True)
email = db.Column(db.String(255), unique=True)
password = db.Column(db.String(120))
first_name = db.Column(db.String(120))
last_name = db.Column(db.String(120))
active = db.Column(db.Boolean())
created_at = db.Column(db.DateTime())
modified_at = db.Column(db.DateTime())
confirmation_token = db.Column(db.String(255))
confirmation_sent_at = db.Column(db.DateTime())
roles = db.relationship('Role', secondary=roles_users,
backref=db.backref('users', lazy='dynamic'))
@@ -134,10 +140,11 @@ def create_mongoengine_app(auth_config=None):
description = db.StringField(max_length=255)
class User(db.Document, UserMixin):
username = db.StringField(unique=True, max_length=255)
email = db.StringField(unique=True, max_length=255)
password = db.StringField(required=True, max_length=120)
active = db.BooleanField(default=True)
confirmation_token = db.StringField(max_length=255)
confirmation_sent_at = db.DateTimeField()
roles = db.ListField(db.ReferenceField(Role), default=[])
Security(app, MongoEngineUserDatastore(db, User, Role))
+1 -1
View File
@@ -12,7 +12,7 @@
{% endif -%}
<li>
{%- if current_user.is_authenticated() -%}
<a href="{{ url_for('auth.logout') }}">Log out</a>
<a href="{{ url_for('flask_security.logout') }}">Log out</a>
{%- else -%}
<a href="{{ url_for('login') }}">Log in</a>
{%- endif -%}
+2 -2
View File
@@ -1,8 +1,8 @@
{% include "_messages.html" %}
{% include "_nav.html" %}
<form action="{{ url_for('auth.authenticate') }}" method="POST" name="login_form">
<form action="{{ url_for('flask_security.authenticate') }}" method="POST" name="login_form">
{{ form.hidden_tag() }}
{{ form.username.label }} {{ form.username }}<br/>
{{ form.email.label }} {{ form.email }}<br/>
{{ form.password.label }} {{ form.password }}<br/>
{{ form.remember.label }} {{ form.remember }}<br/>
{{ form.next }}
+10
View File
@@ -0,0 +1,10 @@
{% include "_messages.html" %}
{% include "_nav.html" %}
<form action="{{ url_for('flask_security.register') }}" method="POST" name="register_form">
{{ form.hidden_tag() }}
{{ form.email.label }} {{ form.email }}<br/>
{{ form.password.label }} {{ form.password }}<br/>
{{ form.password_confirm.label }} {{ form.password_confirm }}<br/>
{{ form.submit }}
</form>
<p>{{ content }}</p>
+1 -309
View File
@@ -10,312 +10,4 @@
:license: MIT, see LICENSE for more details.
"""
from functools import wraps
from flask import current_app, Blueprint, redirect, request
from flask.ext.login import AnonymousUser as AnonymousUserBase, \
UserMixin as BaseUserMixin, LoginManager, login_required, \
current_user, login_url
from flask.ext.principal import Principal, RoleNeed, UserNeed, \
Permission, identity_loaded
from flask.ext.wtf import Form, TextField, PasswordField, SubmitField, \
HiddenField, Required, BooleanField
from flask.ext.security import views, exceptions, utils
from passlib.context import CryptContext
from werkzeug.datastructures import ImmutableList
#: Default Flask-Security configuration
_default_config = {
'SECURITY_URL_PREFIX': None,
'SECURITY_FLASH_MESSAGES': True,
'SECURITY_PASSWORD_HASH': 'plaintext',
'SECURITY_USER_DATASTORE': 'user_datastore',
'SECURITY_AUTH_PROVIDER': 'flask.ext.security::AuthenticationProvider',
'SECURITY_LOGIN_FORM': 'flask.ext.security::LoginForm',
'SECURITY_AUTH_URL': '/auth',
'SECURITY_LOGOUT_URL': '/logout',
'SECURITY_RESET_URL': '/reset',
'SECURITY_LOGIN_VIEW': '/login',
'SECURITY_POST_LOGIN_VIEW': '/',
'SECURITY_POST_LOGOUT_VIEW': '/',
'SECURITY_RESET_PASSWORD_WITHIN': 10
}
def roles_required(*roles):
"""View decorator which specifies that a user must have all the specified
roles. Example::
@app.route('/dashboard')
@roles_required('admin', 'editor')
def dashboard():
return 'Dashboard'
The current user must have both the `admin` role and `editor` role in order
to view the page.
:param args: The required roles.
"""
perm = Permission(*[RoleNeed(role) for role in roles])
def wrapper(fn):
@wraps(fn)
def decorated_view(*args, **kwargs):
if not current_user.is_authenticated():
login_view = current_app.security.login_manager.login_view
return redirect(login_url(login_view, request.url))
if perm.can():
return fn(*args, **kwargs)
current_app.logger.debug('Identity does not provide the '
'roles: %s' % [r for r in roles])
return redirect(request.referrer or '/')
return decorated_view
return wrapper
def roles_accepted(*roles):
"""View decorator which specifies that a user must have at least one of the
specified roles. Example::
@app.route('/create_post')
@roles_accepted('editor', 'author')
def create_post():
return 'Create Post'
The current user must have either the `editor` role or `author` role in
order to view the page.
:param args: The possible roles.
"""
perms = [Permission(RoleNeed(role)) for role in roles]
def wrapper(fn):
@wraps(fn)
def decorated_view(*args, **kwargs):
if not current_user.is_authenticated():
login_view = current_app.security.login_manager.login_view
return redirect(login_url(login_view, request.url))
for perm in perms:
if perm.can():
return fn(*args, **kwargs)
current_app.logger.debug('Current user does not provide a required '
'role. Accepted: %s Provided: %s' % ([r for r in roles],
[r.name for r in current_user.roles]))
utils.do_flash('You do not have permission to view this resource',
'error')
return redirect(request.referrer or '/')
return decorated_view
return wrapper
class RoleMixin(object):
"""Mixin for `Role` model definitions"""
def __eq__(self, other):
if isinstance(other, basestring):
return self.name == other
return self.name == other.name
def __ne__(self, other):
if isinstance(other, basestring):
return self.name != other
return self.name != other.name
def __str__(self):
return '<Role name=%s, description=%s>' % (self.name, self.description)
class UserMixin(BaseUserMixin):
"""Mixin for `User` model definitions"""
def is_active(self):
"""Returns `True` if the user is active."""
return self.active
def has_role(self, role):
"""Returns `True` if the user identifies with the specified role.
:param role: A role name or `Role` instance"""
return role in self.roles
def __str__(self):
ctx = (str(self.id), self.username, self.email)
return '<User id=%s, username=%s, email=%s>' % ctx
class AnonymousUser(AnonymousUserBase):
def __init__(self):
super(AnonymousUser, self).__init__()
self.roles = ImmutableList()
def has_role(self, *args):
"""Returns `False`"""
return False
def load_user(user_id):
try:
return current_app.security.datastore.with_id(user_id)
except Exception, e:
current_app.logger.error('Error getting user: %s' % e)
return None
def on_identity_loaded(sender, identity):
if hasattr(current_user, 'id'):
identity.provides.add(UserNeed(current_user.id))
for role in current_user.roles:
identity.provides.add(RoleNeed(role.name))
identity.user = current_user
class Security(object):
"""The :class:`Security` class initializes the Flask-Security extension.
:param app: The application.
:param datastore: An instance of a user datastore.
"""
def __init__(self, app=None, datastore=None):
self.init_app(app, datastore)
def init_app(self, app, datastore, recoverable=False):
"""Initializes the Flask-Security extension for the specified
application and datastore implentation.
:param app: The application.
:param datastore: An instance of a user datastore.
"""
if app is None or datastore is None:
return
for key, value in _default_config.items():
app.config.setdefault(key, value)
login_manager = LoginManager()
login_manager.anonymous_user = AnonymousUser
login_manager.login_view = utils.config_value(app, 'LOGIN_VIEW')
login_manager.setup_app(app)
Provider = utils.get_class_from_string(app, 'AUTH_PROVIDER')
Form = utils.get_class_from_string(app, 'LOGIN_FORM')
pw_hash = utils.config_value(app, 'PASSWORD_HASH')
self.login_manager = login_manager
self.pwd_context = CryptContext(schemes=[pw_hash], default=pw_hash)
self.auth_provider = Provider(Form)
self.principal = Principal(app)
self.datastore = datastore
self.form_class = Form
self.auth_url = utils.config_value(app, 'AUTH_URL')
self.logout_url = utils.config_value(app, 'LOGOUT_URL')
self.reset_url = utils.config_value(app, 'RESET_URL')
self.post_login_view = utils.config_value(app, 'POST_LOGIN_VIEW')
self.post_logout_view = utils.config_value(app, 'POST_LOGOUT_VIEW')
self.reset_password_within = utils.config_value(app, 'RESET_PASSWORD_WITHIN')
identity_loaded.connect_via(app)(on_identity_loaded)
login_manager.user_loader(load_user)
bp = Blueprint('auth', __name__)
bp.route(self.auth_url,
methods=['POST'],
endpoint='authenticate')(views.authenticate)
bp.route(self.logout_url,
endpoint='logout')(login_required(views.logout))
if recoverable:
bp.route(self.reset_url,
methods=['POST'],
endpoint='reset')(views.reset)
app.register_blueprint(bp,
url_prefix=utils.config_value(app, 'URL_PREFIX'))
app.security = self
class LoginForm(Form):
"""The default login form"""
username = TextField("Username or Email",
validators=[Required(message="Username not provided")])
password = PasswordField("Password",
validators=[Required(message="Password not provided")])
remember = BooleanField("Remember Me")
next = HiddenField()
submit = SubmitField("Login")
def __init__(self, *args, **kwargs):
super(LoginForm, self).__init__(*args, **kwargs)
self.next.data = request.args.get('next', None)
class AuthenticationProvider(object):
"""The default authentication provider implementation.
:param login_form_class: The login form class to use when authenticating a
user
"""
def __init__(self, login_form_class=None):
self.login_form_class = login_form_class or LoginForm
def login_form(self, formdata=None):
"""Returns an instance of the login form with the provided form.
:param formdata: The incoming form data"""
return self.login_form_class(formdata)
def authenticate(self, form):
"""Processes an authentication request and returns a user instance if
authentication is successful.
:param form: An instance of a populated login form
"""
if not form.validate():
if form.username.errors:
raise exceptions.BadCredentialsError(form.username.errors[0])
if form.password.errors:
raise exceptions.BadCredentialsError(form.password.errors[0])
return self.do_authenticate(form.username.data, form.password.data)
def do_authenticate(self, user_identifier, password):
"""Returns the authenticated user if authentication is successfull. If
authentication fails an appropriate error is raised
:param user_identifier: The user's identifier, either an email address
or username
:param password: The user's unencrypted password
"""
try:
user = current_app.security.datastore.find_user(user_identifier)
except AttributeError, e:
self.auth_error("Could not find user datastore: %s" % e)
except exceptions.UserNotFoundError, e:
raise exceptions.BadCredentialsError("Specified user does not exist")
except Exception, e:
self.auth_error('Unexpected authentication error: %s' % e)
# compare passwords
if current_app.security.pwd_context.verify(password, user.password):
return user
# bad match
raise exceptions.BadCredentialsError("Password does not match")
def auth_error(self, msg):
"""Sends an error log message and raises an authentication error.
:param msg: An authentication error message"""
current_app.logger.error(msg)
raise exceptions.AuthenticationError(msg)
from .core import *
+44
View File
@@ -0,0 +1,44 @@
from datetime import datetime
from flask import render_template, current_app, request, url_for
from flask.ext.security.utils import generate_token
from werkzeug.local import LocalProxy
logger = LocalProxy(lambda: current_app.logger)
def send_confirmation_instructions(user):
from flask.ext.mail import Message
msg = Message("Please confirm your email",
sender=current_app.security.email_sender,
recipients=[user.email])
confirmation_link = request.url_root[:-1] + \
url_for('flask_security.confirm',
confirmation_token=user.confirmation_token)
ctx = dict(user=user, confirmation_link=confirmation_link)
msg.body = render_template('email/confirmation_instructions.txt', **ctx)
msg.html = render_template('email/confirmation_instructions.html', **ctx)
logger.debug("Sending confirmation instructions")
logger.debug(msg.html)
current_app.mail.send(msg)
def generate_confirmation_token(user):
token = generate_token()
now = datetime.utcnow()
if isinstance(user, dict):
user['confirmation_token'] = token
user['confirmation_sent_at'] = now
else:
user.confirmation_token = token
user.confirmation_sent_at = now
return user
+368
View File
@@ -0,0 +1,368 @@
# -*- coding: utf-8 -*-
"""
flask.ext.security.core
~~~~~~~~~~~~~~~~~~~~~~~
Flask-Security core module
:copyright: (c) 2012 by Matt Wright.
:license: MIT, see LICENSE for more details.
"""
from datetime import timedelta
from functools import wraps
from flask import current_app, Blueprint, redirect, request
from flask.ext.login import AnonymousUser as AnonymousUserBase, \
UserMixin as BaseUserMixin, LoginManager, login_required, \
current_user, login_url
from flask.ext.principal import Principal, RoleNeed, UserNeed, \
Permission, identity_loaded
from flask.ext.wtf import Form, TextField, PasswordField, SubmitField, \
HiddenField, Required, BooleanField, EqualTo, Email
from flask.ext.security import views, exceptions, utils
from passlib.context import CryptContext
from werkzeug.datastructures import ImmutableList
#: Default Flask-Security configuration
_default_config = {
'SECURITY_URL_PREFIX': None,
'SECURITY_FLASH_MESSAGES': True,
'SECURITY_PASSWORD_HASH': 'plaintext',
'SECURITY_AUTH_PROVIDER': 'flask.ext.security::AuthenticationProvider',
'SECURITY_LOGIN_FORM': 'flask.ext.security::LoginForm',
'SECURITY_REGISTER_FORM': 'flask.ext.security::RegisterForm',
'SECURITY_AUTH_URL': '/auth',
'SECURITY_LOGOUT_URL': '/logout',
'SECURITY_REGISTER_URL': '/register',
'SECURITY_RESET_URL': '/reset',
'SECURITY_CONFIRM_URL': '/confirm',
'SECURITY_LOGIN_VIEW': '/login',
'SECURITY_POST_LOGIN_VIEW': '/',
'SECURITY_POST_LOGOUT_VIEW': '/',
'SECURITY_POST_REGISTER_VIEW': '/',
'SECURITY_RESET_PASSWORD_WITHIN': 10,
'SECURITY_DEFAULT_ROLES': [],
'SECURITY_LOGIN_WITHOUT_CONFIRMATION': True,
'SECURITY_CONFIRM_EMAIL': False,
'SECURITY_CONFIRM_EMAIL_WITHIN': '5 days',
'SECURITY_EMAIL_SENDER': 'no-reply@localhost'
}
def roles_required(*roles):
"""View decorator which specifies that a user must have all the specified
roles. Example::
@app.route('/dashboard')
@roles_required('admin', 'editor')
def dashboard():
return 'Dashboard'
The current user must have both the `admin` role and `editor` role in order
to view the page.
:param args: The required roles.
"""
perm = Permission(*[RoleNeed(role) for role in roles])
def wrapper(fn):
@wraps(fn)
def decorated_view(*args, **kwargs):
if not current_user.is_authenticated():
login_view = current_app.security.login_manager.login_view
return redirect(login_url(login_view, request.url))
if perm.can():
return fn(*args, **kwargs)
current_app.logger.debug('Identity does not provide the '
'roles: %s' % [r for r in roles])
return redirect(request.referrer or '/')
return decorated_view
return wrapper
def roles_accepted(*roles):
"""View decorator which specifies that a user must have at least one of the
specified roles. Example::
@app.route('/create_post')
@roles_accepted('editor', 'author')
def create_post():
return 'Create Post'
The current user must have either the `editor` role or `author` role in
order to view the page.
:param args: The possible roles.
"""
perms = [Permission(RoleNeed(role)) for role in roles]
def wrapper(fn):
@wraps(fn)
def decorated_view(*args, **kwargs):
if not current_user.is_authenticated():
login_view = current_app.security.login_manager.login_view
return redirect(login_url(login_view, request.url))
for perm in perms:
if perm.can():
return fn(*args, **kwargs)
r1 = [r for r in roles]
r2 = [r.name for r in current_user.roles]
current_app.logger.debug('Current user does not provide a '
'required role. Accepted: %s Provided: %s' % (r1, r2))
utils.do_flash('You do not have permission to '
'view this resource', 'error')
return redirect(request.referrer or '/')
return decorated_view
return wrapper
class RoleMixin(object):
"""Mixin for `Role` model definitions"""
def __eq__(self, other):
if isinstance(other, basestring):
return self.name == other
return self.name == other.name
def __ne__(self, other):
if isinstance(other, basestring):
return self.name != other
return self.name != other.name
def __str__(self):
return '<Role name=%s>' % self.name
class UserMixin(BaseUserMixin):
"""Mixin for `User` model definitions"""
def is_active(self):
"""Returns `True` if the user is active."""
return self.active
def has_role(self, role):
"""Returns `True` if the user identifies with the specified role.
:param role: A role name or `Role` instance"""
return role in self.roles
def __str__(self):
ctx = (str(self.id), self.email)
return '<User id=%s, email=%s>' % ctx
class AnonymousUser(AnonymousUserBase):
def __init__(self):
super(AnonymousUser, self).__init__()
self.roles = ImmutableList()
def has_role(self, *args):
"""Returns `False`"""
return False
def load_user(user_id):
try:
return current_app.security.datastore.with_id(user_id)
except Exception, e:
current_app.logger.error('Error getting user: %s' % e)
return None
def on_identity_loaded(sender, identity):
if hasattr(current_user, 'id'):
identity.provides.add(UserNeed(current_user.id))
for role in current_user.roles:
identity.provides.add(RoleNeed(role.name))
identity.user = current_user
class Security(object):
"""The :class:`Security` class initializes the Flask-Security extension.
:param app: The application.
:param datastore: An instance of a user datastore.
"""
def __init__(self, app=None, datastore=None, **kwargs):
self.init_app(app, datastore, **kwargs)
def init_app(self, app, datastore,
registerable=True, recoverable=False, template_folder=None):
"""Initializes the Flask-Security extension for the specified
application and datastore implentation.
:param app: The application.
:param datastore: An instance of a user datastore.
"""
if app is None or datastore is None:
return
for key, value in _default_config.items():
app.config.setdefault(key, value)
login_manager = LoginManager()
login_manager.anonymous_user = AnonymousUser
login_manager.login_view = utils.config_value(app, 'LOGIN_VIEW')
login_manager.user_loader(load_user)
login_manager.setup_app(app)
Provider = utils.get_class_from_string(app, 'AUTH_PROVIDER')
pw_hash = utils.config_value(app, 'PASSWORD_HASH')
self.login_manager = login_manager
self.pwd_context = CryptContext(schemes=[pw_hash], default=pw_hash)
self.auth_provider = Provider(Form)
self.principal = Principal(app)
self.datastore = datastore
self.LoginForm = utils.get_class_from_string(app, 'LOGIN_FORM')
self.RegisterForm = utils.get_class_from_string(app, 'REGISTER_FORM')
self.auth_url = utils.config_value(app, 'AUTH_URL')
self.logout_url = utils.config_value(app, 'LOGOUT_URL')
self.reset_url = utils.config_value(app, 'RESET_URL')
self.register_url = utils.config_value(app, 'REGISTER_URL')
self.confirm_url = utils.config_value(app, 'CONFIRM_URL')
self.post_login_view = utils.config_value(app, 'POST_LOGIN_VIEW')
self.post_logout_view = utils.config_value(app, 'POST_LOGOUT_VIEW')
self.post_register_view = utils.config_value(app, 'POST_REGISTER_VIEW')
self.reset_password_within = utils.config_value(app, 'RESET_PASSWORD_WITHIN')
self.default_roles = utils.config_value(app, "DEFAULT_ROLES")
self.login_without_confirmation = utils.config_value(app, 'LOGIN_WITHOUT_CONFIRMATION')
self.confirm_email = utils.config_value(app, 'CONFIRM_EMAIL')
self.email_sender = utils.config_value(app, 'EMAIL_SENDER')
values = utils.config_value(app, 'CONFIRM_EMAIL_WITHIN').split()
self.confirm_email_within = timedelta(**{values[1]: int(values[0])})
identity_loaded.connect_via(app)(on_identity_loaded)
bp = Blueprint('flask_security', __name__, template_folder='templates')
bp.route(self.auth_url,
methods=['POST'],
endpoint='authenticate')(views.authenticate)
bp.route(self.logout_url,
endpoint='logout')(login_required(views.logout))
self.setup_register(bp) if registerable else None
self.setup_reset(bp) if recoverable else None
self.setup_confirm(bp) if self.confirm_email else None
app.register_blueprint(bp,
url_prefix=utils.config_value(app, 'URL_PREFIX'))
app.security = self
def setup_register(self, bp):
bp.route(self.register_url,
methods=['POST'],
endpoint='register')(views.register)
def setup_reset(self, bp):
bp.route(self.reset_url,
methods=['POST'],
endpoint='reset')(views.reset)
def setup_confirm(self, bp):
bp.route(self.confirm_url, endpoint='confirm')(views.confirm)
class LoginForm(Form):
"""The default login form"""
email = TextField("Email Address",
validators=[Required(message="Email not provided")])
password = PasswordField("Password",
validators=[Required(message="Password not provided")])
remember = BooleanField("Remember Me")
next = HiddenField()
submit = SubmitField("Login")
def __init__(self, *args, **kwargs):
super(LoginForm, self).__init__(*args, **kwargs)
self.next.data = request.args.get('next', None)
class RegisterForm(Form):
"""The default register form"""
email = TextField("Email Address",
validators=[Required(message='Email not provided'), Email()])
password = PasswordField("Password",
validators=[Required(message="Password not provided")])
password_confirm = PasswordField("Password",
validators=[EqualTo('password', message="Password not provided")])
def to_dict(self):
return dict(email=self.email.data, password=self.password.data)
class AuthenticationProvider(object):
"""The default authentication provider implementation.
:param login_form_class: The login form class to use when authenticating a
user
"""
def __init__(self, login_form_class=None):
self.login_form_class = login_form_class or LoginForm
def login_form(self, formdata=None):
"""Returns an instance of the login form with the provided form.
:param formdata: The incoming form data"""
return self.login_form_class(formdata)
def authenticate(self, form):
"""Processes an authentication request and returns a user instance if
authentication is successful.
:param form: An instance of a populated login form
"""
if not form.validate():
if form.email.errors:
raise exceptions.BadCredentialsError(form.email.errors[0])
if form.password.errors:
raise exceptions.BadCredentialsError(form.password.errors[0])
return self.do_authenticate(form.email.data, form.password.data)
def do_authenticate(self, user_identifier, password):
"""Returns the authenticated user if authentication is successfull. If
authentication fails an appropriate error is raised
:param user_identifier: The user's identifier, usuall an email address
:param password: The user's unencrypted password
"""
try:
user = current_app.security.datastore.find_user(user_identifier)
except AttributeError, e:
self.auth_error("Could not find user datastore: %s" % e)
except exceptions.UserNotFoundError, e:
raise exceptions.BadCredentialsError("Specified user does not exist")
except Exception, e:
self.auth_error('Unexpected authentication error: %s' % e)
# compare passwords
if current_app.security.pwd_context.verify(password, user.password):
return user
# bad match
raise exceptions.BadCredentialsError("Password does not match")
def auth_error(self, msg):
"""Sends an error log message and raises an authentication error.
:param msg: An authentication error message"""
current_app.logger.error(msg)
raise exceptions.AuthenticationError(msg)
+9 -55
View File
@@ -15,13 +15,13 @@ from flask.ext.security import exceptions
class UserDatastore(object):
"""Abstracted user datastore. Always extend this class and implement the
:attr:`get_models`, :attr:`_save_model`, :attr:`_do_with_id`,
:attr:`_do_find_user`, and :attr:`_do_find_role` methods.
:attr:`_save_model`, :attr:`_do_with_id`, :attr:`_do_find_user`, and
:attr:`_do_find_role` methods.
:param db: An instance of a configured databse manager from a Flask
extension such as Flask-SQLAlchemy or Flask-MongoEngine
:param user_model: A user model class
:param role_model: A role model class
:param user_model: A user model class definition
:param role_model: A role model class definition
"""
def __init__(self, db, user_model, role_model):
self.db = db
@@ -44,59 +44,19 @@ class UserDatastore(object):
raise NotImplementedError(
"User datastore does not implement _do_find_role method")
def _do_add_role(self, user, role):
user, role = self._prepare_role_modify_args(user, role)
if role not in user.roles:
user.roles.append(role)
return user
def _do_remove_role(self, user, role):
user, role = self._prepare_role_modify_args(user, role)
if role in user.roles:
user.roles.remove(role)
return user
def _do_toggle_active(self, user, active=None):
user = self.find_user(user)
if active is None:
user.active = not user.active
elif active != user.active:
user.active = active
return user
def _do_deactive_user(self, user):
return self._do_toggle_active(user, False)
def _do_active_user(self, user):
return self._do_toggle_active(user, True)
def _prepare_role_modify_args(self, user, role):
if isinstance(user, self.user_model):
user = user.username or user.email
if isinstance(role, self.role_model):
role = role.name
return self.find_user(user), self.find_role(role)
def _prepare_create_role_args(self, kwargs):
for key in ('name', 'description'):
kwargs[key] = kwargs.get(key, None)
if kwargs['name'] is None:
raise exceptions.RoleCreationError("Missing name argument")
return kwargs
def _prepare_create_user_args(self, kwargs):
username = kwargs.get('username', None)
email = kwargs.get('email', None)
password = kwargs.get('password', None)
kwargs.setdefault('active', True)
if username is None and email is None:
raise exceptions.UserCreationError(
'Missing username and/or email arguments')
if email is None:
raise exceptions.UserCreationError('Missing email argument')
if password is None:
raise exceptions.UserCreationError('Missing password argument')
@@ -129,7 +89,7 @@ class UserDatastore(object):
def find_user(self, user):
"""Returns a user based on the specified identifier.
:param user: User identifier, usually a username or email address
:param user: User identifier, usually email address
"""
user = self._do_find_user(user)
if user:
@@ -150,7 +110,6 @@ class UserDatastore(object):
"""Creates and returns a new role.
:param name: Role name
:param description: Role description
"""
role = self.role_model(**self._prepare_create_role_args(kwargs))
return self._save_model(role)
@@ -158,7 +117,6 @@ class UserDatastore(object):
def create_user(self, **kwargs):
"""Creates and returns a new user.
:param username: Username
:param email: Email address
:param password: Unencrypted password
:param active: The optional active state
@@ -224,7 +182,6 @@ class SQLAlchemyUserDatastore(UserDatastore):
class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(255), unique=True)
email = db.Column(db.String(255), unique=True)
password = db.Column(db.String(120))
first_name = db.Column(db.String(120))
@@ -247,8 +204,7 @@ class SQLAlchemyUserDatastore(UserDatastore):
return self.user_model.query.get(id)
def _do_find_user(self, user):
return self.user_model.query.filter_by(username=user).first() or \
self.user_model.query.filter_by(email=user).first()
return self.user_model.query.filter_by(email=user).first()
def _do_find_role(self, role):
return self.role_model.query.filter_by(name=role).first()
@@ -274,7 +230,6 @@ class MongoEngineUserDatastore(UserDatastore):
name = db.StringField(required=True, unique=True, max_length=80)
class User(db.Document, UserMixin):
username = db.StringField(unique=True, max_length=255)
email = db.StringField(unique=True, max_length=255)
password = db.StringField(required=True, max_length=120)
active = db.BooleanField(default=True)
@@ -294,8 +249,7 @@ class MongoEngineUserDatastore(UserDatastore):
return None
def _do_find_user(self, user):
return self.user_model.objects(username=user).first() or \
self.user_model.objects(email=user).first()
return self.user_model.objects(email=user).first()
def _do_find_role(self, role):
return self.role_model.objects(name=role).first()
+34
View File
@@ -0,0 +1,34 @@
from datetime import datetime, timedelta
from flask import current_app
from flask.ext.security.utils import generate_token
def reset_password_period_valid(user):
sent_at = user.reset_password_sent_at
reset_within = int(current_app.security.reset_password_within)
days_ago = datetime.utcnow() - timedelta(days=reset_within)
return (sent_at is not None) and \
(sent_at >= days_ago)
def generate_reset_password_token(user):
user.reset_password_token = generate_token()
user.reset_password_sent_at = datetime.utcnow()
current_app.security.datastore._save_model(user)
def clear_reset_password_token(user):
user.reset_password_token = None
user.reset_password_sent_at = None
def send_reset_password_instructions():
pass
def should_generate_reset_token(user):
return (user.reset_password_token is None) or \
(not reset_password_period_valid(user))
-102
View File
@@ -1,102 +0,0 @@
# -*- coding: utf-8 -*-
"""
flask.ext.security.script
~~~~~~~~~~~~~~~~~~~~~~~~~
This module contains commands for use with the Flask-Script extension
:copyright: (c) 2012 by Matt Wright.
:license: MIT, see LICENSE for more details.
"""
import json
import re
from flask.ext.script import Command, Option
from flask.ext.security import (UserCreationError, UserNotFoundError,
RoleNotFoundError, user_datastore)
def pprint(obj):
print json.dumps(obj, sort_keys=True, indent=4)
class CreateUserCommand(Command):
"""Create a user"""
option_list = (
Option('-u', '--username', dest='username', default=None),
Option('-e', '--email', dest='email', default=None),
Option('-p', '--password', dest='password', default=None),
Option('-a', '--active', dest='active', default=''),
Option('-r', '--roles', dest='roles', default=''),
)
def run(self, **kwargs):
# sanitize active input
ai = re.sub(r'\s', '', str(kwargs['active']))
kwargs['active'] = ai.lower() in ['', 'y','yes', '1', 'active']
# sanitize role input a bit
ri = re.sub(r'\s', '', kwargs['roles'])
kwargs['roles'] = [] if ri == '' else ri.split(',')
user_datastore.create_user(**kwargs)
print 'User created successfully.'
kwargs['password'] = '****'
pprint(kwargs)
class CreateRoleCommand(Command):
"""Create a role"""
option_list = (
Option('-n', '--name', dest='name', default=None),
Option('-d', '--desc', dest='description', default=None),
)
def run(self, **kwargs):
role = user_datastore.create_role(**kwargs)
print 'Role "%(name)s" created successfully.' % kwargs
class _RoleCommand(Command):
option_list = (
Option('-u', '--user', dest='user_identifier'),
Option('-r', '--role', dest='role_name'),
)
class AddRoleCommand(_RoleCommand):
"""Add a role to a user"""
def run(self, user_identifier, role_name):
user_datastore.add_role_to_user(user_identifier, role_name)
print "Role '%s' added to user '%s' successfully" % (role_name, user_identifier)
class RemoveRoleCommand(_RoleCommand):
"""Add a role to a user"""
def run(self, user_identifier, role_name):
user_datastore.remove_role_from_user(user_identifier, role_name)
print "Role '%s' removed from user '%s' successfully" % (role_name, user_identifier)
class _ToggleActiveCommand(Command):
option_list = (
Option('-u', '--user', dest='user_identifier'),
)
class DeactivateUserCommand(_ToggleActiveCommand):
"""Deactive a user"""
def run(self, user_identifier):
user_datastore.deactivate_user(user_identifier)
print "User '%s' has been deactivated" % user_identifier
class ActivateUserCommand(_ToggleActiveCommand):
"""Deactive a user"""
def run(self, user_identifier):
user_datastore.activate_user(user_identifier)
print "User '%s' has been activated" % user_identifier
@@ -0,0 +1,5 @@
<p>Welcome {{ user.email }}!</p>
<p>You can confirm your account email through the link below:</p>
<p><a href="{{ confirmation_link }}">Confirm my account</a></p>
@@ -0,0 +1,5 @@
Welcome {{ user.email }}!
You can confirm your account email through the link below:
{{ confirmation_link }}
View File
+7
View File
@@ -9,11 +9,18 @@
:license: MIT, see LICENSE for more details.
"""
import base64
import os
from importlib import import_module
from flask import url_for, flash, current_app, request, session
def generate_token():
return base64.urlsafe_b64encode(os.urandom(30))
def do_flash(message, category):
if config_value(current_app, 'FLASH_MESSAGES'):
flash(message, category)
+65 -24
View File
@@ -12,45 +12,86 @@
from flask import current_app, redirect, request, session
from flask.ext.login import login_user, logout_user
from flask.ext.principal import Identity, AnonymousIdentity, identity_changed
from flask.ext.security import exceptions, utils
from flask.ext.security import exceptions, utils, confirmable
from werkzeug.local import LocalProxy
security = LocalProxy(lambda: current_app.security)
logger = LocalProxy(lambda: current_app.logger)
def do_login(user, remember=True):
if login_user(user, remember):
identity_changed.send(current_app._get_current_object(),
identity=Identity(user.id))
logger.debug('User %s logged in' % user)
return True
return False
def authenticate():
form = current_app.security.LoginForm()
try:
form = current_app.security.form_class()
user = current_app.security.auth_provider.authenticate(form)
user = security.auth_provider.authenticate(form)
if login_user(user, remember=form.remember.data):
redirect_url = utils.get_post_login_redirect()
identity_changed.send(current_app._get_current_object(),
identity=Identity(user.id))
current_app.logger.debug('User %s logged in. Redirecting to: '
'%s' % (user, redirect_url))
return redirect(redirect_url)
if do_login(user, remember=form.remember.data):
url = utils.get_post_login_redirect()
return redirect(url)
raise exceptions.BadCredentialsError('Inactive user')
except exceptions.BadCredentialsError, e:
message = '%s' % e
utils.do_flash(message, 'error')
redirect_url = request.referrer or \
current_app.security.login_manager.login_view
current_app.logger.error('Unsuccessful authentication attempt: %s. '
'Redirect to: %s' % (message, redirect_url))
return redirect(redirect_url)
msg = str(e)
utils.do_flash(msg, 'error')
url = request.referrer or security.login_manager.login_view
logger.debug('Unsuccessful authentication attempt: %s. '
'Redirect to: %s' % (msg, url))
return redirect(url)
def logout():
for value in ('identity.name', 'identity.auth_type'):
session.pop(value, None)
for key in ('identity.name', 'identity.auth_type'):
session.pop(key, None)
identity_changed.send(current_app._get_current_object(),
identity=AnonymousIdentity())
app = current_app._get_current_object()
identity_changed.send(app, identity=AnonymousIdentity())
logout_user()
redirect_url = utils.find_redirect('SECURITY_POST_LOGOUT_VIEW')
current_app.logger.debug('User logged out. Redirect to: %s' % redirect_url)
return redirect(redirect_url)
url = security.post_logout_view
logger.debug('User logged out. Redirect to: %s' % url)
return redirect(url)
def register():
form = security.RegisterForm(csrf_enabled=not current_app.testing)
if form.validate_on_submit():
params = form.to_dict()
params['roles'] = security.default_roles
params['active'] = True
if security.confirm_email:
confirmable.generate_confirmation_token(params)
user = security.datastore.create_user(**params)
if security.confirm_email:
confirmable.send_confirmation_instructions(user)
if security.login_without_confirmation:
do_login(user)
url = security.post_register_view
logger.debug("User %s registered. Redirect to: %s" % (user, url))
return redirect(url)
return redirect(request.referrer or security.register_url)
def confirm():
token = request.args.get('confirmation_token', None)
def reset():
+63 -23
View File
@@ -1,4 +1,7 @@
# -*- coding: utf-8 -*-
import unittest
from example import app
@@ -25,12 +28,14 @@ class SecurityTest(unittest.TestCase):
def _post(self, route, data=None, content_type=None, follow_redirects=True):
return self.client.post(route, data=data,
follow_redirects=follow_redirects,
content_type=content_type or 'text/html')
content_type=content_type or 'application/x-www-form-urlencoded')
def authenticate(self, username, password, endpoint=None):
data = dict(username=username, password=password)
return self._post(endpoint or '/auth', data=data,
content_type='application/x-www-form-urlencoded')
def register(self, email, password, endpoint=None):
return self._post(endpoint or '/register')
def authenticate(self, email, password, endpoint=None):
data = dict(email=email, password=password)
return self._post(endpoint or '/auth', data=data)
def logout(self, endpoint=None):
return self._get(endpoint or '/logout', follow_redirects=True)
@@ -43,15 +48,15 @@ class DefaultSecurityTests(SecurityTest):
self.assertIn('Login Page', r.data)
def test_authenticate(self):
r = self.authenticate("matt", "password")
self.assertIn('Home Page', r.data)
r = self.authenticate("matt@lp.com", "password")
self.assertIn('Hello matt@lp.com', r.data)
def test_unprovided_username(self):
r = self.authenticate("", "password")
self.assertIn("Username not provided", r.data)
self.assertIn("Email not provided", r.data)
def test_unprovided_password(self):
r = self.authenticate("matt", "")
r = self.authenticate("matt@lp.com", "")
self.assertIn("Password not provided", r.data)
def test_invalid_user(self):
@@ -59,15 +64,15 @@ class DefaultSecurityTests(SecurityTest):
self.assertIn("Specified user does not exist", r.data)
def test_bad_password(self):
r = self.authenticate("matt", "bogus")
r = self.authenticate("matt@lp.com", "bogus")
self.assertIn("Password does not match", r.data)
def test_inactive_user(self):
r = self.authenticate("tiya", "password")
r = self.authenticate("tiya@lp.com", "password")
self.assertIn("Inactive user", r.data)
def test_logout(self):
self.authenticate("matt", "password")
self.authenticate("matt@lp.com", "password")
r = self.logout()
self.assertIn('Home Page', r.data)
@@ -76,28 +81,28 @@ class DefaultSecurityTests(SecurityTest):
self.assertIn('Please log in to access this page', r.data)
def test_authorized_access(self):
self.authenticate("matt", "password")
self.authenticate("matt@lp.com", "password")
r = self._get("/profile")
self.assertIn('profile', r.data)
def test_valid_admin_role(self):
self.authenticate("matt", "password")
self.authenticate("matt@lp.com", "password")
r = self._get("/admin")
self.assertIn('Admin Page', r.data)
def test_invalid_admin_role(self):
self.authenticate("joe", "password")
self.authenticate("joe@lp.com", "password")
r = self._get("/admin", follow_redirects=True)
self.assertIn('Home Page', r.data)
def test_roles_accepted(self):
for user in ("matt", "joe"):
for user in ("matt@lp.com", "joe@lp.com"):
self.authenticate(user, "password")
r = self._get("/admin_or_editor")
self.assertIn('Admin or Editor Page', r.data)
self.logout()
self.authenticate("jill", "password")
self.authenticate("jill@lp.com", "password")
r = self._get("/admin_or_editor", follow_redirects=True)
self.assertIn('Home Page', r.data)
@@ -105,17 +110,22 @@ class DefaultSecurityTests(SecurityTest):
r = self._get('/admin', follow_redirects=True)
self.assertIn('<input id="next"', r.data)
def test_register_valid_user(self):
data = dict(email='dude@lp.com', password='password', password_confirm='password')
self.client.post('/register', data=data, follow_redirects=True)
r = self.authenticate('dude@lp.com', 'password')
self.assertIn('Hello dude@lp.com', r.data)
class ConfiguredSecurityTests(SecurityTest):
class ConfiguredURLTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_PASSWORD_HASH': 'bcrypt',
'SECURITY_USER_DATASTORE': 'custom_datastore_name',
'SECURITY_AUTH_URL': '/custom_auth',
'SECURITY_LOGOUT_URL': '/custom_logout',
'SECURITY_LOGIN_VIEW': '/custom_login',
'SECURITY_POST_LOGIN_VIEW': '/post_login',
'SECURITY_POST_LOGOUT_VIEW': '/post_logout'
'SECURITY_POST_LOGOUT_VIEW': '/post_logout',
'SECURITY_POST_REGISTER_VIEW': '/post_register'
}
def test_login_view(self):
@@ -123,14 +133,44 @@ class ConfiguredSecurityTests(SecurityTest):
self.assertIn("Custom Login Page", r.data)
def test_authenticate(self):
r = self.authenticate("matt", "password", endpoint="/custom_auth")
r = self.authenticate("matt@lp.com", "password", endpoint="/custom_auth")
self.assertIn('Post Login', r.data)
def test_logout(self):
self.authenticate("matt", "password", endpoint="/custom_auth")
self.authenticate("matt@lp.com", "password", endpoint="/custom_auth")
r = self.logout(endpoint="/custom_logout")
self.assertIn('Post Logout', r.data)
def test_register(self):
data = dict(email='dude@lp.com', password='password', password_confirm='password')
r = self.client.post('/register', data=data, follow_redirects=True)
self.assertIn('Hello dude@lp.com', r.data)
self.assertIn('Post Register', r.data)
class ConfirmationTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_CONFIRM_EMAIL': True,
'SECURITY_LOGIN_WITHOUT_CONFIRMATION': True
}
def test_register_valid_user_automatically_signs_in(self):
e = 'dude@lp.com'
p = 'password'
data = dict(email=e, password=p, password_confirm=p)
r = self.client.post('/register', data=data, follow_redirects=True)
self.assertIn(e, r.data)
def test_register_valid_user_sends_confirmation_email(self):
e = 'dude@lp.com'
p = 'password'
data = dict(email=e, password=p, password_confirm=p)
with self.app.mail.record_messages() as outbox:
self.client.post('/register', data=data, follow_redirects=True)
self.assertEqual(len(outbox), 1)
self.assertIn(e, outbox[0].html)
class MongoEngineSecurityTests(DefaultSecurityTests):