515 lines
19 KiB
Python

# -*- coding: utf-8 -*-
""
# Copyright (C) 2014 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# ____________________________________________________________
from tiramisu.i18n import _
from tiramisu.setting import groups, undefined
from tiramisu.error import ConfigError
from .util import SqlAlchemyBase
import util
from sqlalchemy import not_, or_
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy import Column, Integer, String, Boolean, PickleType, \
ForeignKey, Table
from sqlalchemy.orm import relationship, backref
from sqlalchemy.orm.collections import attribute_mapped_collection
from itertools import chain
def load_requires(collection_type, proxy):
    """Build the ``(getter, setter)`` pair for the ``_requires`` proxy.

    Passed as ``getset_factory`` to ``association_proxy``.

    :param collection_type: collection class of the proxied relationship
        (not used here)
    :param proxy: the association proxy; only ``proxy.value_attr`` is read
    :returns: a ``(getter, setter)`` tuple of callables
    """
    def getter(obj):
        """Return the requirements held by *obj* as a tuple of 6-tuples.

        Each entry is ``(option, expected, action, inverse, transitive,
        same_action)``.  ``option`` is the first ``_Base`` row whose id
        equals the stored ``require.option`` (``None`` if there is none).
        Returns ``None`` when *obj* itself is ``None``.
        """
        if obj is None:
            return None
        # One query per requirement: only the option id is stored on the
        # row, so the option object has to be fetched back from the session.
        return tuple(
            (
                util.session.query(_Base).filter_by(id=require.option).first(),
                require.expected,
                require.action,
                require.inverse,
                require.transitive,
                require.same_action,
            )
            for require in getattr(obj, proxy.value_attr)
        )

    def setter(obj, value):
        """Store *value* on *obj* under the proxied attribute name."""
        setattr(obj, proxy.value_attr, value)

    return getter, setter
class _Require(SqlAlchemyBase):
    """Row of table ``require``: one group of requirements of an option.

    ``requires_id`` references the owning ``baseoption`` row and
    ``requires`` holds the ``_RequireOption`` rows of the group.
    """
    __tablename__ = "require"
    id = Column(Integer, primary_key=True)
    requires_id = Column(Integer, ForeignKey("baseoption.id"), nullable=False)
    requires = relationship('_RequireOption')

    def __init__(self, requires):
        """Wrap every entry of *requires* in a ``_RequireOption`` row.

        :param requires: iterable of 6-item sequences as accepted by
            ``_RequireOption``
        """
        append = self.requires.append
        for values in requires:
            append(_RequireOption(values))
class _RequireOption(SqlAlchemyBase):
    """Row of table ``requireoption``: a single requirement.

    The required option is stored by id only (``option``); the expected
    values live in ``_RequireExpected`` rows and are exposed through the
    ``expected`` association proxy.
    """
    __tablename__ = 'requireoption'
    id = Column(Integer, primary_key=True)
    require_id = Column(Integer, ForeignKey("require.id"), nullable=False)
    option = Column(Integer, nullable=False)
    # Expected values are kept in their own table (one pickled value per
    # row) rather than in a String column on this table.
    _expected = relationship(
        "_RequireExpected",
        collection_class=list,
        cascade="all, delete-orphan",
    )
    expected = association_proxy("_expected", "expected")
    action = Column(String, nullable=False)
    inverse = Column(Boolean, default=False)
    transitive = Column(Boolean, default=True)
    same_action = Column(Boolean, default=True)

    def __init__(self, values):
        """Fill the row from a 6-item sequence.

        :param values: ``(option, expected, action, inverse, transitive,
            same_action)``; ``option`` must expose an ``id`` attribute
        """
        opt, wanted, act, inv, trans, same = values
        self.option = opt.id
        self.expected = wanted
        self.action = act
        self.inverse = inv
        self.transitive = trans
        self.same_action = same
class _RequireExpected(SqlAlchemyBase):
    """Row of table ``expected``: one pickled expected value.

    ``require`` references the ``requireoption`` row the value belongs to.
    """
    __tablename__ = 'expected'
    id = Column(Integer, primary_key=True)
    require = Column(Integer, ForeignKey('requireoption.id'), nullable=False)
    expected = Column(PickleType)

    def __init__(self, expected):
        """Store *expected* as the pickled value of this row."""
        # FIXME do not create the same _expected_ several times
        # FIXME same thing with calc_properties
        self.expected = expected
class _CalcProperties(SqlAlchemyBase):
    """Row of table ``calcproperty``: one pickled calculated-property name.

    ``require`` references the owning ``baseoption`` row.
    """
    __tablename__ = 'calcproperty'
    id = Column(Integer, primary_key=True)
    require = Column(Integer, ForeignKey('baseoption.id'), nullable=False)
    name = Column(PickleType)

    def __init__(self, name):
        """Store *name* as the pickled value of this row."""
        # FIXME do not create the same _expected_ several times
        # FIXME same thing with calc_properties
        self.name = name
#____________________________________________________________
#
# properties
class _PropertyOption(SqlAlchemyBase):
    """Row of table ``propertyoption``: one property name of an option.

    ``option`` references the owning ``baseoption`` row.
    """
    __tablename__ = 'propertyoption'
    id = Column(Integer, primary_key=True)
    option = Column(Integer, ForeignKey('baseoption.id'), nullable=False)
    name = Column(String)

    def __init__(self, name):
        """Create the row for property *name*."""
        self.name = name
#____________________________________________________________
#
# information
class _Information(SqlAlchemyBase):
    """Row of table ``information``: one ``key`` / pickled ``value`` pair.

    ``option`` references the owning ``baseoption`` row.
    """
    __tablename__ = 'information'
    id = Column(Integer, primary_key=True)
    option = Column(Integer, ForeignKey('baseoption.id'), nullable=False)
    key = Column(String)
    value = Column(PickleType)

    def __init__(self, key, value):
        """Create the row holding *value* under *key*."""
        self.key, self.value = key, value
#____________________________________________________________
#
# callback
def load_callback_parm(collection_type, proxy):
    """Build the ``(getter, setter)`` pair for the callback-parameter proxies.

    Passed as ``getset_factory`` to ``association_proxy``.

    :param collection_type: collection class of the proxied relationship
        (not used here)
    :param proxy: the association proxy; only ``proxy.value_attr`` is read
    :returns: a ``(getter, setter)`` tuple of callables
    """
    def _resolve(param):
        # A row carrying a value is returned as that value; any other row
        # is treated as an option reference and returned as
        # ``(option, force_permissive)``.
        if param.value is not None:
            return param.value
        option = util.session.query(_Base).filter_by(id=param.option).first()
        return (option, param.force_permissive)

    def getter(obj):
        """Return the parameters stored on *obj* as a tuple.

        Returns ``None`` when *obj* itself is ``None``.
        """
        if obj is None:
            return None
        return tuple(_resolve(param)
                     for param in getattr(obj, proxy.value_attr))

    def setter(obj, value):
        """Store *value* on *obj* under the proxied attribute name."""
        setattr(obj, proxy.value_attr, value)

    return getter, setter
class _CallbackParamOption(SqlAlchemyBase):
    """Row of table ``callback_param_option``: one callback argument.

    A row holds either a pickled ``value`` or a reference to an option
    (``option`` id plus ``force_permissive``); it may also be left empty.
    """
    __tablename__ = 'callback_param_option'
    id = Column(Integer, primary_key=True)
    callback_param = Column(Integer, ForeignKey('callback_param.id'))
    option = Column(Integer)
    force_permissive = Column(Boolean)
    value = Column(PickleType)

    def __init__(self, option=undefined, force_permissive=undefined,
                 value=undefined):
        """Fill the row from *value* or, failing that, from *option*.

        :param option: object exposing ``id``; used only when *value* is
            not given
        :param force_permissive: stored together with *option*
        :param value: literal argument; takes precedence over *option*
        """
        if value is not undefined:
            self.value = value
            return
        if option is undefined:
            # Neither a value nor an option: leave every column unset.
            return
        self.option = option.id
        self.force_permissive = force_permissive
