mirror of
https://github.com/wassname/pyramid_formalchemy.git
synced 2026-06-27 16:10:40 +08:00
take care of security
This commit is contained in:
@@ -1,4 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
class Base(object):
|
||||
|
||||
@@ -12,10 +15,27 @@ class Base(object):
|
||||
request.model = self.__models__
|
||||
request.forms = self.__forms__
|
||||
request.fa_url = self.fa_url
|
||||
request.model_class = None
|
||||
request.model_name = None
|
||||
request.model_id = None
|
||||
request.format = 'html'
|
||||
|
||||
def get_model(self):
|
||||
request = self.request
|
||||
if request.model_class:
|
||||
return request.model_class
|
||||
if request.model_name:
|
||||
if isinstance(request.model, list):
|
||||
for model in request.model:
|
||||
if model.__name__ == request.model_name:
|
||||
request.model_class = model
|
||||
return request.model_class
|
||||
elif hasattr(request.model, request.model_name):
|
||||
request.model_class = getattr(request.model, request.model_name)
|
||||
return request.model_class
|
||||
raise NotFound()
|
||||
|
||||
|
||||
class Models(Base):
|
||||
|
||||
def __init__(self, request):
|
||||
@@ -31,6 +51,9 @@ class Models(Base):
|
||||
return self
|
||||
model = ModelListing(self.request, item)
|
||||
model.__parent__ = self
|
||||
if hasattr(model, '__acl__'):
|
||||
# propagate permissions to parent
|
||||
self.__acl__ = model.__acl__
|
||||
return model
|
||||
|
||||
class ModelListing(Base):
|
||||
@@ -38,6 +61,10 @@ class ModelListing(Base):
|
||||
def __init__(self, request, name):
|
||||
Base.__init__(self, request, name)
|
||||
request.model_name = name
|
||||
model = self.get_model()
|
||||
if hasattr(model, '__acl__'):
|
||||
# get permissions from SA class
|
||||
self.__acl__ = model.__acl__
|
||||
|
||||
def fa_url(self, *args):
|
||||
args = args[1:]
|
||||
|
||||
@@ -87,16 +87,6 @@ class ModelView(object):
|
||||
models[key] = request.fa_url(key, request.format)
|
||||
return self.render(models=models)
|
||||
|
||||
def get_model(self):
|
||||
request = self.request
|
||||
if isinstance(request.model, list):
|
||||
for model in request.model:
|
||||
if model.__name__ == self.model_name:
|
||||
return model
|
||||
elif hasattr(request.model, self.model_name):
|
||||
return getattr(request.model, self.model_name)
|
||||
raise NotFound()
|
||||
|
||||
def sync(self, fs, id=None):
|
||||
"""sync a record. If ``id`` is None add a new record else save current one.
|
||||
|
||||
@@ -187,7 +177,7 @@ class ModelView(object):
|
||||
Default is::
|
||||
|
||||
S = self.Session()
|
||||
query = S.query(self.get_model())
|
||||
query = S.query(self.context.get_model())
|
||||
kwargs = request.environ.get('pylons.routes_dict', {})
|
||||
return Page(query, page=int(request.GET.get('page', '1')), **kwargs)
|
||||
"""
|
||||
@@ -197,7 +187,7 @@ class ModelView(object):
|
||||
if partial:
|
||||
url += "&partial=1"
|
||||
return url
|
||||
options = dict(collection=S.query(self.get_model()),
|
||||
options = dict(collection=S.query(self.context.get_model()),
|
||||
page=int(self.request.GET.get('page', '1')),
|
||||
url=get_page_url)
|
||||
options.update(kwargs)
|
||||
@@ -210,16 +200,16 @@ class ModelView(object):
|
||||
Default is::
|
||||
|
||||
S = self.Session()
|
||||
model = self.get_model()
|
||||
model = self.context.get_model()
|
||||
if id:
|
||||
model = S.query(model).get(id)
|
||||
else:
|
||||
model = model()
|
||||
return model or abort(404)
|
||||
raise NotFound()
|
||||
|
||||
"""
|
||||
S = self.Session()
|
||||
model = self.get_model()
|
||||
model = self.context.get_model()
|
||||
if id:
|
||||
model = S.query(model).get(id)
|
||||
if model:
|
||||
@@ -228,12 +218,6 @@ class ModelView(object):
|
||||
|
||||
def get_fieldset(self, id=None):
|
||||
"""return a ``FieldSet`` object bound to the correct record for ``id``.
|
||||
|
||||
Default is::
|
||||
|
||||
fs = self.FieldSet(self.get(id))
|
||||
fs.engine = fs.engine or self.engine
|
||||
return fs
|
||||
"""
|
||||
request = self.request
|
||||
if request.forms and hasattr(request.forms, self.model_name):
|
||||
@@ -246,14 +230,6 @@ class ModelView(object):
|
||||
|
||||
def get_add_fieldset(self):
|
||||
"""return a ``FieldSet`` used for add form.
|
||||
|
||||
Default is::
|
||||
|
||||
fs = self.get_fieldset()
|
||||
for field in fs.render_fields.itervalues():
|
||||
if field.is_readonly():
|
||||
del fs[field.name]
|
||||
return fs
|
||||
"""
|
||||
fs = self.get_fieldset()
|
||||
for field in fs.render_fields.itervalues():
|
||||
@@ -266,7 +242,7 @@ class ModelView(object):
|
||||
|
||||
Default is::
|
||||
|
||||
grid = self.Grid(self.get_model())
|
||||
grid = self.Grid(self.context.get_model())
|
||||
grid.engine = self.engine
|
||||
self.update_grid(grid)
|
||||
return grid
|
||||
@@ -279,7 +255,7 @@ class ModelView(object):
|
||||
g.readonly = True
|
||||
self.update_grid(g)
|
||||
return g
|
||||
grid = self.Grid(self.get_model())
|
||||
grid = self.Grid(self.context.get_model())
|
||||
grid.engine = self.engine
|
||||
self.update_grid(grid)
|
||||
return grid
|
||||
@@ -353,7 +329,7 @@ class ModelView(object):
|
||||
fs = fs.bind(data=data, session=S)
|
||||
except:
|
||||
# non SA forms
|
||||
fs = fs.bind(self.get_model(), data=data, session=S)
|
||||
fs = fs.bind(self.context.get_model(), data=data, session=S)
|
||||
if fs.validate():
|
||||
fs.sync()
|
||||
self.sync(fs)
|
||||
|
||||
@@ -22,4 +22,3 @@ def main(global_config, **settings):
|
||||
return config.make_wsgi_app()
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ from sqlalchemy.orm import scoped_session
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from zope.sqlalchemy import ZopeTransactionExtension
|
||||
from pyramid.security import Allow, ALL_PERMISSIONS
|
||||
|
||||
DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
|
||||
Base = declarative_base()
|
||||
@@ -26,6 +27,16 @@ class Foo(Base):
|
||||
id = Column(Integer, primary_key=True)
|
||||
bar = Column(Unicode(255))
|
||||
|
||||
|
||||
class Bar(Base):
|
||||
__tablename__ = 'bar'
|
||||
__acl__ = [
|
||||
(Allow, 'admin', ALL_PERMISSIONS),
|
||||
(Allow, 'bar_manager', ALL_PERMISSIONS),
|
||||
]
|
||||
id = Column(Integer, primary_key=True)
|
||||
foo = Column(Unicode(255))
|
||||
|
||||
def populate():
|
||||
session = DBSession()
|
||||
model = MyModel(name=u'root',value=55)
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
from pyramid.config import Configurator
|
||||
from sqlalchemy import engine_from_config
|
||||
from pyramid.authorization import ACLAuthorizationPolicy
|
||||
from pyramid.authentication import RemoteUserAuthenticationPolicy
|
||||
from pyramid_formalchemy.resources import Models
|
||||
from pyramid.security import Allow, Authenticated, ALL_PERMISSIONS
|
||||
|
||||
from pyramidapp.models import initialize_sql
|
||||
|
||||
class ModelsWithACL(Models):
|
||||
__acl__ = [
|
||||
(Allow, 'admin', ALL_PERMISSIONS),
|
||||
(Allow, Authenticated, 'view'),
|
||||
(Allow, 'manager', 'new'),
|
||||
(Allow, ('manager', 'editor'), 'edit'),
|
||||
(Allow, 'manager', 'delete'),
|
||||
]
|
||||
|
||||
def main(global_config, **settings):
|
||||
""" This function returns a Pyramid WSGI application.
|
||||
"""
|
||||
engine = engine_from_config(settings, 'sqlalchemy.')
|
||||
initialize_sql(engine)
|
||||
config = Configurator(settings=settings,
|
||||
authentication_policy=RemoteUserAuthenticationPolicy(),
|
||||
authorization_policy=ACLAuthorizationPolicy())
|
||||
config.add_static_view('static', 'pyramidapp:static')
|
||||
config.add_route('home', '/', view='pyramidapp.views.my_view',
|
||||
view_renderer='templates/mytemplate.pt')
|
||||
|
||||
# pyramid_formalchemy's configuration
|
||||
config.include('pyramid_formalchemy')
|
||||
config.formalchemy_admin('admin', package='pyramidapp', factory=ModelsWithACL)
|
||||
|
||||
return config.make_wsgi_app()
|
||||
|
||||
|
||||
@@ -11,22 +11,16 @@ dirname = os.path.abspath(__file__)
|
||||
dirname = os.path.dirname(dirname)
|
||||
dirname = os.path.dirname(dirname)
|
||||
|
||||
def _initTestingDB():
|
||||
from sqlalchemy import create_engine
|
||||
from pyramidapp.models import initialize_sql
|
||||
session = initialize_sql(create_engine('sqlite://'))
|
||||
return session
|
||||
|
||||
class TestUI(unittest.TestCase):
|
||||
class Test_1_UI(unittest.TestCase):
|
||||
|
||||
config = os.path.join(dirname, 'test.ini')
|
||||
extra_environ = {}
|
||||
|
||||
def setUp(self):
|
||||
app = loadapp('config:%s' % self.config)
|
||||
self.app = TestApp(app)
|
||||
self.app = TestApp(app, extra_environ=self.extra_environ)
|
||||
self.config = Configurator(autocommit=True)
|
||||
self.config.begin()
|
||||
#_initTestingDB()
|
||||
|
||||
def tearDown(self):
|
||||
self.config.end()
|
||||
@@ -103,11 +97,30 @@ class TestUI(unittest.TestCase):
|
||||
# delete
|
||||
response = self.app.delete(str(data['item_url']))
|
||||
|
||||
class Test_2_Security(Test_1_UI):
|
||||
|
||||
config = os.path.join(dirname, 'security.ini')
|
||||
extra_environ = {'REMOTE_USER': 'admin'}
|
||||
|
||||
class TestJQuery(TestUI):
|
||||
def test_model_security(self):
|
||||
resp = self.app.get('/admin/', extra_environ={'REMOTE_USER': 'editor'})
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
config = os.path.join(dirname, 'test2.ini')
|
||||
resp = self.app.get('/admin/Foo', extra_environ={'REMOTE_USER': 'editor'})
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
resp = self.app.get('/admin/Foo/new', status=403, extra_environ={'REMOTE_USER': 'editor'})
|
||||
self.assertEqual(resp.status_int, 403)
|
||||
|
||||
resp = self.app.get('/admin/Bar', status=403, extra_environ={'REMOTE_USER': 'editor'})
|
||||
self.assertEqual(resp.status_int, 403)
|
||||
|
||||
resp = self.app.get('/admin/Bar', extra_environ={'REMOTE_USER': 'bar_manager'})
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
class Test_3_JQuery(Test_1_UI):
|
||||
|
||||
config = os.path.join(dirname, 'jquery.ini')
|
||||
|
||||
def test_crud(self):
|
||||
# index
|
||||
|
||||
@@ -0,0 +1,54 @@
|
||||
[app:pyramidapp]
|
||||
use = egg:pyramidapp#security
|
||||
reload_templates = true
|
||||
debug_authorization = false
|
||||
debug_notfound = false
|
||||
debug_routematch = false
|
||||
debug_templates = true
|
||||
default_locale_name = en
|
||||
sqlalchemy.url = sqlite://
|
||||
|
||||
[pipeline:main]
|
||||
pipeline =
|
||||
egg:WebError#evalerror
|
||||
egg:repoze.tm2#tm
|
||||
pyramidapp
|
||||
|
||||
[server:main]
|
||||
use = egg:Paste#http
|
||||
host = 0.0.0.0
|
||||
port = 6543
|
||||
|
||||
# Begin logging configuration
|
||||
|
||||
[loggers]
|
||||
keys = root, sqlalchemy
|
||||
|
||||
[handlers]
|
||||
keys = console
|
||||
|
||||
[formatters]
|
||||
keys = generic
|
||||
|
||||
[logger_root]
|
||||
level = INFO
|
||||
handlers = console
|
||||
|
||||
[logger_sqlalchemy]
|
||||
level = INFO
|
||||
handlers =
|
||||
qualname = sqlalchemy.engine
|
||||
# "level = INFO" logs SQL queries.
|
||||
# "level = DEBUG" logs SQL queries and results.
|
||||
# "level = WARN" logs neither. (Recommended for production systems.)
|
||||
|
||||
[handler_console]
|
||||
class = StreamHandler
|
||||
args = (sys.stderr,)
|
||||
level = NOTSET
|
||||
formatter = generic
|
||||
|
||||
[formatter_generic]
|
||||
format = %(asctime)s %(levelname)-5.5s [%(name)s][%(threadName)s] %(message)s
|
||||
|
||||
# End logging configuration
|
||||
@@ -42,6 +42,7 @@ setup(name='pyramidapp',
|
||||
[paste.app_factory]
|
||||
main = pyramidapp:main
|
||||
jquery = pyramidapp.jquery:main
|
||||
security = pyramidapp.security:main
|
||||
""",
|
||||
paster_plugins=['pyramid'],
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user