From af5da925e21602598d89a92ef8b0065ba0e93d2e Mon Sep 17 00:00:00 2001 From: Emmanuel Garette Date: Sun, 24 Feb 2019 15:11:08 +0100 Subject: [PATCH] add CIDR support --- test/test_config_ip.py | 31 +++++- test/test_dyn_optiondescription.py | 3 +- test/test_option_consistency.py | 25 ++++- tiramisu/option/ipoption.py | 135 ++++++++++++++++++--------- tiramisu/option/netmaskoption.py | 52 ++++++----- tiramisu/option/networkoption.py | 49 ++++++++-- tiramisu/option/optiondescription.py | 2 +- 7 files changed, 216 insertions(+), 81 deletions(-) diff --git a/test/test_config_ip.py b/test/test_config_ip.py index 3c4713d..8532fe6 100644 --- a/test/test_config_ip.py +++ b/test/test_config_ip.py @@ -3,8 +3,8 @@ do_autopath() import warnings from py.test import raises -from tiramisu import Config ,IPOption, NetworkOption, NetmaskOption, \ - PortOption, BroadcastOption, OptionDescription +from tiramisu import Config, IPOption, NetworkOption, NetmaskOption, \ + PortOption, BroadcastOption, OptionDescription from tiramisu.error import ValueWarning from tiramisu.storage import list_sessions @@ -38,6 +38,21 @@ def test_ip(): assert len(w) == 1 +def test_ip_cidr(): + b = IPOption('b', '', private_only=True, cidr=True) + c = IPOption('c', '', private_only=True) + warnings.simplefilter("always", ValueWarning) + od = OptionDescription('od', '', [b, c]) + config = Config(od) + raises(ValueError, "config.option('b').value.set('192.168.1.1')") + config.option('b').value.set('192.168.1.1/24') + raises(ValueError, "config.option('b').value.set('192.168.1.1/32')") + # + config.option('c').value.set('192.168.1.1') + raises(ValueError, "config.option('c').value.set('192.168.1.1/24')") + raises(ValueError, "config.option('c').value.set('192.168.1.1/32')") + + def test_ip_default(): a = IPOption('a', '', '88.88.88.88') od = OptionDescription('od', '', [a]) @@ -79,6 +94,18 @@ def test_network(): assert len(w) == 1 +def test_network_cidr(): + a = NetworkOption('a', '', cidr=True) + od = OptionDescription('od', '', [a]) + cfg = Config(od) + cfg.option('a').value.set('192.168.1.1/32') + cfg.option('a').value.set('192.168.1.0/24') + cfg.option('a').value.set('88.88.88.88/32') + cfg.option('a').value.set('0.0.0.0/0') + raises(ValueError, "cfg.option('a').value.set('192.168.1.1')") + raises(ValueError, "cfg.option('a').value.set('192.168.1.1/24')") + + def test_network_invalid(): raises(ValueError, "NetworkOption('a', '', default='toto')") diff --git a/test/test_dyn_optiondescription.py b/test/test_dyn_optiondescription.py index b0e87f4..a944cd2 100644 --- a/test/test_dyn_optiondescription.py +++ b/test/test_dyn_optiondescription.py @@ -913,8 +913,9 @@ def test_consistency_ip_netmask_dyndescription(): cfg.option('dodval1.aval1').value.set('192.168.1.1') cfg.option('dodval1.bval1').value.set('255.255.255.0') cfg.option('dodval2.aval2').value.set('192.168.1.2') - cfg.option('dodval2.bval2').value.set('255.255.255.255') + cfg.option('dodval2.bval2').value.set('255.255.255.128') cfg.option('dodval2.bval2').value.set('255.255.255.0') + raises(ValueError, "cfg.option('dodval2.bval2').value.set('255.255.255.255')") def test_consistency_ip_in_network_dyndescription(): diff --git a/test/test_option_consistency.py b/test/test_option_consistency.py index cfee716..137ec99 100644 --- a/test/test_option_consistency.py +++ b/test/test_option_consistency.py @@ -500,7 +500,7 @@ def test_consistency_ip_netmask(): api.option('a').value.set('192.168.1.1') api.option('b').value.set('255.255.255.0') api.option('a').value.set('192.168.1.2') - api.option('b').value.set('255.255.255.255') + api.option('b').value.set('255.255.255.128') api.option('b').value.set('255.255.255.0') raises(ValueError, "api.option('a').value.set('192.168.1.0')") raises(ValueError, "api.option('a').value.set('192.168.1.255')") @@ -567,6 +567,25 @@ def test_consistency_ip_in_network(): assert len(w) == 1 +def test_consistency_ip_in_network_cidr(): + a = NetworkOption('a', '', cidr=True) + c = IPOption('c', '') + d = IPOption('d', '') + od = OptionDescription('od', '', [a, c, d]) + c.impl_add_consistency('in_network', a) + d.impl_add_consistency('in_network', a, warnings_only=True) + warnings.simplefilter("always", ValueWarning) + api = Config(od) + api.option('a').value.set('192.168.1.0/24') + api.option('c').value.set('192.168.1.1') + raises(ValueError, "api.option('c').value.set('192.168.2.1')") + raises(ValueError, "api.option('c').value.set('192.168.1.0')") + raises(ValueError, "api.option('c').value.set('192.168.1.255')") + with warnings.catch_warnings(record=True) as w: + api.option('d').value.set('192.168.2.1') + assert len(w) == 1 + + def test_consistency_ip_in_network_invalid(): a = NetworkOption('a', '') b = NetmaskOption('b', '') @@ -593,7 +612,7 @@ def test_consistency_ip_netmask_multi(): api.option('a.a').value.set(['192.168.1.1']) api.option('a.b', 0).value.set('255.255.255.0') api.option('a.a').value.set(['192.168.1.2']) - api.option('a.b', 0).value.set('255.255.255.255') + api.option('a.b', 0).value.set('255.255.255.128') api.option('a.b', 0).value.set('255.255.255.0') raises(ValueError, "api.option('a.a').value.set(['192.168.1.0'])") # @@ -722,7 +741,7 @@ def test_consistency_ip_netmask_multi_leader(): api.option('a.a').value.set(['192.168.1.1']) api.option('a.b', 0).value.set('255.255.255.0') api.option('a.a').value.set(['192.168.1.2']) - api.option('a.b', 0).value.set('255.255.255.255') + api.option('a.b', 0).value.set('255.255.255.128') api.option('a.b', 0).value.set('255.255.255.0') raises(ValueError, "api.option('a.a').value.set(['192.168.1.0'])") api.option('a.a').value.set(['192.168.1.128']) diff --git a/tiramisu/option/ipoption.py b/tiramisu/option/ipoption.py index 89f5423..a217c78 100644 --- a/tiramisu/option/ipoption.py +++ b/tiramisu/option/ipoption.py @@ -18,13 +18,15 @@ # the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/ # the whole pypy projet is under MIT licence # ____________________________________________________________ -from ipaddress import ip_address, ip_network, IPv4Address +from ipaddress import ip_address, ip_interface, ip_network, IPv4Address, IPv4Interface from ..error import ConfigError from ..setting import undefined, Undefined, OptionBag from ..i18n import _ from .option import Option from .stroption import StrOption +from .netmaskoption import NetmaskOption +from .networkoption import NetworkOption class IPOption(StrOption): @@ -46,22 +48,24 @@ class IPOption(StrOption): properties=None, private_only=False, allow_reserved=False, - warnings_only=False): + warnings_only=False, + cidr=False): extra = {'_private_only': private_only, - '_allow_reserved': allow_reserved} - super(IPOption, self).__init__(name, - doc, - default=default, - default_multi=default_multi, - callback=callback, - callback_params=callback_params, - requires=requires, - multi=multi, - validator=validator, - validator_params=validator_params, - properties=properties, - warnings_only=warnings_only, - extra=extra) + '_allow_reserved': allow_reserved, + '_cidr': cidr} + super().__init__(name, + doc, + default=default, + default_multi=default_multi, + callback=callback, + callback_params=callback_params, + requires=requires, + multi=multi, + validator=validator, + validator_params=validator_params, + properties=properties, + warnings_only=warnings_only, + extra=extra) def _validate(self, value: str, @@ -73,32 +77,53 @@ class IPOption(StrOption): raise ValueError(_('invalid string')) if value.count('.') != 3: raise ValueError() - for val in value.split('.'): + cidr = self.impl_get_extra('_cidr') + if cidr: + if '/' not in value: + raise ValueError(_('must use CIDR notation')) + value_ = value.split('/')[0] + else: + value_ = value + for val in value_.split('.'): if val.startswith("0") and len(val) > 1: raise ValueError() # 'standard' validation try: - if not isinstance(ip_address(value), IPv4Address): - raise ValueError() + if not cidr: + if not isinstance(ip_address(value), IPv4Address): + raise ValueError() + else: + if not isinstance(ip_interface(value), IPv4Address): + raise ValueError() except ValueError: raise ValueError() def _second_level_validation(self, value, warnings_only): - ip = ip_address(value) + ip = ip_interface(value) if not self.impl_get_extra('_allow_reserved') and ip.is_reserved: if warnings_only: - msg = _("shouldn't in reserved class") + msg = _("shouldn't be reserved IP") else: - msg = _("mustn't be in reserved class") + msg = _("mustn't be reserved IP") raise ValueError(msg) if self.impl_get_extra('_private_only') and not ip.is_private: if warnings_only: - msg = _("should be in private class") + msg = _("should be private IP") else: - msg = _("must be in private class") + msg = _("must be private IP") raise ValueError(msg) + if '/' in value: + net = NetmaskOption(self.impl_getname(), + self.impl_get_display_name(), + str(ip.netmask)) + net._cons_ip_netmask(self, + (net, self), + (str(ip.netmask), str(ip.ip)), + warnings_only, + None, + True) def _cons_in_network(self, current_opt, @@ -106,22 +131,46 @@ class IPOption(StrOption): vals, warnings_only, context): - if len(vals) != 3 and context is undefined: - raise ConfigError(_('ip_network needs an IP, a network and a netmask')) - if len(vals) != 3 or None in vals: - return - ip, network, netmask = vals - if ip_address(ip) not in ip_network('{0}/{1}'.format(network, - netmask)): - msg = _('"{4}" is not in network "{0}"/"{1}" ("{2}"/"{3}")') - raise ValueError(msg.format(network, - netmask, - opts[1].impl_get_display_name(), - opts[2].impl_get_display_name(), - ip)) - # test if ip is not network/broadcast IP - opts[2]._cons_ip_netmask(current_opt, - (opts[2], opts[0]), - (netmask, ip), - warnings_only, - context) + if len(opts) == 2 and isinstance(opts[0], IPOption) and \ + opts[0].impl_get_extra('_cidr') == False and \ + isinstance(opts[1], NetworkOption) and \ + opts[1].impl_get_extra('_cidr') == True: + if None in vals: + return + ip, network = vals + network_obj = ip_network(network) + if ip_interface(ip) not in network_obj: + msg = _('"{0}" is not in network "{1}" ("{2}")') + raise ValueError(msg.format(ip, + network, + opts[1].impl_get_display_name())) + # test if ip is not network/broadcast IP + netmask = NetmaskOption(self.impl_getname(), + self.impl_get_display_name(), + str(network_obj.netmask)) + netmask._cons_ip_netmask(self, + (netmask, self), + (str(network_obj.netmask), str(ip)), + warnings_only, + None, + True) + else: + if len(vals) != 3 and context is undefined: + raise ConfigError(_('ip_network needs an IP, a network and a netmask')) + if len(vals) != 3 or None in vals: + return + ip, network, netmask = vals + if ip_interface(ip) not in ip_network('{0}/{1}'.format(network, + netmask)): + msg = _('"{4}" is not in network "{0}"/"{1}" ("{2}"/"{3}")') + raise ValueError(msg.format(network, + netmask, + opts[1].impl_get_display_name(), + opts[2].impl_get_display_name(), + ip)) + # test if ip is not network/broadcast IP + opts[2]._cons_ip_netmask(current_opt, + (opts[2], opts[0]), + (netmask, ip), + warnings_only, + context) diff --git a/tiramisu/option/netmaskoption.py b/tiramisu/option/netmaskoption.py index 9bb3eb6..8f90977 100644 --- a/tiramisu/option/netmaskoption.py +++ b/tiramisu/option/netmaskoption.py @@ -60,18 +60,18 @@ class NetmaskOption(StrOption): if None in vals or len(vals) != 2: return msg = None - val_netmask, val_ipnetwork = vals + val_netmask, val_network = vals try: - ip_network('{0}/{1}'.format(val_ipnetwork, val_netmask)) + ip_network('{0}/{1}'.format(val_network, val_netmask)) except ValueError: if current_opt == opts[1]: raise ValueError(_('with netmask "{0}" ("{1}")').format(val_netmask, opts[0].impl_get_display_name())) else: - raise ValueError(_('with network "{0}" ("{1}")').format(val_ipnetwork, opts[1].impl_get_display_name())) + raise ValueError(_('with network "{0}" ("{1}")').format(val_network, opts[1].impl_get_display_name())) if msg is not None: self.raise_err(msg, val_netmask, - val_ipnetwork, + val_network, current_opt, opts, 'network') @@ -81,38 +81,41 @@ class NetmaskOption(StrOption): opts, vals, warnings_only, - context): - #opts must be (netmask, ip) options + context, + _cidr=False): + # opts must be (netmask, ip) options if context is undefined and len(vals) != 2: raise ConfigError(_('ip_netmask needs an IP and a netmask')) if None in vals or len(vals) != 2: return msg = None - val_netmask, val_ipnetwork = vals + val_netmask, val_ip = vals try: - ip = ip_interface('{0}/{1}'.format(val_ipnetwork, val_netmask)) + ip = ip_interface('{0}/{1}'.format(val_ip, val_netmask)) network = ip.network - #if not ip same has network - if str(network.netmask) != '255.255.255.255': - if ip.ip == network.network_address: - if current_opt == opts[1]: - msg = _('this is a network with netmask "{0}" ("{1}")') - else: - msg = _('this is a network with {2} "{0}" ("{1}")') - elif ip.ip == network.broadcast_address: - if current_opt == opts[1]: - msg = _('this is a broadcast with netmask "{0}" ("{1}")') - else: - msg = _('this is a broadcast with {2} "{0}" ("{1}")') + # if not ip same has network + if ip.ip == network.network_address: + if not _cidr and current_opt == opts[1]: + msg = _('this is a network with netmask "{0}" ("{1}")') + else: + msg = _('{2} "{0}" ("{1}") is the network') + elif ip.ip == network.broadcast_address: + if not _cidr and current_opt == opts[1]: + msg = _('this is a broadcast with netmask "{0}" ("{1}")') + else: + msg = _('{2} "{0}" ("{1}") is the broadcast') except ValueError: + import traceback + traceback.print_exc() pass if msg is not None: self.raise_err(msg, val_netmask, - val_ipnetwork, + val_ip, current_opt, opts, - 'IP') + 'IP', + _cidr) def raise_err(self, @@ -121,8 +124,9 @@ class NetmaskOption(StrOption): val_ipnetwork, current_opt, opts, - typ): - if current_opt == opts[1]: + typ, + _cidr=False): + if not _cidr and current_opt == opts[1]: raise ValueError(msg.format(val_netmask, opts[1].impl_get_display_name())) else: diff --git a/tiramisu/option/networkoption.py b/tiramisu/option/networkoption.py index 3f2c9c0..43e26f2 100644 --- a/tiramisu/option/networkoption.py +++ b/tiramisu/option/networkoption.py @@ -18,7 +18,7 @@ # the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/ # the whole pypy projet is under MIT licence # ____________________________________________________________ -from ipaddress import ip_address, IPv4Address +from ipaddress import ip_address, ip_network, IPv4Network from ..setting import undefined from ..i18n import _ @@ -30,6 +30,35 @@ class NetworkOption(Option): __slots__ = tuple() _display_name = _('network address') + def __init__(self, + name, + doc, + default=None, + default_multi=None, + requires=None, + multi=False, + callback=None, + callback_params=None, + validator=None, + validator_params=None, + properties=None, + warnings_only=False, + cidr=False): + extra = {'_cidr': cidr} + super().__init__(name, + doc, + default=default, + default_multi=default_multi, + callback=callback, + callback_params=callback_params, + requires=requires, + multi=multi, + validator=validator, + validator_params=validator_params, + properties=properties, + warnings_only=warnings_only, + extra=extra) + def _validate(self, value, *args, @@ -38,11 +67,18 @@ class NetworkOption(Option): raise ValueError(_('invalid string')) if value.count('.') != 3: raise ValueError() - for val in value.split('.'): + cidr = self.impl_get_extra('_cidr') + if cidr: + if '/' not in value: + raise ValueError(_('must use CIDR notation')) + value_ = value.split('/')[0] + else: + value_ = value + for val in value_.split('.'): if val.startswith("0") and len(val) > 1: raise ValueError() try: - if not isinstance(ip_address(value), IPv4Address): + if not isinstance(ip_network(value), IPv4Network): raise ValueError() except ValueError: raise ValueError() @@ -50,10 +86,9 @@ class NetworkOption(Option): def _second_level_validation(self, value, warnings_only): - ip = ip_address(value) - if ip.is_reserved: + if ip_network(value).network_address.is_reserved: if warnings_only: - msg = _("shouldn't be in reserved class") + msg = _("shouldn't be reserved network") else: - msg = _("mustn't be in reserved class") + msg = _("mustn't be reserved network") raise ValueError(msg) diff --git a/tiramisu/option/optiondescription.py b/tiramisu/option/optiondescription.py index bd577d3..b858fb3 100644 --- a/tiramisu/option/optiondescription.py +++ b/tiramisu/option/optiondescription.py @@ -104,7 +104,7 @@ class CacheOptionDescription(BaseOption): if func not in ALLOWED_CONST_LIST and is_multi: if __debug__ and not option.impl_get_leadership(): raise ConfigError(_('malformed consistency option "{0}" ' - 'must be a leadership').format( + 'must be in same leadership').format( option.impl_getname())) leadership = option.impl_get_leadership() for weak_opt in all_cons_opts: