rougail/creole/lint/parsetemplate.py

661 lines
26 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys
from os.path import basename
from creole.loader import creole_loader
from creole.client import CreoleClient
from creole.template import CreoleGet, IsDefined, CreoleTemplateEngine, CreoleMaster
from creole import eosfunc
from tiramisu.option import *
from tiramisu import Config
from tiramisu.error import ConfigError, PropertiesOptionError, \
RequirementError, ValueWarning
from Cheetah import Parser, Compiler
from Cheetah.Template import Template
from Cheetah.NameMapper import NotFound
from pyeole.ansiprint import print_red
from creole.eosfunc import valid_regexp
from Cheetah.Unspecified import Unspecified
import warnings
# When true, the values being tested and full tracebacks are printed.
DEBUG = False
#DEBUG = True
# Module-wide Creole client; Check_Template asks it for container infos.
client = CreoleClient()
# Settings handed to Template.compile: directives start with '%', variables
# with '%%', and every PSP / comment delimiter is the same run of ten 'µ'.
_MICRO_RUN = u'µ' * 10
compilerSettings = {
    'directiveStartToken': u'%',
    'cheetahVarStartToken': u'%%',
    'EOLSlurpToken': u'%',
}
compilerSettings.update(dict.fromkeys(
    ('PSPStartToken', 'PSPEndToken',
     'commentStartToken', 'commentEndToken',
     'multiLineCommentStartToken', 'multiLineCommentEndToken'),
    _MICRO_RUN))
#======================= CHEETAH =======================
# The parser class below is used to retrieve all template vars.
# Names collected while a template is parsed: cl_Parser fills both sets and
# getVars() reads then resets them.
cl_chunks = set()
cl_vars = set()
class cl_Parser(Parser.Parser):
    """Cheetah parser that records the variable names it meets.

    Chunk names seen by getCheetahVarNameChunks are stored in ``cl_chunks``;
    values returned by getCheetahVar that do not start with ``VFFSL(`` are
    stored in ``cl_vars``.
    """

    def getCheetahVarNameChunks(self, *args, **kwargs):
        """Delegate to Cheetah and remember every flagged chunk name."""
        parsed = super(cl_Parser, self).getCheetahVarNameChunks(*args, **kwargs)
        for piece in parsed:
            # a false second item means an internal variable: ignore it
            if not piece[1]:
                continue
            parts = piece[0].split('.')
            # for a dotted (master/slave) name keep the last and the first
            # component; an undotted name is both at once
            cl_chunks.update((parts[-1], parts[0]))
        return parsed

    def getCheetahVar(self, *args, **kwargs):
        """Delegate to Cheetah and remember results not starting with VFFSL(."""
        compiled = super(cl_Parser, self).getCheetahVar(*args, **kwargs)
        if compiled.startswith(u'VFFSL('):
            return compiled
        cl_vars.add(compiled)
        return compiled
def getVars():
    """Return the collected chunk names that are not in cl_vars, then reset.

    Both module-level sets are replaced by new empty sets before returning.
    """
    global cl_chunks, cl_vars
    # retrieve all calculated vars
    calculated = cl_chunks.difference(cl_vars)
    cl_chunks, cl_vars = set(), set()
    return list(calculated)
class CompilerGetVars(Compiler.ModuleCompiler):
    """Cheetah module compiler whose parser class is cl_Parser."""
    parserClass = cl_Parser
# Keep a handle on Cheetah's own Template.compile before it is replaced.
true_compile = Template.compile


@classmethod
def cl_compile(kls, *args, **kwargs):
    """Call the saved Template.compile with CompilerGetVars and no cache."""
    kwargs.update(compilerClass=CompilerGetVars, useCache=False)
    return true_compile(*args, **kwargs)


