Convert all tests to use pytest. Phew!

This commit is contained in:
Matt Wright
2014-03-13 18:28:25 -04:00
parent ed645b51f2
commit f3014d01df
48 changed files with 1834 additions and 2079 deletions
+8
View File
@@ -3,6 +3,14 @@ Flask-Security Changelog
Here you can see the full list of changes between each Flask-Security release.
Version 1.7.2
-------------
Released January 14th 2014
- Fixed a bug regarding the `password_changed` signal.
Version 1.7.1
-------------
+3 -3
View File
@@ -15,9 +15,9 @@ __version__ = '1.7.1'
from .core import Security, RoleMixin, UserMixin, AnonymousUser, current_user
from .datastore import SQLAlchemyUserDatastore, MongoEngineUserDatastore, PeeweeUserDatastore
from .decorators import auth_token_required, http_auth_required, \
login_required, roles_accepted, roles_required, auth_required
login_required, roles_accepted, roles_required, auth_required
from .forms import ForgotPasswordForm, LoginForm, RegisterForm, \
ResetPasswordForm, PasswordlessLoginForm, ConfirmRegisterForm
ResetPasswordForm, PasswordlessLoginForm, ConfirmRegisterForm
from .signals import confirm_instructions_sent, password_reset, \
reset_password_instructions_sent, user_confirmed, user_registered
reset_password_instructions_sent, user_confirmed, user_registered
from .utils import login_user, logout_user, url_for_security
+2 -2
View File
@@ -29,8 +29,8 @@ def send_password_changed_notice(user):
:param user: The user to send the notice to
"""
if config_value('SEND_PASSWORD_CHANGE_EMAIL'):
send_mail(config_value('EMAIL_SUBJECT_PASSWORD_CHANGE_NOTICE'), user.email,
'change_notice', user=user)
subject = config_value('EMAIL_SUBJECT_PASSWORD_CHANGE_NOTICE')
send_mail(subject, user.email, 'change_notice', user=user)
def change_user_password(user, password):
+3 -1
View File
@@ -58,7 +58,9 @@ def generate_confirmation_token(user):
def requires_confirmation(user):
"""Returns `True` if the user requires confirmation."""
return _security.confirmable and not _security.login_without_confirmation and user.confirmed_at == None
return (_security.confirmable and
not _security.login_without_confirmation and
user.confirmed_at is None)
def confirm_email_token_status(token):
+70 -33
View File
@@ -92,38 +92,74 @@ _default_config = {
#: Default Flask-Security messages
_default_messages = {
'UNAUTHORIZED': ('You do not have permission to view this resource.', 'error'),
'CONFIRM_REGISTRATION': ('Thank you. Confirmation instructions have been sent to %(email)s.', 'success'),
'EMAIL_CONFIRMED': ('Thank you. Your email has been confirmed.', 'success'),
'ALREADY_CONFIRMED': ('Your email has already been confirmed.', 'info'),
'INVALID_CONFIRMATION_TOKEN': ('Invalid confirmation token.', 'error'),
'EMAIL_ALREADY_ASSOCIATED': ('%(email)s is already associated with an account.', 'error'),
'PASSWORD_MISMATCH': ('Password does not match', 'error'),
'RETYPE_PASSWORD_MISMATCH': ('Passwords do not match', 'error'),
'INVALID_REDIRECT': ('Redirections outside the domain are forbidden', 'error'),
'PASSWORD_RESET_REQUEST': ('Instructions to reset your password have been sent to %(email)s.', 'info'),
'PASSWORD_RESET_EXPIRED': ('You did not reset your password within %(within)s. New instructions have been sent to %(email)s.', 'error'),
'INVALID_RESET_PASSWORD_TOKEN': ('Invalid reset password token.', 'error'),
'CONFIRMATION_REQUIRED': ('Email requires confirmation.', 'error'),
'CONFIRMATION_REQUEST': ('Confirmation instructions have been sent to %(email)s.', 'info'),
'CONFIRMATION_EXPIRED': ('You did not confirm your email within %(within)s. New instructions to confirm your email have been sent to %(email)s.', 'error'),
'LOGIN_EXPIRED': ('You did not login within %(within)s. New instructions to login have been sent to %(email)s.', 'error'),
'LOGIN_EMAIL_SENT': ('Instructions to login have been sent to %(email)s.', 'success'),
'INVALID_LOGIN_TOKEN': ('Invalid login token.', 'error'),
'DISABLED_ACCOUNT': ('Account is disabled.', 'error'),
'EMAIL_NOT_PROVIDED': ('Email not provided', 'error'),
'INVALID_EMAIL_ADDRESS': ('Invalid email address', 'error'),
'PASSWORD_NOT_PROVIDED': ('Password not provided', 'error'),
'PASSWORD_NOT_SET': ('No password is set for this user', 'error'),
'PASSWORD_INVALID_LENGTH': ('Password must be at least 6 characters', 'error'),
'USER_DOES_NOT_EXIST': ('Specified user does not exist', 'error'),
'INVALID_PASSWORD': ('Invalid password', 'error'),
'PASSWORDLESS_LOGIN_SUCCESSFUL': ('You have successfuly logged in.', 'success'),
'PASSWORD_RESET': ('You successfully reset your password and you have been logged in automatically.', 'success'),
'PASSWORD_IS_THE_SAME': ('Your new password must be different than your previous password.', 'error'),
'PASSWORD_CHANGE': ('You successfully changed your password.', 'success'),
'LOGIN': ('Please log in to access this page.', 'info'),
'REFRESH': ('Please reauthenticate to access this page.', 'info'),
'UNAUTHORIZED': (
'You do not have permission to view this resource.', 'error'),
'CONFIRM_REGISTRATION': (
'Thank you. Confirmation instructions have been sent to %(email)s.', 'success'),
'EMAIL_CONFIRMED': (
'Thank you. Your email has been confirmed.', 'success'),
'ALREADY_CONFIRMED': (
'Your email has already been confirmed.', 'info'),
'INVALID_CONFIRMATION_TOKEN': (
'Invalid confirmation token.', 'error'),
'EMAIL_ALREADY_ASSOCIATED': (
'%(email)s is already associated with an account.', 'error'),
'PASSWORD_MISMATCH': (
'Password does not match', 'error'),
'RETYPE_PASSWORD_MISMATCH': (
'Passwords do not match', 'error'),
'INVALID_REDIRECT': (
'Redirections outside the domain are forbidden', 'error'),
'PASSWORD_RESET_REQUEST': (
'Instructions to reset your password have been sent to %(email)s.', 'info'),
'PASSWORD_RESET_EXPIRED': (
'You did not reset your password within %(within)s. New instructions have been sent '
'to %(email)s.', 'error'),
'INVALID_RESET_PASSWORD_TOKEN': (
'Invalid reset password token.', 'error'),
'CONFIRMATION_REQUIRED': (
'Email requires confirmation.', 'error'),
'CONFIRMATION_REQUEST': (
'Confirmation instructions have been sent to %(email)s.', 'info'),
'CONFIRMATION_EXPIRED': (
'You did not confirm your email within %(within)s. New instructions to confirm your email '
'have been sent to %(email)s.', 'error'),
'LOGIN_EXPIRED': (
'You did not login within %(within)s. New instructions to login have been sent to '
'%(email)s.', 'error'),
'LOGIN_EMAIL_SENT': (
'Instructions to login have been sent to %(email)s.', 'success'),
'INVALID_LOGIN_TOKEN': (
'Invalid login token.', 'error'),
'DISABLED_ACCOUNT': (
'Account is disabled.', 'error'),
'EMAIL_NOT_PROVIDED': (
'Email not provided', 'error'),
'INVALID_EMAIL_ADDRESS': (
'Invalid email address', 'error'),
'PASSWORD_NOT_PROVIDED': (
'Password not provided', 'error'),
'PASSWORD_NOT_SET': (
'No password is set for this user', 'error'),
'PASSWORD_INVALID_LENGTH': (
'Password must be at least 6 characters', 'error'),
'USER_DOES_NOT_EXIST': (
'Specified user does not exist', 'error'),
'INVALID_PASSWORD': (
'Invalid password', 'error'),
'PASSWORDLESS_LOGIN_SUCCESSFUL': (
'You have successfuly logged in.', 'success'),
'PASSWORD_RESET': (
'You successfully reset your password and you have been logged in automatically.',
'success'),
'PASSWORD_IS_THE_SAME': (
'Your new password must be different than your previous password.', 'error'),
'PASSWORD_CHANGE': (
'You successfully changed your password.', 'success'),
'LOGIN': (
'Please log in to access this page.', 'info'),
'REFRESH': (
'Please reauthenticate to access this page.', 'info'),
}
_allowed_password_hash_schemes = [
@@ -207,7 +243,8 @@ def _get_principal(app):
def _get_pwd_context(app):
pw_hash = cv('PASSWORD_HASH', app=app)
if pw_hash not in _allowed_password_hash_schemes:
allowed = ', '.join(_allowed_password_hash_schemes[:-1]) + ' and ' + _allowed_password_hash_schemes[-1]
allowed = (', '.join(_allowed_password_hash_schemes[:-1]) +
' and ' + _allowed_password_hash_schemes[-1])
raise ValueError("Invalid hash scheme %r. Allowed values are %s" % (pw_hash, allowed))
return CryptContext(schemes=_allowed_password_hash_schemes, default=pw_hash)
+1 -1
View File
@@ -13,7 +13,7 @@ from collections import namedtuple
from functools import wraps
from flask import current_app, Response, request, redirect, _request_ctx_stack
from flask.ext.login import current_user, login_required
from flask.ext.login import current_user, login_required # pragma: no flakes
from flask.ext.principal import RoleNeed, Permission, Identity, identity_changed
from werkzeug.local import LocalProxy
-1
View File
@@ -223,7 +223,6 @@ class LoginForm(Form, NextFormMixin):
self.password.errors.append(get_message('PASSWORD_NOT_PROVIDED')[0])
return False
self.user = _datastore.get_user(self.email.data)
if self.user is None:
+2 -2
View File
@@ -42,9 +42,9 @@ class CreateUserCommand(Command):
"""Create a user"""
option_list = (
Option('-e', '--email', dest='email', default=None),
Option('-e', '--email', dest='email', default=None),
Option('-p', '--password', dest='password', default=None),
Option('-a', '--active', dest='active', default=''),
Option('-a', '--active', dest='active', default=''),
)
@commit
+2 -60
View File
@@ -10,8 +10,6 @@
"""
import base64
import blinker
import functools
import hashlib
import hmac
import sys
@@ -20,16 +18,13 @@ from contextlib import contextmanager
from datetime import datetime, timedelta
from flask import url_for, flash, current_app, request, session, render_template
from flask.ext.login import login_user as _login_user, \
logout_user as _logout_user
from flask.ext.login import login_user as _login_user, logout_user as _logout_user
from flask.ext.mail import Message
from flask.ext.principal import Identity, AnonymousIdentity, identity_changed
from itsdangerous import BadSignature, SignatureExpired
from werkzeug.local import LocalProxy
from .signals import user_registered, user_confirmed, \
confirm_instructions_sent, login_instructions_sent, \
password_reset, password_changed, reset_password_instructions_sent
from .signals import user_registered, login_instructions_sent, reset_password_instructions_sent
# Convenient references
_security = LocalProxy(lambda: current_app.extensions['security'])
@@ -396,56 +391,3 @@ def capture_reset_password_requests(reset_password_sent_at=None):
yield reset_requests
finally:
reset_password_instructions_sent.disconnect(_on)
class CaptureSignals(object):
"""Testing utility for capturing blinker signals.
Context manager which mocks out selected signals and registers which are `sent` on and what
arguments were sent. Instantiate with a list of blinker `NamedSignals` to patch. Each signal
has it's `send` mocked out.
"""
def __init__(self, signals):
"""Patch all given signals and make them available as attributes.
:param signals: list of signals
"""
self._records = {}
self._receivers = {}
for signal in signals:
self._records[signal] = []
self._receivers[signal] = functools.partial(self._record, signal)
def __getitem__(self, signal):
"""All captured signals are available via `ctxt[signal]`.
"""
if isinstance(signal, blinker.base.NamedSignal):
return self._records[signal]
else:
super(CaptureSignals, self).__setitem__(signal)
def _record(self, signal, *args, **kwargs):
self._records[signal].append((args, kwargs))
def __enter__(self):
for signal, receiver in self._receivers.items():
signal.connect(receiver)
return self
def __exit__(self, type, value, traceback):
for signal, receiver in self._receivers.items():
signal.disconnect(receiver)
def signals_sent(self):
"""Return a set of the signals sent.
:rtype: list of blinker `NamedSignals`.
"""
return set([signal for signal, _ in self._records.items() if self._records[signal]])
def capture_signals():
"""Factory method that creates a `CaptureSignals` with all the flask_security signals."""
return CaptureSignals([user_registered, user_confirmed,
confirm_instructions_sent, login_instructions_sent,
password_reset, password_changed,
reset_password_instructions_sent])
+3 -2
View File
@@ -77,8 +77,9 @@ def login():
if not request.json:
return redirect(get_post_login_redirect())
form.next.data = get_url(request.args.get('next')) \
or get_url(request.form.get('next')) or ''
form.next.data = (get_url(request.args.get('next')) or
get_url(request.form.get('next')) or
'')
if request.json:
return _render_json(form, True)
+10
View File
@@ -0,0 +1,10 @@
Flask-SQLAlchemy>=1.0
bcrypt>=1.0.2
flask-mongoengine>=0.7.0
flask-peewee>=0.6.5
pytest>=2.5.2
pytest-cache>=1.0
pytest-cov>=1.6
pytest-flakes>=0.2
pytest-pep8>=1.0.5
tox>=1.7.0
+11 -1
View File
@@ -3,4 +3,14 @@ source-dir = docs/
build-dir = docs/_build
[upload_sphinx]
upload-dir = docs/_build/html
upload-dir = docs/_build/html
[pytest]
pep8maxlinelength = 99
pep8ignore =
docs/* ALL
flakes-ignore =
ImportStarUsed
flask_security/__init__.py UnusedImport
+42 -35
View File
@@ -1,22 +1,45 @@
"""
Flask-Security
==============
Flask-Security is a Flask extension that aims to add quick and simple security
to your Flask applications.
Resources
---------
* `Documentation <http://packages.python.org/Flask-Security/>`_
* `Issue Tracker <https://github.com/mattupstate/flask-security/issues>`_
* `Source <https://github.com/mattupstate/flask-security>`_
* `Development Version
<https://github.com/mattupstate/flask-security/raw/develop#egg=Flask-Security-dev>`_
"""
from setuptools import setup
import sys
from setuptools import setup, find_packages
from setuptools.command.test import test as TestCommand
def get_requirements(suffix=''):
with open('requirements%s.txt' % suffix) as f:
rv = f.read().splitlines()
return rv
def get_long_description():
with open('README.rst') as f:
rv = f.read()
return rv
class PyTest(TestCommand):
def finalize_options(self):
TestCommand.finalize_options(self)
self.test_args = [
'-xrs',
'--cov', 'flask_security',
'--cov-report', 'term-missing',
'--pep8',
'--flakes',
'--clearcache'
]
self.test_suite = True
def run_tests(self):
import pytest
errno = pytest.main(self.test_args)
sys.exit(errno)
setup(
name='Flask-Security',
@@ -26,30 +49,14 @@ setup(
author='Matt Wright',
author_email='matt@nobien.net',
description='Simple security for Flask apps',
long_description=__doc__,
packages=[
'flask_security'
],
long_description=get_long_description(),
packages=find_packages(),
zip_safe=False,
include_package_data=True,
platforms='any',
install_requires=[
'Flask>=0.10.1',
'Flask-Login>=0.2.9',
'Flask-Mail>=0.9.0',
'Flask-Principal>=0.4.0',
'Flask-WTF>=0.9.3',
'passlib>=1.6.2',
],
test_suite='nose.collector',
tests_require=[
'nose',
'Flask-SQLAlchemy',
'Flask-MongoEngine',
'Flask-Peewee',
'bcrypt',
'simplejson'
],
install_requires=get_requirements(),
tests_require=get_requirements('-dev'),
cmdclass={'test': PyTest},
classifiers=[
'Development Status :: 4 - Beta',
'Environment :: Web Environment',
-85
View File
@@ -1,85 +0,0 @@
# -*- coding: utf-8 -*-
import hmac
from hashlib import sha1
from unittest import TestCase
from tests.test_app.sqlalchemy import create_app
class SecurityTest(TestCase):
APP_KWARGS = {
'register_blueprint': True,
}
AUTH_CONFIG = None
def setUp(self):
super(SecurityTest, self).setUp()
app_kwargs = self.APP_KWARGS
app = self._create_app(self.AUTH_CONFIG or {}, **app_kwargs)
app.debug = False
app.config['TESTING'] = True
app.config['WTF_CSRF_ENABLED'] = False
self.app = app
self.client = app.test_client()
def _create_app(self, auth_config, **kwargs):
return create_app(auth_config, **kwargs)
def _get(self, route, content_type=None, follow_redirects=None, headers=None):
return self.client.get(route, follow_redirects=follow_redirects,
content_type=content_type or 'text/html',
headers=headers)
def _post(self, route, data=None, content_type=None, follow_redirects=True, headers=None):
content_type = content_type or 'application/x-www-form-urlencoded'
return self.client.post(route, data=data,
follow_redirects=follow_redirects,
content_type=content_type, headers=headers)
def register(self, email, password='password'):
data = dict(email=email, password=password)
return self.client.post('/register', data=data, follow_redirects=True)
def authenticate(self, email="matt@lp.com", password="password", endpoint=None, **kwargs):
data = dict(email=email, password=password, remember='y')
return self._post(endpoint or '/login', data=data, **kwargs)
def json_authenticate(self, email="matt@lp.com", password="password", endpoint=None):
data = """{
"email": "%s",
"password": "%s"
}"""
return self._post(endpoint or '/login', content_type="application/json",
data=data % (email, password))
def logout(self, endpoint=None):
return self._get(endpoint or '/logout', follow_redirects=True)
def assertIsHomePage(self, data):
self.assertIn(b'Home Page', data)
def assertIn(self, member, container, msg=None):
if hasattr(TestCase, 'assertIn'):
return TestCase.assertIn(self, member, container, msg)
return self.assertTrue(member in container)
def assertNotIn(self, member, container, msg=None):
if hasattr(TestCase, 'assertNotIn'):
return TestCase.assertNotIn(self, member, container, msg)
return self.assertFalse(member in container)
def assertIsNotNone(self, obj, msg=None):
if hasattr(TestCase, 'assertIsNotNone'):
return TestCase.assertIsNotNone(self, obj, msg)
return self.assertTrue(obj is not None)
def get_message(self, key, **kwargs):
return self.app.config['SECURITY_MSG_' + key][0] % kwargs
-878
View File
@@ -1,878 +0,0 @@
# -*- coding: utf-8 -*-
# from __future__ import with_statement
import base64
import time
import simplejson as json
import flask
from flask_security.utils import capture_registrations, \
capture_reset_password_requests, capture_passwordless_login_requests
from flask_security.forms import LoginForm, ConfirmRegisterForm, RegisterForm, \
ForgotPasswordForm, ResetPasswordForm, SendConfirmationForm, \
PasswordlessLoginForm
from flask_security.forms import TextField, SubmitField, valid_user_email
from flask_security.signals import user_registered
from tests import SecurityTest
class PasswordVerifyEncryptTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_PASSWORD_HASH': 'bcrypt',
'SECURITY_PASSWORD_SALT': '89gf828uiguiu23ju2'
}
def test_verify_password_bcrypt(self):
from flask_security.utils import verify_password, encrypt_password
with self.app.app_context():
self.assertTrue(verify_password('custompassword', encrypt_password('custompassword')))
class ConfiguredPasswordHashSecurityTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_PASSWORD_HASH': 'bcrypt',
'SECURITY_PASSWORD_SALT': 'so-salty',
'USER_COUNT': 1
}
def test_authenticate(self):
r = self.authenticate(endpoint="/login")
self.assertIn(b'Home Page', r.data)
class ConfiguredSecurityTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_REGISTERABLE': True,
'SECURITY_LOGOUT_URL': '/custom_logout',
'SECURITY_LOGIN_URL': '/custom_login',
'SECURITY_POST_LOGIN_VIEW': '/post_login',
'SECURITY_POST_LOGOUT_VIEW': '/post_logout',
'SECURITY_POST_REGISTER_VIEW': '/post_register',
'SECURITY_UNAUTHORIZED_VIEW': '/unauthorized',
'SECURITY_DEFAULT_HTTP_AUTH_REALM': 'Custom Realm'
}
def test_login_view(self):
r = self._get('/custom_login')
self.assertIn(b"<h1>Login</h1>", r.data)
def test_authenticate(self):
r = self.authenticate(endpoint="/custom_login")
self.assertIn(b'Post Login', r.data)
def test_logout(self):
self.authenticate(endpoint="/custom_login")
r = self.logout(endpoint="/custom_logout")
self.assertIn(b'Post Logout', r.data)
def test_register_view(self):
r = self._get('/register')
self.assertIn(b'<h1>Register</h1>', r.data)
def test_register(self):
data = dict(email='dude@lp.com',
password='password',
password_confirm='password')
r = self._post('/register', data=data, follow_redirects=True)
self.assertIn(b'Post Register', r.data)
def test_register_with_next_querystring_argument(self):
data = dict(email='dude@lp.com',
password='password',
password_confirm='password')
r = self._post('/register?next=/page1', data=data, follow_redirects=True)
self.assertIn(b'Page 1', r.data)
def test_register_json(self):
data = '{ "email": "dude@lp.com", "password": "password"}'
r = self._post('/register', data=data, content_type='application/json')
data = json.loads(r.data)
self.assertEquals(data['meta']['code'], 200)
def test_register_existing_email(self):
data = dict(email='matt@lp.com',
password='password',
password_confirm='password')
r = self._post('/register', data=data, follow_redirects=True)
msg = b'matt@lp.com is already associated with an account'
self.assertIn(msg, r.data)
def test_unauthorized(self):
self.authenticate("joe@lp.com", endpoint="/custom_auth")
r = self._get("/admin", follow_redirects=True)
msg = b'You are not allowed to access the requested resouce'
self.assertIn(msg, r.data)
def test_default_http_auth_realm(self):
r = self._get('/http', headers={
'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:bogus")
})
self.assertIn(b'<h1>Unauthorized</h1>', r.data)
self.assertIn('WWW-Authenticate', r.headers)
self.assertEquals('Basic realm="Custom Realm"',
r.headers['WWW-Authenticate'])
class BadConfiguredSecurityTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_PASSWORD_HASH': 'bcrypt',
'USER_COUNT': 1
}
def test_bad_configuration_raises_runtimer_error(self):
self.assertRaises(RuntimeError, self.authenticate)
class DefaultTemplatePathTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_LOGIN_USER_TEMPLATE': 'custom_security/login_user.html',
}
def test_login_user_template(self):
r = self._get('/login')
self.assertIn(b'CUSTOM LOGIN USER', r.data)
class RegisterableTemplatePathTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_REGISTERABLE': True,
'SECURITY_REGISTER_USER_TEMPLATE': 'custom_security/register_user.html'
}
def test_register_user_template(self):
r = self._get('/register')
self.assertIn(b'CUSTOM REGISTER USER', r.data)
class RecoverableTemplatePathTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_RECOVERABLE': True,
'SECURITY_FORGOT_PASSWORD_TEMPLATE': 'custom_security/forgot_password.html',
'SECURITY_RESET_PASSWORD_TEMPLATE': 'custom_security/reset_password.html',
}
def test_forgot_password_template(self):
r = self._get('/reset')
self.assertIn(b'CUSTOM FORGOT PASSWORD', r.data)
def test_reset_password_template(self):
with capture_reset_password_requests() as requests:
r = self._post('/reset', data=dict(email='joe@lp.com'),
follow_redirects=True)
t = requests[0]['token']
r = self._get('/reset/' + t)
self.assertIn(b'CUSTOM RESET PASSWORD', r.data)
class ConfirmableTemplatePathTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_CONFIRMABLE': True,
'SECURITY_SEND_CONFIRMATION_TEMPLATE': 'custom_security/send_confirmation.html'
}
def test_send_confirmation_template(self):
r = self._get('/confirm')
self.assertIn(b'CUSTOM SEND CONFIRMATION', r.data)
class PasswordlessTemplatePathTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_PASSWORDLESS': True,
'SECURITY_SEND_LOGIN_TEMPLATE': 'custom_security/send_login.html'
}
def test_send_login_template(self):
r = self._get('/login')
self.assertIn(b'CUSTOM SEND LOGIN', r.data)
class RegisterableTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_REGISTERABLE': True,
'USER_COUNT': 1
}
def test_register_valid_user(self):
data = dict(email='dude@lp.com',
password='password',
password_confirm='password')
self._post('/register', data=data, follow_redirects=True)
r = self.authenticate('dude@lp.com')
self.assertIn(b'Hello dude@lp.com', r.data)
class ConfirmableTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_CONFIRMABLE': True,
'SECURITY_REGISTERABLE': True,
'SECURITY_EMAIL_SUBJECT_REGISTER': 'Custom welcome subject',
'USER_COUNT': 1
}
def test_login_before_confirmation(self):
e = 'dude@lp.com'
self.register(e)
r = self.authenticate(email=e)
self.assertIn(self.get_message('CONFIRMATION_REQUIRED').encode('utf-8'), r.data)
def test_send_confirmation_of_already_confirmed_account(self):
e = 'dude@lp.com'
with capture_registrations() as registrations:
r = self.register(e)
token = registrations[0]['confirm_token']
self.client.get('/confirm/' + token, follow_redirects=True)
self.logout()
r = self._post('/confirm', data=dict(email=e))
m = self.get_message('ALREADY_CONFIRMED')
self.assertIn(m.encode('utf-8'), r.data)
def test_register_sends_confirmation_email(self):
e = 'dude@lp.com'
with self.app.extensions['mail'].record_messages() as outbox:
self.register(e)
self.assertEqual(len(outbox), 1)
self.assertIn(e, outbox[0].html)
self.assertEqual('Custom welcome subject', outbox[0].subject)
def test_confirm_email(self):
e = 'dude@lp.com'
tokens = []
def on_registered(sender, **kwargs):
tokens.append(kwargs['confirm_token'])
user_registered.connect(on_registered, self.app)
r = self.register(e)
self.assertEqual(len(tokens), 1)
r = self.client.get('/confirm/' + tokens[0], follow_redirects=True)
msg = self.app.config['SECURITY_MSG_EMAIL_CONFIRMED'][0]
self.assertIn(msg.encode('utf-8'), r.data)
def test_invalid_token_when_confirming_email(self):
r = self.client.get('/confirm/bogus', follow_redirects=True)
msg = self.app.config['SECURITY_MSG_INVALID_CONFIRMATION_TOKEN'][0]
self.assertIn(msg.encode('utf-8'), r.data)
def test_send_confirmation_json(self):
r = self._post('/confirm', data='{"email": "matt@lp.com"}',
content_type='application/json')
self.assertEquals(r.status_code, 200)
def test_send_confirmation_with_invalid_email(self):
r = self._post('/confirm', data=dict(email='bogus@bogus.com'))
msg = self.app.config['SECURITY_MSG_USER_DOES_NOT_EXIST'][0]
self.assertIn(msg.encode('utf-8'), r.data)
def test_resend_confirmation(self):
e = 'dude@lp.com'
self.register(e)
r = self._post('/confirm', data={'email': e})
msg = self.get_message('CONFIRMATION_REQUEST', email=e).encode('utf-8')
self.assertIn(msg, r.data)
def test_user_deleted_before_confirmation(self):
e = 'dude@lp.com'
with capture_registrations() as registrations:
self.register(e)
user = registrations[0]['user']
token = registrations[0]['confirm_token']
with self.app.app_context():
from flask_security.core import _security
_security.datastore.delete(user)
_security.datastore.commit()
r = self.client.get('/confirm/' + token, follow_redirects=True)
msg = self.app.config['SECURITY_MSG_INVALID_CONFIRMATION_TOKEN'][0]
self.assertIn(msg.encode('utf-8'), r.data)
class ExpiredConfirmationTest(SecurityTest):
AUTH_CONFIG = {
'SECURITY_CONFIRMABLE': True,
'SECURITY_REGISTERABLE': True,
'SECURITY_CONFIRM_EMAIL_WITHIN': '1 milliseconds',
'USER_COUNT': 1
}
def test_expired_confirmation_token_sends_email(self):
e = 'dude@lp.com'
with capture_registrations() as registrations:
self.register(e)
token = registrations[0]['confirm_token']
time.sleep(1.25)
with self.app.extensions['mail'].record_messages() as outbox:
r = self.client.get('/confirm/' + token, follow_redirects=True)
self.assertEqual(len(outbox), 1)
self.assertNotIn(token, outbox[0].html)
expire_text = self.AUTH_CONFIG['SECURITY_CONFIRM_EMAIL_WITHIN']
msg = self.app.config['SECURITY_MSG_CONFIRMATION_EXPIRED'][0]
msg = msg % dict(within=expire_text, email=e)
self.assertIn(msg.encode('utf-8'), r.data)
class LoginWithoutImmediateConfirmTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_CONFIRMABLE': True,
'SECURITY_REGISTERABLE': True,
'SECURITY_LOGIN_WITHOUT_CONFIRMATION': True,
'USER_COUNT': 1
}
def test_register_valid_user_automatically_signs_in(self):
e = 'dude@lp.com'
p = 'password'
data = dict(email=e, password=p, password_confirm=p)
r = self._post('/register', data=data, follow_redirects=True)
self.assertIn(e.encode('utf-8'), r.data)
def test_confirm_email_of_user_different_than_current_user(self):
e1 = 'dude@lp.com'
e2 = 'lady@lp.com'
with capture_registrations() as registrations:
self.register(e1)
self.logout()
self.register(e2)
token1 = registrations[0]['confirm_token']
token2 = registrations[1]['confirm_token']
self.client.get('/confirm/' + token1, follow_redirects=True)
self.client.get('/logout')
self.authenticate(email=e1)
r = self.client.get('/confirm/' + token2, follow_redirects=True)
m = self.app.config['SECURITY_MSG_EMAIL_CONFIRMED'][0]
self.assertIn(m.encode('utf-8'), r.data)
self.assertIn(b'Hello lady@lp.com', r.data)
def test_login_unconfirmed_user_when_login_without_confirmation_is_true(self):
e = 'dude@lp.com'
p = 'password'
data = dict(email=e, password=p, password_confirm=p)
r = self._post('/register', data=data, follow_redirects=True)
self.assertIn(e.encode('utf-8'), r.data)
self.client.get('/logout')
r = self.authenticate(email=e)
self.assertIn(e.encode('utf-8'), r.data)
class RecoverableTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_RECOVERABLE': True,
'SECURITY_RESET_PASSWORD_ERROR_VIEW': '/',
'SECURITY_POST_FORGOT_VIEW': '/'
}
def test_reset_view(self):
with capture_reset_password_requests() as requests:
r = self._post('/reset', data=dict(email='joe@lp.com'),
follow_redirects=True)
t = requests[0]['token']
r = self._get('/reset/' + t)
self.assertIn(b'<h1>Reset password</h1>', r.data)
def test_forgot_post_sends_email(self):
with capture_reset_password_requests():
with self.app.extensions['mail'].record_messages() as outbox:
self._post('/reset', data=dict(email='joe@lp.com'))
self.assertEqual(len(outbox), 1)
def test_forgot_password_json(self):
r = self._post('/reset', data='{"email": "matt@lp.com"}',
content_type="application/json")
self.assertEquals(r.status_code, 200)
def test_forgot_password_invalid_email(self):
r = self._post('/reset', data=dict(email='larry@lp.com'),
follow_redirects=True)
self.assertIn(b"Specified user does not exist", r.data)
def test_reset_password_with_valid_token(self):
with capture_reset_password_requests() as requests:
r = self._post('/reset', data=dict(email='joe@lp.com'),
follow_redirects=True)
t = requests[0]['token']
r = self._post('/reset/' + t, data={
'password': 'newpassword',
'password_confirm': 'newpassword'
}, follow_redirects=True)
r = self.logout()
r = self.authenticate('joe@lp.com', 'newpassword')
self.assertIn(b'Hello joe@lp.com', r.data)
def test_reset_password_with_invalid_token(self):
r = self._post('/reset/bogus', data={
'password': 'newpassword',
'password_confirm': 'newpassword'
}, follow_redirects=True)
m = self.get_message('INVALID_RESET_PASSWORD_TOKEN')
self.assertIn(m.encode('utf-8'), r.data)
def test_reset_password_with_mangled_token(self):
t = "WyIxNjQ2MzYiLCIxMzQ1YzBlZmVhM2VhZjYwODgwMDhhZGU2YzU0MzZjMiJd.BZEw_Q.lQyo3npdPZtcJ_sNHVHP103syjM&url_id=fbb89a8328e58c181ea7d064c2987874bc54a23d"
r = self._post('/reset/' + t, data={
'password': 'newpassword',
'password_confirm': 'newpassword'
}, follow_redirects=True)
m = self.get_message('INVALID_RESET_PASSWORD_TOKEN')
self.assertIn(m.encode('utf-8'), r.data)
class ExpiredResetPasswordTest(SecurityTest):
AUTH_CONFIG = {
'SECURITY_RECOVERABLE': True,
'SECURITY_RESET_PASSWORD_WITHIN': '1 milliseconds'
}
def test_reset_password_with_expired_token(self):
with capture_reset_password_requests() as requests:
r = self._post('/reset', data=dict(email='joe@lp.com'),
follow_redirects=True)
t = requests[0]['token']
time.sleep(1)
r = self._post('/reset/' + t, data={
'password': 'newpassword',
'password_confirm': 'newpassword'
}, follow_redirects=True)
self.assertIn(b'You did not reset your password within', r.data)
class ChangePasswordTest(SecurityTest):
AUTH_CONFIG = {
'SECURITY_RECOVERABLE': True,
'SECURITY_CHANGEABLE': True,
}
def test_change_password(self):
self.authenticate()
r = self.client.get('/change', follow_redirects=True)
self.assertIn(b'Change password', r.data)
def test_change_password_invalid(self):
self.authenticate()
r = self._post('/change', data={
'password': 'notpassword',
'new_password': 'newpassword',
'new_password_confirm': 'newpassword'
}, follow_redirects=True)
self.assertNotIn(b'You successfully changed your password', r.data)
self.assertIn(b'Invalid password', r.data)
def test_change_password_mismatch(self):
self.authenticate()
r = self._post('/change', data={
'password': 'password',
'new_password': 'newpassword',
'new_password_confirm': 'notnewpassword'
}, follow_redirects=True)
self.assertNotIn(b'You successfully changed your password', r.data)
self.assertIn(b'Passwords do not match', r.data)
def test_change_password_bad_password(self):
self.authenticate()
r = self._post('/change', data={
'password': 'password',
'new_password': 'a',
'new_password_confirm': 'a'
}, follow_redirects=True)
self.assertNotIn(b'You successfully changed your password', r.data)
self.assertIn(b'Password must be at least 6 characters', r.data)
def test_change_password_same_as_previous(self):
self.authenticate()
r = self._post('/change', data={
'password': 'password',
'new_password': 'password',
'new_password_confirm': 'password'
}, follow_redirects=True)
self.assertNotIn(b'You successfully changed your password', r.data)
self.assertIn(b'Your new password must be different than your previous password.', r.data)
def test_change_password_success(self):
data = {
'password': 'password',
'new_password': 'newpassword',
'new_password_confirm': 'newpassword'
}
self.authenticate()
with self.app.extensions['mail'].record_messages() as outbox:
r = self._post('/change', data=data, follow_redirects=True)
self.assertIn(b'You successfully changed your password', r.data)
self.assertIn(b'Home Page', r.data)
self.assertEqual(len(outbox), 1)
self.assertIn("Your password has been changed", outbox[0].html)
self.assertIn("/reset", outbox[0].html)
class EmailConfigTest(SecurityTest):
AUTH_CONFIG = {
'SECURITY_SEND_REGISTER_EMAIL': False,
'SECURITY_SEND_PASSWORD_CHANGE_EMAIL': False,
}
def test_change_password_success_email_option(self):
"""Test the change password email can be turned off w/ configuration."""
data = {
'password': 'password',
'new_password': 'newpassword',
'new_password_confirm': 'newpassword'
}
self.authenticate()
with self.app.extensions['mail'].record_messages() as outbox:
self._post('/change', data=data, follow_redirects=True)
self.assertEqual(len(outbox), 0)
class ChangePasswordPostViewTest(SecurityTest):
AUTH_CONFIG = {
'SECURITY_CHANGEABLE': True,
'SECURITY_POST_CHANGE_VIEW': '/profile',
}
def test_change_password_success(self):
data = {
'password': 'password',
'new_password': 'newpassword',
'new_password_confirm': 'newpassword'
}
self.authenticate()
r = self._post('/change', data=data, follow_redirects=True)
self.assertIn(b'Profile Page', r.data)
class ChangePasswordDisabledTest(SecurityTest):
AUTH_CONFIG = {
'SECURITY_CHANGEABLE': False,
}
def test_change_password_endpoint_is_404(self):
self.authenticate()
r = self.client.get('/change', follow_redirects=True)
self.assertEqual(404, r.status_code)
class TrackableTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_TRACKABLE': True,
'USER_COUNT': 1
}
def test_did_track(self):
e = 'matt@lp.com'
self.authenticate(email=e)
self.logout()
self.authenticate(email=e)
with self.app.test_request_context('/profile'):
user = self.app.security.datastore.find_user(email=e)
self.assertIsNotNone(user.last_login_at)
self.assertIsNotNone(user.current_login_at)
self.assertEquals('untrackable', user.last_login_ip)
self.assertEquals('untrackable', user.current_login_ip)
self.assertEquals(2, user.login_count)
class PasswordlessTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_PASSWORDLESS': True
}
def test_login_request_for_inactive_user(self):
msg = self.app.config['SECURITY_MSG_DISABLED_ACCOUNT'][0]
r = self._post('/login', data=dict(email='tiya@lp.com'),
follow_redirects=True)
self.assertIn(msg.encode('utf-8'), r.data)
def test_request_login_token_with_json_and_valid_email(self):
data = '{"email": "matt@lp.com", "password": "password"}'
r = self._post('/login', data=data, content_type='application/json')
self.assertEquals(r.status_code, 200)
self.assertNotIn(b'error', r.data)
def test_request_login_token_with_json_and_invalid_email(self):
data = '{"email": "nobody@lp.com", "password": "password"}'
r = self._post('/login', data=data, content_type='application/json')
self.assertIn(b'errors', r.data)
def test_request_login_token_sends_email_and_can_login(self):
e = 'matt@lp.com'
r, user, token = None, None, None
with capture_passwordless_login_requests() as requests:
with self.app.extensions['mail'].record_messages() as outbox:
r = self._post('/login', data=dict(email=e),
follow_redirects=True)
self.assertEqual(len(outbox), 1)
self.assertEquals(1, len(requests))
self.assertIn('user', requests[0])
self.assertIn('login_token', requests[0])
user = requests[0]['user']
token = requests[0]['login_token']
msg = self.app.config['SECURITY_MSG_LOGIN_EMAIL_SENT'][0]
msg = msg % dict(email=user.email)
self.assertIn(msg.encode('utf-8'), r.data)
r = self.client.get('/login/' + token, follow_redirects=True)
msg = self.get_message('PASSWORDLESS_LOGIN_SUCCESSFUL').encode('utf-8')
self.assertIn(msg, r.data)
r = self.client.get('/profile')
self.assertIn(b'Profile Page', r.data)
def test_invalid_login_token(self):
m = self.app.config['SECURITY_MSG_INVALID_LOGIN_TOKEN'][0]
r = self._get('/login/bogus', follow_redirects=True)
self.assertIn(m.encode('utf-8'), r.data)
def test_token_login_when_already_authenticated(self):
with capture_passwordless_login_requests() as requests:
self._post('/login', data=dict(email='matt@lp.com'),
follow_redirects=True)
token = requests[0]['login_token']
r = self.client.get('/login/' + token, follow_redirects=True)
msg = self.get_message('PASSWORDLESS_LOGIN_SUCCESSFUL')
self.assertIn(msg.encode('utf-8'), r.data)
r = self.client.get('/login/' + token, follow_redirects=True)
msg = self.get_message('PASSWORDLESS_LOGIN_SUCCESSFUL')
self.assertNotIn(msg.encode('utf-8'), r.data)
def test_send_login_with_invalid_email(self):
r = self._post('/login', data=dict(email='bogus@bogus.com'))
self.assertIn(b'Specified user does not exist', r.data)
class ExpiredLoginTokenTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_PASSWORDLESS': True,
'SECURITY_LOGIN_WITHIN': '1 milliseconds',
'USER_COUNT': 1
}
def test_expired_login_token_sends_email(self):
e = 'matt@lp.com'
with capture_passwordless_login_requests() as requests:
self._post('/login', data=dict(email=e), follow_redirects=True)
token = requests[0]['login_token']
time.sleep(1.25)
with self.app.extensions['mail'].record_messages() as outbox:
r = self.client.get('/login/' + token, follow_redirects=True)
expire_text = self.AUTH_CONFIG['SECURITY_LOGIN_WITHIN']
msg = self.app.config['SECURITY_MSG_LOGIN_EXPIRED'][0]
msg = msg % dict(within=expire_text, email=e)
self.assertIn(msg.encode('utf-8'), r.data)
self.assertEqual(len(outbox), 1)
self.assertIn(e, outbox[0].html)
self.assertNotIn(token, outbox[0].html)
class AsyncMailTaskTests(SecurityTest):
AUTH_CONFIG = {
'SECURITY_RECOVERABLE': True,
'USER_COUNT': 1
}
def setUp(self):
super(AsyncMailTaskTests, self).setUp()
self.mail_sent = False
def test_send_email_task_is_called(self):
@self.app.security.send_mail_task
def send_email(msg):
self.mail_sent = True
self._post('/reset', data=dict(email='matt@lp.com'))
self.assertTrue(self.mail_sent)
class NoBlueprintTests(SecurityTest):
APP_KWARGS = {
'register_blueprint': False,
}
AUTH_CONFIG = {
'USER_COUNT': 1
}
def test_login_endpoint_is_404(self):
r = self._get('/login')
self.assertEqual(404, r.status_code)
def test_http_auth_without_blueprint(self):
auth = base64.b64encode(b"matt@lp.com:password").decode('utf-8')
r = self._get('/http', headers={'Authorization': 'basic %s' % auth})
self.assertIn(b'HTTP Authentication', r.data)
class ExtendFormsTest(SecurityTest):
class MyLoginForm(LoginForm):
email = TextField('My Login Email Address Field')
class MyRegisterForm(RegisterForm):
email = TextField('My Register Email Address Field')
APP_KWARGS = {
'login_form': MyLoginForm,
'register_form': MyRegisterForm,
}
AUTH_CONFIG = {
'SECURITY_CONFIRMABLE': False,
'SECURITY_REGISTERABLE': True,
}
def test_login_view(self):
r = self._get('/login', follow_redirects=True)
self.assertIn(b"My Login Email Address Field", r.data)
def test_register(self):
r = self._get('/register', follow_redirects=True)
self.assertIn(b"My Register Email Address Field", r.data)
class RecoverableExtendFormsTest(SecurityTest):
class MyForgotPasswordForm(ForgotPasswordForm):
email = TextField('My Forgot Password Email Address Field',
validators=[valid_user_email])
class MyResetPasswordForm(ResetPasswordForm):
submit = SubmitField("My Reset Password Submit Field")
APP_KWARGS = {
'forgot_password_form': MyForgotPasswordForm,
'reset_password_form': MyResetPasswordForm,
}
AUTH_CONFIG = {
'SECURITY_RECOVERABLE': True,
}
def test_forgot_password(self):
r = self._get('/reset', follow_redirects=True)
self.assertIn(b"My Forgot Password Email Address Field", r.data)
def test_reset_password(self):
with capture_reset_password_requests() as requests:
self._post('/reset', data=dict(email='joe@lp.com'),
follow_redirects=True)
token = requests[0]['token']
r = self._get('/reset/' + token)
self.assertIn(b"My Reset Password Submit Field", r.data)
class PasswordlessExtendFormsTest(SecurityTest):
class MyPasswordlessLoginForm(PasswordlessLoginForm):
email = TextField('My Passwordless Login Email Address Field')
APP_KWARGS = {
'passwordless_login_form': MyPasswordlessLoginForm,
}
AUTH_CONFIG = {
'SECURITY_PASSWORDLESS': True,
}
def test_passwordless_login(self):
r = self._get('/login', follow_redirects=True)
self.assertIn(b"My Passwordless Login Email Address Field", r.data)
class ConfirmableExtendFormsTest(SecurityTest):
class MyConfirmRegisterForm(ConfirmRegisterForm):
email = TextField('My Confirm Register Email Address Field')
class MySendConfirmationForm(SendConfirmationForm):
email = TextField('My Send Confirmation Email Address Field')
APP_KWARGS = {
'confirm_register_form': MyConfirmRegisterForm,
'send_confirmation_form': MySendConfirmationForm,
}
AUTH_CONFIG = {
'SECURITY_CONFIRMABLE': True,
'SECURITY_REGISTERABLE': True,
}
def test_register(self):
r = self._get('/register', follow_redirects=True)
self.assertIn(b"My Confirm Register Email Address Field", r.data)
def test_send_confirmation(self):
r = self._get('/confirm', follow_redirects=True)
self.assertIn(b"My Send Confirmation Email Address Field", r.data)
class AdditionalUserIdentityAttributes(SecurityTest):
AUTH_CONFIG = {
'SECURITY_USER_IDENTITY_ATTRIBUTES': ('email', 'username')
}
def test_authenticate(self):
r = self.authenticate(email='matt')
self.assertIn(b'Hello matt@lp.com', r.data)
+307
View File
@@ -0,0 +1,307 @@
# -*- coding: utf-8 -*-
"""
conftest
~~~~~~~~
Test fixtures and what not
"""
import os
import tempfile
import time
import pytest
from flask import Flask, render_template, current_app
from flask_mail import Mail
from flask_security import Security, MongoEngineUserDatastore, SQLAlchemyUserDatastore, \
PeeweeUserDatastore, UserMixin, RoleMixin, http_auth_required, login_required, \
auth_token_required, auth_required, roles_required, roles_accepted
from utils import populate_data, Response
@pytest.fixture()
def app():
app = Flask(__name__)
app.response_class = Response
app.debug = True
app.config['SECRET_KEY'] = 'secret'
app.config['TESTING'] = True
app.config['LOGIN_DISABLED'] = False
app.config['WTF_CSRF_ENABLED'] = False
mail = Mail(app)
app.mail = mail
@app.route('/')
def index():
return render_template('index.html', content='Home Page')
@app.route('/profile')
@login_required
def profile():
return render_template('index.html', content='Profile Page')
@app.route('/post_login')
@login_required
def post_login():
return render_template('index.html', content='Post Login')
@app.route('/http')
@http_auth_required
def http():
return 'HTTP Authentication'
@app.route('/http_custom_realm')
@http_auth_required('My Realm')
def http_custom_realm():
return render_template('index.html', content='HTTP Authentication')
@app.route('/token')
@auth_token_required
def token():
return render_template('index.html', content='Token Authentication')
@app.route('/multi_auth')
@auth_required('session', 'token', 'basic')
def multi_auth():
return render_template('index.html', content='Session, Token, Basic auth')
@app.route('/post_logout')
def post_logout():
return render_template('index.html', content='Post Logout')
@app.route('/post_register')
def post_register():
return render_template('index.html', content='Post Register')
@app.route('/admin')
@roles_required('admin')
def admin():
return render_template('index.html', content='Admin Page')
@app.route('/admin_and_editor')
@roles_required('admin', 'editor')
def admin_and_editor():
return render_template('index.html', content='Admin and Editor Page')
@app.route('/admin_or_editor')
@roles_accepted('admin', 'editor')
def admin_or_editor():
return render_template('index.html', content='Admin or Editor Page')
@app.route('/unauthorized')
def unauthorized():
return render_template('unauthorized.html')
@app.route('/coverage/add_role_to_user')
def add_role_to_user():
ds = current_app.security.datastore
u = ds.find_user(email='joe@lp.com')
r = ds.find_role('admin')
ds.add_role_to_user(u, r)
return 'success'
@app.route('/coverage/remove_role_from_user')
def remove_role_from_user():
ds = current_app.security.datastore
u = ds.find_user(email='matt@lp.com')
ds.remove_role_from_user(u, 'admin')
return 'success'
@app.route('/coverage/deactivate_user')
def deactivate_user():
ds = current_app.security.datastore
u = ds.find_user(email='matt@lp.com')
ds.deactivate_user(u)
return 'success'
@app.route('/coverage/activate_user')
def activate_user():
ds = current_app.security.datastore
u = ds.find_user(email='tiya@lp.com')
ds.activate_user(u)
return 'success'
@app.route('/coverage/invalid_role')
def invalid_role():
ds = current_app.security.datastore
return 'success' if ds.find_role('bogus') is None else 'failure'
@app.route('/page1')
def page_1():
return 'Page 1'
return app
@pytest.fixture()
def mongoengine_datastore(request, app):
from flask_mongoengine import MongoEngine
db_name = 'flask_security_test_%s' % str(time.time()).replace('.', '_')
app.config['MONGODB_SETTINGS'] = {
'db': db_name,
'host': 'localhost',
'port': 27017,
'alias': db_name
}
db = MongoEngine(app)
class Role(db.Document, RoleMixin):
name = db.StringField(required=True, unique=True, max_length=80)
description = db.StringField(max_length=255)
meta = {"db_alias": db_name}
class User(db.Document, UserMixin):
email = db.StringField(unique=True, max_length=255)
username = db.StringField(max_length=255)
password = db.StringField(required=True, max_length=255)
last_login_at = db.DateTimeField()
current_login_at = db.DateTimeField()
last_login_ip = db.StringField(max_length=100)
current_login_ip = db.StringField(max_length=100)
login_count = db.IntField()
active = db.BooleanField(default=True)
confirmed_at = db.DateTimeField()
roles = db.ListField(db.ReferenceField(Role), default=[])
meta = {"db_alias": db_name}
request.addfinalizer(lambda: db.connection.drop_database(db_name))
return MongoEngineUserDatastore(db, User, Role)
@pytest.fixture()
def sqlalchemy_datastore(request, app, tmpdir):
from flask_sqlalchemy import SQLAlchemy
f, path = tempfile.mkstemp(prefix='flask-security-test-db', suffix='.db', dir=str(tmpdir))
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + path
db = SQLAlchemy(app)
roles_users = db.Table(
'roles_users',
db.Column('user_id', db.Integer(), db.ForeignKey('user.id')),
db.Column('role_id', db.Integer(), db.ForeignKey('role.id')))
class Role(db.Model, RoleMixin):
id = db.Column(db.Integer(), primary_key=True)
name = db.Column(db.String(80), unique=True)
description = db.Column(db.String(255))
class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
email = db.Column(db.String(255), unique=True)
username = db.Column(db.String(255))
password = db.Column(db.String(255))
last_login_at = db.Column(db.DateTime())
current_login_at = db.Column(db.DateTime())
last_login_ip = db.Column(db.String(100))
current_login_ip = db.Column(db.String(100))
login_count = db.Column(db.Integer)
active = db.Column(db.Boolean())
confirmed_at = db.Column(db.DateTime())
roles = db.relationship('Role', secondary=roles_users,
backref=db.backref('users', lazy='dynamic'))
with app.app_context():
db.create_all()
request.addfinalizer(lambda: os.remove(path))
return SQLAlchemyUserDatastore(db, User, Role)
@pytest.fixture()
def peewee_datastore(request, app, tmpdir):
from peewee import TextField, DateTimeField, IntegerField, BooleanField, ForeignKeyField
from flask_peewee.db import Database
f, path = tempfile.mkstemp(prefix='flask-security-test-db', suffix='.db', dir=str(tmpdir))
app.config['DATABASE'] = {
'name': path,
'engine': 'peewee.SqliteDatabase'
}
db = Database(app)
class Role(db.Model, RoleMixin):
name = TextField(unique=True)
description = TextField(null=True)
class User(db.Model, UserMixin):
email = TextField()
username = TextField()
password = TextField()
last_login_at = DateTimeField(null=True)
current_login_at = DateTimeField(null=True)
last_login_ip = TextField(null=True)
current_login_ip = TextField(null=True)
login_count = IntegerField(null=True)
active = BooleanField(default=True)
confirmed_at = DateTimeField(null=True)
class UserRoles(db.Model):
""" Peewee does not have built-in many-to-many support, so we have to
create this mapping class to link users to roles."""
user = ForeignKeyField(User, related_name='roles')
role = ForeignKeyField(Role, related_name='users')
name = property(lambda self: self.role.name)
description = property(lambda self: self.role.description)
with app.app_context():
for Model in (Role, User, UserRoles):
Model.create_table()
request.addfinalizer(lambda: os.remove(path))
return PeeweeUserDatastore(db, User, Role, UserRoles)
@pytest.fixture()
def sqlalchemy_app(app, sqlalchemy_datastore):
def create():
app.security = Security(app, datastore=sqlalchemy_datastore)
return app
return create
@pytest.fixture()
def peewee_app(app, peewee_datastore):
def create():
app.security = Security(app, datastore=peewee_datastore)
return app
return create
@pytest.fixture()
def mongoengine_app(app, mongoengine_datastore):
def create():
app.security = Security(app, datastore=mongoengine_datastore)
return app
return create
@pytest.fixture(params=['sqlalchemy', 'mongoengine', 'peewee'])
def client(request, sqlalchemy_app, mongoengine_app, peewee_app):
if request.param == 'sqlalchemy':
app = sqlalchemy_app()
elif request.param == 'mongoengine':
app = mongoengine_app()
elif request.param == 'peewee':
app = peewee_app()
populate_data(app)
return app.test_client()
@pytest.fixture()
def get_message(app):
def fn(key, **kwargs):
rv = app.config['SECURITY_MSG_' + key][0] % kwargs
return rv.encode('utf-8')
return fn
-277
View File
@@ -1,277 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import with_statement
import base64
import simplejson as json
try:
from cookielib import Cookie
except ImportError:
from http.cookiejar import Cookie
from werkzeug.utils import parse_cookie
from tests import SecurityTest
def get_cookies(rv):
cookies = {}
for value in rv.headers.get_all("Set-Cookie"):
cookies.update(parse_cookie(value))
return cookies
class DefaultSecurityTests(SecurityTest):
def test_instance(self):
self.assertIsNotNone(self.app)
self.assertIsNotNone(self.app.security)
self.assertIsNotNone(self.app.security.pwd_context)
def test_login_view(self):
r = self._get('/login')
self.assertIn(b'<h1>Login</h1>', r.data)
def test_authenticate(self):
r = self.authenticate()
self.assertIn(b'Hello matt@lp.com', r.data)
def test_authenticate_case_insensitive_email(self):
r = self.authenticate(email='MATT@lp.com')
self.assertIn(b'Hello matt@lp.com', r.data)
def test_unprovided_username(self):
r = self.authenticate("")
self.assertIn(self.get_message('EMAIL_NOT_PROVIDED').encode('utf-8'), r.data)
def test_unprovided_password(self):
r = self.authenticate(password="")
self.assertIn(self.get_message('PASSWORD_NOT_PROVIDED').encode('utf-8'), r.data)
def test_invalid_user(self):
r = self.authenticate(email="bogus@bogus.com")
self.assertIn(self.get_message('USER_DOES_NOT_EXIST').encode('utf-8'), r.data)
def test_bad_password(self):
r = self.authenticate(password="bogus")
self.assertIn(self.get_message('INVALID_PASSWORD').encode('utf-8'), r.data)
def test_inactive_user(self):
r = self.authenticate("tiya@lp.com", "password")
self.assertIn(self.get_message('DISABLED_ACCOUNT').encode('utf-8'), r.data)
def test_logout(self):
self.authenticate()
r = self.logout()
self.assertIsHomePage(r.data)
def test_unauthorized_access(self):
self.logout()
r = self._get('/profile', follow_redirects=True)
self.assertIn(b'<li class="info">Please log in to access this page.</li>', r.data)
def test_authorized_access(self):
self.authenticate()
r = self._get("/profile")
self.assertIn(b'profile', r.data)
def test_valid_admin_role(self):
self.authenticate()
r = self._get("/admin")
self.assertIn(b'Admin Page', r.data)
def test_invalid_admin_role(self):
self.authenticate("joe@lp.com")
r = self._get("/admin", follow_redirects=True)
self.assertIsHomePage(r.data)
def test_roles_accepted(self):
for user in ("matt@lp.com", "joe@lp.com"):
self.authenticate(user)
r = self._get("/admin_or_editor")
self.assertIn(b'Admin or Editor Page', r.data)
self.logout()
self.authenticate("jill@lp.com")
r = self._get("/admin_or_editor", follow_redirects=True)
self.assertIsHomePage(r.data)
def test_unauthenticated_role_required(self):
r = self._get('/admin', follow_redirects=True)
self.assertIn(self.get_message('UNAUTHORIZED').encode('utf-8'), r.data)
def test_multiple_role_required(self):
for user in ("matt@lp.com", "joe@lp.com"):
self.authenticate(user)
r = self._get("/admin_and_editor", follow_redirects=True)
self.assertIsHomePage(r.data)
self._get('/logout')
self.authenticate('dave@lp.com')
r = self._get("/admin_and_editor", follow_redirects=True)
self.assertIn(b'Admin and Editor Page', r.data)
def test_ok_json_auth(self):
r = self.json_authenticate()
data = json.loads(r.data)
self.assertEquals(data['meta']['code'], 200)
self.assertIn('authentication_token', data['response']['user'])
def test_invalid_json_auth(self):
r = self.json_authenticate(password='junk')
self.assertIn(b'"code": 400', r.data)
def test_token_auth_via_querystring_valid_token(self):
r = self.json_authenticate()
data = json.loads(r.data)
token = data['response']['user']['authentication_token']
r = self._get('/token?auth_token=' + token)
self.assertIn(b'Token Authentication', r.data)
def test_token_auth_via_header_valid_token(self):
r = self.json_authenticate()
data = json.loads(r.data)
token = data['response']['user']['authentication_token']
headers = {"Authentication-Token": token}
r = self._get('/token', headers=headers)
self.assertIn(b'Token Authentication', r.data)
def test_token_auth_via_querystring_invalid_token(self):
r = self._get('/token?auth_token=X')
self.assertEqual(401, r.status_code)
def test_token_auth_via_header_invalid_token(self):
r = self._get('/token', headers={"Authentication-Token": 'X'})
self.assertEqual(401, r.status_code)
def test_http_auth(self):
r = self._get('/http', headers={
'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:password").decode('utf-8')
})
self.assertIn(b'HTTP Authentication', r.data)
def test_http_auth_no_authorization(self):
r = self._get('/http', headers={})
self.assertIn(b'<h1>Unauthorized</h1>', r.data)
self.assertIn('WWW-Authenticate', r.headers)
self.assertEquals('Basic realm="Login Required"',
r.headers['WWW-Authenticate'])
def test_invalid_http_auth_invalid_username(self):
r = self._get('/http', headers={
'Authorization': 'Basic %s' % base64.b64encode(b"bogus:bogus").decode('utf-8')
})
self.assertIn(b'<h1>Unauthorized</h1>', r.data)
self.assertIn('WWW-Authenticate', r.headers)
self.assertEquals('Basic realm="Login Required"',
r.headers['WWW-Authenticate'])
def test_invalid_http_auth_bad_password(self):
r = self._get('/http', headers={
'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:bogus").decode('utf-8')
})
self.assertIn(b'<h1>Unauthorized</h1>', r.data)
self.assertIn('WWW-Authenticate', r.headers)
self.assertEquals('Basic realm="Login Required"',
r.headers['WWW-Authenticate'])
def test_custom_http_auth_realm(self):
r = self._get('/http_custom_realm', headers={
'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:bogus").decode('utf-8')
})
self.assertIn(b'<h1>Unauthorized</h1>', r.data)
self.assertIn('WWW-Authenticate', r.headers)
self.assertEquals('Basic realm="My Realm"',
r.headers['WWW-Authenticate'])
def test_multi_auth_basic(self):
r = self._get('/multi_auth', headers={
'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:password").decode('utf-8')
})
self.assertIn(b'Basic', r.data)
def test_multi_auth_token(self):
r = self.json_authenticate()
data = json.loads(r.data)
token = data['response']['user']['authentication_token']
r = self._get('/multi_auth?auth_token=' + token)
self.assertIn(b'Token', r.data)
def test_multi_auth_session(self):
self.authenticate()
r = self._get('/multi_auth')
self.assertIn(b'Session', r.data)
def test_user_deleted_during_session_reverts_to_anonymous_user(self):
self.authenticate()
with self.app.test_request_context('/'):
user = self.app.security.datastore.find_user(email='matt@lp.com')
self.app.security.datastore.delete_user(user)
self.app.security.datastore.commit()
r = self._get('/')
self.assertNotIn(b'Hello matt@lp.com', r.data)
def test_remember_token(self):
r = self.authenticate(follow_redirects=False)
self.client.cookie_jar.clear_session_cookies()
r = self._get('/profile')
self.assertIn(b'profile', r.data)
def test_token_loader_does_not_fail_with_invalid_token(self):
c = Cookie(version=0, name='remember_token', value='None', port=None,
port_specified=False, domain='www.example.com',
domain_specified=False, domain_initial_dot=False, path='/',
path_specified=True, secure=False, expires=None,
discard=True, comment=None, comment_url=None,
rest={'HttpOnly': None}, rfc2109=False)
self.client.cookie_jar.set_cookie(c)
r = self._get('/')
self.assertNotIn(b'BadSignature', r.data)
class MongoEngineSecurityTests(DefaultSecurityTests):
def _create_app(self, auth_config, **kwargs):
from tests.test_app.mongoengine import create_app
return create_app(auth_config, **kwargs)
class PeeweeSecurityTests(DefaultSecurityTests):
def _create_app(self, auth_config, **kwargs):
from tests.test_app.peewee_app import create_app
return create_app(auth_config, **kwargs)
class DefaultDatastoreTests(SecurityTest):
def test_add_role_to_user(self):
r = self._get('/coverage/add_role_to_user')
self.assertIn(b'success', r.data)
def test_remove_role_from_user(self):
r = self._get('/coverage/remove_role_from_user')
self.assertIn(b'success', r.data)
def test_activate_user(self):
r = self._get('/coverage/activate_user')
self.assertIn(b'success', r.data)
def test_deactivate_user(self):
r = self._get('/coverage/deactivate_user')
self.assertIn(b'success', r.data)
def test_invalid_role(self):
r = self._get('/coverage/invalid_role')
self.assertIn(b'success', r.data)
class MongoEngineDatastoreTests(DefaultDatastoreTests):
def _create_app(self, auth_config, **kwargs):
from tests.test_app.mongoengine import create_app
return create_app(auth_config, **kwargs)
-244
View File
@@ -1,244 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import with_statement
from flask_security.utils import capture_registrations, \
capture_reset_password_requests, capture_signals
from flask_security.signals import user_registered, user_confirmed, \
confirm_instructions_sent, login_instructions_sent, \
password_reset, password_changed, reset_password_instructions_sent
from tests import SecurityTest
def compare_user(a, b):
"""Helper to compare two users."""
return a.id == b.id and a.email == b.email and a.password == b.password
class SignalTest(SecurityTest):
def _create_app(self, auth_config, **kwargs):
from tests.test_app.mongoengine import create_app
return create_app(auth_config, **kwargs)
class RegisterableSignalsTests(SignalTest):
AUTH_CONFIG = {
'SECURITY_CONFIRMABLE': True,
'SECURITY_REGISTERABLE': True,
}
def test_register(self):
e = 'dude@lp.com'
with capture_signals() as mocks:
self.register(e)
user = self.app.security.datastore.find_user(email='dude@lp.com')
self.assertEqual(mocks.signals_sent(), set([user_registered]))
calls = mocks[user_registered]
self.assertEqual(len(calls), 1)
args, kwargs = calls[0]
self.assertTrue(compare_user(kwargs['user'], user))
self.assertIn('confirm_token', kwargs)
self.assertEqual(args[0], self.app)
def test_register_without_password(self):
e = 'dude@lp.com'
with capture_signals() as mocks:
self.register(e, password='')
self.assertEqual(mocks.signals_sent(), set())
class ConfirmableSignalsTests(SignalTest):
AUTH_CONFIG = {
'SECURITY_CONFIRMABLE': True,
'SECURITY_REGISTERABLE': True,
}
def test_confirm(self):
e = 'dude@lp.com'
with capture_registrations() as registrations:
self.register(e)
token = registrations[0]['confirm_token']
with capture_signals() as mocks:
self.client.get('/confirm/' + token, follow_redirects=True)
user = self.app.security.datastore.find_user(email='dude@lp.com')
self.assertTrue(mocks.signals_sent(), set([user_confirmed]))
calls = mocks[user_confirmed]
self.assertEqual(len(calls), 1)
args, kwargs = calls[0]
self.assertEqual(args[0], self.app)
self.assertTrue(compare_user(kwargs['user'], user))
def test_confirm_bad_token(self):
e = 'dude@lp.com'
with capture_registrations():
self.register(e)
with capture_signals() as mocks:
self.client.get('/confirm/bogus', follow_redirects=True)
self.assertEqual(mocks.signals_sent(), set())
def test_confirm_twice(self):
e = 'dude@lp.com'
with capture_registrations() as registrations:
self.register(e)
token = registrations[0]['confirm_token']
self.client.get('/confirm/' + token, follow_redirects=True)
self.logout()
with capture_signals() as mocks:
self.client.get('/confirm/' + token, follow_redirects=True)
self.assertEqual(mocks.signals_sent(), set([user_confirmed]))
# TODO: is that the desired behaviour?
def test_resend_confirmation(self):
e = 'dude@lp.com'
self.register(e)
with capture_signals() as mocks:
self._post('/confirm', data={'email': e})
user = self.app.security.datastore.find_user(email='dude@lp.com')
self.assertEqual(mocks.signals_sent(), set([confirm_instructions_sent]))
calls = mocks[confirm_instructions_sent]
self.assertEqual(len(calls), 1)
args, kwargs = calls[0]
self.assertTrue(compare_user(kwargs['user'], user))
self.assertEqual(args[0], self.app)
def test_send_confirmation_bad_email(self):
with capture_signals() as mocks:
self._post('/confirm', data=dict(email='bogus@bogus.com'))
self.assertEqual(mocks.signals_sent(), set())
class RecoverableSignalsTests(SignalTest):
AUTH_CONFIG = {
'SECURITY_RECOVERABLE': True,
'SECURITY_RESET_PASSWORD_ERROR_VIEW': '/',
'SECURITY_POST_FORGOT_VIEW': '/'
}
def test_reset_password_request(self):
with capture_signals() as mocks:
self._post('/reset', data=dict(email='joe@lp.com'),
follow_redirects=True)
self.assertEqual(mocks.signals_sent(), set([reset_password_instructions_sent]))
user = self.app.security.datastore.find_user(email='joe@lp.com')
calls = mocks[reset_password_instructions_sent]
self.assertEqual(len(calls), 1)
args, kwargs = calls[0]
self.assertTrue(compare_user(kwargs['user'], user))
self.assertIn('token', kwargs)
self.assertEqual(args[0], self.app)
def test_reset_password(self):
with capture_reset_password_requests() as requests:
self._post('/reset', data=dict(email='joe@lp.com'),
follow_redirects=True)
token = requests[0]['token']
with capture_signals() as mocks:
data = dict(password='newpassword', password_confirm='newpassword')
self._post('/reset/' + token, data, follow_redirects=True)
self.assertEqual(mocks.signals_sent(), set([password_reset]))
user = self.app.security.datastore.find_user(email='joe@lp.com')
calls = mocks[password_reset]
self.assertEqual(len(calls), 1)
args, kwargs = calls[0]
self.assertTrue(compare_user(kwargs['user'], user))
self.assertEqual(args[0], self.app)
def test_reset_password_invalid_emails(self):
with capture_signals() as mocks:
self._post('/reset', data=dict(email='nobody@lp.com'),
follow_redirects=True)
self.assertEqual(mocks.signals_sent(), set())
def test_reset_password_invalid_token(self):
with capture_signals() as mocks:
data = dict(password='newpassword', password_confirm='newpassword')
self._post('/reset/bogus', data, follow_redirects=True)
self.assertEqual(mocks.signals_sent(), set())
class ChangeableSignalsTests(SignalTest):
AUTH_CONFIG = {
'SECURITY_CHANGEABLE': True,
}
def test_change_password(self):
self.authenticate('joe@lp.com')
with capture_signals() as mocks:
with self.client as client:
client.post('/change',
data=dict(password='password',
new_password='newpassword',
new_password_confirm='newpassword'))
self.assertEqual(mocks.signals_sent(), set([password_changed]))
user = self.app.security.datastore.find_user(email='joe@lp.com')
calls = mocks[password_changed]
self.assertEqual(len(calls), 1)
args, kwargs = calls[0]
self.assertTrue(compare_user(kwargs['user'], user))
self.assertEqual(args[0], self.app)
def test_change_password_invalid_password(self):
with capture_signals() as mocks:
self.client.post('/change',
data=dict(password='notpassword',
new_password='newpassword',
new_password_confirm='newpassword'),
follow_redirects=True)
self.assertEqual(mocks.signals_sent(), set())
def test_change_password_bad_password(self):
with capture_signals() as mocks:
self.client.post('/change',
data=dict(password='notpassword',
new_password='a',
new_password_confirm='a'),
follow_redirects=True)
self.assertEqual(mocks.signals_sent(), set())
def test_change_password_mismatch_password(self):
with capture_signals() as mocks:
self.client.post('/change',
data=dict(password='password',
new_password='newpassword',
new_password_confirm='notnewpassword'),
follow_redirects=True)
self.assertEqual(mocks.signals_sent(), set())
class PasswordlessTests(SignalTest):
AUTH_CONFIG = {
'SECURITY_PASSWORDLESS': True
}
def test_login_request_for_inactive_user(self):
with capture_signals() as mocks:
self._post('/login', data=dict(email='tiya@lp.com'),
follow_redirects=True)
self.assertEqual(mocks.signals_sent(), set())
def test_login_request_for_invalid_email(self):
with capture_signals() as mocks:
self._post('/login', data=dict(email='nobody@lp.com'),
follow_redirects=True)
self.assertEqual(mocks.signals_sent(), set())
def test_request_login_token_sends_email_and_can_login(self):
e = 'matt@lp.com'
with capture_signals() as mocks:
self._post('/login', data=dict(email=e), follow_redirects=True)
self.assertEqual(mocks.signals_sent(), set([login_instructions_sent]))
user = self.app.security.datastore.find_user(email='matt@lp.com')
calls = mocks[login_instructions_sent]
self.assertEqual(len(calls), 1)
args, kwargs = calls[0]
self.assertTrue(compare_user(kwargs['user'], user))
self.assertIn('login_token', kwargs)
self.assertEqual(args[0], self.app)
@@ -0,0 +1 @@
CUSTOM CHANGE PASSWORD
-185
View File
@@ -1,185 +0,0 @@
# -*- coding: utf-8 -*-
from flask import Flask, render_template, current_app
from flask.ext.mail import Mail
from flask.ext.security import login_required, roles_required, roles_accepted
from flask.ext.security.decorators import http_auth_required, \
auth_token_required, auth_required
from flask.ext.security.utils import encrypt_password
from werkzeug.local import LocalProxy
ds = LocalProxy(lambda: current_app.extensions['security'].datastore)
def create_app(config):
app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'secret'
app.config['TESTING'] = True
app.config['LOGIN_DISABLED'] = False
for key, value in config.items():
app.config[key] = value
mail = Mail(app)
app.extensions['mail'] = mail
@app.route('/')
def index():
return render_template('index.html', content='Home Page')
@app.route('/profile')
@login_required
def profile():
return render_template('index.html', content='Profile Page')
@app.route('/post_login')
@login_required
def post_login():
return render_template('index.html', content='Post Login')
@app.route('/http')
@http_auth_required
def http():
return 'HTTP Authentication'
@app.route('/http_custom_realm')
@http_auth_required('My Realm')
def http_custom_realm():
return render_template('index.html', content='HTTP Authentication')
@app.route('/token')
@auth_token_required
def token():
return render_template('index.html', content='Token Authentication')
@app.route('/multi_auth')
@auth_required('session', 'token', 'basic')
def multi_auth():
return render_template('index.html', content='Session, Token, Basic auth')
@app.route('/post_logout')
def post_logout():
return render_template('index.html', content='Post Logout')
@app.route('/post_register')
def post_register():
return render_template('index.html', content='Post Register')
@app.route('/admin')
@roles_required('admin')
def admin():
return render_template('index.html', content='Admin Page')
@app.route('/admin_and_editor')
@roles_required('admin', 'editor')
def admin_and_editor():
return render_template('index.html', content='Admin and Editor Page')
@app.route('/admin_or_editor')
@roles_accepted('admin', 'editor')
def admin_or_editor():
return render_template('index.html', content='Admin or Editor Page')
@app.route('/unauthorized')
def unauthorized():
return render_template('unauthorized.html')
@app.route('/coverage/add_role_to_user')
def add_role_to_user():
u = ds.find_user(email='joe@lp.com')
r = ds.find_role('admin')
ds.add_role_to_user(u, r)
return 'success'
@app.route('/coverage/remove_role_from_user')
def remove_role_from_user():
u = ds.find_user(email='matt@lp.com')
ds.remove_role_from_user(u, 'admin')
return 'success'
@app.route('/coverage/deactivate_user')
def deactivate_user():
u = ds.find_user(email='matt@lp.com')
ds.deactivate_user(u)
return 'success'
@app.route('/coverage/activate_user')
def activate_user():
u = ds.find_user(email='tiya@lp.com')
ds.activate_user(u)
return 'success'
@app.route('/coverage/invalid_role')
def invalid_role():
return 'success' if ds.find_role('bogus') is None else 'failure'
@app.route('/page1')
def page_1():
return 'Page 1'
return app
def create_roles():
for role in ('admin', 'editor', 'author'):
ds.create_role(name=role)
ds.commit()
def create_users(count=None):
users = [('matt@lp.com', 'matt', 'password', ['admin'], True),
('joe@lp.com', 'joe', 'password', ['editor'], True),
('dave@lp.com', 'dave', 'password', ['admin', 'editor'], True),
('jill@lp.com', 'jill', 'password', ['author'], True),
('tiya@lp.com', 'tiya', 'password', [], False)]
count = count or len(users)
for u in users[:count]:
pw = encrypt_password(u[2])
roles = [ds.find_or_create_role(rn) for rn in u[3]]
ds.commit()
user = ds.create_user(email=u[0], username=u[1], password=pw, active=u[4])
ds.commit()
for role in roles:
ds.add_role_to_user(user, role)
ds.commit()
def populate_data(user_count=None):
create_roles()
create_users(user_count)
def add_context_processors(s):
@s.context_processor
def for_all():
return dict()
@s.forgot_password_context_processor
def forgot_password():
return dict()
@s.login_context_processor
def login():
return dict()
@s.register_context_processor
def register():
return dict()
@s.reset_password_context_processor
def reset_password():
return dict()
@s.send_confirmation_context_processor
def send_confirmation():
return dict()
@s.send_login_context_processor
def send_login():
return dict()
@s.mail_context_processor
def mail():
return dict()
-57
View File
@@ -1,57 +0,0 @@
# -*- coding: utf-8 -*-
import sys
import os
sys.path.pop(0)
sys.path.insert(0, os.getcwd())
from flask.ext.mongoengine import MongoEngine
from flask.ext.security import Security, UserMixin, RoleMixin, \
MongoEngineUserDatastore
from tests.test_app import create_app as create_base_app, populate_data, \
add_context_processors
def create_app(config, **kwargs):
app = create_base_app(config)
app.config['MONGODB_SETTINGS'] = dict(
db='flask_security_test',
host='localhost',
port=27017
)
db = MongoEngine(app)
class Role(db.Document, RoleMixin):
name = db.StringField(required=True, unique=True, max_length=80)
description = db.StringField(max_length=255)
class User(db.Document, UserMixin):
email = db.StringField(unique=True, max_length=255)
username = db.StringField(max_length=255)
password = db.StringField(required=True, max_length=255)
last_login_at = db.DateTimeField()
current_login_at = db.DateTimeField()
last_login_ip = db.StringField(max_length=100)
current_login_ip = db.StringField(max_length=100)
login_count = db.IntField()
active = db.BooleanField(default=True)
confirmed_at = db.DateTimeField()
roles = db.ListField(db.ReferenceField(Role), default=[])
@app.before_first_request
def before_first_request():
User.drop_collection()
Role.drop_collection()
populate_data(app.config.get('USER_COUNT', None))
app.security = Security(app, datastore=MongoEngineUserDatastore(db, User, Role), **kwargs)
add_context_processors(app.security)
return app
if __name__ == '__main__':
create_app({}).run()
-64
View File
@@ -1,64 +0,0 @@
# -*- coding: utf-8 -*-
import sys
import os
sys.path.pop(0)
sys.path.insert(0, os.getcwd())
from flask_peewee.db import Database
from peewee import *
from flask.ext.security import Security, UserMixin, RoleMixin, \
PeeweeUserDatastore
from tests.test_app import create_app as create_base_app, populate_data, \
add_context_processors
def create_app(config, **kwargs):
app = create_base_app(config)
app.config['DATABASE'] = {
'name': 'peewee.db',
'engine': 'peewee.SqliteDatabase'
}
db = Database(app)
class Role(db.Model, RoleMixin):
name = TextField(unique=True)
description = TextField(null=True)
class User(db.Model, UserMixin):
email = TextField()
username = TextField()
password = TextField()
last_login_at = DateTimeField(null=True)
current_login_at = DateTimeField(null=True)
last_login_ip = TextField(null=True)
current_login_ip = TextField(null=True)
login_count = IntegerField(null=True)
active = BooleanField(default=True)
confirmed_at = DateTimeField(null=True)
class UserRoles(db.Model):
""" Peewee does not have built-in many-to-many support, so we have to
create this mapping class to link users to roles."""
user = ForeignKeyField(User, related_name='roles')
role = ForeignKeyField(Role, related_name='users')
name = property(lambda self: self.role.name)
description = property(lambda self: self.role.description)
@app.before_first_request
def before_first_request():
for Model in (Role, User, UserRoles):
Model.drop_table(fail_silently=True)
Model.create_table()
populate_data(app.config.get('USER_COUNT', None))
app.security = Security(app, datastore=PeeweeUserDatastore(db, User, Role, UserRoles), **kwargs)
add_context_processors(app.security)
return app
if __name__ == '__main__':
create_app({}).run()
-60
View File
@@ -1,60 +0,0 @@
# -*- coding: utf-8 -*-
import sys
import os
sys.path.pop(0)
sys.path.insert(0, os.getcwd())
from flask.ext.sqlalchemy import SQLAlchemy
from flask.ext.security import Security, UserMixin, RoleMixin, \
SQLAlchemyUserDatastore
from tests.test_app import create_app as create_base_app, populate_data, \
add_context_processors
def create_app(config, **kwargs):
app = create_base_app(config)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite://'
db = SQLAlchemy(app)
roles_users = db.Table('roles_users',
db.Column('user_id', db.Integer(), db.ForeignKey('user.id')),
db.Column('role_id', db.Integer(), db.ForeignKey('role.id')))
class Role(db.Model, RoleMixin):
id = db.Column(db.Integer(), primary_key=True)
name = db.Column(db.String(80), unique=True)
description = db.Column(db.String(255))
class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
email = db.Column(db.String(255), unique=True)
username = db.Column(db.String(255))
password = db.Column(db.String(255))
last_login_at = db.Column(db.DateTime())
current_login_at = db.Column(db.DateTime())
last_login_ip = db.Column(db.String(100))
current_login_ip = db.Column(db.String(100))
login_count = db.Column(db.Integer)
active = db.Column(db.Boolean())
confirmed_at = db.Column(db.DateTime())
roles = db.relationship('Role', secondary=roles_users,
backref=db.backref('users', lazy='dynamic'))
@app.before_first_request
def before_first_request():
db.drop_all()
db.create_all()
populate_data(app.config.get('USER_COUNT', None))
app.security = Security(app, datastore=SQLAlchemyUserDatastore(db, User, Role), **kwargs)
add_context_processors(app.security)
return app
if __name__ == '__main__':
create_app({}).run()
+135
View File
@@ -0,0 +1,135 @@
# -*- coding: utf-8 -*-
"""
test_changeable
~~~~~~~~~~~~~~~
Changeable tests
"""
from flask_security.signals import password_changed
from utils import authenticate, init_app_with_options
def _get_client(app, datastore, **options):
config = {
'SECURITY_CHANGEABLE': True
}
config.update(options)
init_app_with_options(app, datastore, **config)
return app.test_client()
def test_recoverable_flag(app, sqlalchemy_datastore, get_message):
client = _get_client(app, sqlalchemy_datastore)
recorded = []
@password_changed.connect_via(app)
def on_password_changed(app, user):
recorded.append(user)
authenticate(client)
# Test change view
response = client.get('/change', follow_redirects=True)
assert b'Change password' in response.data
# Test wrong original password
response = client.post('/change', data={
'password': 'notpassword',
'new_password': 'newpassword',
'new_password_confirm': 'newpassword'
}, follow_redirects=True)
assert get_message('INVALID_PASSWORD') in response.data
# Test mismatch
response = client.post('/change', data={
'password': 'password',
'new_password': 'newpassword',
'new_password_confirm': 'notnewpassword'
}, follow_redirects=True)
assert get_message('PASSWORD_CHANGE') not in response.data
assert get_message('RETYPE_PASSWORD_MISMATCH') in response.data
# Test bad password
response = client.post('/change', data={
'password': 'password',
'new_password': 'a',
'new_password_confirm': 'a'
}, follow_redirects=True)
assert get_message('PASSWORD_CHANGE') not in response.data
assert get_message('PASSWORD_INVALID_LENGTH') in response.data
# Test same as previous
response = client.post('/change', data={
'password': 'password',
'new_password': 'password',
'new_password_confirm': 'password'
}, follow_redirects=True)
assert get_message('PASSWORD_CHANGE') not in response.data
assert get_message('PASSWORD_IS_THE_SAME') in response.data
# Test successful submit sends email notification
with app.mail.record_messages() as outbox:
response = client.post('/change', data={
'password': 'password',
'new_password': 'newpassword',
'new_password_confirm': 'newpassword'
}, follow_redirects=True)
assert get_message('PASSWORD_CHANGE') in response.data
assert b'Home Page' in response.data
assert len(recorded) == 1
assert len(outbox) == 1
assert "Your password has been changed" in outbox[0].html
def test_custom_change_url(app, sqlalchemy_datastore, get_message):
client = _get_client(app, sqlalchemy_datastore, **{
'SECURITY_CHANGE_URL': '/custom_change'
})
authenticate(client)
response = client.get('/custom_change')
assert response.status_code == 200
def test_custom_change_template(app, sqlalchemy_datastore, get_message):
client = _get_client(app, sqlalchemy_datastore, **{
'SECURITY_CHANGE_PASSWORD_TEMPLATE': 'custom_security/change_password.html'
})
authenticate(client)
response = client.get('/change')
assert b'CUSTOM CHANGE PASSWORD' in response.data
def test_disable_change_emails(app, sqlalchemy_datastore):
client = _get_client(app, sqlalchemy_datastore, **{
'SECURITY_SEND_PASSWORD_CHANGE_EMAIL': False
})
authenticate(client)
with app.mail.record_messages() as outbox:
client.post('/change', data={
'password': 'password',
'new_password': 'newpassword',
'new_password_confirm': 'newpassword'
}, follow_redirects=True)
assert len(outbox) == 0
def test_custom_post_change_view(app, sqlalchemy_datastore):
client = _get_client(app, sqlalchemy_datastore, **{
'SECURITY_POST_CHANGE_VIEW': '/profile',
})
authenticate(client)
response = client.post('/change', data={
'password': 'password',
'new_password': 'newpassword',
'new_password_confirm': 'newpassword'
}, follow_redirects=True)
assert b'Profile Page' in response.data
+256
View File
@@ -0,0 +1,256 @@
# -*- coding: utf-8 -*-
"""
test_common
~~~~~~~~~~~
Test common functionality
"""
import base64
try:
from cookielib import Cookie
except ImportError:
from http.cookiejar import Cookie
from utils import authenticate, json_authenticate, logout
def test_login_view(client):
response = client.get('/login')
assert b'<h1>Login</h1>' in response.data
def test_authenticate(client):
response = authenticate(client)
assert response.status_code == 302
response = authenticate(client, follow_redirects=True)
assert b'Hello matt@lp.com' in response.data
def test_authenticate_case_insensitive_email(client):
response = authenticate(client, email='MATT@lp.com', follow_redirects=True)
assert b'Hello matt@lp.com' in response.data
def test_unprovided_username(client, get_message):
response = authenticate(client, "")
assert get_message('EMAIL_NOT_PROVIDED') in response.data
def test_unprovided_password(client, get_message):
response = authenticate(client, password="")
assert get_message('PASSWORD_NOT_PROVIDED') in response.data
def test_invalid_user(client, get_message):
response = authenticate(client, email="bogus@bogus.com")
assert get_message('USER_DOES_NOT_EXIST') in response.data
def test_bad_password(client, get_message):
response = authenticate(client, password="bogus")
assert get_message('INVALID_PASSWORD') in response.data
def test_inactive_user(client, get_message):
response = authenticate(client, "tiya@lp.com", "password")
assert get_message('DISABLED_ACCOUNT') in response.data
def test_logout(client):
authenticate(client)
response = logout(client, follow_redirects=True)
assert b'Home Page' in response.data
def test_missing_session_access(client, get_message):
response = client.get('/profile', follow_redirects=True)
assert get_message('LOGIN') in response.data
def test_has_session_access(client):
authenticate(client)
response = client.get("/profile", follow_redirects=True)
assert b'profile' in response.data
def test_authorized_access(client):
authenticate(client)
response = client.get("/admin")
assert b'Admin Page' in response.data
def test_unauthorized_access(client, get_message):
authenticate(client, "joe@lp.com")
response = client.get("/admin", follow_redirects=True)
assert get_message('UNAUTHORIZED') in response.data
def test_roles_accepted(client):
for user in ("matt@lp.com", "joe@lp.com"):
authenticate(client, user)
response = client.get("/admin_or_editor")
assert b'Admin or Editor Page' in response.data
logout(client)
authenticate(client, "jill@lp.com")
response = client.get("/admin_or_editor", follow_redirects=True)
assert b'Home Page' in response.data
def test_unauthenticated_role_required(client, get_message):
response = client.get('/admin', follow_redirects=True)
assert get_message('UNAUTHORIZED') in response.data
def test_multiple_role_required(client):
for user in ("matt@lp.com", "joe@lp.com"):
authenticate(client, user)
response = client.get("/admin_and_editor", follow_redirects=True)
assert b'Home Page' in response.data
client.get('/logout')
authenticate(client, 'dave@lp.com')
response = client.get("/admin_and_editor", follow_redirects=True)
assert b'Admin and Editor Page' in response.data
def test_ok_json_auth(client):
response = json_authenticate(client)
assert response.jdata['meta']['code'] == 200
assert 'authentication_token' in response.jdata['response']['user']
def test_invalid_json_auth(client):
response = json_authenticate(client, password='junk')
assert b'"code": 400' in response.data
def test_token_auth_via_querystring_valid_token(client):
response = json_authenticate(client)
token = response.jdata['response']['user']['authentication_token']
response = client.get('/token?auth_token=' + token)
assert b'Token Authentication' in response.data
def test_token_auth_via_header_valid_token(client):
response = json_authenticate(client)
token = response.jdata['response']['user']['authentication_token']
headers = {"Authentication-Token": token}
response = client.get('/token', headers=headers)
assert b'Token Authentication' in response.data
def test_token_auth_via_querystring_invalid_token(client):
response = client.get('/token?auth_token=X')
assert 401 == response.status_code
def test_token_auth_via_header_invalid_token(client):
response = client.get('/token', headers={"Authentication-Token": 'X'})
assert 401 == response.status_code
def test_http_auth(client):
response = client.get('/http', headers={
'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:password").decode('utf-8')
})
assert b'HTTP Authentication' in response.data
def test_http_auth_no_authorization(client):
response = client.get('/http', headers={})
assert b'<h1>Unauthorized</h1>' in response.data
assert 'WWW-Authenticate' in response.headers
assert 'Basic realm="Login Required"' == response.headers['WWW-Authenticate']
def test_invalid_http_auth_invalid_username(client):
response = client.get('/http', headers={
'Authorization': 'Basic %s' % base64.b64encode(b"bogus:bogus").decode('utf-8')
})
assert b'<h1>Unauthorized</h1>' in response.data
assert 'WWW-Authenticate' in response.headers
assert 'Basic realm="Login Required"' == response.headers['WWW-Authenticate']
def test_invalid_http_auth_bad_password(client):
response = client.get('/http', headers={
'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:bogus").decode('utf-8')
})
assert b'<h1>Unauthorized</h1>' in response.data
assert 'WWW-Authenticate' in response.headers
assert 'Basic realm="Login Required"' == response.headers['WWW-Authenticate']
def test_custom_http_auth_realm(client):
response = client.get('/http_custom_realm', headers={
'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:bogus").decode('utf-8')
})
assert b'<h1>Unauthorized</h1>' in response.data
assert 'WWW-Authenticate' in response.headers
assert 'Basic realm="My Realm"' == response.headers['WWW-Authenticate']
def test_multi_auth_basic(client):
response = client.get('/multi_auth', headers={
'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:password").decode('utf-8')
})
assert b'Basic' in response.data
def test_multi_auth_token(client):
response = json_authenticate(client)
token = response.jdata['response']['user']['authentication_token']
response = client.get('/multi_auth?auth_token=' + token)
assert b'Token' in response.data
def test_multi_auth_session(client):
authenticate(client, )
response = client.get('/multi_auth')
assert b'Session' in response.data
def test_user_deleted_during_session_reverts_to_anonymous_user(app, client):
authenticate(client)
with app.test_request_context('/'):
user = app.security.datastore.find_user(email='matt@lp.com')
app.security.datastore.delete_user(user)
app.security.datastore.commit()
response = client.get('/')
assert b'Hello matt@lp.com' not in response.data
def test_remember_token(client):
response = authenticate(client, follow_redirects=False)
client.cookie_jar.clear_session_cookies()
response = client.get('/profile')
assert b'profile' in response.data
def test_token_loader_does_not_fail_with_invalid_token(client):
c = Cookie(version=0, name='remember_token', value='None', port=None,
port_specified=False, domain='www.example.com',
domain_specified=False, domain_initial_dot=False, path='/',
path_specified=True, secure=False, expires=None,
discard=True, comment=None, comment_url=None,
rest={'HttpOnly': None}, rfc2109=False)
client.cookie_jar.set_cookie(c)
response = client.get('/')
assert b'BadSignature' not in response.data
def test_coverage_endpoints(client):
for endpoint in [
'/coverage/add_role_to_user',
'/coverage/remove_role_from_user',
'/coverage/activate_user',
'/coverage/deactivate_user'
]:
response = client.get(endpoint)
assert b'success' in response.data
+48
View File
@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
"""
test_configuration
~~~~~~~~~~~~~~~~~~
Basic configuration tests
"""
import base64
from utils import authenticate, logout, init_app_with_options
def test_view_configuration(app, sqlalchemy_datastore):
init_app_with_options(app, sqlalchemy_datastore, **{
'SECURITY_LOGOUT_URL': '/custom_logout',
'SECURITY_LOGIN_URL': '/custom_login',
'SECURITY_POST_LOGIN_VIEW': '/post_login',
'SECURITY_POST_LOGOUT_VIEW': '/post_logout',
'SECURITY_DEFAULT_HTTP_AUTH_REALM': 'Custom Realm',
})
client = app.test_client()
response = client.get('/custom_login')
assert b"<h1>Login</h1>" in response.data
response = authenticate(client, endpoint='/custom_login', follow_redirects=True)
assert b'Post Login' in response.data
response = logout(client, endpoint='/custom_logout', follow_redirects=True)
assert b'Post Logout' in response.data
response = client.get('/http', headers={
'Authorization': 'Basic %s' % base64.b64encode(b"joe@lp.com:bogus")
})
assert b'<h1>Unauthorized</h1>' in response.data
assert 'WWW-Authenticate' in response.headers
assert 'Basic realm="Custom Realm"' == response.headers['WWW-Authenticate']
def test_template_configuration(app, sqlalchemy_datastore):
init_app_with_options(app, sqlalchemy_datastore, **{
'SECURITY_LOGIN_USER_TEMPLATE': 'custom_security/login_user.html',
})
client = app.test_client()
response = client.get('/login')
assert b'CUSTOM LOGIN USER' in response.data
+166
View File
@@ -0,0 +1,166 @@
# -*- coding: utf-8 -*-
"""
test_confirmable
~~~~~~~~~~~~~~~~
Confirmable tests
"""
import time
from flask_security.signals import user_confirmed, confirm_instructions_sent
from flask_security.utils import capture_registrations
from utils import authenticate, logout, init_app_with_options
def test_confirmable_flag(app, sqlalchemy_datastore, get_message):
init_app_with_options(app, sqlalchemy_datastore, **{
'SECURITY_CONFIRMABLE': True,
'SECURITY_REGISTERABLE': True,
})
client = app.test_client()
recorded_confirms = []
recorded_instructions_sent = []
@user_confirmed.connect_via(app)
def on_confirmed(app, user):
recorded_confirms.append(user)
@confirm_instructions_sent.connect_via(app)
def on_instructions_sent(app, user):
recorded_instructions_sent.append(user)
# Test login before confirmation
email = 'dude@lp.com'
with capture_registrations() as registrations:
response = client.post('/register', data=dict(email=email, password='password'))
assert response.status_code == 302
response = authenticate(client, email=email)
assert get_message('CONFIRMATION_REQUIRED') in response.data
# Test invalid token
response = client.get('/confirm/bogus', follow_redirects=True)
assert get_message('INVALID_CONFIRMATION_TOKEN') in response.data
# Test JSON
response = client.post('/confirm', data='{"email": "matt@lp.com"}', headers={
'Content-Type': 'application/json'
})
assert response.status_code == 200
assert response.headers['Content-Type'] == 'application/json'
assert 'user' in response.jdata['response']
assert len(recorded_instructions_sent) == 1
# Test ask for instructions with invalid email
response = client.post('/confirm', data=dict(email='bogus@bogus.com'))
assert get_message('USER_DOES_NOT_EXIST') in response.data
# Test resend instructions
response = client.post('/confirm', data=dict(email=email))
assert get_message('CONFIRMATION_REQUEST', email=email) in response.data
assert len(recorded_instructions_sent) == 2
# Test confirm
token = registrations[0]['confirm_token']
response = client.get('/confirm/' + token, follow_redirects=True)
assert get_message('EMAIL_CONFIRMED') in response.data
assert len(recorded_confirms) == 1
# Test already confirmed
response = client.get('/confirm/' + token, follow_redirects=True)
assert get_message('ALREADY_CONFIRMED') in response.data
# Test already confirmed when asking for confirmation instructions
logout(client)
response = client.get('/confirm')
assert response.status_code == 200
response = client.post('/confirm', data=dict(email=email))
assert get_message('ALREADY_CONFIRMED') in response.data
# Test user was deleted before confirmation
with capture_registrations() as registrations:
client.post('/register', data=dict(email='mary@lp.com', password='password'))
user = registrations[0]['user']
token = registrations[0]['confirm_token']
with app.app_context():
sqlalchemy_datastore.delete(user)
sqlalchemy_datastore.commit()
response = client.get('/confirm/' + token, follow_redirects=True)
assert get_message('INVALID_CONFIRMATION_TOKEN') in response.data
def test_expired_confirmation_token(app, sqlalchemy_datastore, get_message):
within = '1 milliseconds'
init_app_with_options(app, sqlalchemy_datastore, **{
'SECURITY_CONFIRMABLE': True,
'SECURITY_REGISTERABLE': True,
'SECURITY_CONFIRM_EMAIL_WITHIN': within
})
client = app.test_client()
with capture_registrations() as registrations:
data = dict(email='mary@lp.com', password='password')
client.post('/register', data=data, follow_redirects=True)
user = registrations[0]['user']
token = registrations[0]['confirm_token']
time.sleep(1)
response = client.get('/confirm/' + token, follow_redirects=True)
assert get_message('CONFIRMATION_EXPIRED', within=within, email=user.email) in response.data
def test_login_when_unconfirmed(app, sqlalchemy_datastore, get_message):
init_app_with_options(app, sqlalchemy_datastore, **{
'SECURITY_CONFIRMABLE': True,
'SECURITY_REGISTERABLE': True,
'SECURITY_LOGIN_WITHOUT_CONFIRMATION': True
})
client = app.test_client()
data = dict(email='mary@lp.com', password='password')
response = client.post('/register', data=data, follow_redirects=True)
assert b'mary@lp.com' in response.data
def test_confirmation_different_user_when_logged_in(app, sqlalchemy_datastore, get_message):
init_app_with_options(app, sqlalchemy_datastore, **{
'SECURITY_CONFIRMABLE': True,
'SECURITY_REGISTERABLE': True,
'SECURITY_LOGIN_WITHOUT_CONFIRMATION': True
})
client = app.test_client()
e1 = 'dude@lp.com'
e2 = 'lady@lp.com'
with capture_registrations() as registrations:
for e in e1, e2:
client.post('/register', data=dict(email=e, password='password'))
logout(client)
token1 = registrations[0]['confirm_token']
token2 = registrations[1]['confirm_token']
client.get('/confirm/' + token1, follow_redirects=True)
logout(client)
authenticate(client, email=e1)
response = client.get('/confirm/' + token2, follow_redirects=True)
assert get_message('EMAIL_CONFIRMED') in response.data
assert b'Hello lady@lp.com' in response.data
+72
View File
@@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
"""
test_datastore
~~~~~~~~~~~~~~
Datastore tests
"""
from pytest import raises
from flask_security import UserMixin
from flask_security.datastore import Datastore, UserDatastore
class User(UserMixin):
pass
def test_unimplemented_datastore_methods():
datastore = Datastore(None)
with raises(NotImplementedError):
datastore.put(None)
with raises(NotImplementedError):
datastore.delete(None)
def test_unimplemented_user_datastore_methods():
datastore = UserDatastore(None, None)
with raises(NotImplementedError):
datastore.find_user(None)
with raises(NotImplementedError):
datastore.find_role(None)
def test_toggle_active():
datastore = UserDatastore(None, None)
user = User()
user.active = True
assert datastore.toggle_active(user) is True
assert not user.active
assert datastore.toggle_active(user) is True
assert user.active is True
def test_deactivate_user():
datastore = UserDatastore(None, None)
user = User()
user.active = True
assert datastore.deactivate_user(user) is True
assert not user.active
def test_activate_user():
datastore = UserDatastore(None, None)
user = User()
user.active = False
assert datastore.activate_user(user) is True
assert user.active is True
def test_deactivate_returns_false_if_already_false():
datastore = UserDatastore(None, None)
user = User()
user.active = False
assert not datastore.deactivate_user(user)
def test_activate_returns_false_if_already_true():
datastore = UserDatastore(None, None)
user = User()
user.active = True
assert not datastore.activate_user(user)
+46
View File
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
"""
test_entities
~~~~~~~~~~~~~
Entity tests
"""
from flask_security import RoleMixin, UserMixin, AnonymousUser
class Role(RoleMixin):
def __init__(self, name):
self.name = name
class User(UserMixin):
def __init__(self, roles):
self.roles = roles
def test_role_mixin_equal():
admin1 = Role('admin')
admin2 = Role('admin')
assert admin1 == admin2
def test_role_mixin_not_equal():
admin = Role('admin')
editor = Role('editor')
assert admin != editor
def test_user_mixin_has_role_with_string():
admin = Role('admin')
editor = Role('editor')
user = User([admin, editor])
assert user.has_role('admin') is True
assert user.has_role('editor') is True
assert user.has_role(admin) is True
assert user.has_role(editor) is True
def test_anonymous_user_has_no_roles():
user = AnonymousUser()
assert not user.has_role('admin')
+38
View File
@@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
"""
test_hashing
~~~~~~~~~~~~
hashing tests
"""
from pytest import raises
from flask_security.utils import verify_password, encrypt_password
from utils import authenticate, init_app_with_options
def test_verify_password_bcrypt(app, sqlalchemy_datastore):
init_app_with_options(app, sqlalchemy_datastore, **{
'SECURITY_PASSWORD_HASH': 'bcrypt',
'SECURITY_PASSWORD_SALT': 'salty'
})
with app.app_context():
assert verify_password('pass', encrypt_password('pass'))
def test_login_with_bcrypt_enabled(app, sqlalchemy_datastore):
init_app_with_options(app, sqlalchemy_datastore, **{
'SECURITY_PASSWORD_HASH': 'bcrypt',
'SECURITY_PASSWORD_SALT': 'salty'
})
response = authenticate(app.test_client(), follow_redirects=True)
assert b'Home Page' in response.data
def test_missing_hash_salt_option(app, sqlalchemy_datastore):
with raises(RuntimeError):
init_app_with_options(app, sqlalchemy_datastore, **{
'SECURITY_PASSWORD_HASH': 'bcrypt',
})
+142
View File
@@ -0,0 +1,142 @@
# -*- coding: utf-8 -*-
"""
test_emails
~~~~~~~~~~~
Email functionality tests
"""
from flask_security import Security
from flask_security.forms import LoginForm, RegisterForm, ConfirmRegisterForm, \
SendConfirmationForm, PasswordlessLoginForm, ForgotPasswordForm, ResetPasswordForm, \
ChangePasswordForm, TextField, PasswordField, email_required, email_validator, valid_user_email
from flask_security.utils import capture_reset_password_requests
from utils import authenticate, init_app_with_options, populate_data
def test_async_email_task(app, sqlalchemy_datastore):
init_app_with_options(app, sqlalchemy_datastore, **{
'SECURITY_RECOVERABLE': True
})
app.mail_sent = False
@app.security.send_mail_task
def send_email(msg):
app.mail_sent = True
client = app.test_client()
client.post('/reset', data=dict(email='matt@lp.com'))
assert app.mail_sent is True
def test_register_blueprint_flag(app, sqlalchemy_datastore):
app.security = Security(app, datastore=Security, register_blueprint=False)
client = app.test_client()
response = client.get('/login')
assert response.status_code == 404
def test_basic_custom_forms(app, sqlalchemy_datastore):
app.config['SECURITY_REGISTERABLE'] = True
app.config['SECURITY_RECOVERABLE'] = True
app.config['SECURITY_CHANGEABLE'] = True
class MyLoginForm(LoginForm):
email = TextField('My Login Email Address Field')
class MyRegisterForm(RegisterForm):
email = TextField('My Register Email Address Field')
class MyForgotPasswordForm(ForgotPasswordForm):
email = TextField('My Forgot Email Address Field',
validators=[email_required, email_validator, valid_user_email])
class MyResetPasswordForm(ResetPasswordForm):
password = TextField('My Reset Password Field')
class MyChangePasswordForm(ChangePasswordForm):
password = PasswordField('My Change Password Field')
app.security = Security(app,
datastore=sqlalchemy_datastore,
login_form=MyLoginForm,
register_form=MyRegisterForm,
forgot_password_form=MyForgotPasswordForm,
reset_password_form=MyResetPasswordForm,
change_password_form=MyChangePasswordForm)
populate_data(app)
client = app.test_client()
response = client.get('/login')
assert b'My Login Email Address Field' in response.data
response = client.get('/register')
assert b'My Register Email Address Field' in response.data
response = client.get('/reset')
assert b'My Forgot Email Address Field' in response.data
with capture_reset_password_requests() as requests:
response = client.post('/reset', data=dict(email='matt@lp.com'))
token = requests[0]['token']
response = client.get('/reset/' + token)
assert b'My Reset Password Field' in response.data
authenticate(client)
response = client.get('/change')
assert b'My Change Password Field' in response.data
def test_confirmable_custom_form(app, sqlalchemy_datastore):
app.config['SECURITY_REGISTERABLE'] = True
app.config['SECURITY_CONFIRMABLE'] = True
class MyRegisterForm(ConfirmRegisterForm):
email = TextField('My Register Email Address Field')
class MySendConfirmationForm(SendConfirmationForm):
email = TextField('My Send Confirmation Email Address Field')
app.security = Security(app,
datastore=sqlalchemy_datastore,
send_confirmation_form=MySendConfirmationForm,
confirm_register_form=MyRegisterForm)
client = app.test_client()
response = client.get('/register')
assert b'My Register Email Address Field' in response.data
response = client.get('/confirm')
assert b'My Send Confirmation Email Address Field' in response.data
def test_passwordless_custom_form(app, sqlalchemy_datastore):
app.config['SECURITY_PASSWORDLESS'] = True
class MyPasswordlessLoginForm(PasswordlessLoginForm):
email = TextField('My Passwordless Email Address Field')
app.security = Security(app,
datastore=sqlalchemy_datastore,
passwordless_login_form=MyPasswordlessLoginForm)
client = app.test_client()
response = client.get('/login')
assert b'My Passwordless Email Address Field' in response.data
def test_addition_identity_attributes(app, sqlalchemy_datastore):
init_app_with_options(app, sqlalchemy_datastore, **{
'SECURITY_USER_IDENTITY_ATTRIBUTES': ('email', 'username')
})
client = app.test_client()
response = authenticate(client, email='matt', follow_redirects=True)
assert b'Hello matt@lp.com' in response.data
+97
View File
@@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
"""
test_passwordless
~~~~~~~~~~~~~~~~~
Passwordless tests
"""
import time
from flask_security.signals import login_instructions_sent
from flask_security.utils import capture_passwordless_login_requests
from utils import logout, init_app_with_options
def test_trackable_flag(app, sqlalchemy_datastore, get_message):
init_app_with_options(app, sqlalchemy_datastore, **{
'SECURITY_PASSWORDLESS': True
})
client = app.test_client()
recorded = []
@login_instructions_sent.connect_via(app)
def on_instructions_sent(app, user, login_token):
recorded.append(user)
# Test disabled account
response = client.post('/login', data=dict(email='tiya@lp.com'), follow_redirects=True)
assert get_message('DISABLED_ACCOUNT') in response.data
# Test login with json and valid email
data = '{"email": "matt@lp.com", "password": "password"}'
response = client.post('/login', data=data, headers={'Content-Type': 'application/json'})
assert response.status_code == 200
assert len(recorded) == 1
# Test login with json and invalid email
data = '{"email": "nobody@lp.com", "password": "password"}'
response = client.post('/login', data=data, headers={'Content-Type': 'application/json'})
assert 'errors' in response.data
# Test sends email and shows appropriate response
with capture_passwordless_login_requests() as requests:
with app.mail.record_messages() as outbox:
response = client.post('/login', data=dict(email='matt@lp.com'), follow_redirects=True)
assert len(recorded) == 2
assert len(requests) == 1
assert len(outbox) == 1
assert 'user' in requests[0]
assert 'login_token' in requests[0]
user = requests[0]['user']
assert get_message('LOGIN_EMAIL_SENT', email=user.email) in response.data
token = requests[0]['login_token']
response = client.get('/login/' + token, follow_redirects=True)
assert get_message('PASSWORDLESS_LOGIN_SUCCESSFUL') in response.data
# Test already authenticated
response = client.get('/login/' + token, follow_redirects=True)
assert get_message('PASSWORDLESS_LOGIN_SUCCESSFUL') not in response.data
logout(client)
# Test invalid token
response = client.get('/login/bogus', follow_redirects=True)
assert get_message('INVALID_LOGIN_TOKEN') in response.data
# Test login request with invalid email
response = client.post('/login', data=dict(email='bogus@bogus.com'))
assert get_message('USER_DOES_NOT_EXIST') in response.data
def test_expired_login_token(app, sqlalchemy_datastore, get_message):
init_app_with_options(app, sqlalchemy_datastore, **{
'SECURITY_PASSWORDLESS': True,
'SECURITY_LOGIN_WITHIN': '1 milliseconds',
})
client = app.test_client()
e = 'matt@lp.com'
with capture_passwordless_login_requests() as requests:
client.post('/login', data=dict(email=e), follow_redirects=True)
token = requests[0]['login_token']
user = requests[0]['user']
time.sleep(1)
response = client.get('/login/' + token, follow_redirects=True)
assert get_message('LOGIN_EXPIRED', within='1 milliseconds', email=user.email) in response.data
+153
View File
@@ -0,0 +1,153 @@
# -*- coding: utf-8 -*-
"""
test_recoverable
~~~~~~~~~~~~~~~~
Recoverable functionality tests
"""
import time
from flask_security.signals import reset_password_instructions_sent, password_reset
from flask_security.utils import capture_reset_password_requests
from utils import authenticate, logout, init_app_with_options
def _get_client(app, datastore, **options):
config = {
'SECURITY_RECOVERABLE': True
}
config.update(options)
init_app_with_options(app, datastore, **config)
return app.test_client()
def test_recoverable_flag(app, sqlalchemy_datastore, get_message):
client = _get_client(app, sqlalchemy_datastore)
recorded_resets = []
recorded_instructions_sent = []
@password_reset.connect_via(app)
def on_password_reset(app, user):
recorded_resets.append(user)
@reset_password_instructions_sent.connect_via(app)
def on_instructions_sent(app, user, token):
recorded_instructions_sent.append(user)
# Test the reset view
response = client.get('/reset')
assert b'<h1>Send password reset instructions</h1>' in response.data
# Test submitting email to reset password creates a token and sends email
with capture_reset_password_requests() as requests:
with app.mail.record_messages() as outbox:
response = client.post('/reset', data=dict(email='joe@lp.com'), follow_redirects=True)
assert len(recorded_instructions_sent) == 1
assert len(outbox) == 1
assert response.status_code == 200
assert get_message('PASSWORD_RESET_REQUEST', email='joe@lp.com') in response.data
token = requests[0]['token']
# Test view for reset token
response = client.get('/reset/' + token)
assert b'<h1>Reset password</h1>' in response.data
# Test submitting a new password
response = client.post('/reset/' + token, data={
'password': 'newpassword',
'password_confirm': 'newpassword'
}, follow_redirects=True)
assert get_message('PASSWORD_RESET') in response.data
assert len(recorded_resets) == 1
logout(client)
# Test logging in with the new password
response = authenticate(client, 'joe@lp.com', 'newpassword', follow_redirects=True)
assert b'Hello joe@lp.com' in response.data
logout(client)
# Test submitting JSON
response = client.post('/reset', data='{"email": "joe@lp.com"}', headers={
'Content-Type': 'application/json'
})
assert response.headers['Content-Type'] == 'application/json'
assert 'user' in response.jdata['response']
logout(client)
# Test invalid email
response = client.post('/reset', data=dict(email='bogus@lp.com'), follow_redirects=True)
assert get_message('USER_DOES_NOT_EXIST') in response.data
logout(client)
# Test invalid token
response = client.post('/reset/bogus', data={
'password': 'newpassword',
'password_confirm': 'newpassword'
}, follow_redirects=True)
assert get_message('INVALID_RESET_PASSWORD_TOKEN') in response.data
# Test mangled token
token = ("WyIxNjQ2MzYiLCIxMzQ1YzBlZmVhM2VhZjYwODgwMDhhZGU2YzU0MzZjMiJd.BZEw_Q.lQyo3npdPZtcJ"
"_sNHVHP103syjM&url_id=fbb89a8328e58c181ea7d064c2987874bc54a23d")
response = client.post('/reset/' + token, data={
'password': 'newpassword',
'password_confirm': 'newpassword'
}, follow_redirects=True)
assert get_message('INVALID_RESET_PASSWORD_TOKEN') in response.data
def test_expired_reset_token(app, sqlalchemy_datastore, get_message):
within = '1 milliseconds'
client = _get_client(app, sqlalchemy_datastore, **{
'SECURITY_RESET_PASSWORD_WITHIN': within
})
with capture_reset_password_requests() as requests:
client.post('/reset', data=dict(email='joe@lp.com'), follow_redirects=True)
user = requests[0]['user']
token = requests[0]['token']
time.sleep(1)
response = client.post('/reset/' + token, data={
'password': 'newpassword',
'password_confirm': 'newpassword'
}, follow_redirects=True)
assert get_message('PASSWORD_RESET_EXPIRED', within=within, email=user.email) in response.data
def test_custom_reset_url(app, sqlalchemy_datastore, get_message):
client = _get_client(app, sqlalchemy_datastore, **{
'SECURITY_RESET_URL': '/custom_reset'
})
response = client.get('/custom_reset')
assert response.status_code == 200
def test_custom_reset_templates(app, sqlalchemy_datastore):
client = _get_client(app, sqlalchemy_datastore, **{
'SECURITY_RESET_PASSWORD_TEMPLATE': 'custom_security/reset_password.html',
'SECURITY_FORGOT_PASSWORD_TEMPLATE': 'custom_security/forgot_password.html'
})
response = client.get('/reset')
assert b'CUSTOM FORGOT PASSWORD' in response.data
with capture_reset_password_requests() as requests:
client.post('/reset', data=dict(email='joe@lp.com'), follow_redirects=True)
token = requests[0]['token']
response = client.get('/reset/' + token)
assert b'CUSTOM RESET PASSWORD' in response.data
+106
View File
@@ -0,0 +1,106 @@
# -*- coding: utf-8 -*-
"""
test_registerable
~~~~~~~~~~~~~~~~~
Registerable tests
"""
from flask_security.signals import user_registered
from utils import authenticate, logout, init_app_with_options
def _get_client(app, datastore, **options):
config = {
'SECURITY_REGISTERABLE': True,
'SECURITY_POST_REGISTER_VIEW': '/post_register',
}
config.update(options)
init_app_with_options(app, datastore, **config)
return app.test_client()
def test_registerable_flag(app, sqlalchemy_datastore, get_message):
client = _get_client(app, sqlalchemy_datastore)
recorded = []
# Test the register view
response = client.get('/register')
assert b"<h1>Register</h1>" in response.data
# Test registering is successful, sends email, and fires signal
@user_registered.connect_via(app)
def on_user_registerd(app, user, confirm_token):
recorded.append(user)
data = dict(email='dude@lp.com', password='password', password_confirm='password')
with app.mail.record_messages() as outbox:
response = client.post('/register', data=data, follow_redirects=True)
assert len(recorded) == 1
assert len(outbox) == 1
assert b'Post Register' in response.data
logout(client)
# Test user can login after registering
response = authenticate(client, email='dude@lp.com', password='password')
assert response.status_code == 302
logout(client)
# Test registering with an existing email
data = dict(email='dude@lp.com', password='password', password_confirm='password')
response = client.post('/register', data=data, follow_redirects=True)
assert get_message('EMAIL_ALREADY_ASSOCIATED', email='dude@lp.com') in response.data
# Test registering with JSON
data = '{ "email": "dude2@lp.com", "password": "password"}'
response = client.post('/register', data=data, content_type='application/json')
assert response.headers['content-type'] == 'application/json'
assert response.jdata['meta']['code'] == 200
logout(client)
# Test ?next param
data = dict(email='dude3@lp.com',
password='password',
password_confirm='password')
response = client.post('/register?next=/page1', data=data, follow_redirects=True)
assert b'Page 1' in response.data
def test_custom_register_url(app, sqlalchemy_datastore):
client = _get_client(app, sqlalchemy_datastore, **{
'SECURITY_REGISTER_URL': '/custom_register'
})
response = client.get('/custom_register')
assert b"<h1>Register</h1>" in response.data
data = dict(email='dude@lp.com',
password='password',
password_confirm='password')
response = client.post('/custom_register', data=data, follow_redirects=True)
assert b'Post Register' in response.data
def test_custom_register_tempalate(app, sqlalchemy_datastore):
client = _get_client(app, sqlalchemy_datastore, **{
'SECURITY_REGISTER_USER_TEMPLATE': 'custom_security/register_user.html'
})
response = client.get('/register')
assert b'CUSTOM REGISTER USER' in response.data
def test_disable_register_emails(app, sqlalchemy_datastore):
client = _get_client(app, sqlalchemy_datastore, **{
'SECURITY_SEND_REGISTER_EMAIL': False
})
data = dict(email='dude@lp.com', password='password', password_confirm='password')
with app.mail.record_messages() as outbox:
client.post('/register', data=data, follow_redirects=True)
assert len(outbox) == 0
+30
View File
@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
"""
test_trackable
~~~~~~~~~~~~~~
Trackable tests
"""
from utils import authenticate, logout, init_app_with_options
def test_trackable_flag(app, sqlalchemy_datastore):
init_app_with_options(app, sqlalchemy_datastore, **{
'SECURITY_TRACKABLE': True
})
client = app.test_client()
e = 'matt@lp.com'
authenticate(client, email=e)
logout(client)
authenticate(client, email=e)
with app.app_context():
user = app.security.datastore.find_user(email=e)
assert user.last_login_at is not None
assert user.current_login_at is not None
assert user.last_login_ip == 'untrackable'
assert user.current_login_ip == 'untrackable'
assert user.login_count == 2
-88
View File
@@ -1,88 +0,0 @@
# -*- coding: utf-8 -*-
import unittest
from flask_security import RoleMixin, UserMixin, AnonymousUser
from flask_security.datastore import Datastore, UserDatastore
class Role(RoleMixin):
def __init__(self, name):
self.name = name
class User(UserMixin):
def __init__(self, email, roles):
self.email = email
self.roles = roles
admin = Role('admin')
admin2 = Role('admin')
editor = Role('editor')
user = User('matt@lp.com', [admin, editor])
class SecurityEntityTests(unittest.TestCase):
def test_role_mixin_equal(self):
self.assertEqual(admin, admin2)
def test_role_mixin_not_equal(self):
self.assertNotEqual(admin, editor)
def test_user_mixin_has_role_with_string(self):
self.assertTrue(user.has_role('admin'))
def test_user_mixin_has_role_with_role_obj(self):
self.assertTrue(user.has_role(Role('admin')))
def test_anonymous_user_has_no_roles(self):
au = AnonymousUser()
self.assertEqual(0, len(au.roles))
self.assertFalse(au.has_role('admin'))
class DatastoreTests(unittest.TestCase):
def setUp(self):
super(DatastoreTests, self).setUp()
self.ds = UserDatastore(None, None)
def test_unimplemented_datastore_methods(self):
ds = Datastore(None)
self.assertRaises(NotImplementedError, ds.put, None)
self.assertRaises(NotImplementedError, ds.delete, None)
def test_unimplemented_user_datastore_methods(self):
self.assertRaises(NotImplementedError, self.ds.find_user, None)
self.assertRaises(NotImplementedError, self.ds.find_role, None)
def test_toggle_active(self):
user.active = True
rv = self.ds.toggle_active(user)
self.assertTrue(rv)
self.assertFalse(user.active)
rv = self.ds.toggle_active(user)
self.assertTrue(rv)
self.assertTrue(user.active)
def test_deactivate_user(self):
user.active = True
rv = self.ds.deactivate_user(user)
self.assertTrue(rv)
self.assertFalse(user.active)
def test_activate_user(self):
ds = UserDatastore(None, None)
user.active = False
ds.activate_user(user)
self.assertTrue(user.active)
def test_deactivate_returns_false_if_already_false(self):
user.active = False
self.assertFalse(self.ds.deactivate_user(user))
def test_activate_returns_false_if_already_true(self):
user.active = True
self.assertFalse(self.ds.activate_user(user))
+80
View File
@@ -0,0 +1,80 @@
# -*- coding: utf-8 -*-
"""
utils
~~~~~
Test utils
"""
from flask import Response as BaseResponse, json
from flask_security import Security
from flask_security.utils import encrypt_password
_missing = object
def authenticate(client, email="matt@lp.com", password="password", endpoint=None, **kwargs):
data = dict(email=email, password=password, remember='y')
return client.post(endpoint or '/login', data=data, **kwargs)
def json_authenticate(client, email="matt@lp.com", password="password", endpoint=None):
data = '{"email": "%s", "password": "%s"}' % (email, password)
return client.post(endpoint or '/login', content_type="application/json", data=data)
def logout(client, endpoint=None, **kwargs):
return client.get(endpoint or '/logout', **kwargs)
def create_roles(ds):
for role in ('admin', 'editor', 'author'):
ds.create_role(name=role)
ds.commit()
def create_users(ds, count=None):
users = [('matt@lp.com', 'matt', 'password', ['admin'], True),
('joe@lp.com', 'joe', 'password', ['editor'], True),
('dave@lp.com', 'dave', 'password', ['admin', 'editor'], True),
('jill@lp.com', 'jill', 'password', ['author'], True),
('tiya@lp.com', 'tiya', 'password', [], False)]
count = count or len(users)
for u in users[:count]:
pw = encrypt_password(u[2])
roles = [ds.find_or_create_role(rn) for rn in u[3]]
ds.commit()
user = ds.create_user(email=u[0], username=u[1], password=pw, active=u[4])
ds.commit()
for role in roles:
ds.add_role_to_user(user, role)
ds.commit()
def populate_data(app, user_count=None):
ds = app.security.datastore
with app.app_context():
create_roles(ds)
create_users(ds, user_count)
class Response(BaseResponse): # pragma: no cover
@property
def jdata(self):
rv = getattr(self, '_cached_jdata', _missing)
if rv is not _missing:
return rv
try:
self._cached_jdata = json.loads(self.data)
except ValueError:
raise Exception('Invalid JSON response')
return self._cached_jdata
def init_app_with_options(app, datastore, **options):
app.config.update(**options)
app.security = Security(app, datastore=datastore)
populate_data(app)