separate RougailTemplate to RougailSystemdTemplate

This commit is contained in:
2021-02-20 15:46:13 +01:00
parent ba41b27dbf
commit eb3720d6bf
76 changed files with 313 additions and 427 deletions

View File

@ -25,8 +25,8 @@ along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
from .convert import RougailConvert
from .template.base import RougailTemplate
from .template.systemd import RougailSystemdTemplate
from .config import RougailConfig
from .annotator import modes
__ALL__ = ('RougailConvert', 'RougailTemplate', 'RougailConfig', 'modes')
__ALL__ = ('RougailConvert', 'RougailSystemdTemplate', 'RougailConfig', 'modes')

View File

@ -46,7 +46,6 @@ class ServiceAnnotator:
<services>
<service name="test">
<service_access service='ntp'>
<port protocol='udp' service_accesslist='ntp_udp'>123</port>
</service_access>
</service>
</services>
@ -54,6 +53,7 @@ class ServiceAnnotator:
def __init__(self, objectspace):
self.objectspace = objectspace
self.uniq_ip = []
self.uniq_overrides = []
if 'network_type' not in self.objectspace.types:
self.objectspace.types['network_type'] = self.objectspace.types['ip_type']
if hasattr(self.objectspace.space, 'services'):
@ -234,18 +234,20 @@ class ServiceAnnotator:
return variable
def _update_override(self,
file_,
override,
service_name,
):
file_.name = f'/systemd/system/{service_name}.service.d/rougail.conf'
# retrieve default value from File object
for attr in ['owner', 'group', 'mode']:
setattr(file_, attr, getattr(self.objectspace.file, attr))
if not hasattr(file_, 'source'):
file_.source = f'{service_name}.service'
self._update_file(file_,
service_name,
)
if service_name in self.uniq_overrides:
msg = _(f'only one override is allowed by service, '
f'please use a variable multiple if you want have more than one IP')
raise DictConsistencyError(msg, 69, ip.xmlfiles)
self.uniq_overrides.append(service_name)
override.name = service_name
if not hasattr(override, 'engine'):
override.engine = RougailConfig['default_engine']
if not hasattr(override, 'source'):
override.source = f'{service_name}.service'
@staticmethod
def _update_file(file_,
@ -268,7 +270,7 @@ class ServiceAnnotator:
service_name,
) -> None:
if service_name in self.uniq_ip:
msg = _(f'only one IP is allowed by IP, '
msg = _(f'only one IP is allowed by service, '
f'please use a variable multiple if you want have more than one IP')
raise DictConsistencyError(msg, 67, ip.xmlfiles)
self.uniq_ip.append(service_name)
@ -284,16 +286,3 @@ class ServiceAnnotator:
if netmask.type != 'netmask':
msg = _(f'netmask in ip must have type "netmask", not "{netmask.type}"')
raise DictConsistencyError(msg, 65, ip.xmlfiles)
# Convert IP to file
ip.network_type = ip.ip_type
ip.network = ip.name
ip.ip_type = 'filename'
ip.name = f'/systemd/system/{service_name}.service.d/rougail_ip.conf'
# retrieve default value from File object
for attr in ['owner', 'group', 'mode']:
setattr(ip, attr, getattr(self.objectspace.file, attr))
ip.source = None
ip.engine = 'creole'
self._update_file(ip,
service_name,
)

View File

@ -46,15 +46,10 @@
<!ELEMENT services (service*)>
<!ELEMENT service ((port*|ip*|file*|override*)*)>
<!ELEMENT service ((ip*|file*|override*)*)>
<!ATTLIST service name CDATA #REQUIRED>
<!ATTLIST service manage (True|False) "True">
<!ELEMENT port (#PCDATA)>
<!ATTLIST port port_type (port|variable) "port">
<!ATTLIST port portlist CDATA #IMPLIED>
<!ATTLIST port protocol (tcp|udp) "tcp">
<!ELEMENT ip (#PCDATA)>
<!ATTLIST ip iplist CDATA #IMPLIED>
<!ATTLIST ip ip_type (variable) "variable">
@ -130,7 +125,7 @@
<!ATTLIST param optional (True|False) "False">
<!ELEMENT target (#PCDATA)>
<!ATTLIST target type (variable|family|filelist|iplist|portlist) "variable">
<!ATTLIST target type (variable|family|filelist|iplist) "variable">
<!ATTLIST target optional (True|False) "False">
<!ELEMENT group (follower+)>

View File

@ -31,7 +31,7 @@ from typing import Dict, Any
from subprocess import call
from os import listdir, makedirs, getcwd, chdir
from os.path import dirname, join, isfile, isdir, abspath
from ipaddress import ip_network
try:
from tiramisu3 import Config
@ -50,13 +50,6 @@ ENGINES = {}
for engine in engines.__all__:
ENGINES[engine] = getattr(engines, engine)
ROUGAIL_IP_TEMPLATE = """[Service]
%for %%ip in %%rougail_variable
IPAddressAllow=%%ip
%end for
IPAddressDeny=any
"""
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())
@ -170,13 +163,16 @@ class RougailExtra:
def __getattr__(self,
key: str,
) -> Any:
return self.suboption[key]
try:
return self.suboption[key]
except KeyError:
raise AttributeError
def __iter__(self):
return iter(self.suboption.values())
class RougailTemplate:
class RougailBaseTemplate:
"""Engine to process Creole cheetah template
"""
def __init__(self, # pylint: disable=R0913
@ -200,6 +196,8 @@ class RougailTemplate:
self.eosfunc = eos
self.rougail_variables_dict = {}
self.rougailconfig = rougailconfig
self.log = log
self.engines = ENGINES
def patch_template(self,
filename: str,
@ -211,13 +209,13 @@ class RougailTemplate:
patch_file = join(self.patches_dir, f'{filename}.patch')
if isfile(patch_file):
log.info(_("Patching template '{filename}' with '{patch_file}'"))
self.log.info(_("Patching template '{filename}' with '{patch_file}'"))
ret = call(patch_cmd + patch_no_debug + ['-i', patch_file])
if ret: # pragma: no cover
patch_cmd_err = ' '.join(patch_cmd + ['-i', patch_file])
msg = _(f"Error applying patch: '{patch_file}'\n"
f"To reproduce and fix this error {patch_cmd_err}")
log.error(_(msg))
self.log.error(_(msg))
copy(join(self.templates_dir, filename), self.tmp_dir)
def prepare_template(self,
@ -225,7 +223,7 @@ class RougailTemplate:
) -> None:
"""Prepare template source file
"""
log.info(_("Copy template: '{filename}' -> '{self.tmp_dir}'"))
self.log.info(_("Copy template: '{filename}' -> '{self.tmp_dir}'"))
if not isdir(self.tmp_dir):
raise TemplateError(_(f'cannot find tmp_dir {self.tmp_dir}'))
copy(filename, self.tmp_dir)
@ -233,29 +231,16 @@ class RougailTemplate:
def instance_file(self,
filevar: Dict,
object_type: str,
type: str,
service_name: str,
) -> None:
"""Run templatisation on one file
"""
log.info(_("Instantiating file '{filename}'"))
self.log.info(_("Instantiating file '{filename}'"))
if 'variable' in filevar:
variable = filevar['variable']
else:
variable = None
if 'network' in filevar:
if 'netmask' in filevar:
if isinstance(filevar['network'], list):
variable = [str(ip_network(f'{net}/{mask}'))
for net, mask in zip(filevar['network'], filevar['netmask'])]
else:
variable = str(ip_network(f'{filevar["network"]}/{filevar["netmask"]}'))
else:
variable = filevar['network']
if not isinstance(variable, list):
if variable is None:
variable = []
else:
variable = [variable]
filenames = filevar['name']
if not isinstance(filenames, list):
filenames = [filenames]
@ -263,30 +248,29 @@ class RougailTemplate:
variable = [variable]
if not isdir(self.destinations_dir):
raise TemplateError(_(f'cannot find destinations_dir {self.destinations_dir}'))
for idx, destfile in enumerate(filenames):
destfilename = join(self.destinations_dir, destfile[1:])
makedirs(dirname(destfilename), exist_ok=True)
if object_type == 'ip':
var = variable
elif variable:
for idx, filename, in enumerate(filenames):
if variable:
var = variable[idx]
else:
var = None
if object_type == 'ip':
filename = None
source = ROUGAIL_IP_TEMPLATE
else:
filename = join(self.tmp_dir, filevar['source'])
source = None
log.info(_(f"{filevar['engine']} processing: '{destfilename}'"))
ENGINES[filevar['engine']].process(filename=filename,
source=source,
true_destfilename=destfile,
destfilename=destfilename,
variable=var,
rougail_variables_dict=self.rougail_variables_dict,
eosfunc=self.eosfunc,
)
func = f'_instance_{type}'
filename, source, destfile, var = getattr(self, func)(filevar,
filename,
service_name,
variable,
idx,
)
destfilename = join(self.destinations_dir, destfile[1:])
makedirs(dirname(destfilename), exist_ok=True)
self.log.info(_(f"{filevar['engine']} processing: '{destfilename}'"))
self.engines[filevar['engine']].process(filename=filename,
source=source,
true_destfilename=destfile,
destfilename=destfilename,
variable=var,
rougail_variables_dict=self.rougail_variables_dict,
eosfunc=self.eosfunc,
)
async def instance_files(self) -> None:
"""Run templatisation on all files
@ -302,21 +286,36 @@ class RougailTemplate:
for template in listdir('.'):
self.prepare_template(template)
for service_obj in await self.config.option('services').list('all'):
service_name = await service_obj.option.name()
for fills in await service_obj.list('all'):
type_ = await fills.option.name()
if type_ in ['files', 'overrides', 'ip']:
for fill_obj in await fills.list('all'):
fill = await fill_obj.value.dict()
if type_ != 'ip':
filename = fill['source']
if not isfile(filename): # pragma: no cover
raise FileNotFound(_(f"File {filename} does not exist."))
if fill['activate']:
self.instance_file(fill, type_)
else:
log.debug(_("Instantiation of file '{filename}' disabled"))
for fill_obj in await fills.list('all'):
fill = await fill_obj.value.dict()
if fill['activate']:
self.instance_file(fill, type_, service_name)
else:
self.log.debug(_("Instantiation of file '{filename}' disabled"))
self.post_instance()
chdir(ori_dir)
def post_instance(self):
pass
def _instance_ip(self,
*args,
) -> None:
raise NotImplementedError(_('cannot instanciate this service type ip'))
def _instance_files(self,
*args,
) -> None:
raise NotImplementedError(_('cannot instanciate this service type file'))
def _instance_overrides(self,
*args,
) -> None:
raise NotImplementedError(_('cannot instanciate this service type override'))
async def load_variables(self,
optiondescription,
is_variable_namespace,

View File

@ -29,6 +29,7 @@ from Cheetah.Template import Template
from Cheetah.NameMapper import NotFound
from typing import Dict, Any
from ...i18n import _
from ...utils import normalize_family
from ...error import TemplateError
@ -117,10 +118,16 @@ def process(filename: str,
data = str(cheetah_template)
except NotFound as err: # pragma: no cover
varname = err.args[0][13:-1]
msg = f"Error: unknown variable used in template {filename} to {destfilename}: {varname}"
if filename:
msg = f"Error: unknown variable used in template {filename} to {destfilename}: {varname}"
else:
msg = f"Error: unknown variable used in file {destfilename}: {varname}"
raise TemplateError(_(msg)) from err
except Exception as err: # pragma: no cover
msg = _(f"Error while instantiating template {filename} to {destfilename}: {err}")
if filename:
msg = _(f"Error while instantiating template {filename} to {destfilename}: {err}")
else:
msg = _(f"Error while instantiating filename {destfilename}: {err}")
raise TemplateError(msg) from err
with open(destfilename, 'w') as file_h:

View File

@ -0,0 +1,131 @@
"""Template langage for Rougail to create file and systemd file
Cadoles (http://www.cadoles.com)
Copyright (C) 2021
distribued with GPL-2 or later license
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
from typing import Dict
from os import makedirs
from os.path import dirname, isfile, join
from ipaddress import ip_network
from .base import RougailBaseTemplate
from ..i18n import _
from ..error import FileNotFound
ROUGAIL_IP_TEMPLATE = """[Service]
%for %%ip in %%rougail_variable
IPAddressAllow=%%ip
%end for
IPAddressDeny=any
"""
ROUGAIL_TMPL_TEMPLATE = """%def display(%%file, %%filename)
%if %%filename.startswith('/etc/') or %%filename.startswith('/var/') or %%filename.startswith('/srv/')
C %%filename %%file.mode %%file.owner %%file.group - /usr/local/lib%%filename
z %%filename - - - - -
%end if
%end def
%for %%service in %%services
%if %%hasattr(%%service, 'files')
%for %%file in %%service.files
%if %%file.activate == True
%if %%isinstance(%%file.name, list)
%for %%filename in %%file.name
%%display(%%file, %%filename)%slurp
%end for
%else
%%display(%%file, %%file.name)%slurp
%end if
%end if
%end for
%end if
%end for"""
class RougailSystemdTemplate(RougailBaseTemplate):
def _instance_files(self,
filevar: Dict,
destfile: str,
service_name: str,
variable,
idx: int,
) -> tuple:
source = filevar['source']
if not isfile(source): # pragma: no cover
raise FileNotFound(_(f"File {source} does not exist."))
tmp_file = join(self.tmp_dir, source)
#self.instance_file(fill, 'files')
if variable:
var = variable[idx]
else:
var = None
return tmp_file, None, destfile, var
def _instance_overrides(self,
filevar: Dict,
destfile,
service_name: str,
*args,
) -> tuple:
source = filevar['source']
if not isfile(source): # pragma: no cover
raise FileNotFound(_(f"File {source} does not exist."))
tmp_file = join(self.tmp_dir, source)
service_name = filevar['name']
return tmp_file, None, f'/systemd/system/{service_name}.service.d/rougail.conf', None
def _instance_ip(self,
filevar: Dict,
destfile,
service_name: str,
*args,
) -> tuple:
if 'netmask' in filevar:
if isinstance(filevar['name'], list):
variable = [str(ip_network(f'{net}/{mask}'))
for net, mask in zip(filevar['name'], filevar['netmask'])]
else:
variable = str(ip_network(f'{filevar["name"]}/{filevar["netmask"]}'))
else:
variable = filevar['name']
if not isinstance(variable, list):
if variable is None:
variable = []
else:
variable = [variable]
filevar['engine'] = 'creole'
return None, ROUGAIL_IP_TEMPLATE, f'/systemd/system/{service_name}.service.d/rougail_ip.conf', variable
def post_instance(self):
destfile = '/tmpfiles.d/rougail.conf'
destfilename = join(self.destinations_dir, destfile[1:])
makedirs(dirname(destfilename), exist_ok=True)
self.log.info(_(f"creole processing: '{destfilename}'"))
self.engines['creole'].process(filename=None,
source=ROUGAIL_TMPL_TEMPLATE,
true_destfilename=destfile,
destfilename=destfilename,
variable=None,
rougail_variables_dict=self.rougail_variables_dict,
eosfunc=self.eosfunc,
)