class _CallbackParam(SqlAlchemyBase):
    """Row of table ``callback_param``: the arguments stored under one key.

    ``callback`` references the owning ``baseoption`` row and ``params``
    holds the ``_CallbackParamOption`` rows.
    """
    __tablename__ = 'callback_param'
    id = Column(Integer, primary_key=True)
    callback = Column(Integer, ForeignKey('baseoption.id'))
    key = Column(String)
    params = relationship('_CallbackParamOption')

    def __init__(self, key, params):
        """Create the row for *key* and one child row per entry of *params*.

        :param key: name the arguments are stored under
        :param params: iterable whose entries are either a tuple --
            ``(None,)`` or ``(option, force_permissive)`` -- or a literal
            value
        """
        self.key = key
        for param in params:
            if not isinstance(param, tuple):
                entry = _CallbackParamOption(value=param)
            elif param == (None,):
                # Stored as a row with every column left unset.
                entry = _CallbackParamOption()
            else:
                entry = _CallbackParamOption(option=param[0],
                                             force_permissive=param[1])
            self.params.append(entry)
#____________________________________________________________
#
# consistency
# Association table used as ``secondary`` between ``consistency`` rows
# (``left_id``) and ``baseoption`` rows (``right_id``).
consistency_table = Table(
    'consistencyopt',
    SqlAlchemyBase.metadata,
    Column('left_id', Integer, ForeignKey('consistency.id')),
    Column('right_id', Integer, ForeignKey('baseoption.id')),
)
class _Consistency(SqlAlchemyBase):
    """Row of table ``consistency``: a pickled function and its parameters.

    The options taking part in the consistency are linked through each
    option's ``_consistencies`` collection.
    """
    __tablename__ = 'consistency'
    id = Column(Integer, primary_key=True)
    func = Column(PickleType)
    params = Column(PickleType)

    def __init__(self, func, all_cons_opts, params):
        """Create the consistency and attach it to every given option.

        :param func: stored pickled in ``func``
        :param all_cons_opts: iterable of options; this row is appended
            to the ``_consistencies`` collection of each one
        :param params: stored pickled in ``params``
        """
        self.func = func
        for member in all_cons_opts:
            member._consistencies.append(self)
        self.params = params
class _Parent(SqlAlchemyBase):
    """Row of table ``parent``: links a child option to its parent.

    The child's name is stored alongside the two ids so that a child can
    be looked up by ``(parent_id, child_name)``.
    """
    __tablename__ = 'parent'
    id = Column(Integer, primary_key=True)
    child_id = Column(Integer)
    child_name = Column(String)
    parent_id = Column(Integer)

    def __init__(self, parent, child):
        """Record *child* (its ``id`` and ``_name``) under *parent* (its ``id``)."""
        self.parent_id = parent.id
        self.child_id, self.child_name = child.id, child._name
