from mimetypes import MimeTypes
import networkx as nx
import elpigraph
import numpy as np
import numba as nb
import matplotlib.pyplot as plt
import itertools
import pandas as pd
from sklearn.decomposition import PCA
from copy import deepcopy
from scipy.spatial.qhull import ConvexHull
from shapely.geometry import Point, Polygon, MultiLineString, LineString
from shapely.geometry.multipolygon import MultiPolygon
from sklearn.neighbors import NearestNeighbors
def flatten(S):
if S == []:
return S
if isinstance(S[0], list):
return flatten(S[0]) + flatten(S[1:])
return S[:1] + flatten(S[1:])
@nb.njit
def _get_intersect_inner(a1, a2, b1, b2):
"""
Returns the point of intersection of the lines passing through a2,a1 and b2,b1.
a1: [x, y] a point on the first line
a2: [x, y] another point on the first line
b1: [x, y] a point on the second line
b2: [x, y] another point on the second line
"""
s = np.vstack((a1, a2, b1, b2)) # s for stacked
h = np.hstack((s, np.ones((4, 1)))) # h for homogeneous
l1 = np.cross(h[0], h[1]) # get first line
l2 = np.cross(h[2], h[3]) # get second line
x, y, z = np.cross(l1, l2) # point of intersection
if z == 0: # lines are parallel
return np.array([np.inf, np.inf])
return np.array([x / z, y / z])
@nb.njit
def _isBetween(a, b, c):
""" Check if c is in between a and b """
dotproduct = (c[0] - a[0]) * (b[0] - a[0]) + (c[1] - a[1]) * (b[1] - a[1])
if dotproduct < 0:
return False
squaredlengthba = (b[0] - a[0]) * (b[0] - a[0]) + (b[1] - a[1]) * (b[1] - a[1])
if dotproduct > squaredlengthba:
return False
return True
@nb.njit
def _get_intersect(Xin, nodep, edges, cent):
inters = np.zeros_like(Xin)
for i, x in enumerate(Xin):
for e in edges:
p_inter = _get_intersect_inner(nodep[e[0]], nodep[e[1]], x, cent)
if _isBetween(nodep[e[0]], nodep[e[1]], p_inter):
if np.sum((x - p_inter) ** 2) < np.sum((cent - p_inter) ** 2):
inters[i] = p_inter
return inters
@nb.njit
def get_weights_lineproj(Xin, nodep, edges, cent, threshold=0.2):
Xin_lineproj = _get_intersect(Xin, nodep, edges, cent)
distcent_Xin_lineproj = np.sqrt(np.sum((Xin_lineproj - cent) ** 2, axis=1))
distcent_Xin = np.sqrt(np.sum((Xin - cent) ** 2, axis=1))
w = 1 - distcent_Xin / distcent_Xin_lineproj
idx_close = w > threshold
w[idx_close] = 1.0
return w, idx_close
def shrink_or_swell_shapely_polygon(coords, factor=0.10, swell=False):
"""returns the shapely polygon which is smaller or bigger by passed factor.
If swell = True , then it returns bigger polygon, else smaller"""
my_polygon = Polygon(coords)
xs = list(my_polygon.exterior.coords.xy[0])
ys = list(my_polygon.exterior.coords.xy[1])
x_center = 0.5 * min(xs) + 0.5 * max(xs)
y_center = 0.5 * min(ys) + 0.5 * max(ys)
min_corner = Point(min(xs), min(ys))
max_corner = Point(max(xs), max(ys))
center = Point(x_center, y_center)
shrink_distance = center.distance(min_corner) * factor
if swell:
my_polygon_resized = my_polygon.buffer(shrink_distance) # expand
else:
my_polygon_resized = my_polygon.buffer(-shrink_distance) # shrink
return my_polygon_resized
def remove_intersections(nodep, edges):
""" Update edges to account for possible 2d intersections in graph after adding loops """
new_nodep = nodep.copy()
lnodep = new_nodep.tolist()
new_edges = edges.tolist()
multiline = MultiLineString([LineString(new_nodep[e]) for e in new_edges])
while not (multiline.is_simple): # while intersections in graph
# find an intersection, update edges, break, update graph
for i, j in itertools.combinations(range(len(multiline.geoms)), 2):
line1, line2 = multiline.geoms[i], multiline.geoms[j]
if line1.intersects(line2):
if np.array(line1.intersection(line2).coords).flatten().tolist() not in lnodep:
new_nodep = np.append(
new_nodep, np.array(line1.intersection(line2).coords), axis=0,
)
intersects_idx = [list(new_edges[i]), list(new_edges[j])]
new_edges.pop(new_edges.index(intersects_idx[0]))
new_edges.pop(new_edges.index(intersects_idx[1]))
for n in np.array(intersects_idx).flatten():
new_edges.append([n, len(new_nodep) - 1])
break
multiline = MultiLineString([LineString(new_nodep[e]) for e in new_edges])
lnodep = new_nodep.tolist()
return new_nodep, np.array(new_edges)
@nb.njit
def mahalanobis(M, cent):
cov = np.cov(M, rowvar=0)
try:
cov_inverse = np.linalg.inv(cov)
except:
cov_inverse = np.linalg.pinv(cov)
M_c = M - cent
dist = np.sqrt(np.sum((M_c) * cov_inverse.dot(M_c.T).T, axis=1))
return dist
@nb.njit
def polygon_area(x, y):
# coordinate shift
x_ = x - x.mean()
y_ = y - y.mean()
# everything else is the same as maxb's code
correction = x_[-1] * y_[0] - y_[-1] * x_[0]
main_area = np.dot(x_[:-1], y_[1:]) - np.dot(y_[:-1], x_[1:])
return 0.5 * np.abs(main_area + correction)
def pp_compactness(cycle_nodep):
"""Polsby-Popper compactness"""
area = polygon_area(cycle_nodep[:, 0], cycle_nodep[:, 1])
length = np.sum(np.sqrt((np.diff(cycle_nodep, axis=0) ** 2).sum(axis=1)))
return (4 * np.pi * area) / (length ** 2)
def find_all_cycles(G, source=None, cycle_length_limit=None):
"""forked from networkx dfs_edges function. Assumes nodes are integers, or at least
types which work with min() and > ."""
if source is None:
# produce edges for all components
nodes = [list(i)[0] for i in nx.connected_components(G)]
else:
# produce edges for components with source
nodes = [source]
# extra variables for cycle detection:
cycle_stack = []
output_cycles = set()
def get_hashable_cycle(cycle):
"""cycle as a tuple in a deterministic order."""
m = min(cycle)
mi = cycle.index(m)
mi_plus_1 = mi + 1 if mi < len(cycle) - 1 else 0
if cycle[mi - 1] > cycle[mi_plus_1]:
result = cycle[mi:] + cycle[:mi]
else:
result = list(reversed(cycle[:mi_plus_1])) + list(
reversed(cycle[mi_plus_1:])
)
return tuple(result)
for start in nodes:
if start in cycle_stack:
continue
cycle_stack.append(start)
stack = [(start, iter(G[start]))]
while stack:
parent, children = stack[-1]
try:
child = next(children)
if child not in cycle_stack:
cycle_stack.append(child)
stack.append((child, iter(G[child])))
else:
i = cycle_stack.index(child)
if i < len(cycle_stack) - 2:
output_cycles.add(get_hashable_cycle(cycle_stack[i:]))
except StopIteration:
stack.pop()
cycle_stack.pop()
return [list(i) for i in output_cycles]
# def in_hull(points, queries):
# hull = _Qhull(
# b"i",
# points,
# options=b"",
# furthest_site=False,
# incremental=False,
# interior_point=None,
# )
# equations = hull.get_simplex_facet_array()[2].T
# return np.all(queries @ equations[:-1] < -equations[-1], axis=1)
def in_hull(points, queries):
equations = ConvexHull(points).equations.T
return np.all(queries @ equations[:-1] < -equations[-1], axis=1)
[docs]def findPaths(
X,
PG,
min_path_len=None,
nnodes=None,
max_inner_fraction=0.1,
min_node_n_points=2,
max_n_points=None,
min_compactness=0.5,
radius=None,
allow_same_branch=True,
fit_loops=True,
Lambda=None,
Mu=None,
cycle_Lambda=None,
cycle_Mu=None,
weights=None,
ignore_equivalent=False,
plot=False,
verbose=False,
):
"""
This function tries to add extra paths to the graph
by computing a series of principal curves connecting two nodes
and retaining plausible ones using heuristic parameters
min_path_len: int, default=None
Minimum distance along the graph (in number of nodes) that separates the two nodes to connect with a principal curve
n_nodes: int, default=None
Number of nodes in the candidate principal curves
max_inner_fraction: float in [0,1], default=0.1
Maximum fraction of points inside vs outside the loop Controls how empty the loop formed with the added path should be.
min_node_n_points: int, default=1
Minimum number of points associated to nodes of the principal curve (prevents creating paths through empty space)
max_n_points: int, default=5% of the number of points
Maximum number of points inside the loop
min_compactness: float in [0,1], default=0.5
Minimum 'roundness' of the loop (1=more round) (if very narrow loops are not desired)
radius: float, default=None
Max distance in space that separates the two nodes to connect with a principal curve
allow_same_branch: bool, default=True
Whether to allow new paths to connect two nodes from the same branch
fit_loops: bool, default=True
Whether to refit the graph to data after adding the new paths
plot: bool, default=False
Whether to plot selected candidate paths
verbose: bool, default=False
weights: bool, default=False
Whether to use point weights
"""
_PG = deepcopy(PG)
if "projection" in _PG.keys():
del _PG["projection"]
init_nodes_pos, init_edges = _PG["NodePositions"], _PG["Edges"][0]
# --- Init parameters, variables
epg = nx.Graph(init_edges.tolist())
SquaredX = np.sum(X ** 2, axis=1, keepdims=1)
part, part_dist = elpigraph.src.core.PartitionData(
X, init_nodes_pos, 10 ** 6, SquaredX=SquaredX
)
leaves = [k for k, v in epg.degree if v == 1]
edge_lengths = np.sqrt(
np.sum(
(init_nodes_pos[init_edges[:, 0], :] - init_nodes_pos[init_edges[:, 1], :])
** 2,
axis=1,
)
)
if Mu is None:
Mu = _PG["Mu"]
if Lambda is None:
Lambda = _PG["Lambda"]
if cycle_Mu is None:
cycle_Mu = Mu
if cycle_Lambda is None:
cycle_Lambda = Lambda
if radius is None:
radius = np.mean(edge_lengths) * len(init_nodes_pos) / 10
if min_path_len is None:
min_path_len = len(init_nodes_pos) // 5
if max_n_points is None:
max_n_points = int(len(X) * 0.05)
if min_node_n_points is None:
min_node_n_points = max(1, np.bincount(part.flat).min())
if weights is None:
weights = np.ones(len(X))[:, None]
if nnodes is None:
nnodes = min(16, max(6, int(radius / np.mean(edge_lengths))))
elif nnodes < 6:
raise ValueError("nnodes should be at least 6")
if verbose:
print(
f"Using default parameters: max_n_points={max_n_points},"
f" radius={radius:.2f}, min_node_n_points={min_node_n_points},"
f" min_path_len={min_path_len}, nnodes={nnodes}"
)
# --- Get candidate nodes to connect
dist, ind = (
NearestNeighbors(radius=radius)
.fit(init_nodes_pos)
.radius_neighbors(init_nodes_pos[leaves])
)
net = elpigraph.src.graphs.ConstructGraph({"Edges": [init_edges]})
if all(np.array(net.degree()) <= 2):
branches = net.get_shortest_paths(leaves[0], leaves[-1])
else:
(
_,
dict_branches,
_,
) = elpigraph.src.supervised.get_tree(init_edges, leaves[0])
branches = list(dict_branches.values())
candidate_nodes = []
for i in range(len(leaves)):
root_branch = [b for b in branches if leaves[i] in b][0]
if allow_same_branch:
_cand_nodes = [node for b in branches for node in b if node in ind[i]]
else:
_cand_nodes = [
node
for b in branches
for node in b
if not (node in root_branch) and node in ind[i]
]
paths = net.get_shortest_paths(leaves[i], _cand_nodes)
candidate_nodes.append([p[-1] for p in paths if len(p) > min_path_len])
# --- Test each of the loops connecting a leaf to its candidate nodes,
# --- for each leaf select the one with minimum energy and that respect constraints
if verbose:
print("testing", sum([len(_) for _ in candidate_nodes]), "candidates")
new_edges = []
new_nodep = []
new_leaves = []
new_part = []
new_energy = []
new_inner_fraction = []
for i, l in enumerate(leaves):
inner_fractions = []
energies = []
merged_edges = []
merged_nodep = []
merged_part = []
loop_edges = []
loop_nodep = []
loop_leaves = []
for c in candidate_nodes[i]:
clus = (part == c) | (part == l)
X_fit = np.vstack((init_nodes_pos[c], init_nodes_pos[l], X[clus.flat]))
try:
pg = elpigraph.computeElasticPrincipalCurve(
X_fit, nnodes, Lambda=Lambda, Mu=Mu, FixNodesAtPoints=[[0], [1]],
)[0]
except Exception as e:
energies.append(np.inf)
merged_edges.append(np.inf)
merged_nodep.append(np.inf)
loop_edges.append(np.inf)
loop_nodep.append(np.inf)
loop_leaves.append(np.inf)
# candidate curve has infinite energy, ignore error
if e.args == (
"local variable 'NewNodePositions' referenced before" " assignment",
):
continue
else:
raise e
# ---get nodep, edges, create new graph with added loop
nodep, edges = pg["NodePositions"], pg["Edges"][0]
_edges = edges.copy()
_edges[(edges != 0) & (edges != 1)] += init_edges.max() - 1
_edges[edges == 0] = c
_edges[edges == 1] = l
_merged_edges = np.concatenate((init_edges, _edges))
_merged_nodep = np.concatenate((init_nodes_pos, nodep[2:]))
cycle_nodes = find_all_cycles(nx.Graph(_merged_edges.tolist()))[0]
ElasticMatrix = elpigraph.src.core.MakeUniformElasticMatrix_with_cycle(
_merged_edges,
Lambda=Lambda,
Mu=Mu,
cycle_Lambda=cycle_Lambda,
cycle_Mu=cycle_Mu,
cycle_nodes=cycle_nodes,
)
_merged_nodep = elpigraph.src.core.PrimitiveElasticGraphEmbedment(
X,
_merged_nodep,
ElasticMatrix,
PointWeights=weights,
FixNodesAtPoints=[],
)[0]
### candidate validity tests ###
valid = True
# --- cycle validity test
if valid:
G = nx.Graph(_merged_edges.tolist())
cycle_nodes = find_all_cycles(G)[0]
cycle_nodep = np.array([_merged_nodep[e] for e in cycle_nodes])
cent_part, cent_dists = elpigraph.src.core.PartitionData(
X, _merged_nodep, 10 ** 6, SquaredX=SquaredX
)
cycle_points = np.isin(cent_part.flat, cycle_nodes)
if X.shape[1] > 2:
pca = PCA(n_components=2, svd_solver="arpack").fit(X[cycle_points])
X_cycle_2d = pca.transform(X[cycle_points])
cycle_2d = pca.transform(cycle_nodep)
else:
cycle_2d = cycle_nodep
X_cycle_2d = X[cycle_points]
inside_idx = in_hull(cycle_2d, X_cycle_2d)
if sum(inside_idx) == 0:
inner_fraction = 0.0
else:
cycle_centroid = np.mean(cycle_2d, axis=0, keepdims=1)
X_inside = X_cycle_2d[inside_idx]
if len(X_inside) == 1:
w = np.ones(len(X_inside))
else:
w = mahalanobis(X_inside, cycle_centroid)
# points belonging to cycle shrunk by 10% or within 2 std of centroid (mahalanobis < 2)
shrunk_cycle_2d = shrink_or_swell_shapely_polygon(
cycle_2d, factor=0.1
)
# prevent shapely edge case bug when multi-polygon is returned. Fall back to mahalanobis
if type(shrunk_cycle_2d) == MultiPolygon:
in_shrunk_cycle = np.ones(len(X_inside), dtype=bool)
else:
shrunk_cycle_2d = np.array(shrunk_cycle_2d.exterior.coords)
# prevent bug when self-intersection
if len(shrunk_cycle_2d) == 0:
in_shrunk_cycle = np.ones(len(X_inside), dtype=bool)
else:
in_shrunk_cycle = in_hull(shrunk_cycle_2d, X_inside)
idx_close = in_shrunk_cycle | (w < 1)
w = 1 - w / w.max()
w[idx_close] = 1
inner_fraction = np.sum(w) / np.sum(cycle_points)
if init_nodes_pos.shape[1] == 2:
intersect = not (
MultiLineString(
[LineString(_merged_nodep[e]) for e in _merged_edges]
).is_simple
)
if intersect:
valid = False
# idx for min_node_n_points test: points that are away from the center
ix_outside = np.ones(len(cycle_points), dtype=bool)
ix_outside[
np.arange(len(X))[cycle_points][inside_idx][idx_close]
] = False
if (
any(
np.bincount(
cent_part[ix_outside].flat, minlength=len(_merged_nodep),
)[len(init_nodes_pos) :]
< min_node_n_points
) # if empty cycle node
or (
inner_fraction > max_inner_fraction
) # if high fraction of points inside
or (not np.isfinite(inner_fraction)) # prevent no points error
or (np.sum(idx_close) > max_n_points) # if too many points inside
or pp_compactness(cycle_2d) < min_compactness # if loop is very narrow
):
valid = False
# ---> if cycle is invalid, continue
if not valid:
inner_fractions.append(np.inf)
energies.append(np.inf)
merged_edges.append(np.inf)
merged_nodep.append(np.inf)
merged_part.append(np.inf)
loop_edges.append(np.inf)
loop_nodep.append(np.inf)
loop_leaves.append(np.inf)
continue
# ---> valid cycle, compute graph energy
else:
(_merged_part, _merged_part_dist,) = elpigraph.src.core.PartitionData(
X, _merged_nodep, 10 ** 6, SquaredX=SquaredX
)
proj = elpigraph.src.reporting.project_point_onto_graph(
X, _merged_nodep, _merged_edges, _merged_part
)
MSE = proj["MSEP"]
inner_fractions.append(inner_fraction)
energies.append(MSE)
merged_edges.append(_merged_edges)
merged_nodep.append(_merged_nodep)
merged_part.append(np.where(np.isin(_merged_part.flat, cycle_nodes))[0])
loop_edges.append(edges)
loop_nodep.append(nodep[2:])
loop_leaves.append([c, l])
# --- among all valid cycles found, retain the best
if energies != [] and np.isfinite(energies).any():
best = np.argmin(energies)
if [loop_leaves[best][-1], loop_leaves[best][0]] not in new_leaves:
# and not any(np.isin(loop_leaves[best],np.unique(np.array(new_leaves))))):
new_edges.append(loop_edges[best])
new_nodep.append(loop_nodep[best])
new_leaves.append(loop_leaves[best])
new_part.append(merged_part[best])
new_energy.append(energies[best])
new_inner_fraction.append(inner_fractions[best])
_merged_edges = merged_edges[best]
_merged_nodep = merged_nodep[best]
if plot:
c = candidate_nodes[i][best]
clus = (part == c) | (part == l)
X_fit = np.vstack(
(init_nodes_pos[c], init_nodes_pos[l], X[clus.flat])
)
proj = elpigraph.src.reporting.project_point_onto_graph(
X, _merged_nodep, _merged_edges, _merged_part
)
MSE = proj["MSEP"]
# ----- cycle test
G = nx.Graph(_merged_edges.tolist())
cycle_nodes = find_all_cycles(G)[0]
cycle_nodep = np.array([_merged_nodep[e] for e in cycle_nodes])
cent_part, cent_dists = elpigraph.src.core.PartitionData(
X, _merged_nodep, 10 ** 6, SquaredX=SquaredX
)
cycle_points = np.isin(cent_part.flat, cycle_nodes)
if X.shape[1] > 2:
pca = PCA(n_components=2, svd_solver="arpack").fit(cycle_nodep)
cycle_2d = pca.transform(cycle_nodep)
X_cycle_2d = pca.transform(X[cycle_points])
else:
cycle_2d = cycle_nodep
X_cycle_2d = X[cycle_points]
inside_idx = in_hull(cycle_2d, X_cycle_2d)
cycle_centroid = np.mean(cycle_2d, axis=0, keepdims=1)
X_inside = X_cycle_2d[inside_idx]
if len(X_inside) == 1:
w = np.ones(len(X_inside))
else:
w = mahalanobis(X_inside, cycle_centroid)
# points belonging to cycle shrunk by 10% or within 2 std of centroid (mahalanobis < 2)
shrunk_cycle_2d = shrink_or_swell_shapely_polygon(
cycle_2d, factor=0.1
)
# prevent shapely bugs when multi-polygon is returned. Fall back to mahalanobis
if type(shrunk_cycle_2d) == MultiPolygon:
in_shrunk_cycle = np.ones(len(X_inside), dtype=bool)
else:
shrunk_cycle_2d = np.array(shrunk_cycle_2d.exterior.coords)
# prevent bug when self-intersection
if len(shrunk_cycle_2d) == 0:
in_shrunk_cycle = np.ones(len(X_inside), dtype=bool)
else:
in_shrunk_cycle = in_hull(shrunk_cycle_2d, X_inside)
idx_close = in_shrunk_cycle | (w < 1)
w = 1 - w / w.max()
w[idx_close] = 1
inner_fraction = np.sum(w) / np.sum(cycle_points)
compactness = pp_compactness(cycle_2d)
plt.title(
f"{c}, {l}, MSE={MSE:.4f}, \n"
f" inner%={inner_fraction:.2f},"
f" compactness={compactness:.2f}"
)
plt.scatter(*X[:, :2].T, alpha=0.1, s=5)
plt.scatter(*X_fit[:, :2].T, s=5)
plt.scatter(*_merged_nodep[:, :2].T, c="k")
for e in _merged_edges:
plt.plot(
[_merged_nodep[e[0], 0], _merged_nodep[e[1], 0]],
[_merged_nodep[e[0], 1], _merged_nodep[e[1], 1]],
c="k",
)
_ = plt.scatter(*X[cycle_points][inside_idx, :2].T, c=w.flat, s=5)
plt.colorbar(_)
plt.show()
# ignore equivalent loops (with more than 2/3 shared points and nodes)
valid = np.ones(len(new_part))
if ignore_equivalent:
for i in range(len(new_part) - 1):
for j in range(i + 1, len(new_part)):
if (
len(np.intersect1d(new_part[i], new_part[j]))
/ max(len(new_part[i]), len(new_part[j]))
) > (2 / 3):
#all_leaves_i = np.isin(new_leaves[i],leaves).all()
#all_leaves_j = np.isin(new_leaves[j],leaves).all()
## prioritize connecting leaves
#if all_leaves_j and not(all_leaves_i):
# valid[i] = 0
#elif all_leaves_i and not(all_leaves_j):
# valid[j] = 0
# else take lowest energy
if np.argmin([new_energy[i], new_energy[j]]) == 0:
valid[j] = 0
else:
valid[i] = 0
new_edges = [e for i, e in enumerate(new_edges) if valid[i]]
new_nodep = [e for i, e in enumerate(new_nodep) if valid[i]]
new_leaves = [e for i, e in enumerate(new_leaves) if valid[i]]
new_part = [e for i, e in enumerate(new_part) if valid[i]]
new_energy = [e for i, e in enumerate(new_energy) if valid[i]]
new_inner_fraction = [e for i, e in enumerate(new_inner_fraction) if valid[i]]
### form graph with all valid loops found ###
if (new_edges == []) or (sum(valid) == 0):
print("Found no valid path to add")
return None
merged_edges=init_edges.copy()
for i, loop_edges in enumerate(new_edges):
loop_edges[(loop_edges != 0) & (loop_edges != 1)] += merged_edges.max() - 1
ix0, ix1 = loop_edges == 0, loop_edges == 1
loop_edges[ix0],loop_edges[ix1] = new_leaves[i][0], new_leaves[i][1]
merged_edges = np.concatenate((merged_edges, loop_edges))
merged_nodep = np.concatenate((init_nodes_pos, *new_nodep))
### optionally refit the entire graph ###
if fit_loops:
cycle_nodes = np.concatenate(find_all_cycles(nx.Graph(merged_edges.tolist())))
ElasticMatrix = elpigraph.src.core.MakeUniformElasticMatrix_with_cycle(
merged_edges,
Lambda=Lambda,
Mu=Mu,
cycle_Lambda=cycle_Lambda,
cycle_Mu=cycle_Mu,
cycle_nodes=cycle_nodes,
)
merged_nodep = elpigraph.src.core.PrimitiveElasticGraphEmbedment(
X, merged_nodep, ElasticMatrix, PointWeights=weights, FixNodesAtPoints=[])[0]
# check intersection
if merged_nodep.shape[1] == 2:
intersect = not (
MultiLineString(
[LineString(merged_nodep[e]) for e in merged_edges]
).is_simple
)
if intersect:
merged_nodep, merged_edges = remove_intersections(
merged_nodep, merged_edges
)
_PG["Edges"] = [merged_edges]
_PG["NodePositions"] = merged_nodep
_PG["Lambda"] = Lambda
_PG["Mu"] = Mu
_PG["cycle_Lambda"] = cycle_Lambda
_PG["cycle_Mu"] = cycle_Mu
_PG["addLoopsdict"] = dict(
new_edges=new_edges,
new_nodep=new_nodep,
merged_nodep=merged_nodep,
merged_edges=merged_edges,
new_leaves=new_leaves,
new_part=new_part,
new_energy=new_energy,
new_inner_fraction=new_inner_fraction,
)
if verbose:
l = [
_PG["addLoopsdict"]["new_inner_fraction"],
_PG["addLoopsdict"]["new_energy"],
[len(n) for n in _PG["addLoopsdict"]["new_part"]],
]
df = pd.concat(
[pd.DataFrame(_PG["addLoopsdict"]["new_leaves"])]
+ [pd.Series(i) for i in l],
axis=1,
)
df.columns = [
"source node",
"target node",
"inner fraction",
"MSE",
"n° of points in path",
]
df.index = ["" for i in df.index]
print("Suggested paths:")
print(df.round(4))
return _PG
[docs]def addPath(
X,
PG,
source,
target,
n_nodes=None,
weights=None,
refit_graph=False,
Mu=None,
Lambda=None,
cycle_Mu=None,
cycle_Lambda=None,
):
"""
Add path (principal curve) between two nodes in the principal graph
"""
_PG = deepcopy(PG)
if "projection" in _PG.keys():
del _PG["projection"]
init_nodes_pos, init_edges = _PG["NodePositions"], _PG["Edges"][0]
init_edges_max = init_edges.max()
# --- Init parameters, variables
if not isinstance(target,int):
init_nodes_pos = np.vstack((init_nodes_pos,target))
init_edges_max+= 1
target = len(init_nodes_pos)-1
if Mu is None:
Mu = _PG["Mu"]
if Lambda is None:
Lambda = _PG["Lambda"]
if cycle_Mu is None:
cycle_Mu = Mu
if cycle_Lambda is None:
cycle_Lambda = Lambda
if n_nodes is None:
n_nodes = min(16, max(6, len(init_nodes_pos) / 20))
SquaredX = np.sum(X ** 2, axis=1, keepdims=1)
part, part_dist = elpigraph.src.core.PartitionData(
X, init_nodes_pos, 10 ** 6, SquaredX=SquaredX
)
clus = (part == source) | (part == target)
X_fit = np.vstack((init_nodes_pos[source], init_nodes_pos[target], X[clus.flat]))
# --- fit path
PG_path = elpigraph.computeElasticPrincipalCurve(
X_fit, NumNodes=n_nodes, Lambda=Lambda, Mu=Mu, FixNodesAtPoints=[[0], [1]],
)[0]
# --- get nodep, edges, create new graph with added loop
nodep, edges = PG_path["NodePositions"], PG_path["Edges"][0]
_edges = edges.copy()
_edges[(edges != 0) & (edges != 1)] += init_edges_max - 1
_edges[edges == 0] = source
_edges[edges == 1] = target
_merged_edges = np.concatenate((init_edges, _edges))
_merged_nodep = np.concatenate((init_nodes_pos, nodep[2:]))
if refit_graph:
cycle_nodes = elpigraph._graph_editing.find_all_cycles(
nx.Graph(_merged_edges.tolist())
)
if len(cycle_nodes) > 0:
cycle_nodes = flatten(cycle_nodes)
ElasticMatrix = elpigraph.src.core.MakeUniformElasticMatrix_with_cycle(
_merged_edges,
Lambda=Lambda,
Mu=Mu,
cycle_Lambda=cycle_Lambda,
cycle_Mu=cycle_Mu,
cycle_nodes=cycle_nodes,
)
(
_merged_nodep,
_,
_,
_,
_,
_,
_,
) = elpigraph.src.core.PrimitiveElasticGraphEmbedment(
X, _merged_nodep, ElasticMatrix, PointWeights=weights, FixNodesAtPoints=[],
)
# check intersection
if _merged_nodep.shape[1] == 2:
intersect = not (
MultiLineString(
[LineString(_merged_nodep[e]) for e in _merged_edges]
).is_simple
)
if intersect:
raise ValueError("The created path would intersect existing graph")
_PG["NodePositions"] = _merged_nodep
_PG["Edges"] = [_merged_edges]
_PG["Lambda"] = Lambda
_PG["Mu"] = Mu
_PG["cycle_Lambda"] = cycle_Lambda
_PG["cycle_Mu"] = cycle_Mu
return _PG
[docs]def delPath(
X,
PG,
source,
target,
nodes_to_include=None,
weights=None,
refit_graph=False,
Mu=None,
Lambda=None,
cycle_Mu=None,
cycle_Lambda=None,
):
"""
Delete path between two nodes in the principal graph
"""
_PG = deepcopy(PG)
# --- Init parameters, variables
if Mu is None:
Mu = _PG["Mu"]
if Lambda is None:
Lambda = _PG["Lambda"]
if cycle_Mu is None:
cycle_Mu = Mu
if cycle_Lambda is None:
cycle_Lambda = Lambda
# --- get path to remove
epg_edge = _PG["Edges"][0]
epg_edge_len = _PG["projection"]["edge_len"].copy()
del _PG["projection"]
G = nx.Graph()
G.add_nodes_from(range(_PG["NodePositions"].shape[0]))
edges_weighted = list(zip(epg_edge[:, 0], epg_edge[:, 1], epg_edge_len))
G.add_weighted_edges_from(edges_weighted, weight="len")
if nodes_to_include is None:
# nodes on the shortest path
nodes_sp = nx.shortest_path(G, source=source, target=target, weight="len")
else:
assert isinstance(nodes_to_include, list), "`nodes_to_include` must be list"
# lists of simple paths, in order from shortest to longest
list_paths = list(
nx.shortest_simple_paths(G, source=source, target=target, weight="len")
)
flag_exist = False
for p in list_paths:
if set(nodes_to_include).issubset(p):
nodes_sp = p
flag_exist = True
break
if not flag_exist:
print(f"no path that passes {nodes_to_include} exists")
G.remove_edges_from(np.vstack((nodes_sp[:-1], nodes_sp[1:])).T)
isolates = list(nx.isolates(G))
G.remove_nodes_from(isolates)
Gdel = nx.relabel_nodes(G, dict(zip(G.nodes, np.arange(len(G.nodes)))))
_PG["Edges"] = [np.array(Gdel.edges)]
_PG["NodePositions"] = _PG["NodePositions"][
~np.isin(range(len(_PG["NodePositions"])), isolates)
]
# --- get nodep, edges, create new graph with added loop
if refit_graph:
nodep, edges = _PG["NodePositions"], _PG["Edges"][0]
cycle_nodes = elpigraph._graph_editing.find_all_cycles(nx.Graph(edges.tolist()))
if len(cycle_nodes) > 0:
cycle_nodes = flatten(cycle_nodes)
ElasticMatrix = elpigraph.src.core.MakeUniformElasticMatrix_with_cycle(
edges,
Lambda=Lambda,
Mu=Mu,
cycle_Lambda=cycle_Lambda,
cycle_Mu=cycle_Mu,
cycle_nodes=cycle_nodes,
)
(
newnodep,
_,
_,
_,
_,
_,
_,
) = elpigraph.src.core.PrimitiveElasticGraphEmbedment(
X, nodep, ElasticMatrix, PointWeights=weights, FixNodesAtPoints=[]
)
_PG["NodePositions"] = newnodep
return _PG
def refitGraph(
X,
PG,
shift_nodes_pos={},
PointWeights=None,
Mu=None,
Lambda=None,
cycle_Mu=None,
cycle_Lambda=None,
):
init_nodes_pos, init_edges = PG["NodePositions"], PG["Edges"][0]
# --- Init parameters, variables
if Mu is None:
Mu = PG["Mu"]
if Lambda is None:
Lambda = PG["Lambda"]
if cycle_Mu is None:
cycle_Mu = Mu
if cycle_Lambda is None:
cycle_Lambda = Lambda
# ---Modify node pos values and order (first nodes are fixed)
for k, v in shift_nodes_pos.items():
init_nodes_pos[k] = v
fix_nodes = sorted(list(shift_nodes_pos.keys()), reverse=True)
fix_order = np.array(fix_nodes+[i for i in range(len(init_nodes_pos)) if i not in fix_nodes])
fix_nodes_pos = init_nodes_pos[fix_order]
fix_edges = init_edges.copy()
for i, ifix in enumerate(fix_order):
fix_edges[init_edges == ifix] = i
SquaredX = np.sum(X ** 2, axis=1, keepdims=1)
part, part_dist = elpigraph.src.core.PartitionData(
X, fix_nodes_pos, 10 ** 6, SquaredX=SquaredX
)
cycle_nodes = find_all_cycles(nx.Graph(fix_edges.tolist()))
if len(cycle_nodes) > 0:
cycle_nodes = flatten(cycle_nodes)
ElasticMatrix = elpigraph.src.core.MakeUniformElasticMatrix_with_cycle(
fix_edges,
Lambda=Lambda,
Mu=Mu,
cycle_Lambda=cycle_Lambda,
cycle_Mu=cycle_Mu,
cycle_nodes=cycle_nodes,
)
(
new_nodes_pos,
_,
_,
_,
_,
_,
_,
) = elpigraph.src.core.PrimitiveElasticGraphEmbedment(
X,
fix_nodes_pos,
ElasticMatrix,
PointWeights=PointWeights,
FixNodesAtPoints=[[] for i in range(len(fix_nodes))],
)
# ---Revert to initial node ordering
PG["NodePositions"] = new_nodes_pos[np.argsort(fix_order)]