"""Template language for Rougail

Created by:
EOLE (http://eole.orion.education.fr)
Copyright (C) 2005-2018

Forked by:
Cadoles (http://www.cadoles.com)
Copyright (C) 2019-2021

distributed with GPL-2 or later license

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
"""

from shutil import copy
import logging
from typing import Dict, Any
from subprocess import call
from os import listdir, makedirs, getcwd, chdir, unlink, rmdir
from os.path import dirname, join, isfile, isdir, abspath


try:
    from tiramisu3 import Config, undefined
    from tiramisu3.error import PropertiesOptionError  # pragma: no cover
except ModuleNotFoundError:  # pragma: no cover
    from tiramisu import Config, undefined
    from tiramisu.error import PropertiesOptionError

from ..config import RougailConfig
from ..error import FileNotFound, TemplateError
from ..i18n import _
from ..utils import load_modules

from . import engine as engines


# Map every engine name exported by the ``engine`` sub-package to its object.
ENGINES = {name: getattr(engines, name) for name in engines.__all__}


log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())


# Keys read from the option "information" store, per service sub-type.
INFORMATIONS = {'files': ['source', 'mode', 'engine', 'included'],
                'overrides': ['name', 'source', 'engine'],
                }
# Keys that fall back to a ``default_<type>_<key>`` configuration entry.
DEFAULT = {'files': ['owner', 'group'],
           'overrides': [],
           }


class RougailLeaderIndex:
    """Value of a leader at one index, giving access to its followers.

    The object behaves like the leader value for ``str()``, comparisons and
    addition, and exposes the followers at the same index as attributes or
    items.
    """
    def __init__(self,
                 value,
                 follower,
                 index,
                 ) -> None:
        self._value = value
        self._follower = follower
        self._index = index

    def __getattr__(self, name):
        """Return the follower *name*.

        Raises AttributeError if the follower is unknown or if its stored
        value is a PropertiesOptionError.
        """
        if name not in self._follower:
            raise AttributeError(f'unable to find follower "{name}"')
        value = self._follower[name]
        if isinstance(value, PropertiesOptionError):
            raise AttributeError(f'unable to access to follower "{name}": {value}')
        return value

    def __getitem__(self, name):
        return self.__getattr__(name)

    def __contains__(self, name):
        """True when follower *name* exists and its value is accessible."""
        return (name in self._follower
                and not isinstance(self._follower[name], PropertiesOptionError))

    def __str__(self):
        return str(self._value)

    # The operators below delegate to the wrapped value through the regular
    # operators (not the dunder methods) so that Python's reflected-operand
    # fallback applies: e.g. an int leader compared with, or added to, a float.
    def __lt__(self, value):
        return self._value < value

    def __le__(self, value):
        return self._value <= value

    def __eq__(self, value):
        return self._value == value

    def __ne__(self, value):
        return self._value != value

    def __gt__(self, value):
        return self._value > value

    def __ge__(self, value):
        return self._value >= value

    def __add__(self, value):
        return self._value + value

    def __radd__(self, value):
        return value + self._value


class RougailLeader:
    """Implement access to leader and follower variable
    For examples: %%leader, %%leader[0].follower1
    """
    def __init__(self,
                 leader_name,
                 value,
                 ) -> None:
        self._value = value
        # The leader is registered under its own name beside the followers.
        self._follower = {leader_name: value}

    def __getitem__(self, index):
        """Get a leader.follower at requested index.
        """
        followers = {key: values[index] for key, values in self._follower.items()}
        return RougailLeaderIndex(self._value[index],
                                  followers,
                                  index,
                                  )

    def __iter__(self):
        """Iterate over leader.follower.

        Return synchronised value of leader.follower.
        """
        for index in range(len(self._value)):
            yield self[index]

    def __len__(self):
        return len(self._value)

    def __contains__(self, value):
        return value in self._value

    async def _add_follower(self,
                            config,
                            name: str,
                            path: str,
                            ):
        """Add a new follower

        The follower value is read from *config* at *path* for every leader
        index; a PropertiesOptionError is stored in place of the value when
        reading it raises one.
        """
        self._follower[name] = []
        for index in range(len(self._value)):
            try:
                value = await config.option(path, index).value.get()
            except PropertiesOptionError as err:
                value = err
            self._follower[name].append(value)


