Fixed database typo and removed unnecessary class identifier.

This commit is contained in:
Batuhan Berk Başoğlu 2020-10-14 10:10:37 -04:00
parent 00ad49a143
commit 45fb349a7d
5098 changed files with 952558 additions and 85 deletions

View file

@ -0,0 +1,418 @@
import itertools
import pytest
import networkx as nx
from networkx.algorithms import flow
from networkx.algorithms.connectivity import local_edge_connectivity
from networkx.algorithms.connectivity import local_node_connectivity
flow_funcs = [
flow.boykov_kolmogorov,
flow.dinitz,
flow.edmonds_karp,
flow.preflow_push,
flow.shortest_augmenting_path,
]
# helper functions for tests
def _generate_no_biconnected(max_attempts=50):
attempts = 0
while True:
G = nx.fast_gnp_random_graph(100, 0.0575, seed=42)
if nx.is_connected(G) and not nx.is_biconnected(G):
attempts = 0
yield G
else:
if attempts >= max_attempts:
msg = f"Tried {max_attempts} times: no suitable Graph."
raise Exception(msg)
else:
attempts += 1
def test_average_connectivity():
# figure 1 from:
# Beineke, L., O. Oellermann, and R. Pippert (2002). The average
# connectivity of a graph. Discrete mathematics 252(1-3), 31-45
# http://www.sciencedirect.com/science/article/pii/S0012365X01001807
G1 = nx.path_graph(3)
G1.add_edges_from([(1, 3), (1, 4)])
G2 = nx.path_graph(3)
G2.add_edges_from([(1, 3), (1, 4), (0, 3), (0, 4), (3, 4)])
G3 = nx.Graph()
for flow_func in flow_funcs:
kwargs = dict(flow_func=flow_func)
errmsg = f"Assertion failed in function: {flow_func.__name__}"
assert nx.average_node_connectivity(G1, **kwargs) == 1, errmsg
assert nx.average_node_connectivity(G2, **kwargs) == 2.2, errmsg
assert nx.average_node_connectivity(G3, **kwargs) == 0, errmsg
def test_average_connectivity_directed():
G = nx.DiGraph([(1, 3), (1, 4), (1, 5)])
for flow_func in flow_funcs:
errmsg = f"Assertion failed in function: {flow_func.__name__}"
assert nx.average_node_connectivity(G) == 0.25, errmsg
def test_articulation_points():
Ggen = _generate_no_biconnected()
for flow_func in flow_funcs:
for i in range(3):
G = next(Ggen)
errmsg = f"Assertion failed in function: {flow_func.__name__}"
assert nx.node_connectivity(G, flow_func=flow_func) == 1, errmsg
def test_brandes_erlebach():
# Figure 1 chapter 7: Connectivity
# http://www.informatik.uni-augsburg.de/thi/personen/kammer/Graph_Connectivity.pdf
G = nx.Graph()
G.add_edges_from(
[
(1, 2),
(1, 3),
(1, 4),
(1, 5),
(2, 3),
(2, 6),
(3, 4),
(3, 6),
(4, 6),
(4, 7),
(5, 7),
(6, 8),
(6, 9),
(7, 8),
(7, 10),
(8, 11),
(9, 10),
(9, 11),
(10, 11),
]
)
for flow_func in flow_funcs:
kwargs = dict(flow_func=flow_func)
errmsg = f"Assertion failed in function: {flow_func.__name__}"
assert 3 == local_edge_connectivity(G, 1, 11, **kwargs), errmsg
assert 3 == nx.edge_connectivity(G, 1, 11, **kwargs), errmsg
assert 2 == local_node_connectivity(G, 1, 11, **kwargs), errmsg
assert 2 == nx.node_connectivity(G, 1, 11, **kwargs), errmsg
assert 2 == nx.edge_connectivity(G, **kwargs), errmsg
assert 2 == nx.node_connectivity(G, **kwargs), errmsg
if flow_func is flow.preflow_push:
assert 3 == nx.edge_connectivity(G, 1, 11, cutoff=2, **kwargs), errmsg
else:
assert 2 == nx.edge_connectivity(G, 1, 11, cutoff=2, **kwargs), errmsg
def test_white_harary_1():
# Figure 1b white and harary (2001)
# # http://eclectic.ss.uci.edu/~drwhite/sm-w23.PDF
# A graph with high adhesion (edge connectivity) and low cohesion
# (vertex connectivity)
G = nx.disjoint_union(nx.complete_graph(4), nx.complete_graph(4))
G.remove_node(7)
for i in range(4, 7):
G.add_edge(0, i)
G = nx.disjoint_union(G, nx.complete_graph(4))
G.remove_node(G.order() - 1)
for i in range(7, 10):
G.add_edge(0, i)
for flow_func in flow_funcs:
errmsg = f"Assertion failed in function: {flow_func.__name__}"
assert 1 == nx.node_connectivity(G, flow_func=flow_func), errmsg
assert 3 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
def test_white_harary_2():
# Figure 8 white and harary (2001)
# # http://eclectic.ss.uci.edu/~drwhite/sm-w23.PDF
G = nx.disjoint_union(nx.complete_graph(4), nx.complete_graph(4))
G.add_edge(0, 4)
# kappa <= lambda <= delta
assert 3 == min(nx.core_number(G).values())
for flow_func in flow_funcs:
errmsg = f"Assertion failed in function: {flow_func.__name__}"
assert 1 == nx.node_connectivity(G, flow_func=flow_func), errmsg
assert 1 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
def test_complete_graphs():
for n in range(5, 20, 5):
for flow_func in flow_funcs:
G = nx.complete_graph(n)
errmsg = f"Assertion failed in function: {flow_func.__name__}"
assert n - 1 == nx.node_connectivity(G, flow_func=flow_func), errmsg
assert n - 1 == nx.node_connectivity(
G.to_directed(), flow_func=flow_func
), errmsg
assert n - 1 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
assert n - 1 == nx.edge_connectivity(
G.to_directed(), flow_func=flow_func
), errmsg
def test_empty_graphs():
for k in range(5, 25, 5):
G = nx.empty_graph(k)
for flow_func in flow_funcs:
errmsg = f"Assertion failed in function: {flow_func.__name__}"
assert 0 == nx.node_connectivity(G, flow_func=flow_func), errmsg
assert 0 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
def test_petersen():
G = nx.petersen_graph()
for flow_func in flow_funcs:
errmsg = f"Assertion failed in function: {flow_func.__name__}"
assert 3 == nx.node_connectivity(G, flow_func=flow_func), errmsg
assert 3 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
def test_tutte():
G = nx.tutte_graph()
for flow_func in flow_funcs:
errmsg = f"Assertion failed in function: {flow_func.__name__}"
assert 3 == nx.node_connectivity(G, flow_func=flow_func), errmsg
assert 3 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
def test_dodecahedral():
G = nx.dodecahedral_graph()
for flow_func in flow_funcs:
errmsg = f"Assertion failed in function: {flow_func.__name__}"
assert 3 == nx.node_connectivity(G, flow_func=flow_func), errmsg
assert 3 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
def test_octahedral():
G = nx.octahedral_graph()
for flow_func in flow_funcs:
errmsg = f"Assertion failed in function: {flow_func.__name__}"
assert 4 == nx.node_connectivity(G, flow_func=flow_func), errmsg
assert 4 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
def test_icosahedral():
G = nx.icosahedral_graph()
for flow_func in flow_funcs:
errmsg = f"Assertion failed in function: {flow_func.__name__}"
assert 5 == nx.node_connectivity(G, flow_func=flow_func), errmsg
assert 5 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
def test_missing_source():
G = nx.path_graph(4)
for flow_func in flow_funcs:
pytest.raises(
nx.NetworkXError, nx.node_connectivity, G, 10, 1, flow_func=flow_func
)
def test_missing_target():
G = nx.path_graph(4)
for flow_func in flow_funcs:
pytest.raises(
nx.NetworkXError, nx.node_connectivity, G, 1, 10, flow_func=flow_func
)
def test_edge_missing_source():
G = nx.path_graph(4)
for flow_func in flow_funcs:
pytest.raises(
nx.NetworkXError, nx.edge_connectivity, G, 10, 1, flow_func=flow_func
)
def test_edge_missing_target():
G = nx.path_graph(4)
for flow_func in flow_funcs:
pytest.raises(
nx.NetworkXError, nx.edge_connectivity, G, 1, 10, flow_func=flow_func
)
def test_not_weakly_connected():
G = nx.DiGraph()
nx.add_path(G, [1, 2, 3])
nx.add_path(G, [4, 5])
for flow_func in flow_funcs:
errmsg = f"Assertion failed in function: {flow_func.__name__}"
assert nx.node_connectivity(G) == 0, errmsg
assert nx.edge_connectivity(G) == 0, errmsg
def test_not_connected():
G = nx.Graph()
nx.add_path(G, [1, 2, 3])
nx.add_path(G, [4, 5])
for flow_func in flow_funcs:
errmsg = f"Assertion failed in function: {flow_func.__name__}"
assert nx.node_connectivity(G) == 0, errmsg
assert nx.edge_connectivity(G) == 0, errmsg
def test_directed_edge_connectivity():
G = nx.cycle_graph(10, create_using=nx.DiGraph()) # only one direction
D = nx.cycle_graph(10).to_directed() # 2 reciprocal edges
for flow_func in flow_funcs:
errmsg = f"Assertion failed in function: {flow_func.__name__}"
assert 1 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
assert 1 == local_edge_connectivity(G, 1, 4, flow_func=flow_func), errmsg
assert 1 == nx.edge_connectivity(G, 1, 4, flow_func=flow_func), errmsg
assert 2 == nx.edge_connectivity(D, flow_func=flow_func), errmsg
assert 2 == local_edge_connectivity(D, 1, 4, flow_func=flow_func), errmsg
assert 2 == nx.edge_connectivity(D, 1, 4, flow_func=flow_func), errmsg
def test_cutoff():
G = nx.complete_graph(5)
for local_func in [local_edge_connectivity, local_node_connectivity]:
for flow_func in flow_funcs:
if flow_func is flow.preflow_push:
# cutoff is not supported by preflow_push
continue
for cutoff in [3, 2, 1]:
result = local_func(G, 0, 4, flow_func=flow_func, cutoff=cutoff)
assert cutoff == result, f"cutoff error in {flow_func.__name__}"
def test_invalid_auxiliary():
G = nx.complete_graph(5)
pytest.raises(nx.NetworkXError, local_node_connectivity, G, 0, 3, auxiliary=G)
def test_interface_only_source():
G = nx.complete_graph(5)
for interface_func in [nx.node_connectivity, nx.edge_connectivity]:
pytest.raises(nx.NetworkXError, interface_func, G, s=0)
def test_interface_only_target():
G = nx.complete_graph(5)
for interface_func in [nx.node_connectivity, nx.edge_connectivity]:
pytest.raises(nx.NetworkXError, interface_func, G, t=3)
def test_edge_connectivity_flow_vs_stoer_wagner():
graph_funcs = [nx.icosahedral_graph, nx.octahedral_graph, nx.dodecahedral_graph]
for graph_func in graph_funcs:
G = graph_func()
assert nx.stoer_wagner(G)[0] == nx.edge_connectivity(G)
class TestAllPairsNodeConnectivity:
@classmethod
def setup_class(cls):
cls.path = nx.path_graph(7)
cls.directed_path = nx.path_graph(7, create_using=nx.DiGraph())
cls.cycle = nx.cycle_graph(7)
cls.directed_cycle = nx.cycle_graph(7, create_using=nx.DiGraph())
cls.gnp = nx.gnp_random_graph(30, 0.1, seed=42)
cls.directed_gnp = nx.gnp_random_graph(30, 0.1, directed=True, seed=42)
cls.K20 = nx.complete_graph(20)
cls.K10 = nx.complete_graph(10)
cls.K5 = nx.complete_graph(5)
cls.G_list = [
cls.path,
cls.directed_path,
cls.cycle,
cls.directed_cycle,
cls.gnp,
cls.directed_gnp,
cls.K10,
cls.K5,
cls.K20,
]
def test_cycles(self):
K_undir = nx.all_pairs_node_connectivity(self.cycle)
for source in K_undir:
for target, k in K_undir[source].items():
assert k == 2
K_dir = nx.all_pairs_node_connectivity(self.directed_cycle)
for source in K_dir:
for target, k in K_dir[source].items():
assert k == 1
def test_complete(self):
for G in [self.K10, self.K5, self.K20]:
K = nx.all_pairs_node_connectivity(G)
for source in K:
for target, k in K[source].items():
assert k == len(G) - 1
def test_paths(self):
K_undir = nx.all_pairs_node_connectivity(self.path)
for source in K_undir:
for target, k in K_undir[source].items():
assert k == 1
K_dir = nx.all_pairs_node_connectivity(self.directed_path)
for source in K_dir:
for target, k in K_dir[source].items():
if source < target:
assert k == 1
else:
assert k == 0
def test_all_pairs_connectivity_nbunch(self):
G = nx.complete_graph(5)
nbunch = [0, 2, 3]
C = nx.all_pairs_node_connectivity(G, nbunch=nbunch)
assert len(C) == len(nbunch)
def test_all_pairs_connectivity_icosahedral(self):
G = nx.icosahedral_graph()
C = nx.all_pairs_node_connectivity(G)
assert all(5 == C[u][v] for u, v in itertools.combinations(G, 2))
def test_all_pairs_connectivity(self):
G = nx.Graph()
nodes = [0, 1, 2, 3]
nx.add_path(G, nodes)
A = {n: {} for n in G}
for u, v in itertools.combinations(nodes, 2):
A[u][v] = A[v][u] = nx.node_connectivity(G, u, v)
C = nx.all_pairs_node_connectivity(G)
assert sorted((k, sorted(v)) for k, v in A.items()) == sorted(
(k, sorted(v)) for k, v in C.items()
)
def test_all_pairs_connectivity_directed(self):
G = nx.DiGraph()
nodes = [0, 1, 2, 3]
nx.add_path(G, nodes)
A = {n: {} for n in G}
for u, v in itertools.permutations(nodes, 2):
A[u][v] = nx.node_connectivity(G, u, v)
C = nx.all_pairs_node_connectivity(G)
assert sorted((k, sorted(v)) for k, v in A.items()) == sorted(
(k, sorted(v)) for k, v in C.items()
)
def test_all_pairs_connectivity_nbunch_combinations(self):
G = nx.complete_graph(5)
nbunch = [0, 2, 3]
A = {n: {} for n in nbunch}
for u, v in itertools.combinations(nbunch, 2):
A[u][v] = A[v][u] = nx.node_connectivity(G, u, v)
C = nx.all_pairs_node_connectivity(G, nbunch=nbunch)
assert sorted((k, sorted(v)) for k, v in A.items()) == sorted(
(k, sorted(v)) for k, v in C.items()
)
def test_all_pairs_connectivity_nbunch_iter(self):
G = nx.complete_graph(5)
nbunch = [0, 2, 3]
A = {n: {} for n in nbunch}
for u, v in itertools.combinations(nbunch, 2):
A[u][v] = A[v][u] = nx.node_connectivity(G, u, v)
C = nx.all_pairs_node_connectivity(G, nbunch=iter(nbunch))
assert sorted((k, sorted(v)) for k, v in A.items()) == sorted(
(k, sorted(v)) for k, v in C.items()
)

