diff --git a/CHANGES b/CHANGES index c3aeb7e..1a6c473 100644 --- a/CHANGES +++ b/CHANGES @@ -3,6 +3,14 @@ Flask-Security Changelog Here you can see the full list of changes between each Flask-Security release. +Version 1.7.2 +------------- + +Released January 14th 2014 + +- Fixed a bug regarding the `password_changed` signal. + + Version 1.7.1 ------------- diff --git a/flask_security/__init__.py b/flask_security/__init__.py index 5328aca..514587b 100644 --- a/flask_security/__init__.py +++ b/flask_security/__init__.py @@ -15,9 +15,9 @@ __version__ = '1.7.1' from .core import Security, RoleMixin, UserMixin, AnonymousUser, current_user from .datastore import SQLAlchemyUserDatastore, MongoEngineUserDatastore, PeeweeUserDatastore from .decorators import auth_token_required, http_auth_required, \ - login_required, roles_accepted, roles_required, auth_required + login_required, roles_accepted, roles_required, auth_required from .forms import ForgotPasswordForm, LoginForm, RegisterForm, \ - ResetPasswordForm, PasswordlessLoginForm, ConfirmRegisterForm + ResetPasswordForm, PasswordlessLoginForm, ConfirmRegisterForm from .signals import confirm_instructions_sent, password_reset, \ - reset_password_instructions_sent, user_confirmed, user_registered + reset_password_instructions_sent, user_confirmed, user_registered from .utils import login_user, logout_user, url_for_security diff --git a/flask_security/changeable.py b/flask_security/changeable.py index 6d72a2e..6918cc0 100644 --- a/flask_security/changeable.py +++ b/flask_security/changeable.py @@ -29,8 +29,8 @@ def send_password_changed_notice(user): :param user: The user to send the notice to """ if config_value('SEND_PASSWORD_CHANGE_EMAIL'): - send_mail(config_value('EMAIL_SUBJECT_PASSWORD_CHANGE_NOTICE'), user.email, - 'change_notice', user=user) + subject = config_value('EMAIL_SUBJECT_PASSWORD_CHANGE_NOTICE') + send_mail(subject, user.email, 'change_notice', user=user) def change_user_password(user, password): diff --git a/flask_security/confirmable.py b/flask_security/confirmable.py index e59529d..8d19ae8 100644 --- a/flask_security/confirmable.py +++ b/flask_security/confirmable.py @@ -58,7 +58,9 @@ def generate_confirmation_token(user): def requires_confirmation(user): """Returns `True` if the user requires confirmation.""" - return _security.confirmable and not _security.login_without_confirmation and user.confirmed_at == None + return (_security.confirmable and + not _security.login_without_confirmation and + user.confirmed_at is None) def confirm_email_token_status(token): diff --git a/flask_security/core.py b/flask_security/core.py index c9574a5..5f8682d 100644 --- a/flask_security/core.py +++ b/flask_security/core.py @@ -92,38 +92,74 @@ _default_config = { #: Default Flask-Security messages _default_messages = { - 'UNAUTHORIZED': ('You do not have permission to view this resource.', 'error'), - 'CONFIRM_REGISTRATION': ('Thank you. Confirmation instructions have been sent to %(email)s.', 'success'), - 'EMAIL_CONFIRMED': ('Thank you. Your email has been confirmed.', 'success'), - 'ALREADY_CONFIRMED': ('Your email has already been confirmed.', 'info'), - 'INVALID_CONFIRMATION_TOKEN': ('Invalid confirmation token.', 'error'), - 'EMAIL_ALREADY_ASSOCIATED': ('%(email)s is already associated with an account.', 'error'), - 'PASSWORD_MISMATCH': ('Password does not match', 'error'), - 'RETYPE_PASSWORD_MISMATCH': ('Passwords do not match', 'error'), - 'INVALID_REDIRECT': ('Redirections outside the domain are forbidden', 'error'), - 'PASSWORD_RESET_REQUEST': ('Instructions to reset your password have been sent to %(email)s.', 'info'), - 'PASSWORD_RESET_EXPIRED': ('You did not reset your password within %(within)s. New instructions have been sent to %(email)s.', 'error'), - 'INVALID_RESET_PASSWORD_TOKEN': ('Invalid reset password token.', 'error'), - 'CONFIRMATION_REQUIRED': ('Email requires confirmation.', 'error'), - 'CONFIRMATION_REQUEST': ('Confirmation instructions have been sent to %(email)s.', 'info'), - 'CONFIRMATION_EXPIRED': ('You did not confirm your email within %(within)s. New instructions to confirm your email have been sent to %(email)s.', 'error'), - 'LOGIN_EXPIRED': ('You did not login within %(within)s. New instructions to login have been sent to %(email)s.', 'error'), - 'LOGIN_EMAIL_SENT': ('Instructions to login have been sent to %(email)s.', 'success'), - 'INVALID_LOGIN_TOKEN': ('Invalid login token.', 'error'), - 'DISABLED_ACCOUNT': ('Account is disabled.', 'error'), - 'EMAIL_NOT_PROVIDED': ('Email not provided', 'error'), - 'INVALID_EMAIL_ADDRESS': ('Invalid email address', 'error'), - 'PASSWORD_NOT_PROVIDED': ('Password not provided', 'error'), - 'PASSWORD_NOT_SET': ('No password is set for this user', 'error'), - 'PASSWORD_INVALID_LENGTH': ('Password must be at least 6 characters', 'error'), - 'USER_DOES_NOT_EXIST': ('Specified user does not exist', 'error'), - 'INVALID_PASSWORD': ('Invalid password', 'error'), - 'PASSWORDLESS_LOGIN_SUCCESSFUL': ('You have successfuly logged in.', 'success'), - 'PASSWORD_RESET': ('You successfully reset your password and you have been logged in automatically.', 'success'), - 'PASSWORD_IS_THE_SAME': ('Your new password must be different than your previous password.', 'error'), - 'PASSWORD_CHANGE': ('You successfully changed your password.', 'success'), - 'LOGIN': ('Please log in to access this page.', 'info'), - 'REFRESH': ('Please reauthenticate to access this page.', 'info'), + 'UNAUTHORIZED': ( + 'You do not have permission to view this resource.', 'error'), + 'CONFIRM_REGISTRATION': ( + 'Thank you. Confirmation instructions have been sent to %(email)s.', 'success'), + 'EMAIL_CONFIRMED': ( + 'Thank you. Your email has been confirmed.', 'success'), + 'ALREADY_CONFIRMED': ( + 'Your email has already been confirmed.', 'info'), + 'INVALID_CONFIRMATION_TOKEN': ( + 'Invalid confirmation token.', 'error'), + 'EMAIL_ALREADY_ASSOCIATED': ( + '%(email)s is already associated with an account.', 'error'), + 'PASSWORD_MISMATCH': ( + 'Password does not match', 'error'), + 'RETYPE_PASSWORD_MISMATCH': ( + 'Passwords do not match', 'error'), + 'INVALID_REDIRECT': ( + 'Redirections outside the domain are forbidden', 'error'), + 'PASSWORD_RESET_REQUEST': ( + 'Instructions to reset your password have been sent to %(email)s.', 'info'), + 'PASSWORD_RESET_EXPIRED': ( + 'You did not reset your password within %(within)s. New instructions have been sent ' + 'to %(email)s.', 'error'), + 'INVALID_RESET_PASSWORD_TOKEN': ( + 'Invalid reset password token.', 'error'), + 'CONFIRMATION_REQUIRED': ( + 'Email requires confirmation.', 'error'), + 'CONFIRMATION_REQUEST': ( + 'Confirmation instructions have been sent to %(email)s.', 'info'), + 'CONFIRMATION_EXPIRED': ( + 'You did not confirm your email within %(within)s. New instructions to confirm your email ' + 'have been sent to %(email)s.', 'error'), + 'LOGIN_EXPIRED': ( + 'You did not login within %(within)s. New instructions to login have been sent to ' + '%(email)s.', 'error'), + 'LOGIN_EMAIL_SENT': ( + 'Instructions to login have been sent to %(email)s.', 'success'), + 'INVALID_LOGIN_TOKEN': ( + 'Invalid login token.', 'error'), + 'DISABLED_ACCOUNT': ( + 'Account is disabled.', 'error'), + 'EMAIL_NOT_PROVIDED': ( + 'Email not provided', 'error'), + 'INVALID_EMAIL_ADDRESS': ( + 'Invalid email address', 'error'), + 'PASSWORD_NOT_PROVIDED': ( + 'Password not provided', 'error'), + 'PASSWORD_NOT_SET': ( + 'No password is set for this user', 'error'), + 'PASSWORD_INVALID_LENGTH': ( + 'Password must be at least 6 characters', 'error'), + 'USER_DOES_NOT_EXIST': ( + 'Specified user does not exist', 'error'), + 'INVALID_PASSWORD': ( + 'Invalid password', 'error'), + 'PASSWORDLESS_LOGIN_SUCCESSFUL': ( + 'You have successfuly logged in.', 'success'), + 'PASSWORD_RESET': ( + 'You successfully reset your password and you have been logged in automatically.', + 'success'), + 'PASSWORD_IS_THE_SAME': ( + 'Your new password must be different than your previous password.', 'error'), + 'PASSWORD_CHANGE': ( + 'You successfully changed your password.', 'success'), + 'LOGIN': ( + 'Please log in to access this page.', 'info'), + 'REFRESH': ( + 'Please reauthenticate to access this page.', 'info'), } _allowed_password_hash_schemes = [ @@ -207,7 +243,8 @@ def _get_principal(app): def _get_pwd_context(app): pw_hash = cv('PASSWORD_HASH', app=app) if pw_hash not in _allowed_password_hash_schemes: - allowed = ', '.join(_allowed_password_hash_schemes[:-1]) + ' and ' + _allowed_password_hash_schemes[-1] + allowed = (', '.join(_allowed_password_hash_schemes[:-1]) + + ' and ' + _allowed_password_hash_schemes[-1]) raise ValueError("Invalid hash scheme %r. Allowed values are %s" % (pw_hash, allowed)) return CryptContext(schemes=_allowed_password_hash_schemes, default=pw_hash) diff --git a/flask_security/decorators.py b/flask_security/decorators.py index 40b5ba5..3363fc5 100644 --- a/flask_security/decorators.py +++ b/flask_security/decorators.py @@ -13,7 +13,7 @@ from collections import namedtuple from functools import wraps from flask import current_app, Response, request, redirect, _request_ctx_stack -from flask.ext.login import current_user, login_required +from flask.ext.login import current_user, login_required # pragma: no flakes from flask.ext.principal import RoleNeed, Permission, Identity, identity_changed from werkzeug.local import LocalProxy diff --git a/flask_security/forms.py b/flask_security/forms.py index 54876ae..a913bae 100644 --- a/flask_security/forms.py +++ b/flask_security/forms.py @@ -223,7 +223,6 @@ class LoginForm(Form, NextFormMixin): self.password.errors.append(get_message('PASSWORD_NOT_PROVIDED')[0]) return False - self.user = _datastore.get_user(self.email.data) if self.user is None: diff --git a/flask_security/script.py b/flask_security/script.py index 16a03d2..a9c8084 100644 --- a/flask_security/script.py +++ b/flask_security/script.py @@ -42,9 +42,9 @@ class CreateUserCommand(Command): """Create a user""" option_list = ( - Option('-e', '--email', dest='email', default=None), + Option('-e', '--email', dest='email', default=None), Option('-p', '--password', dest='password', default=None), - Option('-a', '--active', dest='active', default=''), + Option('-a', '--active', dest='active', default=''), ) @commit diff --git a/flask_security/utils.py b/flask_security/utils.py index 794cad0..38375da 100644 --- a/flask_security/utils.py +++ b/flask_security/utils.py @@ -10,8 +10,6 @@ """ import base64 -import blinker -import functools import hashlib import hmac import sys @@ -20,16 +18,13 @@ from contextlib import contextmanager from datetime import datetime, timedelta from flask import url_for, flash, current_app, request, session, render_template -from flask.ext.login import login_user as _login_user, \ - logout_user as _logout_user +from flask.ext.login import login_user as _login_user, logout_user as _logout_user from flask.ext.mail import Message from flask.ext.principal import Identity, AnonymousIdentity, identity_changed from itsdangerous import BadSignature, SignatureExpired from werkzeug.local import LocalProxy -from .signals import user_registered, user_confirmed, \ - confirm_instructions_sent, login_instructions_sent, \ - password_reset, password_changed, reset_password_instructions_sent +from .signals import user_registered, login_instructions_sent, reset_password_instructions_sent # Convenient references _security = LocalProxy(lambda: current_app.extensions['security']) @@ -396,56 +391,3 @@ def capture_reset_password_requests(reset_password_sent_at=None): yield reset_requests finally: reset_password_instructions_sent.disconnect(_on) - - -class CaptureSignals(object): - """Testing utility for capturing blinker signals. - - Context manager which mocks out selected signals and registers which are `sent` on and what - arguments were sent. Instantiate with a list of blinker `NamedSignals` to patch. Each signal - has it's `send` mocked out. - """ - def __init__(self, signals): - """Patch all given signals and make them available as attributes. - - :param signals: list of signals - """ - self._records = {} - self._receivers = {} - for signal in signals: - self._records[signal] = [] - self._receivers[signal] = functools.partial(self._record, signal) - - def __getitem__(self, signal): - """All captured signals are available via `ctxt[signal]`. - """ - if isinstance(signal, blinker.base.NamedSignal): - return self._records[signal] - else: - super(CaptureSignals, self).__setitem__(signal) - - def _record(self, signal, *args, **kwargs): - self._records[signal].append((args, kwargs)) - - def __enter__(self): - for signal, receiver in self._receivers.items(): - signal.connect(receiver) - return self - - def __exit__(self, type, value, traceback): - for signal, receiver in self._receivers.items(): - signal.disconnect(receiver) - - def signals_sent(self): - """Return a set of the signals sent. - :rtype: list of blinker `NamedSignals`. - """ - return set([signal for signal, _ in self._records.items() if self._records[signal]]) - - -def capture_signals(): - """Factory method that creates a `CaptureSignals` with all the flask_security signals.""" - return CaptureSignals([user_registered, user_confirmed, - confirm_instructions_sent, login_instructions_sent, - password_reset, password_changed, - reset_password_instructions_sent]) diff --git a/flask_security/views.py b/flask_security/views.py index 528b4d6..ec9f899 100644 --- a/flask_security/views.py +++ b/flask_security/views.py @@ -77,8 +77,9 @@ def login(): if not request.json: return redirect(get_post_login_redirect()) - form.next.data = get_url(request.args.get('next')) \ - or get_url(request.form.get('next')) or '' + form.next.data = (get_url(request.args.get('next')) or + get_url(request.form.get('next')) or + '') if request.json: return _render_json(form, True) diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..c231d3d --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,10 @@ +Flask-SQLAlchemy>=1.0 +bcrypt>=1.0.2 +flask-mongoengine>=0.7.0 +flask-peewee>=0.6.5 +pytest>=2.5.2 +pytest-cache>=1.0 +pytest-cov>=1.6 +pytest-flakes>=0.2 +pytest-pep8>=1.0.5 +tox>=1.7.0 diff --git a/setup.cfg b/setup.cfg index 736bfed..fe56265 100644 --- a/setup.cfg +++ b/setup.cfg @@ -3,4 +3,14 @@ source-dir = docs/ build-dir = docs/_build [upload_sphinx] -upload-dir = docs/_build/html \ No newline at end of file +upload-dir = docs/_build/html + +[pytest] +pep8maxlinelength = 99 + +pep8ignore = + docs/* ALL + +flakes-ignore = + ImportStarUsed + flask_security/__init__.py UnusedImport diff --git a/setup.py b/setup.py index 6cbad06..1421fde 100644 --- a/setup.py +++ b/setup.py @@ -1,22 +1,45 @@ """ Flask-Security ============== - -Flask-Security is a Flask extension that aims to add quick and simple security -to your Flask applications. - -Resources ---------- - -* `Documentation `_ -* `Issue Tracker `_ -* `Source `_ -* `Development Version - `_ - """ -from setuptools import setup +import sys + +from setuptools import setup, find_packages +from setuptools.command.test import test as TestCommand + + +def get_requirements(suffix=''): + with open('requirements%s.txt' % suffix) as f: + rv = f.read().splitlines() + return rv + + +def get_long_description(): + with open('README.rst') as f: + rv = f.read() + return rv + + +class PyTest(TestCommand): + + def finalize_options(self): + TestCommand.finalize_options(self) + self.test_args = [ + '-xrs', + '--cov', 'flask_security', + '--cov-report', 'term-missing', + '--pep8', + '--flakes', + '--clearcache' + ] + self.test_suite = True + + def run_tests(self): + import pytest + errno = pytest.main(self.test_args) + sys.exit(errno) + setup( name='Flask-Security', @@ -26,30 +49,14 @@ setup( author='Matt Wright', author_email='matt@nobien.net', description='Simple security for Flask apps', - long_description=__doc__, - packages=[ - 'flask_security' - ], + long_description=get_long_description(), + packages=find_packages(), zip_safe=False, include_package_data=True, platforms='any', - install_requires=[ - 'Flask>=0.10.1', - 'Flask-Login>=0.2.9', - 'Flask-Mail>=0.9.0', - 'Flask-Principal>=0.4.0', - 'Flask-WTF>=0.9.3', - 'passlib>=1.6.2', - ], - test_suite='nose.collector', - tests_require=[ - 'nose', - 'Flask-SQLAlchemy', - 'Flask-MongoEngine', - 'Flask-Peewee', - 'bcrypt', - 'simplejson' - ], + install_requires=get_requirements(), + tests_require=get_requirements('-dev'), + cmdclass={'test': PyTest}, classifiers=[ 'Development Status :: 4 - Beta', 'Environment :: Web Environment', diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index 0e7d036..0000000 --- a/tests/__init__.py +++ /dev/null @@ -1,85 +0,0 @@ -# -*- coding: utf-8 -*- - -import hmac - -from hashlib import sha1 -from unittest import TestCase - -from tests.test_app.sqlalchemy import create_app - - -class SecurityTest(TestCase): - - APP_KWARGS = { - 'register_blueprint': True, - } - AUTH_CONFIG = None - - def setUp(self): - super(SecurityTest, self).setUp() - - app_kwargs = self.APP_KWARGS - app = self._create_app(self.AUTH_CONFIG or {}, **app_kwargs) - app.debug = False - app.config['TESTING'] = True - app.config['WTF_CSRF_ENABLED'] = False - - self.app = app - self.client = app.test_client() - - def _create_app(self, auth_config, **kwargs): - return create_app(auth_config, **kwargs) - - def _get(self, route, content_type=None, follow_redirects=None, headers=None): - return self.client.get(route, follow_redirects=follow_redirects, - content_type=content_type or 'text/html', - headers=headers) - - def _post(self, route, data=None, content_type=None, follow_redirects=True, headers=None): - content_type = content_type or 'application/x-www-form-urlencoded' - return self.client.post(route, data=data, - follow_redirects=follow_redirects, - content_type=content_type, headers=headers) - - def register(self, email, password='password'): - data = dict(email=email, password=password) - return self.client.post('/register', data=data, follow_redirects=True) - - def authenticate(self, email="matt@lp.com", password="password", endpoint=None, **kwargs): - data = dict(email=email, password=password, remember='y') - return self._post(endpoint or '/login', data=data, **kwargs) - - def json_authenticate(self, email="matt@lp.com", password="password", endpoint=None): - data = """{ - "email": "%s", - "password": "%s" - }""" - return self._post(endpoint or '/login', content_type="application/json", - data=data % (email, password)) - - def logout(self, endpoint=None): - return self._get(endpoint or '/logout', follow_redirects=True) - - def assertIsHomePage(self, data): - self.assertIn(b'Home Page', data) - - def assertIn(self, member, container, msg=None): - if hasattr(TestCase, 'assertIn'): - return TestCase.assertIn(self, member, container, msg) - - return self.assertTrue(member in container) - - def assertNotIn(self, member, container, msg=None): - if hasattr(TestCase, 'assertNotIn'): - return TestCase.assertNotIn(self, member, container, msg) - - return self.assertFalse(member in container) - - def assertIsNotNone(self, obj, msg=None): - if hasattr(TestCase, 'assertIsNotNone'): - return TestCase.assertIsNotNone(self, obj, msg) - - return self.assertTrue(obj is not None) - - def get_message(self, key, **kwargs): - return self.app.config['SECURITY_MSG_' + key][0] % kwargs diff --git a/tests/configured_tests.py b/tests/configured_tests.py deleted file mode 100644 index f0af0c4..0000000 --- a/tests/configured_tests.py +++ /dev/null @@ -1,878 +0,0 @@ -# -*- coding: utf-8 -*- - -# from __future__ import with_statement - -import base64 -import time -import simplejson as json -import flask - -from flask_security.utils import capture_registrations, \ - capture_reset_password_requests, capture_passwordless_login_requests -from flask_security.forms import LoginForm, ConfirmRegisterForm, RegisterForm, \ - ForgotPasswordForm, ResetPasswordForm, SendConfirmationForm, \ - PasswordlessLoginForm -from flask_security.forms import TextField, SubmitField, valid_user_email - -from flask_security.signals import user_registered - - -from tests import SecurityTest - - -class PasswordVerifyEncryptTests(SecurityTest): - - AUTH_CONFIG = { - 'SECURITY_PASSWORD_HASH': 'bcrypt', - 'SECURITY_PASSWORD_SALT': '89gf828uiguiu23ju2' - } - - def test_verify_password_bcrypt(self): - from flask_security.utils import verify_password, encrypt_password - with self.app.app_context(): - self.assertTrue(verify_password('custompassword', encrypt_password('custompassword'))) - - -class ConfiguredPasswordHashSecurityTests(SecurityTest): - - AUTH_CONFIG = { - 'SECURITY_PASSWORD_HASH': 'bcrypt', - 'SECURITY_PASSWORD_SALT': 'so-salty', - 'USER_COUNT': 1 - } - - def test_authenticate(self): - r = self.authenticate(endpoint="/login") - self.assertIn(b'Home Page', r.data) - - -class ConfiguredSecurityTests(SecurityTest): - - AUTH_CONFIG = { - 'SECURITY_REGISTERABLE': True, - 'SECURITY_LOGOUT_URL': '/custom_logout', - 'SECURITY_LOGIN_URL': '/custom_login', - 'SECURITY_POST_LOGIN_VIEW': '/post_login', - 'SECURITY_POST_LOGOUT_VIEW': '/post_logout', - 'SECURITY_POST_REGISTER_VIEW': '/post_register', - 'SECURITY_UNAUTHORIZED_VIEW': '/unauthorized', - 'SECURITY_DEFAULT_HTTP_AUTH_REALM': 'Custom Realm' - } - - def test_login_view(self): - r = self._get('/custom_login') - self.assertIn(b"

