requirement can have callback

This commit is contained in:
Emmanuel Garette 2019-03-13 08:49:18 +01:00
parent 05abe76932
commit cab8dae15a
11 changed files with 712 additions and 176 deletions

View File

@ -1240,7 +1240,6 @@ def test_calc_value_condition():
def test_calc_value_allow_none():
from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
val1 = StrOption('val1', "", 'val1')
val2 = StrOption('val2', "")
val3 = StrOption('val3', "", multi=True, callback=calc_value, callback_params=Params((ParamOption(val1), ParamOption(val2)), multi=ParamValue(True), allow_none=ParamValue(True)))
@ -1250,7 +1249,6 @@ def test_calc_value_allow_none():
def test_calc_value_remove_duplicate():
from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
val1 = StrOption('val1', "", 'val1')
val2 = StrOption('val2', "", 'val1')
val3 = StrOption('val3', "", multi=True, callback=calc_value, callback_params=Params((ParamOption(val1), ParamOption(val2)), multi=ParamValue(True), remove_duplicate_value=ParamValue(True)))
@ -1260,7 +1258,6 @@ def test_calc_value_remove_duplicate():
def test_calc_value_join():
from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
val1 = StrOption('val1', "", 'val1')
val2 = StrOption('val2', "", 'val2')
val3 = StrOption('val3', "", callback=calc_value, callback_params=Params((ParamOption(val1), ParamOption(val2)), join=ParamValue('.')))
@ -1270,7 +1267,6 @@ def test_calc_value_join():
def test_calc_value_min():
from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
val1 = StrOption('val1', "", 'val1')
val2 = StrOption('val2', "", 'val2')
val3 = StrOption('val3', "", 'val3')
@ -1284,7 +1280,6 @@ def test_calc_value_min():
def test_calc_value_add():
from tiramisu import calc_value, IntOption, OptionDescription, Config, Params, ParamOption, ParamValue
val1 = IntOption('val1', "", 1)
val2 = IntOption('val2', "", 2)
val3 = IntOption('val3', "", callback=calc_value, callback_params=Params((ParamOption(val1), ParamOption(val2)), operator=ParamValue('add')))

View File