View file

@ -0,0 +1,310 @@
import pytest
import networkx as nx
from networkx.algorithms import flow
from networkx.algorithms.connectivity import minimum_st_edge_cut
from networkx.algorithms.connectivity import minimum_st_node_cut
from networkx.utils import arbitrary_element
flow_funcs = [
flow.boykov_kolmogorov,
flow.dinitz,
flow.edmonds_karp,
flow.preflow_push,
flow.shortest_augmenting_path,
]
# Tests for node and edge cutsets
def _generate_no_biconnected(max_attempts=50):
attempts = 0
while True:
G = nx.fast_gnp_random_graph(100, 0.0575, seed=42)
if nx.is_connected(G) and not nx.is_biconnected(G):
attempts = 0
yield G
else:
if attempts >= max_attempts:
msg = f"Tried {attempts} times: no suitable Graph."
raise Exception(msg)
else:
attempts += 1
def test_articulation_points():
Ggen = _generate_no_biconnected()
for flow_func in flow_funcs:
errmsg = f"Assertion failed in function: {flow_func.__name__}"
for i in range(1): # change 1 to 3 or more for more realizations.
G = next(Ggen)
cut = nx.minimum_node_cut(G, flow_func=flow_func)
assert len(cut) == 1, errmsg
assert cut.pop() in set(nx.articulation_points(G)), errmsg
def test_brandes_erlebach_book():
# Figure 1 chapter 7: Connectivity
# http://www.informatik.uni-augsburg.de/thi/personen/kammer/Graph_Connectivity.pdf
G = nx.Graph()
G.add_edges_from(
[
(1, 2),
(1, 3),
(1, 4),
(1, 5),
(2, 3),
(2, 6),
(3, 4),
(3, 6),
(4, 6),
(4, 7),
(5, 7),
(6, 8),
(6, 9),
(7, 8),
(7, 10),
(8, 11),
(9, 10),
(9, 11),
(10, 11),
]
)
for flow_func in flow_funcs:
kwargs = dict(flow_func=flow_func)
errmsg = f"Assertion failed in function: {flow_func.__name__}"
# edge cutsets
assert 3 == len(nx.minimum_edge_cut(G, 1, 11, **kwargs)), errmsg
edge_cut = nx.minimum_edge_cut(G, **kwargs)
# Node 5 has only two edges
assert 2 == len(edge_cut), errmsg
H = G.copy()
H.remove_edges_from(edge_cut)
assert not nx.is_connected(H), errmsg
# node cuts
assert {6, 7} == minimum_st_node_cut(G, 1, 11, **kwargs), errmsg
assert {6, 7} == nx.minimum_node_cut(G, 1, 11, **kwargs), errmsg
node_cut = nx.minimum_node_cut(G, **kwargs)
assert 2 == len(node_cut), errmsg
H = G.copy()
H.remove_nodes_from(node_cut)
assert not nx.is_connected(H), errmsg
def test_white_harary_paper():
# Figure 1b white and harary (2001)
# http://eclectic.ss.uci.edu/~drwhite/sm-w23.PDF
# A graph with high adhesion (edge connectivity) and low cohesion
# (node connectivity)
G = nx.disjoint_union(nx.complete_graph(4), nx.complete_graph(4))
G.remove_node(7)
for i in range(4, 7):
G.add_edge(0, i)
G = nx.disjoint_union(G, nx.complete_graph(4))
G.remove_node(G.order() - 1)
for i in range(7, 10):
G.add_edge(0, i)
for flow_func in flow_funcs:
kwargs = dict(flow_func=flow_func)
errmsg = f"Assertion failed in function: {flow_func.__name__}"
# edge cuts
edge_cut = nx.minimum_edge_cut(G, **kwargs)
assert 3 == len(edge_cut), errmsg
H = G.copy()
H.remove_edges_from(edge_cut)
assert not nx.is_connected(H), errmsg
# node cuts
node_cut = nx.minimum_node_cut(G, **kwargs)
assert {0} == node_cut, errmsg
H = G.copy()
H.remove_nodes_from(node_cut)
assert not nx.is_connected(H), errmsg
def test_petersen_cutset():
G = nx.petersen_graph()
for flow_func in flow_funcs:
kwargs = dict(flow_func=flow_func)
errmsg = f"Assertion failed in function: {flow_func.__name__}"
# edge cuts
edge_cut = nx.minimum_edge_cut(G, **kwargs)
assert 3 == len(edge_cut), errmsg
H = G.copy()
H.remove_edges_from(edge_cut)
assert not nx.is_connected(H), errmsg
# node cuts
node_cut = nx.minimum_node_cut(G, **kwargs)
assert 3 == len(node_cut), errmsg
H = G.copy()
H.remove_nodes_from(node_cut)
assert not nx.is_connected(H), errmsg
def test_octahedral_cutset():
G = nx.octahedral_graph()
for flow_func in flow_funcs:
kwargs = dict(flow_func=flow_func)
errmsg = f"Assertion failed in function: {flow_func.__name__}"
# edge cuts
edge_cut = nx.minimum_edge_cut(G, **kwargs)
assert 4 == len(edge_cut), errmsg
H = G.copy()
H.remove_edges_from(edge_cut)
assert not nx.is_connected(H), errmsg
# node cuts
node_cut = nx.minimum_node_cut(G, **kwargs)
assert 4 == len(node_cut), errmsg
H = G.copy()
H.remove_nodes_from(node_cut)
assert not nx.is_connected(H), errmsg
def test_icosahedral_cutset():
G = nx.icosahedral_graph()
for flow_func in flow_funcs:
kwargs = dict(flow_func=flow_func)
errmsg = f"Assertion failed in function: {flow_func.__name__}"
# edge cuts
edge_cut = nx.minimum_edge_cut(G, **kwargs)
assert 5 == len(edge_cut), errmsg
H = G.copy()
H.remove_edges_from(edge_cut)
assert not nx.is_connected(H), errmsg
# node cuts
node_cut = nx.minimum_node_cut(G, **kwargs)
assert 5 == len(node_cut), errmsg
H = G.copy()
H.remove_nodes_from(node_cut)
assert not nx.is_connected(H), errmsg
def test_node_cutset_exception():
G = nx.Graph()
G.add_edges_from([(1, 2), (3, 4)])
for flow_func in flow_funcs:
pytest.raises(nx.NetworkXError, nx.minimum_node_cut, G, flow_func=flow_func)
def test_node_cutset_random_graphs():
for flow_func in flow_funcs:
errmsg = f"Assertion failed in function: {flow_func.__name__}"
for i in range(3):
G = nx.fast_gnp_random_graph(50, 0.25, seed=42)
if not nx.is_connected(G):
ccs = iter(nx.connected_components(G))
start = arbitrary_element(next(ccs))
G.add_edges_from((start, arbitrary_element(c)) for c in ccs)
cutset = nx.minimum_node_cut(G, flow_func=flow_func)
assert nx.node_connectivity(G) == len(cutset), errmsg
G.remove_nodes_from(cutset)
assert not nx.is_connected(G), errmsg
def test_edge_cutset_random_graphs():
for flow_func in flow_funcs:
errmsg = f"Assertion failed in function: {flow_func.__name__}"
for i in range(3):
G = nx.fast_gnp_random_graph(50, 0.25, seed=42)
if not nx.is_connected(G):
ccs = iter(nx.connected_components(G))
start = arbitrary_element(next(ccs))
G.add_edges_from((start, arbitrary_element(c)) for c in ccs)
cutset = nx.minimum_edge_cut(G, flow_func=flow_func)
assert nx.edge_connectivity(G) == len(cutset), errmsg
G.remove_edges_from(cutset)
assert not nx.is_connected(G), errmsg
def test_empty_graphs():
G = nx.Graph()
D = nx.DiGraph()
for interface_func in [nx.minimum_node_cut, nx.minimum_edge_cut]:
for flow_func in flow_funcs:
pytest.raises(
nx.NetworkXPointlessConcept, interface_func, G, flow_func=flow_func
)
pytest.raises(
nx.NetworkXPointlessConcept, interface_func, D, flow_func=flow_func
)
def test_unbounded():
G = nx.complete_graph(5)
for flow_func in flow_funcs:
assert 4 == len(minimum_st_edge_cut(G, 1, 4, flow_func=flow_func))
def test_missing_source():
G = nx.path_graph(4)
for interface_func in [nx.minimum_edge_cut, nx.minimum_node_cut]:
for flow_func in flow_funcs:
pytest.raises(
nx.NetworkXError, interface_func, G, 10, 1, flow_func=flow_func
)
def test_missing_target():
G = nx.path_graph(4)
for interface_func in [nx.minimum_edge_cut, nx.minimum_node_cut]:
for flow_func in flow_funcs:
pytest.raises(
nx.NetworkXError, interface_func, G, 1, 10, flow_func=flow_func
)
def test_not_weakly_connected():
G = nx.DiGraph()
nx.add_path(G, [1, 2, 3])
nx.add_path(G, [4, 5])
for interface_func in [nx.minimum_edge_cut, nx.minimum_node_cut]:
for flow_func in flow_funcs:
pytest.raises(nx.NetworkXError, interface_func, G, flow_func=flow_func)
def test_not_connected():
G = nx.Graph()
nx.add_path(G, [1, 2, 3])
nx.add_path(G, [4, 5])
for interface_func in [nx.minimum_edge_cut, nx.minimum_node_cut]:
for flow_func in flow_funcs:
pytest.raises(nx.NetworkXError, interface_func, G, flow_func=flow_func)
def tests_min_cut_complete():
G = nx.complete_graph(5)
for interface_func in [nx.minimum_edge_cut, nx.minimum_node_cut]:
for flow_func in flow_funcs:
assert 4 == len(interface_func(G, flow_func=flow_func))
def tests_min_cut_complete_directed():
G = nx.complete_graph(5)
G = G.to_directed()
for interface_func in [nx.minimum_edge_cut, nx.minimum_node_cut]:
for flow_func in flow_funcs:
assert 4 == len(interface_func(G, flow_func=flow_func))
def tests_minimum_st_node_cut():
G = nx.Graph()
G.add_nodes_from([0, 1, 2, 3, 7, 8, 11, 12])
G.add_edges_from([(7, 11), (1, 11), (1, 12), (12, 8), (0, 1)])
nodelist = minimum_st_node_cut(G, 7, 11)
assert nodelist == {}
def test_invalid_auxiliary():
G = nx.complete_graph(5)
pytest.raises(nx.NetworkXError, minimum_st_node_cut, G, 0, 3, auxiliary=G)
def test_interface_only_source():
G = nx.complete_graph(5)
for interface_func in [nx.minimum_node_cut, nx.minimum_edge_cut]:
pytest.raises(nx.NetworkXError, interface_func, G, s=0)
def test_interface_only_target():
G = nx.complete_graph(5)
for interface_func in [nx.minimum_node_cut, nx.minimum_edge_cut]:
pytest.raises(nx.NetworkXError, interface_func, G, t=3)

