Refactor forms and views a bit. Add more validation to forms

This commit is contained in:
Matt Wright
2012-08-23 14:56:35 -04:00
parent b0b09aea49
commit 57595bbab4
5 changed files with 152 additions and 131 deletions
+60 -19
View File
@@ -11,11 +11,13 @@
from flask import request, current_app as app
from flask.ext.wtf import Form, TextField, PasswordField, SubmitField, \
HiddenField, Required, BooleanField, EqualTo, Email, ValidationError
HiddenField, Required, BooleanField, EqualTo, Email, ValidationError, \
Length
from werkzeug.local import LocalProxy
from .confirmable import requires_confirmation
from .exceptions import UserNotFoundError
from .utils import encrypt_password
from .utils import verify_password, get_message
# Convenient reference
_datastore = LocalProxy(lambda: app.extensions['security'].datastore)
@@ -24,6 +26,8 @@ email_required = Required(message='Email not provided')
email_validator = Email(message='Invalid email address')
password_required = Required(message="Password not provided")
def unique_user_email(form, field):
try:
@@ -63,14 +67,27 @@ class UniqueEmailFormMixin():
class PasswordFormMixin():
password = PasswordField("Password",
validators=[Required(message="Password not provided")])
validators=[password_required])
class NewPasswordFormMixin():
password = PasswordField("Password",
validators=[password_required,
Length(min=6, max=128)])
class PasswordConfirmFormMixin():
password_confirm = PasswordField("Retype Password",
validators=[EqualTo('password', message="Passwords do not match")])
class NextFormMixin():
next = HiddenField()
class RegisterFormMixin():
submit = SubmitField("Register")
class SendConfirmationForm(Form, UserEmailFormMixin):
"""The default forgot password form"""
@@ -81,6 +98,14 @@ class SendConfirmationForm(Form, UserEmailFormMixin):
if request.method == 'GET':
self.email.data = request.args.get('email', None)
def validate(self):
if not super(SendConfirmationForm, self).validate():
return False
if self.user.confirmed_at is not None:
self.email.errors.append(get_message('ALREADY_CONFIRMED')[0])
return False
return True
def to_dict(self):
return dict(email=self.email.data)
@@ -94,10 +119,9 @@ class ForgotPasswordForm(Form, UserEmailFormMixin):
return dict(email=self.email.data)
class PasswordlessLoginForm(Form, UserEmailFormMixin):
class PasswordlessLoginForm(Form, UserEmailFormMixin, NextFormMixin):
"""The passwordless login form"""
next = HiddenField()
submit = SubmitField("Send Login Link")
def __init__(self, *args, **kwargs):
@@ -105,38 +129,55 @@ class PasswordlessLoginForm(Form, UserEmailFormMixin):
if request.method == 'GET':
self.next.data = request.args.get('next', None)
def validate(self):
if not super(PasswordlessLoginForm, self).validate():
return False
if not self.user.is_active():
self.email.errors.append(get_message('DISABLED_ACCOUNT')[0])
return False
return True
def to_dict(self):
return dict(email=self.email.data)
return dict(user=self.user, next=self.next.data)
class LoginForm(Form, UserEmailFormMixin, PasswordFormMixin):
class LoginForm(Form, UserEmailFormMixin, PasswordFormMixin, NextFormMixin):
"""The default login form"""
remember = BooleanField("Remember Me")
next = HiddenField()
submit = SubmitField("Login")
def __init__(self, *args, **kwargs):
super(LoginForm, self).__init__(*args, **kwargs)
self.next.data = request.args.get('next', None)
def validate(self):
if not super(LoginForm, self).validate():
return False
if not verify_password(self.password.data, self.user.password):
self.password.errors.append('Invalid password')
return False
if requires_confirmation(self.user):
self.email.errors.append(get_message('CONFIRMATION_REQUIRED')[0])
return False
if not self.user.is_active():
self.email.errors.append(get_message('DISABLED_ACCOUNT')[0])
return False
return True
class RegisterForm(Form,
UniqueEmailFormMixin,
PasswordFormMixin,
PasswordConfirmFormMixin):
"""The default register form"""
submit = SubmitField("Register")
class ConfirmRegisterForm(Form, RegisterFormMixin,
UniqueEmailFormMixin, NewPasswordFormMixin):
def to_dict(self):
return dict(email=self.email.data,
password=encrypt_password(self.password.data))
password=self.password.data)
class ResetPasswordForm(Form,
PasswordFormMixin,
PasswordConfirmFormMixin):
class RegisterForm(ConfirmRegisterForm, PasswordConfirmFormMixin):
pass
class ResetPasswordForm(Form, NewPasswordFormMixin, PasswordConfirmFormMixin):
"""The default reset password form"""
submit = SubmitField("Reset Password")
+41
View File
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
"""
flask.ext.security.registerable
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Flask-Security registerable module
:copyright: (c) 2012 by Matt Wright.
:license: MIT, see LICENSE for more details.
"""
from flask import current_app as app
from werkzeug.local import LocalProxy
from .confirmable import generate_confirmation_link
from .signals import user_registered
from .utils import do_flash, get_message, send_mail, encrypt_password
# Convenient references
_security = LocalProxy(lambda: app.extensions['security'])
_datastore = LocalProxy(lambda: _security.datastore)
def register_user(**kwargs):
confirmation_link, token = None, None
kwargs['password'] = encrypt_password(kwargs['password'])
user = _datastore.create_user(**kwargs)
_datastore._commit()
if _security.confirmable:
confirmation_link, token = generate_confirmation_link(user)
do_flash(*get_message('CONFIRM_REGISTRATION', email=user.email))
user_registered.send(dict(user=user, confirm_token=token),
app=app._get_current_object())
send_mail('Welcome', user.email, 'welcome',
user=user, confirmation_link=confirmation_link)
return user
+49 -110
View File
@@ -14,20 +14,21 @@ from flask import current_app as app, redirect, request, \
from werkzeug.datastructures import MultiDict
from werkzeug.local import LocalProxy
from flask_security.confirmable import generate_confirmation_link, \
send_confirmation_instructions, requires_confirmation, confirm_by_token
from flask_security.confirmable import send_confirmation_instructions, \
confirm_by_token
from flask_security.decorators import login_required
from flask_security.exceptions import ConfirmationError, ResetPasswordError, \
PasswordlessLoginError
from flask_security.forms import LoginForm, RegisterForm, ForgotPasswordForm, \
ResetPasswordForm, SendConfirmationForm, PasswordlessLoginForm
from flask_security.forms import LoginForm, ConfirmRegisterForm, RegisterForm, \
ForgotPasswordForm, ResetPasswordForm, SendConfirmationForm, \
PasswordlessLoginForm
from flask_security.passwordless import send_login_instructions, login_by_token
from flask_security.recoverable import reset_by_token, \
send_reset_password_instructions
from flask_security.signals import user_registered
from flask_security.registerable import register_user
from flask_security.utils import get_url, get_post_login_redirect, do_flash, \
get_message, config_value, login_user, logout_user, send_mail, \
anonymous_user_required, url_for_security as url_for, verify_password
get_message, config_value, login_user, logout_user, \
anonymous_user_required, url_for_security as url_for
# Convenient references
@@ -35,34 +36,19 @@ _security = LocalProxy(lambda: app.extensions['security'])
_datastore = LocalProxy(lambda: _security.datastore)
_logger = LocalProxy(lambda: app.logger)
def _render_json(form):
has_errors = len(form.errors) > 0
def _json_auth_ok(user):
return jsonify({
"meta": {
"code": 200
},
"response": {
"user": {
"id": str(user.id),
"authentication_token": user.get_auth_token()
}
}
})
if has_errors:
code = 400
response = dict(errors=form.errors)
else:
code = 200
response = dict(user=dict(id=str(form.user.id),
authentication_token=form.user.get_auth_token()))
def _json_auth_error(msg):
resp = jsonify({
"meta": {
"code": 400
},
"response": {
"error": msg
}
})
resp.status_code = 400
return resp
return jsonify(dict(meta=dict(code=code), response=response))
def _commit(response=None):
@@ -78,7 +64,6 @@ def _ctx(endpoint):
def login():
"""View function for login view"""
user, msg, confirm_url = None, None, None
form_data = request.form
if request.json:
@@ -87,33 +72,14 @@ def login():
form = LoginForm(form_data, csrf_enabled=not app.testing)
if form.validate_on_submit():
user = form.user
login_user(form.user, remember=form.remember.data)
after_this_request(_commit)
if requires_confirmation(user):
msg = get_message('CONFIRMATION_REQUIRED')
confirm_url = url_for('send_confirmation', email=user.email)
form.email.errors.append(msg[0])
if not request.json:
return redirect(get_post_login_redirect())
elif verify_password(form.password.data, user.password):
if login_user(user, remember=form.remember.data):
after_this_request(_commit)
if request.json:
return _json_auth_ok(user)
return redirect(get_post_login_redirect())
msg = get_message('DISABLED_ACCOUNT')
form.email.errors.append(msg[0])
else:
msg = get_message('PASSWORD_MISMATCH')
form.password.errors.append(msg[0])
_logger.debug('Unsuccessful authentication attempt: %s' % msg[0])
if request.json:
return _json_auth_error(msg[0])
if confirm_url:
do_flash(*msg)
return redirect(confirm_url)
if request.json:
return _render_json(form)
return render_template('security/login_user.html',
login_user_form=form,
@@ -125,47 +91,37 @@ def logout():
"""View function which handles a logout request."""
logout_user()
_logger.debug('User logged out')
next_url = request.args.get('next', None)
post_logout_url = get_url(_security.post_logout_view)
return redirect(next_url or post_logout_url)
return redirect(request.args.get('next', None) or
get_url(_security.post_logout_view))
@anonymous_user_required
def register():
"""View function which handles a registration request."""
form = RegisterForm(csrf_enabled=not app.testing)
if not form.validate_on_submit():
return render_template('security/register_user.html',
register_user_form=form,
**_ctx('register'))
confirmation_link, token = None, None
user = _datastore.create_user(**form.to_dict())
_commit()
if _security.confirmable:
confirmation_link, token = generate_confirmation_link(user)
do_flash(*get_message('CONFIRM_REGISTRATION', email=user.email))
form = ConfirmRegisterForm
else:
form = RegisterForm
user_registered.send(dict(user=user, confirm_token=token),
app=app._get_current_object())
form = form(csrf_enabled=not app.testing)
send_mail('Welcome', user.email, 'welcome',
user=user, confirmation_link=confirmation_link)
if form.validate_on_submit():
user = register_user(**form.to_dict())
_logger.debug('User %s registered' % user)
if not _security.confirmable or _security.login_without_confirmation:
after_this_request(_commit)
login_user(user)
if not _security.confirmable or _security.login_without_confirmation:
after_this_request(_commit)
login_user(user)
post_register_url = get_url(_security.post_register_view)
post_login_url = get_url(_security.post_login_view)
post_register_url = get_url(_security.post_register_view)
post_login_url = get_url(_security.post_login_view)
return redirect(post_register_url or post_login_url)
return redirect(post_register_url or post_login_url)
return render_template('security/register_user.html',
register_user_form=form,
**_ctx('register'))
@anonymous_user_required
@@ -175,13 +131,8 @@ def send_login():
form = PasswordlessLoginForm(csrf_enabled=not app.testing)
if form.validate_on_submit():
user = _datastore.find_user(**form.to_dict())
if user.is_active():
send_login_instructions(user, form.next.data)
do_flash(*get_message('LOGIN_EMAIL_SENT', email=user.email))
else:
form.email.errors.append(get_message('DISABLED_ACCOUNT')[0])
send_login_instructions(**form.to_dict())
do_flash(*get_message('LOGIN_EMAIL_SENT', email=form.user.email))
return render_template('security/send_login.html',
send_login_form=form,
@@ -211,14 +162,8 @@ def send_confirmation():
form = SendConfirmationForm(csrf_enabled=not app.testing)
if form.validate_on_submit():
user = _datastore.find_user(**form.to_dict())
if user.confirmed_at is None:
send_confirmation_instructions(user)
msg = get_message('CONFIRMATION_REQUEST', email=user.email)
_logger.debug('%s request confirmation instructions' % user)
else:
msg = get_message('ALREADY_CONFIRMED')
do_flash(*msg)
send_confirmation_instructions(form.user)
do_flash(*get_message('CONFIRMATION_REQUEST', email=form.user.email))
return render_template('security/send_confirmation.html',
send_confirmation_form=form,
@@ -232,14 +177,12 @@ def confirm_email(token):
try:
user = confirm_by_token(token)
except ConfirmationError, e:
_logger.debug('Confirmation error: %s' % e)
if e.user:
send_confirmation_instructions(e.user)
do_flash(str(e), 'error')
confirm_error_url = get_url(_security.confirm_error_view)
return redirect(confirm_error_url or url_for('send_confirmation'))
_logger.debug('%s confirmed their email' % user)
do_flash(*get_message('EMAIL_CONFIRMED'))
login_user(user, True)
post_confirm_url = get_url(_security.post_confirm_view)
@@ -254,10 +197,8 @@ def forgot_password():
form = ForgotPasswordForm(csrf_enabled=not app.testing)
if form.validate_on_submit():
user = _datastore.find_user(**form.to_dict())
send_reset_password_instructions(user)
_logger.debug('%s requested to reset their password' % user)
do_flash(*get_message('PASSWORD_RESET_REQUEST', email=user.email))
send_reset_password_instructions(form.user)
do_flash(*get_message('PASSWORD_RESET_REQUEST', email=form.user.email))
return render_template('security/forgot_password.html',
forgot_password_form=form,
@@ -269,7 +210,7 @@ def reset_password(token):
"""View function that handles a reset password request."""
next = None
form = ResetPasswordForm(csrf_enabled=not app.testing)
form = ResetPasswordForm(reset_token=token, csrf_enabled=not app.testing)
if form.validate_on_submit():
try:
@@ -281,13 +222,11 @@ def reset_password(token):
msg = (str(e), 'error')
if e.user:
send_reset_password_instructions(e.user)
_logger.debug('Password reset error: ' + msg[0])
do_flash(*msg)
if next:
login_user(user)
_logger.debug('%s reset their password' % user)
return redirect(next)
return render_template('security/reset_password.html',
+1 -1
View File
@@ -30,7 +30,7 @@ class SecurityTest(TestCase):
headers=headers)
def register(self, email, password='password'):
data = dict(email=email, password=password, password_confirm=password)
data = dict(email=email, password=password)
return self.client.post('/register', data=data, follow_redirects=True)
def authenticate(self, email="matt@lp.com", password="password", endpoint=None, **kwargs):
+1 -1
View File
@@ -59,7 +59,7 @@ class DefaultSecurityTests(SecurityTest):
def test_bad_password(self):
r = self.authenticate(password="bogus")
self.assertIn("Password does not match", r.data)
self.assertIn("Invalid password", r.data)
def test_inactive_user(self):
r = self.authenticate("tiya@lp.com", "password")