library: fetch the MAC address from the interfaces

This commit is contained in:
Thomas Haller 2017-02-12 14:14:33 +01:00
parent 8682cb64b8
commit faf3b699ba
2 changed files with 154 additions and 2 deletions

View file

@ -60,6 +60,19 @@ class Util:
return v
return default
@staticmethod
def check_output(argv, lang = None):
# subprocess.check_output is python 2.7.
with open('/dev/null', 'wb') as DEVNULL:
import subprocess, os
ev = os.environ.copy()
ev['LANG'] = lang if lang is not None else 'C'
p = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=DEVNULL, env=ev)
out = p.communicate()[0]
if p.returncode != 0:
raise MyError('failure calling %s: exit with %s' % (argv, p.returncode))
return out
@classmethod
def create_uuid(cls):
cls.NM()
@ -159,6 +172,49 @@ class Util:
# FIXME: encoding issues regarding python unicode string
return True
@staticmethod
def mac_aton(mac_str, force_len=None):
# we also accept None and '' for convenience.
# - None yiels None
# - '' yields []
if mac_str is None:
return mac_str
i = 0
b = []
for c in mac_str:
if i == 2:
if c != ':':
raise MyError('not a valid MAC address: "%s"' % (mac_str))
i = 0
continue
try:
if i == 0:
n = int(c, 16) * 16
i = 1
else:
assert(i == 1)
n = n + int(c, 16)
i = 2
b.append(n)
except:
raise MyError('not a valid MAC address: "%s"' % (mac_str))
if i == 1:
raise MyError('not a valid MAC address: "%s"' % (mac_str))
if force_len is not None:
if force_len != len(b):
raise MyError('not a valid MAC address of length %s: "%s"' % (force_len, mac_str))
return b
@staticmethod
def mac_ntoa(mac):
if mac is None:
return None
return ':'.join(['%02x' % c for c in mac])
@staticmethod
def mac_norm(mac_str, force_len = None):
return Util.mac_ntoa(Util.mac_aton(mac_str, force_len))
@staticmethod
def boolean(arg):
import ansible.module_utils.basic as basic
@ -225,6 +281,84 @@ class Util:
###############################################################################
class SysUtil:
@staticmethod
def _sysctl_read(filename):
try_count = 0
while True:
try_count += 1
try:
with open(filename, 'r') as f:
return f.read()
except Exception as e:
if try_count < 5:
continue
raise
@staticmethod
def _link_read_ifindex(ifname):
c = SysUtil._sysctl_read('/sys/class/net/' + ifname + '/ifindex')
return int(c.strip())
@staticmethod
def _link_read_address(ifname):
c = SysUtil._sysctl_read('/sys/class/net/' + ifname + '/address')
return Util.mac_norm(c.strip())
@staticmethod
def _link_read_permaddress(ifname):
out = Util.check_output(['ethtool', '-P', ifname])
import re
m = re.match('^Permanent address: ([0-9A-Fa-f:]*)\n$', out)
if not m:
return None
return Util.mac_norm(m.group(1))
@staticmethod
def _link_infos_fetch():
import os
links = {}
for ifname in os.listdir('/sys/class/net/'):
ifindex = SysUtil._link_read_ifindex(ifname)
links[ifname] = {
'ifindex': ifindex,
'ifname': ifname,
'address': SysUtil._link_read_address(ifname),
'perm-address': SysUtil._link_read_permaddress(ifname),
}
return links
@classmethod
def link_infos(cls, refresh=False):
if refresh:
l = None
else:
l = getattr(cls, '_link_infos', None)
if l is None:
try_count = 0
b = None
while True:
try:
# there is a race in that we lookup properties by ifname
# and interfaces can be renamed. Try to avoid that by fetching
# the info twice and repeat until we get the same result.
if b is None:
b = SysUtil._link_infos_fetch()
l = SysUtil._link_infos_fetch()
if l != b:
b = l
raise Exception("cannot read stable link-infos. They keep changing")
except:
if try_count < 50:
raise
continue
break
cls._link_infos = l
return l
###############################################################################
class ArgUtil:
@staticmethod
def connection_find_by_name(name, connections, n_connections = None):
@ -328,6 +462,20 @@ class ArgValidatorIP(ArgValidatorStr):
return addr
return { "family": family, "address": addr }
class ArgValidatorMac(ArgValidatorStr):
def __init__(self, name, force_len = None, required = False, default_value = None):
ArgValidatorStr.__init__(self, name, required, default_value, None)
self.force_len = force_len
def _validate(self, value, name):
v = ArgValidatorStr._validate(self, value, name)
try:
addr = Util.mac_aton(v, self.force_len)
except MyError as e:
raise ValidationError(name, 'value "%s" is not a valid MAC address' % (value))
if not addr:
raise ValidationError(name, 'value "%s" is not a valid MAC address' % (value))
return Util.mac_ntoa(addr)
class ArgValidatorIPAddr(ArgValidatorStr):
def __init__(self, name, family = None, required = False, default_value = None):
ArgValidatorStr.__init__(self, name, required, default_value, None)
@ -457,7 +605,7 @@ class ArgValidator_DictConnection(ArgValidatorDict):
ArgValidatorStr ('slave_type', enum_values = ArgValidator_DictConnection.VALID_SLAVE_TYPES),
ArgValidatorStr ('master'),
ArgValidatorStr ('interface_name'),
ArgValidatorStr ('mac'),
ArgValidatorMac ('mac'),
ArgValidatorStr ('parent'),
ArgValidatorInt ('vlan_id', val_min = 0, val_max = 4095, default_value = None),
ArgValidatorBool('ignore_errors', default_value = None),

View file

@ -224,7 +224,7 @@ class TestValidator(unittest.TestCase):
'route_metric4': None,
'route_metric6': None,
},
'mac': None,
'mac': 'aa:bb:cc',
'master': None,
'name': '5',
'parent': None,
@ -239,10 +239,14 @@ class TestValidator(unittest.TestCase):
{
'name': '5',
'type': 'ethernet',
'mac': 'AA:bb:cC',
}
]),
)
with self.assertRaises(n.ValidationError):
n.AnsibleUtil.ARGS_CONNECTIONS.validate([ { 'name': 'b', 'type': 'ethernet', 'mac': 'aa:b' } ])
if __name__ == '__main__':
unittest.main()