@ -8,7 +8,7 @@ from tiramisu.setting import groups
from tiramisu import setting
setting.expires_time = 1
from tiramisu import IPOption, OptionDescription, BoolOption, IntOption, StrOption, \
Leadership, Config
Leadership, Config, calc_value, Params, ParamOption
from tiramisu.error import PropertiesOptionError, RequirementError
from py.test import raises
from tiramisu.storage import list_sessions, delete_session
@ -65,6 +65,27 @@ def test_requires():
api.option('ip_address_service').value.get()
def test_requires_callback():
a = BoolOption('activate_service', '', True)
b = IPOption('ip_address_service', '',
requires=[{'callback': calc_value, 'callback_params': Params(ParamOption(a)), 'expected': False, 'action': 'disabled'}])
od = OptionDescription('service', '', [a, b])
api = Config(od)
api.property.read_write()
assert not api.option('activate_service').option.requires()
assert api.option('ip_address_service').option.requires()
api.option('ip_address_service').value.get()
api.option('activate_service').value.set(False)
props = []
try:
api.option('ip_address_service').value.get()
except PropertiesOptionError as err:
props = err.proptype
assert frozenset(props) == frozenset(['disabled'])
api.option('activate_service').value.set(True)
api.option('ip_address_service').value.get()
def test_requires_inverse():
a = BoolOption('activate_service', '', True)
b = IPOption('ip_address_service', '',
@ -179,6 +200,44 @@ def test_requires_same_action():
assert frozenset(props) == frozenset(['disabled'])
def test_requires_same_action_callback():
activate_service = BoolOption('activate_service', '', True)
activate_service_web = BoolOption('activate_service_web', '', True,
requires=[{'callback': calc_value, 'callback_params': Params(ParamOption(activate_service)), 'expected': False,
'action': 'new'}])
ip_address_service_web = IPOption('ip_address_service_web', '',
requires=[{'option': activate_service_web, 'expected': False,
'action': 'disabled', 'inverse': False,
'transitive': True, 'same_action': False}])
od1 = OptionDescription('service', '', [activate_service, activate_service_web, ip_address_service_web])
api = Config(od1)
api.property.read_write()
api.property.add('new')
api.option('activate_service').value.get()
api.option('activate_service_web').value.get()
api.option('ip_address_service_web').value.get()
api.option('activate_service').value.set(False)
#
props = []
try:
api.option('activate_service_web').value.get()
except PropertiesOptionError as err:
props = err.proptype
assert frozenset(props) == frozenset(['new'])
#
props = []
try:
api.option('ip_address_service_web').value.get()
except PropertiesOptionError as err:
props = err.proptype
submsg = '"disabled" (' + _('the calculated value is {0}').format('"False"') + ')'
assert str(err) == str(_('cannot access to {0} "{1}" because has {2} {3}').format('option', 'ip_address_service_web', 'property', submsg))
#access to cache
assert str(err) == str(_('cannot access to {0} "{1}" because has {2} {3}').format('option', 'ip_address_service_web', 'property', submsg))
assert frozenset(props) == frozenset(['disabled'])
def test_multiple_requires():
a = StrOption('activate_service', '')
b = IPOption('ip_address_service', '',
@ -326,6 +385,36 @@ def test_requires_transitive():
assert frozenset(props) == frozenset(['disabled'])
def test_requires_transitive_callback():
a = BoolOption('activate_service', '', True)
b = BoolOption('activate_service_web', '', True,
requires=[{'callback': calc_value, 'callback_params': Params(ParamOption(a)), 'expected': False, 'action': 'disabled'}])
d = IPOption('ip_address_service_web', '',
requires=[{'callback': calc_value, 'callback_params': Params(ParamOption(b)), 'expected': False, 'action': 'disabled'}])
od = OptionDescription('service', '', [a, b, d])
api = Config(od)
api.property.read_write()
api.option('activate_service').value.get()
api.option('activate_service_web').value.get()
api.option('ip_address_service_web').value.get()
api.option('activate_service').value.set(False)
#
props = []
try:
api.option('activate_service_web').value.get()
except PropertiesOptionError as err:
props = err.proptype
assert frozenset(props) == frozenset(['disabled'])
#
props = []
try:
api.option('ip_address_service_web').value.get()
except PropertiesOptionError as err:
props = err.proptype
assert frozenset(props) == frozenset(['disabled'])
def test_requires_transitive_unrestraint():
a = BoolOption('activate_service', '', True)
b = BoolOption('activate_service_web', '', True,
@ -589,6 +678,46 @@ def test_requires_multi_disabled():
assert frozenset(props) == frozenset(['disabled'])
def test_requires_multi_disabled_callback():
a = BoolOption('activate_service', '')
b = IntOption('num_service', '')
c = IPOption('ip_address_service', '',
requires=[{'callback': calc_value, 'callback_params': Params(ParamOption(a)), 'expected': True, 'action': 'disabled'},
{'callback': calc_value, 'callback_params': Params(ParamOption(b)), 'expected': 1, 'action': 'disabled'}])
od = OptionDescription('service', '', [a, b, c])
api = Config(od)
api.property.read_write()
api.option('ip_address_service').value.get()
api.option('activate_service').value.set(True)
props = []
try:
api.option('ip_address_service').value.get()
except PropertiesOptionError as err:
props = err.proptype
assert frozenset(props) == frozenset(['disabled'])
api.option('activate_service').value.set(False)
api.option('ip_address_service').value.get()
api.option('num_service').value.set(1)
props = []
try:
api.option('ip_address_service').value.get()
except PropertiesOptionError as err:
props = err.proptype
assert frozenset(props) == frozenset(['disabled'])
api.option('activate_service').value.set(True)
props = []
try:
api.option('ip_address_service').value.get()
except PropertiesOptionError as err:
props = err.proptype
assert frozenset(props) == frozenset(['disabled'])
def test_requires_multi_disabled_new_format():
a = BoolOption('activate_service', '')
b = IntOption('num_service', '')
@ -1006,6 +1135,56 @@ def test_leadership_requires():
del ret['ip_admin_eth0.netmask_admin_eth0']
def test_leadership_requires_callback():
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip réseau autorisé", multi=True)
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "masque du sous-réseau", multi=True,
requires=[{'callback': calc_value, 'callback_params': Params(ParamOption(ip_admin_eth0)), 'expected': '192.168.1.1', 'action': 'disabled'}])
interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
maconfig = OptionDescription('toto', '', [interface1])
api = Config(maconfig)
api.property.read_write()
assert api.option('ip_admin_eth0.ip_admin_eth0').value.get() == []
api.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.2'])
assert api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() is None
assert api.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.2']
#
api.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.2', '192.168.1.1'])
assert api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() is None
raises(PropertiesOptionError, "api.option('ip_admin_eth0.netmask_admin_eth0', 1).value.get()")
#
api.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.2', '192.168.1.2'])
assert api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() is None
assert api.option('ip_admin_eth0.netmask_admin_eth0', 1).value.get() is None
api.option('ip_admin_eth0.netmask_admin_eth0', 1).value.set('255.255.255.255')
assert api.option('ip_admin_eth0.netmask_admin_eth0', 1).value.get() == '255.255.255.255'
assert api.value.dict() == {'ip_admin_eth0.ip_admin_eth0': ['192.168.1.2', '192.168.1.2'],
'ip_admin_eth0.netmask_admin_eth0': [None, '255.255.255.255']}
#
api.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.2', '192.168.1.1'])
assert api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() is None
raises(PropertiesOptionError, "api.option('ip_admin_eth0.netmask_admin_eth0', 1).value.get()")
ret = api.value.dict()
assert set(ret.keys()) == set(['ip_admin_eth0.ip_admin_eth0', 'ip_admin_eth0.netmask_admin_eth0'])
assert ret['ip_admin_eth0.ip_admin_eth0'] == ['192.168.1.2', '192.168.1.1']
assert len(ret['ip_admin_eth0.netmask_admin_eth0']) == 2
assert ret['ip_admin_eth0.netmask_admin_eth0'][0] is None
assert isinstance(ret['ip_admin_eth0.netmask_admin_eth0'][1], PropertiesOptionError)
del ret['ip_admin_eth0.netmask_admin_eth0'][1]
del ret['ip_admin_eth0.netmask_admin_eth0'][0]
del ret['ip_admin_eth0.netmask_admin_eth0']
#
api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.set('255.255.255.255')
ret = api.value.dict()
assert set(ret.keys()) == set(['ip_admin_eth0.ip_admin_eth0', 'ip_admin_eth0.netmask_admin_eth0'])
assert ret['ip_admin_eth0.ip_admin_eth0'] == ['192.168.1.2', '192.168.1.1']
assert len(ret['ip_admin_eth0.netmask_admin_eth0']) == 2
assert ret['ip_admin_eth0.netmask_admin_eth0'][0] == '255.255.255.255'
assert isinstance(ret['ip_admin_eth0.netmask_admin_eth0'][1], PropertiesOptionError)
del ret['ip_admin_eth0.netmask_admin_eth0'][1]
del ret['ip_admin_eth0.netmask_admin_eth0'][0]
del ret['ip_admin_eth0.netmask_admin_eth0']
def test_leadership_requires_both():
ip_admin = StrOption('ip_admin_eth0', "ip réseau autorisé")
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip réseau autorisé", multi=True,
@ -1068,6 +1247,34 @@ def test_leadership_requires_leader():
assert api.value.dict() == {'activate': False}
def test_leadership_requires_leader_callback():
activate = BoolOption('activate', "Activer l'accès au réseau", True)
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip réseau autorisé", multi=True,
requires=[{'callback': calc_value, 'callback_params': Params(ParamOption(activate)), 'expected': False, 'action': 'disabled'}])
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "masque du sous-réseau", multi=True)
interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
maconfig = OptionDescription('toto', '', [activate, interface1])
api = Config(maconfig)
api.property.read_write()
#
assert api.option('ip_admin_eth0.ip_admin_eth0').value.get() == []
api.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.2'])
assert api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() is None
assert api.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.2']
#
api.option('activate').value.set(False)
raises(PropertiesOptionError, "api.option('ip_admin_eth0.ip_admin_eth0').value.get()")
raises(PropertiesOptionError, "api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get()")
#
api.option('activate').value.set(True)
assert api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() is None
#
api.option('activate').value.set(False)
raises(PropertiesOptionError, "api.option('ip_admin_eth0.ip_admin_eth0').value.get()")
raises(PropertiesOptionError, "api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get()")
assert api.value.dict() == {'activate': False}
def test_leadership_requires_leadership():
activate = BoolOption('activate', "Activer l'accès au réseau", True)
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip réseau autorisé", multi=True)
@ -1096,6 +1303,34 @@ def test_leadership_requires_leadership():
assert api.value.dict() == {'activate': False}
def test_leadership_requires_leadership_callback():
activate = BoolOption('activate', "Activer l'accès au réseau", True)
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip réseau autorisé", multi=True)
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "masque du sous-réseau", multi=True)
interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0],
requires=[{'callback': calc_value, 'callback_params': Params(ParamOption(activate)), 'expected': False, 'action': 'disabled'}])
maconfig = OptionDescription('toto', '', [activate, interface1])
api = Config(maconfig)
api.property.read_write()
#
assert api.option('ip_admin_eth0.ip_admin_eth0').value.get() == []
api.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.2'])
assert api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() is None
assert api.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.2']
#
api.option('activate').value.set(False)
raises(PropertiesOptionError, "api.option('ip_admin_eth0.ip_admin_eth0').value.get()")
raises(PropertiesOptionError, "api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get()")
#
api.option('activate').value.set(True)
assert api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() is None
#
api.option('activate').value.set(False)
raises(PropertiesOptionError, "api.option('ip_admin_eth0.ip_admin_eth0').value.get()")
raises(PropertiesOptionError, "api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get()")
assert api.value.dict() == {'activate': False}
def test_leadership_requires_no_leader():
activate = BoolOption('activate', "Activer l'accès au réseau", True)
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip réseau autorisé", multi=True)
@ -1125,6 +1360,40 @@ def test_leadership_requires_no_leader():
assert api.value.dict() == {'ip_admin_eth0.ip_admin_eth0': ['192.168.1.2', '192.168.1.1'], 'activate': False}
def test_leadership_requires_no_leader_callback():
activate = BoolOption('activate', "Activer l'accès au réseau", True)
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip réseau autorisé", multi=True)
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "masque du sous-réseau", multi=True,
requires=[{'callback': calc_value, 'callback_params': Params(ParamOption(activate)), 'expected': False, 'action': 'disabled'}])
interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
maconfig = OptionDescription('toto', '', [activate, interface1])
api = Config(maconfig)
api.property.read_write()
assert api.option('ip_admin_eth0.ip_admin_eth0').value.get() == []
api.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.2'])
assert api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() is None
assert api.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.2']
api.option('activate').value.set(False)
api.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.2', '192.168.1.1'])
assert api.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.2', '192.168.1.1']
raises(PropertiesOptionError, "api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get()")
raises(PropertiesOptionError, "api.option('ip_admin_eth0.netmask_admin_eth0', 1).value.get()")
api.option('activate').value.set(True)
assert api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() is None
assert api.option('ip_admin_eth0.netmask_admin_eth0', 1).value.get() is None
api.option('ip_admin_eth0.netmask_admin_eth0', 1).value.set('255.255.255.255')
assert api.option('ip_admin_eth0.netmask_admin_eth0', 1).value.get() == '255.255.255.255'
api.option('activate').value.set(False)
raises(PropertiesOptionError, "api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get()")
raises(PropertiesOptionError, "api.option('ip_admin_eth0.netmask_admin_eth0', 1).value.get()")
dico = api.value.dict()
assert set(dico.keys()) == {'activate', 'ip_admin_eth0.ip_admin_eth0', 'ip_admin_eth0.netmask_admin_eth0'}
dico['ip_admin_eth0.ip_admin_eth0'] == ['192.168.1.2', '192.168.1.1']
dico['activate'] == False
del dico['ip_admin_eth0.netmask_admin_eth0'][1]
del dico['ip_admin_eth0.netmask_admin_eth0'][0]
def test_leadership_requires_complet():
optiontoto = StrOption('unicodetoto', "Unicode leader")
option = StrOption('unicode', "Unicode leader", multi=True)
@ -1212,6 +1481,106 @@ def test_leadership_requires_complet():
del dico['options.unicode.unicode7']
def test_leadership_requires_complet_callback():
optiontoto = StrOption('unicodetoto', "Unicode leader")
option = StrOption('unicode', "Unicode leader", multi=True)
option1 = StrOption('unicode1', "Unicode follower 1", multi=True)
option2 = StrOption('unicode2', "Values 'test' must show 'Unicode follower 3'", multi=True)
option3 = StrOption('unicode3', "Unicode follower 3", requires=[{'callback': calc_value,
'callback_params': Params(ParamOption(option)),
'expected': u'test',
'action': 'hidden',
'inverse': True}],
multi=True)
option4 = StrOption('unicode4', "Unicode follower 4", requires=[{'callback': calc_value,
'callback_params': Params(ParamOption(option2)),
'expected': u'test',
'action': 'hidden',
'inverse': True}],
multi=True)
option5 = StrOption('unicode5', "Unicode follower 5", requires=[{'callback': calc_value,
'callback_params': Params(ParamOption(optiontoto)),
'expected': u'test',
'action': 'hidden',
'inverse': True}],
multi=True)
option6 = StrOption('unicode6', "Unicode follower 6", requires=[{'callback': calc_value,
'callback_params': Params(ParamOption(optiontoto)),
'expected': u'test',
'action': 'hidden',
'inverse': True},
{'callback': calc_value,
'callback_params': Params(ParamOption(option2)),
'expected': u'test',
'action': 'hidden',
'inverse': True}],
multi=True)
option7 = StrOption('unicode7', "Unicode follower 7", requires=[{'callback': calc_value,
'callback_params': Params(ParamOption(option2)),
'expected': u'test',
'action': 'hidden',
'inverse': True},
{'callback': calc_value,
'callback_params': Params(ParamOption(optiontoto)),
'expected': u'test',
'action': 'hidden',
'inverse': True}],
multi=True)
descr1 = Leadership("unicode", "Common configuration 1",
[option, option1, option2, option3, option4, option5, option6, option7])
descr = OptionDescription("options", "Common configuration 2", [descr1, optiontoto])
descr = OptionDescription("unicode1_leadership_requires", "Leader followers with Unicode follower 3 hidden when Unicode follower 2 is test", [descr])
config = Config(descr)
config.property.read_write()
config.option('options.unicode.unicode').value.set(['test', 'trah'])
config.option('options.unicode.unicode2', 0).value.set('test')
dico = config.value.dict()
assert dico.keys() == set(['options.unicode.unicode', 'options.unicode.unicode1', 'options.unicode.unicode2', 'options.unicode.unicode3', 'options.unicode.unicode4', 'options.unicode.unicode5', 'options.unicode.unicode6', 'options.unicode.unicode7', 'options.unicodetoto'])
assert dico['options.unicode.unicode'] == ['test', 'trah']
assert dico['options.unicode.unicode1'] == [None, None]
assert dico['options.unicode.unicode2'] == ['test', None]
assert dico['options.unicode.unicode3'][0] is None
assert isinstance(dico['options.unicode.unicode3'][1], PropertiesOptionError)
assert dico['options.unicode.unicode4'][0] is None
assert isinstance(dico['options.unicode.unicode4'][1], PropertiesOptionError)
assert dico['options.unicodetoto'] is None
del dico['options.unicode.unicode3'][1]
del dico['options.unicode.unicode3']
del dico['options.unicode.unicode4'][1]
del dico['options.unicode.unicode4']
del dico['options.unicode.unicode5'][0]
del dico['options.unicode.unicode5'][0]
del dico['options.unicode.unicode6'][0]
del dico['options.unicode.unicode6'][0]
del dico['options.unicode.unicode7'][0]
del dico['options.unicode.unicode7'][0]
#
config.option('options.unicodetoto').value.set('test')
dico = config.value.dict()
assert dico.keys() == set(['options.unicode.unicode', 'options.unicode.unicode1', 'options.unicode.unicode2', 'options.unicode.unicode3', 'options.unicode.unicode4', 'options.unicode.unicode5', 'options.unicode.unicode6', 'options.unicode.unicode7', 'options.unicodetoto'])
assert dico['options.unicode.unicode'] == ['test', 'trah']
assert dico['options.unicode.unicode1'] == [None, None]
assert dico['options.unicode.unicode2'] == ['test', None]
assert dico['options.unicode.unicode3'][0] is None
assert isinstance(dico['options.unicode.unicode3'][1], PropertiesOptionError)
assert dico['options.unicode.unicode4'][0] is None
assert isinstance(dico['options.unicode.unicode4'][1], PropertiesOptionError)
assert dico['options.unicode.unicode5'] == [None, None]
assert dico['options.unicode.unicode6'][0] is None
assert isinstance(dico['options.unicode.unicode6'][1], PropertiesOptionError)
assert dico['options.unicode.unicode7'][0] is None
assert isinstance(dico['options.unicode.unicode7'][1], PropertiesOptionError)
assert dico['options.unicodetoto'] == 'test'
del dico['options.unicode.unicode3'][1]
del dico['options.unicode.unicode3']
del dico['options.unicode.unicode4'][1]
del dico['options.unicode.unicode4']
del dico['options.unicode.unicode6'][1]
del dico['options.unicode.unicode6']
del dico['options.unicode.unicode7'][1]
del dico['options.unicode.unicode7']
def test_leadership_requires_transitive():
optiontoto = StrOption('unicodetoto', "Unicode leader")
option = StrOption('unicode', "Unicode leader", multi=True)
@ -1325,3 +1694,124 @@ def test_leadership_requires_transitive():
del (dico['options.unicode.unicode4'][2])
del (dico['options.unicode.unicode4'][1])
del (dico['options.unicode.unicode4'][0])
def test_leadership_requires_transitive_callback():
optiontoto = StrOption('unicodetoto', "Unicode leader")
option = StrOption('unicode', "Unicode leader", multi=True)
option1 = StrOption('unicode1', "Unicode follower 1", multi=True)
option2 = StrOption('unicode2', "Unicode follower 2", requires=[{'callback': calc_value,
'callback_params': Params(ParamOption(optiontoto)),
'expected': u'test',
'action': 'disabled',
'transitive': True,
'inverse': True}],
multi=True)
option3 = StrOption('unicode3', "Unicode follower 3", requires=[{'callback': calc_value,
'callback_params': Params(ParamOption(option2)),
'expected': u'test',
'action': 'disabled',
'transitive': True,
'inverse': True}],
multi=True)
option4 = StrOption('unicode4', "Unicode follower 4", requires=[{'callback': calc_value,
'callback_params': Params(ParamOption(option3)),
'expected': u'test',
'action': 'disabled',
'transitive': True,
'inverse': True}],
multi=True)
descr1 = Leadership("unicode", "Common configuration 1",
[option, option1, option2, option3, option4])
descr = OptionDescription("options", "Common configuration 2", [descr1, optiontoto])
descr = OptionDescription("unicode1", "", [descr])
config = Config(descr)
config.property.read_write()
assert config.value.dict() == {'options.unicode.unicode': [], 'options.unicode.unicode1': [], 'options.unicode.unicode2': [], 'options.unicode.unicode3': [], 'options.unicode.unicode4': [], 'options.unicodetoto': None}
#
config.option('options.unicodetoto').value.set('test')
assert config.value.dict() == {'options.unicode.unicode': [], 'options.unicode.unicode1': [], 'options.unicode.unicode2': [], 'options.unicode.unicode3': [], 'options.unicode.unicode4': [], 'options.unicodetoto': 'test'}
#
config.option('options.unicode.unicode').value.set(['a', 'b', 'c'])
dico = config.value.dict()
assert list(dico.keys()) == ['options.unicode.unicode', 'options.unicode.unicode1', 'options.unicode.unicode2', 'options.unicode.unicode3', 'options.unicode.unicode4', 'options.unicodetoto']
assert dico['options.unicodetoto'] == 'test'
assert dico['options.unicode.unicode'] == ['a', 'b', 'c']
assert dico['options.unicode.unicode1'] == [None, None, None]
assert dico['options.unicode.unicode2'] == [None, None, None]
assert isinstance(dico['options.unicode.unicode3'][0], PropertiesOptionError)
assert isinstance(dico['options.unicode.unicode3'][1], PropertiesOptionError)
assert isinstance(dico['options.unicode.unicode3'][2], PropertiesOptionError)
assert isinstance(dico['options.unicode.unicode4'][0], PropertiesOptionError)
assert isinstance(dico['options.unicode.unicode4'][1], PropertiesOptionError)
assert isinstance(dico['options.unicode.unicode4'][2], PropertiesOptionError)
del (dico['options.unicode.unicode3'][2])
del (dico['options.unicode.unicode3'][1])
del (dico['options.unicode.unicode3'][0])
del (dico['options.unicode.unicode4'][2])
del (dico['options.unicode.unicode4'][1])
del (dico['options.unicode.unicode4'][0])
#
config.option('options.unicode.unicode2', 1).value.set('test')
config.option('options.unicode.unicode3', 1).value.set('test')
dico = config.value.dict()
assert list(dico.keys()) == ['options.unicode.unicode', 'options.unicode.unicode1', 'options.unicode.unicode2', 'options.unicode.unicode3', 'options.unicode.unicode4', 'options.unicodetoto']
assert dico['options.unicodetoto'] == 'test'
assert dico['options.unicode.unicode'] == ['a', 'b', 'c']
assert dico['options.unicode.unicode1'] == [None, None, None]
assert dico['options.unicode.unicode2'] == [None, 'test', None]
assert isinstance(dico['options.unicode.unicode3'][0], PropertiesOptionError)
assert dico['options.unicode.unicode3'][1] == 'test'
assert isinstance(dico['options.unicode.unicode3'][2], PropertiesOptionError)
assert isinstance(dico['options.unicode.unicode4'][0], PropertiesOptionError)
assert dico['options.unicode.unicode4'][1] == None
assert isinstance(dico['options.unicode.unicode4'][2], PropertiesOptionError)
del (dico['options.unicode.unicode3'][2])
del (dico['options.unicode.unicode3'][1])
del (dico['options.unicode.unicode3'][0])
del (dico['options.unicode.unicode4'][2])
del (dico['options.unicode.unicode4'][1])
del (dico['options.unicode.unicode4'][0])
#
config.option('options.unicode.unicode2', 1).value.set('rah')
dico = config.value.dict()
assert list(dico.keys()) == ['options.unicode.unicode', 'options.unicode.unicode1', 'options.unicode.unicode2', 'options.unicode.unicode3', 'options.unicode.unicode4', 'options.unicodetoto']
assert dico['options.unicodetoto'] == 'test'
assert dico['options.unicode.unicode'] == ['a', 'b', 'c']
assert dico['options.unicode.unicode1'] == [None, None, None]
assert dico['options.unicode.unicode2'] == [None, 'rah', None]
assert isinstance(dico['options.unicode.unicode3'][0], PropertiesOptionError)
assert isinstance(dico['options.unicode.unicode3'][1], PropertiesOptionError)
assert isinstance(dico['options.unicode.unicode3'][2], PropertiesOptionError)
assert isinstance(dico['options.unicode.unicode4'][0], PropertiesOptionError)
assert isinstance(dico['options.unicode.unicode4'][1], PropertiesOptionError)
assert isinstance(dico['options.unicode.unicode4'][2], PropertiesOptionError)
del (dico['options.unicode.unicode3'][2])
del (dico['options.unicode.unicode3'][1])
del (dico['options.unicode.unicode3'][0])
del (dico['options.unicode.unicode4'][2])
del (dico['options.unicode.unicode4'][1])
del (dico['options.unicode.unicode4'][0])
#
config.option('options.unicode.unicode2', 1).value.set('test')
config.option('options.unicodetoto').value.set('rah')
dico = config.value.dict()
assert list(dico.keys()) == ['options.unicode.unicode', 'options.unicode.unicode1', 'options.unicode.unicode2', 'options.unicode.unicode3', 'options.unicode.unicode4', 'options.unicodetoto']
assert dico['options.unicodetoto'] == 'rah'
assert dico['options.unicode.unicode'] == ['a', 'b', 'c']
assert dico['options.unicode.unicode1'] == [None, None, None]
assert isinstance(dico['options.unicode.unicode3'][0], PropertiesOptionError)
assert isinstance(dico['options.unicode.unicode3'][1], PropertiesOptionError)
assert isinstance(dico['options.unicode.unicode3'][2], PropertiesOptionError)
assert isinstance(dico['options.unicode.unicode4'][0], PropertiesOptionError)
assert isinstance(dico['options.unicode.unicode4'][1], PropertiesOptionError)
assert isinstance(dico['options.unicode.unicode4'][2], PropertiesOptionError)
del (dico['options.unicode.unicode2'][2])
del (dico['options.unicode.unicode2'][1])
del (dico['options.unicode.unicode2'][0])
del (dico['options.unicode.unicode3'][2])
del (dico['options.unicode.unicode3'][1])
del (dico['options.unicode.unicode3'][0])
del (dico['options.unicode.unicode4'][2])
del (dico['options.unicode.unicode4'][1])
del (dico['options.unicode.unicode4'][0])

View File

@ -465,8 +465,9 @@ class _TiramisuOptionValueOption:
if isinstance(value, list):
while undefined in value:
idx = value.index(undefined)
value[idx] = values.getdefaultvalue(self._option_bag,
force_index=idx)
soption_bag = self._option_bag.copy()
soption_bag.index = idx
value[idx] = values.getdefaultvalue(soption_bag)
elif value == undefined:
value = values.getdefaultvalue(self._option_bag)
self._subconfig.setattr(value,

View File

@ -90,11 +90,11 @@ def manager_callback(callbk: Union[ParamOption, ParamValue],
return value[index]
return value
except PropertiesOptionError as err:
# raise because must not add value None in carry_out_calculation
# raise PropertiesOptionError (which is catched) because must not add value None in carry_out_calculation
if callbk.notraisepropertyerror:
raise err
raise ConfigError(_('unable to carry out a calculation for "{}"'
', {}').format(option.impl_get_display_name(), err))
', {}').format(option.impl_get_display_name(), err), err)
def carry_out_calculation(option,

View File

@ -116,7 +116,7 @@ class PropertiesOptionError(AttributeError):
self._name,
prop_msg,
msg))
del self._requires, self._opt_type, self._name, self._option_bag
del self._requires, self._opt_type, self._name
del self._settings, self._orig_opt
return self.msg
@ -127,7 +127,11 @@ class ConfigError(Exception):
"""attempt to change an option's owner without a value
or in case of `_cfgimpl_descr` is None
or if a calculation cannot be carried out"""
pass
def __init__(self,
exp,
ori_err=None):
super().__init__(exp)
self.ori_err = ori_err
class ConflictError(Exception):