#____________________________________________________________
#
# Base
class _Base(SqlAlchemyBase):
    """Mapped root class of every option, stored in table ``baseoption``.

    Scalar settings are plain (mostly pickled) columns; informations,
    requirements, callback/validator/choice parameters, properties and
    consistencies live in related tables and are exposed through
    association proxies.  ``_type`` is the polymorphic discriminator.
    """
    __tablename__ = 'baseoption'
    id = Column(Integer, primary_key=True)
    _name = Column(String)

    # -- informations: dict-like, keyed by ``_Information.key``
    # FIXME not autoload
    _infos = relationship(
        "_Information",
        collection_class=attribute_mapped_collection('key'),
        cascade="all, delete-orphan",
    )
    _informations = association_proxy("_infos", "value")

    _default = Column(PickleType)
    _default_multi = Column(PickleType)
    # Holds an id: ``_impl_setsubdyn`` stores ``subdyn.id`` here.
    _subdyn = Column(Integer)

    # -- choice values and their parameters
    # NOTE(review): ``_cho_params``, ``_call_params`` and ``_val_params``
    # all target ``_CallbackParam``, which declares a single foreign key
    # to ``baseoption`` -- confirm how the three sets of rows are told apart.
    _choice_values = Column(PickleType)
    _cho_params = relationship(
        '_CallbackParam',
        collection_class=attribute_mapped_collection('key'),
    )
    _choice_params = association_proxy(
        "_cho_params", "params", getset_factory=load_callback_parm)

    # -- requirements
    _reqs = relationship("_Require", collection_class=list)
    _requires = association_proxy(
        "_reqs", "requires", getset_factory=load_requires)

    _multi = Column(Integer)
    _multitype = Column(String)

    # -- callback and its parameters
    _callback = Column(PickleType)
    _call_params = relationship(
        '_CallbackParam',
        collection_class=attribute_mapped_collection('key'),
    )
    _callback_params = association_proxy(
        "_call_params", "params", getset_factory=load_callback_parm)

    # -- validator and its parameters
    _validator = Column(PickleType)
    _val_params = relationship(
        '_CallbackParam',
        collection_class=attribute_mapped_collection('key'),
    )
    _validator_params = association_proxy(
        "_val_params", "params", getset_factory=load_callback_parm)

    # -- properties
    # FIXME do not store the same property twice in the database ...
    # FIXME not autoload
    # FIXME normally a tuple ... turned into a set!
    _props = relationship("_PropertyOption", collection_class=set)
    _properties = association_proxy("_props", "name")
    # FIXME merge with expected
    _calc_props = relationship("_CalcProperties", collection_class=set)
    _calc_properties = association_proxy("_calc_props", "name")

    _warnings_only = Column(Boolean)
    _readonly = Column(Boolean, default=False)

    # -- consistencies (many-to-many through ``consistency_table``); the
    # backref exposes the linked options as ``_Consistency.options``.
    _consistencies = relationship(
        '_Consistency',
        secondary=consistency_table,
        backref=backref('options', enable_typechecks=False),
    )

    # -- polymorphism
    _type = Column(String(50))
    __mapper_args__ = {
        'polymorphic_identity': 'option',
        'polymorphic_on': _type
    }

    _extra = Column(PickleType)
    # FIXME should be a table
    _group_type = Column(String)
    _is_build_cache = Column(Boolean, default=False)

    def __init__(self):
        """Register the new object in the shared session and commit."""
        util.session.add(self)
        self.commit()

    def commit(self):
        """Commit the shared session."""
        util.session.commit()

    def _add_consistency(self, func, all_cons_opts, params):
        """Create a ``_Consistency``; it attaches itself to *all_cons_opts*."""
        _Consistency(func, all_cons_opts, params)

    def _get_consistencies(self):
        """Return ``(func, options, params)`` for each linked consistency."""
        return [
            (cons.func, cons.options, cons.params)
            for cons in self._consistencies
        ]

    def _get_id(self):
        """Return the primary key of this row."""
        return self.id

    def _impl_getsubdyn(self):
        """Return the stored ``_subdyn`` value."""
        return self._subdyn

    def _impl_setsubdyn(self, subdyn):
        """Store the ``id`` of *subdyn* in ``_subdyn``."""
        self._subdyn = subdyn.id
class Cache(SqlAlchemyBase):
    """Row of table ``cache``: the path of one option inside a description.

    All references (``descr``, ``parent``, ``option``) are stored as plain
    integer ids.
    """
    __tablename__ = 'cache'
    id = Column(Integer, primary_key=True)
    path = Column(String, nullable=False, index=True)
    descr = Column(Integer, nullable=False, index=True)
    parent = Column(Integer, nullable=False, index=True)
    option = Column(Integer, nullable=False, index=True)
    opt_type = Column(String, nullable=False, index=True)
    is_subdyn = Column(Boolean, nullable=False, index=True)
    subdyn_path = Column(String)

    def __init__(self, descr, parent, option, path, subdyn_path):
        """Create the cache entry of *option* at *path*.

        :param descr: description the cache belongs to (its ``id`` is kept)
        :param parent: direct parent of *option* (its ``id`` is kept)
        :param option: cached option; its ``id`` and class name are kept
        :param path: path of *option*
        :param subdyn_path: stored only when *option* reports itself as a
            dynamic option description
        """
        self.descr = descr.id
        self.parent = parent.id
        self.option = option.id
        self.path = path
        self.opt_type = option.__class__.__name__
        # The flag comes from impl_is_dynoptiondescription(), not from
        # option._is_subdyn().
        dynamic = option.impl_is_dynoptiondescription()
        self.is_subdyn = dynamic
        if dynamic:
            self.subdyn_path = subdyn_path