class RougailExtra:
    """Object that implement access to extra variable
    For example %%extra1.family.variable
    """
    def __init__(self,
                 suboption: Dict,
                 ) -> None:
        self._suboption = suboption

    def __getattr__(self,
                    key: str,
                    ) -> Any:
        try:
            return self._suboption[key]
        except KeyError:
            raise AttributeError(f'unable to find extra "{key}"')

    def __getitem__(self,
                    key: str,
                    ) -> Any:
        return self.__getattr__(key)

    def __iter__(self):
        return iter(self._suboption.values())

    def items(self):
        return self._suboption.items()

    def __str__(self):
        """Describe the object together with its (stringified) sub-options."""
        suboptions = {key: str(value) for key, value in self._suboption.items()}
        return f'<extra object with: {suboptions}>'


class RougailBaseTemplate:
    """Engine to process Creole cheetah template

    Base class: the ``get_data_*``, ``process``, ``desactive_service`` and
    ``target_service`` hooks raise NotImplementedError here.
    """
    def __init__(self,  # pylint: disable=R0913
                 config: Config,
                 rougailconfig: RougailConfig=None,
                 ) -> None:
        """Store the Tiramisu *config* and resolve directories and functions.

        *rougailconfig* defaults to the global ``RougailConfig``.
        """
        if rougailconfig is None:
            rougailconfig = RougailConfig
        self.config = config
        self.destinations_dir = abspath(rougailconfig['destinations_dir'])
        self.tmp_dir = abspath(rougailconfig['tmp_dir'])
        templates_dir = rougailconfig['templates_dir']
        if not isinstance(templates_dir, list):
            templates_dir = [templates_dir]
        self.templates_dir = [abspath(templ_dir) for templ_dir in templates_dir]
        self.patches_dir = abspath(rougailconfig['patches_dir'])
        eos = {}
        functions_file = rougailconfig['functions_file']
        if not isinstance(functions_file, list):
            functions_file = [functions_file]
        for function in functions_file:
            # Missing function files are skipped.
            if isfile(function):
                eosfunc = load_modules(function)
                # Only names without a leading underscore are exposed.
                for func in dir(eosfunc):
                    if not func.startswith('_'):
                        eos[func] = getattr(eosfunc, func)
        self.eosfunc = eos
        self.rougail_variables_dict = {}
        self.rougailconfig = rougailconfig
        self.log = log
        self.engines = ENGINES

    def patch_template(self,
                       filename: str,
                       templates_dir: str,
                       ) -> None:
        """Apply patch to a template

        Looks for ``<patches_dir>/<filename>.patch`` and, when present, runs
        ``patch`` inside ``tmp_dir``. If the command fails, the error is
        logged and the unpatched template is copied again to ``tmp_dir``.
        """
        patch_cmd = ['patch', '-d', self.tmp_dir, '-N', '-p1', '-f']
        patch_no_debug = ['-s', '-r', '-', '--backup-if-mismatch']

        patch_file = join(self.patches_dir, f'{filename}.patch')
        if isfile(patch_file):
            self.log.info(_(f"Patching template '{filename}' with '{patch_file}'"))
            ret = call(patch_cmd + patch_no_debug + ['-i', patch_file])
            if ret:  # pragma: no cover
                patch_cmd_err = ' '.join(patch_cmd + ['-i', patch_file])
                msg = _(f"Error applying patch: '{patch_file}'\n"
                        f"To reproduce and fix this error {patch_cmd_err}")
                self.log.error(msg)
                copy(join(templates_dir, filename), self.tmp_dir)

    def prepare_template(self,
                         filename: str,
                         templates_dir: str,
                         ) -> None:
        """Prepare template source file

        Copies the template to ``tmp_dir`` and applies its patch, if any.
        Raises TemplateError when ``tmp_dir`` is not a directory.
        """
        self.log.info(_(f"Copy template: '{filename}' -> '{self.tmp_dir}'"))
        if not isdir(self.tmp_dir):
            raise TemplateError(_(f'cannot find tmp_dir {self.tmp_dir}'))
        copy(join(templates_dir, filename), self.tmp_dir)
        self.patch_template(filename, templates_dir)

    def instance_file(self,
                      filevar: Dict,
                      type_: str,
                      service_name: str,
                      ) -> list:
        """Run templatisation on one file

        *filevar* may describe several destination names (``name`` as a
        list). Each one is resolved by ``get_data_<type_>``, rendered by the
        engine named in ``filevar['engine']`` and passed to ``process``.

        Returns the list of generated destination paths.
        Raises TemplateError when ``destinations_dir`` is not a directory.
        """
        self.log.info(_(f"Instantiating file '{filevar.get('name')}'"))
        variable = filevar.get('variable')
        filenames = filevar.get('name')
        if not isinstance(filenames, list):
            filenames = [filenames]
        if variable and not isinstance(variable, list):
            variable = [variable]
        if not isdir(self.destinations_dir):
            raise TemplateError(_(f'cannot find destinations_dir {self.destinations_dir}'))
        destfilenames = []
        get_data = getattr(self, f'get_data_{type_}')
        for idx, filename in enumerate(filenames):
            data = get_data(filevar,
                            filename,
                            service_name,
                            variable,
                            idx,
                            )
            # The hook returns None when there is nothing to generate.
            if data is None:
                continue
            filename, source, true_destfilename, var = data
            # true_destfilename[1:] drops the first character so that join()
            # keeps the result under destinations_dir
            # (presumably a leading '/' -- confirm against the get_data_* hooks).
            destfilename = join(self.destinations_dir, true_destfilename[1:])
            makedirs(dirname(destfilename), exist_ok=True)
            self.log.info(_(f"{filevar['engine']} processing: '{destfilename}'"))
            self.engines[filevar['engine']].process(filename=filename,
                                                    source=source,
                                                    true_destfilename=true_destfilename,
                                                    destfilename=destfilename,
                                                    destdir=self.destinations_dir,
                                                    variable=var,
                                                    index=idx,
                                                    rougail_variables_dict=self.rougail_variables_dict,
                                                    eosfunc=self.eosfunc,
                                                    )
            self.process(true_destfilename,
                         destfilename,
                         filevar.get('mode'),
                         filevar.get('owner'),
                         filevar.get('group'),
                         )
            destfilenames.append(destfilename)
        return destfilenames

    async def instance_files(self) -> None:
        """Run templatisation on all files

        The current working directory is switched to ``tmp_dir`` for the
        duration of the call and restored afterwards, even on error.
        """
        try:
            ori_dir = getcwd()
        except FileNotFoundError:
            # The current directory no longer exists: nothing to restore.
            ori_dir = None
        chdir(self.tmp_dir)
        try:
            # Load every namespace into rougail_variables_dict.
            for option in await self.config.option.list(type='all'):
                namespace = await option.option.name()
                is_variable_namespace = namespace == self.rougailconfig['variable_namespace']
                if namespace == 'services':
                    is_service_namespace = 'root'
                else:
                    is_service_namespace = False
                self.rougail_variables_dict[namespace] = await self.load_variables(option,
                                                                                   is_variable_namespace,
                                                                                   is_service_namespace,
                                                                                   )
            for templates_dir in self.templates_dir:
                for template in listdir(templates_dir):
                    self.prepare_template(template,
                                          templates_dir,
                                          )
            files_to_delete = []
            # Two passes: entries with 'included' other than 'no' first,
            # then every other entry.
            for included in (True, False):
                for service_obj in await self.config.option('services').list('all'):
                    service_name = await service_obj.option.description()
                    if await service_obj.option('activate').value.get() is False:
                        if included is False and not await service_obj.information.get('undisable', False):
                            self.desactive_service(service_name)
                        continue
                    if not included:
                        engine = await service_obj.information.get('engine', None)
                        if engine:
                            self.instance_file({'engine': engine},
                                               'service',
                                               service_name,
                                               )
                        target_name = await service_obj.information.get('target', None)
                        if target_name:
                            self.target_service(service_name,
                                                target_name,
                                                engine is None,
                                                )
                    for fills in await service_obj.list('optiondescription'):
                        type_ = await fills.option.name()
                        for fill_obj in await fills.list('all'):
                            fill = await fill_obj.value.dict()
                            self.get_default(type_, fill, fill_obj)
                            await self.get_informations(type_, fill, fill_obj)
                            if 'included' in fill:
                                if (fill['included'] == 'no' and included is True) or \
                                        (fill['included'] != 'no' and included is False):
                                    continue
                            elif included is True:
                                continue
                            if fill['activate']:
                                destfilenames = self.instance_file(fill,
                                                                   type_,
                                                                   service_name,
                                                                   )
                                if included and fill.get('included', 'no') == 'content':
                                    files_to_delete.extend(destfilenames)
                            else:
                                self.log.debug(_(f"Instantiation of file '{fill['name']}' disabled"))
                    self.post_instance_service(service_name)
            # Remove files included by content, then their parent
            # directories as long as those are empty.
            for filename in files_to_delete:
                unlink(filename)
                parent = filename
                while True:
                    parent = dirname(parent)
                    if listdir(parent):
                        break
                    rmdir(parent)
            self.post_instance()
        finally:
            if ori_dir is not None:
                chdir(ori_dir)

    def get_default(self,
                    type_: str,
                    dico: dict,
                    obj: 'Option',
                    ) -> None:
        """Fill missing ``DEFAULT[type_]`` keys of *dico* in place.

        The fallback is the ``default_<type_>_<key>`` entry of this
        instance's configuration, or ``undefined`` when there is none.
        """
        for key in DEFAULT.get(type_, []):
            default_key = f'default_{type_}_{key}'
            if default_key in self.rougailconfig:
                default_value = self.rougailconfig[default_key]
            else:
                default_value = undefined
            dico[key] = dico.get(key, default_value)

    async def get_informations(self,
                               type_: str,
                               dico: dict,
                               obj: 'Option',
                               ) -> None:
        """Copy the ``INFORMATIONS[type_]`` informations of *obj* into *dico*.

        The fallback is the ``default_<type_>_<key>`` entry of this
        instance's configuration, or ``undefined`` when there is none.
        """
        for key in INFORMATIONS.get(type_, []):
            default_key = f'default_{type_}_{key}'
            if default_key in self.rougailconfig:
                default_value = self.rougailconfig[default_key]
            else:
                default_value = undefined
            dico[key] = await obj.information.get(key, default_value)

    def desactive_service(self,
                          *args,
                          ):
        raise NotImplementedError(_('cannot desactivate a service'))

    def target_service(self,
                       service_name: str,
                       *args,
                       ):
        raise NotImplementedError(_(f'cannot use target for the service {service_name}'))

    def post_instance_service(self,
                              *args,
                              ):  # pragma: no cover
        pass

    def process(self,
                *args,
                ):  # pragma: no cover
        raise NotImplementedError(_('cannot processed'))

    def post_instance(self):  # pragma: no cover
        pass

    def get_data_ip(self,
                    *args,
                    ) -> None:  # pragma: no cover
        raise NotImplementedError(_('cannot instanciate this service type ip'))

    def get_data_files(self,
                       *args,
                       ) -> None:  # pragma: no cover
        raise NotImplementedError(_('cannot instanciate this service type file'))

    def get_data_service(self,
                         *args,
                         ) -> None:  # pragma: no cover
        raise NotImplementedError(_('cannot instanciate this service'))

    def get_data_overrides(self,
                           *args,
                           ) -> None:  # pragma: no cover
        raise NotImplementedError(_('cannot instanciate this service type override'))

    async def load_variables(self,
                             optiondescription,
                             is_variable_namespace: bool,
                             is_service_namespace,
                             ) -> RougailExtra:
        """Load all variables and set it in RougailExtra objects

        *is_variable_namespace* tells whether values must also be stored at
        the top level of ``rougail_variables_dict``.
        *is_service_namespace* is ``False`` outside the services namespace,
        otherwise a string tracking the current depth: ``'root'``, then
        ``'service_name'``, then the sub-family name, then that name without
        its last character.
        """
        variables = {}
        for option in await optiondescription.list('all'):
            if await option.option.isoptiondescription():
                if await option.option.isleadership():
                    # The first sub-option is the leader, the others followers.
                    for idx, suboption in enumerate(await option.list('all')):
                        if idx == 0:
                            leader_name = await suboption.option.name()
                            leader = RougailLeader(leader_name, await suboption.value.get())
                            leadership_name = await option.option.name()
                            if is_variable_namespace:
                                self.rougail_variables_dict[await suboption.option.name()] = leader
                        else:
                            await leader._add_follower(self.config,
                                                       await suboption.option.name(),
                                                       await suboption.option.path(),
                                                       )
                    variables[leadership_name] = RougailExtra({leader_name: leader})
                else:
                    if is_service_namespace == 'root':
                        new_is_service_namespace = 'service_name'
                    elif is_service_namespace == 'service_name':
                        new_is_service_namespace = await option.option.name()
                    elif is_service_namespace in INFORMATIONS:
                        # remove 's'
                        new_is_service_namespace = is_service_namespace[:-1]
                    else:
                        new_is_service_namespace = is_service_namespace
                    subfamilies = await self.load_variables(option,
                                                            is_variable_namespace,
                                                            new_is_service_namespace,
                                                            )
                    variables[await option.option.name()] = subfamilies
            else:
                if is_variable_namespace:
                    value = await option.value.get()
                    self.rougail_variables_dict[await option.option.name()] = value
                if await option.option.issymlinkoption() and await option.option.isfollower():
                    # A symlink to a follower is read index by index.
                    value = []
                    path = await option.option.path()
                    for index in range(await option.value.len()):
                        value.append(await self.config.option(path, index).value.get())
                else:
                    value = await option.value.get()
                variables[await option.option.name()] = value
        # At the level of one 'files'/'overrides' entry, add the defaults
        # and the informations of the entry.
        if isinstance(is_service_namespace, str) and is_service_namespace + 's' in INFORMATIONS:
            self.get_default(is_service_namespace + 's',
                             variables,
                             optiondescription,
                             )
            await self.get_informations(is_service_namespace + 's',
                                        variables,
                                        optiondescription,
                                        )
        return RougailExtra(variables)