# From here on, Template.compile goes through cl_compile.
Template.compile = cl_compile
def CompilerGetVar(varName, default=Unspecified):
    """Replacement for Cheetah's ``getVar`` that resolves names in ``config``.

    The option named ``varName`` is looked up in the module-level ``config``.
    If that fails with AttributeError or ConfigError, the option is looked up
    again with ``check_properties=False`` after mandatory options have been
    populated. Every option resolved this way is recorded in the module-level
    ``extra_vars`` so callers can tell which variables went through getVar.

    :param varName: variable name, without the ``%%`` prefix
    :param default: returned when the option cannot be resolved
    :returns: the entry CreoleTemplateEngine built for the variable, or
        ``default``
    :raises Exception: if ``varName`` starts with ``%%``
    :raises AttributeError: if the option cannot be resolved and no default
        was given
    """
    global extra_vars, config
    if varName.startswith('%%'):
        raise Exception('varname should not start with %% {0}'.format(varName))
    config.read_only()
    try:
        option = config.creole.find_first(byname=varName)
        path = config.cfgimpl_get_description().impl_get_path_by_opt(option)
        value = getattr(config, path)
    except (AttributeError, ConfigError):
        try:
            option = config.creole.find_first(byname=varName, check_properties=False)
            path = config.cfgimpl_get_description().impl_get_path_by_opt(option)
            config.read_write()
            populate_mandatories()
            config.read_only()
            value = getattr(config, path)
        except (AttributeError, RequirementError) as err:
            config.read_only()
            # support default value
            if default is not Unspecified:
                return default
            raise AttributeError('option:', varName, ':', err)
        except PropertiesOptionError:
            if default is not Unspecified:
                return default
            # bare raise keeps the original traceback
            raise
        except Exception:
            config.read_only()
            raise
    except Exception:
        config.read_only()
        raise
    # drop the first two path components before handing the value to the engine
    lpath = '.'.join(path.split('.')[2:])
    engine = CreoleTemplateEngine(force_values={lpath: value})
    name = path.split('.')[-1]
    extra_vars[option] = name
    if "." in lpath:
        spath = lpath.split('.')
        if spath[0] != spath[1]:
            # the two components differ: read the entry from the first
            # component's ``slave`` mapping
            return engine.creole_variables_dict[spath[0]].slave[spath[1]]
    return engine.creole_variables_dict[name]
def CompilerGetattr(creolemaster, name, default=None):
    """getattr replacement for CreoleMaster objects used while rendering.

    When ``name`` is not yet in ``creolemaster.slave`` it is resolved with
    CompilerGetVar (indexed by ``creolemaster._index`` when that is set) and
    added with ``add_slave`` before the attribute is read.

    :raises Exception: if ``creolemaster`` is not a CreoleMaster
    """
    if not isinstance(creolemaster, CreoleMaster):
        raise Exception('creolemaster must be CreoleMaster, not {0}'.format(type(creolemaster)))
    already_known = name in creolemaster.slave
    if not already_known:
        # FIXME assume name is slave?
        resolved = CompilerGetVar(name, default)
        index = creolemaster._index
        if index is not None:
            resolved = resolved[index]
        creolemaster.add_slave(name, resolved)
    return getattr(creolemaster, name, default)