View file

@ -0,0 +1,249 @@
import pytest
import networkx as nx
from networkx.algorithms import flow
from networkx.utils import pairwise
flow_funcs = [
flow.boykov_kolmogorov,
flow.edmonds_karp,
flow.dinitz,
flow.preflow_push,
flow.shortest_augmenting_path,
]
def is_path(G, path):
return all(v in G[u] for u, v in pairwise(path))
def are_edge_disjoint_paths(G, paths):
if not paths:
return False
for path in paths:
assert is_path(G, path)
paths_edges = [list(pairwise(p)) for p in paths]
num_of_edges = sum(len(e) for e in paths_edges)
num_unique_edges = len(set.union(*[set(es) for es in paths_edges]))
if num_of_edges == num_unique_edges:
return True
return False
def are_node_disjoint_paths(G, paths):
if not paths:
return False
for path in paths:
assert is_path(G, path)
# first and last nodes are source and target
st = {paths[0][0], paths[0][-1]}
num_of_nodes = len([n for path in paths for n in path if n not in st])
num_unique_nodes = len({n for path in paths for n in path if n not in st})
if num_of_nodes == num_unique_nodes:
return True
return False
def test_graph_from_pr_2053():
G = nx.Graph()
G.add_edges_from(
[
("A", "B"),
("A", "D"),
("A", "F"),
("A", "G"),
("B", "C"),
("B", "D"),
("B", "G"),
("C", "D"),
("C", "E"),
("C", "Z"),
("D", "E"),
("D", "F"),
("E", "F"),
("E", "Z"),
("F", "Z"),
("G", "Z"),
]
)
for flow_func in flow_funcs:
kwargs = dict(flow_func=flow_func)
errmsg = f"Assertion failed in function: {flow_func.__name__}"
# edge disjoint paths
edge_paths = list(nx.edge_disjoint_paths(G, "A", "Z", **kwargs))
assert are_edge_disjoint_paths(G, edge_paths), errmsg
assert nx.edge_connectivity(G, "A", "Z") == len(edge_paths), errmsg
# node disjoint paths
node_paths = list(nx.node_disjoint_paths(G, "A", "Z", **kwargs))
assert are_node_disjoint_paths(G, node_paths), errmsg
assert nx.node_connectivity(G, "A", "Z") == len(node_paths), errmsg
def test_florentine_families():
G = nx.florentine_families_graph()
for flow_func in flow_funcs:
kwargs = dict(flow_func=flow_func)
errmsg = f"Assertion failed in function: {flow_func.__name__}"
# edge disjoint paths
edge_dpaths = list(nx.edge_disjoint_paths(G, "Medici", "Strozzi", **kwargs))
assert are_edge_disjoint_paths(G, edge_dpaths), errmsg
assert nx.edge_connectivity(G, "Medici", "Strozzi") == len(edge_dpaths), errmsg
# node disjoint paths
node_dpaths = list(nx.node_disjoint_paths(G, "Medici", "Strozzi", **kwargs))
assert are_node_disjoint_paths(G, node_dpaths), errmsg
assert nx.node_connectivity(G, "Medici", "Strozzi") == len(node_dpaths), errmsg
def test_karate():
G = nx.karate_club_graph()
for flow_func in flow_funcs:
kwargs = dict(flow_func=flow_func)
errmsg = f"Assertion failed in function: {flow_func.__name__}"
# edge disjoint paths
edge_dpaths = list(nx.edge_disjoint_paths(G, 0, 33, **kwargs))
assert are_edge_disjoint_paths(G, edge_dpaths), errmsg
assert nx.edge_connectivity(G, 0, 33) == len(edge_dpaths), errmsg
# node disjoint paths
node_dpaths = list(nx.node_disjoint_paths(G, 0, 33, **kwargs))
assert are_node_disjoint_paths(G, node_dpaths), errmsg
assert nx.node_connectivity(G, 0, 33) == len(node_dpaths), errmsg
def test_petersen_disjoint_paths():
G = nx.petersen_graph()
for flow_func in flow_funcs:
kwargs = dict(flow_func=flow_func)
errmsg = f"Assertion failed in function: {flow_func.__name__}"
# edge disjoint paths
edge_dpaths = list(nx.edge_disjoint_paths(G, 0, 6, **kwargs))
assert are_edge_disjoint_paths(G, edge_dpaths), errmsg
assert 3 == len(edge_dpaths), errmsg
# node disjoint paths
node_dpaths = list(nx.node_disjoint_paths(G, 0, 6, **kwargs))
assert are_node_disjoint_paths(G, node_dpaths), errmsg
assert 3 == len(node_dpaths), errmsg
def test_octahedral_disjoint_paths():
G = nx.octahedral_graph()
for flow_func in flow_funcs:
kwargs = dict(flow_func=flow_func)
errmsg = f"Assertion failed in function: {flow_func.__name__}"
# edge disjoint paths
edge_dpaths = list(nx.edge_disjoint_paths(G, 0, 5, **kwargs))
assert are_edge_disjoint_paths(G, edge_dpaths), errmsg
assert 4 == len(edge_dpaths), errmsg
# node disjoint paths
node_dpaths = list(nx.node_disjoint_paths(G, 0, 5, **kwargs))
assert are_node_disjoint_paths(G, node_dpaths), errmsg
assert 4 == len(node_dpaths), errmsg
def test_icosahedral_disjoint_paths():
G = nx.icosahedral_graph()
for flow_func in flow_funcs:
kwargs = dict(flow_func=flow_func)
errmsg = f"Assertion failed in function: {flow_func.__name__}"
# edge disjoint paths
edge_dpaths = list(nx.edge_disjoint_paths(G, 0, 6, **kwargs))
assert are_edge_disjoint_paths(G, edge_dpaths), errmsg
assert 5 == len(edge_dpaths), errmsg
# node disjoint paths
node_dpaths = list(nx.node_disjoint_paths(G, 0, 6, **kwargs))
assert are_node_disjoint_paths(G, node_dpaths), errmsg
assert 5 == len(node_dpaths), errmsg
def test_cutoff_disjoint_paths():
G = nx.icosahedral_graph()
for flow_func in flow_funcs:
kwargs = dict(flow_func=flow_func)
errmsg = f"Assertion failed in function: {flow_func.__name__}"
for cutoff in [2, 4]:
kwargs["cutoff"] = cutoff
# edge disjoint paths
edge_dpaths = list(nx.edge_disjoint_paths(G, 0, 6, **kwargs))
assert are_edge_disjoint_paths(G, edge_dpaths), errmsg
assert cutoff == len(edge_dpaths), errmsg
# node disjoint paths
node_dpaths = list(nx.node_disjoint_paths(G, 0, 6, **kwargs))
assert are_node_disjoint_paths(G, node_dpaths), errmsg
assert cutoff == len(node_dpaths), errmsg
def test_missing_source_edge_paths():
with pytest.raises(nx.NetworkXError):
G = nx.path_graph(4)
list(nx.edge_disjoint_paths(G, 10, 1))
def test_missing_source_node_paths():
with pytest.raises(nx.NetworkXError):
G = nx.path_graph(4)
list(nx.node_disjoint_paths(G, 10, 1))
def test_missing_target_edge_paths():
with pytest.raises(nx.NetworkXError):
G = nx.path_graph(4)
list(nx.edge_disjoint_paths(G, 1, 10))
def test_missing_target_node_paths():
with pytest.raises(nx.NetworkXError):
G = nx.path_graph(4)
list(nx.node_disjoint_paths(G, 1, 10))
def test_not_weakly_connected_edges():
with pytest.raises(nx.NetworkXNoPath):
G = nx.DiGraph()
nx.add_path(G, [1, 2, 3])
nx.add_path(G, [4, 5])
list(nx.edge_disjoint_paths(G, 1, 5))
def test_not_weakly_connected_nodes():
with pytest.raises(nx.NetworkXNoPath):
G = nx.DiGraph()
nx.add_path(G, [1, 2, 3])
nx.add_path(G, [4, 5])
list(nx.node_disjoint_paths(G, 1, 5))
def test_not_connected_edges():
with pytest.raises(nx.NetworkXNoPath):
G = nx.Graph()
nx.add_path(G, [1, 2, 3])
nx.add_path(G, [4, 5])
list(nx.edge_disjoint_paths(G, 1, 5))
def test_not_connected_nodes():
with pytest.raises(nx.NetworkXNoPath):
G = nx.Graph()
nx.add_path(G, [1, 2, 3])
nx.add_path(G, [4, 5])
list(nx.node_disjoint_paths(G, 1, 5))
def test_isolated_edges():
with pytest.raises(nx.NetworkXNoPath):
G = nx.Graph()
G.add_node(1)
nx.add_path(G, [4, 5])
list(nx.edge_disjoint_paths(G, 1, 5))
def test_isolated_nodes():
with pytest.raises(nx.NetworkXNoPath):
G = nx.Graph()
G.add_node(1)
nx.add_path(G, [4, 5])
list(nx.node_disjoint_paths(G, 1, 5))
def test_invalid_auxiliary():
with pytest.raises(nx.NetworkXError):
G = nx.complete_graph(5)
list(nx.node_disjoint_paths(G, 0, 3, auxiliary=G))