Login

", r.data) - - def test_authenticate(self): - r = self.authenticate(endpoint="/custom_login") - self.assertIn(b'Post Login', r.data) - - def test_logout(self): - self.authenticate(endpoint="/custom_login") - r = self.logout(endpoint="/custom_logout") - self.assertIn(b'Post Logout', r.data) - - def test_register_view(self): - r = self._get('/register') - self.assertIn(b'

Register

', r.data) - - def test_register(self): - data = dict(email='dude@lp.com', - password='password', - password_confirm='password') - - r = self._post('/register', data=data, follow_redirects=True) - self.assertIn(b'Post Register', r.data) - - def test_register_with_next_querystring_argument(self): - data = dict(email='dude@lp.com', - password='password', - password_confirm='password') - - r = self._post('/register?next=/page1', data=data, follow_redirects=True) - self.assertIn(b'Page 1', r.data) - - def test_register_json(self): - data = '{ "email": "dude@lp.com", "password": "password"}' - r = self._post('/register', data=data, content_type='application/json') - data = json.loads(r.data) - self.assertEquals(data['meta']['code'], 200) - - def test_register_existing_email(self): - data = dict(email='matt@lp.com', - password='password', - password_confirm='password') - r = self._post('/register', data=data, follow_redirects=True) - msg = b'matt@lp.com is already associated with an account' - self.assertIn(msg, r.data) - - def test_unauthorized(self): - self.authenticate("joe@lp.com", endpoint="/custom_auth") - r = self._get("/admin", follow_redirects=True) - msg = b'You are not allowed to access the requested resouce' - self.assertIn(msg, r.data) - - def test_default_http_auth_realm(self): - r = self._get('/http', headers={ - 'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:bogus") - }) - self.assertIn(b'

Unauthorized

', r.data) - self.assertIn('WWW-Authenticate', r.headers) - self.assertEquals('Basic realm="Custom Realm"', - r.headers['WWW-Authenticate']) - - -class BadConfiguredSecurityTests(SecurityTest): - - AUTH_CONFIG = { - 'SECURITY_PASSWORD_HASH': 'bcrypt', - 'USER_COUNT': 1 - } - - def test_bad_configuration_raises_runtimer_error(self): - self.assertRaises(RuntimeError, self.authenticate) - - -class DefaultTemplatePathTests(SecurityTest): - AUTH_CONFIG = { - 'SECURITY_LOGIN_USER_TEMPLATE': 'custom_security/login_user.html', - } - - def test_login_user_template(self): - r = self._get('/login') - - self.assertIn(b'CUSTOM LOGIN USER', r.data) - - -class RegisterableTemplatePathTests(SecurityTest): - AUTH_CONFIG = { - 'SECURITY_REGISTERABLE': True, - 'SECURITY_REGISTER_USER_TEMPLATE': 'custom_security/register_user.html' - } - - def test_register_user_template(self): - r = self._get('/register') - - self.assertIn(b'CUSTOM REGISTER USER', r.data) - - -class RecoverableTemplatePathTests(SecurityTest): - AUTH_CONFIG = { - 'SECURITY_RECOVERABLE': True, - 'SECURITY_FORGOT_PASSWORD_TEMPLATE': 'custom_security/forgot_password.html', - 'SECURITY_RESET_PASSWORD_TEMPLATE': 'custom_security/reset_password.html', - } - - def test_forgot_password_template(self): - r = self._get('/reset') - - self.assertIn(b'CUSTOM FORGOT PASSWORD', r.data) - - def test_reset_password_template(self): - with capture_reset_password_requests() as requests: - r = self._post('/reset', data=dict(email='joe@lp.com'), - follow_redirects=True) - - t = requests[0]['token'] - - r = self._get('/reset/' + t) - - self.assertIn(b'CUSTOM RESET PASSWORD', r.data) - - -class ConfirmableTemplatePathTests(SecurityTest): - AUTH_CONFIG = { - 'SECURITY_CONFIRMABLE': True, - 'SECURITY_SEND_CONFIRMATION_TEMPLATE': 'custom_security/send_confirmation.html' - } - - def test_send_confirmation_template(self): - r = self._get('/confirm') - - self.assertIn(b'CUSTOM SEND CONFIRMATION', r.data) - - -class PasswordlessTemplatePathTests(SecurityTest): - AUTH_CONFIG = { - 'SECURITY_PASSWORDLESS': True, - 'SECURITY_SEND_LOGIN_TEMPLATE': 'custom_security/send_login.html' - } - - def test_send_login_template(self): - r = self._get('/login') - - self.assertIn(b'CUSTOM SEND LOGIN', r.data) - - -class RegisterableTests(SecurityTest): - AUTH_CONFIG = { - 'SECURITY_REGISTERABLE': True, - 'USER_COUNT': 1 - } - - def test_register_valid_user(self): - data = dict(email='dude@lp.com', - password='password', - password_confirm='password') - self._post('/register', data=data, follow_redirects=True) - r = self.authenticate('dude@lp.com') - self.assertIn(b'Hello dude@lp.com', r.data) - - -class ConfirmableTests(SecurityTest): - AUTH_CONFIG = { - 'SECURITY_CONFIRMABLE': True, - 'SECURITY_REGISTERABLE': True, - 'SECURITY_EMAIL_SUBJECT_REGISTER': 'Custom welcome subject', - 'USER_COUNT': 1 - } - - def test_login_before_confirmation(self): - e = 'dude@lp.com' - self.register(e) - r = self.authenticate(email=e) - self.assertIn(self.get_message('CONFIRMATION_REQUIRED').encode('utf-8'), r.data) - - def test_send_confirmation_of_already_confirmed_account(self): - e = 'dude@lp.com' - - with capture_registrations() as registrations: - r = self.register(e) - token = registrations[0]['confirm_token'] - - self.client.get('/confirm/' + token, follow_redirects=True) - self.logout() - r = self._post('/confirm', data=dict(email=e)) - m = self.get_message('ALREADY_CONFIRMED') - self.assertIn(m.encode('utf-8'), r.data) - - def test_register_sends_confirmation_email(self): - e = 'dude@lp.com' - with self.app.extensions['mail'].record_messages() as outbox: - self.register(e) - self.assertEqual(len(outbox), 1) - self.assertIn(e, outbox[0].html) - self.assertEqual('Custom welcome subject', outbox[0].subject) - - def test_confirm_email(self): - e = 'dude@lp.com' - - tokens = [] - def on_registered(sender, **kwargs): - tokens.append(kwargs['confirm_token']) - - user_registered.connect(on_registered, self.app) - - r = self.register(e) - self.assertEqual(len(tokens), 1) - r = self.client.get('/confirm/' + tokens[0], follow_redirects=True) - msg = self.app.config['SECURITY_MSG_EMAIL_CONFIRMED'][0] - self.assertIn(msg.encode('utf-8'), r.data) - - def test_invalid_token_when_confirming_email(self): - r = self.client.get('/confirm/bogus', follow_redirects=True) - msg = self.app.config['SECURITY_MSG_INVALID_CONFIRMATION_TOKEN'][0] - self.assertIn(msg.encode('utf-8'), r.data) - - def test_send_confirmation_json(self): - r = self._post('/confirm', data='{"email": "matt@lp.com"}', - content_type='application/json') - self.assertEquals(r.status_code, 200) - - def test_send_confirmation_with_invalid_email(self): - r = self._post('/confirm', data=dict(email='bogus@bogus.com')) - msg = self.app.config['SECURITY_MSG_USER_DOES_NOT_EXIST'][0] - self.assertIn(msg.encode('utf-8'), r.data) - - def test_resend_confirmation(self): - e = 'dude@lp.com' - self.register(e) - r = self._post('/confirm', data={'email': e}) - - msg = self.get_message('CONFIRMATION_REQUEST', email=e).encode('utf-8') - self.assertIn(msg, r.data) - - def test_user_deleted_before_confirmation(self): - e = 'dude@lp.com' - - with capture_registrations() as registrations: - self.register(e) - user = registrations[0]['user'] - token = registrations[0]['confirm_token'] - - with self.app.app_context(): - from flask_security.core import _security - _security.datastore.delete(user) - _security.datastore.commit() - - r = self.client.get('/confirm/' + token, follow_redirects=True) - msg = self.app.config['SECURITY_MSG_INVALID_CONFIRMATION_TOKEN'][0] - self.assertIn(msg.encode('utf-8'), r.data) - - -class ExpiredConfirmationTest(SecurityTest): - AUTH_CONFIG = { - 'SECURITY_CONFIRMABLE': True, - 'SECURITY_REGISTERABLE': True, - 'SECURITY_CONFIRM_EMAIL_WITHIN': '1 milliseconds', - 'USER_COUNT': 1 - } - - def test_expired_confirmation_token_sends_email(self): - e = 'dude@lp.com' - - with capture_registrations() as registrations: - self.register(e) - token = registrations[0]['confirm_token'] - - time.sleep(1.25) - - with self.app.extensions['mail'].record_messages() as outbox: - r = self.client.get('/confirm/' + token, follow_redirects=True) - - self.assertEqual(len(outbox), 1) - self.assertNotIn(token, outbox[0].html) - - expire_text = self.AUTH_CONFIG['SECURITY_CONFIRM_EMAIL_WITHIN'] - msg = self.app.config['SECURITY_MSG_CONFIRMATION_EXPIRED'][0] - msg = msg % dict(within=expire_text, email=e) - self.assertIn(msg.encode('utf-8'), r.data) - - -class LoginWithoutImmediateConfirmTests(SecurityTest): - AUTH_CONFIG = { - 'SECURITY_CONFIRMABLE': True, - 'SECURITY_REGISTERABLE': True, - 'SECURITY_LOGIN_WITHOUT_CONFIRMATION': True, - 'USER_COUNT': 1 - } - - def test_register_valid_user_automatically_signs_in(self): - e = 'dude@lp.com' - p = 'password' - data = dict(email=e, password=p, password_confirm=p) - r = self._post('/register', data=data, follow_redirects=True) - self.assertIn(e.encode('utf-8'), r.data) - - def test_confirm_email_of_user_different_than_current_user(self): - e1 = 'dude@lp.com' - e2 = 'lady@lp.com' - - with capture_registrations() as registrations: - self.register(e1) - self.logout() - self.register(e2) - token1 = registrations[0]['confirm_token'] - token2 = registrations[1]['confirm_token'] - - self.client.get('/confirm/' + token1, follow_redirects=True) - self.client.get('/logout') - self.authenticate(email=e1) - r = self.client.get('/confirm/' + token2, follow_redirects=True) - m = self.app.config['SECURITY_MSG_EMAIL_CONFIRMED'][0] - self.assertIn(m.encode('utf-8'), r.data) - self.assertIn(b'Hello lady@lp.com', r.data) - - def test_login_unconfirmed_user_when_login_without_confirmation_is_true(self): - e = 'dude@lp.com' - p = 'password' - data = dict(email=e, password=p, password_confirm=p) - r = self._post('/register', data=data, follow_redirects=True) - self.assertIn(e.encode('utf-8'), r.data) - self.client.get('/logout') - r = self.authenticate(email=e) - self.assertIn(e.encode('utf-8'), r.data) - - -class RecoverableTests(SecurityTest): - - AUTH_CONFIG = { - 'SECURITY_RECOVERABLE': True, - 'SECURITY_RESET_PASSWORD_ERROR_VIEW': '/', - 'SECURITY_POST_FORGOT_VIEW': '/' - } - - def test_reset_view(self): - with capture_reset_password_requests() as requests: - r = self._post('/reset', data=dict(email='joe@lp.com'), - follow_redirects=True) - t = requests[0]['token'] - r = self._get('/reset/' + t) - self.assertIn(b'

