rougail/src/rougail/annotator/service.py

237 lines
9.8 KiB
Python

"""Annotate services
"""
from os.path import basename
from typing import Tuple
from ..i18n import _
from ..utils import normalize_family
from ..error import DictConsistencyError
# a CreoleObjSpace's attribute has some annotations
# that shall not be present in the exported (flattened) XML
# Attribute names that are skipped when service elements are converted
# into families and variables.
ERASED_ATTRIBUTES = (
    'redefine',
    'exists',
    'fallback',
    'optional',
    'remove_check',
    'namespace',
    'remove_condition',
    'path',
    'instance_mode',
    'index',
    'is_in_leadership',
    'level',
    'remove_fill',
    'xmlfiles',
    'type',
    'reflector_name',
    'reflector_object',
    'manage',
)
# Maps the value of an element's `<key>_type` attribute to the type given
# to the generated variable.
KEY_TYPE = {
    'variable': 'symlink',
    'PortOption': 'port',
    'UnicodeOption': 'string',
    'NetworkOption': 'network',
    'NetmaskOption': 'netmask',
    'URLOption': 'web_address',
    'FilenameOption': 'filename',
}
class ServiceAnnotator:
    """Manage service's object

    for example::

        <services>
            <service name="test">
                <service_access service='ntp'>
                    <port protocol='udp' service_accesslist='ntp_udp'>123</port>
                </service_access>
            </service>
        </services>
    """

    def __init__(self, objectspace):
        """Store `objectspace` and convert its services, if any.

        A `services` node that has no `service` attribute is removed from
        the space; otherwise the services are converted with
        `convert_services()`.
        """
        self.objectspace = objectspace
        space = self.objectspace.space
        if not hasattr(space, 'services'):
            return
        if not hasattr(space.services, 'service'):
            del space.services
        else:
            self.convert_services()

    def convert_services(self):
        """convert services to variables

        Every dict or list attribute of a service (except those listed in
        ERASED_ATTRIBUTES) is replaced by a family named `<attribute>s`
        whose sub-families are built by `make_group_from_elts()`.
        """
        services = self.objectspace.space.services
        services.hidden = True
        services.name = 'services'
        services.doc = 'services'
        services.path = 'services'
        for service_name, service in services.service.items():
            service.information = self.objectspace.information(service.xmlfiles)
            service.information.manage = service.manage
            service.manage = None
            # iterate over a snapshot: attributes are reassigned in the loop
            for elttype, values in dict(vars(service)).items():
                if not isinstance(values, (dict, list)) or elttype in ERASED_ATTRIBUTES:
                    continue
                eltname = elttype + 's'
                path = '.'.join(['services', normalize_family(service_name), eltname])
                family = self._gen_family(eltname,
                                          path,
                                          service.xmlfiles,
                                          )
                if isinstance(values, dict):
                    values = list(values.values())
                family.family = self.make_group_from_elts(service_name,
                                                          elttype,
                                                          values,
                                                          path,
                                                          )
                setattr(service, elttype, family)
            service.doc = service.name

    def make_group_from_elts(self,
                             service_name,
                             elttype,
                             elts,
                             path,
                             ):
        """Splits each objects into a group (and `OptionDescription`, in tiramisu terms)
        and build elements and its attributes (the `Options` in tiramisu terms)

        :param service_name: name of the service owning the elements
        :param elttype: attribute name of the elements (used to find an
                        optional `_update_<elttype>` method and the
                        `<elttype>list` attribute)
        :param elts: list of elements to convert
        :param path: path of the parent family
        :return: the list of generated families, one per element
        """
        families = []
        listname = f'{elttype}list'
        # optional per-type hook (_update_xxxx), resolved once for all elements
        update_elt = getattr(self, '_update_' + elttype, None)
        for elt in elts:
            if update_elt is not None:
                update_elt(elt,
                           service_name,
                           )
            c_name, subpath = self._get_name_path(elt,
                                                  path,
                                                  )
            family = self._gen_family(c_name,
                                      subpath,
                                      elt.xmlfiles,
                                      )
            family.variable = []
            activate_obj = self._generate_element('boolean',
                                                  'activate',
                                                  True,
                                                  elt.xmlfiles,
                                                  '.'.join([subpath, 'activate']),
                                                  )
            for key in dir(elt):
                if key.startswith('_') or key.endswith('_type') or key in ERASED_ATTRIBUTES:
                    continue
                value = getattr(elt, key)
                if key == listname:
                    # register the `activate` variable under this list name
                    conditions = self.objectspace.list_conditions.setdefault(listname, {})
                    conditions.setdefault(value, []).append(activate_obj)
                    continue
                type_ = self._get_type(key,
                                       elttype,
                                       elt,
                                       )
                family.variable.append(self._generate_element(type_,
                                                              key,
                                                              value,
                                                              elt.xmlfiles,
                                                              f'{subpath}.{key}'
                                                              ))
            # FIXME should not be True by default,
            # it should be a calculated value
            family.variable.append(activate_obj)
            families.append(family)
        return families

    def _get_name_path(self,
                       elt,
                       path: str,
                       ) -> Tuple[str, str]:
        """Build a unique name and path for `elt` under `path`.

        The base name is `elt.source` when present, `elt.name` otherwise.
        If a family already exists at the candidate path, `_1`, `_2`, ...
        is appended to the name until a free path is found.

        :return: the `(name, subpath)` pair of the first free candidate
        :raises DictConsistencyError: any lookup error whose errno is not 42
        """
        # create element name, if already exists, add _xx to be uniq
        if hasattr(elt, 'source'):
            name = elt.source
        else:
            name = elt.name
        idx = 0
        while True:
            c_name = name
            if idx:
                c_name += f'_{idx}'
            subpath = '{}.{}'.format(path, normalize_family(c_name))
            try:
                self.objectspace.paths.get_family(subpath, 'services')
            except DictConsistencyError as err:
                if err.errno == 42:
                    # errno 42 is treated as "no family at this path": it is free
                    return c_name, subpath
                # any other error is not about a free path: do not hide it
                # (swallowing it could make this loop spin forever)
                raise
            idx += 1

    def _gen_family(self,
                    name,
                    path,
                    xmlfiles
                    ):
        """Create a family called `name` and register it at `path`
        in the 'services' namespace.
        """
        family = self.objectspace.family(xmlfiles)
        family.name = normalize_family(name)
        family.doc = name
        family.mode = None
        self.objectspace.paths.add_family('services',
                                          path,
                                          family,
                                          )
        return family

    def _generate_element(self,
                          type_,
                          key,
                          value,
                          xmlfiles,
                          path,
                          ):  # pylint: disable=R0913
        """Create a variable of type `type_` named after `key` and register
        it at `path` in the 'services' namespace.

        For a 'symlink' the variable points (`opt`) to the variable found
        with `value`; for any other type `value` becomes the default.
        """
        variable = self.objectspace.variable(xmlfiles)
        variable.name = normalize_family(key)
        variable.mode = None
        variable.type = type_
        if type_ == 'symlink':
            variable.opt = self.objectspace.paths.get_variable(value)
            variable.multi = None
        else:
            variable.doc = key
            variable.default = value
        variable.namespace = 'services'
        self.objectspace.paths.add_variable('services',
                                            path,
                                            'service',
                                            False,
                                            variable,
                                            )
        return variable

    def _get_type(self,
                  key: str,
                  elt_name: str,
                  elt,
                  ) -> str:
        """Return the variable type to use for the attribute `key` of `elt`.

        Boolean attributes (`objectspace.booleans_attributs`) are 'boolean'.
        Otherwise the type comes from KEY_TYPE through the element's
        `<key>_type` attribute (`<elt_name>_type` for the `name` key), and
        defaults to 'string' when that attribute is missing.

        :raises KeyError: if the declared type is not a key of KEY_TYPE
        """
        if key == 'name':
            dtd_key_type = elt_name + '_type'
        else:
            dtd_key_type = key + '_type'
        if key in self.objectspace.booleans_attributs:
            return 'boolean'
        if hasattr(elt, dtd_key_type):
            return KEY_TYPE[getattr(elt, dtd_key_type)]
        return 'string'

    def _update_override(self,
                         file_,
                         service_name,
                         ):
        """Prepare an `override` element before its conversion.

        Sets its name to the systemd drop-in path of the service, copies
        the default owner/group/mode from `objectspace.file`, defaults the
        source to `<service_name>.service`, then applies `_update_file()`.
        """
        file_.name = f'/systemd/system/{service_name}.service.d/rougail.conf'
        # retrieve default value from File object
        for attr in ('owner', 'group', 'mode'):
            setattr(file_, attr, getattr(self.objectspace.file, attr))
        if not hasattr(file_, 'source'):
            file_.source = f'{service_name}.service'
        self._update_file(file_,
                          service_name,
                          )

    def _update_file(self,
                     file_,
                     service_name,
                     ):
        """Make sure the `file` element has a `source` attribute.

        When the file has no `file_type` or its type is "UnicodeOption",
        a missing source defaults to the basename of the file name.
        For any other type the source is mandatory.

        :raises DictConsistencyError: (errno 34) if the source is mandatory
                                      and missing
        """
        if not hasattr(file_, 'file_type') or file_.file_type == "UnicodeOption":
            if not hasattr(file_, 'source'):
                file_.source = basename(file_.name)
        elif not hasattr(file_, 'source'):
            xmlfiles = self.objectspace.display_xmlfiles(file_.xmlfiles)
            msg = _(f'attribute "source" is mandatory for the file "{file_.name}" '
                    f'"({service_name})" in {xmlfiles}')
            raise DictConsistencyError(msg, 34)