View file

@ -0,0 +1,495 @@
import random
import networkx as nx
import itertools as it
from networkx.utils import pairwise
import pytest
from networkx.algorithms.connectivity import k_edge_augmentation
from networkx.algorithms.connectivity.edge_augmentation import (
collapse,
complement_edges,
is_locally_k_edge_connected,
is_k_edge_connected,
_unpack_available_edges,
)
# This should be set to the largest k for which an efficient algorithm is
# explicitly defined.
MAX_EFFICIENT_K = 2
def tarjan_bridge_graph():
# graph from tarjan paper
# RE Tarjan - "A note on finding the bridges of a graph"
# Information Processing Letters, 1974 - Elsevier
# doi:10.1016/0020-0190(74)90003-9.
# define 2-connected components and bridges
ccs = [
(1, 2, 4, 3, 1, 4),
(5, 6, 7, 5),
(8, 9, 10, 8),
(17, 18, 16, 15, 17),
(11, 12, 14, 13, 11, 14),
]
bridges = [(4, 8), (3, 5), (3, 17)]
G = nx.Graph(it.chain(*(pairwise(path) for path in ccs + bridges)))
return G
def test_weight_key():
G = nx.Graph()
G.add_nodes_from([1, 2, 3, 4, 5, 6, 7, 8, 9])
G.add_edges_from([(3, 8), (1, 2), (2, 3)])
impossible = {(3, 6), (3, 9)}
rng = random.Random(0)
avail_uv = list(set(complement_edges(G)) - impossible)
avail = [(u, v, {"cost": rng.random()}) for u, v in avail_uv]
_augment_and_check(G, k=1)
_augment_and_check(G, k=1, avail=avail_uv)
_augment_and_check(G, k=1, avail=avail, weight="cost")
_check_augmentations(G, avail, weight="cost")
def test_is_locally_k_edge_connected_exceptions():
pytest.raises(nx.NetworkXNotImplemented, is_k_edge_connected, nx.DiGraph(), k=0)
pytest.raises(nx.NetworkXNotImplemented, is_k_edge_connected, nx.MultiGraph(), k=0)
pytest.raises(ValueError, is_k_edge_connected, nx.Graph(), k=0)
def test_is_k_edge_connected():
G = nx.barbell_graph(10, 0)
assert is_k_edge_connected(G, k=1)
assert not is_k_edge_connected(G, k=2)
G = nx.Graph()
G.add_nodes_from([5, 15])
assert not is_k_edge_connected(G, k=1)
assert not is_k_edge_connected(G, k=2)
G = nx.complete_graph(5)
assert is_k_edge_connected(G, k=1)
assert is_k_edge_connected(G, k=2)
assert is_k_edge_connected(G, k=3)
assert is_k_edge_connected(G, k=4)
def test_is_k_edge_connected_exceptions():
pytest.raises(
nx.NetworkXNotImplemented, is_locally_k_edge_connected, nx.DiGraph(), 1, 2, k=0
)
pytest.raises(
nx.NetworkXNotImplemented,
is_locally_k_edge_connected,
nx.MultiGraph(),
1,
2,
k=0,
)
pytest.raises(ValueError, is_locally_k_edge_connected, nx.Graph(), 1, 2, k=0)
def test_is_locally_k_edge_connected():
G = nx.barbell_graph(10, 0)
assert is_locally_k_edge_connected(G, 5, 15, k=1)
assert not is_locally_k_edge_connected(G, 5, 15, k=2)
G = nx.Graph()
G.add_nodes_from([5, 15])
assert not is_locally_k_edge_connected(G, 5, 15, k=2)
def test_null_graph():
G = nx.Graph()
_check_augmentations(G, max_k=MAX_EFFICIENT_K + 2)
def test_cliques():
for n in range(1, 10):
G = nx.complete_graph(n)
_check_augmentations(G, max_k=MAX_EFFICIENT_K + 2)
def test_clique_and_node():
for n in range(1, 10):
G = nx.complete_graph(n)
G.add_node(n + 1)
_check_augmentations(G, max_k=MAX_EFFICIENT_K + 2)
def test_point_graph():
G = nx.Graph()
G.add_node(1)
_check_augmentations(G, max_k=MAX_EFFICIENT_K + 2)
def test_edgeless_graph():
G = nx.Graph()
G.add_nodes_from([1, 2, 3, 4])
_check_augmentations(G)
def test_invalid_k():
G = nx.Graph()
pytest.raises(ValueError, list, k_edge_augmentation(G, k=-1))
pytest.raises(ValueError, list, k_edge_augmentation(G, k=0))
def test_unfeasible():
G = tarjan_bridge_graph()
pytest.raises(nx.NetworkXUnfeasible, list, k_edge_augmentation(G, k=1, avail=[]))
pytest.raises(nx.NetworkXUnfeasible, list, k_edge_augmentation(G, k=2, avail=[]))
pytest.raises(
nx.NetworkXUnfeasible, list, k_edge_augmentation(G, k=2, avail=[(7, 9)])
)
# partial solutions should not error if real solutions are infeasible
aug_edges = list(k_edge_augmentation(G, k=2, avail=[(7, 9)], partial=True))
assert aug_edges == [(7, 9)]
_check_augmentations(G, avail=[], max_k=MAX_EFFICIENT_K + 2)
_check_augmentations(G, avail=[(7, 9)], max_k=MAX_EFFICIENT_K + 2)
def test_tarjan():
G = tarjan_bridge_graph()
aug_edges = set(_augment_and_check(G, k=2)[0])
print(f"aug_edges = {aug_edges!r}")
# can't assert edge exactly equality due to non-determinant edge order
# but we do know the size of the solution must be 3
assert len(aug_edges) == 3
avail = [
(9, 7),
(8, 5),
(2, 10),
(6, 13),
(11, 18),
(1, 17),
(2, 3),
(16, 17),
(18, 14),
(15, 14),
]
aug_edges = set(_augment_and_check(G, avail=avail, k=2)[0])
# Can't assert exact length since approximation depends on the order of a
# dict traversal.
assert len(aug_edges) <= 3 * 2
_check_augmentations(G, avail)
def test_configuration():
# seeds = [2718183590, 2470619828, 1694705158, 3001036531, 2401251497]
seeds = [1001, 1002, 1003, 1004]
for seed in seeds:
deg_seq = nx.random_powerlaw_tree_sequence(20, seed=seed, tries=5000)
G = nx.Graph(nx.configuration_model(deg_seq, seed=seed))
G.remove_edges_from(nx.selfloop_edges(G))
_check_augmentations(G)
def test_shell():
# seeds = [2057382236, 3331169846, 1840105863, 476020778, 2247498425]
seeds = [18]
for seed in seeds:
constructor = [(12, 70, 0.8), (15, 40, 0.6)]
G = nx.random_shell_graph(constructor, seed=seed)
_check_augmentations(G)
def test_karate():
G = nx.karate_club_graph()
_check_augmentations(G)
def test_star():
G = nx.star_graph(3)
_check_augmentations(G)
G = nx.star_graph(5)
_check_augmentations(G)
G = nx.star_graph(10)
_check_augmentations(G)
def test_barbell():
G = nx.barbell_graph(5, 0)
_check_augmentations(G)
G = nx.barbell_graph(5, 2)
_check_augmentations(G)
G = nx.barbell_graph(5, 3)
_check_augmentations(G)
G = nx.barbell_graph(5, 4)
_check_augmentations(G)
def test_bridge():
G = nx.Graph([(2393, 2257), (2393, 2685), (2685, 2257), (1758, 2257)])
_check_augmentations(G)
def test_gnp_augmentation():
rng = random.Random(0)
G = nx.gnp_random_graph(30, 0.005, seed=0)
# Randomly make edges available
avail = {
(u, v): 1 + rng.random() for u, v in complement_edges(G) if rng.random() < 0.25
}
_check_augmentations(G, avail)
def _assert_solution_properties(G, aug_edges, avail_dict=None):
""" Checks that aug_edges are consistently formatted """
if avail_dict is not None:
assert all(
e in avail_dict for e in aug_edges
), "when avail is specified aug-edges should be in avail"
unique_aug = set(map(tuple, map(sorted, aug_edges)))
unique_aug = list(map(tuple, map(sorted, aug_edges)))
assert len(aug_edges) == len(unique_aug), "edges should be unique"
assert not any(u == v for u, v in unique_aug), "should be no self-edges"
assert not any(
G.has_edge(u, v) for u, v in unique_aug
), "aug edges and G.edges should be disjoint"
def _augment_and_check(
G, k, avail=None, weight=None, verbose=False, orig_k=None, max_aug_k=None
):
"""
Does one specific augmentation and checks for properties of the result
"""
if orig_k is None:
try:
orig_k = nx.edge_connectivity(G)
except nx.NetworkXPointlessConcept:
orig_k = 0
info = {}
try:
if avail is not None:
# ensure avail is in dict form
avail_dict = dict(zip(*_unpack_available_edges(avail, weight=weight)))
else:
avail_dict = None
try:
# Find the augmentation if possible
generator = nx.k_edge_augmentation(G, k=k, weight=weight, avail=avail)
assert not isinstance(generator, list), "should always return an iter"
aug_edges = []
for edge in generator:
aug_edges.append(edge)
except nx.NetworkXUnfeasible:
infeasible = True
info["infeasible"] = True
assert len(aug_edges) == 0, "should not generate anything if unfeasible"
if avail is None:
n_nodes = G.number_of_nodes()
assert n_nodes <= k, (
"unconstrained cases are only unfeasible if |V| <= k. "
f"Got |V|={n_nodes} and k={k}"
)
else:
if max_aug_k is None:
G_aug_all = G.copy()
G_aug_all.add_edges_from(avail_dict.keys())
try:
max_aug_k = nx.edge_connectivity(G_aug_all)
except nx.NetworkXPointlessConcept:
max_aug_k = 0
assert max_aug_k < k, (
"avail should only be unfeasible if using all edges "
"does not achieve k-edge-connectivity"
)
# Test for a partial solution
partial_edges = list(
nx.k_edge_augmentation(G, k=k, weight=weight, partial=True, avail=avail)
)
info["n_partial_edges"] = len(partial_edges)
if avail_dict is None:
assert set(partial_edges) == set(
complement_edges(G)
), "unweighted partial solutions should be the complement"
elif len(avail_dict) > 0:
H = G.copy()
# Find the partial / full augmented connectivity
H.add_edges_from(partial_edges)
partial_conn = nx.edge_connectivity(H)
H.add_edges_from(set(avail_dict.keys()))
full_conn = nx.edge_connectivity(H)
# Full connectivity should be no better than our partial
# solution.
assert (
partial_conn == full_conn
), "adding more edges should not increase k-conn"
# Find the new edge-connectivity after adding the augmenting edges
aug_edges = partial_edges
else:
infeasible = False
# Find the weight of the augmentation
num_edges = len(aug_edges)
if avail is not None:
total_weight = sum([avail_dict[e] for e in aug_edges])
else:
total_weight = num_edges
info["total_weight"] = total_weight
info["num_edges"] = num_edges
# Find the new edge-connectivity after adding the augmenting edges
G_aug = G.copy()
G_aug.add_edges_from(aug_edges)
try:
aug_k = nx.edge_connectivity(G_aug)
except nx.NetworkXPointlessConcept:
aug_k = 0
info["aug_k"] = aug_k
# Do checks
if not infeasible and orig_k < k:
assert info["aug_k"] >= k, f"connectivity should increase to k={k} or more"
assert info["aug_k"] >= orig_k, "augmenting should never reduce connectivity"
_assert_solution_properties(G, aug_edges, avail_dict)
except Exception:
info["failed"] = True
print(f"edges = {list(G.edges())}")
print(f"nodes = {list(G.nodes())}")
print(f"aug_edges = {list(aug_edges)}")
print(f"info = {info}")
raise
else:
if verbose:
print(f"info = {info}")
if infeasible:
aug_edges = None
return aug_edges, info
def _check_augmentations(G, avail=None, max_k=None, weight=None, verbose=False):
""" Helper to check weighted/unweighted cases with multiple values of k """
# Using all available edges, find the maximum edge-connectivity
try:
orig_k = nx.edge_connectivity(G)
except nx.NetworkXPointlessConcept:
orig_k = 0
if avail is not None:
all_aug_edges = _unpack_available_edges(avail, weight=weight)[0]
G_aug_all = G.copy()
G_aug_all.add_edges_from(all_aug_edges)
try:
max_aug_k = nx.edge_connectivity(G_aug_all)
except nx.NetworkXPointlessConcept:
max_aug_k = 0
else:
max_aug_k = G.number_of_nodes() - 1
if max_k is None:
max_k = min(4, max_aug_k)
avail_uniform = {e: 1 for e in complement_edges(G)}
if verbose:
print("\n=== CHECK_AUGMENTATION ===")
print(f"G.number_of_nodes = {G.number_of_nodes()!r}")
print(f"G.number_of_edges = {G.number_of_edges()!r}")
print(f"max_k = {max_k!r}")
print(f"max_aug_k = {max_aug_k!r}")
print(f"orig_k = {orig_k!r}")
# check augmentation for multiple values of k
for k in range(1, max_k + 1):
if verbose:
print("---------------")
print(f"Checking k = {k}")
# Check the unweighted version
if verbose:
print("unweighted case")
aug_edges1, info1 = _augment_and_check(G, k=k, verbose=verbose, orig_k=orig_k)
# Check that the weighted version with all available edges and uniform
# weights gives a similar solution to the unweighted case.
if verbose:
print("weighted uniform case")
aug_edges2, info2 = _augment_and_check(
G,
k=k,
avail=avail_uniform,
verbose=verbose,
orig_k=orig_k,
max_aug_k=G.number_of_nodes() - 1,
)
# Check the weighted version
if avail is not None:
if verbose:
print("weighted case")
aug_edges3, info3 = _augment_and_check(
G,
k=k,
avail=avail,
weight=weight,
verbose=verbose,
max_aug_k=max_aug_k,
orig_k=orig_k,
)
if aug_edges1 is not None:
# Check approximation ratios
if k == 1:
# when k=1, both solutions should be optimal
assert info2["total_weight"] == info1["total_weight"]
if k == 2:
# when k=2, the weighted version is an approximation
if orig_k == 0:
# the approximation ratio is 3 if G is not connected
assert info2["total_weight"] <= info1["total_weight"] * 3
else:
# the approximation ratio is 2 if G is was connected
assert info2["total_weight"] <= info1["total_weight"] * 2
_check_unconstrained_bridge_property(G, info1)
def _check_unconstrained_bridge_property(G, info1):
# Check Theorem 5 from Eswaran and Tarjan. (1975) Augmentation problems
import math
bridge_ccs = list(nx.connectivity.bridge_components(G))
# condense G into an forest C
C = collapse(G, bridge_ccs)
p = len([n for n, d in C.degree() if d == 1]) # leafs
q = len([n for n, d in C.degree() if d == 0]) # isolated
if p + q > 1:
size_target = int(math.ceil(p / 2.0)) + q
size_aug = info1["num_edges"]
assert (
size_aug == size_target
), "augmentation size is different from what theory predicts"