#======================= EOSFUNC =======================
# Every attribute of eosfunc whose name does not start with '_', by name.
eos = dict(
    (attr_name, getattr(eosfunc, attr_name))
    for attr_name in dir(eosfunc)
    if not attr_name.startswith('_')
)
#======================= CONFIG =======================
def populate_mandatory(config, option, path, raise_propertyerror=False):
    """Store a placeholder value in ``option``.

    The value is chosen from the option's name and type, adjusted when the
    option is validated by ``valid_regexp``, wrapped in a list for multi
    options, then written with ``config.setattr(..., force_permissive=True)``.

    :param config: configuration to write into
    :param option: option to fill
    :param path: full path of ``option`` in ``config``
    :param raise_propertyerror: when true, a PropertiesOptionError that is not
        about 'frozen' is re-raised as is instead of being wrapped
    :raises Exception: unsupported option type, value rejected with
        ValueError, or PropertiesOptionError not about 'frozen'
    """
    def _build_network(path):
        """Return the third component of the generated 192.168.x.y values."""
        for num in range(0, 4):
            if path.startswith('creole.interface_{0}'.format(num)):
                return num
        # if there is a _cons_in_network consistency (the IP must be inside a
        # declared network), use the third component of that network (#10714)
        if getattr(option, '_consistencies', None) is not None:
            for const in option._consistencies:
                if const[0] == '_cons_in_network':
                    try:
                        opt = const[1][1]
                        net_path = config.cfgimpl_get_description().impl_get_path_by_opt(opt)
                        val = config.getattr(net_path, force_permissive=True)
                        if isinstance(val, list):
                            val = val[0]
                        return val.split('.')[2]
                    except IndexError:
                        pass
        return 5

    def _build_ip(path):
        """Return the last component of the generated IP."""
        if path.endswith('_fichier_link'):
            return 3
        if path.endswith('_proxy_link'):
            return 2
        # do not return the same value when it is supposed to be different
        if getattr(option, '_consistencies', None) is not None:
            for const in option._consistencies:
                if const[0] == '_cons_not_equal':
                    return 4
        return 1

    if option.impl_getname().startswith('nom_carte_eth'):
        value = unicode(option.impl_getname())
    elif isinstance(option, UnicodeOption):
        value = u'value'
    elif isinstance(option, IPOption):
        value = u'192.168.{0}.{1}'.format(_build_network(path), _build_ip(path))
    elif isinstance(option, NetworkOption):
        value = u'192.168.{0}.0'.format(_build_network(path))
    elif isinstance(option, NetmaskOption):
        value = u'255.255.255.0'
    elif isinstance(option, BroadcastOption):
        value = u'192.168.{0}.255'.format(_build_network(path))
    elif isinstance(option, EmailOption):
        value = u'foo@bar.com'
    elif isinstance(option, URLOption):
        value = u'http://foo.com/bar'
    elif isinstance(option, DomainnameOption):
        o_type = option._get_extra('_dom_type')
        if option._name == 'smb_workgroup':
            value = u'othervalue'
        elif o_type in ['netbios', 'hostname']:
            value = u'value'
        else:
            value = u'value.lan'
    elif isinstance(option, FilenameOption):
        value = u'/tmp/foo'
    elif isinstance(option, ChoiceOption):
        # FIXME should it not do this by itself?
        value = option.impl_get_values(config)[0]
    elif isinstance(option, IntOption):
        value = 1
    elif isinstance(option, PortOption):
        value = 80
    elif isinstance(option, UsernameOption):
        value = 'toto'
    elif isinstance(option, PasswordOption):
        value = 'P@ssWord'
    else:
        raise Exception('the Tiramisu type {0} is not supported by CreoleLint (variable : {1})'.format(type(option), path))

    validator = option.impl_get_validator()
    if validator is not None and validator[0] == valid_regexp:
        regexp = validator[1][''][0]
        # build a value accepted by the regexp when valid_regexp is used
        # without a default value
        if regexp == u'^[A-Z][0-9]$':
            value = u'A1'
        elif option._name == 'additional_repository_source':
            # variable with a (very) specific expression #20291
            value = u"deb http://test dist"
        elif not regexp.startswith(u'^[a-z0-9]') and regexp.startswith('^'):
            value = regexp[1:]

    if option.impl_is_multi():
        if option.impl_is_master_slaves('slave'):
            # slave should have same length as master
            parts = path.split('.')
            masterpath = '.'.join(parts[:-1] + [parts[-2]])
            try:
                value = [value] * len(getattr(config, masterpath))
            except Exception:
                # master not readable: fall back to a single value
                value = [value]
        else:
            value = [value]

    try:
        config.setattr(path, value, force_permissive=True)
    except ValueError as err:
        msg = str('error for {0} type {1}: {2}'.format(path, type(option), err))
        raise Exception(msg)
    except PropertiesOptionError as err:
        # an error that mentions 'frozen' is silently ignored
        if 'frozen' not in err.proptype:
            if raise_propertyerror:
                raise
            msg = str('error for {0} type {1}: {2}'.format(path, type(option), err))
            raise Exception(msg)
class Reload(Exception):
    """Raised by Check_Template.template() when rendering hit a NotFound
    error and the missing names were looked up with CompilerGetVar; it is
    caught in Check_Template.test_all_values_for().
    """
    pass
