# -*- coding: utf-8 -*- """ Gestion du mini-langage de template On travaille sur les fichiers cibles """ import imp import sys from shutil import copy import logging from typing import Dict, Any from subprocess import call from os import listdir, unlink, makedirs from os.path import dirname, basename, join, split, isfile, isdir from tempfile import mktemp from Cheetah import Parser from .annotator import VARIABLE_NAMESPACE # l'encoding du template est déterminé par une regexp (encodingDirectiveRE dans Parser.py) # il cherche un ligne qui ressemble à '#encoding: utf-8 # cette classe simule le module 're' et retourne toujours l'encoding utf-8 # 6224 class FakeEncoding: def groups(self): return ('utf-8',) def search(self, *args): return self Parser.encodingDirectiveRE = FakeEncoding() from Cheetah.Template import Template as ChtTemplate from Cheetah.NameMapper import NotFound as CheetahNotFound from tiramisu import Config from tiramisu.error import PropertiesOptionError from .config import patch_dir from .error import FileNotFound, TemplateError, TemplateDisabled from .i18n import _ from .utils import normalize_family log = logging.getLogger(__name__) log.addHandler(logging.NullHandler()) class IsDefined: """ filtre permettant de ne pas lever d'exception au cas où la variable Creole n'est pas définie """ def __init__(self, context): self.context = context def __call__(self, varname): if '.' in varname: splitted_var = varname.split('.') if len(splitted_var) != 2: msg = _("Group variables must be of type leader.follower") raise KeyError(msg) leader, follower = splitted_var if leader in self.context: return follower in self.context[leader].follower.keys() return False else: return varname in self.context class CreoleGet: def __init__(self, context): self.context = context def __call__(self, varname): return self.context[varname] def __getitem__(self, varname): """For bracket and dotted notation """ return self.context[varname] def __contains__(self, varname): """Check variable existence in context """ return varname in self.context @classmethod def cl_compile(kls, *args, **kwargs): kwargs['compilerSettings'] = {'directiveStartToken' : '%', 'cheetahVarStartToken' : '%%', 'EOLSlurpToken' : '%', 'PSPStartToken' : 'µ' * 10, 'PSPEndToken' : 'µ' * 10, 'commentStartToken' : 'µ' * 10, 'commentEndToken' : 'µ' * 10, 'multiLineCommentStartToken' : 'µ' * 10, 'multiLineCommentEndToken' : 'µ' * 10} return kls.old_compile(*args, **kwargs) ChtTemplate.old_compile = ChtTemplate.compile ChtTemplate.compile = cl_compile class CreoleClient: def __init__(self, config: Config): self.config = config class CheetahTemplate(ChtTemplate): """classe pour personnaliser et faciliter la construction du template Cheetah """ def __init__(self, filename: str, context, eosfunc: Dict, destfilename, variable): """Initialize Creole CheetahTemplate """ extra_context = {'is_defined' : IsDefined(context), 'normalize_family': normalize_family, 'rougail_filename': destfilename } if variable: extra_context['rougail_variable'] = variable ChtTemplate.__init__(self, file=filename, searchList=[context, eosfunc, extra_context]) class CreoleLeader: def __init__(self, value, follower=None, index=None): """ On rend la variable itérable pour pouvoir faire: for ip in iplist: print(ip.network) print(ip.netmask) print(ip) index is used for CreoleLint """ self._value = value if follower is not None: self.follower = follower else: self.follower = {} self._index = index def __getattr__(self, name): """Get follower variable or attribute of leader value. If the attribute is a name of a follower variable, return its value. Otherwise, returns the requested attribute of leader value. """ if name in self.follower: value = self.follower[name] if isinstance(value, PropertiesOptionError): raise AttributeError() return value else: return getattr(self._value, name) def __getitem__(self, index): """Get a leader.follower at requested index. """ ret = {} for key, values in self.follower.items(): ret[key] = values[index] return CreoleLeader(self._value[index], ret, index) def __iter__(self): """Iterate over leader.follower. Return synchronised value of leader.follower. """ for i in range(len(self._value)): ret = {} for key, values in self.follower.items(): ret[key] = values[i] yield CreoleLeader(self._value[i], ret, i) def __len__(self): """Delegate to leader value """ return len(self._value) def __repr__(self): """Show CreoleLeader as dictionary. The leader value is stored under 'value' key. The followers are stored under 'follower' key. """ return repr({'value': self._value, 'follower': self.follower}) def __eq__(self, value): return value == self._value def __ne__(self, value): return value != self._value def __lt__(self, value): return self._value < value def __le__(self, value): return self._value <= value def __gt__(self, value): return self._value > value def __ge__(self, value): return self._value >= value def __str__(self): """Delegate to leader value """ return str(self._value) def __add__(self, val): return self._value.__add__(val) def __radd__(self, val): return val + self._value def __contains__(self, item): return item in self._value async def add_follower(self, config, name, path): if isinstance(self._value, list): values = [] for idx in range(len(self._value)): try: values.append(await config.option(path, idx).value.get()) except PropertiesOptionError as err: values.append(err) else: raise Exception('hu?') self.follower[name] = values class CreoleExtra: def __init__(self, suboption: Dict) -> None: self.suboption = suboption def __getattr__(self, key: str) -> Any: return self.suboption[key] def __repr__(self): return self.suboption.__str__() class CreoleTemplateEngine: """Engine to process Creole cheetah template """ def __init__(self, config: Config, eosfunc_file: str, distrib_dir: str, tmp_dir: str, dest_dir: str, override_dest_dir: str, tmpfile_name: str, factory_prefix: str, ) -> None: self.config = config self.dest_dir = dest_dir self.override_dest_dir = override_dest_dir self.tmp_dir = tmp_dir self.distrib_dir = distrib_dir self.tmpfile_name = tmpfile_name self.factory_prefix = factory_prefix eos = {} if eosfunc_file is not None: eosfunc = imp.load_source('eosfunc', eosfunc_file) for func in dir(eosfunc): if not func.startswith('_'): eos[func] = getattr(eosfunc, func) self.eosfunc = eos self.rougail_variables_dict = {} async def load_eole_variables_rougail(self, optiondescription): for option in await optiondescription.list('all'): if await option.option.isoptiondescription(): if await option.option.isleadership(): for idx, suboption in enumerate(await option.list('all')): if idx == 0: leader = CreoleLeader(await suboption.value.get()) self.rougail_variables_dict[await suboption.option.name()] = leader else: await leader.add_follower(self.config, await suboption.option.name(), await suboption.option.path()) else: await self.load_eole_variables_rougail(option) else: self.rougail_variables_dict[await option.option.name()] = await option.value.get() async def load_eole_variables(self, namespace, optiondescription): families = {} for family in await optiondescription.list('all'): variables = {} for variable in await family.list('all'): if await variable.option.isoptiondescription() and await variable.option.isleadership(): for idx, suboption in enumerate(await variable.list('all')): if idx == 0: leader = CreoleLeader(await suboption.value.get()) leader_name = await suboption.option.name() else: await leader.add_follower(self.config, await suboption.option.name(), await suboption.option.path()) variables[leader_name] = leader else: variables[await variable.option.name()] = await variable.value.get() families[await family.option.name()] = CreoleExtra(variables) self.rougail_variables_dict[namespace] = CreoleExtra(families) def patch_template(self, filename: str): """Apply patch to a template """ patch_cmd = ['patch', '-d', self.tmp_dir, '-N', '-p1'] patch_no_debug = ['-s', '-r', '-', '--backup-if-mismatch'] # patches variante + locaux for directory in [join(patch_dir, 'variante'), patch_dir]: patch_file = join(directory, f'{filename}.patch') if isfile(patch_file): log.info(_("Patching template '{filename}' with '{patch_file}'")) ret = call(patch_cmd + patch_no_debug + ['-i', patch_file]) if ret: patch_cmd_err = ' '.join(patch_cmd + ['-i', patch_file]) log.error(_(f"Error applying patch: '{patch_file}'\nTo reproduce and fix this error {patch_cmd_err}")) copy(filename, self.tmp_dir) def prepare_template(self, filename: str): """Prepare template source file """ log.info(_("Copy template: '{filename}' -> '{self.tmp_dir}'")) copy(filename, self.tmp_dir) self.patch_template(filename) def process(self, source: str, destfilename: str, filevar: Dict, variable: Any): """Process a cheetah template """ # full path of the destination file log.info(_(f"Cheetah processing: '{destfilename}'")) try: cheetah_template = CheetahTemplate(source, self.rougail_variables_dict, self.eosfunc, destfilename, variable, ) data = str(cheetah_template) except CheetahNotFound as err: varname = err.args[0][13:-1] raise TemplateError(_(f"Error: unknown variable used in template {destfilename} : {varname}")) except Exception as err: raise TemplateError(_(f"Error while instantiating template {destfilename}: {err}")) with open(destfilename, 'w') as file_h: file_h.write(data) def instance_file(self, filevar: Dict, systemd_rights: list, override: bool, service_name: str) -> None: """Run templatisation on one file """ log.info(_("Instantiating file '{filename}'")) if 'variable' in filevar: variable = filevar['variable'] else: variable = None if override: filenames = [f'/systemd/system/{service_name}.service.d/rougail.conf'] else: filenames = filevar['name'] if not isinstance(filenames, list): filenames = [filenames] if variable: variable = [variable] for idx, filename in enumerate(filenames): if override: destfilename = join(self.override_dest_dir, filename[1:]) else: destfilename = join(self.dest_dir, filename[1:]) makedirs(dirname(destfilename), exist_ok=True) if variable: var = variable[idx] else: var = None source = join(self.tmp_dir, filevar['source']) if filevar['templating']: self.process(source, destfilename, filevar, var) else: copy(source, destfilename) if not override and self.tmpfile_name: systemd_rights.append(f'C {filename} {filevar["mode"]} {filevar["owner"]} {filevar["group"]} - {self.factory_prefix}{filename}') systemd_rights.append(f'z {filename} - - - - -') async def instance_files(self) -> None: """Run templatisation on all files """ for option in await self.config.option.list(type='all'): namespace = await option.option.name() if namespace in ['services', 'actions']: continue elif namespace == VARIABLE_NAMESPACE: await self.load_eole_variables_rougail(option) else: await self.load_eole_variables(namespace, option) for template in listdir(self.distrib_dir): self.prepare_template(join(self.distrib_dir, template)) systemd_rights = [] for service_obj in await self.config.option('services').list('all'): service_name = await service_obj.option.doc() for fills in await service_obj.list('all'): if await fills.option.name() in ['files', 'overrides']: for fill_obj in await fills.list('all'): fill = await fill_obj.value.dict() filename = fill['source'] distib_file = join(self.distrib_dir, filename) if not isfile(distib_file): raise FileNotFound(_(f"File {distib_file} does not exist.")) override = await fills.option.name() == 'overrides' if override or fill.get('activate', False): self.instance_file(fill, systemd_rights, override, service_name, ) else: log.debug(_("Instantiation of file '{filename}' disabled")) if self.tmpfile_name: with open(self.tmpfile_name, 'w') as fh: fh.write('\n'.join(systemd_rights)) fh.write('\n') async def generate(config: Config, eosfunc_file: str, distrib_dir: str, tmp_dir: str, dest_dir: str, override_dest_dir: str, tmpfile_name: str=None, factory_prefix: str=None, ) -> None: if not tmpfile_name and factory_prefix: raise Exception(_(f'only specify factory_prefix if tmpfile_name is set')) if tmpfile_name and not factory_prefix: raise Exception(_(f'if tmpfile_name is specify, set factory_prefix too')) engine = CreoleTemplateEngine(config, eosfunc_file, distrib_dir, tmp_dir, dest_dir, override_dest_dir, tmpfile_name, factory_prefix, ) await engine.instance_files()