View file

@ -0,0 +1,485 @@
import networkx as nx
import itertools as it
import pytest
from networkx.utils import pairwise
from networkx.algorithms.connectivity import bridge_components, EdgeComponentAuxGraph
from networkx.algorithms.connectivity.edge_kcomponents import general_k_edge_subgraphs
# ----------------
# Helper functions
# ----------------
def fset(list_of_sets):
""" allows == to be used for list of sets """
return set(map(frozenset, list_of_sets))
def _assert_subgraph_edge_connectivity(G, ccs_subgraph, k):
"""
tests properties of k-edge-connected subgraphs
the actual edge connectivity should be no less than k unless the cc is a
single node.
"""
for cc in ccs_subgraph:
C = G.subgraph(cc)
if len(cc) > 1:
connectivity = nx.edge_connectivity(C)
assert connectivity >= k
def _memo_connectivity(G, u, v, memo):
edge = (u, v)
if edge in memo:
return memo[edge]
if not G.is_directed():
redge = (v, u)
if redge in memo:
return memo[redge]
memo[edge] = nx.edge_connectivity(G, *edge)
return memo[edge]
def _all_pairs_connectivity(G, cc, k, memo):
# Brute force check
for u, v in it.combinations(cc, 2):
# Use a memoization dict to save on computation
connectivity = _memo_connectivity(G, u, v, memo)
if G.is_directed():
connectivity = min(connectivity, _memo_connectivity(G, v, u, memo))
assert connectivity >= k
def _assert_local_cc_edge_connectivity(G, ccs_local, k, memo):
"""
tests properties of k-edge-connected components
the local edge connectivity between each pair of nodes in the the original
graph should be no less than k unless the cc is a single node.
"""
for cc in ccs_local:
if len(cc) > 1:
# Strategy for testing a bit faster: If the subgraph has high edge
# connectivity then it must have local connectivity
C = G.subgraph(cc)
connectivity = nx.edge_connectivity(C)
if connectivity < k:
# Otherwise do the brute force (with memoization) check
_all_pairs_connectivity(G, cc, k, memo)
# Helper function
def _check_edge_connectivity(G):
"""
Helper - generates all k-edge-components using the aux graph. Checks the
both local and subgraph edge connectivity of each cc. Also checks that
alternate methods of computing the k-edge-ccs generate the same result.
"""
# Construct the auxiliary graph that can be used to make each k-cc or k-sub
aux_graph = EdgeComponentAuxGraph.construct(G)
# memoize the local connectivity in this graph
memo = {}
for k in it.count(1):
# Test "local" k-edge-components and k-edge-subgraphs
ccs_local = fset(aux_graph.k_edge_components(k))
ccs_subgraph = fset(aux_graph.k_edge_subgraphs(k))
# Check connectivity properties that should be garuenteed by the
# algorithms.
_assert_local_cc_edge_connectivity(G, ccs_local, k, memo)
_assert_subgraph_edge_connectivity(G, ccs_subgraph, k)
if k == 1 or k == 2 and not G.is_directed():
assert (
ccs_local == ccs_subgraph
), "Subgraphs and components should be the same when k == 1 or (k == 2 and not G.directed())"
if G.is_directed():
# Test special case methods are the same as the aux graph
if k == 1:
alt_sccs = fset(nx.strongly_connected_components(G))
assert alt_sccs == ccs_local, "k=1 failed alt"
assert alt_sccs == ccs_subgraph, "k=1 failed alt"
else:
# Test special case methods are the same as the aux graph
if k == 1:
alt_ccs = fset(nx.connected_components(G))
assert alt_ccs == ccs_local, "k=1 failed alt"
assert alt_ccs == ccs_subgraph, "k=1 failed alt"
elif k == 2:
alt_bridge_ccs = fset(bridge_components(G))
assert alt_bridge_ccs == ccs_local, "k=2 failed alt"
assert alt_bridge_ccs == ccs_subgraph, "k=2 failed alt"
# if new methods for k == 3 or k == 4 are implemented add them here
# Check the general subgraph method works by itself
alt_subgraph_ccs = fset(
[set(C.nodes()) for C in general_k_edge_subgraphs(G, k=k)]
)
assert alt_subgraph_ccs == ccs_subgraph, "alt subgraph method failed"
# Stop once k is larger than all special case methods
# and we cannot break down ccs any further.
if k > 2 and all(len(cc) == 1 for cc in ccs_local):
break
# ----------------
# Misc tests
# ----------------
def test_zero_k_exception():
G = nx.Graph()
# functions that return generators error immediately
pytest.raises(ValueError, nx.k_edge_components, G, k=0)
pytest.raises(ValueError, nx.k_edge_subgraphs, G, k=0)
# actual generators only error when you get the first item
aux_graph = EdgeComponentAuxGraph.construct(G)
pytest.raises(ValueError, list, aux_graph.k_edge_components(k=0))
pytest.raises(ValueError, list, aux_graph.k_edge_subgraphs(k=0))
pytest.raises(ValueError, list, general_k_edge_subgraphs(G, k=0))
def test_empty_input():
G = nx.Graph()
assert [] == list(nx.k_edge_components(G, k=5))
assert [] == list(nx.k_edge_subgraphs(G, k=5))
G = nx.DiGraph()
assert [] == list(nx.k_edge_components(G, k=5))
assert [] == list(nx.k_edge_subgraphs(G, k=5))
def test_not_implemented():
G = nx.MultiGraph()
pytest.raises(nx.NetworkXNotImplemented, EdgeComponentAuxGraph.construct, G)
pytest.raises(nx.NetworkXNotImplemented, nx.k_edge_components, G, k=2)
pytest.raises(nx.NetworkXNotImplemented, nx.k_edge_subgraphs, G, k=2)
pytest.raises(nx.NetworkXNotImplemented, bridge_components, G)
pytest.raises(nx.NetworkXNotImplemented, bridge_components, nx.DiGraph())
def test_general_k_edge_subgraph_quick_return():
# tests quick return optimization
G = nx.Graph()
G.add_node(0)
subgraphs = list(general_k_edge_subgraphs(G, k=1))
assert len(subgraphs) == 1
for subgraph in subgraphs:
assert subgraph.number_of_nodes() == 1
G.add_node(1)
subgraphs = list(general_k_edge_subgraphs(G, k=1))
assert len(subgraphs) == 2
for subgraph in subgraphs:
assert subgraph.number_of_nodes() == 1
# ----------------
# Undirected tests
# ----------------
def test_random_gnp():
# seeds = [1550709854, 1309423156, 4208992358, 2785630813, 1915069929]
seeds = [12, 13]
for seed in seeds:
G = nx.gnp_random_graph(20, 0.2, seed=seed)
_check_edge_connectivity(G)
def test_configuration():
# seeds = [2718183590, 2470619828, 1694705158, 3001036531, 2401251497]
seeds = [14, 15]
for seed in seeds:
deg_seq = nx.random_powerlaw_tree_sequence(20, seed=seed, tries=5000)
G = nx.Graph(nx.configuration_model(deg_seq, seed=seed))
G.remove_edges_from(nx.selfloop_edges(G))
_check_edge_connectivity(G)
def test_shell():
# seeds = [2057382236, 3331169846, 1840105863, 476020778, 2247498425]
seeds = [20]
for seed in seeds:
constructor = [(12, 70, 0.8), (15, 40, 0.6)]
G = nx.random_shell_graph(constructor, seed=seed)
_check_edge_connectivity(G)
def test_karate():
G = nx.karate_club_graph()
_check_edge_connectivity(G)
def test_tarjan_bridge():
# graph from tarjan paper
# RE Tarjan - "A note on finding the bridges of a graph"
# Information Processing Letters, 1974 - Elsevier
# doi:10.1016/0020-0190(74)90003-9.
# define 2-connected components and bridges
ccs = [
(1, 2, 4, 3, 1, 4),
(5, 6, 7, 5),
(8, 9, 10, 8),
(17, 18, 16, 15, 17),
(11, 12, 14, 13, 11, 14),
]
bridges = [(4, 8), (3, 5), (3, 17)]
G = nx.Graph(it.chain(*(pairwise(path) for path in ccs + bridges)))
_check_edge_connectivity(G)
def test_bridge_cc():
# define 2-connected components and bridges
cc2 = [(1, 2, 4, 3, 1, 4), (8, 9, 10, 8), (11, 12, 13, 11)]
bridges = [(4, 8), (3, 5), (20, 21), (22, 23, 24)]
G = nx.Graph(it.chain(*(pairwise(path) for path in cc2 + bridges)))
bridge_ccs = fset(bridge_components(G))
target_ccs = fset(
[{1, 2, 3, 4}, {5}, {8, 9, 10}, {11, 12, 13}, {20}, {21}, {22}, {23}, {24}]
)
assert bridge_ccs == target_ccs
_check_edge_connectivity(G)
def test_undirected_aux_graph():
# Graph similar to the one in
# http://journals.plos.org/plosone/article?id=10.1371/journal.pone.0136264
a, b, c, d, e, f, g, h, i = "abcdefghi"
paths = [
(a, d, b, f, c),
(a, e, b),
(a, e, b, c, g, b, a),
(c, b),
(f, g, f),
(h, i),
]
G = nx.Graph(it.chain(*[pairwise(path) for path in paths]))
aux_graph = EdgeComponentAuxGraph.construct(G)
components_1 = fset(aux_graph.k_edge_subgraphs(k=1))
target_1 = fset([{a, b, c, d, e, f, g}, {h, i}])
assert target_1 == components_1
# Check that the undirected case for k=1 agrees with CCs
alt_1 = fset(nx.k_edge_subgraphs(G, k=1))
assert alt_1 == components_1
components_2 = fset(aux_graph.k_edge_subgraphs(k=2))
target_2 = fset([{a, b, c, d, e, f, g}, {h}, {i}])
assert target_2 == components_2
# Check that the undirected case for k=2 agrees with bridge components
alt_2 = fset(nx.k_edge_subgraphs(G, k=2))
assert alt_2 == components_2
components_3 = fset(aux_graph.k_edge_subgraphs(k=3))
target_3 = fset([{a}, {b, c, f, g}, {d}, {e}, {h}, {i}])
assert target_3 == components_3
components_4 = fset(aux_graph.k_edge_subgraphs(k=4))
target_4 = fset([{a}, {b}, {c}, {d}, {e}, {f}, {g}, {h}, {i}])
assert target_4 == components_4
_check_edge_connectivity(G)
def test_local_subgraph_difference():
paths = [
(11, 12, 13, 14, 11, 13, 14, 12), # first 4-clique
(21, 22, 23, 24, 21, 23, 24, 22), # second 4-clique
# paths connecting each node of the 4 cliques
(11, 101, 21),
(12, 102, 22),
(13, 103, 23),
(14, 104, 24),
]
G = nx.Graph(it.chain(*[pairwise(path) for path in paths]))
aux_graph = EdgeComponentAuxGraph.construct(G)
# Each clique is returned separately in k-edge-subgraphs
subgraph_ccs = fset(aux_graph.k_edge_subgraphs(3))
subgraph_target = fset(
[{101}, {102}, {103}, {104}, {21, 22, 23, 24}, {11, 12, 13, 14}]
)
assert subgraph_ccs == subgraph_target
# But in k-edge-ccs they are returned together
# because they are locally 3-edge-connected
local_ccs = fset(aux_graph.k_edge_components(3))
local_target = fset([{101}, {102}, {103}, {104}, {11, 12, 13, 14, 21, 22, 23, 24}])
assert local_ccs == local_target
def test_local_subgraph_difference_directed():
dipaths = [(1, 2, 3, 4, 1), (1, 3, 1)]
G = nx.DiGraph(it.chain(*[pairwise(path) for path in dipaths]))
assert fset(nx.k_edge_components(G, k=1)) == fset(nx.k_edge_subgraphs(G, k=1))
# Unlike undirected graphs, when k=2, for directed graphs there is a case
# where the k-edge-ccs are not the same as the k-edge-subgraphs.
# (in directed graphs ccs and subgraphs are the same when k=2)
assert fset(nx.k_edge_components(G, k=2)) != fset(nx.k_edge_subgraphs(G, k=2))
assert fset(nx.k_edge_components(G, k=3)) == fset(nx.k_edge_subgraphs(G, k=3))
_check_edge_connectivity(G)
def test_triangles():
paths = [
(11, 12, 13, 11), # first 3-clique
(21, 22, 23, 21), # second 3-clique
(11, 21), # connected by an edge
]
G = nx.Graph(it.chain(*[pairwise(path) for path in paths]))
# subgraph and ccs are the same in all cases here
assert fset(nx.k_edge_components(G, k=1)) == fset(nx.k_edge_subgraphs(G, k=1))
assert fset(nx.k_edge_components(G, k=2)) == fset(nx.k_edge_subgraphs(G, k=2))
assert fset(nx.k_edge_components(G, k=3)) == fset(nx.k_edge_subgraphs(G, k=3))
_check_edge_connectivity(G)
def test_four_clique():
paths = [
(11, 12, 13, 14, 11, 13, 14, 12), # first 4-clique
(21, 22, 23, 24, 21, 23, 24, 22), # second 4-clique
# paths connecting the 4 cliques such that they are
# 3-connected in G, but not in the subgraph.
# Case where the nodes bridging them do not have degree less than 3.
(100, 13),
(12, 100, 22),
(13, 200, 23),
(14, 300, 24),
]
G = nx.Graph(it.chain(*[pairwise(path) for path in paths]))
# The subgraphs and ccs are different for k=3
local_ccs = fset(nx.k_edge_components(G, k=3))
subgraphs = fset(nx.k_edge_subgraphs(G, k=3))
assert local_ccs != subgraphs
# The cliques ares in the same cc
clique1 = frozenset(paths[0])
clique2 = frozenset(paths[1])
assert clique1.union(clique2).union({100}) in local_ccs
# but different subgraphs
assert clique1 in subgraphs
assert clique2 in subgraphs
assert G.degree(100) == 3
_check_edge_connectivity(G)
def test_five_clique():
# Make a graph that can be disconnected less than 4 edges, but no node has
# degree less than 4.
G = nx.disjoint_union(nx.complete_graph(5), nx.complete_graph(5))
paths = [
# add aux-connections
(1, 100, 6),
(2, 100, 7),
(3, 200, 8),
(4, 200, 100),
]
G.add_edges_from(it.chain(*[pairwise(path) for path in paths]))
assert min(dict(nx.degree(G)).values()) == 4
# For k=3 they are the same
assert fset(nx.k_edge_components(G, k=3)) == fset(nx.k_edge_subgraphs(G, k=3))
# For k=4 they are the different
# the aux nodes are in the same CC as clique 1 but no the same subgraph
assert fset(nx.k_edge_components(G, k=4)) != fset(nx.k_edge_subgraphs(G, k=4))
# For k=5 they are not the same
assert fset(nx.k_edge_components(G, k=5)) != fset(nx.k_edge_subgraphs(G, k=5))
# For k=6 they are the same
assert fset(nx.k_edge_components(G, k=6)) == fset(nx.k_edge_subgraphs(G, k=6))
_check_edge_connectivity(G)
# ----------------
# Undirected tests
# ----------------
def test_directed_aux_graph():
# Graph similar to the one in
# http://journals.plos.org/plosone/article?id=10.1371/journal.pone.0136264
a, b, c, d, e, f, g, h, i = "abcdefghi"
dipaths = [
(a, d, b, f, c),
(a, e, b),
(a, e, b, c, g, b, a),
(c, b),
(f, g, f),
(h, i),
]
G = nx.DiGraph(it.chain(*[pairwise(path) for path in dipaths]))
aux_graph = EdgeComponentAuxGraph.construct(G)
components_1 = fset(aux_graph.k_edge_subgraphs(k=1))
target_1 = fset([{a, b, c, d, e, f, g}, {h}, {i}])
assert target_1 == components_1
# Check that the directed case for k=1 agrees with SCCs
alt_1 = fset(nx.strongly_connected_components(G))
assert alt_1 == components_1
components_2 = fset(aux_graph.k_edge_subgraphs(k=2))
target_2 = fset([{i}, {e}, {d}, {b, c, f, g}, {h}, {a}])
assert target_2 == components_2
components_3 = fset(aux_graph.k_edge_subgraphs(k=3))
target_3 = fset([{a}, {b}, {c}, {d}, {e}, {f}, {g}, {h}, {i}])
assert target_3 == components_3
def test_random_gnp_directed():
# seeds = [3894723670, 500186844, 267231174, 2181982262, 1116750056]
seeds = [21]
for seed in seeds:
G = nx.gnp_random_graph(20, 0.2, directed=True, seed=seed)
_check_edge_connectivity(G)
def test_configuration_directed():
# seeds = [671221681, 2403749451, 124433910, 672335939, 1193127215]
seeds = [67]
for seed in seeds:
deg_seq = nx.random_powerlaw_tree_sequence(20, seed=seed, tries=5000)
G = nx.DiGraph(nx.configuration_model(deg_seq, seed=seed))
G.remove_edges_from(nx.selfloop_edges(G))
_check_edge_connectivity(G)
def test_shell_directed():
# seeds = [3134027055, 4079264063, 1350769518, 1405643020, 530038094]
seeds = [31]
for seed in seeds:
constructor = [(12, 70, 0.8), (15, 40, 0.6)]
G = nx.random_shell_graph(constructor, seed=seed).to_directed()
_check_edge_connectivity(G)
def test_karate_directed():
G = nx.karate_club_graph().to_directed()
_check_edge_connectivity(G)

