Initial commit based on the contents of previous repositories

2026-01-06 18:26:38 +01:00
commit 35ad81484c
9 changed files with 602 additions and 0 deletions

1
.gitignore vendored Normal file

@@ -0,0 +1 @@
*/__pycache__/**

38
README.md Normal file

@@ -0,0 +1,38 @@
# Boundary-aware centrality for Spatial Graphs
University project on boundary-aware node centralities for spatial graphs. For the project details and results, see the [typst paper](./doc/paper.typ). For building the document and using the Python library developed for this project, see the corresponding sections below.
## Code usage
> [!important]
> The implementation uses [gurobi](https://www.gurobi.com/solutions/gurobi-optimizer/) to solve the optimization problem for the function fitting, which requires a *license*. Please refer to the gurobi license documentation for details.
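To quickly check whether a usable license is set up, you can, for example, try to create an empty model from the command line (this is only a sanity check, not part of the project code):
```sh
# should run without raising a gurobi license error
python -c "import gurobipy as gp; gp.Model()"
```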
Install the requirements into a virtual environment:
```sh
# create virtual environment
python -m venv venv
# activate virtual environment
source venv/bin/activate
# install required dependencies
pip install -r requirements.txt
```
The `src` directory contains all the Python source files used to create the diagrams and images found in the `doc/figures` directory. For how these are used with the two datasets of this project, see [merfish.py](./merfish.py) and [mibitof.py](./mibitof.py).
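For orientation, the library modules in `src` are:
```
src/
├── centrality.py   # convex hull computation and boundary-aware centrality correction
├── dataset.py      # squidpy dataset loaders and spatial graph construction
├── fitting.py      # gurobi-based fitting of the piecewise linear model and a linear regression baseline
└── plot.py         # plotting helpers and the distance-to-centrality quantification
```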
To generate the diagrams and images, run one of the two Python scripts inside the virtual environment, for example:
```sh
python merfish.py
```
## Document generation
The document is written using [typst](https://typst.app) and can be compiled into a pdf file using:
```sh
typst c doc/paper.typ
```
The resulting pdf file, `paper.pdf`, will be placed in the `doc` directory.
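While writing, the document can also be recompiled automatically on each change using typst's watch mode:
```sh
typst watch doc/paper.typ
```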

188
example.py Normal file

@@ -0,0 +1,188 @@
import math
import os
import matplotlib.pyplot as plt
import numpy as np
# squidpy is only required by the merfish()/mibitof() dataset helpers below
# import squidpy as sq
from graph_tool.all import *
from src import centrality
from src import plot
from src import fitting
def merfish():
"""
Merfish dataset from `squidpy`.
"""
adata = sq.datasets.merfish()
adata = adata[adata.obs.Bregma == -9].copy()
return adata
def mibitof():
"""
Mibitof dataset from `squidpy`.
"""
adata = sq.datasets.mibitof()
return adata
def random_graph(n=5000, seed=None):
"""
Uniformly random point cloud generation.
    `n` [int] Number of points to generate. The default of 5000 is a good trade-off between point density and the runtime of the subsequent calculations.
    `seed` [int] Optional seed for the random number generator; if omitted, a fresh seed is drawn.
    @return [(numpy.ndarray, int)] Array of shape (n, 2) containing the coordinates of each point of the generated point cloud, and the seed that was used.
"""
if seed is None:
import secrets
seed = secrets.randbits(128)
rng = np.random.default_rng(seed=seed)
return rng.random((n, 2)), seed
def spatial_graph(points):
    """
    Generate the spatial graph for the given point coordinates using a Delaunay
    triangulation (via `graph_tool`). Edge weights are set to the Euclidean
    distance between the incident points.
    @param points [numpy.ndarray] Array of shape (n, 2) with the point coordinates.
    @return [(Graph, EdgePropertyMap)] The generated graph and its edge weights.
    """
    g, pos = triangulation(points, type="delaunay")
    g.vp["pos"] = pos
    weight = g.new_edge_property("double")
    for e in g.edges():
        # Euclidean distance between the two endpoints of the edge
        weight[e] = math.sqrt(sum((pos[e.source()].a - pos[e.target()].a) ** 2))
    return g, weight
# generate spatial graph from a given dataset
# g, weight = spatial_graph(merfish().obsm['spatial'])
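# For each random point cloud: build the Delaunay graph, compute a centrality measure,
# relate each vertex's centrality to its distance from the convex hull, fit the
# piecewise linear model as well as a plain linear regression to that relationship,
# and save one figure per centrality measure (closeness, betweenness, PageRank).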
# make sure the output directory for the generated figures exists
os.makedirs("random_point_clouds", exist_ok=True)
for i in range(1, 10):
points, seed = random_graph()
g, weight = spatial_graph(points)
g = GraphView(g)
# calculate centrality values
vp = closeness(g, weight=weight)
vp.a = np.nan_to_num(vp.a) # correct floating point values
# ep.a = np.nan_to_num(ep.a) # correct floating point values
# calculate convex hull
convex_hull = centrality.convex_hull(g)
# plot graph with convex_hull
fig = plt.figure(figsize=(15, 5))
ax0, ax1 = fig.subplots(1, 2)
plot.graph_plot(fig, ax0, g, vp, convex_hull, f"Random Graph (seed: {seed})\nCloseness")
# generate model based on convex hull and associated centrality values
quantification = plot.quantification_data(g, vp, convex_hull)
# optimize model's piece-wise linear function
d = quantification[:, 0]
C = quantification[:, 1]
m_opt, c0_opt, b_opt = fitting.fit_piece_wise_linear(d, C)
# TODO
    # should this be part of the plotting function itself? It should not be necessary to do this here
d_curve = np.linspace(min(d), max(d), 500)
C_curve = np.piecewise(
d_curve,
[d_curve <= b_opt, d_curve > b_opt],
[lambda x: m_opt * x + c0_opt, lambda x: m_opt * b_opt + c0_opt]
)
# plot model containing modeled piece-wise linear function
plot.quantification_plot(ax1, quantification, d_curve, C_curve, 'Models')
# linear regression model
m_reg, c_reg = fitting.fit_linear_regression(d, C)
x = np.linspace(min(d), max(d), 500)
y = m_reg * x + c_reg
ax1.plot(x, y, color='k', linewidth=1, label="Simple Linear Regression")
ax1.legend()
fig.savefig(f"random_point_clouds/{i}_closeness.svg", format='svg')
# ---------------------------------------------------------------------------------------------
# calculate centrality values
vp, ep = betweenness(g, weight=weight)
vp.a = np.nan_to_num(vp.a) # correct floating point values
# ep.a = np.nan_to_num(ep.a) # correct floating point values
# calculate convex hull
convex_hull = centrality.convex_hull(g)
# plot graph with convex_hull
fig = plt.figure(figsize=(15, 5))
ax0, ax1 = fig.subplots(1, 2)
plot.graph_plot(fig, ax0, g, vp, convex_hull, f"Random Graph (seed: {seed})\nBetweenness")
# generate model based on convex hull and associated centrality values
quantification = plot.quantification_data(g, vp, convex_hull)
# optimize model's piece-wise linear function
d = quantification[:, 0]
C = quantification[:, 1]
m_opt, c0_opt, b_opt = fitting.fit_piece_wise_linear(d, C)
# TODO
    # should this be part of the plotting function itself? It should not be necessary to do this here
d_curve = np.linspace(min(d), max(d), 500)
C_curve = np.piecewise(
d_curve,
[d_curve <= b_opt, d_curve > b_opt],
[lambda x: m_opt * x + c0_opt, lambda x: m_opt * b_opt + c0_opt]
)
# plot model containing modeled piece-wise linear function
plot.quantification_plot(ax1, quantification, d_curve, C_curve, 'Models')
# linear regression model
m_reg, c_reg = fitting.fit_linear_regression(d, C)
x = np.linspace(min(d), max(d), 500)
y = m_reg * x + c_reg
ax1.plot(x, y, color='k', linewidth=1, label="Simple Linear Regression")
ax1.legend()
fig.savefig(f"random_point_clouds/{i}_betweenness.svg", format='svg')
# ---------------------------------------------------------------------------------------------
# calculate centrality values
vp = pagerank(g, weight=weight)
vp.a = np.nan_to_num(vp.a) # correct floating point values
# ep.a = np.nan_to_num(ep.a) # correct floating point values
# calculate convex hull
convex_hull = centrality.convex_hull(g)
# plot graph with convex_hull
fig = plt.figure(figsize=(15, 5))
ax0, ax1 = fig.subplots(1, 2)
plot.graph_plot(fig, ax0, g, vp, convex_hull, f"Random Graph (seed: {seed})\nPageRank")
# generate model based on convex hull and associated centrality values
quantification = plot.quantification_data(g, vp, convex_hull)
# optimize model's piece-wise linear function
d = quantification[:, 0]
C = quantification[:, 1]
m_opt, c0_opt, b_opt = fitting.fit_piece_wise_linear(d, C)
# TODO
    # should this be part of the plotting function itself? It should not be necessary to do this here
d_curve = np.linspace(min(d), max(d), 500)
C_curve = np.piecewise(
d_curve,
[d_curve <= b_opt, d_curve > b_opt],
[lambda x: m_opt * x + c0_opt, lambda x: m_opt * b_opt + c0_opt]
)
# plot model containing modeled piece-wise linear function
plot.quantification_plot(ax1, quantification, d_curve, C_curve, 'Models')
# linear regression model
m_reg, c_reg = fitting.fit_linear_regression(d, C)
x = np.linspace(min(d), max(d), 500)
y = m_reg * x + c_reg
ax1.plot(x, y, color='k', linewidth=1, label="Simple Linear Regression")
ax1.legend()
fig.savefig(f"random_point_clouds/{i}_pagerank.svg", format='svg')

4
requirements.txt Normal file

@@ -0,0 +1,4 @@
gurobipy
# note: the `graph_tool` library imported by the code is typically not installable via pip;
# it is usually installed via conda-forge or a system package manager
graph-tools
numpy
matplotlib

0
src/__init__.py Normal file

80
src/centrality.py Normal file

@@ -0,0 +1,80 @@
import math
import numpy as np
from src import plot
def convex_hull(G):
# gift wrapping algorithm
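    # (Jarvis march) Starting from the left-most point, repeatedly pick the next point such
    # that all remaining points lie on the same side of the line from the current hull point,
    # until the starting point is reached again. The returned list is a closed polygon
    # (the starting point is appended again at the end).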
pos = G.vp["pos"]
x = []
y = []
for v in G.vertices():
# print(pos[v])
ver = pos[v]
x.append(ver[0])
y.append(ver[1])
    # start from the left-most point, which is guaranteed to lie on the hull
    pos = np.stack((np.array(x), np.array(y)), axis=-1)
    convex_hull = []
    hull_point = pos[np.argmin(x)]
    convex_hull.append(hull_point)
endpoint = None
i = 0
while not plot.Vector.eql(endpoint, convex_hull[0]):
endpoint = pos[0]
for point in pos:
if plot.Vector.eql(endpoint, hull_point) or plot.Vector.cross_z(plot.Vector.vec(convex_hull[i], endpoint), plot.Vector.vec(convex_hull[i], point)) < 0:
endpoint = point
i += 1
hull_point = endpoint
convex_hull.append(hull_point)
return convex_hull
def correct(G, centrality, m_opt, c0_opt, b_opt):
    """
    Correct a given centrality through the calculated model.
    @param G [Graph] graph-tool graph with a `pos` vertex property
    @param centrality [VertexPropertyMap] Result of a centrality calculation of `graph-tool` on graph G
    @param m_opt [Float] Model m value (slope of the linear part)
    @param c0_opt [Float] Model c0 value (offset of the linear part)
    @param b_opt [Float] Model b value (breakpoint distance)
    @return [Dict] Corrected centrality values keyed by vertex index
    """
    corrected_metric = {}
    pos = G.vp["pos"]
    x = []
    y = []
    for v in G.vertices():
        ver = pos[v]
        x.append(ver[0])
        y.append(ver[1])
    hull = convex_hull(G)
    points = np.stack((np.array(x), np.array(y)), axis=-1)
    for key, point in enumerate(points):
        # distance to the boundary, approximated by the distance to the nearest hull vertex
        min_distance = math.inf
        for hull_point in hull:
            distance = plot.Vector.vec_len(plot.Vector.vec(point, hull_point))
            if distance < min_distance:
                min_distance = distance
        value = centrality.a[key]
        if min_distance <= b_opt:
            # affected by the boundary: lift the value onto the plateau of the fitted model
            corrected_metric[key] = value + (m_opt * b_opt) - (m_opt * min_distance)
        else:
            # unaffected by the boundary
            corrected_metric[key] = value
    return corrected_metric

30
src/dataset.py Normal file

@@ -0,0 +1,30 @@
import networkx as nx
import squidpy as sq
# NOTE: these are meant to be provided from outside and should not live inside the library!
def merfish():
"""
Merfish dataset from `squidpy`.
"""
adata = sq.datasets.merfish()
adata = adata[adata.obs.Bregma == -9].copy()
return adata
def mibitof():
"""
Mibitof dataset from `squidpy`.
"""
adata = sq.datasets.mibitof()
return adata
def spatial_graph(adata):
"""
    Generate the spatial graph for the given `adata` using a Delaunay triangulation.
    Afterwards `adata` will also contain the calculated spatial graph in the keys
    `adata.obsp['spatial_distances']` and `adata.obsm['spatial']`.
    @return [Graph] networkx graph generated from `adata.obsp['spatial_distances']`
"""
sq.gr.spatial_neighbors(adata, delaunay=True, coord_type="generic")
return nx.from_scipy_sparse_array(adata.obsp['spatial_distances'])

101
src/fitting.py Normal file

@@ -0,0 +1,101 @@
import gurobipy as gp
from gurobipy import GRB
import numpy as np
import matplotlib.pyplot as plt
# NOTE:
# Thank you Anna for providing her sources: https://github.com/uderhardtlab/truncated_graphs/blob/main/src/fit.py
def fit_piece_wise_linear(d, C, M=1000):
"""
Fits a piecewise linear function to the given data using optimization.
Parameters:
d (array-like): Distance values.
C (array-like): Corresponding centrality values.
M (int, optional): Large constant for big-M constraints. Default is 1000.
Returns:
tuple: Optimized slope (m), intercept (c0), and breakpoint (b).
"""
n = len(d)
model = gp.Model()
    # Gurobi variables default to a lower bound of 0, so explicitly make slope and intercept free
    m = model.addVar(vtype=GRB.CONTINUOUS, lb=-GRB.INFINITY, name="m")  # Slope before breakpoint
    c0 = model.addVar(vtype=GRB.CONTINUOUS, lb=-GRB.INFINITY, name="c0")  # y-intercept
b = model.addVar(vtype=GRB.CONTINUOUS, lb=min(d), ub=max(d), name="b") # Breakpoint
z = model.addVars(n, vtype=GRB.BINARY, name="z")
epsilon = model.addVars(n, vtype=GRB.CONTINUOUS, name="epsilon")
model.setObjective(gp.quicksum(epsilon[i] * epsilon[i] for i in range(n)), GRB.MINIMIZE)
# Setting solver parameters for precision
model.setParam('OptimalityTol', 1e-4)
model.setParam('MIPGap', 0.01)
for i in range(n):
# Constraints enforcing piecewise linear fit
model.addConstr(epsilon[i] >= (m * d[i] + c0 - C[i]) - (1 - z[i]) * M)
model.addConstr(epsilon[i] >= -(m * d[i] + c0 - C[i]) - (1 - z[i]) * M)
model.addConstr(epsilon[i] >= (m * b + c0 - C[i]) - z[i] * M)
model.addConstr(epsilon[i] >= -(m * b + c0 - C[i]) - z[i] * M)
# Binary switch for piecewise behavior
model.addConstr(z[i] * M >= b - d[i])
model.addConstr((1 - z[i]) * M >= d[i] - b)
model.optimize()
return m.X, c0.X, b.X
def fit_linear_regression(d, C):
"""
Fits a simple linear regression model to the given data using optimization.
Parameters:
d (array-like): Distance values.
C (array-like): Corresponding centrality values.
Returns:
tuple: Optimized slope (beta) and intercept (alpha).
"""
n = len(d)
model = gp.Model()
    # Gurobi variables default to a lower bound of 0, so explicitly make intercept and slope free
    alpha = model.addVar(vtype=GRB.CONTINUOUS, lb=-GRB.INFINITY, name="alpha")
    beta = model.addVar(vtype=GRB.CONTINUOUS, lb=-GRB.INFINITY, name="beta")
model.setObjective(gp.quicksum((C[i] - alpha - beta * d[i])**2 for i in range(n)), GRB.MINIMIZE)
model.optimize()
return beta.X, alpha.X
def plot_piece_wise_linear(d, C, m_opt, c0_opt, b_opt, measure, n, t, path):
"""
Plots the original data and optimized piecewise linear fit.
"""
d_curve = np.linspace(min(d), max(d), 500)
C_curve = np.piecewise(
d_curve,
[d_curve <= b_opt, d_curve > b_opt],
[lambda x: m_opt * x + c0_opt, lambda x: m_opt * b_opt + c0_opt]
)
plt.scatter(d, C, color="blue", label="Original", alpha=0.5)
plt.plot(d_curve, C_curve, color="red", label="Optimized", linewidth=2)
plt.xlabel("Distance to border")
plt.ylabel(f"{measure.capitalize()} centrality")
plt.title(f"Optimized piece-wise linear fit for {t} graph and {measure} centrality")
plt.legend()
plt.savefig(path, format='svg')

160
src/plot.py Normal file

@@ -0,0 +1,160 @@
import math
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib.collections import LineCollection
from src import centrality
class Vector:
"""
Small Vector class used for calculations of distances
"""
    @staticmethod
    def cross_z(a, b):
        """
        Z component of the cross product of the 2d vectors `a` and `b`.
        """
        return a[0] * b[1] - a[1] * b[0]
    @staticmethod
    def vec(start, end):
        """
        Create a new vector from `start` to `end`.
        @param start [Array-2d]
        @param end [Array-2d]
        @return [Array-2d]
        """
        return [end[0] - start[0], end[1] - start[1]]
    @staticmethod
    def eql(point, other):
        """
        Check if `point` is equivalent to `other`.
        """
        if point is None or other is None:
            return False
        return point[0] == other[0] and point[1] == other[1]
    @staticmethod
    def vec_len(vector):
        """
        Calculate the length of a `vector`.
        """
        return math.sqrt(vector[0]**2 + vector[1]**2)
def graph_plot(fig, ax, G, measures, convex_hull, name, show_edges=False):
"""
Plot a given graph.
@param G [Graph] networkx Graph
@param pos [Array-2d] position values of points
@param measures [Dict] Result of a centrality calculation of networkx on graph G that should be visualized
@param name [String] Name of the measurement used, which will be integrated into the generated plot
@param path [String] Path to store the generated plot as svg file
"""
pos = G.vp["pos"]
x = []
y = []
for v in G.vertices():
# print(pos[v])
ver = pos[v]
x.append(ver[0])
y.append(ver[1])
c = measures.get_array()
    # draw the convex hull, which serves as the boundary of the point cloud
ch = LineCollection([convex_hull], colors=['g'], linewidths=1)
ax.add_collection(ch)
if show_edges:
for e in G.edges():
ex = [pos[e.source()][0], pos[e.target()][0]]
ey = [pos[e.source()][1], pos[e.target()][1]]
ax.add_collection(LineCollection([np.column_stack([ex, ey])], colors=['k'], linewidths=0.1))
    sc = ax.scatter(x, y, s=1, cmap=plt.cm.plasma, c=c)  # map the centrality values as colors onto the vertices
ax.set_title(name)
fig.colorbar(sc, ax=ax)
def normalize_dict(d):
    max_value = np.max(list(d.values()))
    return {k: (v / max_value) for k, v in d.items()}
def quantification_data(G, measures, convex_hull):
"""
Create distance to metric relationship.
@param pos [Array-2d] Positions of points
@param metric [Dict] Result of a centrality calculation of networkx on graph G
@return [Array-2d] relationship ordered (acending) by distance
"""
quantification = []
pos = G.vp["pos"]
x = []
y = []
for v in G.vertices():
# print(pos[v])
ver = pos[v]
x.append(ver[0])
y.append(ver[1])
    values = iter(measures.a)
    points = np.stack((np.array(x), np.array(y)), axis=-1)
    for point in points:
        value = next(values)
        # distance to the boundary, approximated by the distance to the nearest hull vertex
        min_distance = math.inf
        for hull_point in convex_hull:
            distance = Vector.vec_len(Vector.vec(point, hull_point))
            if distance < min_distance:
                min_distance = distance
        quantification.append([min_distance, value])
# sort by distance
quantification.sort(key=lambda entry: entry[0])
return np.array(quantification)
def quantification_plot(ax, quantification, d_curve, C_curve, metric_name):
"""
Plot relationship data.
@param data [Array-2d] see `data(pos, metric)`
@param d_curve linear function of the left side of the intersection point
@param C_curve constant function of the right side of the intersection point
@param metric_name [String] Name of the metric to be used as a title for the plot
@param path [String] Path to store the generated plot as svg file
"""
ax.set_title(metric_name)
ax.set_xlabel('Distance to Bounding-Box')
ax.set_ylabel('Centrality')
ax.scatter(quantification[:, 0], quantification[:, 1], c=quantification[:, 1], cmap=plt.cm.plasma, s=0.2)
if d_curve is not None and C_curve is not None:
ax.plot(d_curve, C_curve, color='g', linewidth=1, label='Piecewise Linear Model')
class Quantification:
    def opt_data(G, metric, b_opt):
        """
        Describe which nodes are affected by the boundary according to the fitted model.
        @param G [Graph] graph-tool graph with a `pos` vertex property
        @param metric [Dict] Result of a centrality calculation on graph G
        @param b_opt [Float] Breakpoint distance of the fitted model.
        @return [Dict] `metric`-like dict (same keys) where True marks a node that is affected by the boundary, otherwise False.
        """
        pos = G.vp["pos"]
        points = np.array([[pos[v][0], pos[v][1]] for v in G.vertices()])
        keys = iter(metric.keys())
        boundary_affected = {}
        hull = centrality.convex_hull(G)
        for point in points:
            key = next(keys)
            # distance to the boundary, approximated by the distance to the nearest hull vertex
            min_distance = math.inf
            for hull_point in hull:
                distance = Vector.vec_len(Vector.vec(point, hull_point))
                if distance < min_distance:
                    min_distance = distance
            # nodes closer to the hull than the breakpoint are affected by the boundary
            boundary_affected[key] = min_distance <= b_opt
        return boundary_affected