network/module_utils/network_lsr/utils.py
Rich Megginson c1b0002bc9 fix python black issues
There was recently an upgrade to python black which made function
documentation checking stricter.  This caused network tox CI to
break.  The fix is to format the function doc text in the manner
expected by black.

Signed-off-by: Rich Megginson <rmeggins@redhat.com>
2021-05-03 18:41:05 +02:00

355 lines
10 KiB
Python

# SPDX-License-Identifier: BSD-3-Clause
# vim: fileencoding=utf8
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import socket
import sys
import uuid
# pylint: disable=import-error, no-name-in-module
from ansible.module_utils.network_lsr.myerror import MyError # noqa:E501
class Util:
PY3 = sys.version_info[0] == 3
# pylint: disable=undefined-variable
STRING_TYPE = str if PY3 else basestring # noqa:F821
@staticmethod
def first(iterable, default=None, pred=None):
for v in iterable:
if pred is None or pred(v):
return v
return default
@staticmethod
def path_to_glib_bytes(path):
"""
Converts a path to a GLib.Bytes object that can be accepted by NM
"""
return Util.GLib().Bytes.new(("file://%s\x00" % path).encode("utf-8"))
@staticmethod
def convert_passwd_flags_nm(secret_flags):
"""
Converts an array of "secret flags" strings
to an integer represantion understood by NetworkManager
"""
flag_int = 0
if "none" in secret_flags:
flag_int += 0
if "agent-owned" in secret_flags:
flag_int += 1
if "not-saved" in secret_flags:
flag_int += 2
if "not-required" in secret_flags:
flag_int += 4
return flag_int
@classmethod
def create_uuid(cls):
return str(uuid.uuid4())
@classmethod
def NM(cls):
n = getattr(cls, "_NM", None)
if n is None:
# Installing pygobject in a tox virtualenv does not work out of the
# box
# pylint: disable=import-error
import gi
gi.require_version("NM", "1.0")
from gi.repository import NM, GLib, Gio, GObject
cls._NM = NM
cls._GLib = GLib
cls._Gio = Gio
cls._GObject = GObject
n = NM
return n
@classmethod
def GLib(cls):
cls.NM()
return cls._GLib
@classmethod
def Gio(cls):
cls.NM()
return cls._Gio
@classmethod
def GObject(cls):
cls.NM()
return cls._GObject
@classmethod
def Timestamp(cls):
return cls.GLib().get_monotonic_time()
@classmethod
def GMainLoop(cls):
gmainloop = getattr(cls, "_GMainLoop", None)
if gmainloop is None:
gmainloop = cls.GLib().MainLoop()
cls._GMainLoop = gmainloop
return gmainloop
@classmethod
def GMainLoop_run(cls, timeout=None):
if timeout is None:
cls.GMainLoop().run()
return True
GLib = cls.GLib()
timeout_reached = []
loop = cls.GMainLoop()
def _timeout_cb(unused):
timeout_reached.append(1)
loop.quit()
return False
timeout_id = GLib.timeout_add(int(timeout * 1000), _timeout_cb, None)
loop.run()
if not timeout_reached:
GLib.source_remove(timeout_id)
return not timeout_reached
@classmethod
def GMainLoop_iterate(cls, may_block=False):
return cls.GMainLoop().get_context().iteration(may_block)
@classmethod
def GMainLoop_iterate_all(cls):
c = 0
while cls.GMainLoop_iterate():
c += 1
return c
@classmethod
def call_async_method(cls, object_, action, args, mainloop_timeout=10):
"""Asynchronously call a NetworkManager method"""
cancellable = cls.create_cancellable()
async_action = action + "_async"
# NM does not use a uniform naming for the async methods,
# for checkpoints it is:
# NMClient.checkpoint_create() and NMClient.checkpoint_create_finish(),
# but for reapply it is:
# NMDevice.reapply_async() and NMDevice.reapply_finish()
# NMDevice.reapply() is a synchronous version
# Therefore check if there is a method if an `async` suffix and use the
# one without the suffix otherwise
if not hasattr(object_, async_action):
async_action = action
finish = action + "_finish"
user_data = {}
fullargs = []
fullargs += args
fullargs += (cancellable, cls.create_callback(finish), user_data)
getattr(object_, async_action)(*fullargs)
if not cls.GMainLoop_run(mainloop_timeout):
cancellable.cancel()
raise MyError("failure to call %s.%s(): timeout" % (object_, async_action))
success = user_data.get("success", None)
if success is not None:
return success
raise MyError(
"failure to %s checkpoint: %s: %r"
% (action, user_data.get("error", "unknown error"), user_data)
)
@classmethod
def create_cancellable(cls):
return cls.Gio().Cancellable.new()
@classmethod
def create_callback(cls, finish_method):
"""
Create a callback that will return the result of the finish method and
quit the GMainLoop
:param finish_method str: Name of the finish method to call from the
source object in the callback
"""
def callback(source_object, res, user_data):
success = None
try:
success = getattr(source_object, finish_method)(res)
except Exception as e:
if cls.error_is_cancelled(e):
return
user_data["error"] = str(e)
user_data["success"] = success
cls.GMainLoop().quit()
return callback
@classmethod
def error_is_cancelled(cls, e):
GLib = cls.GLib()
if isinstance(e, GLib.GError):
if (
e.domain == "g-io-error-quark"
and e.code == cls.Gio().IOErrorEnum.CANCELLED
):
return True
return False
@staticmethod
def ifname_valid(ifname):
# see dev_valid_name() in kernel's net/core/dev.c
if not ifname:
return False
if ifname in [".", ".."]:
return False
if len(ifname) >= 16:
return False
if any([c == "/" or c == ":" or c.isspace() for c in ifname]):
return False
# FIXME: encoding issues regarding python unicode string
return True
@staticmethod
def mac_aton(mac_str, force_len=None):
# we also accept None and '' for convenience.
# - None yiels None
# - '' yields []
if mac_str is None:
return mac_str
i = 0
b = []
for c in mac_str:
if i == 2:
if c != ":":
raise MyError("not a valid MAC address: '%s'" % (mac_str))
i = 0
continue
try:
if i == 0:
n = int(c, 16) * 16
i = 1
else:
if not i == 1:
raise AssertionError("i != 1 - value is {0}".format(i))
n = n + int(c, 16)
i = 2
b.append(n)
except Exception:
raise MyError("not a valid MAC address: '%s'" % (mac_str))
if i == 1:
raise MyError("not a valid MAC address: '%s'" % (mac_str))
if force_len is not None:
if force_len != len(b):
raise MyError(
"not a valid MAC address of length %s: '%s'" % (force_len, mac_str)
)
return b
@staticmethod
def mac_ntoa(mac):
if mac is None:
return None
# bytearray() is needed for python2 compatibility
return ":".join(["%02x" % c for c in bytearray(mac)])
@staticmethod
def mac_norm(mac_str, force_len=None):
return Util.mac_ntoa(Util.mac_aton(mac_str, force_len))
@staticmethod
def boolean(arg):
if arg is None or isinstance(arg, bool):
return arg
arg0 = arg
if isinstance(arg, Util.STRING_TYPE):
arg = arg.lower()
if arg in ["y", "yes", "on", "1", "true", 1, True]:
return True
if arg in ["n", "no", "off", "0", "false", 0, False]:
return False
raise MyError("value '%s' is not a boolean" % (arg0))
@staticmethod
def parse_ip(addr, family=None):
if addr is None:
return (None, None)
if family is not None:
Util.addr_family_check(family)
a = socket.inet_pton(family, addr)
else:
a = None
family = None
try:
a = socket.inet_pton(socket.AF_INET, addr)
family = socket.AF_INET
except Exception:
a = socket.inet_pton(socket.AF_INET6, addr)
family = socket.AF_INET6
return (socket.inet_ntop(family, a), family)
@staticmethod
def addr_family_check(family):
if family != socket.AF_INET and family != socket.AF_INET6:
raise MyError("invalid address family %s" % (family))
@staticmethod
def addr_family_to_v(family):
if family is None:
return ""
if family == socket.AF_INET:
return "v4"
if family == socket.AF_INET6:
return "v6"
raise MyError("invalid address family '%s'" % (family))
@staticmethod
def addr_family_default_prefix(family):
Util.addr_family_check(family)
if family == socket.AF_INET:
return 24
else:
return 64
@staticmethod
def addr_family_valid_prefix(family, prefix):
Util.addr_family_check(family)
if family == socket.AF_INET:
m = 32
else:
m = 128
return prefix >= 0 and prefix <= m
@staticmethod
def parse_address(address, family=None):
try:
parts = address.split()
addr_parts = parts[0].split("/")
if len(addr_parts) != 2:
raise MyError("expect two addr-parts: ADDR/PLEN")
a, family = Util.parse_ip(addr_parts[0], family)
prefix = int(addr_parts[1])
if not Util.addr_family_valid_prefix(family, prefix):
raise MyError("invalid prefix %s" % (prefix))
if len(parts) > 1:
raise MyError("too many parts")
return {"address": a, "family": family, "prefix": prefix}
except Exception:
raise MyError("invalid address '%s'" % (address))