View file

@ -0,0 +1,295 @@
# Test for Moody and White k-components algorithm
import pytest
import networkx as nx
from networkx.algorithms.connectivity.kcomponents import (
build_k_number_dict,
_consolidate,
)
##
# A nice synthetic graph
##
def torrents_and_ferraro_graph():
# Graph from https://arxiv.org/pdf/1503.04476v1 p.26
G = nx.convert_node_labels_to_integers(
nx.grid_graph([5, 5]), label_attribute="labels"
)
rlabels = nx.get_node_attributes(G, "labels")
labels = {v: k for k, v in rlabels.items()}
for nodes in [(labels[(0, 4)], labels[(1, 4)]), (labels[(3, 4)], labels[(4, 4)])]:
new_node = G.order() + 1
# Petersen graph is triconnected
P = nx.petersen_graph()
G = nx.disjoint_union(G, P)
# Add two edges between the grid and P
G.add_edge(new_node + 1, nodes[0])
G.add_edge(new_node, nodes[1])
# K5 is 4-connected
K = nx.complete_graph(5)
G = nx.disjoint_union(G, K)
# Add three edges between P and K5
G.add_edge(new_node + 2, new_node + 11)
G.add_edge(new_node + 3, new_node + 12)
G.add_edge(new_node + 4, new_node + 13)
# Add another K5 sharing a node
G = nx.disjoint_union(G, K)
nbrs = G[new_node + 10]
G.remove_node(new_node + 10)
for nbr in nbrs:
G.add_edge(new_node + 17, nbr)
# This edge makes the graph biconnected; it's
# needed because K5s share only one node.
G.add_edge(new_node + 16, new_node + 8)
for nodes in [(labels[(0, 0)], labels[(1, 0)]), (labels[(3, 0)], labels[(4, 0)])]:
new_node = G.order() + 1
# Petersen graph is triconnected
P = nx.petersen_graph()
G = nx.disjoint_union(G, P)
# Add two edges between the grid and P
G.add_edge(new_node + 1, nodes[0])
G.add_edge(new_node, nodes[1])
# K5 is 4-connected
K = nx.complete_graph(5)
G = nx.disjoint_union(G, K)
# Add three edges between P and K5
G.add_edge(new_node + 2, new_node + 11)
G.add_edge(new_node + 3, new_node + 12)
G.add_edge(new_node + 4, new_node + 13)
# Add another K5 sharing two nodes
G = nx.disjoint_union(G, K)
nbrs = G[new_node + 10]
G.remove_node(new_node + 10)
for nbr in nbrs:
G.add_edge(new_node + 17, nbr)
nbrs2 = G[new_node + 9]
G.remove_node(new_node + 9)
for nbr in nbrs2:
G.add_edge(new_node + 18, nbr)
return G
def test_directed():
with pytest.raises(nx.NetworkXNotImplemented):
G = nx.gnp_random_graph(10, 0.2, directed=True, seed=42)
nx.k_components(G)
# Helper function
def _check_connectivity(G, k_components):
for k, components in k_components.items():
if k < 3:
continue
# check that k-components have node connectivity >= k.
for component in components:
C = G.subgraph(component)
K = nx.node_connectivity(C)
assert K >= k
@pytest.mark.slow
def test_torrents_and_ferraro_graph():
G = torrents_and_ferraro_graph()
result = nx.k_components(G)
_check_connectivity(G, result)
# In this example graph there are 8 3-components, 4 with 15 nodes
# and 4 with 5 nodes.
assert len(result[3]) == 8
assert len([c for c in result[3] if len(c) == 15]) == 4
assert len([c for c in result[3] if len(c) == 5]) == 4
# There are also 8 4-components all with 5 nodes.
assert len(result[4]) == 8
assert all(len(c) == 5 for c in result[4])
@pytest.mark.slow
def test_random_gnp():
G = nx.gnp_random_graph(50, 0.2, seed=42)
result = nx.k_components(G)
_check_connectivity(G, result)
@pytest.mark.slow
def test_shell():
constructor = [(20, 80, 0.8), (80, 180, 0.6)]
G = nx.random_shell_graph(constructor, seed=42)
result = nx.k_components(G)
_check_connectivity(G, result)
def test_configuration():
deg_seq = nx.random_powerlaw_tree_sequence(100, tries=5, seed=72)
G = nx.Graph(nx.configuration_model(deg_seq))
G.remove_edges_from(nx.selfloop_edges(G))
result = nx.k_components(G)
_check_connectivity(G, result)
def test_karate():
G = nx.karate_club_graph()
result = nx.k_components(G)
_check_connectivity(G, result)
def test_karate_component_number():
karate_k_num = {
0: 4,
1: 4,
2: 4,
3: 4,
4: 3,
5: 3,
6: 3,
7: 4,
8: 4,
9: 2,
10: 3,
11: 1,
12: 2,
13: 4,
14: 2,
15: 2,
16: 2,
17: 2,
18: 2,
19: 3,
20: 2,
21: 2,
22: 2,
23: 3,
24: 3,
25: 3,
26: 2,
27: 3,
28: 3,
29: 3,
30: 4,
31: 3,
32: 4,
33: 4,
}
G = nx.karate_club_graph()
k_components = nx.k_components(G)
k_num = build_k_number_dict(k_components)
assert karate_k_num == k_num
def test_davis_southern_women():
G = nx.davis_southern_women_graph()
result = nx.k_components(G)
_check_connectivity(G, result)
def test_davis_southern_women_detail_3_and_4():
solution = {
3: [
{
"Nora Fayette",
"E10",
"Myra Liddel",
"E12",
"E14",
"Frances Anderson",
"Evelyn Jefferson",
"Ruth DeSand",
"Helen Lloyd",
"Eleanor Nye",
"E9",
"E8",
"E5",
"E4",
"E7",
"E6",
"E1",
"Verne Sanderson",
"E3",
"E2",
"Theresa Anderson",
"Pearl Oglethorpe",
"Katherina Rogers",
"Brenda Rogers",
"E13",
"Charlotte McDowd",
"Sylvia Avondale",
"Laura Mandeville",
}
],
4: [
{
"Nora Fayette",
"E10",
"Verne Sanderson",
"E12",
"Frances Anderson",
"Evelyn Jefferson",
"Ruth DeSand",
"Helen Lloyd",
"Eleanor Nye",
"E9",
"E8",
"E5",
"E4",
"E7",
"E6",
"Myra Liddel",
"E3",
"Theresa Anderson",
"Katherina Rogers",
"Brenda Rogers",
"Charlotte McDowd",
"Sylvia Avondale",
"Laura Mandeville",
}
],
}
G = nx.davis_southern_women_graph()
result = nx.k_components(G)
for k, components in result.items():
if k < 3:
continue
assert len(components) == len(solution[k])
for component in components:
assert component in solution[k]
def test_set_consolidation_rosettacode():
# Tests from http://rosettacode.org/wiki/Set_consolidation
def list_of_sets_equal(result, solution):
assert {frozenset(s) for s in result} == {frozenset(s) for s in solution}
question = [{"A", "B"}, {"C", "D"}]
solution = [{"A", "B"}, {"C", "D"}]
list_of_sets_equal(_consolidate(question, 1), solution)
question = [{"A", "B"}, {"B", "C"}]
solution = [{"A", "B", "C"}]
list_of_sets_equal(_consolidate(question, 1), solution)
question = [{"A", "B"}, {"C", "D"}, {"D", "B"}]
solution = [{"A", "C", "B", "D"}]
list_of_sets_equal(_consolidate(question, 1), solution)
question = [{"H", "I", "K"}, {"A", "B"}, {"C", "D"}, {"D", "B"}, {"F", "G", "H"}]
solution = [{"A", "C", "B", "D"}, {"G", "F", "I", "H", "K"}]
list_of_sets_equal(_consolidate(question, 1), solution)
question = [
{"A", "H"},
{"H", "I", "K"},
{"A", "B"},
{"C", "D"},
{"D", "B"},
{"F", "G", "H"},
]
solution = [{"A", "C", "B", "D", "G", "F", "I", "H", "K"}]
list_of_sets_equal(_consolidate(question, 1), solution)
question = [
{"H", "I", "K"},
{"A", "B"},
{"C", "D"},
{"D", "B"},
{"F", "G", "H"},
{"A", "H"},
]
solution = [{"A", "C", "B", "D", "G", "F", "I", "H", "K"}]
list_of_sets_equal(_consolidate(question, 1), solution)

