# wcm/vpn_manager/cidr4_merge/cidr4_merger.py
# 104 lines · 3.0 KiB · Python

import cProfile
from .util import cidr4_to_node, get_data, make_cidr4
# A CIDR block as an ``(ip, mask_len)`` pair of ints; ``find_parent`` unpacks
# nodes in exactly that order.
Node = tuple[int, int]
class Cidr4MergerError(Exception):
    """Raised by the ``ensure_*`` checks when the node list fails validation."""
def find_parent(a: Node, b: Node) -> Node:
    """Return the longest-prefix node that covers both ``a`` and ``b``.

    Nodes are ``(ip, mask_len)`` pairs and ``ip`` is compared under a 32-bit
    netmask.  The search starts at the shorter of the two prefixes and gives
    up one prefix bit at a time until both addresses agree under the mask.

    Returns:
        ``(ip, mask_len)`` of the common parent, where ``ip`` is ``a``'s
        address with the bits outside the parent's netmask cleared.
    """
    (addr_a, len_a), (addr_b, len_b) = a, b
    prefix_len = min(len_a, len_b)
    netmask = ((1 << prefix_len) - 1) << (32 - prefix_len)
    # XOR leaves only the bits where the addresses differ; any such bit
    # under the netmask means the prefix is still too long.
    while (addr_a ^ addr_b) & netmask:
        prefix_len -= 1
        netmask = (netmask << 1) & 0xFFFFFFFF
    return addr_a & netmask, prefix_len
def calc_dip(mask_len_a: int, mask_len_b: int, mask_len_p: int) -> int:
    """Return how many addresses the parent block adds beyond its two children.

    Each child is measured against a block one bit longer than the parent
    (prefix ``mask_len_p + 1``), which is the half of the parent it occupies
    when the two children are disjoint.  The per-child cost is therefore
    ``2**(32 - (mask_len_p + 1)) - 2**(32 - mask_len)``.

    Args:
        mask_len_a: Prefix length of the first child.
        mask_len_b: Prefix length of the second child.
        mask_len_p: Prefix length of their common parent.

    Returns:
        The summed cost of both children, or 0 when either child already has
        the parent's prefix length.  In that case the child is the parent
        block itself and already contains the other one, so the merge adds
        nothing.
    """
    # Without this guard the half-block formula below would shift by -1 and
    # raise ValueError for nested or identical nodes.
    if mask_len_p in (mask_len_a, mask_len_b):
        return 0

    def dip(mla, mlp):
        half_len = mlp + 1
        # (2**(mla - half_len) - 1) child-sized blocks are missing from the half.
        return ((1 << (mla - half_len)) - 1) << (32 - mla)

    return dip(mask_len_a, mask_len_p) + dip(mask_len_b, mask_len_p)
def merge_two_nodes(node_a: Node, node_b: Node) -> tuple[Node, int]:
    """Merge two nodes into their common parent and price the merge.

    Returns:
        ``(parent_node, dip)``: the node from ``find_parent`` and the cost
        ``calc_dip`` computes from the three prefix lengths.
    """
    merged = find_parent(node_a, node_b)
    (_, len_a), (_, len_b), (_, len_merged) = node_a, node_b, merged
    return merged, calc_dip(len_a, len_b, len_merged)
def merge_nodes(nodes: list[Node], required_len: int) -> tuple[list[Node], int]:
    """Greedily merge adjacent nodes until at most ``required_len`` remain.

    Every round merges the adjacent pair with the smallest ``merge_two_nodes``
    cost (the first such pair on ties) and then keeps merging whatever pairs
    ``find_neighbors`` reports until none are left.  The caller's list is not
    mutated; a new list is built for each merge.

    Args:
        nodes: ``(ip, mask_len)`` nodes.  NOTE(review): only adjacent entries
            are compared, so this presumably expects the sorted order that
            ``main`` produces; confirm with other callers.
        required_len: Maximum number of nodes wanted.  Values below 1 are
            treated as 1, because a single node has nothing to merge with.

    Returns:
        ``(merged_nodes, sum_dip)`` where ``sum_dip`` is the total cost of
        every merge performed.
    """
    sum_dip = 0
    # Merging needs at least two nodes.  Without this floor a required_len
    # below 1 reached an empty pair scan and crashed on a ``None`` index.
    target_len = max(required_len, 1)
    while len(nodes) > target_len:
        candidates = (
            (i, *merge_two_nodes(a, b))
            for i, (a, b) in enumerate(zip(nodes, nodes[1:]))
        )
        # min() returns the first minimal item, same as a strict "<" scan.
        idx, parent_node, dip = min(candidates, key=lambda candidate: candidate[2])
        nodes = nodes[:idx] + [parent_node] + nodes[idx + 2 :]
        sum_dip += dip
        # The new parent may now pair up with an adjacent node; merge those
        # pairs right away, one at a time, re-scanning after each merge.
        while neighbors := find_neighbors(nodes):
            idx, a, b = neighbors[0]
            parent_node, dip = merge_two_nodes(a, b)
            nodes = nodes[:idx] + [parent_node] + nodes[idx + 2 :]
            sum_dip += dip
    return nodes, sum_dip
