mirror of
https://github.com/wassname/flask-security.git
synced 2026-06-27 16:10:11 +08:00
37908ca335
Docs: Fixed typo and made punctuation more consistent.
334 lines
10 KiB
Python
334 lines
10 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
flask.ext.security.datastore
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
This module contains an user datastore classes.
|
|
|
|
:copyright: (c) 2012 by Matt Wright.
|
|
:license: MIT, see LICENSE for more details.
|
|
"""
|
|
|
|
from .utils import get_identity_attributes, string_types
|
|
|
|
|
|
class Datastore(object):
|
|
def __init__(self, db):
|
|
self.db = db
|
|
|
|
def commit(self):
|
|
pass
|
|
|
|
def put(self, model):
|
|
raise NotImplementedError
|
|
|
|
def delete(self, model):
|
|
raise NotImplementedError
|
|
|
|
|
|
class SQLAlchemyDatastore(Datastore):
|
|
def commit(self):
|
|
self.db.session.commit()
|
|
|
|
def put(self, model):
|
|
self.db.session.add(model)
|
|
return model
|
|
|
|
def delete(self, model):
|
|
self.db.session.delete(model)
|
|
|
|
|
|
class MongoEngineDatastore(Datastore):
|
|
def put(self, model):
|
|
model.save()
|
|
return model
|
|
|
|
def delete(self, model):
|
|
model.delete()
|
|
|
|
|
|
class PeeweeDatastore(Datastore):
|
|
def put(self, model):
|
|
model.save()
|
|
return model
|
|
|
|
def delete(self, model):
|
|
model.delete_instance()
|
|
|
|
|
|
class UserDatastore(object):
|
|
"""Abstracted user datastore.
|
|
|
|
:param user_model: A user model class definition
|
|
:param role_model: A role model class definition
|
|
"""
|
|
|
|
def __init__(self, user_model, role_model):
|
|
self.user_model = user_model
|
|
self.role_model = role_model
|
|
|
|
def _prepare_role_modify_args(self, user, role):
|
|
if isinstance(user, string_types):
|
|
user = self.find_user(email=user)
|
|
if isinstance(role, string_types):
|
|
role = self.find_role(role)
|
|
return user, role
|
|
|
|
def _prepare_create_user_args(self, **kwargs):
|
|
kwargs.setdefault('active', True)
|
|
roles = kwargs.get('roles', [])
|
|
for i, role in enumerate(roles):
|
|
rn = role.name if isinstance(role, self.role_model) else role
|
|
# see if the role exists
|
|
roles[i] = self.find_role(rn)
|
|
kwargs['roles'] = roles
|
|
return kwargs
|
|
|
|
def get_user(self, id_or_email):
|
|
"""Returns a user matching the specified ID or email address."""
|
|
raise NotImplementedError
|
|
|
|
def find_user(self, *args, **kwargs):
|
|
"""Returns a user matching the provided parameters."""
|
|
raise NotImplementedError
|
|
|
|
def find_role(self, *args, **kwargs):
|
|
"""Returns a role matching the provided name."""
|
|
raise NotImplementedError
|
|
|
|
def add_role_to_user(self, user, role):
|
|
"""Adds a role to a user.
|
|
|
|
:param user: The user to manipulate
|
|
:param role: The role to add to the user
|
|
"""
|
|
user, role = self._prepare_role_modify_args(user, role)
|
|
if role not in user.roles:
|
|
user.roles.append(role)
|
|
self.put(user)
|
|
return True
|
|
return False
|
|
|
|
def remove_role_from_user(self, user, role):
|
|
"""Removes a role from a user.
|
|
|
|
:param user: The user to manipulate
|
|
:param role: The role to remove from the user
|
|
"""
|
|
rv = False
|
|
user, role = self._prepare_role_modify_args(user, role)
|
|
if role in user.roles:
|
|
rv = True
|
|
user.roles.remove(role)
|
|
self.put(user)
|
|
return rv
|
|
|
|
def toggle_active(self, user):
|
|
"""Toggles a user's active status. Always returns True."""
|
|
user.active = not user.active
|
|
return True
|
|
|
|
def deactivate_user(self, user):
|
|
"""Deactivates a specified user. Returns `True` if a change was made.
|
|
|
|
:param user: The user to deactivate
|
|
"""
|
|
if user.active:
|
|
user.active = False
|
|
return True
|
|
return False
|
|
|
|
def activate_user(self, user):
|
|
"""Activates a specified user. Returns `True` if a change was made.
|
|
|
|
:param user: The user to activate
|
|
"""
|
|
if not user.active:
|
|
user.active = True
|
|
return True
|
|
return False
|
|
|
|
def create_role(self, **kwargs):
|
|
"""Creates and returns a new role from the given parameters."""
|
|
|
|
role = self.role_model(**kwargs)
|
|
return self.put(role)
|
|
|
|
def find_or_create_role(self, name, **kwargs):
|
|
"""Returns a role matching the given name or creates it with any
|
|
additionally provided parameters.
|
|
"""
|
|
kwargs["name"] = name
|
|
return self.find_role(name) or self.create_role(**kwargs)
|
|
|
|
def create_user(self, **kwargs):
|
|
"""Creates and returns a new user from the given parameters."""
|
|
kwargs = self._prepare_create_user_args(**kwargs)
|
|
user = self.user_model(**kwargs)
|
|
return self.put(user)
|
|
|
|
def delete_user(self, user):
|
|
"""Deletes the specified user.
|
|
|
|
:param user: The user to delete
|
|
"""
|
|
self.delete(user)
|
|
|
|
|
|
class SQLAlchemyUserDatastore(SQLAlchemyDatastore, UserDatastore):
|
|
"""A SQLAlchemy datastore implementation for Flask-Security that assumes the
|
|
use of the Flask-SQLAlchemy extension.
|
|
"""
|
|
def __init__(self, db, user_model, role_model):
|
|
SQLAlchemyDatastore.__init__(self, db)
|
|
UserDatastore.__init__(self, user_model, role_model)
|
|
|
|
def get_user(self, identifier):
|
|
if self._is_numeric(identifier):
|
|
return self.user_model.query.get(identifier)
|
|
for attr in get_identity_attributes():
|
|
query = getattr(self.user_model, attr).ilike(identifier)
|
|
rv = self.user_model.query.filter(query).first()
|
|
if rv is not None:
|
|
return rv
|
|
|
|
def _is_numeric(self, value):
|
|
try:
|
|
int(value)
|
|
except ValueError:
|
|
return False
|
|
return True
|
|
|
|
def find_user(self, **kwargs):
|
|
return self.user_model.query.filter_by(**kwargs).first()
|
|
|
|
def find_role(self, role):
|
|
return self.role_model.query.filter_by(name=role).first()
|
|
|
|
|
|
class MongoEngineUserDatastore(MongoEngineDatastore, UserDatastore):
|
|
"""A MongoEngine datastore implementation for Flask-Security that assumes
|
|
the use of the Flask-MongoEngine extension.
|
|
"""
|
|
def __init__(self, db, user_model, role_model):
|
|
MongoEngineDatastore.__init__(self, db)
|
|
UserDatastore.__init__(self, user_model, role_model)
|
|
|
|
def get_user(self, identifier):
|
|
from mongoengine import ValidationError
|
|
try:
|
|
return self.user_model.objects(id=identifier).first()
|
|
except ValidationError:
|
|
pass
|
|
for attr in get_identity_attributes():
|
|
query_key = '%s__iexact' % attr
|
|
query = {query_key: identifier}
|
|
rv = self.user_model.objects(**query).first()
|
|
if rv is not None:
|
|
return rv
|
|
|
|
def find_user(self, **kwargs):
|
|
try:
|
|
from mongoengine.queryset import Q, QCombination
|
|
except ImportError:
|
|
from mongoengine.queryset.visitor import Q, QCombination
|
|
from mongoengine.errors import ValidationError
|
|
|
|
queries = map(lambda i: Q(**{i[0]: i[1]}), kwargs.items())
|
|
query = QCombination(QCombination.AND, queries)
|
|
try:
|
|
return self.user_model.objects(query).first()
|
|
except ValidationError: # pragma: no cover
|
|
return None
|
|
|
|
def find_role(self, role):
|
|
return self.role_model.objects(name=role).first()
|
|
|
|
# TODO: Not sure why this was added but tests pass without it
|
|
# def add_role_to_user(self, user, role):
|
|
# rv = super(MongoEngineUserDatastore, self).add_role_to_user(user, role)
|
|
# if rv:
|
|
# self.put(user)
|
|
# return rv
|
|
|
|
|
|
class PeeweeUserDatastore(PeeweeDatastore, UserDatastore):
|
|
"""A PeeweeD datastore implementation for Flask-Security that assumes
|
|
the use of the Flask-Peewee extension.
|
|
|
|
:param user_model: A user model class definition
|
|
:param role_model: A role model class definition
|
|
:param role_link: A model implementing the many-to-many user-role relation
|
|
"""
|
|
def __init__(self, db, user_model, role_model, role_link):
|
|
PeeweeDatastore.__init__(self, db)
|
|
UserDatastore.__init__(self, user_model, role_model)
|
|
self.UserRole = role_link
|
|
|
|
def get_user(self, identifier):
|
|
try:
|
|
return self.user_model.get(self.user_model.id == identifier)
|
|
except ValueError:
|
|
pass
|
|
|
|
for attr in get_identity_attributes():
|
|
column = getattr(self.user_model, attr)
|
|
try:
|
|
return self.user_model.get(column ** identifier)
|
|
except self.user_model.DoesNotExist:
|
|
pass
|
|
|
|
def find_user(self, **kwargs):
|
|
try:
|
|
return self.user_model.filter(**kwargs).get()
|
|
except self.user_model.DoesNotExist:
|
|
return None
|
|
|
|
def find_role(self, role):
|
|
try:
|
|
return self.role_model.filter(name=role).get()
|
|
except self.role_model.DoesNotExist:
|
|
return None
|
|
|
|
def create_user(self, **kwargs):
|
|
"""Creates and returns a new user from the given parameters."""
|
|
roles = kwargs.pop('roles', [])
|
|
user = self.user_model(**self._prepare_create_user_args(**kwargs))
|
|
user = self.put(user)
|
|
for role in roles:
|
|
self.add_role_to_user(user, role)
|
|
self.put(user)
|
|
return user
|
|
|
|
def add_role_to_user(self, user, role):
|
|
"""Adds a role to a user.
|
|
|
|
:param user: The user to manipulate
|
|
:param role: The role to add to the user
|
|
"""
|
|
user, role = self._prepare_role_modify_args(user, role)
|
|
result = self.UserRole.select() \
|
|
.where(self.UserRole.user == user.id, self.UserRole.role == role.id)
|
|
if result.count():
|
|
return False
|
|
else:
|
|
self.put(self.UserRole.create(user=user.id, role=role.id))
|
|
return True
|
|
|
|
def remove_role_from_user(self, user, role):
|
|
"""Removes a role from a user.
|
|
|
|
:param user: The user to manipulate
|
|
:param role: The role to remove from the user
|
|
"""
|
|
user, role = self._prepare_role_modify_args(user, role)
|
|
result = self.UserRole.select() \
|
|
.where(self.UserRole.user == user, self.UserRole.role == role)
|
|
if result.count():
|
|
query = self.UserRole.delete().where(
|
|
self.UserRole.user == user, self.UserRole.role == role)
|
|
query.execute()
|
|
return True
|
|
else:
|
|
return False
|