diff --git a/library/network_connections.py b/library/network_connections.py index 5267e82..07b29cc 100644 --- a/library/network_connections.py +++ b/library/network_connections.py @@ -162,7 +162,7 @@ class SysUtil: linkinfos = getattr(cls, "_link_infos", None) if linkinfos is None: try_count = 0 - b = None + last_fetch_linkinfos = None while True: try_count += 1 try: @@ -170,11 +170,11 @@ class SysUtil: # and interfaces can be renamed. Try to avoid that by # fetching the info twice and repeat until we get the same # result. - if b is None: - b = SysUtil._link_infos_fetch() + if last_fetch_linkinfos is None: + last_fetch_linkinfos = SysUtil._link_infos_fetch() linkinfos = SysUtil._link_infos_fetch() - if linkinfos != b: - b = linkinfos + if linkinfos != last_fetch_linkinfos: + last_fetch_linkinfos = linkinfos raise Exception( "cannot read stable link-infos. They keep changing" ) @@ -190,15 +190,15 @@ class SysUtil: def link_info_find(cls, refresh=False, mac=None, ifname=None): if mac is not None: mac = Util.mac_norm(mac) - for li in cls.link_infos(refresh).values(): + for linkinfo in cls.link_infos(refresh).values(): if mac is not None and mac not in [ - li.get("perm-address", None), - li.get("address", None), + linkinfo.get("perm-address", None), + linkinfo.get("address", None), ]: continue - if ifname is not None and ifname != li.get("ifname", None): + if ifname is not None and ifname != linkinfo.get("ifname", None): continue - return li + return linkinfo return None @@ -449,19 +449,19 @@ class IfcfgUtil: ifcfg["ETHTOOL_OPTS"] = ethtool_options if connection["controller"] is not None: - m = ArgUtil.connection_find_controller( + controller = ArgUtil.connection_find_controller( connection["controller"], connections, idx ) if connection["port_type"] == "bridge": - ifcfg["BRIDGE"] = m + ifcfg["BRIDGE"] = controller elif connection["port_type"] == "bond": # wokeignore:rule=master - ifcfg["MASTER"] = m + ifcfg["MASTER"] = controller # wokeignore:rule=slave ifcfg["SLAVE"] = "yes" elif connection["port_type"] == "team": # wokeignore:rule=master - ifcfg["TEAM_MASTER"] = m + ifcfg["TEAM_MASTER"] = controller if "TYPE" in ifcfg: del ifcfg["TYPE"] if connection["type"] != "team": @@ -476,8 +476,12 @@ class IfcfgUtil: if connection["zone"]: ifcfg["ZONE"] = connection["zone"] - addrs4 = [a for a in ip["address"] if a["family"] == socket.AF_INET] - addrs6 = [a for a in ip["address"] if a["family"] == socket.AF_INET6] + addrs4 = [ + addr for addr in ip["address"] if addr["family"] == socket.AF_INET + ] + addrs6 = [ + addr for addr in ip["address"] if addr["family"] == socket.AF_INET6 + ] if ip["dhcp4"]: ifcfg["BOOTPROTO"] = "dhcp" @@ -511,7 +515,10 @@ class IfcfgUtil: ) if len(addrs6) > 1: ifcfg["IPV6ADDR_SECONDARIES"] = " ".join( - [a["address"] + "/" + str(a["prefix"]) for a in addrs6[1:]] + [ + addr["address"] + "/" + str(addr["prefix"]) + for addr in addrs6[1:] + ] ) if ip["gateway6"] is not None: ifcfg["IPV6_DEFAULTGW"] = ip["gateway6"] @@ -1210,18 +1217,18 @@ class NMUtil: ): s_ip6.add_route(r) for r in ip["route"]: - rr = NM.IPRoute.new( + new_route = NM.IPRoute.new( r["family"], r["network"], r["prefix"], r["gateway"], r["metric"] ) if r["table"]: NM.IPRoute.set_attribute( - rr, "table", Util.GLib().Variant.new_uint32(r["table"]) + new_route, "table", Util.GLib().Variant.new_uint32(r["table"]) ) if r["family"] == socket.AF_INET: - s_ip4.add_route(rr) + s_ip4.add_route(new_route) else: - s_ip6.add_route(rr) + s_ip6.add_route(new_route) for routing_rule in ip["routing_rule"]: nm_routing_rule = NM.IPRoutingRule.new(routing_rule["family"]) NM.IPRoutingRule.set_priority(nm_routing_rule, routing_rule["priority"]) diff --git a/module_utils/network_lsr/argument_validator.py b/module_utils/network_lsr/argument_validator.py index 9ccaeb6..bb11f20 100644 --- a/module_utils/network_lsr/argument_validator.py +++ b/module_utils/network_lsr/argument_validator.py @@ -22,7 +22,7 @@ class ArgUtil: def connection_find_by_name(name, connections, n_connections=None): if not name: raise ValueError("missing name argument") - c = None + conn = None for idx, connection in enumerate(connections): if n_connections is not None and idx >= n_connections: break @@ -30,34 +30,34 @@ class ArgUtil: continue if connection["persistent_state"] == "absent": - c = None + conn = None elif connection["persistent_state"] == "present": - c = connection - return c + conn = connection + return conn @staticmethod def connection_find_controller(name, connections, n_connections=None): - c = ArgUtil.connection_find_by_name(name, connections, n_connections) - if not c: + connection = ArgUtil.connection_find_by_name(name, connections, n_connections) + if not connection: raise MyError("invalid controller/parent '%s'" % (name)) - if c["interface_name"] is None: + if connection["interface_name"] is None: raise MyError( "invalid controller/parent '%s' which needs an 'interface_name'" % (name) ) - if not Util.ifname_valid(c["interface_name"]): + if not Util.ifname_valid(connection["interface_name"]): raise MyError( "invalid controller/parent '%s' with invalid 'interface_name' ('%s')" - % (name, c["interface_name"]) + % (name, connection["interface_name"]) ) - return c["interface_name"] + return connection["interface_name"] @staticmethod def connection_find_controller_uuid(name, connections, n_connections=None): - c = ArgUtil.connection_find_by_name(name, connections, n_connections) - if not c: + connection = ArgUtil.connection_find_by_name(name, connections, n_connections) + if not connection: raise MyError("invalid controller/parent '%s'" % (name)) - return c["nm.uuid"] + return connection["nm.uuid"] @staticmethod def connection_get_non_absent_names(connections): @@ -954,7 +954,7 @@ class ArgValidator_DictIP(ArgValidatorDict): def _validate_post(self, value, name, result): has_ipv6_addresses = any( - a for a in result["address"] if a["family"] == socket.AF_INET6 + addr for addr in result["address"] if addr["family"] == socket.AF_INET6 ) if result["ipv6_disabled"] is True: @@ -985,7 +985,7 @@ class ArgValidator_DictIP(ArgValidatorDict): if result["dhcp4"] is None: result["dhcp4"] = result["dhcp4_send_hostname"] is not None or not any( - a for a in result["address"] if a["family"] == socket.AF_INET + addr for addr in result["address"] if addr["family"] == socket.AF_INET ) if result["auto6"] is None: @@ -1655,7 +1655,7 @@ class ArgValidator_DictMacvlan(ArgValidatorDict): class ArgValidatorPath(ArgValidatorStr): """ - Valides that the value is a valid posix absolute path + Validates that the value is a valid posix absolute path """ def __init__(self, name, required=False, default_value=None): @@ -1987,10 +1987,10 @@ class ArgValidator_DictConnection(ArgValidatorDict): # FIXME: Maybe just accept all values, even if they are not # needed/meaningful in the respective context valid_fields = set(valid_fields) - for k in result: - if k not in valid_fields: + for key in result: + if key not in valid_fields: raise ValidationError( - name + "." + k, + name + "." + key, "property is not allowed for state '%s' and persistent_state '%s'" % (state, persistent_state), ) diff --git a/module_utils/network_lsr/utils.py b/module_utils/network_lsr/utils.py index ea79cdd..0c3c048 100644 --- a/module_utils/network_lsr/utils.py +++ b/module_utils/network_lsr/utils.py @@ -233,7 +233,7 @@ class Util: if mac_str is None: return mac_str i = 0 - b = [] + byte_array = [] for c in mac_str: if i == 2: if c != ":": @@ -249,17 +249,17 @@ class Util: raise AssertionError("i != 1 - value is {0}".format(i)) n = n + int(c, 16) i = 2 - b.append(n) + byte_array.append(n) except Exception: raise MyError("not a valid MAC address: '%s'" % (mac_str)) if i == 1: raise MyError("not a valid MAC address: '%s'" % (mac_str)) if force_len is not None: - if force_len != len(b): + if force_len != len(byte_array): raise MyError( "not a valid MAC address of length %s: '%s'" % (force_len, mac_str) ) - return b + return byte_array @staticmethod def mac_ntoa(mac): diff --git a/tests/unit/test_network_connections.py b/tests/unit/test_network_connections.py index 1abff1f..4d8cd65 100644 --- a/tests/unit/test_network_connections.py +++ b/tests/unit/test_network_connections.py @@ -275,7 +275,7 @@ class TestValidator(Python26CompatTestCase): s6.clear_routes() for r in kwargs["nm_route_list_current"][idx]: r = parser.validate(r) - rr = NM.IPRoute.new( + new_route = NM.IPRoute.new( r["family"], r["network"], r["prefix"], @@ -284,12 +284,14 @@ class TestValidator(Python26CompatTestCase): ) if r["table"]: NM.IPRoute.set_attribute( - rr, "table", Util.GLib().Variant.new_uint32(r["table"]) + new_route, + "table", + Util.GLib().Variant.new_uint32(r["table"]), ) if r["family"] == socket.AF_INET: - s4.add_route(rr) + s4.add_route(new_route) else: - s6.add_route(rr) + s6.add_route(new_route) con_new = nmutil.connection_create( connections, idx, connection_current=con_new )