414 lines
15 KiB
Python
414 lines
15 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
Gestion du mini-langage de template
|
|
On travaille sur les fichiers cibles
|
|
"""
|
|
|
|
import imp
|
|
from shutil import copy
|
|
import logging
|
|
from typing import Dict, Any
|
|
from subprocess import call
|
|
from os import listdir, makedirs
|
|
from os.path import dirname, join, isfile
|
|
|
|
from Cheetah.Template import Template as ChtTemplate
|
|
from Cheetah.NameMapper import NotFound as CheetahNotFound
|
|
|
|
from tiramisu import Config
|
|
from tiramisu.error import PropertiesOptionError
|
|
|
|
from .config import patch_dir, variable_namespace
|
|
from .error import FileNotFound, TemplateError
|
|
from .i18n import _
|
|
from .utils import normalize_family
|
|
|
|
|
|
# Module-level logger named after this module.  A NullHandler (a handler
# that emits nothing) is attached to it.
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())
|
|
|
|
|
|
class IsDefined:
    """Template filter telling whether a Creole variable is defined.

    It lets a template test for a variable without raising an exception
    when that variable is not defined in the context.
    """

    def __init__(self, context):
        # Container of variables, queried with ``in`` and ``[]`` on each call.
        self.context = context

    def __call__(self, varname):
        """Return whether ``varname`` is present in the context.

        A name without a dot is looked up directly in the context.  A dotted
        name must have exactly two components, ``leader.follower``: the
        result is True when the leader is in the context and the follower
        name is one of the keys of the leader's ``follower`` mapping.

        Raises:
            KeyError: if a dotted name does not have exactly two components.
        """
        if '.' not in varname:
            return varname in self.context

        parts = varname.split('.')
        if len(parts) != 2:
            raise KeyError(_("Group variables must be of type leader.follower"))

        leader_name, follower_name = parts
        if leader_name not in self.context:
            return False
        return follower_name in self.context[leader_name].follower.keys()
|
|
|
|
|
|
@classmethod
def cl_compile(kls, *args, **kwargs):
    """Compile through ``kls.old_compile`` with the Creole compiler settings.

    Any ``compilerSettings`` keyword argument supplied by the caller is
    replaced by the settings built here; all other positional and keyword
    arguments are forwarded unchanged.
    """
    # Ten 'µ' characters, used as the token for every PSP/comment setting
    # (presumably so that those syntaxes never match in a template).
    placeholder = 'µ' * 10
    settings = {'directiveStartToken': '%',
                'cheetahVarStartToken': '%%',
                'EOLSlurpToken': '%'}
    for token_name in ('PSPStartToken',
                       'PSPEndToken',
                       'commentStartToken',
                       'commentEndToken',
                       'multiLineCommentStartToken',
                       'multiLineCommentEndToken'):
        settings[token_name] = placeholder
    kwargs['compilerSettings'] = settings
    return kls.old_compile(*args, **kwargs)
|
|
# Monkey-patch Cheetah's Template class: keep the original ``compile``
# reachable as ``old_compile``, then install ``cl_compile`` in its place.
# The order matters because ``cl_compile`` delegates to ``old_compile``.
# NOTE(review): executing these two lines a second time (e.g. a module
# reload) would make ``old_compile`` point at ``cl_compile`` itself.
ChtTemplate.old_compile = ChtTemplate.compile
ChtTemplate.compile = cl_compile
|
|
|
|
|
|
class CheetahTemplate(ChtTemplate):
    """Cheetah template with a ready-made Creole search list.

    Customises the base template class so that callers only provide the
    template file, the variables and the helper functions.
    """

    def __init__(self,
                 filename: str,
                 context,
                 eosfunc: Dict,
                 destfilename,
                 variable):
        """Initialize Creole CheetahTemplate.

        The search list handed to the base class is, in order: ``context``,
        ``eosfunc`` and a dictionary of extra names (``is_defined``,
        ``normalize_family``, ``rougail_filename`` and, only when
        ``variable`` is truthy, ``rougail_variable``).
        """
        helpers = dict(is_defined=IsDefined(context),
                       normalize_family=normalize_family,
                       rougail_filename=destfilename)
        if variable:
            helpers['rougail_variable'] = variable
        search_list = [context, eosfunc, helpers]
        ChtTemplate.__init__(self, file=filename, searchList=search_list)
|
|
|
|
|
|
class CreoleLeader:
    """Leader value bundled with the values of its follower variables.

    Most operations (comparison, ``str``, ``len``, ``in``, ``+``) are
    delegated to the leader value.  Iterating or indexing yields a new
    ``CreoleLeader`` for one position, carrying the follower values found
    at that same position.
    """

    def __init__(self, value, follower=None, index=None):
        """Store the leader value and its followers.

        The variable is made iterable so that a template can write:
            for ip in iplist:
                print(ip.network)
                print(ip.netmask)
                print(ip)
        index is used for CreoleLint
        """
        self._value = value
        self.follower = follower if follower is not None else {}
        self._index = index

    def __getattr__(self, name):
        """Get follower variable or attribute of leader value.

        If the attribute is a name of a follower variable, return its value.
        Otherwise, returns the requested attribute of leader value.

        Raises:
            AttributeError: if the follower value is a PropertiesOptionError,
                or if the name is unknown to both followers and leader value.
        """
        # __getattr__ only runs when the normal lookup failed.  If one of our
        # own attributes is missing (instance created without __init__, as
        # copy and pickle do), fail immediately: reading self.follower or
        # self._value below would re-enter this method and recurse forever.
        if name in ('_value', 'follower', '_index'):
            raise AttributeError(name)
        if name in self.follower:
            value = self.follower[name]
            if isinstance(value, PropertiesOptionError):
                raise AttributeError(name) from value
            return value
        return getattr(self._value, name)

    def __getitem__(self, index):
        """Get a leader.follower at requested index.
        """
        followers = {key: values[index] for key, values in self.follower.items()}
        return CreoleLeader(self._value[index], followers, index)

    def __iter__(self):
        """Iterate over leader.follower.

        Return synchronised value of leader.follower.
        """
        for idx in range(len(self._value)):
            yield self[idx]

    def __len__(self):
        """Delegate to leader value
        """
        return len(self._value)

    def __repr__(self):
        """Show CreoleLeader as dictionary.

        The leader value is stored under 'value' key.
        The followers are stored under 'follower' key.
        """
        return repr({'value': self._value, 'follower': self.follower})

    # Comparisons are made against the leader value only.
    def __eq__(self, value):
        return value == self._value

    def __ne__(self, value):
        return value != self._value

    def __lt__(self, value):
        return self._value < value

    def __le__(self, value):
        return self._value <= value

    def __gt__(self, value):
        return self._value > value

    def __ge__(self, value):
        return self._value >= value

    def __str__(self):
        """Delegate to leader value
        """
        return str(self._value)

    def __add__(self, val):
        # Use the operator rather than calling __add__ directly so that
        # Python's reflected-operand fallback applies (e.g. int + float).
        return self._value + val

    def __radd__(self, val):
        return val + self._value

    def __contains__(self, item):
        return item in self._value

    async def add_follower(self, config, name, path):
        """Load the values of one follower and store them under ``name``.

        For each index of the leader value, ``config.option(path, idx)`` is
        read.  When reading raises PropertiesOptionError, the exception
        instance itself is stored at that index (``__getattr__`` later turns
        it into an AttributeError).

        Raises:
            TypeError: if the leader value is not a list.
        """
        if not isinstance(self._value, list):
            raise TypeError('leader value must be a list to add a follower, '
                            f'got {type(self._value).__name__}')
        values = []
        for idx in range(len(self._value)):
            try:
                values.append(await config.option(path, idx).value.get())
            except PropertiesOptionError as err:
                values.append(err)
        self.follower[name] = values
|
|
|
|
|
|
class CreoleExtra:
    """Expose the entries of a dictionary as attributes.

    Iterating over the object yields the dictionary's values.
    """

    def __init__(self,
                 suboption: Dict) -> None:
        self.suboption = suboption

    def __getattr__(self,
                    key: str) -> Any:
        """Return ``self.suboption[key]``.

        Raises:
            AttributeError: if ``key`` is not in the dictionary, so that
                ``hasattr`` and ``getattr(obj, key, default)`` behave as
                expected.
        """
        # Instance created without __init__ (copy/pickle): reading
        # self.suboption below would re-enter this method forever.
        if key == 'suboption':
            raise AttributeError(key)
        try:
            return self.suboption[key]
        except KeyError:
            # The attribute protocol requires AttributeError, not KeyError.
            raise AttributeError(key) from None

    def __repr__(self):
        return str(self.suboption)

    def __iter__(self):
        return iter(self.suboption.values())
|
|
|
|
|
|
class CreoleTemplateEngine:
    """Engine to process Creole cheetah template
    """

    def __init__(self,
                 config: Config,
                 eosfunc_file: str,
                 distrib_dir: str,
                 tmp_dir: str,
                 dest_dir: str,
                 ) -> None:
        """Store the directories and load the template helper functions.

        When ``eosfunc_file`` is not None, it is loaded as a module named
        'eosfunc' and every attribute whose name does not start with an
        underscore is made available to templates.
        """
        self.config = config
        self.dest_dir = dest_dir
        self.tmp_dir = tmp_dir
        self.distrib_dir = distrib_dir
        eos = {}
        if eosfunc_file is not None:
            # NOTE(review): the `imp` module is deprecated; this call needs a
            # replacement together with the file-level import.
            eosfunc = imp.load_source('eosfunc', eosfunc_file)
            for func in dir(eosfunc):
                if not func.startswith('_'):
                    eos[func] = getattr(eosfunc, func)
        self.eosfunc = eos
        self.rougail_variables_dict = {}

    async def load_eole_variables_rougail(self,
                                          optiondescription):
        """Fill ``rougail_variables_dict`` from an option description.

        Plain options are stored under their name.  For a leadership, the
        first sub-option becomes a CreoleLeader stored under its name and the
        following ones are attached to it as followers.  Any other option
        description is walked recursively, so names end up in one flat
        dictionary.
        """
        for option in await optiondescription.list('all'):
            if await option.option.isoptiondescription():
                if await option.option.isleadership():
                    for idx, suboption in enumerate(await option.list('all')):
                        if idx == 0:
                            leader = CreoleLeader(await suboption.value.get())
                            self.rougail_variables_dict[await suboption.option.name()] = leader
                        else:
                            await leader.add_follower(self.config,
                                                      await suboption.option.name(),
                                                      await suboption.option.path())
                else:
                    await self.load_eole_variables_rougail(option)
            else:
                self.rougail_variables_dict[await option.option.name()] = await option.value.get()

    async def load_eole_variables(self,
                                  namespace,
                                  optiondescription):
        """Build a CreoleExtra tree for an option description.

        Each family of ``optiondescription`` becomes a CreoleExtra of its
        variables; leaderships become CreoleLeader objects and other nested
        option descriptions are loaded recursively.  ``namespace`` is not
        used by this method.

        Returns:
            A CreoleExtra mapping family names to their CreoleExtra.
        """
        families = {}
        for family in await optiondescription.list('all'):
            variables = {}
            for variable in await family.list('all'):
                if await variable.option.isoptiondescription():
                    if await variable.option.isleadership():
                        for idx, suboption in enumerate(await variable.list('all')):
                            if idx == 0:
                                leader = CreoleLeader(await suboption.value.get())
                                leader_name = await suboption.option.name()
                            else:
                                await leader.add_follower(self.config,
                                                          await suboption.option.name(),
                                                          await suboption.option.path())
                        variables[leader_name] = leader
                    else:
                        subfamilies = await self.load_eole_variables(await variable.option.name(),
                                                                     variable,
                                                                     )
                        variables[await variable.option.name()] = subfamilies
                else:
                    variables[await variable.option.name()] = await variable.value.get()
            families[await family.option.name()] = CreoleExtra(variables)
        return CreoleExtra(families)

    def patch_template(self,
                       filename: str):
        """Apply patch to a template

        Looks for ``<filename>.patch`` in the 'variante' sub-directory of
        ``patch_dir``, then in ``patch_dir`` itself, and runs the ``patch``
        command in ``tmp_dir`` for each file found.  When ``patch`` returns a
        non-zero status, the error is logged and the template is copied to
        ``tmp_dir`` again.
        """
        patch_cmd = ['patch', '-d', self.tmp_dir, '-N', '-p1']
        patch_no_debug = ['-s', '-r', '-', '--backup-if-mismatch']

        # "variante" patches first, then local ones
        for directory in [join(patch_dir, 'variante'), patch_dir]:
            # NOTE(review): prepare_template receives a path joined with
            # distrib_dir; os.path.join discards `directory` when `filename`
            # is absolute -- confirm patches are looked up where intended.
            patch_file = join(directory, f'{filename}.patch')
            if isfile(patch_file):
                log.info(_(f"Patching template '{filename}' with '{patch_file}'"))
                ret = call(patch_cmd + patch_no_debug + ['-i', patch_file])
                if ret:
                    patch_cmd_err = ' '.join(patch_cmd + ['-i', patch_file])
                    log.error(_(f"Error applying patch: '{patch_file}'\nTo reproduce and fix this error {patch_cmd_err}"))
                    copy(filename, self.tmp_dir)

    def prepare_template(self,
                         filename: str):
        """Prepare template source file

        Copies the template to ``tmp_dir`` and applies its patches there.
        """
        log.info(_(f"Copy template: '{filename}' -> '{self.tmp_dir}'"))
        copy(filename, self.tmp_dir)
        self.patch_template(filename)

    def process(self,
                source: str,
                true_destfilename: str,
                destfilename: str,
                filevar: Dict,
                variable: Any):
        """Process a cheetah template

        Renders ``source`` and writes the result to ``destfilename``.
        ``true_destfilename`` is exposed to the template as
        ``rougail_filename``.  ``filevar`` is not used by this method.

        Raises:
            TemplateError: if the template uses an unknown variable or if
                rendering fails for any other reason.
        """
        log.info(_(f"Cheetah processing: '{destfilename}'"))
        try:
            cheetah_template = CheetahTemplate(source,
                                               self.rougail_variables_dict,
                                               self.eosfunc,
                                               true_destfilename,
                                               variable,
                                               )
            data = str(cheetah_template)
        except CheetahNotFound as err:
            # Presumably strips a fixed-length prefix and the closing quote
            # from Cheetah's message to keep only the variable name --
            # TODO confirm against the Cheetah version in use.
            varname = err.args[0][13:-1]
            raise TemplateError(_(f"Error: unknown variable used in template {destfilename} : {varname}")) from err
        except Exception as err:
            raise TemplateError(_(f"Error while instantiating template {destfilename}: {err}")) from err

        with open(destfilename, 'w') as file_h:
            file_h.write(data)

    def instance_file(self,
                      filevar: Dict,
                      service_name: str) -> None:
        """Run templatisation on one file

        ``filevar['name']`` is a destination name or a list of them; each is
        created under ``dest_dir`` (first character of the name removed),
        either by rendering the source as a template when
        ``filevar['templating']`` is true, or by a plain copy.
        ``service_name`` is not used by this method.
        """
        log.info(_(f"Instantiating file '{filevar['name']}'"))
        variable = filevar.get('variable')
        filenames = filevar['name']
        if not isinstance(filenames, list):
            # Single file: normalise to lists so the loop below is uniform.
            filenames = [filenames]
            if variable:
                variable = [variable]
        for idx, filename in enumerate(filenames):
            destfilename = join(self.dest_dir, filename[1:])
            makedirs(dirname(destfilename), exist_ok=True)
            var = variable[idx] if variable else None
            source = join(self.tmp_dir, filevar['source'])
            if filevar['templating']:
                self.process(source,
                             filename,
                             destfilename,
                             filevar,
                             var)
            else:
                copy(source, destfilename)

    async def instance_files(self) -> None:
        """Run templatisation on all files

        Loads the variables of every namespace, prepares every template
        found in ``distrib_dir``, then instantiates the activated entries of
        the 'files' and 'overrides' sections of each service.

        Raises:
            FileNotFound: if the source of an entry is not in ``distrib_dir``.
        """
        for option in await self.config.option.list(type='all'):
            namespace = await option.option.name()
            if namespace == variable_namespace:
                await self.load_eole_variables_rougail(option)
            else:
                families = await self.load_eole_variables(namespace,
                                                          option)
                self.rougail_variables_dict[namespace] = families
        for template in listdir(self.distrib_dir):
            self.prepare_template(join(self.distrib_dir, template))
        for service_obj in await self.config.option('services').list('all'):
            service_name = await service_obj.option.doc()
            for fills in await service_obj.list('all'):
                if await fills.option.name() in ['files', 'overrides']:
                    for fill_obj in await fills.list('all'):
                        fill = await fill_obj.value.dict()
                        filename = fill['source']
                        distrib_file = join(self.distrib_dir, filename)
                        if not isfile(distrib_file):
                            raise FileNotFound(_(f"File {distrib_file} does not exist."))
                        if fill.get('activate', False):
                            self.instance_file(fill,
                                               service_name,
                                               )
                        else:
                            log.debug(_(f"Instantiation of file '{filename}' disabled"))
|
|
|
|
|
|
async def generate(config: Config,
                   eosfunc_file: str,
                   distrib_dir: str,
                   tmp_dir: str,
                   dest_dir: str,
                   ) -> None:
    """Create a CreoleTemplateEngine from the arguments and instantiate all files."""
    await CreoleTemplateEngine(
        config,
        eosfunc_file,
        distrib_dir,
        tmp_dir,
        dest_dir,
    ).instance_files()
|