View file

@ -0,0 +1,266 @@
# Jordi Torrents
# Test for k-cutsets
import itertools
import pytest
import networkx as nx
from networkx.algorithms import flow
from networkx.algorithms.connectivity.kcutsets import _is_separating_set
MAX_CUTSETS_TO_TEST = 4 # originally 100. cut to decrease testing time
flow_funcs = [
flow.boykov_kolmogorov,
flow.dinitz,
flow.edmonds_karp,
flow.preflow_push,
flow.shortest_augmenting_path,
]
##
# Some nice synthetic graphs
##
def graph_example_1():
G = nx.convert_node_labels_to_integers(
nx.grid_graph([5, 5]), label_attribute="labels"
)
rlabels = nx.get_node_attributes(G, "labels")
labels = {v: k for k, v in rlabels.items()}
for nodes in [
(labels[(0, 0)], labels[(1, 0)]),
(labels[(0, 4)], labels[(1, 4)]),
(labels[(3, 0)], labels[(4, 0)]),
(labels[(3, 4)], labels[(4, 4)]),
]:
new_node = G.order() + 1
# Petersen graph is triconnected
P = nx.petersen_graph()
G = nx.disjoint_union(G, P)
# Add two edges between the grid and P
G.add_edge(new_node + 1, nodes[0])
G.add_edge(new_node, nodes[1])
# K5 is 4-connected
K = nx.complete_graph(5)
G = nx.disjoint_union(G, K)
# Add three edges between P and K5
G.add_edge(new_node + 2, new_node + 11)
G.add_edge(new_node + 3, new_node + 12)
G.add_edge(new_node + 4, new_node + 13)
# Add another K5 sharing a node
G = nx.disjoint_union(G, K)
nbrs = G[new_node + 10]
G.remove_node(new_node + 10)
for nbr in nbrs:
G.add_edge(new_node + 17, nbr)
G.add_edge(new_node + 16, new_node + 5)
return G
def torrents_and_ferraro_graph():
G = nx.convert_node_labels_to_integers(
nx.grid_graph([5, 5]), label_attribute="labels"
)
rlabels = nx.get_node_attributes(G, "labels")
labels = {v: k for k, v in rlabels.items()}
for nodes in [(labels[(0, 4)], labels[(1, 4)]), (labels[(3, 4)], labels[(4, 4)])]:
new_node = G.order() + 1
# Petersen graph is triconnected
P = nx.petersen_graph()
G = nx.disjoint_union(G, P)
# Add two edges between the grid and P
G.add_edge(new_node + 1, nodes[0])
G.add_edge(new_node, nodes[1])
# K5 is 4-connected
K = nx.complete_graph(5)
G = nx.disjoint_union(G, K)
# Add three edges between P and K5
G.add_edge(new_node + 2, new_node + 11)
G.add_edge(new_node + 3, new_node + 12)
G.add_edge(new_node + 4, new_node + 13)
# Add another K5 sharing a node
G = nx.disjoint_union(G, K)
nbrs = G[new_node + 10]
G.remove_node(new_node + 10)
for nbr in nbrs:
G.add_edge(new_node + 17, nbr)
# Commenting this makes the graph not biconnected !!
# This stupid mistake make one reviewer very angry :P
G.add_edge(new_node + 16, new_node + 8)
for nodes in [(labels[(0, 0)], labels[(1, 0)]), (labels[(3, 0)], labels[(4, 0)])]:
new_node = G.order() + 1
# Petersen graph is triconnected
P = nx.petersen_graph()
G = nx.disjoint_union(G, P)
# Add two edges between the grid and P
G.add_edge(new_node + 1, nodes[0])
G.add_edge(new_node, nodes[1])
# K5 is 4-connected
K = nx.complete_graph(5)
G = nx.disjoint_union(G, K)
# Add three edges between P and K5
G.add_edge(new_node + 2, new_node + 11)
G.add_edge(new_node + 3, new_node + 12)
G.add_edge(new_node + 4, new_node + 13)
# Add another K5 sharing two nodes
G = nx.disjoint_union(G, K)
nbrs = G[new_node + 10]
G.remove_node(new_node + 10)
for nbr in nbrs:
G.add_edge(new_node + 17, nbr)
nbrs2 = G[new_node + 9]
G.remove_node(new_node + 9)
for nbr in nbrs2:
G.add_edge(new_node + 18, nbr)
return G
# Helper function
def _check_separating_sets(G):
for cc in nx.connected_components(G):
if len(cc) < 3:
continue
Gc = G.subgraph(cc)
node_conn = nx.node_connectivity(Gc)
all_cuts = nx.all_node_cuts(Gc)
# Only test a limited number of cut sets to reduce test time.
for cut in itertools.islice(all_cuts, MAX_CUTSETS_TO_TEST):
assert node_conn == len(cut)
assert not nx.is_connected(nx.restricted_view(G, cut, []))
@pytest.mark.slow
def test_torrents_and_ferraro_graph():
G = torrents_and_ferraro_graph()
_check_separating_sets(G)
def test_example_1():
G = graph_example_1()
_check_separating_sets(G)
def test_random_gnp():
G = nx.gnp_random_graph(100, 0.1, seed=42)
_check_separating_sets(G)
def test_shell():
constructor = [(20, 80, 0.8), (80, 180, 0.6)]
G = nx.random_shell_graph(constructor, seed=42)
_check_separating_sets(G)
def test_configuration():
deg_seq = nx.random_powerlaw_tree_sequence(100, tries=5, seed=72)
G = nx.Graph(nx.configuration_model(deg_seq))
G.remove_edges_from(nx.selfloop_edges(G))
_check_separating_sets(G)
def test_karate():
G = nx.karate_club_graph()
_check_separating_sets(G)
def _generate_no_biconnected(max_attempts=50):
attempts = 0
while True:
G = nx.fast_gnp_random_graph(100, 0.0575, seed=42)
if nx.is_connected(G) and not nx.is_biconnected(G):
attempts = 0
yield G
else:
if attempts >= max_attempts:
msg = f"Tried {attempts} times: no suitable Graph."
raise Exception(msg)
else:
attempts += 1
def test_articulation_points():
Ggen = _generate_no_biconnected()
for i in range(1): # change 1 to 3 or more for more realizations.
G = next(Ggen)
articulation_points = list({a} for a in nx.articulation_points(G))
for cut in nx.all_node_cuts(G):
assert cut in articulation_points
def test_grid_2d_graph():
# All minimum node cuts of a 2d grid
# are the four pairs of nodes that are
# neighbors of the four corner nodes.
G = nx.grid_2d_graph(5, 5)
solution = [{(0, 1), (1, 0)}, {(3, 0), (4, 1)}, {(3, 4), (4, 3)}, {(0, 3), (1, 4)}]
for cut in nx.all_node_cuts(G):
assert cut in solution
def test_disconnected_graph():
G = nx.fast_gnp_random_graph(100, 0.01, seed=42)
cuts = nx.all_node_cuts(G)
pytest.raises(nx.NetworkXError, next, cuts)
@pytest.mark.slow
def test_alternative_flow_functions():
graphs = [nx.grid_2d_graph(4, 4), nx.cycle_graph(5)]
for G in graphs:
node_conn = nx.node_connectivity(G)
for flow_func in flow_funcs:
all_cuts = nx.all_node_cuts(G, flow_func=flow_func)
# Only test a limited number of cut sets to reduce test time.
for cut in itertools.islice(all_cuts, MAX_CUTSETS_TO_TEST):
assert node_conn == len(cut)
assert not nx.is_connected(nx.restricted_view(G, cut, []))
def test_is_separating_set_complete_graph():
G = nx.complete_graph(5)
assert _is_separating_set(G, {0, 1, 2, 3})
def test_is_separating_set():
for i in [5, 10, 15]:
G = nx.star_graph(i)
max_degree_node = max(G, key=G.degree)
assert _is_separating_set(G, {max_degree_node})
def test_non_repeated_cuts():
# The algorithm was repeating the cut {0, 1} for the giant biconnected
# component of the Karate club graph.
K = nx.karate_club_graph()
bcc = max(list(nx.biconnected_components(K)), key=len)
G = K.subgraph(bcc)
solution = [{32, 33}, {2, 33}, {0, 3}, {0, 1}, {29, 33}]
cuts = list(nx.all_node_cuts(G))
if len(solution) != len(cuts):
print(nx.info(G))
print(f"Solution: {solution}")
print(f"Result: {cuts}")
assert len(solution) == len(cuts)
for cut in cuts:
assert cut in solution
def test_cycle_graph():
G = nx.cycle_graph(5)
solution = [{0, 2}, {0, 3}, {1, 3}, {1, 4}, {2, 4}]
cuts = list(nx.all_node_cuts(G))
assert len(solution) == len(cuts)
for cut in cuts:
assert cut in solution
def test_complete_graph():
G = nx.complete_graph(5)
solution = [{0, 1, 2, 3}, {0, 1, 2, 4}, {0, 1, 3, 4}, {0, 2, 3, 4}, {1, 2, 3, 4}]
cuts = list(nx.all_node_cuts(G))
assert len(solution) == len(cuts)
for cut in cuts:
assert cut in solution