Reset password

', r.data) - - def test_forgot_post_sends_email(self): - with capture_reset_password_requests(): - with self.app.extensions['mail'].record_messages() as outbox: - self._post('/reset', data=dict(email='joe@lp.com')) - self.assertEqual(len(outbox), 1) - - def test_forgot_password_json(self): - r = self._post('/reset', data='{"email": "matt@lp.com"}', - content_type="application/json") - self.assertEquals(r.status_code, 200) - - def test_forgot_password_invalid_email(self): - r = self._post('/reset', data=dict(email='larry@lp.com'), - follow_redirects=True) - self.assertIn(b"Specified user does not exist", r.data) - - def test_reset_password_with_valid_token(self): - with capture_reset_password_requests() as requests: - r = self._post('/reset', data=dict(email='joe@lp.com'), - follow_redirects=True) - t = requests[0]['token'] - - r = self._post('/reset/' + t, data={ - 'password': 'newpassword', - 'password_confirm': 'newpassword' - }, follow_redirects=True) - - r = self.logout() - r = self.authenticate('joe@lp.com', 'newpassword') - self.assertIn(b'Hello joe@lp.com', r.data) - - def test_reset_password_with_invalid_token(self): - r = self._post('/reset/bogus', data={ - 'password': 'newpassword', - 'password_confirm': 'newpassword' - }, follow_redirects=True) - m = self.get_message('INVALID_RESET_PASSWORD_TOKEN') - self.assertIn(m.encode('utf-8'), r.data) - - def test_reset_password_with_mangled_token(self): - t = "WyIxNjQ2MzYiLCIxMzQ1YzBlZmVhM2VhZjYwODgwMDhhZGU2YzU0MzZjMiJd.BZEw_Q.lQyo3npdPZtcJ_sNHVHP103syjM&url_id=fbb89a8328e58c181ea7d064c2987874bc54a23d" - r = self._post('/reset/' + t, data={ - 'password': 'newpassword', - 'password_confirm': 'newpassword' - }, follow_redirects=True) - - m = self.get_message('INVALID_RESET_PASSWORD_TOKEN') - self.assertIn(m.encode('utf-8'), r.data) - - -class ExpiredResetPasswordTest(SecurityTest): - - AUTH_CONFIG = { - 'SECURITY_RECOVERABLE': True, - 'SECURITY_RESET_PASSWORD_WITHIN': '1 milliseconds' - } - - def test_reset_password_with_expired_token(self): - with capture_reset_password_requests() as requests: - r = self._post('/reset', data=dict(email='joe@lp.com'), - follow_redirects=True) - t = requests[0]['token'] - - time.sleep(1) - - r = self._post('/reset/' + t, data={ - 'password': 'newpassword', - 'password_confirm': 'newpassword' - }, follow_redirects=True) - - self.assertIn(b'You did not reset your password within', r.data) - - -class ChangePasswordTest(SecurityTest): - - AUTH_CONFIG = { - 'SECURITY_RECOVERABLE': True, - 'SECURITY_CHANGEABLE': True, - } - - def test_change_password(self): - self.authenticate() - r = self.client.get('/change', follow_redirects=True) - self.assertIn(b'Change password', r.data) - - def test_change_password_invalid(self): - self.authenticate() - r = self._post('/change', data={ - 'password': 'notpassword', - 'new_password': 'newpassword', - 'new_password_confirm': 'newpassword' - }, follow_redirects=True) - self.assertNotIn(b'You successfully changed your password', r.data) - self.assertIn(b'Invalid password', r.data) - - def test_change_password_mismatch(self): - self.authenticate() - r = self._post('/change', data={ - 'password': 'password', - 'new_password': 'newpassword', - 'new_password_confirm': 'notnewpassword' - }, follow_redirects=True) - self.assertNotIn(b'You successfully changed your password', r.data) - self.assertIn(b'Passwords do not match', r.data) - - def test_change_password_bad_password(self): - self.authenticate() - r = self._post('/change', data={ - 'password': 'password', - 'new_password': 'a', - 'new_password_confirm': 'a' - }, follow_redirects=True) - self.assertNotIn(b'You successfully changed your password', r.data) - self.assertIn(b'Password must be at least 6 characters', r.data) - - def test_change_password_same_as_previous(self): - self.authenticate() - r = self._post('/change', data={ - 'password': 'password', - 'new_password': 'password', - 'new_password_confirm': 'password' - }, follow_redirects=True) - self.assertNotIn(b'You successfully changed your password', r.data) - self.assertIn(b'Your new password must be different than your previous password.', r.data) - - def test_change_password_success(self): - data = { - 'password': 'password', - 'new_password': 'newpassword', - 'new_password_confirm': 'newpassword' - } - - self.authenticate() - with self.app.extensions['mail'].record_messages() as outbox: - r = self._post('/change', data=data, follow_redirects=True) - - self.assertIn(b'You successfully changed your password', r.data) - self.assertIn(b'Home Page', r.data) - - self.assertEqual(len(outbox), 1) - self.assertIn("Your password has been changed", outbox[0].html) - self.assertIn("/reset", outbox[0].html) - - -class EmailConfigTest(SecurityTest): - - AUTH_CONFIG = { - 'SECURITY_SEND_REGISTER_EMAIL': False, - 'SECURITY_SEND_PASSWORD_CHANGE_EMAIL': False, - } - - def test_change_password_success_email_option(self): - """Test the change password email can be turned off w/ configuration.""" - - data = { - 'password': 'password', - 'new_password': 'newpassword', - 'new_password_confirm': 'newpassword' - } - - self.authenticate() - with self.app.extensions['mail'].record_messages() as outbox: - self._post('/change', data=data, follow_redirects=True) - self.assertEqual(len(outbox), 0) - - -class ChangePasswordPostViewTest(SecurityTest): - - AUTH_CONFIG = { - 'SECURITY_CHANGEABLE': True, - 'SECURITY_POST_CHANGE_VIEW': '/profile', - } - - def test_change_password_success(self): - data = { - 'password': 'password', - 'new_password': 'newpassword', - 'new_password_confirm': 'newpassword' - } - self.authenticate() - r = self._post('/change', data=data, follow_redirects=True) - - self.assertIn(b'Profile Page', r.data) - - -class ChangePasswordDisabledTest(SecurityTest): - - AUTH_CONFIG = { - 'SECURITY_CHANGEABLE': False, - } - - def test_change_password_endpoint_is_404(self): - self.authenticate() - r = self.client.get('/change', follow_redirects=True) - self.assertEqual(404, r.status_code) - - -class TrackableTests(SecurityTest): - - AUTH_CONFIG = { - 'SECURITY_TRACKABLE': True, - 'USER_COUNT': 1 - } - - def test_did_track(self): - e = 'matt@lp.com' - self.authenticate(email=e) - self.logout() - self.authenticate(email=e) - - with self.app.test_request_context('/profile'): - user = self.app.security.datastore.find_user(email=e) - self.assertIsNotNone(user.last_login_at) - self.assertIsNotNone(user.current_login_at) - self.assertEquals('untrackable', user.last_login_ip) - self.assertEquals('untrackable', user.current_login_ip) - self.assertEquals(2, user.login_count) - - -class PasswordlessTests(SecurityTest): - - AUTH_CONFIG = { - 'SECURITY_PASSWORDLESS': True - } - - def test_login_request_for_inactive_user(self): - msg = self.app.config['SECURITY_MSG_DISABLED_ACCOUNT'][0] - r = self._post('/login', data=dict(email='tiya@lp.com'), - follow_redirects=True) - self.assertIn(msg.encode('utf-8'), r.data) - - def test_request_login_token_with_json_and_valid_email(self): - data = '{"email": "matt@lp.com", "password": "password"}' - r = self._post('/login', data=data, content_type='application/json') - self.assertEquals(r.status_code, 200) - self.assertNotIn(b'error', r.data) - - def test_request_login_token_with_json_and_invalid_email(self): - data = '{"email": "nobody@lp.com", "password": "password"}' - r = self._post('/login', data=data, content_type='application/json') - self.assertIn(b'errors', r.data) - - def test_request_login_token_sends_email_and_can_login(self): - e = 'matt@lp.com' - r, user, token = None, None, None - - with capture_passwordless_login_requests() as requests: - with self.app.extensions['mail'].record_messages() as outbox: - r = self._post('/login', data=dict(email=e), - follow_redirects=True) - - self.assertEqual(len(outbox), 1) - - self.assertEquals(1, len(requests)) - self.assertIn('user', requests[0]) - self.assertIn('login_token', requests[0]) - - user = requests[0]['user'] - token = requests[0]['login_token'] - - msg = self.app.config['SECURITY_MSG_LOGIN_EMAIL_SENT'][0] - msg = msg % dict(email=user.email) - self.assertIn(msg.encode('utf-8'), r.data) - - r = self.client.get('/login/' + token, follow_redirects=True) - msg = self.get_message('PASSWORDLESS_LOGIN_SUCCESSFUL').encode('utf-8') - self.assertIn(msg, r.data) - - r = self.client.get('/profile') - self.assertIn(b'Profile Page', r.data) - - def test_invalid_login_token(self): - m = self.app.config['SECURITY_MSG_INVALID_LOGIN_TOKEN'][0] - r = self._get('/login/bogus', follow_redirects=True) - self.assertIn(m.encode('utf-8'), r.data) - - def test_token_login_when_already_authenticated(self): - with capture_passwordless_login_requests() as requests: - self._post('/login', data=dict(email='matt@lp.com'), - follow_redirects=True) - token = requests[0]['login_token'] - - r = self.client.get('/login/' + token, follow_redirects=True) - msg = self.get_message('PASSWORDLESS_LOGIN_SUCCESSFUL') - self.assertIn(msg.encode('utf-8'), r.data) - - r = self.client.get('/login/' + token, follow_redirects=True) - msg = self.get_message('PASSWORDLESS_LOGIN_SUCCESSFUL') - self.assertNotIn(msg.encode('utf-8'), r.data) - - def test_send_login_with_invalid_email(self): - r = self._post('/login', data=dict(email='bogus@bogus.com')) - self.assertIn(b'Specified user does not exist', r.data) - - -class ExpiredLoginTokenTests(SecurityTest): - - AUTH_CONFIG = { - 'SECURITY_PASSWORDLESS': True, - 'SECURITY_LOGIN_WITHIN': '1 milliseconds', - 'USER_COUNT': 1 - } - - def test_expired_login_token_sends_email(self): - e = 'matt@lp.com' - - with capture_passwordless_login_requests() as requests: - self._post('/login', data=dict(email=e), follow_redirects=True) - token = requests[0]['login_token'] - - time.sleep(1.25) - - with self.app.extensions['mail'].record_messages() as outbox: - r = self.client.get('/login/' + token, follow_redirects=True) - - expire_text = self.AUTH_CONFIG['SECURITY_LOGIN_WITHIN'] - msg = self.app.config['SECURITY_MSG_LOGIN_EXPIRED'][0] - msg = msg % dict(within=expire_text, email=e) - self.assertIn(msg.encode('utf-8'), r.data) - self.assertEqual(len(outbox), 1) - self.assertIn(e, outbox[0].html) - self.assertNotIn(token, outbox[0].html) - - -class AsyncMailTaskTests(SecurityTest): - - AUTH_CONFIG = { - 'SECURITY_RECOVERABLE': True, - 'USER_COUNT': 1 - } - - def setUp(self): - super(AsyncMailTaskTests, self).setUp() - self.mail_sent = False - - def test_send_email_task_is_called(self): - @self.app.security.send_mail_task - def send_email(msg): - self.mail_sent = True - - self._post('/reset', data=dict(email='matt@lp.com')) - self.assertTrue(self.mail_sent) - - -class NoBlueprintTests(SecurityTest): - - APP_KWARGS = { - 'register_blueprint': False, - } - - AUTH_CONFIG = { - 'USER_COUNT': 1 - } - - def test_login_endpoint_is_404(self): - r = self._get('/login') - self.assertEqual(404, r.status_code) - - def test_http_auth_without_blueprint(self): - auth = base64.b64encode(b"matt@lp.com:password").decode('utf-8') - r = self._get('/http', headers={'Authorization': 'basic %s' % auth}) - self.assertIn(b'HTTP Authentication', r.data) - - -class ExtendFormsTest(SecurityTest): - - class MyLoginForm(LoginForm): - email = TextField('My Login Email Address Field') - - class MyRegisterForm(RegisterForm): - email = TextField('My Register Email Address Field') - - APP_KWARGS = { - 'login_form': MyLoginForm, - 'register_form': MyRegisterForm, - } - - AUTH_CONFIG = { - 'SECURITY_CONFIRMABLE': False, - 'SECURITY_REGISTERABLE': True, - } - - def test_login_view(self): - r = self._get('/login', follow_redirects=True) - self.assertIn(b"My Login Email Address Field", r.data) - - def test_register(self): - r = self._get('/register', follow_redirects=True) - self.assertIn(b"My Register Email Address Field", r.data) - - -class RecoverableExtendFormsTest(SecurityTest): - - class MyForgotPasswordForm(ForgotPasswordForm): - email = TextField('My Forgot Password Email Address Field', - validators=[valid_user_email]) - - class MyResetPasswordForm(ResetPasswordForm): - submit = SubmitField("My Reset Password Submit Field") - - APP_KWARGS = { - 'forgot_password_form': MyForgotPasswordForm, - 'reset_password_form': MyResetPasswordForm, - } - - AUTH_CONFIG = { - 'SECURITY_RECOVERABLE': True, - } - - def test_forgot_password(self): - r = self._get('/reset', follow_redirects=True) - self.assertIn(b"My Forgot Password Email Address Field", r.data) - - def test_reset_password(self): - with capture_reset_password_requests() as requests: - self._post('/reset', data=dict(email='joe@lp.com'), - follow_redirects=True) - token = requests[0]['token'] - r = self._get('/reset/' + token) - self.assertIn(b"My Reset Password Submit Field", r.data) - - -class PasswordlessExtendFormsTest(SecurityTest): - - class MyPasswordlessLoginForm(PasswordlessLoginForm): - email = TextField('My Passwordless Login Email Address Field') - - APP_KWARGS = { - 'passwordless_login_form': MyPasswordlessLoginForm, - } - - AUTH_CONFIG = { - 'SECURITY_PASSWORDLESS': True, - } - - def test_passwordless_login(self): - r = self._get('/login', follow_redirects=True) - self.assertIn(b"My Passwordless Login Email Address Field", r.data) - - -class ConfirmableExtendFormsTest(SecurityTest): - - class MyConfirmRegisterForm(ConfirmRegisterForm): - email = TextField('My Confirm Register Email Address Field') - - class MySendConfirmationForm(SendConfirmationForm): - email = TextField('My Send Confirmation Email Address Field') - - APP_KWARGS = { - 'confirm_register_form': MyConfirmRegisterForm, - 'send_confirmation_form': MySendConfirmationForm, - } - - AUTH_CONFIG = { - 'SECURITY_CONFIRMABLE': True, - 'SECURITY_REGISTERABLE': True, - } - - def test_register(self): - r = self._get('/register', follow_redirects=True) - self.assertIn(b"My Confirm Register Email Address Field", r.data) - - def test_send_confirmation(self): - r = self._get('/confirm', follow_redirects=True) - self.assertIn(b"My Send Confirmation Email Address Field", r.data) - - -class AdditionalUserIdentityAttributes(SecurityTest): - - AUTH_CONFIG = { - 'SECURITY_USER_IDENTITY_ATTRIBUTES': ('email', 'username') - } - - def test_authenticate(self): - r = self.authenticate(email='matt') - self.assertIn(b'Hello matt@lp.com', r.data) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..9fb514a --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,307 @@ +# -*- coding: utf-8 -*- +""" + conftest + ~~~~~~~~ + + Test fixtures and what not +""" + +import os +import tempfile +import time + +import pytest + +from flask import Flask, render_template, current_app +from flask_mail import Mail + +from flask_security import Security, MongoEngineUserDatastore, SQLAlchemyUserDatastore, \ + PeeweeUserDatastore, UserMixin, RoleMixin, http_auth_required, login_required, \ + auth_token_required, auth_required, roles_required, roles_accepted + +from utils import populate_data, Response + + +@pytest.fixture() +def app(): + app = Flask(__name__) + app.response_class = Response + app.debug = True + app.config['SECRET_KEY'] = 'secret' + app.config['TESTING'] = True + app.config['LOGIN_DISABLED'] = False + app.config['WTF_CSRF_ENABLED'] = False + + mail = Mail(app) + app.mail = mail + + @app.route('/') + def index(): + return render_template('index.html', content='Home Page') + + @app.route('/profile') + @login_required + def profile(): + return render_template('index.html', content='Profile Page') + + @app.route('/post_login') + @login_required + def post_login(): + return render_template('index.html', content='Post Login') + + @app.route('/http') + @http_auth_required + def http(): + return 'HTTP Authentication' + + @app.route('/http_custom_realm') + @http_auth_required('My Realm') + def http_custom_realm(): + return render_template('index.html', content='HTTP Authentication') + + @app.route('/token') + @auth_token_required + def token(): + return render_template('index.html', content='Token Authentication') + + @app.route('/multi_auth') + @auth_required('session', 'token', 'basic') + def multi_auth(): + return render_template('index.html', content='Session, Token, Basic auth') + + @app.route('/post_logout') + def post_logout(): + return render_template('index.html', content='Post Logout') + + @app.route('/post_register') + def post_register(): + return render_template('index.html', content='Post Register') + + @app.route('/admin') + @roles_required('admin') + def admin(): + return render_template('index.html', content='Admin Page') + + @app.route('/admin_and_editor') + @roles_required('admin', 'editor') + def admin_and_editor(): + return render_template('index.html', content='Admin and Editor Page') + + @app.route('/admin_or_editor') + @roles_accepted('admin', 'editor') + def admin_or_editor(): + return render_template('index.html', content='Admin or Editor Page') + + @app.route('/unauthorized') + def unauthorized(): + return render_template('unauthorized.html') + + @app.route('/coverage/add_role_to_user') + def add_role_to_user(): + ds = current_app.security.datastore + u = ds.find_user(email='joe@lp.com') + r = ds.find_role('admin') + ds.add_role_to_user(u, r) + return 'success' + + @app.route('/coverage/remove_role_from_user') + def remove_role_from_user(): + ds = current_app.security.datastore + u = ds.find_user(email='matt@lp.com') + ds.remove_role_from_user(u, 'admin') + return 'success' + + @app.route('/coverage/deactivate_user') + def deactivate_user(): + ds = current_app.security.datastore + u = ds.find_user(email='matt@lp.com') + ds.deactivate_user(u) + return 'success' + + @app.route('/coverage/activate_user') + def activate_user(): + ds = current_app.security.datastore + u = ds.find_user(email='tiya@lp.com') + ds.activate_user(u) + return 'success' + + @app.route('/coverage/invalid_role') + def invalid_role(): + ds = current_app.security.datastore + return 'success' if ds.find_role('bogus') is None else 'failure' + + @app.route('/page1') + def page_1(): + return 'Page 1' + return app + + +@pytest.fixture() +def mongoengine_datastore(request, app): + from flask_mongoengine import MongoEngine + + db_name = 'flask_security_test_%s' % str(time.time()).replace('.', '_') + app.config['MONGODB_SETTINGS'] = { + 'db': db_name, + 'host': 'localhost', + 'port': 27017, + 'alias': db_name + } + + db = MongoEngine(app) + + class Role(db.Document, RoleMixin): + name = db.StringField(required=True, unique=True, max_length=80) + description = db.StringField(max_length=255) + meta = {"db_alias": db_name} + + class User(db.Document, UserMixin): + email = db.StringField(unique=True, max_length=255) + username = db.StringField(max_length=255) + password = db.StringField(required=True, max_length=255) + last_login_at = db.DateTimeField() + current_login_at = db.DateTimeField() + last_login_ip = db.StringField(max_length=100) + current_login_ip = db.StringField(max_length=100) + login_count = db.IntField() + active = db.BooleanField(default=True) + confirmed_at = db.DateTimeField() + roles = db.ListField(db.ReferenceField(Role), default=[]) + meta = {"db_alias": db_name} + + request.addfinalizer(lambda: db.connection.drop_database(db_name)) + + return MongoEngineUserDatastore(db, User, Role) + + +@pytest.fixture() +def sqlalchemy_datastore(request, app, tmpdir): + from flask_sqlalchemy import SQLAlchemy + + f, path = tempfile.mkstemp(prefix='flask-security-test-db', suffix='.db', dir=str(tmpdir)) + + app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + path + db = SQLAlchemy(app) + + roles_users = db.Table( + 'roles_users', + db.Column('user_id', db.Integer(), db.ForeignKey('user.id')), + db.Column('role_id', db.Integer(), db.ForeignKey('role.id'))) + + class Role(db.Model, RoleMixin): + id = db.Column(db.Integer(), primary_key=True) + name = db.Column(db.String(80), unique=True) + description = db.Column(db.String(255)) + + class User(db.Model, UserMixin): + id = db.Column(db.Integer, primary_key=True) + email = db.Column(db.String(255), unique=True) + username = db.Column(db.String(255)) + password = db.Column(db.String(255)) + last_login_at = db.Column(db.DateTime()) + current_login_at = db.Column(db.DateTime()) + last_login_ip = db.Column(db.String(100)) + current_login_ip = db.Column(db.String(100)) + login_count = db.Column(db.Integer) + active = db.Column(db.Boolean()) + confirmed_at = db.Column(db.DateTime()) + roles = db.relationship('Role', secondary=roles_users, + backref=db.backref('users', lazy='dynamic')) + + with app.app_context(): + db.create_all() + + request.addfinalizer(lambda: os.remove(path)) + + return SQLAlchemyUserDatastore(db, User, Role) + + +@pytest.fixture() +def peewee_datastore(request, app, tmpdir): + from peewee import TextField, DateTimeField, IntegerField, BooleanField, ForeignKeyField + from flask_peewee.db import Database + + f, path = tempfile.mkstemp(prefix='flask-security-test-db', suffix='.db', dir=str(tmpdir)) + + app.config['DATABASE'] = { + 'name': path, + 'engine': 'peewee.SqliteDatabase' + } + + db = Database(app) + + class Role(db.Model, RoleMixin): + name = TextField(unique=True) + description = TextField(null=True) + + class User(db.Model, UserMixin): + email = TextField() + username = TextField() + password = TextField() + last_login_at = DateTimeField(null=True) + current_login_at = DateTimeField(null=True) + last_login_ip = TextField(null=True) + current_login_ip = TextField(null=True) + login_count = IntegerField(null=True) + active = BooleanField(default=True) + confirmed_at = DateTimeField(null=True) + + class UserRoles(db.Model): + """ Peewee does not have built-in many-to-many support, so we have to + create this mapping class to link users to roles.""" + user = ForeignKeyField(User, related_name='roles') + role = ForeignKeyField(Role, related_name='users') + name = property(lambda self: self.role.name) + description = property(lambda self: self.role.description) + + with app.app_context(): + for Model in (Role, User, UserRoles): + Model.create_table() + + request.addfinalizer(lambda: os.remove(path)) + + return PeeweeUserDatastore(db, User, Role, UserRoles) + + +@pytest.fixture() +def sqlalchemy_app(app, sqlalchemy_datastore): + def create(): + app.security = Security(app, datastore=sqlalchemy_datastore) + return app + return create + + +@pytest.fixture() +def peewee_app(app, peewee_datastore): + def create(): + app.security = Security(app, datastore=peewee_datastore) + return app + return create + + +@pytest.fixture() +def mongoengine_app(app, mongoengine_datastore): + def create(): + app.security = Security(app, datastore=mongoengine_datastore) + return app + return create + + +@pytest.fixture(params=['sqlalchemy', 'mongoengine', 'peewee']) +def client(request, sqlalchemy_app, mongoengine_app, peewee_app): + if request.param == 'sqlalchemy': + app = sqlalchemy_app() + elif request.param == 'mongoengine': + app = mongoengine_app() + elif request.param == 'peewee': + app = peewee_app() + populate_data(app) + return app.test_client() + + +@pytest.fixture() +def get_message(app): + def fn(key, **kwargs): + rv = app.config['SECURITY_MSG_' + key][0] % kwargs + return rv.encode('utf-8') + return fn diff --git a/tests/functional_tests.py b/tests/functional_tests.py deleted file mode 100644 index d366c0e..0000000 --- a/tests/functional_tests.py +++ /dev/null @@ -1,277 +0,0 @@ -# -*- coding: utf-8 -*- - -from __future__ import with_statement - -import base64 -import simplejson as json - -try: - from cookielib import Cookie -except ImportError: - from http.cookiejar import Cookie - -from werkzeug.utils import parse_cookie - -from tests import SecurityTest - - -def get_cookies(rv): - cookies = {} - for value in rv.headers.get_all("Set-Cookie"): - cookies.update(parse_cookie(value)) - return cookies - - -class DefaultSecurityTests(SecurityTest): - - def test_instance(self): - self.assertIsNotNone(self.app) - self.assertIsNotNone(self.app.security) - self.assertIsNotNone(self.app.security.pwd_context) - - def test_login_view(self): - r = self._get('/login') - self.assertIn(b'

