leader/follower support
This commit is contained in:
@ -12,7 +12,7 @@
|
||||
#
|
||||
# You should have received a copy of the GNU Lesser General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
from typing import Union, List, Optional
|
||||
from typing import Union, List, Dict, Tuple, Optional, Any
|
||||
from argparse import ArgumentParser, Namespace, SUPPRESS, _HelpAction, HelpFormatter
|
||||
from copy import copy
|
||||
from gettext import gettext as _
|
||||
@ -20,10 +20,11 @@ from gettext import gettext as _
|
||||
|
||||
try:
|
||||
from tiramisu import Config
|
||||
from tiramisu.error import PropertiesOptionError
|
||||
from tiramisu.error import PropertiesOptionError, RequirementError
|
||||
except ModuleNotFoundError:
|
||||
Config = None
|
||||
from tiramisu_json_api.error import PropertiesOptionError
|
||||
RequirementError = PropertiesOptionError
|
||||
try:
|
||||
from tiramisu_json_api import Config as ConfigJson
|
||||
if Config is None:
|
||||
@ -33,34 +34,71 @@ except ModuleNotFoundError:
|
||||
|
||||
|
||||
class TiramisuNamespace(Namespace):
|
||||
def _populate(self):
|
||||
#self._config.property.read_only()
|
||||
def __init__(self,
|
||||
config: Config) -> None:
|
||||
super().__setattr__('_config', config)
|
||||
super().__setattr__('list_force_no', {})
|
||||
self._populate()
|
||||
super().__init__()
|
||||
|
||||
def _populate(self) -> None:
|
||||
for tiramisu_key, tiramisu_value in self._config.value.dict().items():
|
||||
option = self._config.option(tiramisu_key)
|
||||
if not option.option.issymlinkoption():
|
||||
if tiramisu_value == [] and option.option.ismulti() and option.owner.isdefault():
|
||||
if tiramisu_value == [] and \
|
||||
option.option.ismulti() and \
|
||||
option.owner.isdefault():
|
||||
tiramisu_value = None
|
||||
super().__setattr__(tiramisu_key, tiramisu_value)
|
||||
#self._config.property.read_write()
|
||||
|
||||
def __init__(self, config):
|
||||
self._config = config
|
||||
super().__init__()
|
||||
def __setattr__(self,
|
||||
key: str,
|
||||
value: Any) -> None:
|
||||
if key in self.list_force_no:
|
||||
true_key = self.list_force_no[key]
|
||||
else:
|
||||
true_key = key
|
||||
option = self._config.option(true_key)
|
||||
if option.option.isfollower():
|
||||
_setattr = self._setattr_follower
|
||||
true_value = ','.join(value[1:])
|
||||
else:
|
||||
_setattr = self._setattr
|
||||
true_value = value
|
||||
try:
|
||||
_setattr(option, true_key, key, value)
|
||||
except ValueError as err:
|
||||
if option.option.type() == 'choice':
|
||||
raise ValueError("invalid choice: '{}' (choose from {})".format(true_value, ', '.join([f"'{val}'" for val in option.value.list()])))
|
||||
else:
|
||||
raise err
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
if key == '_config':
|
||||
super().__setattr__(key, value)
|
||||
return
|
||||
# self._config.property.read_write()
|
||||
option = self._config.option(key)
|
||||
if option.option.ismulti() and value is not None and not isinstance(value, list):
|
||||
def _setattr(self,
|
||||
option: 'Option',
|
||||
true_key: str,
|
||||
key: str,
|
||||
value: Any) -> None:
|
||||
if option.option.ismulti() and \
|
||||
value is not None and \
|
||||
not isinstance(value, list):
|
||||
value = [value]
|
||||
option.value.set(value)
|
||||
|
||||
def __getattribute__(self, key):
|
||||
if key == '__dict__' and hasattr(self, '_config'):
|
||||
self._populate()
|
||||
return super().__getattribute__(key)
|
||||
def _setattr_follower(self,
|
||||
option: 'Option',
|
||||
true_key: str,
|
||||
key: str,
|
||||
value: Any) -> None:
|
||||
if not value[0].isdecimal():
|
||||
raise ValueError('index must be a number, not {}'.format(value[0]))
|
||||
index = int(value[0])
|
||||
if option.option.type() == 'boolean':
|
||||
value = key not in self.list_force_no
|
||||
elif option.option.issubmulti():
|
||||
value = value[1:]
|
||||
else:
|
||||
value = value[1]
|
||||
self._config.option(true_key, index).value.set(value)
|
||||
|
||||
|
||||
class TiramisuHelpFormatter(HelpFormatter):
|
||||
@ -81,6 +119,62 @@ class _TiramisuHelpAction(_HelpAction):
|
||||
_HelpAction.__call__(self, parser, None, None)
|
||||
|
||||
|
||||
class _BuildKwargs:
|
||||
def __init__(self,
|
||||
name: str,
|
||||
option: 'Option',
|
||||
cmdlineparser: 'TiramisuCmdlineParser',
|
||||
properties: List[str],
|
||||
force_no: bool) -> None:
|
||||
self.kwargs = {}
|
||||
self.cmdlineparser = cmdlineparser
|
||||
self.properties = properties
|
||||
self.force_no = force_no
|
||||
if not self.force_no:
|
||||
self.kwargs['help'] = option.doc().replace('%', '%%')
|
||||
if 'positional' not in self.properties:
|
||||
is_short_name = self.cmdlineparser._is_short_name(name, 'longargument' in self.properties)
|
||||
if self.force_no:
|
||||
ga_name = self.gen_argument_name(name, is_short_name)
|
||||
self.cmdlineparser.namespace.list_force_no[ga_name] = option.path()
|
||||
else:
|
||||
ga_name = name
|
||||
self.kwargs['dest'] = self.gen_argument_name(option.path(), False)
|
||||
self.args = [self.cmdlineparser._gen_argument(ga_name, is_short_name)]
|
||||
else:
|
||||
self.args = [option.path()]
|
||||
|
||||
def __setitem__(self,
|
||||
key: str,
|
||||
value: Any) -> None:
|
||||
self.kwargs[key] = value
|
||||
|
||||
def add_argument(self,
|
||||
option: 'Option'):
|
||||
is_short_name = self.cmdlineparser._is_short_name(option.name(), 'longargument' in self.properties)
|
||||
if self.force_no:
|
||||
name = self.gen_argument_name(option.name(), is_short_name)
|
||||
else:
|
||||
name = option.name()
|
||||
self.args.insert(0, self.cmdlineparser._gen_argument(name, is_short_name))
|
||||
|
||||
def gen_argument_name(self, name, is_short_name):
|
||||
if self.force_no:
|
||||
if is_short_name:
|
||||
prefix = 'n'
|
||||
else:
|
||||
prefix = 'no-'
|
||||
if '.' in name:
|
||||
sname = name.rsplit('.', 1)
|
||||
name = sname[0] + '.' + prefix + sname[1]
|
||||
else:
|
||||
name = prefix + name
|
||||
return name
|
||||
|
||||
def get(self) -> Tuple[Dict]:
|
||||
return self.args, self.kwargs
|
||||
|
||||
|
||||
class TiramisuCmdlineParser(ArgumentParser):
|
||||
def __init__(self,
|
||||
config: Union[Config, ConfigJson],
|
||||
@ -91,6 +185,7 @@ class TiramisuCmdlineParser(ArgumentParser):
|
||||
self.fullpath = fullpath
|
||||
self.config = config
|
||||
kwargs['formatter_class'] = TiramisuHelpFormatter
|
||||
self.namespace = TiramisuNamespace(self.config)
|
||||
super().__init__(*args, **kwargs)
|
||||
self.register('action', 'help', _TiramisuHelpAction)
|
||||
self._config_to_argparser(_forhelp,
|
||||
@ -118,9 +213,19 @@ class TiramisuCmdlineParser(ArgumentParser):
|
||||
actions.pop(0)
|
||||
return super()._match_arguments_partial(actions, arg_string_pattern)
|
||||
|
||||
def _parse_known_args(self, args=None, namespace=None):
|
||||
def _is_short_name(self, name, longargument):
|
||||
return len(name) == 1 and not longargument
|
||||
|
||||
namespace_, args_ = super()._parse_known_args(args, namespace)
|
||||
def _gen_argument(self, name, is_short_name):
|
||||
if is_short_name:
|
||||
return self.prefix_chars + name
|
||||
return self.prefix_chars * 2 + name
|
||||
|
||||
def _parse_known_args(self, args=None, namespace=None):
|
||||
try:
|
||||
namespace_, args_ = super()._parse_known_args(args, namespace)
|
||||
except ValueError as err:
|
||||
self.error(err)
|
||||
if args != args_ and args_ and args_[0].startswith(self.prefix_chars):
|
||||
# option that was disabled are no more disable
|
||||
# so create a new parser
|
||||
@ -148,21 +253,39 @@ class TiramisuCmdlineParser(ArgumentParser):
|
||||
def add_subparsers(self, *args, **kwargs):
|
||||
raise NotImplementedError(_('do not use add_subparsers'))
|
||||
|
||||
def _gen_argument(self, name, longargument, no_prefix=False):
|
||||
shortarg = len(name) == 1 and not longargument
|
||||
if no_prefix:
|
||||
if shortarg:
|
||||
prefix = 'n'
|
||||
def _option_is_not_default(self,
|
||||
properties,
|
||||
type,
|
||||
name,
|
||||
value):
|
||||
if 'positional' not in properties:
|
||||
is_short_name = self._is_short_name(name, 'longargument' in properties)
|
||||
self.prog += ' {}'.format(self._gen_argument(name, is_short_name))
|
||||
if type != 'boolean':
|
||||
self.prog += f' {value}'
|
||||
|
||||
def _config_list(self,
|
||||
config: Config,
|
||||
prefix: Optional[str],
|
||||
_forhelp: bool,
|
||||
group):
|
||||
for obj in config.list(type='all'):
|
||||
# do not display frozen option
|
||||
if 'frozen' in obj.option.properties():
|
||||
continue
|
||||
if obj.option.isoptiondescription():
|
||||
if _forhelp:
|
||||
group = self.add_argument_group(obj.option.doc())
|
||||
if prefix:
|
||||
prefix_ = prefix + '.' + obj.option.name()
|
||||
else:
|
||||
prefix_ = obj.option.path()
|
||||
self._config_to_argparser(_forhelp, obj, prefix_, group)
|
||||
elif obj.option.type() == 'boolean' and not obj.option.issymlinkoption():
|
||||
yield obj, False
|
||||
yield obj, True
|
||||
else:
|
||||
prefix = 'no-'
|
||||
if '.' in name:
|
||||
sname = name.rsplit('.', 1)
|
||||
name = sname[0] + '.' + prefix + sname[1]
|
||||
else:
|
||||
name = prefix + name
|
||||
if shortarg:
|
||||
return self.prefix_chars + name
|
||||
return self.prefix_chars * 2 + name
|
||||
yield obj, None
|
||||
|
||||
def _config_to_argparser(self,
|
||||
_forhelp: bool,
|
||||
@ -172,107 +295,128 @@ class TiramisuCmdlineParser(ArgumentParser):
|
||||
if group is None:
|
||||
group = super()
|
||||
actions = {}
|
||||
for obj in config.list(type='all'):
|
||||
leadership_len = None
|
||||
for obj, force_no in self._config_list(config, prefix, _forhelp, group):
|
||||
option = obj.option
|
||||
if option.isoptiondescription():
|
||||
if _forhelp:
|
||||
group = self.add_argument_group(option.doc())
|
||||
if prefix:
|
||||
prefix_ = prefix + '.' + option.name()
|
||||
else:
|
||||
prefix_ = option.path()
|
||||
self._config_to_argparser(_forhelp, obj, prefix_, group)
|
||||
continue
|
||||
if 'frozen' in option.properties():
|
||||
continue
|
||||
if option.isleader():
|
||||
value = obj.value.get()
|
||||
leadership_len = len(value)
|
||||
elif option.isfollower():
|
||||
value = []
|
||||
for index in range(leadership_len):
|
||||
value.append(self.config.option(obj.option.path(), index).value.get())
|
||||
else:
|
||||
value = obj.value.get()
|
||||
name = option.name()
|
||||
if name.startswith(self.prefix_chars):
|
||||
raise ValueError(_('name cannot startswith "{}"').format(self.prefix_chars))
|
||||
if self.fullpath and prefix:
|
||||
name = prefix + '.' + name
|
||||
properties = obj.property.get()
|
||||
kwargs = {'help': option.doc().replace('%', '%%')}
|
||||
if option.isfollower():
|
||||
properties = obj.option.properties()
|
||||
else:
|
||||
properties = obj.property.get()
|
||||
if option.issymlinkoption():
|
||||
symlink_name = option.name(follow_symlink=True)
|
||||
if symlink_name in actions:
|
||||
actions[symlink_name][0][0].insert(0, self._gen_argument(option.name(), 'longargument' in properties))
|
||||
if len(actions[symlink_name]) == 2:
|
||||
actions[symlink_name][1][0].insert(0, self._gen_argument(option.name(), False, True))
|
||||
for action in actions[symlink_name]:
|
||||
action.add_argument(option)
|
||||
continue
|
||||
if _forhelp and not obj.owner.isdefault() and obj.value.get() is not None:
|
||||
if 'positional' not in properties:
|
||||
self.prog += ' {}'.format(self._gen_argument(name, 'longargument' in properties))
|
||||
if option.type() != 'boolean':
|
||||
self.prog += ' {}'.format(obj.value.get())
|
||||
kwargs = _BuildKwargs(name, option, self, properties, force_no)
|
||||
if not option.isfollower() and _forhelp and not obj.owner.isdefault() and value is not None:
|
||||
if not force_no:
|
||||
self._option_is_not_default(properties,
|
||||
option.type(),
|
||||
name,
|
||||
value)
|
||||
else:
|
||||
if 'positional' in properties:
|
||||
if option.type() == 'boolean':
|
||||
raise ValueError(_('boolean option must not be positional'))
|
||||
# if not 'mandatory' in properties:
|
||||
# raise ValueError('"positional" argument must be "mandatory" too')
|
||||
args = [option.path()]
|
||||
if not 'mandatory' in properties:
|
||||
raise ValueError('"positional" argument must be "mandatory" too')
|
||||
if _forhelp:
|
||||
kwargs['default'] = obj.value.default()
|
||||
else:
|
||||
kwargs['default'] = obj.value.get()
|
||||
kwargs['default'] = value
|
||||
kwargs['nargs'] = '?'
|
||||
else:
|
||||
kwargs['dest'] = option.path()
|
||||
kwargs['default'] = SUPPRESS
|
||||
if _forhelp and 'mandatory' in properties:
|
||||
kwargs['required'] = True
|
||||
if option.type() == 'boolean':
|
||||
if option.type() == 'boolean' and not option.isfollower():
|
||||
if 'storefalse' in properties:
|
||||
if force_no:
|
||||
action = 'store_true'
|
||||
else:
|
||||
action = 'store_false'
|
||||
elif force_no:
|
||||
action = 'store_false'
|
||||
no_action = 'store_true'
|
||||
else:
|
||||
action = 'store_true'
|
||||
no_action = 'store_false'
|
||||
kwargs['action'] = action
|
||||
args = [self._gen_argument(name, 'longargument' in properties)]
|
||||
#
|
||||
nkwargs = copy(kwargs)
|
||||
nkwargs['action'] = no_action
|
||||
del nkwargs['help']
|
||||
nargs = [self._gen_argument(name, 'longargument' in properties, True)]
|
||||
actions[option.name()] = [(args, kwargs), (nargs, nkwargs)]
|
||||
continue
|
||||
args = [self._gen_argument(name, 'longargument' in properties)]
|
||||
if _forhelp:
|
||||
value = obj.value.default()
|
||||
else:
|
||||
value = obj.value.get()
|
||||
if value not in [None, []]:
|
||||
#kwargs['default'] = kwargs['const'] = option.default()
|
||||
#kwargs['action'] = 'store_const'
|
||||
kwargs['nargs'] = '?'
|
||||
if option.ismulti():
|
||||
if _forhelp and 'mandatory' in properties:
|
||||
kwargs['nargs'] = '+'
|
||||
else:
|
||||
kwargs['nargs'] = '*'
|
||||
if option.type() == 'string':
|
||||
pass
|
||||
elif option.type() == 'integer':
|
||||
kwargs['type'] = int
|
||||
elif option.type() == 'choice':
|
||||
kwargs['choices'] = obj.value.list()
|
||||
else:
|
||||
pass
|
||||
#raise NotImplementedError('not supported yet')
|
||||
actions[option.name()] = [(args, kwargs)]
|
||||
if option.type() == 'boolean':
|
||||
kwargs['metavar'] = 'INDEX'
|
||||
if option.type() != 'boolean':
|
||||
if _forhelp:
|
||||
value = obj.value.default()
|
||||
if value not in [None, []]:
|
||||
#kwargs['default'] = kwargs['const'] = option.default()
|
||||
#kwargs['action'] = 'store_const'
|
||||
kwargs['nargs'] = '?'
|
||||
|
||||
if not option.isfollower() and option.ismulti():
|
||||
if _forhelp and 'mandatory' in properties:
|
||||
kwargs['nargs'] = '+'
|
||||
else:
|
||||
kwargs['nargs'] = '*'
|
||||
if option.isfollower() and not option.type() == 'boolean':
|
||||
metavar = option.name().upper()
|
||||
if option.issubmulti():
|
||||
kwargs['nargs'] = '+'
|
||||
else:
|
||||
kwargs['nargs'] = 2
|
||||
if _forhelp and 'mandatory' not in properties:
|
||||
metavar = f'[{metavar}]'
|
||||
if option.type() == 'choice':
|
||||
choice_list = obj.value.list()
|
||||
if choice_list[0] == '':
|
||||
del choice_list[0]
|
||||
choices = '{{{}}}'.format(','.join(choice_list))
|
||||
if 'mandatory' not in properties:
|
||||
choices = f'[{choices}]'
|
||||
kwargs['metavar'] = ('INDEX', choices)
|
||||
else:
|
||||
kwargs['metavar'] = ('INDEX', metavar)
|
||||
if option.type() == 'string':
|
||||
pass
|
||||
elif option.type() == 'integer' or option.type() == 'boolean':
|
||||
# when boolean we are here only if follower
|
||||
kwargs['type'] = int
|
||||
if _forhelp and option.type() == 'boolean':
|
||||
kwargs['metavar'] = 'INDEX'
|
||||
kwargs['nargs'] = 1
|
||||
elif option.type() == 'choice' and not option.isfollower():
|
||||
kwargs['choices'] = obj.value.list()
|
||||
else:
|
||||
pass
|
||||
#raise NotImplementedError('not supported yet')
|
||||
actions.setdefault(option.name(), []).append(kwargs)
|
||||
for values in actions.values():
|
||||
for args, kwargs in values:
|
||||
for value in values:
|
||||
args, kwargs = value.get()
|
||||
group.add_argument(*args, **kwargs)
|
||||
def _valid_mandatory(self):
|
||||
pass
|
||||
|
||||
def parse_args(self,
|
||||
*args,
|
||||
valid_mandatory=True,
|
||||
**kwargs):
|
||||
kwargs['namespace'] = TiramisuNamespace(self.config)
|
||||
kwargs['namespace'] = self.namespace
|
||||
try:
|
||||
namespaces = super().parse_args(*args, **kwargs)
|
||||
del namespaces.__dict__['_config']
|
||||
except PropertiesOptionError as err:
|
||||
name = err._option_bag.option.impl_getname()
|
||||
properties = self.config.option(name).property.get()
|
||||
@ -286,19 +430,31 @@ class TiramisuCmdlineParser(ArgumentParser):
|
||||
else:
|
||||
self.error('unrecognized arguments: {}'.format(name))
|
||||
if valid_mandatory:
|
||||
errors = []
|
||||
for key in self.config.value.mandatory():
|
||||
properties = self.config.option(key).property.get()
|
||||
if 'positional' not in properties:
|
||||
if self.fullpath or '.' not in key:
|
||||
name = key
|
||||
properties = self.config.option(key).option.properties()
|
||||
if not self.config.option(key).option.isfollower():
|
||||
if 'positional' not in properties:
|
||||
if self.fullpath or '.' not in key:
|
||||
name = key
|
||||
else:
|
||||
name = key.rsplit('.', 1)[1]
|
||||
is_short_name = self._is_short_name(name, 'longargument' in self.config.option(key).property.get())
|
||||
args = self._gen_argument(name, is_short_name)
|
||||
else:
|
||||
name = key.rsplit('.', 1)[1]
|
||||
args = self._gen_argument(name, 'longargument' in self.config.option(key).property.get())
|
||||
args = key
|
||||
else:
|
||||
args = key
|
||||
if 'positional' not in properties:
|
||||
args = self._gen_argument(key, False)
|
||||
else:
|
||||
args = key
|
||||
if not self.fullpath and '.' in args:
|
||||
args = args.rsplit('.', 1)[1]
|
||||
self.error('the following arguments are required: {}'.format(args))
|
||||
if 'positional' not in properties:
|
||||
args = self._gen_argument(args, False)
|
||||
errors.append(args)
|
||||
if errors:
|
||||
self.error('the following arguments are required: {}'.format(', '.join(errors)))
|
||||
return namespaces
|
||||
|
||||
def format_usage(self,
|
||||
|
Reference in New Issue
Block a user