# -*- coding: utf-8 -*- # Copyright (C) 2012-2013 Team tiramisu (see AUTHORS for all contributors) # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by the # Free Software Foundation, either version 3 of the License, or (at your # option) any later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . # # The original `Config` design model is unproudly borrowed from # the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/ # the whole pypy projet is under MIT licence # ____________________________________________________________ "options handler global entry point" import weakref from tiramisu.error import PropertiesOptionError, ConfigError from tiramisu.option import OptionDescription, Option, SymLinkOption from tiramisu.setting import groups, Settings, default_encoding, undefined from tiramisu.storage import get_storages, get_storage, set_storage, \ _impl_getstate_setting from tiramisu.value import Values, Multi from tiramisu.i18n import _ class SubConfig(object): """Sub configuration management entry. Tree if OptionDescription's responsability. SubConfig are generated on-demand. A Config is also a SubConfig. Root Config is call context below """ __slots__ = ('_impl_context', '_impl_descr', '_impl_path') def __init__(self, descr, context, subpath=None): """ Configuration option management master class :param descr: describes the configuration schema :type descr: an instance of ``option.OptionDescription`` :param context: the current root config :type context: `Config` :type subpath: `str` with the path name """ # main option description if descr is not None and not isinstance(descr, OptionDescription): raise TypeError(_('descr must be an optiondescription, not {0}' ).format(type(descr))) self._impl_descr = descr # sub option descriptions if not isinstance(context, weakref.ReferenceType): raise ValueError('context must be a Weakref') self._impl_context = context self._impl_path = subpath def cfgimpl_reset_cache(self, only_expired=False, only=('values', 'settings')): "remove cache (in context)" self._cfgimpl_get_context().cfgimpl_reset_cache(only_expired, only) def cfgimpl_get_home_by_path(self, path, force_permissive=False): """:returns: tuple (config, name)""" path = path.split('.') for step in path[:-1]: self = self.getattr(step, force_permissive=force_permissive) return self, path[-1] #def __hash__(self): #FIXME # return hash(self.cfgimpl_get_description().impl_getkey(self)) #def __eq__(self, other): #FIXME # "Config's comparison" # if not isinstance(other, Config): # return False # return self.cfgimpl_get_description().impl_getkey(self) == \ # other.cfgimpl_get_description().impl_getkey(other) #def __ne__(self, other): #FIXME # "Config's comparison" # if not isinstance(other, Config): # return True # return not self == other # ______________________________________________________________________ def __iter__(self): """Pythonesque way of parsing group's ordered options. iteration only on Options (not OptionDescriptions)""" for child in self.cfgimpl_get_description().impl_getchildren(): if not isinstance(child, OptionDescription): try: yield child.impl_getname(), getattr(self, child.impl_getname()) except GeneratorExit: raise StopIteration except PropertiesOptionError: pass # option with properties def iter_all(self, force_permissive=False): """A way of parsing options **and** groups. iteration on Options and OptionDescriptions.""" for child in self.cfgimpl_get_description().impl_getchildren(): try: yield child.impl_getname(), self.getattr(child.impl_getname(), force_permissive=force_permissive) except GeneratorExit: raise StopIteration except PropertiesOptionError: pass # option with properties def iter_groups(self, group_type=None, force_permissive=False): """iteration on groups objects only. All groups are returned if `group_type` is `None`, otherwise the groups can be filtered by categories (families, or whatever). :param group_type: if defined, is an instance of `groups.GroupType` or `groups.MasterGroupType` that lives in `setting.groups` """ if group_type is not None and not isinstance(group_type, groups.GroupType): raise TypeError(_("unknown group_type: {0}").format(group_type)) for child in self.cfgimpl_get_description().impl_getchildren(): if isinstance(child, OptionDescription): try: if group_type is None or (group_type is not None and child.impl_get_group_type() == group_type): yield child.impl_getname(), self.getattr(child.impl_getname(), force_permissive=force_permissive) except GeneratorExit: raise StopIteration except PropertiesOptionError: pass # ______________________________________________________________________ def __str__(self): "Config's string representation" lines = [] for name, grp in self.iter_groups(): lines.append("[{0}]".format(name)) for name, value in self: try: lines.append("{0} = {1}".format(name, value)) except UnicodeEncodeError: lines.append("{0} = {1}".format(name, value.encode(default_encoding))) return '\n'.join(lines) __repr__ = __str__ def _cfgimpl_get_context(self): """context could be None, we need to test it context is None only if all reference to `Config` object is deleted (for example we delete a `Config` and we manipulate a reference to old `SubConfig`, `Values`, `Multi` or `Settings`) """ context = self._impl_context() if context is None: raise ConfigError(_('the context does not exist anymore')) return context def cfgimpl_get_description(self): if self._impl_descr is None: raise ConfigError(_('no option description found for this config' ' (may be GroupConfig)')) else: return self._impl_descr def cfgimpl_get_settings(self): return self._cfgimpl_get_context()._impl_settings def cfgimpl_get_values(self): return self._cfgimpl_get_context()._impl_values # ____________________________________________________________ # attribute methods def __setattr__(self, name, value): "attribute notation mechanism for the setting of the value of an option" if name.startswith('_impl_'): object.__setattr__(self, name, value) return self._setattr(name, value) def _setattr(self, name, value, force_permissive=False): if '.' in name: homeconfig, name = self.cfgimpl_get_home_by_path(name) return homeconfig.__setattr__(name, value) child = getattr(self.cfgimpl_get_description(), name) if isinstance(child, OptionDescription): raise TypeError(_("can't assign to an OptionDescription")) elif not isinstance(child, SymLinkOption): if self._impl_path is None: path = name else: path = self._impl_path + '.' + name self.cfgimpl_get_values().setitem(child, value, path, force_permissive=force_permissive) else: context = self._cfgimpl_get_context() path = context.cfgimpl_get_description().impl_get_path_by_opt( child._opt) context._setattr(path, value, force_permissive=force_permissive) def __delattr__(self, name): child = getattr(self.cfgimpl_get_description(), name) self.cfgimpl_get_values().__delitem__(child) def __getattr__(self, name): return self.getattr(name) def _getattr(self, name, force_permissive=False, validate=True): """use getattr instead of _getattr """ return self.getattr(name, force_permissive, validate) def getattr(self, name, force_permissive=False, validate=True): """ attribute notation mechanism for accessing the value of an option :param name: attribute name :return: option's value if name is an option name, OptionDescription otherwise """ # attribute access by passing a path, # for instance getattr(self, "creole.general.family.adresse_ip_eth0") if '.' in name: homeconfig, name = self.cfgimpl_get_home_by_path( name, force_permissive=force_permissive) return homeconfig.getattr(name, force_permissive=force_permissive, validate=validate) opt_or_descr = self.cfgimpl_get_description().__getattr__(name) if self._impl_path is None: subpath = name else: subpath = self._impl_path + '.' + name # symlink options #FIXME a gerer plutot dans l'option ca ... #FIXME je n'en sais rien en fait ... :/ if isinstance(opt_or_descr, SymLinkOption): context = self._cfgimpl_get_context() path = context.cfgimpl_get_description().impl_get_path_by_opt( opt_or_descr._opt) return context.getattr(path, validate=validate, force_permissive=force_permissive) elif isinstance(opt_or_descr, OptionDescription): self.cfgimpl_get_settings().validate_properties( opt_or_descr, True, False, path=subpath, force_permissive=force_permissive) return SubConfig(opt_or_descr, self._impl_context, subpath) else: return self.cfgimpl_get_values()._get_cached_item( opt_or_descr, path=subpath, validate=validate, force_permissive=force_permissive) def find(self, bytype=None, byname=None, byvalue=undefined, type_='option', check_properties=True, force_permissive=False): """ finds a list of options recursively in the config :param bytype: Option class (BoolOption, StrOption, ...) :param byname: filter by Option.impl_getname() :param byvalue: filter by the option's value :returns: list of matching Option objects """ return self._cfgimpl_get_context()._find(bytype, byname, byvalue, first=False, type_=type_, _subpath=self.cfgimpl_get_path(), check_properties=check_properties, force_permissive=force_permissive) def find_first(self, bytype=None, byname=None, byvalue=undefined, type_='option', display_error=True, check_properties=True, force_permissive=False): """ finds an option recursively in the config :param bytype: Option class (BoolOption, StrOption, ...) :param byname: filter by Option.impl_getname() :param byvalue: filter by the option's value :returns: list of matching Option objects """ return self._cfgimpl_get_context()._find( bytype, byname, byvalue, first=True, type_=type_, _subpath=self.cfgimpl_get_path(), display_error=display_error, check_properties=check_properties, force_permissive=force_permissive) def _find(self, bytype, byname, byvalue, first, type_='option', _subpath=None, check_properties=True, display_error=True, force_permissive=False): """ convenience method for finding an option that lives only in the subtree :param first: return only one option if True, a list otherwise :return: find list or an exception if nothing has been found """ def _filter_by_value(): if byvalue is undefined: return True try: value = self.getattr(path, force_permissive=force_permissive) if isinstance(value, Multi): return byvalue in value else: return value == byvalue except PropertiesOptionError: # a property is a restriction # upon the access of the value return False if type_ not in ('option', 'path', 'value'): raise ValueError(_('unknown type_ type {0}' 'for _find').format(type_)) find_results = [] # if value and/or check_properties are set, need all avalaible option # If first one has no good value or not good property check second one # and so on only_first = first is True and byvalue is None and check_properties is None options = self.cfgimpl_get_description().impl_get_options_paths( bytype, byname, _subpath, only_first) for path, option in options: if not _filter_by_value(): continue #remove option with propertyerror, ... if byvalue is undefined and check_properties: try: value = self.getattr(path, force_permissive=force_permissive) except PropertiesOptionError: # a property restricts the access of the value continue if type_ == 'value': retval = value elif type_ == 'path': retval = path elif type_ == 'option': retval = option if first: return retval else: find_results.append(retval) return self._find_return_results(find_results, display_error) def _find_return_results(self, find_results, display_error): if find_results == []: if display_error: raise AttributeError(_("no option found in config" " with these criteria")) else: # translation is slow raise AttributeError() else: return find_results def make_dict(self, flatten=False, _currpath=None, withoption=None, withvalue=undefined, force_permissive=False): """exports the whole config into a `dict`, for example: >>> print cfg.make_dict() {'od2.var4': None, 'od2.var5': None, 'od2.var6': None} :param flatten: returns a dict(name=value) instead of a dict(path=value) :: >>> print cfg.make_dict(flatten=True) {'var5': None, 'var4': None, 'var6': None} :param withoption: returns the options that are present in the very same `OptionDescription` than the `withoption` itself:: >>> print cfg.make_dict(withoption='var1') {'od2.var4': None, 'od2.var5': None, 'od2.var6': None, 'od2.var1': u'value', 'od1.var1': None, 'od1.var3': None, 'od1.var2': None} :param withvalue: returns the options that have the value `withvalue` :: >>> print c.make_dict(withoption='var1', withvalue=u'value') {'od2.var4': None, 'od2.var5': None, 'od2.var6': None, 'od2.var1': u'value'} :returns: dict of Option's name (or path) and values """ pathsvalues = [] if _currpath is None: _currpath = [] if withoption is None and withvalue is not undefined: raise ValueError(_("make_dict can't filtering with value without " "option")) if withoption is not None: mypath = self.cfgimpl_get_path() for path in self._cfgimpl_get_context()._find(bytype=None, byname=withoption, byvalue=withvalue, first=False, type_='path', _subpath=mypath, force_permissive=force_permissive): path = '.'.join(path.split('.')[:-1]) opt = self._cfgimpl_get_context().cfgimpl_get_description( ).impl_get_opt_by_path(path) if mypath is not None: if mypath == path: withoption = None withvalue = undefined break else: tmypath = mypath + '.' if not path.startswith(tmypath): raise AttributeError(_('unexpected path {0}, ' 'should start with {1}' '').format(path, mypath)) path = path[len(tmypath):] self._make_sub_dict(opt, path, pathsvalues, _currpath, flatten, force_permissive=force_permissive) #withoption can be set to None below ! if withoption is None: for opt in self.cfgimpl_get_description().impl_getchildren(): path = opt.impl_getname() self._make_sub_dict(opt, path, pathsvalues, _currpath, flatten, force_permissive=force_permissive) if _currpath == []: options = dict(pathsvalues) return options return pathsvalues def _make_sub_dict(self, opt, path, pathsvalues, _currpath, flatten, force_permissive=False): try: if isinstance(opt, OptionDescription): pathsvalues += self.getattr(path, force_permissive=force_permissive).make_dict( flatten, _currpath + path.split('.'), force_permissive=force_permissive) else: value = self.getattr(opt.impl_getname(), force_permissive=force_permissive) if flatten: name = opt.impl_getname() else: name = '.'.join(_currpath + [opt.impl_getname()]) pathsvalues.append((name, value)) except PropertiesOptionError: pass def cfgimpl_get_path(self): descr = self.cfgimpl_get_description() context_descr = self._cfgimpl_get_context().cfgimpl_get_description() return context_descr.impl_get_path_by_opt(descr) class _CommonConfig(SubConfig): "abstract base class for the Config, GroupConfig and the MetaConfig" __slots__ = ('_impl_values', '_impl_settings', '_impl_meta', '_impl_test') def _impl_build_all_caches(self): if not self.cfgimpl_get_description().impl_already_build_caches(): self.cfgimpl_get_description().impl_build_cache_consistency() self.cfgimpl_get_description().impl_validate_options() self.cfgimpl_get_description().impl_build_cache_option() def read_only(self): "read only is a global config's setting, see `settings.py`" self.cfgimpl_get_settings().read_only() def read_write(self): "read write is a global config's setting, see `settings.py`" self.cfgimpl_get_settings().read_write() def getowner(self, opt, force_permissive=False): """convenience method to retrieve an option's owner from the config itself """ if not isinstance(opt, Option) and not isinstance(opt, SymLinkOption): raise TypeError(_('opt in getowner must be an option not {0}' '').format(type(opt))) return self.cfgimpl_get_values().getowner(opt, force_permissive=force_permissive) def unwrap_from_path(self, path, force_permissive=False): """convenience method to extract and Option() object from the Config() and it is **fast**: finds the option directly in the appropriate namespace :returns: Option() """ if '.' in path: homeconfig, path = self.cfgimpl_get_home_by_path( path, force_permissive=force_permissive) return getattr(homeconfig.cfgimpl_get_description(), path) return getattr(self.cfgimpl_get_description(), path) def cfgimpl_get_path(self): return None def cfgimpl_get_meta(self): if self._impl_meta is not None: return self._impl_meta() # information def impl_set_information(self, key, value): """updates the information's attribute :param key: information's key (ex: "help", "doc" :param value: information's value (ex: "the help string") """ self._impl_values.set_information(key, value) def impl_get_information(self, key, default=undefined): """retrieves one information's item :param key: the item string (ex: "help") """ return self._impl_values.get_information(key, default) # ----- state def __getstate__(self): if self._impl_meta is not None: raise ConfigError(_('cannot serialize Config with MetaConfig')) slots = set() for subclass in self.__class__.__mro__: if subclass is not object: slots.update(subclass.__slots__) slots -= frozenset(['_impl_context', '_impl_meta', '__weakref__']) state = {} for slot in slots: try: state[slot] = getattr(self, slot) except AttributeError: pass storage = self._impl_values._p_._storage if not storage.serializable: raise ConfigError(_('this storage is not serialisable, could be a ' 'none persistent storage')) state['_storage'] = {'session_id': storage.session_id, 'persistent': storage.persistent} state['_impl_setting'] = _impl_getstate_setting() return state def __setstate__(self, state): for key, value in state.items(): if key not in ['_storage', '_impl_setting']: setattr(self, key, value) set_storage('config', **state['_impl_setting']) self._impl_context = weakref.ref(self) self._impl_settings.context = weakref.ref(self) self._impl_values.context = weakref.ref(self) storage = get_storage('config', test=self._impl_test, **state['_storage']) self._impl_values._impl_setstate(storage) self._impl_settings._impl_setstate(storage) self._impl_meta = None # ____________________________________________________________ class Config(_CommonConfig): "main configuration management entry" __slots__ = ('__weakref__', '_impl_test') def __init__(self, descr, session_id=None, persistent=False): """ Configuration option management master class :param descr: describes the configuration schema :type descr: an instance of ``option.OptionDescription`` :param context: the current root config :type context: `Config` :param session_id: session ID is import with persistent Config to retrieve good session :type session_id: `str` :param persistent: if persistent, don't delete storage when leaving :type persistent: `boolean` """ settings, values = get_storages(self, session_id, persistent) self._impl_settings = Settings(self, settings) self._impl_values = Values(self, values) super(Config, self).__init__(descr, weakref.ref(self)) self._impl_build_all_caches() self._impl_meta = None #undocumented option used only in test script self._impl_test = False def cfgimpl_reset_cache(self, only_expired=False, only=('values', 'settings')): if 'values' in only: self.cfgimpl_get_values().reset_cache(only_expired=only_expired) if 'settings' in only: self.cfgimpl_get_settings().reset_cache(only_expired=only_expired) class GroupConfig(_CommonConfig): __slots__ = ('_impl_children', '__weakref__') def __init__(self, children, session_id=None, persistent=False, _descr=None): if not isinstance(children, list): raise ValueError(_("metaconfig's children must be a list")) self._impl_children = children settings, values = get_storages(self, session_id, persistent) self._impl_settings = Settings(self, settings) self._impl_values = Values(self, values) super(GroupConfig, self).__init__(_descr, weakref.ref(self)) self._impl_meta = None #undocumented option used only in test script self._impl_test = False def cfgimpl_get_children(self): return self._impl_children #def cfgimpl_get_context(self): # "a meta config is a config which has a setting, that is itself" # return self # def cfgimpl_reset_cache(self, only_expired=False, only=('values', 'settings')): if 'values' in only: self.cfgimpl_get_values().reset_cache(only_expired=only_expired) if 'settings' in only: self.cfgimpl_get_settings().reset_cache(only_expired=only_expired) for child in self._impl_children: child.cfgimpl_reset_cache(only_expired=only_expired, only=only) def setattrs(self, path, value): """Setattr not in current GroupConfig, but in each children """ for child in self._impl_children: try: if not isinstance(child, GroupConfig): setattr(child, path, value) else: child.setattrs(path, value) except PropertiesOptionError: pass def find_firsts(self, byname=None, bypath=None, byvalue=undefined, type_='path', display_error=True): """Find first not in current GroupConfig, but in each children """ ret = [] #if MetaConfig, all children have same OptionDescription as context #so search only one time for all children try: if bypath is None and byname is not None and \ isinstance(self, MetaConfig): bypath = self._find(bytype=None, byvalue=undefined, byname=byname, first=True, type_='path', check_properties=False, display_error=display_error) byname = None except AttributeError: pass for child in self._impl_children: try: if not isinstance(child, MetaConfig): if bypath is not None: #if byvalue is None, try if not raise value = getattr(child, bypath) if byvalue is not undefined: if isinstance(value, Multi): if byvalue in value: ret.append(child) else: if value == byvalue: ret.append(child) else: ret.append(child) else: ret.append(child.find_first(byname=byname, byvalue=byvalue, type_=type_, display_error=False)) else: ret.extend(child.find_firsts(byname=byname, bypath=bypath, byvalue=byvalue, type_=type_, display_error=False)) except AttributeError: pass return self._find_return_results(ret, display_error) class MetaConfig(GroupConfig): __slots__ = tuple() def __init__(self, children, session_id=None, persistent=False): descr = None for child in children: if not isinstance(child, _CommonConfig): raise TypeError(_("metaconfig's children " "should be config, not {0}" ).format(type(child))) if child.cfgimpl_get_meta() is not None: raise ValueError(_("child has already a metaconfig's")) if descr is None: descr = child.cfgimpl_get_description() elif not descr is child.cfgimpl_get_description(): raise ValueError(_('all config in metaconfig must ' 'have the same optiondescription')) child._impl_meta = weakref.ref(self) super(MetaConfig, self).__init__(children, session_id, persistent, descr) def mandatory_warnings(config): #only for retro-compatibility return config.cfgimpl_get_values().mandatory_warnings()