Login

', r.data) - - def test_authenticate(self): - r = self.authenticate() - self.assertIn(b'Hello matt@lp.com', r.data) - - def test_authenticate_case_insensitive_email(self): - r = self.authenticate(email='MATT@lp.com') - self.assertIn(b'Hello matt@lp.com', r.data) - - def test_unprovided_username(self): - r = self.authenticate("") - self.assertIn(self.get_message('EMAIL_NOT_PROVIDED').encode('utf-8'), r.data) - - def test_unprovided_password(self): - r = self.authenticate(password="") - self.assertIn(self.get_message('PASSWORD_NOT_PROVIDED').encode('utf-8'), r.data) - - def test_invalid_user(self): - r = self.authenticate(email="bogus@bogus.com") - self.assertIn(self.get_message('USER_DOES_NOT_EXIST').encode('utf-8'), r.data) - - def test_bad_password(self): - r = self.authenticate(password="bogus") - self.assertIn(self.get_message('INVALID_PASSWORD').encode('utf-8'), r.data) - - def test_inactive_user(self): - r = self.authenticate("tiya@lp.com", "password") - self.assertIn(self.get_message('DISABLED_ACCOUNT').encode('utf-8'), r.data) - - def test_logout(self): - self.authenticate() - r = self.logout() - self.assertIsHomePage(r.data) - - def test_unauthorized_access(self): - self.logout() - r = self._get('/profile', follow_redirects=True) - self.assertIn(b'
  • Please log in to access this page.
  • ', r.data) - - def test_authorized_access(self): - self.authenticate() - r = self._get("/profile") - self.assertIn(b'profile', r.data) - - def test_valid_admin_role(self): - self.authenticate() - r = self._get("/admin") - self.assertIn(b'Admin Page', r.data) - - def test_invalid_admin_role(self): - self.authenticate("joe@lp.com") - r = self._get("/admin", follow_redirects=True) - self.assertIsHomePage(r.data) - - def test_roles_accepted(self): - for user in ("matt@lp.com", "joe@lp.com"): - self.authenticate(user) - r = self._get("/admin_or_editor") - self.assertIn(b'Admin or Editor Page', r.data) - self.logout() - - self.authenticate("jill@lp.com") - r = self._get("/admin_or_editor", follow_redirects=True) - self.assertIsHomePage(r.data) - - def test_unauthenticated_role_required(self): - r = self._get('/admin', follow_redirects=True) - self.assertIn(self.get_message('UNAUTHORIZED').encode('utf-8'), r.data) - - def test_multiple_role_required(self): - for user in ("matt@lp.com", "joe@lp.com"): - self.authenticate(user) - r = self._get("/admin_and_editor", follow_redirects=True) - self.assertIsHomePage(r.data) - self._get('/logout') - - self.authenticate('dave@lp.com') - r = self._get("/admin_and_editor", follow_redirects=True) - self.assertIn(b'Admin and Editor Page', r.data) - - def test_ok_json_auth(self): - r = self.json_authenticate() - data = json.loads(r.data) - self.assertEquals(data['meta']['code'], 200) - self.assertIn('authentication_token', data['response']['user']) - - def test_invalid_json_auth(self): - r = self.json_authenticate(password='junk') - self.assertIn(b'"code": 400', r.data) - - def test_token_auth_via_querystring_valid_token(self): - r = self.json_authenticate() - data = json.loads(r.data) - token = data['response']['user']['authentication_token'] - r = self._get('/token?auth_token=' + token) - self.assertIn(b'Token Authentication', r.data) - - def test_token_auth_via_header_valid_token(self): - r = self.json_authenticate() - data = json.loads(r.data) - token = data['response']['user']['authentication_token'] - headers = {"Authentication-Token": token} - r = self._get('/token', headers=headers) - self.assertIn(b'Token Authentication', r.data) - - def test_token_auth_via_querystring_invalid_token(self): - r = self._get('/token?auth_token=X') - self.assertEqual(401, r.status_code) - - def test_token_auth_via_header_invalid_token(self): - r = self._get('/token', headers={"Authentication-Token": 'X'}) - self.assertEqual(401, r.status_code) - - def test_http_auth(self): - r = self._get('/http', headers={ - 'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:password").decode('utf-8') - }) - self.assertIn(b'HTTP Authentication', r.data) - - def test_http_auth_no_authorization(self): - r = self._get('/http', headers={}) - self.assertIn(b'

    Unauthorized

    ', r.data) - self.assertIn('WWW-Authenticate', r.headers) - self.assertEquals('Basic realm="Login Required"', - r.headers['WWW-Authenticate']) - - def test_invalid_http_auth_invalid_username(self): - r = self._get('/http', headers={ - 'Authorization': 'Basic %s' % base64.b64encode(b"bogus:bogus").decode('utf-8') - }) - self.assertIn(b'

    Unauthorized

    ', r.data) - self.assertIn('WWW-Authenticate', r.headers) - self.assertEquals('Basic realm="Login Required"', - r.headers['WWW-Authenticate']) - - def test_invalid_http_auth_bad_password(self): - r = self._get('/http', headers={ - 'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:bogus").decode('utf-8') - }) - self.assertIn(b'

    Unauthorized

    ', r.data) - self.assertIn('WWW-Authenticate', r.headers) - self.assertEquals('Basic realm="Login Required"', - r.headers['WWW-Authenticate']) - - def test_custom_http_auth_realm(self): - r = self._get('/http_custom_realm', headers={ - 'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:bogus").decode('utf-8') - }) - self.assertIn(b'

    Unauthorized

    ', r.data) - self.assertIn('WWW-Authenticate', r.headers) - self.assertEquals('Basic realm="My Realm"', - r.headers['WWW-Authenticate']) - - def test_multi_auth_basic(self): - r = self._get('/multi_auth', headers={ - 'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:password").decode('utf-8') - }) - self.assertIn(b'Basic', r.data) - - def test_multi_auth_token(self): - r = self.json_authenticate() - data = json.loads(r.data) - token = data['response']['user']['authentication_token'] - r = self._get('/multi_auth?auth_token=' + token) - self.assertIn(b'Token', r.data) - - def test_multi_auth_session(self): - self.authenticate() - r = self._get('/multi_auth') - self.assertIn(b'Session', r.data) - - def test_user_deleted_during_session_reverts_to_anonymous_user(self): - self.authenticate() - - with self.app.test_request_context('/'): - user = self.app.security.datastore.find_user(email='matt@lp.com') - self.app.security.datastore.delete_user(user) - self.app.security.datastore.commit() - - r = self._get('/') - self.assertNotIn(b'Hello matt@lp.com', r.data) - - def test_remember_token(self): - r = self.authenticate(follow_redirects=False) - self.client.cookie_jar.clear_session_cookies() - r = self._get('/profile') - self.assertIn(b'profile', r.data) - - def test_token_loader_does_not_fail_with_invalid_token(self): - c = Cookie(version=0, name='remember_token', value='None', port=None, - port_specified=False, domain='www.example.com', - domain_specified=False, domain_initial_dot=False, path='/', - path_specified=True, secure=False, expires=None, - discard=True, comment=None, comment_url=None, - rest={'HttpOnly': None}, rfc2109=False) - - self.client.cookie_jar.set_cookie(c) - r = self._get('/') - self.assertNotIn(b'BadSignature', r.data) - - -class MongoEngineSecurityTests(DefaultSecurityTests): - - def _create_app(self, auth_config, **kwargs): - from tests.test_app.mongoengine import create_app - return create_app(auth_config, **kwargs) - - -class PeeweeSecurityTests(DefaultSecurityTests): - - def _create_app(self, auth_config, **kwargs): - from tests.test_app.peewee_app import create_app - return create_app(auth_config, **kwargs) - - -class DefaultDatastoreTests(SecurityTest): - - def test_add_role_to_user(self): - r = self._get('/coverage/add_role_to_user') - self.assertIn(b'success', r.data) - - def test_remove_role_from_user(self): - r = self._get('/coverage/remove_role_from_user') - self.assertIn(b'success', r.data) - - def test_activate_user(self): - r = self._get('/coverage/activate_user') - self.assertIn(b'success', r.data) - - def test_deactivate_user(self): - r = self._get('/coverage/deactivate_user') - self.assertIn(b'success', r.data) - - def test_invalid_role(self): - r = self._get('/coverage/invalid_role') - self.assertIn(b'success', r.data) - - -class MongoEngineDatastoreTests(DefaultDatastoreTests): - - def _create_app(self, auth_config, **kwargs): - from tests.test_app.mongoengine import create_app - return create_app(auth_config, **kwargs) diff --git a/tests/signals_tests.py b/tests/signals_tests.py deleted file mode 100644 index 3cdb32e..0000000 --- a/tests/signals_tests.py +++ /dev/null @@ -1,244 +0,0 @@ -# -*- coding: utf-8 -*- - -from __future__ import with_statement - -from flask_security.utils import capture_registrations, \ - capture_reset_password_requests, capture_signals -from flask_security.signals import user_registered, user_confirmed, \ - confirm_instructions_sent, login_instructions_sent, \ - password_reset, password_changed, reset_password_instructions_sent -from tests import SecurityTest - - -def compare_user(a, b): - """Helper to compare two users.""" - return a.id == b.id and a.email == b.email and a.password == b.password - - -class SignalTest(SecurityTest): - - def _create_app(self, auth_config, **kwargs): - from tests.test_app.mongoengine import create_app - return create_app(auth_config, **kwargs) - - -class RegisterableSignalsTests(SignalTest): - - AUTH_CONFIG = { - 'SECURITY_CONFIRMABLE': True, - 'SECURITY_REGISTERABLE': True, - } - - def test_register(self): - e = 'dude@lp.com' - with capture_signals() as mocks: - self.register(e) - user = self.app.security.datastore.find_user(email='dude@lp.com') - self.assertEqual(mocks.signals_sent(), set([user_registered])) - calls = mocks[user_registered] - self.assertEqual(len(calls), 1) - args, kwargs = calls[0] - self.assertTrue(compare_user(kwargs['user'], user)) - self.assertIn('confirm_token', kwargs) - self.assertEqual(args[0], self.app) - - def test_register_without_password(self): - e = 'dude@lp.com' - with capture_signals() as mocks: - self.register(e, password='') - self.assertEqual(mocks.signals_sent(), set()) - - -class ConfirmableSignalsTests(SignalTest): - - AUTH_CONFIG = { - 'SECURITY_CONFIRMABLE': True, - 'SECURITY_REGISTERABLE': True, - } - - def test_confirm(self): - e = 'dude@lp.com' - with capture_registrations() as registrations: - self.register(e) - token = registrations[0]['confirm_token'] - with capture_signals() as mocks: - self.client.get('/confirm/' + token, follow_redirects=True) - user = self.app.security.datastore.find_user(email='dude@lp.com') - self.assertTrue(mocks.signals_sent(), set([user_confirmed])) - calls = mocks[user_confirmed] - self.assertEqual(len(calls), 1) - args, kwargs = calls[0] - self.assertEqual(args[0], self.app) - self.assertTrue(compare_user(kwargs['user'], user)) - - def test_confirm_bad_token(self): - e = 'dude@lp.com' - with capture_registrations(): - self.register(e) - with capture_signals() as mocks: - self.client.get('/confirm/bogus', follow_redirects=True) - self.assertEqual(mocks.signals_sent(), set()) - - def test_confirm_twice(self): - e = 'dude@lp.com' - with capture_registrations() as registrations: - self.register(e) - token = registrations[0]['confirm_token'] - self.client.get('/confirm/' + token, follow_redirects=True) - self.logout() - with capture_signals() as mocks: - self.client.get('/confirm/' + token, follow_redirects=True) - self.assertEqual(mocks.signals_sent(), set([user_confirmed])) - # TODO: is that the desired behaviour? - - def test_resend_confirmation(self): - e = 'dude@lp.com' - self.register(e) - with capture_signals() as mocks: - self._post('/confirm', data={'email': e}) - user = self.app.security.datastore.find_user(email='dude@lp.com') - self.assertEqual(mocks.signals_sent(), set([confirm_instructions_sent])) - calls = mocks[confirm_instructions_sent] - self.assertEqual(len(calls), 1) - args, kwargs = calls[0] - self.assertTrue(compare_user(kwargs['user'], user)) - self.assertEqual(args[0], self.app) - - def test_send_confirmation_bad_email(self): - with capture_signals() as mocks: - self._post('/confirm', data=dict(email='bogus@bogus.com')) - self.assertEqual(mocks.signals_sent(), set()) - - -class RecoverableSignalsTests(SignalTest): - - AUTH_CONFIG = { - 'SECURITY_RECOVERABLE': True, - 'SECURITY_RESET_PASSWORD_ERROR_VIEW': '/', - 'SECURITY_POST_FORGOT_VIEW': '/' - } - - def test_reset_password_request(self): - with capture_signals() as mocks: - self._post('/reset', data=dict(email='joe@lp.com'), - follow_redirects=True) - self.assertEqual(mocks.signals_sent(), set([reset_password_instructions_sent])) - user = self.app.security.datastore.find_user(email='joe@lp.com') - calls = mocks[reset_password_instructions_sent] - self.assertEqual(len(calls), 1) - args, kwargs = calls[0] - self.assertTrue(compare_user(kwargs['user'], user)) - self.assertIn('token', kwargs) - self.assertEqual(args[0], self.app) - - def test_reset_password(self): - with capture_reset_password_requests() as requests: - self._post('/reset', data=dict(email='joe@lp.com'), - follow_redirects=True) - token = requests[0]['token'] - with capture_signals() as mocks: - data = dict(password='newpassword', password_confirm='newpassword') - self._post('/reset/' + token, data, follow_redirects=True) - self.assertEqual(mocks.signals_sent(), set([password_reset])) - user = self.app.security.datastore.find_user(email='joe@lp.com') - calls = mocks[password_reset] - self.assertEqual(len(calls), 1) - args, kwargs = calls[0] - self.assertTrue(compare_user(kwargs['user'], user)) - self.assertEqual(args[0], self.app) - - def test_reset_password_invalid_emails(self): - with capture_signals() as mocks: - self._post('/reset', data=dict(email='nobody@lp.com'), - follow_redirects=True) - self.assertEqual(mocks.signals_sent(), set()) - - def test_reset_password_invalid_token(self): - with capture_signals() as mocks: - data = dict(password='newpassword', password_confirm='newpassword') - self._post('/reset/bogus', data, follow_redirects=True) - self.assertEqual(mocks.signals_sent(), set()) - - -class ChangeableSignalsTests(SignalTest): - - AUTH_CONFIG = { - 'SECURITY_CHANGEABLE': True, - } - - def test_change_password(self): - self.authenticate('joe@lp.com') - with capture_signals() as mocks: - with self.client as client: - client.post('/change', - data=dict(password='password', - new_password='newpassword', - new_password_confirm='newpassword')) - self.assertEqual(mocks.signals_sent(), set([password_changed])) - user = self.app.security.datastore.find_user(email='joe@lp.com') - calls = mocks[password_changed] - self.assertEqual(len(calls), 1) - args, kwargs = calls[0] - self.assertTrue(compare_user(kwargs['user'], user)) - self.assertEqual(args[0], self.app) - - def test_change_password_invalid_password(self): - with capture_signals() as mocks: - self.client.post('/change', - data=dict(password='notpassword', - new_password='newpassword', - new_password_confirm='newpassword'), - follow_redirects=True) - self.assertEqual(mocks.signals_sent(), set()) - - def test_change_password_bad_password(self): - with capture_signals() as mocks: - self.client.post('/change', - data=dict(password='notpassword', - new_password='a', - new_password_confirm='a'), - follow_redirects=True) - self.assertEqual(mocks.signals_sent(), set()) - - def test_change_password_mismatch_password(self): - with capture_signals() as mocks: - self.client.post('/change', - data=dict(password='password', - new_password='newpassword', - new_password_confirm='notnewpassword'), - follow_redirects=True) - self.assertEqual(mocks.signals_sent(), set()) - - -class PasswordlessTests(SignalTest): - - AUTH_CONFIG = { - 'SECURITY_PASSWORDLESS': True - } - - def test_login_request_for_inactive_user(self): - with capture_signals() as mocks: - self._post('/login', data=dict(email='tiya@lp.com'), - follow_redirects=True) - self.assertEqual(mocks.signals_sent(), set()) - - def test_login_request_for_invalid_email(self): - with capture_signals() as mocks: - self._post('/login', data=dict(email='nobody@lp.com'), - follow_redirects=True) - self.assertEqual(mocks.signals_sent(), set()) - - def test_request_login_token_sends_email_and_can_login(self): - e = 'matt@lp.com' - - with capture_signals() as mocks: - self._post('/login', data=dict(email=e), follow_redirects=True) - - self.assertEqual(mocks.signals_sent(), set([login_instructions_sent])) - user = self.app.security.datastore.find_user(email='matt@lp.com') - calls = mocks[login_instructions_sent] - self.assertEqual(len(calls), 1) - args, kwargs = calls[0] - self.assertTrue(compare_user(kwargs['user'], user)) - self.assertIn('login_token', kwargs) - self.assertEqual(args[0], self.app) diff --git a/tests/test_app/templates/_messages.html b/tests/templates/_messages.html similarity index 100% rename from tests/test_app/templates/_messages.html rename to tests/templates/_messages.html diff --git a/tests/test_app/templates/_nav.html b/tests/templates/_nav.html similarity index 100% rename from tests/test_app/templates/_nav.html rename to tests/templates/_nav.html diff --git a/tests/templates/custom_security/change_password.html b/tests/templates/custom_security/change_password.html new file mode 100644 index 0000000..f9f7d23 --- /dev/null +++ b/tests/templates/custom_security/change_password.html @@ -0,0 +1 @@ +CUSTOM CHANGE PASSWORD diff --git a/tests/test_app/templates/custom_security/forgot_password.html b/tests/templates/custom_security/forgot_password.html similarity index 100% rename from tests/test_app/templates/custom_security/forgot_password.html rename to tests/templates/custom_security/forgot_password.html diff --git a/tests/test_app/templates/custom_security/login_user.html b/tests/templates/custom_security/login_user.html similarity index 100% rename from tests/test_app/templates/custom_security/login_user.html rename to tests/templates/custom_security/login_user.html diff --git a/tests/test_app/templates/custom_security/register_user.html b/tests/templates/custom_security/register_user.html similarity index 100% rename from tests/test_app/templates/custom_security/register_user.html rename to tests/templates/custom_security/register_user.html diff --git a/tests/test_app/templates/custom_security/reset_password.html b/tests/templates/custom_security/reset_password.html similarity index 100% rename from tests/test_app/templates/custom_security/reset_password.html rename to tests/templates/custom_security/reset_password.html diff --git a/tests/test_app/templates/custom_security/send_confirmation.html b/tests/templates/custom_security/send_confirmation.html similarity index 100% rename from tests/test_app/templates/custom_security/send_confirmation.html rename to tests/templates/custom_security/send_confirmation.html diff --git a/tests/test_app/templates/custom_security/send_login.html b/tests/templates/custom_security/send_login.html similarity index 100% rename from tests/test_app/templates/custom_security/send_login.html rename to tests/templates/custom_security/send_login.html diff --git a/tests/test_app/templates/index.html b/tests/templates/index.html similarity index 100% rename from tests/test_app/templates/index.html rename to tests/templates/index.html diff --git a/tests/test_app/templates/register.html b/tests/templates/register.html similarity index 100% rename from tests/test_app/templates/register.html rename to tests/templates/register.html diff --git a/tests/test_app/templates/unauthorized.html b/tests/templates/unauthorized.html similarity index 100% rename from tests/test_app/templates/unauthorized.html rename to tests/templates/unauthorized.html diff --git a/tests/test_app/__init__.py b/tests/test_app/__init__.py deleted file mode 100644 index 32dfd70..0000000 --- a/tests/test_app/__init__.py +++ /dev/null @@ -1,185 +0,0 @@ -# -*- coding: utf-8 -*- - -from flask import Flask, render_template, current_app -from flask.ext.mail import Mail -from flask.ext.security import login_required, roles_required, roles_accepted -from flask.ext.security.decorators import http_auth_required, \ - auth_token_required, auth_required -from flask.ext.security.utils import encrypt_password -from werkzeug.local import LocalProxy - -ds = LocalProxy(lambda: current_app.extensions['security'].datastore) - - -def create_app(config): - app = Flask(__name__) - app.debug = True - app.config['SECRET_KEY'] = 'secret' - app.config['TESTING'] = True - app.config['LOGIN_DISABLED'] = False - - for key, value in config.items(): - app.config[key] = value - - mail = Mail(app) - app.extensions['mail'] = mail - - @app.route('/') - def index(): - return render_template('index.html', content='Home Page') - - @app.route('/profile') - @login_required - def profile(): - return render_template('index.html', content='Profile Page') - - @app.route('/post_login') - @login_required - def post_login(): - return render_template('index.html', content='Post Login') - - @app.route('/http') - @http_auth_required - def http(): - return 'HTTP Authentication' - - @app.route('/http_custom_realm') - @http_auth_required('My Realm') - def http_custom_realm(): - return render_template('index.html', content='HTTP Authentication') - - @app.route('/token') - @auth_token_required - def token(): - return render_template('index.html', content='Token Authentication') - - @app.route('/multi_auth') - @auth_required('session', 'token', 'basic') - def multi_auth(): - return render_template('index.html', content='Session, Token, Basic auth') - - @app.route('/post_logout') - def post_logout(): - return render_template('index.html', content='Post Logout') - - @app.route('/post_register') - def post_register(): - return render_template('index.html', content='Post Register') - - @app.route('/admin') - @roles_required('admin') - def admin(): - return render_template('index.html', content='Admin Page') - - @app.route('/admin_and_editor') - @roles_required('admin', 'editor') - def admin_and_editor(): - return render_template('index.html', content='Admin and Editor Page') - - @app.route('/admin_or_editor') - @roles_accepted('admin', 'editor') - def admin_or_editor(): - return render_template('index.html', content='Admin or Editor Page') - - @app.route('/unauthorized') - def unauthorized(): - return render_template('unauthorized.html') - - @app.route('/coverage/add_role_to_user') - def add_role_to_user(): - u = ds.find_user(email='joe@lp.com') - r = ds.find_role('admin') - ds.add_role_to_user(u, r) - return 'success' - - @app.route('/coverage/remove_role_from_user') - def remove_role_from_user(): - u = ds.find_user(email='matt@lp.com') - ds.remove_role_from_user(u, 'admin') - return 'success' - - @app.route('/coverage/deactivate_user') - def deactivate_user(): - u = ds.find_user(email='matt@lp.com') - ds.deactivate_user(u) - return 'success' - - @app.route('/coverage/activate_user') - def activate_user(): - u = ds.find_user(email='tiya@lp.com') - ds.activate_user(u) - return 'success' - - @app.route('/coverage/invalid_role') - def invalid_role(): - return 'success' if ds.find_role('bogus') is None else 'failure' - - @app.route('/page1') - def page_1(): - return 'Page 1' - - return app - - -def create_roles(): - for role in ('admin', 'editor', 'author'): - ds.create_role(name=role) - ds.commit() - - -def create_users(count=None): - users = [('matt@lp.com', 'matt', 'password', ['admin'], True), - ('joe@lp.com', 'joe', 'password', ['editor'], True), - ('dave@lp.com', 'dave', 'password', ['admin', 'editor'], True), - ('jill@lp.com', 'jill', 'password', ['author'], True), - ('tiya@lp.com', 'tiya', 'password', [], False)] - count = count or len(users) - - for u in users[:count]: - pw = encrypt_password(u[2]) - roles = [ds.find_or_create_role(rn) for rn in u[3]] - ds.commit() - user = ds.create_user(email=u[0], username=u[1], password=pw, active=u[4]) - ds.commit() - for role in roles: - ds.add_role_to_user(user, role) - ds.commit() - - -def populate_data(user_count=None): - create_roles() - create_users(user_count) - - -def add_context_processors(s): - @s.context_processor - def for_all(): - return dict() - - @s.forgot_password_context_processor - def forgot_password(): - return dict() - - @s.login_context_processor - def login(): - return dict() - - @s.register_context_processor - def register(): - return dict() - - @s.reset_password_context_processor - def reset_password(): - return dict() - - @s.send_confirmation_context_processor - def send_confirmation(): - return dict() - - @s.send_login_context_processor - def send_login(): - return dict() - - @s.mail_context_processor - def mail(): - return dict() diff --git a/tests/test_app/mongoengine.py b/tests/test_app/mongoengine.py deleted file mode 100644 index 9be6f5f..0000000 --- a/tests/test_app/mongoengine.py +++ /dev/null @@ -1,57 +0,0 @@ -# -*- coding: utf-8 -*- - -import sys -import os - -sys.path.pop(0) -sys.path.insert(0, os.getcwd()) - -from flask.ext.mongoengine import MongoEngine -from flask.ext.security import Security, UserMixin, RoleMixin, \ - MongoEngineUserDatastore - -from tests.test_app import create_app as create_base_app, populate_data, \ - add_context_processors - -def create_app(config, **kwargs): - app = create_base_app(config) - - app.config['MONGODB_SETTINGS'] = dict( - db='flask_security_test', - host='localhost', - port=27017 - ) - - db = MongoEngine(app) - - class Role(db.Document, RoleMixin): - name = db.StringField(required=True, unique=True, max_length=80) - description = db.StringField(max_length=255) - - class User(db.Document, UserMixin): - email = db.StringField(unique=True, max_length=255) - username = db.StringField(max_length=255) - password = db.StringField(required=True, max_length=255) - last_login_at = db.DateTimeField() - current_login_at = db.DateTimeField() - last_login_ip = db.StringField(max_length=100) - current_login_ip = db.StringField(max_length=100) - login_count = db.IntField() - active = db.BooleanField(default=True) - confirmed_at = db.DateTimeField() - roles = db.ListField(db.ReferenceField(Role), default=[]) - - @app.before_first_request - def before_first_request(): - User.drop_collection() - Role.drop_collection() - populate_data(app.config.get('USER_COUNT', None)) - - app.security = Security(app, datastore=MongoEngineUserDatastore(db, User, Role), **kwargs) - - add_context_processors(app.security) - - return app - -if __name__ == '__main__': - create_app({}).run() diff --git a/tests/test_app/peewee_app.py b/tests/test_app/peewee_app.py deleted file mode 100644 index b33ab29..0000000 --- a/tests/test_app/peewee_app.py +++ /dev/null @@ -1,64 +0,0 @@ -# -*- coding: utf-8 -*- - -import sys -import os - -sys.path.pop(0) -sys.path.insert(0, os.getcwd()) - -from flask_peewee.db import Database -from peewee import * -from flask.ext.security import Security, UserMixin, RoleMixin, \ - PeeweeUserDatastore - -from tests.test_app import create_app as create_base_app, populate_data, \ - add_context_processors - - -def create_app(config, **kwargs): - app = create_base_app(config) - app.config['DATABASE'] = { - 'name': 'peewee.db', - 'engine': 'peewee.SqliteDatabase' - } - db = Database(app) - - class Role(db.Model, RoleMixin): - name = TextField(unique=True) - description = TextField(null=True) - - class User(db.Model, UserMixin): - email = TextField() - username = TextField() - password = TextField() - last_login_at = DateTimeField(null=True) - current_login_at = DateTimeField(null=True) - last_login_ip = TextField(null=True) - current_login_ip = TextField(null=True) - login_count = IntegerField(null=True) - active = BooleanField(default=True) - confirmed_at = DateTimeField(null=True) - - class UserRoles(db.Model): - """ Peewee does not have built-in many-to-many support, so we have to - create this mapping class to link users to roles.""" - user = ForeignKeyField(User, related_name='roles') - role = ForeignKeyField(Role, related_name='users') - name = property(lambda self: self.role.name) - description = property(lambda self: self.role.description) - - @app.before_first_request - def before_first_request(): - for Model in (Role, User, UserRoles): - Model.drop_table(fail_silently=True) - Model.create_table() - populate_data(app.config.get('USER_COUNT', None)) - - app.security = Security(app, datastore=PeeweeUserDatastore(db, User, Role, UserRoles), **kwargs) - - add_context_processors(app.security) - - return app - -if __name__ == '__main__': - create_app({}).run() diff --git a/tests/test_app/sqlalchemy.py b/tests/test_app/sqlalchemy.py deleted file mode 100644 index fcce996..0000000 --- a/tests/test_app/sqlalchemy.py +++ /dev/null @@ -1,60 +0,0 @@ -# -*- coding: utf-8 -*- - -import sys -import os - -sys.path.pop(0) -sys.path.insert(0, os.getcwd()) - - -from flask.ext.sqlalchemy import SQLAlchemy -from flask.ext.security import Security, UserMixin, RoleMixin, \ - SQLAlchemyUserDatastore - -from tests.test_app import create_app as create_base_app, populate_data, \ - add_context_processors - -def create_app(config, **kwargs): - app = create_base_app(config) - app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite://' - - db = SQLAlchemy(app) - - roles_users = db.Table('roles_users', - db.Column('user_id', db.Integer(), db.ForeignKey('user.id')), - db.Column('role_id', db.Integer(), db.ForeignKey('role.id'))) - - class Role(db.Model, RoleMixin): - id = db.Column(db.Integer(), primary_key=True) - name = db.Column(db.String(80), unique=True) - description = db.Column(db.String(255)) - - class User(db.Model, UserMixin): - id = db.Column(db.Integer, primary_key=True) - email = db.Column(db.String(255), unique=True) - username = db.Column(db.String(255)) - password = db.Column(db.String(255)) - last_login_at = db.Column(db.DateTime()) - current_login_at = db.Column(db.DateTime()) - last_login_ip = db.Column(db.String(100)) - current_login_ip = db.Column(db.String(100)) - login_count = db.Column(db.Integer) - active = db.Column(db.Boolean()) - confirmed_at = db.Column(db.DateTime()) - roles = db.relationship('Role', secondary=roles_users, - backref=db.backref('users', lazy='dynamic')) - - @app.before_first_request - def before_first_request(): - db.drop_all() - db.create_all() - populate_data(app.config.get('USER_COUNT', None)) - - app.security = Security(app, datastore=SQLAlchemyUserDatastore(db, User, Role), **kwargs) - - add_context_processors(app.security) - - return app - -if __name__ == '__main__': - create_app({}).run() diff --git a/tests/test_changeable.py b/tests/test_changeable.py new file mode 100644 index 0000000..fe4734e --- /dev/null +++ b/tests/test_changeable.py @@ -0,0 +1,135 @@ +# -*- coding: utf-8 -*- +""" + test_changeable + ~~~~~~~~~~~~~~~ + + Changeable tests +""" + +from flask_security.signals import password_changed + +from utils import authenticate, init_app_with_options + + +def _get_client(app, datastore, **options): + config = { + 'SECURITY_CHANGEABLE': True + } + config.update(options) + init_app_with_options(app, datastore, **config) + return app.test_client() + + +def test_recoverable_flag(app, sqlalchemy_datastore, get_message): + client = _get_client(app, sqlalchemy_datastore) + + recorded = [] + + @password_changed.connect_via(app) + def on_password_changed(app, user): + recorded.append(user) + + authenticate(client) + + # Test change view + response = client.get('/change', follow_redirects=True) + assert b'Change password' in response.data + + # Test wrong original password + response = client.post('/change', data={ + 'password': 'notpassword', + 'new_password': 'newpassword', + 'new_password_confirm': 'newpassword' + }, follow_redirects=True) + assert get_message('INVALID_PASSWORD') in response.data + + # Test mismatch + response = client.post('/change', data={ + 'password': 'password', + 'new_password': 'newpassword', + 'new_password_confirm': 'notnewpassword' + }, follow_redirects=True) + assert get_message('PASSWORD_CHANGE') not in response.data + assert get_message('RETYPE_PASSWORD_MISMATCH') in response.data + + # Test bad password + response = client.post('/change', data={ + 'password': 'password', + 'new_password': 'a', + 'new_password_confirm': 'a' + }, follow_redirects=True) + assert get_message('PASSWORD_CHANGE') not in response.data + assert get_message('PASSWORD_INVALID_LENGTH') in response.data + + # Test same as previous + response = client.post('/change', data={ + 'password': 'password', + 'new_password': 'password', + 'new_password_confirm': 'password' + }, follow_redirects=True) + assert get_message('PASSWORD_CHANGE') not in response.data + assert get_message('PASSWORD_IS_THE_SAME') in response.data + + # Test successful submit sends email notification + with app.mail.record_messages() as outbox: + response = client.post('/change', data={ + 'password': 'password', + 'new_password': 'newpassword', + 'new_password_confirm': 'newpassword' + }, follow_redirects=True) + + assert get_message('PASSWORD_CHANGE') in response.data + assert b'Home Page' in response.data + assert len(recorded) == 1 + assert len(outbox) == 1 + assert "Your password has been changed" in outbox[0].html + + +def test_custom_change_url(app, sqlalchemy_datastore, get_message): + client = _get_client(app, sqlalchemy_datastore, **{ + 'SECURITY_CHANGE_URL': '/custom_change' + }) + + authenticate(client) + response = client.get('/custom_change') + assert response.status_code == 200 + + +def test_custom_change_template(app, sqlalchemy_datastore, get_message): + client = _get_client(app, sqlalchemy_datastore, **{ + 'SECURITY_CHANGE_PASSWORD_TEMPLATE': 'custom_security/change_password.html' + }) + + authenticate(client) + response = client.get('/change') + assert b'CUSTOM CHANGE PASSWORD' in response.data + + +def test_disable_change_emails(app, sqlalchemy_datastore): + client = _get_client(app, sqlalchemy_datastore, **{ + 'SECURITY_SEND_PASSWORD_CHANGE_EMAIL': False + }) + authenticate(client) + + with app.mail.record_messages() as outbox: + client.post('/change', data={ + 'password': 'password', + 'new_password': 'newpassword', + 'new_password_confirm': 'newpassword' + }, follow_redirects=True) + assert len(outbox) == 0 + + +def test_custom_post_change_view(app, sqlalchemy_datastore): + client = _get_client(app, sqlalchemy_datastore, **{ + 'SECURITY_POST_CHANGE_VIEW': '/profile', + }) + authenticate(client) + + response = client.post('/change', data={ + 'password': 'password', + 'new_password': 'newpassword', + 'new_password_confirm': 'newpassword' + }, follow_redirects=True) + + assert b'Profile Page' in response.data diff --git a/tests/test_common.py b/tests/test_common.py new file mode 100644 index 0000000..856a577 --- /dev/null +++ b/tests/test_common.py @@ -0,0 +1,256 @@ +# -*- coding: utf-8 -*- +""" + test_common + ~~~~~~~~~~~ + + Test common functionality +""" + +import base64 + +try: + from cookielib import Cookie +except ImportError: + from http.cookiejar import Cookie + +from utils import authenticate, json_authenticate, logout + + +def test_login_view(client): + response = client.get('/login') + assert b'

    Login

    ' in response.data + + +def test_authenticate(client): + response = authenticate(client) + assert response.status_code == 302 + response = authenticate(client, follow_redirects=True) + assert b'Hello matt@lp.com' in response.data + + +def test_authenticate_case_insensitive_email(client): + response = authenticate(client, email='MATT@lp.com', follow_redirects=True) + assert b'Hello matt@lp.com' in response.data + + +def test_unprovided_username(client, get_message): + response = authenticate(client, "") + assert get_message('EMAIL_NOT_PROVIDED') in response.data + + +def test_unprovided_password(client, get_message): + response = authenticate(client, password="") + assert get_message('PASSWORD_NOT_PROVIDED') in response.data + + +def test_invalid_user(client, get_message): + response = authenticate(client, email="bogus@bogus.com") + assert get_message('USER_DOES_NOT_EXIST') in response.data + + +def test_bad_password(client, get_message): + response = authenticate(client, password="bogus") + assert get_message('INVALID_PASSWORD') in response.data + + +def test_inactive_user(client, get_message): + response = authenticate(client, "tiya@lp.com", "password") + assert get_message('DISABLED_ACCOUNT') in response.data + + +def test_logout(client): + authenticate(client) + response = logout(client, follow_redirects=True) + assert b'Home Page' in response.data + + +def test_missing_session_access(client, get_message): + response = client.get('/profile', follow_redirects=True) + assert get_message('LOGIN') in response.data + + +def test_has_session_access(client): + authenticate(client) + response = client.get("/profile", follow_redirects=True) + assert b'profile' in response.data + + +def test_authorized_access(client): + authenticate(client) + response = client.get("/admin") + assert b'Admin Page' in response.data + + +def test_unauthorized_access(client, get_message): + authenticate(client, "joe@lp.com") + response = client.get("/admin", follow_redirects=True) + assert get_message('UNAUTHORIZED') in response.data + + +def test_roles_accepted(client): + for user in ("matt@lp.com", "joe@lp.com"): + authenticate(client, user) + response = client.get("/admin_or_editor") + assert b'Admin or Editor Page' in response.data + logout(client) + + authenticate(client, "jill@lp.com") + response = client.get("/admin_or_editor", follow_redirects=True) + assert b'Home Page' in response.data + + +def test_unauthenticated_role_required(client, get_message): + response = client.get('/admin', follow_redirects=True) + assert get_message('UNAUTHORIZED') in response.data + + +def test_multiple_role_required(client): + for user in ("matt@lp.com", "joe@lp.com"): + authenticate(client, user) + response = client.get("/admin_and_editor", follow_redirects=True) + assert b'Home Page' in response.data + client.get('/logout') + + authenticate(client, 'dave@lp.com') + response = client.get("/admin_and_editor", follow_redirects=True) + assert b'Admin and Editor Page' in response.data + + +def test_ok_json_auth(client): + response = json_authenticate(client) + assert response.jdata['meta']['code'] == 200 + assert 'authentication_token' in response.jdata['response']['user'] + + +def test_invalid_json_auth(client): + response = json_authenticate(client, password='junk') + assert b'"code": 400' in response.data + + +def test_token_auth_via_querystring_valid_token(client): + response = json_authenticate(client) + token = response.jdata['response']['user']['authentication_token'] + response = client.get('/token?auth_token=' + token) + assert b'Token Authentication' in response.data + + +def test_token_auth_via_header_valid_token(client): + response = json_authenticate(client) + token = response.jdata['response']['user']['authentication_token'] + headers = {"Authentication-Token": token} + response = client.get('/token', headers=headers) + assert b'Token Authentication' in response.data + + +def test_token_auth_via_querystring_invalid_token(client): + response = client.get('/token?auth_token=X') + assert 401 == response.status_code + + +def test_token_auth_via_header_invalid_token(client): + response = client.get('/token', headers={"Authentication-Token": 'X'}) + assert 401 == response.status_code + + +def test_http_auth(client): + response = client.get('/http', headers={ + 'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:password").decode('utf-8') + }) + assert b'HTTP Authentication' in response.data + + +def test_http_auth_no_authorization(client): + response = client.get('/http', headers={}) + assert b'

    Unauthorized

    ' in response.data + assert 'WWW-Authenticate' in response.headers + assert 'Basic realm="Login Required"' == response.headers['WWW-Authenticate'] + + +def test_invalid_http_auth_invalid_username(client): + response = client.get('/http', headers={ + 'Authorization': 'Basic %s' % base64.b64encode(b"bogus:bogus").decode('utf-8') + }) + assert b'

    Unauthorized

    ' in response.data + assert 'WWW-Authenticate' in response.headers + assert 'Basic realm="Login Required"' == response.headers['WWW-Authenticate'] + + +def test_invalid_http_auth_bad_password(client): + response = client.get('/http', headers={ + 'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:bogus").decode('utf-8') + }) + assert b'

    Unauthorized

    ' in response.data + assert 'WWW-Authenticate' in response.headers + assert 'Basic realm="Login Required"' == response.headers['WWW-Authenticate'] + + +def test_custom_http_auth_realm(client): + response = client.get('/http_custom_realm', headers={ + 'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:bogus").decode('utf-8') + }) + assert b'

    Unauthorized

    ' in response.data + assert 'WWW-Authenticate' in response.headers + assert 'Basic realm="My Realm"' == response.headers['WWW-Authenticate'] + + +def test_multi_auth_basic(client): + response = client.get('/multi_auth', headers={ + 'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:password").decode('utf-8') + }) + assert b'Basic' in response.data + + +def test_multi_auth_token(client): + response = json_authenticate(client) + token = response.jdata['response']['user']['authentication_token'] + response = client.get('/multi_auth?auth_token=' + token) + assert b'Token' in response.data + + +def test_multi_auth_session(client): + authenticate(client, ) + response = client.get('/multi_auth') + assert b'Session' in response.data + + +def test_user_deleted_during_session_reverts_to_anonymous_user(app, client): + authenticate(client) + + with app.test_request_context('/'): + user = app.security.datastore.find_user(email='matt@lp.com') + app.security.datastore.delete_user(user) + app.security.datastore.commit() + + response = client.get('/') + assert b'Hello matt@lp.com' not in response.data + + +def test_remember_token(client): + response = authenticate(client, follow_redirects=False) + client.cookie_jar.clear_session_cookies() + response = client.get('/profile') + assert b'profile' in response.data + + +def test_token_loader_does_not_fail_with_invalid_token(client): + c = Cookie(version=0, name='remember_token', value='None', port=None, + port_specified=False, domain='www.example.com', + domain_specified=False, domain_initial_dot=False, path='/', + path_specified=True, secure=False, expires=None, + discard=True, comment=None, comment_url=None, + rest={'HttpOnly': None}, rfc2109=False) + + client.cookie_jar.set_cookie(c) + response = client.get('/') + assert b'BadSignature' not in response.data + + +def test_coverage_endpoints(client): + for endpoint in [ + '/coverage/add_role_to_user', + '/coverage/remove_role_from_user', + '/coverage/activate_user', + '/coverage/deactivate_user' + ]: + response = client.get(endpoint) + assert b'success' in response.data diff --git a/tests/test_configuration.py b/tests/test_configuration.py new file mode 100644 index 0000000..69e6fd8 --- /dev/null +++ b/tests/test_configuration.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +""" + test_configuration + ~~~~~~~~~~~~~~~~~~ + + Basic configuration tests +""" + +import base64 + +from utils import authenticate, logout, init_app_with_options + + +def test_view_configuration(app, sqlalchemy_datastore): + init_app_with_options(app, sqlalchemy_datastore, **{ + 'SECURITY_LOGOUT_URL': '/custom_logout', + 'SECURITY_LOGIN_URL': '/custom_login', + 'SECURITY_POST_LOGIN_VIEW': '/post_login', + 'SECURITY_POST_LOGOUT_VIEW': '/post_logout', + 'SECURITY_DEFAULT_HTTP_AUTH_REALM': 'Custom Realm', + }) + + client = app.test_client() + + response = client.get('/custom_login') + assert b"

    Login

    " in response.data + + response = authenticate(client, endpoint='/custom_login', follow_redirects=True) + assert b'Post Login' in response.data + + response = logout(client, endpoint='/custom_logout', follow_redirects=True) + assert b'Post Logout' in response.data + + response = client.get('/http', headers={ + 'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:bogus") + }) + assert b'

    Unauthorized

    ' in response.data + assert 'WWW-Authenticate' in response.headers + assert 'Basic realm="Custom Realm"' == response.headers['WWW-Authenticate'] + + +def test_template_configuration(app, sqlalchemy_datastore): + init_app_with_options(app, sqlalchemy_datastore, **{ + 'SECURITY_LOGIN_USER_TEMPLATE': 'custom_security/login_user.html', + }) + client = app.test_client() + response = client.get('/login') + assert b'CUSTOM LOGIN USER' in response.data diff --git a/tests/test_confirmable.py b/tests/test_confirmable.py new file mode 100644 index 0000000..32fd3e1 --- /dev/null +++ b/tests/test_confirmable.py @@ -0,0 +1,166 @@ +# -*- coding: utf-8 -*- +""" + test_confirmable + ~~~~~~~~~~~~~~~~ + + Confirmable tests +""" + +import time + +from flask_security.signals import user_confirmed, confirm_instructions_sent +from flask_security.utils import capture_registrations + +from utils import authenticate, logout, init_app_with_options + + +def test_confirmable_flag(app, sqlalchemy_datastore, get_message): + init_app_with_options(app, sqlalchemy_datastore, **{ + 'SECURITY_CONFIRMABLE': True, + 'SECURITY_REGISTERABLE': True, + }) + + client = app.test_client() + + recorded_confirms = [] + recorded_instructions_sent = [] + + @user_confirmed.connect_via(app) + def on_confirmed(app, user): + recorded_confirms.append(user) + + @confirm_instructions_sent.connect_via(app) + def on_instructions_sent(app, user): + recorded_instructions_sent.append(user) + + # Test login before confirmation + email = 'dude@lp.com' + + with capture_registrations() as registrations: + response = client.post('/register', data=dict(email=email, password='password')) + + assert response.status_code == 302 + + response = authenticate(client, email=email) + assert get_message('CONFIRMATION_REQUIRED') in response.data + + # Test invalid token + response = client.get('/confirm/bogus', follow_redirects=True) + assert get_message('INVALID_CONFIRMATION_TOKEN') in response.data + + # Test JSON + response = client.post('/confirm', data='{"email": "matt@lp.com"}', headers={ + 'Content-Type': 'application/json' + }) + assert response.status_code == 200 + assert response.headers['Content-Type'] == 'application/json' + assert 'user' in response.jdata['response'] + assert len(recorded_instructions_sent) == 1 + + # Test ask for instructions with invalid email + response = client.post('/confirm', data=dict(email='bogus@bogus.com')) + assert get_message('USER_DOES_NOT_EXIST') in response.data + + # Test resend instructions + response = client.post('/confirm', data=dict(email=email)) + assert get_message('CONFIRMATION_REQUEST', email=email) in response.data + assert len(recorded_instructions_sent) == 2 + + # Test confirm + token = registrations[0]['confirm_token'] + response = client.get('/confirm/' + token, follow_redirects=True) + assert get_message('EMAIL_CONFIRMED') in response.data + assert len(recorded_confirms) == 1 + + # Test already confirmed + response = client.get('/confirm/' + token, follow_redirects=True) + assert get_message('ALREADY_CONFIRMED') in response.data + + # Test already confirmed when asking for confirmation instructions + logout(client) + + response = client.get('/confirm') + assert response.status_code == 200 + + response = client.post('/confirm', data=dict(email=email)) + assert get_message('ALREADY_CONFIRMED') in response.data + + # Test user was deleted before confirmation + with capture_registrations() as registrations: + client.post('/register', data=dict(email='mary@lp.com', password='password')) + + user = registrations[0]['user'] + token = registrations[0]['confirm_token'] + + with app.app_context(): + sqlalchemy_datastore.delete(user) + sqlalchemy_datastore.commit() + + response = client.get('/confirm/' + token, follow_redirects=True) + assert get_message('INVALID_CONFIRMATION_TOKEN') in response.data + + +def test_expired_confirmation_token(app, sqlalchemy_datastore, get_message): + within = '1 milliseconds' + init_app_with_options(app, sqlalchemy_datastore, **{ + 'SECURITY_CONFIRMABLE': True, + 'SECURITY_REGISTERABLE': True, + 'SECURITY_CONFIRM_EMAIL_WITHIN': within + }) + + client = app.test_client() + + with capture_registrations() as registrations: + data = dict(email='mary@lp.com', password='password') + client.post('/register', data=data, follow_redirects=True) + + user = registrations[0]['user'] + token = registrations[0]['confirm_token'] + + time.sleep(1) + + response = client.get('/confirm/' + token, follow_redirects=True) + assert get_message('CONFIRMATION_EXPIRED', within=within, email=user.email) in response.data + + +def test_login_when_unconfirmed(app, sqlalchemy_datastore, get_message): + init_app_with_options(app, sqlalchemy_datastore, **{ + 'SECURITY_CONFIRMABLE': True, + 'SECURITY_REGISTERABLE': True, + 'SECURITY_LOGIN_WITHOUT_CONFIRMATION': True + }) + + client = app.test_client() + + data = dict(email='mary@lp.com', password='password') + response = client.post('/register', data=data, follow_redirects=True) + assert b'mary@lp.com' in response.data + + +def test_confirmation_different_user_when_logged_in(app, sqlalchemy_datastore, get_message): + init_app_with_options(app, sqlalchemy_datastore, **{ + 'SECURITY_CONFIRMABLE': True, + 'SECURITY_REGISTERABLE': True, + 'SECURITY_LOGIN_WITHOUT_CONFIRMATION': True + }) + + client = app.test_client() + + e1 = 'dude@lp.com' + e2 = 'lady@lp.com' + + with capture_registrations() as registrations: + for e in e1, e2: + client.post('/register', data=dict(email=e, password='password')) + logout(client) + + token1 = registrations[0]['confirm_token'] + token2 = registrations[1]['confirm_token'] + + client.get('/confirm/' + token1, follow_redirects=True) + logout(client) + authenticate(client, email=e1) + + response = client.get('/confirm/' + token2, follow_redirects=True) + assert get_message('EMAIL_CONFIRMED') in response.data + assert b'Hello lady@lp.com' in response.data diff --git a/tests/test_datastore.py b/tests/test_datastore.py new file mode 100644 index 0000000..028e230 --- /dev/null +++ b/tests/test_datastore.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +""" + test_datastore + ~~~~~~~~~~~~~~ + + Datastore tests +""" + +from pytest import raises + +from flask_security import UserMixin +from flask_security.datastore import Datastore, UserDatastore + + +class User(UserMixin): + pass + + +def test_unimplemented_datastore_methods(): + datastore = Datastore(None) + with raises(NotImplementedError): + datastore.put(None) + with raises(NotImplementedError): + datastore.delete(None) + + +def test_unimplemented_user_datastore_methods(): + datastore = UserDatastore(None, None) + with raises(NotImplementedError): + datastore.find_user(None) + with raises(NotImplementedError): + datastore.find_role(None) + + +def test_toggle_active(): + datastore = UserDatastore(None, None) + user = User() + user.active = True + assert datastore.toggle_active(user) is True + assert not user.active + assert datastore.toggle_active(user) is True + assert user.active is True + + +def test_deactivate_user(): + datastore = UserDatastore(None, None) + user = User() + user.active = True + assert datastore.deactivate_user(user) is True + assert not user.active + + +def test_activate_user(): + datastore = UserDatastore(None, None) + user = User() + user.active = False + assert datastore.activate_user(user) is True + assert user.active is True + + +def test_deactivate_returns_false_if_already_false(): + datastore = UserDatastore(None, None) + user = User() + user.active = False + assert not datastore.deactivate_user(user) + + +def test_activate_returns_false_if_already_true(): + datastore = UserDatastore(None, None) + user = User() + user.active = True + assert not datastore.activate_user(user) diff --git a/tests/test_entities.py b/tests/test_entities.py new file mode 100644 index 0000000..2306cc8 --- /dev/null +++ b/tests/test_entities.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +""" + test_entities + ~~~~~~~~~~~~~ + + Entity tests +""" + +from flask_security import RoleMixin, UserMixin, AnonymousUser + + +class Role(RoleMixin): + def __init__(self, name): + self.name = name + + +class User(UserMixin): + def __init__(self, roles): + self.roles = roles + + +def test_role_mixin_equal(): + admin1 = Role('admin') + admin2 = Role('admin') + assert admin1 == admin2 + + +def test_role_mixin_not_equal(): + admin = Role('admin') + editor = Role('editor') + assert admin != editor + + +def test_user_mixin_has_role_with_string(): + admin = Role('admin') + editor = Role('editor') + user = User([admin, editor]) + assert user.has_role('admin') is True + assert user.has_role('editor') is True + assert user.has_role(admin) is True + assert user.has_role(editor) is True + + +def test_anonymous_user_has_no_roles(): + user = AnonymousUser() + assert not user.has_role('admin') diff --git a/tests/test_hashing.py b/tests/test_hashing.py new file mode 100644 index 0000000..c698581 --- /dev/null +++ b/tests/test_hashing.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +""" + test_hashing + ~~~~~~~~~~~~ + + hashing tests +""" + +from pytest import raises + +from flask_security.utils import verify_password, encrypt_password + +from utils import authenticate, init_app_with_options + + +def test_verify_password_bcrypt(app, sqlalchemy_datastore): + init_app_with_options(app, sqlalchemy_datastore, **{ + 'SECURITY_PASSWORD_HASH': 'bcrypt', + 'SECURITY_PASSWORD_SALT': 'salty' + }) + with app.app_context(): + assert verify_password('pass', encrypt_password('pass')) + + +def test_login_with_bcrypt_enabled(app, sqlalchemy_datastore): + init_app_with_options(app, sqlalchemy_datastore, **{ + 'SECURITY_PASSWORD_HASH': 'bcrypt', + 'SECURITY_PASSWORD_SALT': 'salty' + }) + response = authenticate(app.test_client(), follow_redirects=True) + assert b'Home Page' in response.data + + +def test_missing_hash_salt_option(app, sqlalchemy_datastore): + with raises(RuntimeError): + init_app_with_options(app, sqlalchemy_datastore, **{ + 'SECURITY_PASSWORD_HASH': 'bcrypt', + }) diff --git a/tests/test_misc.py b/tests/test_misc.py new file mode 100644 index 0000000..83ff3e7 --- /dev/null +++ b/tests/test_misc.py @@ -0,0 +1,142 @@ +# -*- coding: utf-8 -*- +""" + test_emails + ~~~~~~~~~~~ + + Email functionality tests +""" + +from flask_security import Security +from flask_security.forms import LoginForm, RegisterForm, ConfirmRegisterForm, \ + SendConfirmationForm, PasswordlessLoginForm, ForgotPasswordForm, ResetPasswordForm, \ + ChangePasswordForm, TextField, PasswordField, email_required, email_validator, valid_user_email +from flask_security.utils import capture_reset_password_requests + +from utils import authenticate, init_app_with_options, populate_data + + +def test_async_email_task(app, sqlalchemy_datastore): + init_app_with_options(app, sqlalchemy_datastore, **{ + 'SECURITY_RECOVERABLE': True + }) + + app.mail_sent = False + + @app.security.send_mail_task + def send_email(msg): + app.mail_sent = True + + client = app.test_client() + client.post('/reset', data=dict(email='matt@lp.com')) + assert app.mail_sent is True + + +def test_register_blueprint_flag(app, sqlalchemy_datastore): + app.security = Security(app, datastore=Security, register_blueprint=False) + client = app.test_client() + response = client.get('/login') + assert response.status_code == 404 + + +def test_basic_custom_forms(app, sqlalchemy_datastore): + app.config['SECURITY_REGISTERABLE'] = True + app.config['SECURITY_RECOVERABLE'] = True + app.config['SECURITY_CHANGEABLE'] = True + + class MyLoginForm(LoginForm): + email = TextField('My Login Email Address Field') + + class MyRegisterForm(RegisterForm): + email = TextField('My Register Email Address Field') + + class MyForgotPasswordForm(ForgotPasswordForm): + email = TextField('My Forgot Email Address Field', + validators=[email_required, email_validator, valid_user_email]) + + class MyResetPasswordForm(ResetPasswordForm): + password = TextField('My Reset Password Field') + + class MyChangePasswordForm(ChangePasswordForm): + password = PasswordField('My Change Password Field') + + app.security = Security(app, + datastore=sqlalchemy_datastore, + login_form=MyLoginForm, + register_form=MyRegisterForm, + forgot_password_form=MyForgotPasswordForm, + reset_password_form=MyResetPasswordForm, + change_password_form=MyChangePasswordForm) + + populate_data(app) + + client = app.test_client() + + response = client.get('/login') + assert b'My Login Email Address Field' in response.data + + response = client.get('/register') + assert b'My Register Email Address Field' in response.data + + response = client.get('/reset') + assert b'My Forgot Email Address Field' in response.data + + with capture_reset_password_requests() as requests: + response = client.post('/reset', data=dict(email='matt@lp.com')) + + token = requests[0]['token'] + response = client.get('/reset/' + token) + assert b'My Reset Password Field' in response.data + + authenticate(client) + + response = client.get('/change') + assert b'My Change Password Field' in response.data + + +def test_confirmable_custom_form(app, sqlalchemy_datastore): + app.config['SECURITY_REGISTERABLE'] = True + app.config['SECURITY_CONFIRMABLE'] = True + + class MyRegisterForm(ConfirmRegisterForm): + email = TextField('My Register Email Address Field') + + class MySendConfirmationForm(SendConfirmationForm): + email = TextField('My Send Confirmation Email Address Field') + + app.security = Security(app, + datastore=sqlalchemy_datastore, + send_confirmation_form=MySendConfirmationForm, + confirm_register_form=MyRegisterForm) + + client = app.test_client() + + response = client.get('/register') + assert b'My Register Email Address Field' in response.data + + response = client.get('/confirm') + assert b'My Send Confirmation Email Address Field' in response.data + + +def test_passwordless_custom_form(app, sqlalchemy_datastore): + app.config['SECURITY_PASSWORDLESS'] = True + + class MyPasswordlessLoginForm(PasswordlessLoginForm): + email = TextField('My Passwordless Email Address Field') + + app.security = Security(app, + datastore=sqlalchemy_datastore, + passwordless_login_form=MyPasswordlessLoginForm) + + client = app.test_client() + + response = client.get('/login') + assert b'My Passwordless Email Address Field' in response.data + + +def test_addition_identity_attributes(app, sqlalchemy_datastore): + init_app_with_options(app, sqlalchemy_datastore, **{ + 'SECURITY_USER_IDENTITY_ATTRIBUTES': ('email', 'username') + }) + client = app.test_client() + response = authenticate(client, email='matt', follow_redirects=True) + assert b'Hello matt@lp.com' in response.data diff --git a/tests/test_passwordless.py b/tests/test_passwordless.py new file mode 100644 index 0000000..7734072 --- /dev/null +++ b/tests/test_passwordless.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +""" + test_passwordless + ~~~~~~~~~~~~~~~~~ + + Passwordless tests +""" + +import time + +from flask_security.signals import login_instructions_sent +from flask_security.utils import capture_passwordless_login_requests + +from utils import logout, init_app_with_options + + +def test_trackable_flag(app, sqlalchemy_datastore, get_message): + init_app_with_options(app, sqlalchemy_datastore, **{ + 'SECURITY_PASSWORDLESS': True + }) + + client = app.test_client() + + recorded = [] + + @login_instructions_sent.connect_via(app) + def on_instructions_sent(app, user, login_token): + recorded.append(user) + + # Test disabled account + response = client.post('/login', data=dict(email='tiya@lp.com'), follow_redirects=True) + assert get_message('DISABLED_ACCOUNT') in response.data + + # Test login with json and valid email + data = '{"email": "matt@lp.com", "password": "password"}' + response = client.post('/login', data=data, headers={'Content-Type': 'application/json'}) + assert response.status_code == 200 + assert len(recorded) == 1 + + # Test login with json and invalid email + data = '{"email": "nobody@lp.com", "password": "password"}' + response = client.post('/login', data=data, headers={'Content-Type': 'application/json'}) + assert 'errors' in response.data + + # Test sends email and shows appropriate response + with capture_passwordless_login_requests() as requests: + with app.mail.record_messages() as outbox: + response = client.post('/login', data=dict(email='matt@lp.com'), follow_redirects=True) + + assert len(recorded) == 2 + assert len(requests) == 1 + assert len(outbox) == 1 + assert 'user' in requests[0] + assert 'login_token' in requests[0] + + user = requests[0]['user'] + assert get_message('LOGIN_EMAIL_SENT', email=user.email) in response.data + + token = requests[0]['login_token'] + response = client.get('/login/' + token, follow_redirects=True) + assert get_message('PASSWORDLESS_LOGIN_SUCCESSFUL') in response.data + + # Test already authenticated + response = client.get('/login/' + token, follow_redirects=True) + assert get_message('PASSWORDLESS_LOGIN_SUCCESSFUL') not in response.data + + logout(client) + + # Test invalid token + response = client.get('/login/bogus', follow_redirects=True) + assert get_message('INVALID_LOGIN_TOKEN') in response.data + + # Test login request with invalid email + response = client.post('/login', data=dict(email='bogus@bogus.com')) + assert get_message('USER_DOES_NOT_EXIST') in response.data + + +def test_expired_login_token(app, sqlalchemy_datastore, get_message): + init_app_with_options(app, sqlalchemy_datastore, **{ + 'SECURITY_PASSWORDLESS': True, + 'SECURITY_LOGIN_WITHIN': '1 milliseconds', + }) + + client = app.test_client() + + e = 'matt@lp.com' + + with capture_passwordless_login_requests() as requests: + client.post('/login', data=dict(email=e), follow_redirects=True) + + token = requests[0]['login_token'] + user = requests[0]['user'] + + time.sleep(1) + + response = client.get('/login/' + token, follow_redirects=True) + assert get_message('LOGIN_EXPIRED', within='1 milliseconds', email=user.email) in response.data diff --git a/tests/test_recoverable.py b/tests/test_recoverable.py new file mode 100644 index 0000000..94f0950 --- /dev/null +++ b/tests/test_recoverable.py @@ -0,0 +1,153 @@ +# -*- coding: utf-8 -*- +""" + test_recoverable + ~~~~~~~~~~~~~~~~ + + Recoverable functionality tests +""" + +import time + +from flask_security.signals import reset_password_instructions_sent, password_reset +from flask_security.utils import capture_reset_password_requests + +from utils import authenticate, logout, init_app_with_options + + +def _get_client(app, datastore, **options): + config = { + 'SECURITY_RECOVERABLE': True + } + config.update(options) + init_app_with_options(app, datastore, **config) + return app.test_client() + + +def test_recoverable_flag(app, sqlalchemy_datastore, get_message): + client = _get_client(app, sqlalchemy_datastore) + + recorded_resets = [] + recorded_instructions_sent = [] + + @password_reset.connect_via(app) + def on_password_reset(app, user): + recorded_resets.append(user) + + @reset_password_instructions_sent.connect_via(app) + def on_instructions_sent(app, user, token): + recorded_instructions_sent.append(user) + + # Test the reset view + response = client.get('/reset') + assert b'

    Send password reset instructions

    ' in response.data + + # Test submitting email to reset password creates a token and sends email + with capture_reset_password_requests() as requests: + with app.mail.record_messages() as outbox: + response = client.post('/reset', data=dict(email='joe@lp.com'), follow_redirects=True) + + assert len(recorded_instructions_sent) == 1 + assert len(outbox) == 1 + assert response.status_code == 200 + assert get_message('PASSWORD_RESET_REQUEST', email='joe@lp.com') in response.data + token = requests[0]['token'] + + # Test view for reset token + response = client.get('/reset/' + token) + assert b'

    Reset password

    ' in response.data + + # Test submitting a new password + response = client.post('/reset/' + token, data={ + 'password': 'newpassword', + 'password_confirm': 'newpassword' + }, follow_redirects=True) + + assert get_message('PASSWORD_RESET') in response.data + assert len(recorded_resets) == 1 + + logout(client) + + # Test logging in with the new password + response = authenticate(client, 'joe@lp.com', 'newpassword', follow_redirects=True) + assert b'Hello joe@lp.com' in response.data + + logout(client) + + # Test submitting JSON + response = client.post('/reset', data='{"email": "joe@lp.com"}', headers={ + 'Content-Type': 'application/json' + }) + assert response.headers['Content-Type'] == 'application/json' + assert 'user' in response.jdata['response'] + + logout(client) + + # Test invalid email + response = client.post('/reset', data=dict(email='bogus@lp.com'), follow_redirects=True) + assert get_message('USER_DOES_NOT_EXIST') in response.data + + logout(client) + + # Test invalid token + response = client.post('/reset/bogus', data={ + 'password': 'newpassword', + 'password_confirm': 'newpassword' + }, follow_redirects=True) + assert get_message('INVALID_RESET_PASSWORD_TOKEN') in response.data + + # Test mangled token + token = ("WyIxNjQ2MzYiLCIxMzQ1YzBlZmVhM2VhZjYwODgwMDhhZGU2YzU0MzZjMiJd.BZEw_Q.lQyo3npdPZtcJ" + "_sNHVHP103syjM&url_id=fbb89a8328e58c181ea7d064c2987874bc54a23d") + response = client.post('/reset/' + token, data={ + 'password': 'newpassword', + 'password_confirm': 'newpassword' + }, follow_redirects=True) + assert get_message('INVALID_RESET_PASSWORD_TOKEN') in response.data + + +def test_expired_reset_token(app, sqlalchemy_datastore, get_message): + within = '1 milliseconds' + client = _get_client(app, sqlalchemy_datastore, **{ + 'SECURITY_RESET_PASSWORD_WITHIN': within + }) + + with capture_reset_password_requests() as requests: + client.post('/reset', data=dict(email='joe@lp.com'), follow_redirects=True) + + user = requests[0]['user'] + token = requests[0]['token'] + + time.sleep(1) + + response = client.post('/reset/' + token, data={ + 'password': 'newpassword', + 'password_confirm': 'newpassword' + }, follow_redirects=True) + + assert get_message('PASSWORD_RESET_EXPIRED', within=within, email=user.email) in response.data + + +def test_custom_reset_url(app, sqlalchemy_datastore, get_message): + client = _get_client(app, sqlalchemy_datastore, **{ + 'SECURITY_RESET_URL': '/custom_reset' + }) + + response = client.get('/custom_reset') + assert response.status_code == 200 + + +def test_custom_reset_templates(app, sqlalchemy_datastore): + client = _get_client(app, sqlalchemy_datastore, **{ + 'SECURITY_RESET_PASSWORD_TEMPLATE': 'custom_security/reset_password.html', + 'SECURITY_FORGOT_PASSWORD_TEMPLATE': 'custom_security/forgot_password.html' + }) + + response = client.get('/reset') + assert b'CUSTOM FORGOT PASSWORD' in response.data + + with capture_reset_password_requests() as requests: + client.post('/reset', data=dict(email='joe@lp.com'), follow_redirects=True) + token = requests[0]['token'] + + response = client.get('/reset/' + token) + assert b'CUSTOM RESET PASSWORD' in response.data diff --git a/tests/test_registerable.py b/tests/test_registerable.py new file mode 100644 index 0000000..a45ba96 --- /dev/null +++ b/tests/test_registerable.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +""" + test_registerable + ~~~~~~~~~~~~~~~~~ + + Registerable tests +""" + +from flask_security.signals import user_registered + +from utils import authenticate, logout, init_app_with_options + + +def _get_client(app, datastore, **options): + config = { + 'SECURITY_REGISTERABLE': True, + 'SECURITY_POST_REGISTER_VIEW': '/post_register', + } + config.update(options) + init_app_with_options(app, datastore, **config) + return app.test_client() + + +def test_registerable_flag(app, sqlalchemy_datastore, get_message): + client = _get_client(app, sqlalchemy_datastore) + recorded = [] + + # Test the register view + response = client.get('/register') + assert b"

    Register

    " in response.data + + # Test registering is successful, sends email, and fires signal + @user_registered.connect_via(app) + def on_user_registerd(app, user, confirm_token): + recorded.append(user) + + data = dict(email='dude@lp.com', password='password', password_confirm='password') + with app.mail.record_messages() as outbox: + response = client.post('/register', data=data, follow_redirects=True) + + assert len(recorded) == 1 + assert len(outbox) == 1 + assert b'Post Register' in response.data + + logout(client) + + # Test user can login after registering + response = authenticate(client, email='dude@lp.com', password='password') + assert response.status_code == 302 + + logout(client) + + # Test registering with an existing email + data = dict(email='dude@lp.com', password='password', password_confirm='password') + response = client.post('/register', data=data, follow_redirects=True) + assert get_message('EMAIL_ALREADY_ASSOCIATED', email='dude@lp.com') in response.data + + # Test registering with JSON + data = '{ "email": "dude2@lp.com", "password": "password"}' + response = client.post('/register', data=data, content_type='application/json') + assert response.headers['content-type'] == 'application/json' + assert response.jdata['meta']['code'] == 200 + + logout(client) + + # Test ?next param + data = dict(email='dude3@lp.com', + password='password', + password_confirm='password') + + response = client.post('/register?next=/page1', data=data, follow_redirects=True) + assert b'Page 1' in response.data + + +def test_custom_register_url(app, sqlalchemy_datastore): + client = _get_client(app, sqlalchemy_datastore, **{ + 'SECURITY_REGISTER_URL': '/custom_register' + }) + + response = client.get('/custom_register') + assert b"

    Register

    " in response.data + + data = dict(email='dude@lp.com', + password='password', + password_confirm='password') + + response = client.post('/custom_register', data=data, follow_redirects=True) + assert b'Post Register' in response.data + + +def test_custom_register_tempalate(app, sqlalchemy_datastore): + client = _get_client(app, sqlalchemy_datastore, **{ + 'SECURITY_REGISTER_USER_TEMPLATE': 'custom_security/register_user.html' + }) + response = client.get('/register') + assert b'CUSTOM REGISTER USER' in response.data + + +def test_disable_register_emails(app, sqlalchemy_datastore): + client = _get_client(app, sqlalchemy_datastore, **{ + 'SECURITY_SEND_REGISTER_EMAIL': False + }) + data = dict(email='dude@lp.com', password='password', password_confirm='password') + with app.mail.record_messages() as outbox: + client.post('/register', data=data, follow_redirects=True) + assert len(outbox) == 0 diff --git a/tests/test_trackable.py b/tests/test_trackable.py new file mode 100644 index 0000000..8667fe3 --- /dev/null +++ b/tests/test_trackable.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +""" + test_trackable + ~~~~~~~~~~~~~~ + + Trackable tests +""" + +from utils import authenticate, logout, init_app_with_options + + +def test_trackable_flag(app, sqlalchemy_datastore): + init_app_with_options(app, sqlalchemy_datastore, **{ + 'SECURITY_TRACKABLE': True + }) + + client = app.test_client() + + e = 'matt@lp.com' + authenticate(client, email=e) + logout(client) + authenticate(client, email=e) + + with app.app_context(): + user = app.security.datastore.find_user(email=e) + assert user.last_login_at is not None + assert user.current_login_at is not None + assert user.last_login_ip == 'untrackable' + assert user.current_login_ip == 'untrackable' + assert user.login_count == 2 diff --git a/tests/unit_tests.py b/tests/unit_tests.py deleted file mode 100644 index 41e4554..0000000 --- a/tests/unit_tests.py +++ /dev/null @@ -1,88 +0,0 @@ -# -*- coding: utf-8 -*- - -import unittest - -from flask_security import RoleMixin, UserMixin, AnonymousUser -from flask_security.datastore import Datastore, UserDatastore - - -class Role(RoleMixin): - def __init__(self, name): - self.name = name - - -class User(UserMixin): - def __init__(self, email, roles): - self.email = email - self.roles = roles - -admin = Role('admin') -admin2 = Role('admin') -editor = Role('editor') - -user = User('matt@lp.com', [admin, editor]) - - -class SecurityEntityTests(unittest.TestCase): - - def test_role_mixin_equal(self): - self.assertEqual(admin, admin2) - - def test_role_mixin_not_equal(self): - self.assertNotEqual(admin, editor) - - def test_user_mixin_has_role_with_string(self): - self.assertTrue(user.has_role('admin')) - - def test_user_mixin_has_role_with_role_obj(self): - self.assertTrue(user.has_role(Role('admin'))) - - def test_anonymous_user_has_no_roles(self): - au = AnonymousUser() - self.assertEqual(0, len(au.roles)) - self.assertFalse(au.has_role('admin')) - - -class DatastoreTests(unittest.TestCase): - - def setUp(self): - super(DatastoreTests, self).setUp() - self.ds = UserDatastore(None, None) - - def test_unimplemented_datastore_methods(self): - ds = Datastore(None) - self.assertRaises(NotImplementedError, ds.put, None) - self.assertRaises(NotImplementedError, ds.delete, None) - - def test_unimplemented_user_datastore_methods(self): - self.assertRaises(NotImplementedError, self.ds.find_user, None) - self.assertRaises(NotImplementedError, self.ds.find_role, None) - - def test_toggle_active(self): - user.active = True - rv = self.ds.toggle_active(user) - self.assertTrue(rv) - self.assertFalse(user.active) - rv = self.ds.toggle_active(user) - self.assertTrue(rv) - self.assertTrue(user.active) - - def test_deactivate_user(self): - user.active = True - rv = self.ds.deactivate_user(user) - self.assertTrue(rv) - self.assertFalse(user.active) - - def test_activate_user(self): - ds = UserDatastore(None, None) - user.active = False - ds.activate_user(user) - self.assertTrue(user.active) - - def test_deactivate_returns_false_if_already_false(self): - user.active = False - self.assertFalse(self.ds.deactivate_user(user)) - - def test_activate_returns_false_if_already_true(self): - user.active = True - self.assertFalse(self.ds.activate_user(user)) diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..17de43f --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,80 @@ +# -*- coding: utf-8 -*- +""" + utils + ~~~~~ + + Test utils +""" + +from flask import Response as BaseResponse, json + +from flask_security import Security +from flask_security.utils import encrypt_password + +_missing = object + + +def authenticate(client, email="matt@lp.com", password="password", endpoint=None, **kwargs): + data = dict(email=email, password=password, remember='y') + return client.post(endpoint or '/login', data=data, **kwargs) + + +def json_authenticate(client, email="matt@lp.com", password="password", endpoint=None): + data = '{"email": "%s", "password": "%s"}' % (email, password) + return client.post(endpoint or '/login', content_type="application/json", data=data) + + +def logout(client, endpoint=None, **kwargs): + return client.get(endpoint or '/logout', **kwargs) + + +def create_roles(ds): + for role in ('admin', 'editor', 'author'): + ds.create_role(name=role) + ds.commit() + + +def create_users(ds, count=None): + users = [('matt@lp.com', 'matt', 'password', ['admin'], True), + ('joe@lp.com', 'joe', 'password', ['editor'], True), + ('dave@lp.com', 'dave', 'password', ['admin', 'editor'], True), + ('jill@lp.com', 'jill', 'password', ['author'], True), + ('tiya@lp.com', 'tiya', 'password', [], False)] + count = count or len(users) + + for u in users[:count]: + pw = encrypt_password(u[2]) + roles = [ds.find_or_create_role(rn) for rn in u[3]] + ds.commit() + user = ds.create_user(email=u[0], username=u[1], password=pw, active=u[4]) + ds.commit() + for role in roles: + ds.add_role_to_user(user, role) + ds.commit() + + +def populate_data(app, user_count=None): + ds = app.security.datastore + with app.app_context(): + create_roles(ds) + create_users(ds, user_count) + + +class Response(BaseResponse): # pragma: no cover + + @property + def jdata(self): + rv = getattr(self, '_cached_jdata', _missing) + if rv is not _missing: + return rv + try: + self._cached_jdata = json.loads(self.data) + except ValueError: + raise Exception('Invalid JSON response') + return self._cached_jdata + + +def init_app_with_options(app, datastore, **options): + app.config.update(**options) + app.security = Security(app, datastore=datastore) + populate_data(app)