View file

@ -0,0 +1,100 @@
from itertools import chain
import networkx as nx
import pytest
def _check_partition(G, cut_value, partition, weight):
assert isinstance(partition, tuple)
assert len(partition) == 2
assert isinstance(partition[0], list)
assert isinstance(partition[1], list)
assert len(partition[0]) > 0
assert len(partition[1]) > 0
assert sum(map(len, partition)) == len(G)
assert set(chain.from_iterable(partition)) == set(G)
partition = tuple(map(set, partition))
w = 0
for u, v, e in G.edges(data=True):
if (u in partition[0]) == (v in partition[1]):
w += e.get(weight, 1)
assert w == cut_value
def _test_stoer_wagner(G, answer, weight="weight"):
cut_value, partition = nx.stoer_wagner(G, weight, heap=nx.utils.PairingHeap)
assert cut_value == answer
_check_partition(G, cut_value, partition, weight)
cut_value, partition = nx.stoer_wagner(G, weight, heap=nx.utils.BinaryHeap)
assert cut_value == answer
_check_partition(G, cut_value, partition, weight)
def test_graph1():
G = nx.Graph()
G.add_edge("x", "a", weight=3)
G.add_edge("x", "b", weight=1)
G.add_edge("a", "c", weight=3)
G.add_edge("b", "c", weight=5)
G.add_edge("b", "d", weight=4)
G.add_edge("d", "e", weight=2)
G.add_edge("c", "y", weight=2)
G.add_edge("e", "y", weight=3)
_test_stoer_wagner(G, 4)
def test_graph2():
G = nx.Graph()
G.add_edge("x", "a")
G.add_edge("x", "b")
G.add_edge("a", "c")
G.add_edge("b", "c")
G.add_edge("b", "d")
G.add_edge("d", "e")
G.add_edge("c", "y")
G.add_edge("e", "y")
_test_stoer_wagner(G, 2)
def test_graph3():
# Source:
# Stoer, M. and Wagner, F. (1997). "A simple min-cut algorithm". Journal of
# the ACM 44 (4), 585-591.
G = nx.Graph()
G.add_edge(1, 2, weight=2)
G.add_edge(1, 5, weight=3)
G.add_edge(2, 3, weight=3)
G.add_edge(2, 5, weight=2)
G.add_edge(2, 6, weight=2)
G.add_edge(3, 4, weight=4)
G.add_edge(3, 7, weight=2)
G.add_edge(4, 7, weight=2)
G.add_edge(4, 8, weight=2)
G.add_edge(5, 6, weight=3)
G.add_edge(6, 7, weight=1)
G.add_edge(7, 8, weight=3)
_test_stoer_wagner(G, 4)
def test_weight_name():
G = nx.Graph()
G.add_edge(1, 2, weight=1, cost=8)
G.add_edge(1, 3, cost=2)
G.add_edge(2, 3, cost=4)
_test_stoer_wagner(G, 6, weight="cost")
def test_exceptions():
G = nx.Graph()
pytest.raises(nx.NetworkXError, nx.stoer_wagner, G)
G.add_node(1)
pytest.raises(nx.NetworkXError, nx.stoer_wagner, G)
G.add_node(2)
pytest.raises(nx.NetworkXError, nx.stoer_wagner, G)
G.add_edge(1, 2, weight=-2)
pytest.raises(nx.NetworkXError, nx.stoer_wagner, G)
G = nx.DiGraph()
pytest.raises(nx.NetworkXNotImplemented, nx.stoer_wagner, G)
G = nx.MultiGraph()
pytest.raises(nx.NetworkXNotImplemented, nx.stoer_wagner, G)
G = nx.MultiDiGraph()
pytest.raises(nx.NetworkXNotImplemented, nx.stoer_wagner, G)