class Check_Template:
    """Try to render one template with the values its options can take.

    Works on the module-level ``config``. ``check_template`` is the entry
    point: it compiles the template, collects the options it uses together
    with the options found in their requires, then renders the template for
    the values of those required options.
    """

    def __init__(self, template_name):
        """Prepare an empty state for ``template_name``; resets ``extra_vars``."""
        global extra_vars
        # option -> list of options found in its requires
        self.all_requires = {}
        # option -> path without its first component (None when not under
        # 'creole.' or for an OptionDescription)
        self.current_opt = {}
        # path -> OptionDescription
        self.od_list = {}
        # reinit extra_vars
        extra_vars = {}
        # dictionaries already handed to the template
        self.old_dico = []
        # variable names the template uses
        self.current_var = []
        self.ori_options = []
        # path of the '.activate' option of this template's file entry
        self.file_path = None
        self.template_name = template_name
        self.current_container = client.get_container_infos('mail')
        # compiled template class, set by open_file()
        self.tmpl = None
        # True once the template has been rendered at least once
        self.is_tmpl = False
        self.filename_ok = False
        # names still unknown after the last failed rendering
        self.last_notfound = []

    def populate_requires(self, option, path, force=False):
        """Register ``option`` and, recursively, what it depends on.

        Fills ``all_requires``, ``od_list``, ``current_var`` and
        ``current_opt``. Options outside 'creole.' are ignored unless
        ``force`` is true; options already in ``current_opt`` are skipped.
        """
        def _parse_requires(_option):
            # force-register every option found in _option's requires
            o_requires = _option.impl_getrequires()
            if o_requires is not None:
                for requires in o_requires:
                    for require in requires:
                        opt_ = require[0]
                        path_ = config.cfgimpl_get_description().impl_get_path_by_opt(opt_)
                        self.populate_requires(opt_, path_, force=True)

        if not force and not path.startswith('creole.'):
            return
        if option in self.current_opt:
            return
        o_requires = option.impl_getrequires()
        if o_requires is not None:
            for requires in o_requires:
                for require in requires:
                    if require[0].impl_is_master_slaves('slave'):
                        path_ = config.cfgimpl_get_description().impl_get_path_by_opt(require[0])
                        s_path = path_.split('.')
                        master_path = 'creole.' + s_path[1] + '.' + s_path[2] + '.' + s_path[2]
                        try:
                            opt_master = config.unwrap_from_path(master_path)
                            config.cfgimpl_get_settings().remove('everything_frozen')
                            populate_mandatory(config, opt_master, master_path)
                        except Exception:
                            # best effort: the master may not be settable
                            pass
                    self.all_requires.setdefault(option, []).append(require[0])
        if isinstance(option, OptionDescription):
            self.od_list[path] = option
        if force and option._name not in self.current_var:
            self.current_var.append(option._name)
        # NOTE(review): the nesting of the parent and callback sections below
        # was reconstructed -- confirm against the upstream file.
        if option._name in self.current_var or not path.startswith('creole.'):
            if not isinstance(option, OptionDescription):
                if path.startswith('creole.'):
                    self.current_opt[option] = '.'.join(path.split('.')[1:])
                else:
                    self.current_opt[option] = None
                _parse_requires(option)
            # requires could be in parent's too
            opath = ''
            for parent in path.split('.')[:-1]:
                opath += parent
                if opath in self.od_list:
                    desc = self.od_list[opath]
                    self.current_opt[desc] = None
                    _parse_requires(desc)
                opath += '.'
            try:
                if option._callback is not None:
                    for params in option._callback[1].values():
                        for param in params:
                            if isinstance(param, tuple):
                                opt = param[0]
                                cb_path = config.cfgimpl_get_description().impl_get_path_by_opt(opt)
                                self.populate_requires(opt, cb_path, force=True)
            except (AttributeError, KeyError):
                pass

    def read_write(self):
        """Switch config to read-write and drop disabled/hidden/frozen."""
        config.read_write()
        settings = config.cfgimpl_get_settings()
        settings.remove('disabled')
        settings.remove('hidden')
        settings.remove('frozen')

    def change_value(self, path, value, multi, parse_message, option):
        """Set ``value`` on ``path`` and leave config read-only.

        For a multi option the value is wrapped in a list; for a slave whose
        master is empty the master is populated first. ``parse_message``,
        when not empty, is printed with the value.
        """
        self.read_write()
        config.cfgimpl_get_settings()[option].remove('force_default_on_freeze')
        if multi:
            if option.impl_is_master_slaves('slave'):
                s_path = path.split('.')
                master_path = s_path[0] + '.' + s_path[1] + '.' + s_path[2] + '.' + s_path[2]
                master_option = config.cfgimpl_get_description().impl_get_opt_by_path(master_path)
                if getattr(config, master_path) == []:
                    populate_mandatory(config, master_option, master_path)
            value = [value]
        if parse_message:
            print('{0} {1}'.format(parse_message, value))
        setattr(config, path, value)
        config.read_only()

    def _get_value(self, opt_, path_):
        """Return the value at ``config.creole.<path_>``, populating it if needed.

        :raises NotFound: when the value stays unreadable
        """
        try:
            return getattr(config.creole, path_)
        except PropertiesOptionError as err:
            if err.proptype == ['mandatory']:
                self.read_write()
                config.cfgimpl_get_settings().remove('mandatory')
                s_path = path_.split('.')
                # set value to master
                if len(s_path) == 3 and s_path[1] != s_path[2]:
                    master_path = 'creole.' + s_path[0] + '.' + s_path[1] + '.' + s_path[1]
                    opt_master = config.unwrap_from_path(master_path)
                    populate_mandatory(config, opt_master, master_path)
                populate_mandatory(config, opt_, 'creole.' + path_)
                config.read_only()
                config.cfgimpl_get_settings().remove('mandatory')
                try:
                    ret = getattr(config.creole, path_)
                    config.cfgimpl_get_settings().append('mandatory')
                    return ret
                except PropertiesOptionError:
                    pass
            raise NotFound('no value')
        except ConfigError:
            self.read_write()
            populate_mandatory(config, opt_, 'creole.' + path_)
            config.read_only()
            try:
                return getattr(config.creole, path_)
            except ConfigError:
                raise
            except PropertiesOptionError:
                raise NotFound('no value')

    def template(self):
        """Render the template once with the current config values.

        Returns without rendering when the file's '.activate' option is false
        or unreadable.

        :raises Reload: rendering hit NotFound and the missing names were
            looked up with CompilerGetVar
        :raises ValueError: a ConfigError occurred while collecting values
        :raises Exception: any other rendering failure
        """
        self.last_notfound = []
        try:
            is_gen_file = getattr(config, self.file_path)
        except PropertiesOptionError:
            is_gen_file = False
        if not is_gen_file:
            return
        try:
            config.read_write()
            populate_mandatories()
            config.read_only()
            dico = {}
            for opt_, path_ in self.current_opt.items():
                # path_ is None if it's an OptionDescription
                if path_ is None:
                    continue
                try:
                    dico[path_] = self._get_value(opt_, path_)
                except NotFound:
                    pass
            # FIXME review the strip_full_path
            ndico = {}
            for path_, value in dico.items():
                sdico = path_.split('.')
                if len(sdico) == 2:
                    ndico[sdico[1]] = value
                elif len(sdico) == 3:
                    if sdico[1] == sdico[2]:
                        ndico[sdico[1]] = value
                    else:
                        ndico['.'.join(sdico[1:])] = value
                else:
                    raise Exception('chemin de longueur inconnu {}'.format(path_))
            engine = CreoleTemplateEngine(force_values=ndico)
            dico = engine.creole_variables_dict
            self.read_write()
        except ConfigError as err:
            msg = 'erreur de templating', err
            raise ValueError(msg)
        # Skip rendering when an earlier dictionary has the same keys but at
        # least one different value.
        # NOTE(review): confirm this is the intended skip condition.
        for old in self.old_dico:
            if dico.keys() == old.keys() and \
                    any(old[key] != dico[key] for key in old.keys()):
                return
        try:
            self.old_dico.append(dico)
            searchlist = [dico, eos, {'is_defined': IsDefined(dico),
                                      'creole_client': CreoleClient(),
                                      'current_container': CreoleGet(self.current_container),
                                      }]
            rtmpl = self.tmpl(searchList=searchlist)
            rtmpl.getVar = CompilerGetVar
            rtmpl.getattr = CompilerGetattr
            # converting to str is what actually renders the template
            rtmpl = str(rtmpl)
            self.is_tmpl = True
        except NotFound as err:
            lst = getVars()
            if not lst:
                raise Exception("Il manque une option", err, 'avec le dictionnaire', dico)
            for ls in lst:
                try:
                    CompilerGetVar(ls)
                except AttributeError:
                    self.last_notfound.append(ls)
            raise Reload('')
        except Exception as err:
            raise Exception("Il y a une erreur", err, 'avec le dictionnaire', dico)

    def check_reload_with_extra(self):
        """Return the names recorded in ``extra_vars`` that are not yet known.

        ``extra_vars`` is emptied. Returns None when it was empty or when
        every recorded option (and what it requires) is already in
        ``current_opt``.
        """
        # if extra_vars has value, check if not already in current_opt
        global extra_vars
        if not extra_vars:
            return None
        wanted = set(extra_vars.keys())
        with_requires = wanted & set(self.all_requires.keys())
        for opt_ in with_requires:
            wanted.update(self.all_requires[opt_])
        missing = wanted - set(self.current_opt.keys())
        names = []
        for opt_ in missing:
            try:
                names.append(extra_vars[opt_])
            except KeyError:
                names.append(opt_._name)
        extra_vars = {}
        return names or None

    def test_all_values_for(self, options, cpt):
        """Render the template for every combination of values of ``options``.

        :param options: list of ChoiceOption; the first is iterated here, the
            rest recursively
        :param cpt: recursion depth, only used in the debug message
        :returns: a list of variable names to reload with, or None
        :raises NotImplementedError: if the first option is not a ChoiceOption
        :raises Exception: if a config variable is still missing after reload
        """
        option = options[0]
        parse_message = None
        if DEBUG:
            parse_message = '*' * cpt + '>' + option._name
        if not isinstance(option, ChoiceOption):
            msg = str('pas simple la... ' + option._name)
            raise NotImplementedError(msg)
        multi = option.impl_is_multi()
        path = config.cfgimpl_get_description().impl_get_path_by_opt(option)
        remaining = options[1:]
        for value in option.impl_get_values(config):
            self.change_value(path, value, multi, parse_message, option)
            if remaining:
                # if already value to test, restart test_all_values_for
                ret = self.test_all_values_for(remaining, cpt + 1)
                if ret is not None:
                    return ret
                continue
            need_reload = False
            try:
                self.template()
            except Reload:
                need_reload = True
            ret = self.check_reload_with_extra()
            if need_reload and ret is None:
                # a missing name is only an error when it belongs to the
                # config (some path ends with it) and not to the template.
                # The loop variable must not be called ``path``: that name
                # holds the option path used by change_value above.
                known_paths = config.cfgimpl_get_description()._cache_paths[1]
                notfound = [name for name in self.last_notfound
                            if any(known.endswith('.' + name) for known in known_paths)]
                if notfound:
                    raise Exception('variable not found after reload {0}'.format(notfound))
            if ret is not None:
                return ret

    def open_file(self, force_var):
        """Compile the template and collect the variable names it uses.

        ``force_var``, when not empty, is appended to the collected names.
        """
        with open(self.template_name) as template_file:
            filecontent = template_file.read()
        self.tmpl = Template.compile(filecontent, compilerSettings=compilerSettings)
        self.current_var = getVars()
        if force_var:
            self.current_var.extend(force_var)

    def populate_file(self, path, option):
        """Spot the file entry whose '.source' default matches this template.

        Once the matching '.source' has been seen, the next '.activate' path
        becomes ``file_path`` and its option is registered.
        """
        if path.startswith('containers.files.file'):
            wanted_suffix = '/{0}'.format(self.template_name.split('/')[-1])
            if path.endswith('.source') and option.impl_getdefault().endswith(wanted_suffix):
                self.filename_ok = True
            if self.filename_ok and path.endswith('.activate'):
                self.file_path = path
                self.filename_ok = False
                # NOTE(review): nesting reconstructed -- confirm that only
                # the matching '.activate' option is registered here.
                self.populate_requires(option, path, force=True)

    def test_all_values(self):
        """Render the template for every value of the required options.

        When there is nothing to iterate over, the template is rendered once
        and the whole procedure is retried if that fails.

        :raises Exception: any failure, wrapped with the template name
        """
        try:
            options = list(set(self.all_requires.keys()) & set(self.current_opt.keys()))
            need_tmpl = False
            if options:
                requires_options = set()
                for opt in options:
                    for op in self.all_requires[opt]:
                        if 'frozen' not in config.cfgimpl_get_settings()[op]:
                            requires_options.add(op)
                if not requires_options:
                    need_tmpl = True
                else:
                    self.ori_options = requires_options
                    ret = self.test_all_values_for(list(requires_options), 0)
                    if ret is not None:
                        if DEBUG:
                            print("reload with {0}".format(ret))
                        self.check_template(ret, already_load=True)
            else:
                need_tmpl = True
            if need_tmpl is True:
                try:
                    self.template()
                except Exception:
                    self.test_all_values()
        except Exception as err:
            if DEBUG:
                import traceback
                traceback.print_exc()
            msg = self.template_name, ':', err
            raise Exception(msg)

    def check_template(self, force_var=None, already_load=False):
        """Entry point: compile the template, register its options, test it.

        :param force_var: extra variable names to treat as used by the template
        :param already_load: when true, the template name is not printed again
        :raises Exception: the template is declared in a dictionary but could
            not be opened, or testing failed
        """
        open_error = None
        try:
            self.open_file(force_var)
        except Exception:
            open_error = "problème à l'ouverture du fichier {}".format(self.template_name)
        config.read_only()
        for index, option in enumerate(config.cfgimpl_get_description()._cache_paths[0]):
            path = config.cfgimpl_get_description()._cache_paths[1][index]
            self.populate_file(path, option)
            self.populate_requires(option, path)
        if self.file_path is None:
            if open_error is not None:
                print("le fichier {0} non présent dans un dictionnaire a un problème : {1}".format(basename(self.template_name),
                                                                                                   open_error))
            else:
                print(" \\-- fichier non présent dans un dictionnaire {0}".format(self.template_name))
            return
        if open_error is not None:
            raise Exception(open_error)
        if not already_load:
            print(" \\-- {0}".format(self.template_name))
        self.test_all_values()
        if not self.is_tmpl:
            print("pas de templating !")