# Database-backed lookups for an option description: the path cache
# (``Cache`` rows), the parent/child links (``_Parent`` rows) and child
# access by name.  The class reads ``self.id``, ``self._group_type`` and
# ``self._is_build_cache`` and calls ``self._impl_getchildren``,
# ``self.impl_getname``, ``self._impl_get_dynchild`` and
# ``self._impl_search_dynchild``; none of these are defined here, so it is
# presumably combined with a mapped class that provides them -- TODO confirm.
# NOTE(review): statement nesting is not visible in this copy of the file
# (leading whitespace is missing); the comments below only state what the
# statements themselves show.
class StorageOptionDescription(object):
# Return the ``_is_build_cache`` flag (set by impl_build_cache_option).
def impl_already_build_caches(self):
return self._is_build_cache
# Return the option cached under *path* for this description.
# Raises AttributeError when no ``Cache`` row matches.
def impl_get_opt_by_path(self, path):
ret = util.session.query(Cache).filter_by(descr=self.id, path=path).first()
if ret is None:
raise AttributeError(_('no option for path {0}').format(path))
return util.session.query(_Base).filter_by(id=ret.option).first()
# Return the cached path of *opt* inside this description.
# Raises AttributeError when no ``Cache`` row matches.
def impl_get_path_by_opt(self, opt):
ret = util.session.query(Cache).filter_by(descr=self.id,
option=opt.id).first()
if ret is None:
raise AttributeError(_('no option {0} found').format(opt))
return ret.path
# Return the attribute of ``groups`` named by the stored ``_group_type``.
def impl_get_group_type(self):
return getattr(groups, self._group_type)
# Add one ``Cache`` row per child returned by _impl_getchildren(dyn=False)
# and recurse into children that are StorageOptionDescription instances.
# A call without *descr* is the outermost one: it uses ``self`` as the
# description, starts from an empty path and sets ``save``.
def impl_build_cache_option(self, descr=None, _currpath=None,
subdyn_path=None):
if descr is None:
save = True
descr = self
_currpath = []
else:
save = False
for option in self._impl_getchildren(dyn=False):
attr = option.impl_getname()
util.session.add(Cache(descr, self, option,
str('.'.join(_currpath + [attr])), subdyn_path))
if isinstance(option, StorageOptionDescription):
if option.impl_is_dynoptiondescription():
subdyn_path = '.'.join(_currpath)
# _currpath is shared with the recursive call: the child's name is pushed
# before recursing and popped afterwards.
_currpath.append(attr)
option.impl_build_cache_option(descr,
_currpath,
subdyn_path)
_currpath.pop()
if save:
self._is_build_cache = True
util.session.commit()
# Search the ``Cache`` rows of this description and return
# ``(path, option)`` pairs: a list, or a single pair as soon as one is found
# when *only_first* is true.
# *bytype* None excludes rows whose opt_type is 'OptionDescription';
# otherwise rows are matched on ``bytype.__name__``.
def impl_get_options_paths(self, bytype, byname, _subpath, only_first,
context):
sqlquery = util.session.query(Cache).filter_by(descr=self.id)
if bytype is None:
sqlquery = sqlquery.filter(not_(
Cache.opt_type == 'OptionDescription'))
else:
sqlquery = sqlquery.filter_by(opt_type=bytype.__name__)
# ``query`` becomes a LIKE pattern ("<subpath>.%.<byname>"); ``or_query`` is
# the exact path "<subpath>.<byname>", which that pattern cannot match.
# NOTE(review): with *_subpath* but no *byname* the pattern is "<subpath>."
# and contains no wildcard -- confirm this is intended.
query = ''
or_query = ''
if _subpath is not None:
query += _subpath + '.'
if byname is not None:
or_query = query + byname
query += '%.' + byname
if query != '':
filter_query = Cache.path.like(query)
if or_query != '':
filter_query = or_(Cache.path == or_query, filter_query)
sqlquery = sqlquery.filter(filter_query)
#if only_first:
# opt = sqlquery.first()
# if opt is None:
# return tuple()
# option = util.session.query(_Base).filter_by(id=opt.option).first()
# return ((opt.path, option),)
#else:
ret = []
for opt in sqlquery.all():
option = util.session.query(_Base).filter_by(id=opt.option).first()
if opt.is_subdyn:
name = option.impl_getname()
if byname is not None and byname.startswith(name):
found = False
# Look for the suffix that turns the option name into *byname*, then
# rebuild the path with that suffix appended to each remaining component.
# NOTE(review): ``_impl_setsubdyn`` on ``_Base`` stores an id in ``_subdyn``,
# yet a method is called on it here -- confirm what ``option._subdyn`` is.
for suffix in option._subdyn._impl_get_suffixes(
context):
if byname == name + suffix:
found = True
subdyn_path = opt.subdyn_path
dynpaths = opt.path[len(subdyn_path):].split('.')
path = subdyn_path
for dynpath in dynpaths:
path += '.' + dynpath + suffix
option = option._impl_to_dyn(
name + suffix, path)
break
# NOTE(review): which loop this ``break`` leaves, and which ``if`` the
# following ``else`` pairs with, cannot be determined from this copy --
# confirm against the original file.
if not found:
break
else:
ret_opt = (opt.path, option)
if only_first:
return ret_opt
ret.append(ret_opt)
return ret
# Add one ``_Parent`` row per entry of *children* to the session.
# *child_names* is not used.
def _add_children(self, child_names, children):
for child in children:
util.session.add(_Parent(self, child))
# Generator over the children of this description.
# When *only_dyn* is False or *context* is undefined, children come from the
# ``_Parent`` rows; otherwise from the ``Cache`` rows of the context's
# description whose parent is ``self`` and whose ``is_subdyn`` is true.
def _impl_st_getchildren(self, context, only_dyn=False):
if only_dyn is False or context is undefined:
for child in util.session.query(_Parent).filter_by(
parent_id=self.id).all():
yield(util.session.query(_Base).filter_by(id=child.child_id
).first())
else:
descr = context.cfgimpl_get_description().id
for child in util.session.query(Cache).filter_by(descr=descr,
parent=self.id
).filter_by(
is_subdyn=True).all():
yield(util.session.query(_Base).filter_by(id=child.option).first())
# Resolve the child called *name*.
# With a *suffix*: strip it from *name*, look the base child up through
# ``_Parent`` and return ``self._impl_get_dynchild(opt, suffix)``.
# Without: look *name* up through ``_Parent``, falling back to
# ``self._impl_search_dynchild``.
# Raises AttributeError when nothing matches, and ConfigError when a suffix
# is given without a context.  *dyn* is not used.
def _getattr(self, name, suffix=undefined, context=undefined, dyn=True):
error = False
if suffix is not undefined:
try:
if undefined in [suffix, context]: # pragma: optional cover
raise ConfigError(_("suffix and context needed if "
"it's a dyn option"))
if name.endswith(suffix):
oname = name[:-len(suffix)]
#child = self._children[1][self._children[0].index(oname)]
child = util.session.query(_Parent).filter_by(
parent_id=self.id, child_name=oname).first()
if child is None:
error = True
else:
opt = util.session.query(_Base).filter_by(
id=child.child_id).first()
return self._impl_get_dynchild(opt, suffix)
else:
error = True
# NOTE(review): this handler looks like a leftover of the commented-out
# ``.index(oname)`` lookup above -- confirm it is still needed.
except ValueError: # pragma: optional cover
error = True
else:
child = util.session.query(_Parent).filter_by(parent_id=self.id,
child_name=name
).first()
if child is None:
child = self._impl_search_dynchild(name, context=context)
if child != []:
return child
error = True
if error is False:
return util.session.query(_Base).filter_by(id=child.child_id
).first()
if error:
raise AttributeError(_('unknown Option {0} in OptionDescription {1}'
'').format(name, self.impl_getname()))
# Generator of ``(option, path)`` for every option cached for this
# description that has a ``_PropertyOption`` row named 'force_store_value'.
def _get_force_store_value(self):
#only option in current tree
current_ids = tuple(chain(*util.session.query(Cache.option).filter_by(
descr=self.id).all()))
for prop in util.session.query(_PropertyOption).filter(
_PropertyOption.option.in_(current_ids),
_PropertyOption.name == 'force_store_value').all():
opt = util.session.query(_Base).filter_by(id=prop.option).first()
path = self.impl_get_path_by_opt(opt)
yield (opt, path)
class StorageBase(_Base):
    """Base for concrete option classes mapped on top of ``_Base``.

    Each subclass gets its lower-cased class name as polymorphic identity.
    """

    @declared_attr
    def __mapper_args__(cls):
        """Return the mapper arguments computed for the class being mapped."""
        identity = cls.__name__.lower()
        return {'polymorphic_identity': identity}