View File

@ -49,9 +49,12 @@ class Param:
class ParamOption(Param):
__slots__ = ('option', 'notraisepropertyerror')
def __init__(self, option, notraisepropertyerror=False):
if not hasattr(option, 'impl_is_symlinkoption'):
__slots__ = ('option',
'notraisepropertyerror')
def __init__(self,
option: 'Option',
notraisepropertyerror: bool=False) -> None:
if __debug__ and not hasattr(option, 'impl_is_symlinkoption'):
raise ValueError(_('paramoption needs an option not {}').format(type(option)))
if option.impl_is_symlinkoption():
cur_opt = option.impl_getopt()
@ -95,6 +98,7 @@ def calc_value(*args: List[Any],
join: Optional[str]=None,
min_args_len: Optional[int]=None,
operator: Optional[str]=None,
index: Optional[int]=None,
**kwargs) -> Any:
"""calculate value
:param multi: value returns must be a list of value
@ -181,7 +185,6 @@ def calc_value(*args: List[Any],
>>> cfg.value.dict()
{'val1': 'val1', 'val2': 'val1', 'val3': ['val1']}
* you want to join two values with '.'
>>> from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
>>> val1 = StrOption('val1', "", 'val1')
@ -305,6 +308,11 @@ def calc_value(*args: List[Any],
value = None
else:
value = value[0]
if isinstance(value, list) and index is not None:
if len(value) > index:
value = value[index]
else:
value = None
elif None in value and not allow_none:
value = []
elif remove_duplicate_value:

View File

@ -170,7 +170,7 @@ class Base:
param.option._add_dependency(self)
if type_ == 'validator':
self._has_dependency = True
is_multi = self.impl_is_dynoptiondescription() or self.impl_is_multi()
is_multi = self.impl_is_optiondescription() or self.impl_is_multi()
func_args, func_kwargs, func_positional, func_keyword = self._get_function_args(calculator)
calculator_args, calculator_kwargs = self._get_parameters_args(calculator_params, add_value)
# remove knowned kwargs
@ -253,6 +253,7 @@ class Base:
def _impl_set_callback(self,
callback: Callable,
callback_params: Optional[Params]=None) -> None:
if __debug__:
if callback is None and callback_params is not None:
raise ValueError(_("params defined for a callback function but "
"no callback defined"
@ -443,9 +444,11 @@ def validate_requires_arg(new_option: BaseOption,
the description of the requires dictionary
"""
def get_option(require):
if 'option' in require:
option = require['option']
if option == 'self':
option = new_option
if __debug__:
if not isinstance(option, BaseOption):
raise ValueError(_('malformed requirements '
'must be an option in option {0}').format(name))
@ -454,6 +457,12 @@ def validate_requires_arg(new_option: BaseOption,
'multi option must not set '
'as requires of non multi option {0}').format(name))
option._add_dependency(new_option)
else:
callback = require['callback']
callback_params = new_option._build_calculator_params(callback,
require.get('callback_params'),
'callback')
option = (callback, callback_params)
return option
def _set_expected(action,
@ -482,11 +491,11 @@ def validate_requires_arg(new_option: BaseOption,
operator = get_operator(require)
if isinstance(expected, list):
for exp in expected:
if set(exp.keys()) != {'option', 'value'}:
if __debug__ and set(exp.keys()) != {'option', 'value'}:
raise ValueError(_('malformed requirements expected must have '
'option and value for option {0}').format(name))
option = get_option(exp)
if option is not None:
if __debug__:
try:
option._validate(exp['value'], undefined)
except ValueError as err:
@ -502,7 +511,7 @@ def validate_requires_arg(new_option: BaseOption,
operator)
else:
option = get_option(require)
if expected is not None:
if __debug__ and not isinstance(option, tuple) and expected is not None:
try:
option._validate(expected, undefined)
except ValueError as err:
@ -560,11 +569,12 @@ def validate_requires_arg(new_option: BaseOption,
# start parsing all requires given by user (has dict)
# transforme it to a tuple
for require in requires:
if __debug__:
if not isinstance(require, dict):
raise ValueError(_("malformed requirements type for option:"
" {0}, must be a dict").format(name))
valid_keys = ('option', 'expected', 'action', 'inverse', 'transitive',
'same_action', 'operator')
'same_action', 'operator', 'callback', 'callback_params')
unknown_keys = frozenset(require.keys()) - frozenset(valid_keys)
if unknown_keys != frozenset():
raise ValueError(_('malformed requirements for option: {0}'
@ -572,10 +582,13 @@ def validate_requires_arg(new_option: BaseOption,
'{2}').format(name,
unknown_keys,
valid_keys))
# prepare all attributes
if not ('expected' in require and isinstance(require['expected'], list)) and \
not ('option' in require and 'expected' in require) or \
'action' not in require:
# {'expected': ..., 'option': ..., 'action': ...}
# {'expected': [{'option': ..., 'value': ...}, ...}], 'action': ...}
# {'expected': ..., 'callback': ..., 'action': ...}
if not 'expected' in require or not 'action' in require or \
not (isinstance(require['expected'], list) or \
'option' in require or \
'callback' in require):
raise ValueError(_("malformed requirements for option: {0}"
" require must have option, expected and"
" action keys").format(name))

View File

@ -109,7 +109,7 @@ class Leadership(OptionDescription):
for requires_ in getattr(self, '_requires', ()):
for require in requires_:
for require_opt, values in require[0]:
if require_opt.impl_is_multi() and require_opt.impl_get_leadership():
if not isinstance(require_opt, tuple) and require_opt.impl_is_multi() and require_opt.impl_get_leadership():
raise ValueError(_('malformed requirements option "{0}" '
'must not be in follower for "{1}"').format(
require_opt.impl_getname(),

View File

@ -139,7 +139,7 @@ class CacheOptionDescription(BaseOption):
# * current option must be a follower (and only a follower)
# * option in require and current option must be in same leadership
for require_opt, values in require[0]:
if require_opt.impl_is_multi():
if not isinstance(require_opt, tuple) and require_opt.impl_is_multi():
if is_follower is None:
is_follower = option.impl_is_follower()
if is_follower:

View File

@ -509,12 +509,13 @@ class Settings(object):
exps, action, inverse, transitive, same_action, operator = require
breaked = False
for option, expected in exps:
if not isinstance(option, tuple):
if option.issubdyn():
option = option.to_dynoption(option_bag.option.rootpath,
option_bag.option.impl_getsuffix())
reqpath = option.impl_getpath()
#FIXME too later!
if reqpath.startswith(option_bag.path + '.'):
if __debug__ and reqpath.startswith(option_bag.path + '.'):
# FIXME too later!
raise RequirementError(_("malformed requirements "
"imbrication detected for option:"
" '{0}' with requirement on: "
@ -542,10 +543,25 @@ class Settings(object):
else:
soption_bag.config_bag.properties = soption_bag.config_bag.true_properties
soption_bag.config_bag.set_permissive()
else:
if not option_bag.option.impl_is_optiondescription() and option_bag.option.impl_is_follower():
idx = option_bag.index
if idx is None:
continue
is_indexed = False
try:
if not isinstance(option, tuple):
value = context.getattr(reqpath,
soption_bag)
except PropertiesOptionError as err:
else:
value = context.cfgimpl_get_values().carry_out_calculation(option_bag,
option[0],
option[1])
except (PropertiesOptionError, ConfigError) as err:
if isinstance(err, ConfigError):
if not isinstance(err.ori_err, PropertiesOptionError):
raise err
err = err.ori_err
properties = err.proptype
# if not transitive, properties must be verify in current requires
# otherwise if same_action, property must be in properties
@ -586,13 +602,19 @@ class Settings(object):
inverse and value not in expected):
if operator != 'and':
if readable:
display_value = display_list(expected, 'or', add_quote=True)
if isinstance(option, tuple):
if not inverse:
msg = _('the value of "{0}" is {1}')
msg = _('the calculated value is {0}').format(display_value)
else:
msg = _('the value of "{0}" is not {1}')
calc_properties.setdefault(action, []).append(
msg.format(option.impl_get_display_name(),
display_list(expected, 'or', add_quote=True)))
msg = _('the calculated value is not {0}').format(display_value)
else:
name = option.impl_get_display_name()
if not inverse:
msg = _('the value of "{0}" is {1}').format(name, display_value)
else:
msg = _('the value of "{0}" is not {1}').format(name, display_value)
calc_properties.setdefault(action, []).append(msg)
else:
calc_properties.add(action)
breaked = True

View File

@ -16,10 +16,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# ____________________________________________________________
import weakref
from typing import Optional
from typing import Optional, Any, Callable
from .error import ConfigError, PropertiesOptionError
from .setting import owners, undefined, forbidden_owners, OptionBag, ConfigBag
from .autolib import carry_out_calculation
from .function import Params
from .i18n import _
@ -135,26 +136,48 @@ class Values(object):
return self.getdefaultvalue(option_bag)
def getdefaultvalue(self,
option_bag,
force_index: Optional[int]=None):
option_bag):
"""get default value:
- get meta config value or
- get calculated value or
- get default value
:param opt: the `option.Option()` object
:param path: path for `option.Option()` object
:type path: str
:param index: index of a multi/submulti
:type index: int
:returns: default value
"""
moption_bag = self._get_meta(option_bag)
if moption_bag:
# retrieved value from meta config
return moption_bag.config_bag.context.cfgimpl_get_values().get_cached_value(moption_bag)
if option_bag.option.impl_has_callback():
# default value is a calculated value
value = self.calculate_value(option_bag)
if value is not undefined:
return value
# now try to get default value:
value = option_bag.option.impl_getdefault()
# - if option is a submulti, return a list a list
# - if option is a multi, return a list
# - default value
if option_bag.option.impl_is_multi() and option_bag.index is not None:
# if index, must return good value for this index
if len(value) > option_bag.index:
value = value[option_bag.index]
else:
# no value for this index, retrieve default multi value
# default_multi is already a list for submulti
value = option_bag.option.impl_getdefault_multi()
return value
def calculate_value(self,
option_bag: OptionBag) -> Any:
def _reset_cache(_value):
if not 'expire' in option_bag.properties:
return
is_cache, cache_value = self._p_.getcache(option_bag.path,
None,
index,
config_bag.properties,
option_bag.index,
option_bag.config_bag.properties,
option_bag.properties,
'value')
if not is_cache or cache_value == _value:
@ -162,75 +185,55 @@ class Values(object):
# so do not invalidate cache
return
# calculated value is a new value, so reset cache
config_bag.context.cfgimpl_reset_cache(option_bag)
option_bag.config_bag.context.cfgimpl_reset_cache(option_bag)
config_bag = option_bag.config_bag
opt = option_bag.option
if force_index is not None:
index = force_index
else:
index = option_bag.index
moption_bag = self._get_meta(option_bag)
if moption_bag:
# retrieved value from meta config
return moption_bag.config_bag.context.cfgimpl_get_values().get_cached_value(moption_bag)
if opt.impl_has_callback():
# if value has callback, calculate value
callback, callback_params = opt.impl_get_callback()
value = carry_out_calculation(opt,
callback=callback,
callback_params=callback_params,
index=index,
config_bag=config_bag,
fromconsistency=option_bag.fromconsistency)
if isinstance(value, list) and index is not None:
callback, callback_params = option_bag.option.impl_get_callback()
value = self.carry_out_calculation(option_bag,
callback,
callback_params)
if isinstance(value, list) and option_bag.index is not None:
# if value is a list and index is set
if opt.impl_is_submulti() and (value == [] or not isinstance(value[0], list)):
if option_bag.option.impl_is_submulti() and (value == [] or not isinstance(value[0], list)):
# return value only if it's a submulti and not a list of list
_reset_cache(value)
return value
if len(value) > index:
if len(value) > option_bag.index:
# return the value for specified index if found
_reset_cache(value[index])
return value[index]
_reset_cache(value[option_bag.index])
return value[option_bag.index]
# there is no calculate value for this index,
# so return an other default value
else:
if opt.impl_is_submulti():
if option_bag.option.impl_is_submulti():
if isinstance(value, list):
# value is a list, but no index specified
if (value != [] and not isinstance(value[0], list)):
# if submulti, return a list of value
value = [value]
elif index is not None:
elif option_bag.index is not None:
# if submulti, return a list of value
value = [value]
else:
# return a list of list for a submulti
value = [[value]]
elif opt.impl_is_multi() and not isinstance(value, list) and index is None:
elif option_bag.option.impl_is_multi() and not isinstance(value, list) and option_bag.index is None:
# return a list for a multi
value = [value]
_reset_cache(value)
return value
return undefined
# now try to get default value:
# - if opt is a submulti, return a list a list
# - if opt is a multi, return a list
# - default value
value = opt.impl_getdefault()
if opt.impl_is_multi() and index is not None:
# if index, must return good value for this index
if len(value) > index:
value = value[index]
else:
# no value for this index, retrieve default multi value
# default_multi is already a list for submulti
value = opt.impl_getdefault_multi()
return value
def carry_out_calculation(self,
option_bag: OptionBag,
callback: Callable,
callback_params: Optional[Params]) -> Any:
return carry_out_calculation(option_bag.option,
callback=callback,
callback_params=callback_params,
index=option_bag.index,
config_bag=option_bag.config_bag,
fromconsistency=option_bag.fromconsistency)
def isempty(self,
opt,
value,