def populate_mandatories():
    """Give a placeholder value to every 'creole.' path reported by
    ``mandatory_warnings`` on the module-level ``config``.

    A PropertiesOptionError raised while filling an option is ignored.
    """
    description = None
    for missing_path in config.cfgimpl_get_values().mandatory_warnings(config):
        if not missing_path.startswith('creole.'):
            continue
        description = config.cfgimpl_get_description()
        missing_option = description.impl_get_opt_by_path(missing_path)
        try:
            populate_mandatory(config, missing_option, missing_path)
        except PropertiesOptionError:
            pass
def parse_templates(templates_name):
    """Check every template named in ``templates_name``.

    The configuration is loaded once and its mandatory options populated;
    each template is then checked against its own duplicate of that
    configuration. On the first failure the error is printed in red and the
    process exits with status 1.
    """
    global config, cl_chunks, cl_vars, extra_vars
    config = creole_loader(load_values=False, load_extra=True)
    config.read_write()
    populate_mandatories()
    reference = config
    for current in templates_name:
        # fresh parser state and a fresh copy of the configuration
        cl_chunks, cl_vars, extra_vars = set(), set(), {}
        config = reference.duplicate()
        config.read_write()
        populate_mandatories()
        checker = Check_Template(current)
        try:
            checker.check_template()
        except Exception as err:
            if DEBUG:
                import traceback
                traceback.print_exc()
            print_red(str(err))
            sys.exit(1)