def find_subnets(nodes: list[Node]) -> list[tuple[Node, Node]]:
    """Return adjacent pairs in which one node contains (or equals) the other.

    A pair qualifies when the common parent from ``find_parent`` is one of
    the two nodes themselves.  Only the parent is needed, so it is computed
    directly.  Going through ``merge_two_nodes`` would also price the merge,
    and ``calc_dip`` raises ``ValueError`` for exactly these nested pairs,
    which hid the intended ``Cidr4MergerError`` report.

    Args:
        nodes: ``(ip, mask_len)`` nodes.  NOTE(review): only adjacent entries
            are compared, so this presumably expects sorted input as in
            ``main``; confirm with other callers.

    Returns:
        The ``(a, b)`` pairs that are nested, in list order.
    """
    return [
        (a, b)
        for a, b in zip(nodes, nodes[1:])
        if find_parent(a, b) in (a, b)
    ]
def ensure_no_subnets(nodes: list[Node]):
    """Raise ``Cidr4MergerError`` if ``find_subnets`` reports any pair.

    Raises:
        Cidr4MergerError: with the offending pairs embedded in the message.
    """
    subnets = find_subnets(nodes)
    if not subnets:
        return
    raise Cidr4MergerError(f"There are subnets! {subnets=}")
def find_neighbors(nodes: list[Node]) -> list[tuple[int, Node, Node]]:
    """Return adjacent pairs that are the two halves of their common parent.

    A pair qualifies when both nodes have the same prefix length and their
    parent from ``find_parent`` is exactly one bit shorter.  Only the parent
    is needed for this test, so the merge cost is not computed.  That avoids
    wasted work on every call from ``merge_nodes`` and the ``ValueError``
    that ``calc_dip`` raises for nested or identical adjacent nodes.

    Returns:
        ``(index, a, b)`` triples, where ``index`` is the position of ``a``
        in ``nodes`` and ``b`` is the node right after it.
    """
    return [
        (i, a, b)
        for i, (a, b) in enumerate(zip(nodes, nodes[1:]))
        if find_parent(a, b)[1] + 1 == a[1] == b[1]
    ]
def ensure_no_neighbors(nodes: list[Node]):
    """Raise ``Cidr4MergerError`` if ``find_neighbors`` reports any pair.

    Raises:
        Cidr4MergerError: with the offending triples embedded in the message.
    """
    neighbors = find_neighbors(nodes)
    if not neighbors:
        return
    raise Cidr4MergerError(f"There are neighbors! {neighbors=}")
def main(required_len: int = 20) -> None:
    """Load the CIDR list, merge it down to ``required_len`` entries, print it.

    The data from ``get_data`` is converted with ``cidr4_to_node`` and sorted,
    validated with ``ensure_no_subnets`` and ``ensure_no_neighbors``, merged
    with ``merge_nodes``, and converted back with ``make_cidr4``.  The merged
    CIDRs and the total merge cost are printed on separate lines.

    Args:
        required_len: Maximum number of CIDRs to keep after merging.
            Defaults to 20, the value that was previously hard-coded.

    Raises:
        Cidr4MergerError: if either validation step rejects the input.
    """
    nodes = sorted(map(cidr4_to_node, get_data()))
    ensure_no_subnets(nodes)
    ensure_no_neighbors(nodes)
    merged_nodes, sum_dip = merge_nodes(nodes, required_len)
    cidr4s = [make_cidr4(ip, mask_len) for ip, mask_len in merged_nodes]
    print(cidr4s, sum_dip, sep="\n")
# When executed as a script, run main() under cProfile.
if __name__ == "__main__":
    cProfile.run("main()")