API documentation
This package implements several filtering methods for directed and undirected graphs.
Edge filtering methods extract the backbone of a graph or sample its most important edges. You can use them as a preprocessing step to improve the performance or results of graph algorithms, or to make a graph visualization more aesthetic.
See the example below for a simple usage of the package.
```python
import networkx as nx
import edgeseraser as ee

g = nx.erdos_renyi_graph(100, 0.4)
ee.noise_score.filter_nx_graph(g)
g  # filtered graph
```
Available methods and details
Method | Description | Suitable for | Limitations/restrictions/details |
---|---|---|---|
Noise Score | Filters edges with a high noise score. Paper: [1] | Directed, Undirected, Weighted | Very good and fast! [4] |
Disparity | Dirichlet-process (stick-breaking) filter. Paper: [2] | Directed, Undirected, Weighted | There is some criticism regarding its use on undirected graphs [3] |
Pólya-Urn | Filters edges with the Pólya-Urn method. Paper: [5] | Directed, Undirected, Integer Weighted | |
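The snippet below is a minimal sketch, not part of the package docs, showing two of the filters applied to the same weighted graph; the `weight` attribute name and the parameter values are illustrative assumptions.

```python
import networkx as nx
import numpy as np
import edgeseraser as ee

rng = np.random.default_rng(0)
g = nx.erdos_renyi_graph(100, 0.4, seed=0)
# integer weights, so that the Pólya-Urn filter is also applicable
for u, v in g.edges():
    g[u][v]["weight"] = int(rng.integers(1, 10))

ee.disparity.filter_nx_graph(g.copy(), thresh=0.8, field="weight")
ee.polya.filter_nx_graph(g.copy(), thresh=0.5, field="weight")
```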
disparity
cond_edges2erase(alphas: ndarray, thresh: float = 0.1) -> ndarray
Parameters:

Name | Type | Description | Default |
---|---|---|---|
alphas | ndarray | edge scores | required |
thresh | float | Between 0 and 1. | 0.1 |

Returns:

Type | Description |
---|---|
np.array | indices of edges to be erased |
Source code in edgeseraser/disparity.py
```python
def cond_edges2erase(alphas: NpArrayEdgesFloat, thresh: float = 0.1) -> NpArrayEdgesIds:
    """
    Args:
        alphas: np.array
            edge scores
        thresh: float
            Between 0 and 1.
    Returns:
        np.array:
            indices of edges to be erased
    """
    ids2erase: NpArrayEdgesIds = np.argwhere(alphas > thresh).flatten().astype(np.int64)
    return ids2erase
```
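A toy call, for illustration (values assumed); scores above `thresh` mark edges for removal:

```python
import numpy as np
from edgeseraser.disparity import cond_edges2erase

alphas = np.array([0.05, 0.5, 0.9])
cond_edges2erase(alphas, thresh=0.1)  # array([1, 2])
```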
filter_generic_graph(num_vertices: int, edges: ndarray, weights: ndarray, thresh: float = 0.8, cond: Literal['or', 'both', 'out', 'in'] = 'or', is_directed: bool = False) -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]
Filter edges from a graph using the disparity filter (Dirichlet process).
Parameters:

Name | Type | Description | Default |
---|---|---|---|
num_vertices | int | number of vertices | required |
edges | ndarray | edges of the graph | required |
weights | ndarray | edge weights | required |
thresh | float | Between 0 and 1. | 0.8 |
cond | Literal['or', 'both', 'out', 'in'] | "out", "in", "both", "or" | 'or' |
is_directed | bool | if True, the graph is treated as directed | False |

Returns:

Type | Description |
---|---|
Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]] | (np.array, np.array) - indices of edges to be erased - alpha scores for each edge |
Source code in edgeseraser/disparity.py
```python
def filter_generic_graph(
    num_vertices: int,
    edges: NpArrayEdges,
    weights: NpArrayEdgesFloat,
    thresh: float = 0.8,
    cond: Literal["or", "both", "out", "in"] = "or",
    is_directed: bool = False,
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat]:
    """Filter edges from a graph using the disparity filter
    (Dirichlet process).
    Args:
        num_vertices: int
            number of vertices
        edges: np.array
            edges of the graph
        weights: np.array
            edge weights
        thresh: float
            Between 0 and 1.
        cond: str
            "out", "in", "both", "or"
        is_directed: bool
            if True, the graph is treated as directed
    Returns:
        (np.array, np.array)
            - indices of edges to be erased
            - alpha scores for each edge
    """
    alphas = scores_generic_graph(
        num_vertices, edges, weights, cond=cond, is_directed=is_directed
    )
    ids2erase = cond_edges2erase(alphas, thresh=thresh)
    return ids2erase, alphas
```
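A hypothetical toy graph, to illustrate the call signature (edge list and weights are made up):

```python
import numpy as np
from edgeseraser.disparity import filter_generic_graph

edges = np.array([[0, 1], [1, 2], [2, 0], [0, 3]])
weights = np.array([1.0, 5.0, 1.0, 3.0])
ids2erase, alphas = filter_generic_graph(4, edges, weights, thresh=0.8)
```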
filter_ig_graph(g: Graph, thresh: float = 0.8, cond: Literal['or', 'both', 'out', 'in'] = 'or', field: Optional[str] = None) -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]
Filter edges from an igraph instance using the disparity filter (Dirichlet process).
Parameters:

Name | Type | Description | Default |
---|---|---|---|
g | Graph | igraph.Graph to be filtered | required |
thresh | float | Between 0 and 1. | 0.8 |
cond | Literal['or', 'both', 'out', 'in'] | "out", "in", "both", "or" | 'or' |
field | Optional[str] | field to use for edge weights | None |

Returns:

Type | Description |
---|---|
Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]] | (np.array, np.array) - indices of edges erased - alpha scores for each edge |
Source code in edgeseraser/disparity.py
```python
def filter_ig_graph(
    g: ig.Graph,
    thresh: float = 0.8,
    cond: Literal["or", "both", "out", "in"] = "or",
    field: Optional[str] = None,
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat]:
    """Filter edges from an igraph instance using the disparity filter
    (Dirichlet process).
    Args:
        g: igraph.Graph
            graph to be filtered
        thresh: float
            Between 0 and 1.
        cond: str
            "out", "in", "both", "or"
        field: str or None
            field to use for edge weights
    Returns:
        (np.array, np.array)
            - indices of edges erased
            - alpha scores for each edge
    """
    assert thresh > 0.0 and thresh < 1.0, "thresh must be between 0 and 1"
    edges, weights, num_vertices, opts = ig_extract(g, field)
    ids2erase, alphas = filter_generic_graph(
        num_vertices,
        edges,
        weights,
        cond=cond,
        is_directed=opts["is_directed"],
        thresh=thresh,
    )
    ig_erase(g, ids2erase)
    return ids2erase, alphas
```
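A minimal igraph usage sketch (the random graph and the `weight` attribute are assumptions):

```python
import igraph as ig
import edgeseraser as ee

g = ig.Graph.Erdos_Renyi(n=200, p=0.1)
g.es["weight"] = [1.0 + i % 5 for i in range(g.ecount())]
ids2erase, alphas = ee.disparity.filter_ig_graph(g, thresh=0.8, field="weight")
g.ecount()  # smaller after filtering
```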
filter_nx_graph(g: Union[networkx.classes.graph.Graph, networkx.classes.digraph.DiGraph], thresh: float = 0.8, cond: Literal['or', 'both', 'out', 'in'] = 'or', field: Optional[str] = None, remap_labels: bool = False, save_scores: bool = False) -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]
Filter edges from a networkx graph using the disparity filter (Dirichlet process).
Parameters:

Name | Type | Description | Default |
---|---|---|---|
g | Union[networkx.classes.graph.Graph, networkx.classes.digraph.DiGraph] | networkx graph to be filtered | required |
thresh | float | Between 0 and 1. | 0.8 |
cond | Literal['or', 'both', 'out', 'in'] | "out", "in", "both", "or" | 'or' |
field | Optional[str] | edge weight field | None |
remap_labels | bool | if True, the labels of the graph will be remapped to consecutive integers | False |
save_scores | bool | if True, the scores of the edges will be saved in the graph attributes | False |

Returns:

Type | Description |
---|---|
Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]] | (np.array, np.array) - indices of edges erased - alpha scores for each edge |
Source code in edgeseraser/disparity.py
```python
def filter_nx_graph(
    g: Union[nx.Graph, nx.DiGraph],
    thresh: float = 0.8,
    cond: Literal["or", "both", "out", "in"] = "or",
    field: Optional[str] = None,
    remap_labels: bool = False,
    save_scores: bool = False,
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat]:
    """Filter edges from a networkx graph using the disparity filter
    (Dirichlet process).
    Args:
        g: networkx.Graph
            graph to be filtered
        thresh: float
            Between 0 and 1.
        cond: str
            "out", "in", "both", "or"
        field: str
            edge weight field
        remap_labels: bool
            if True, the labels of the graph will be remapped to consecutive integers
        save_scores: bool (default: False)
            if True, the scores of the edges will be saved in the graph attributes
    Returns:
        (np.array, np.array)
            - indices of edges erased
            - alpha scores for each edge
    """
    assert thresh > 0.0 and thresh < 1.0, "thresh must be between 0 and 1"
    edges, weights, num_vertices, opts = nx_extract(g, remap_labels, field)
    ids2erase, alphas = filter_generic_graph(
        num_vertices,
        edges,
        weights,
        cond=cond,
        is_directed=opts["is_directed"],
        thresh=thresh,
    )
    if save_scores:
        nx.set_edge_attributes(
            g,
            {(u, v): {"alpha": a} for u, v, a in zip(edges[:, 0], edges[:, 1], alphas)},
        )
    nx_erase(g, edges[ids2erase], opts)
    return ids2erase, alphas
```
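With `save_scores=True`, the alpha scores of the surviving edges can be read back from the graph; a minimal sketch:

```python
import networkx as nx
import edgeseraser as ee

g = nx.erdos_renyi_graph(100, 0.3, seed=0)
ee.disparity.filter_nx_graph(g, thresh=0.5, save_scores=True)
nx.get_edge_attributes(g, "alpha")  # alpha score per surviving edge
```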
scores_generic_graph(num_vertices: int, edges: ndarray, weights: ndarray, cond: Literal['or', 'both', 'out', 'in'] = 'or', is_directed: bool = False) -> ndarray
Parameters:

Name | Type | Description | Default |
---|---|---|---|
num_vertices | int | number of vertices | required |
edges | ndarray | edges | required |
weights | ndarray | edge weights | required |
cond | Literal['or', 'both', 'out', 'in'] | "out", "in", "both", "or" | 'or' |
is_directed | bool | if True, the graph is treated as directed | False |

Returns:

Type | Description |
---|---|
np.array | **alphas** edge scores |
Source code in edgeseraser/disparity.py
```python
def scores_generic_graph(
    num_vertices: int,
    edges: NpArrayEdges,
    weights: NpArrayEdgesFloat,
    cond: Literal["or", "both", "out", "in"] = "or",
    is_directed: bool = False,
) -> NpArrayEdgesFloat:
    """
    Args:
        num_vertices: int
            number of vertices
        edges: np.array
            edges
        weights: np.array
            edge weights
        cond: str
            "out", "in", "both", "or"
        is_directed: bool
            if True, the graph is treated as directed
    Returns:
        np.array:
            **alphas** edge scores
    """
    w_adj, adj = construct_sp_matrices(
        weights, edges, num_vertices, is_directed=is_directed
    )

    def calc_degree(adj: Any, i: int) -> NpArrayEdgesFloat:
        return np.asarray(adj.sum(axis=i)).flatten().astype(np.float64)

    iin = edges[:, 1]
    iout = edges[:, 0]
    wdegree_out = calc_degree(w_adj, 0)[iout]
    degree_out = calc_degree(adj, 0)[iout]
    wdegree_in = calc_degree(w_adj, 1)[iin]
    degree_in = calc_degree(adj, 1)[iin]
    if cond == "out":
        alphas = stick_break_scores(wdegree_out, degree_out, edges, weights)
    elif cond == "in":
        alphas = stick_break_scores(wdegree_in, degree_in, edges, weights)
    else:
        alphas_out = stick_break_scores(wdegree_out, degree_out, edges, weights)
        alphas_in = stick_break_scores(wdegree_in, degree_in, edges, weights)
        if cond == "both":
            alphas = np.maximum(alphas_out, alphas_in)
        elif cond == "or":
            alphas = np.minimum(alphas_out, alphas_in)
    return alphas
```
stick_break_scores(wdegree: ndarray, degree: ndarray, edges: ndarray, weights: ndarray) -> ndarray
Calculate the stick-breaking scores for each edge.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
wdegree | ndarray | edge weighted degree | required |
degree | ndarray | degree of each vertex | required |
edges | ndarray | edges of the graph | required |
weights | ndarray | edge weights | required |

Returns:

Type | Description |
---|---|
np.array | **alphas** stick-breaking scores for each edge |
Source code in edgeseraser/disparity.py
```python
def stick_break_scores(
    wdegree: NpArrayEdgesFloat,
    degree: NpArrayEdgesFloat,
    edges: NpArrayEdges,
    weights: NpArrayEdgesFloat,
) -> NpArrayEdgesFloat:
    """
    Calculate the stick-breaking scores for each edge.
    Args:
        wdegree: np.array
            edge weighted degree
        degree: np.array
            degree of each vertex
        edges: np.array
            edges of the graph
        weights: np.array
            edge weights
    Returns:
        np.array:
            **alphas** stick-breaking scores for each edge
    """
    alphas = np.ones(edges.shape[0])
    ids_d1: NpArrayEdgesBool = degree > 1
    st = weights[ids_d1] / wdegree[ids_d1]
    assert np.all(st <= 1)
    alphas[ids_d1] = (1 - st) ** (degree[ids_d1] - 1)
    return alphas
```
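In terms of the arguments above: for an edge with weight $w_{ij}$, endpoint strength $s_i$ (`wdegree`), and endpoint degree $k_i$, the score is $\alpha_{ij} = (1 - w_{ij}/s_i)^{k_i - 1}$; edges whose endpoint has $k_i \le 1$ keep the default score of 1.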
misc
special
fast_math
Contains some math functions that are faster than the standard library.
nbbetaln_parallel(z, w)
Compute the LogBeta function.
Algorithm extracted from *Numerical Recipes in C* [1], chapter 6.1.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
z | np.array | | required |
w | np.array | | required |

Returns:

Type | Description |
---|---|
np.array | $\ln\lvert B(z, w)\rvert = \ln\Gamma(z) + \ln\Gamma(w) - \ln\Gamma(z + w)$ |
Source code in edgeseraser/misc/fast_math.py
```python
@njit(fastmath=True, error_model="numpy")
def nbbetaln_parallel(z, w):
    """Compute the LogBeta function.
    Algorithm extracted from **Numerical Recipes in C**[1] chapter 6.1
    [1]: http://www.it.uom.gr/teaching/linearalgebra/NumericalRecipiesInC/
    Args:
        z: np.array
        w: np.array
    Returns:
        np.array
            $\\ln|B(z, w)| = \\ln\\Gamma(z) + \\ln\\Gamma(w) - \\ln\\Gamma(z + w)$
    """
    arg = nbgammaln_parallel(z) + nbgammaln_parallel(w) - nbgammaln_parallel(z + w)
    return arg
```
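Since this mirrors `scipy.special.betaln`, a quick sanity check (assuming SciPy is installed) could look like:

```python
import numpy as np
from scipy.special import betaln
from edgeseraser.misc.fast_math import nbbetaln_parallel

z = np.array([2.0, 5.0, 10.0])
w = np.array([3.0, 7.0, 0.5])
np.testing.assert_allclose(nbbetaln_parallel(z, w), betaln(z, w), rtol=1e-8)
```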
nbgammaln(z: float) -> float
Compute the log of the gamma function.
Algorithm extracted from *Numerical Recipes in C* [1], chapter 6.1.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
z | float | np.float64 | required |

Returns:

Type | Description |
---|---|
float | np.float64 |
Source code in edgeseraser/misc/fast_math.py
```python
@njit(fastmath=True, nogil=True, error_model="numpy")
def nbgammaln(z: float) -> float:
    """Compute the log of the gamma function.
    Algorithm extracted from **Numerical Recipes in C**[1] chapter 6.1
    [1]: http://www.it.uom.gr/teaching/linearalgebra/NumericalRecipiesInC/
    Args:
        z: np.float64
    Returns:
        np.float64
    """
    coefs = np.array(
        [
            57.1562356658629235,
            -59.5979603554754912,
            14.1360979747417471,
            -0.491913816097620199,
            0.339946499848118887e-4,
            0.465236289270485756e-4,
            -0.983744753048795646e-4,
            0.158088703224912494e-3,
            -0.210264441724104883e-3,
            0.217439618115212643e-3,
            -0.164318106536763890e-3,
            0.844182239838527433e-4,
            -0.261908384015814087e-4,
            0.368991826595316234e-5,
        ]
    )
    y = z
    f = 2.5066282746310005 / z
    tmp = z + 5.24218750000000000
    tmp = (z + 0.5) * np.log(tmp) - tmp
    ser = 0.999999999999997092
    for j in range(14):
        y = y + 1.0
        ser = ser + coefs[j] / y
    out = tmp + np.log(f * ser)
    return out
```
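A quick check against the standard library's `math.lgamma`:

```python
import math
from edgeseraser.misc.fast_math import nbgammaln

assert abs(nbgammaln(4.5) - math.lgamma(4.5)) < 1e-8
```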
nbgammaln_parallel(z: ndarray) -> ndarray
Compute the log of the gamma function.
Algorithm extracted from *Numerical Recipes in C* [1], chapter 6.1.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
z | ndarray | np.array | required |

Returns:

Type | Description |
---|---|
ndarray | np.array |
Source code in edgeseraser/misc/fast_math.py
```python
@njit(fastmath=True, nogil=True, error_model="numpy", parallel=True)
def nbgammaln_parallel(z: np.ndarray) -> np.ndarray:
    """Compute the log of the gamma function.
    Algorithm extracted from **Numerical Recipes in C**[1] chapter 6.1
    [1]: http://www.it.uom.gr/teaching/linearalgebra/NumericalRecipiesInC/
    Args:
        z: np.array
    Returns:
        np.array
    """
    coefs = np.array(
        [
            57.1562356658629235,
            -59.5979603554754912,
            14.1360979747417471,
            -0.491913816097620199,
            0.339946499848118887e-4,
            0.465236289270485756e-4,
            -0.983744753048795646e-4,
            0.158088703224912494e-3,
            -0.210264441724104883e-3,
            0.217439618115212643e-3,
            -0.164318106536763890e-3,
            0.844182239838527433e-4,
            -0.261908384015814087e-4,
            0.368991826595316234e-5,
        ]
    )
    n = z.shape[0]
    out = np.zeros(n)
    for i in prange(n):
        y = z[i]
        tmp = z[i] + 5.24218750000000000
        tmp = (z[i] + 0.5) * np.log(tmp) - tmp
        ser = 0.999999999999997092
        for j in range(14):
            y = y + 1.0
            ser = ser + coefs[j] / y
        out[i] = tmp + np.log(2.5066282746310005 * ser / z[i])
    return out
```
noise_score
cond_edges2erase(scores_uv: ndarray, std_uv: ndarray, thresh: float = 1.28) -> ndarray
Filter edges with high noise score.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
scores_uv | ndarray | edge scores | required |
std_uv | ndarray | edge standard deviations | required |
thresh | float | common values are 1.28, 1.64, and 2.32, approximating one-tailed p-values of 0.1, 0.05, and 0.01 | 1.28 |

Returns:

Type | Description |
---|---|
np.array | indices of edges to be erased |
Source code in edgeseraser/noise_score.py
```python
def cond_edges2erase(
    scores_uv: NpArrayEdgesFloat, std_uv: NpArrayEdgesFloat, thresh: float = 1.28
) -> NpArrayEdgesIds:
    """Filter edges with high noise score.
    Args:
        scores_uv: np.array
            edge scores
        std_uv: np.array
            edge standard deviations
        thresh: float
            >Since this is roughly equivalent to a one-tailed test of
            statistical significance, common values of δ are 1.28, 1.64, and
            2.32, which approximate p-values of 0.1, 0.05, and 0.01
    Returns:
        np.array:
            indices of edges to be erased
    """
    ids2erase = np.argwhere(scores_uv <= thresh * std_uv).flatten()
    return ids2erase
```
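A toy call (values assumed); an edge survives when its score exceeds `thresh` times its standard deviation:

```python
import numpy as np
from edgeseraser.noise_score import cond_edges2erase

scores = np.array([0.1, 2.0, 0.5])
stds = np.array([0.2, 0.2, 0.5])
cond_edges2erase(scores, stds, thresh=1.28)  # array([0, 2])
```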
filter_generic_graph(num_vertices: int, edges: ndarray, weights: ndarray, param: float = 1.28) -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]
Compute noise corrected edge weights for a sparse graph.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
num_vertices | int | number of vertices | required |
edges | ndarray | edges of the graph | required |
weights | ndarray | edge weights of the graph | required |
param | float | common values are 1.28, 1.64, and 2.32, approximating one-tailed p-values of 0.1, 0.05, and 0.01 | 1.28 |

Returns:

Type | Description |
---|---|
Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]] | (np.array, np.array, np.array) - indices of edges to be erased - noise score for each edge - standard deviation of the noise score for each edge |
Source code in edgeseraser/noise_score.py
```python
def filter_generic_graph(
    num_vertices: int,
    edges: NpArrayEdges,
    weights: NpArrayEdgesFloat,
    param: float = 1.28,
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat, NpArrayEdgesFloat]:
    """Compute noise corrected edge weights for a sparse graph.
    Args:
        num_vertices: int
        edges: np.array
            Edges of the graph.
        weights: np.array
            Edge weights of the graph.
        param: float
            >Since this is roughly equivalent to a one-tailed test of
            statistical significance, common values of δ are 1.28, 1.64, and
            2.32, which approximate p-values of 0.1, 0.05, and 0.01
    Returns:
        (np.array, np.array, np.array)
            - indices of edges to be erased
            - noise score for each edge
            - standard deviation of noise score for each edge
    """
    # check if the graph is complete
    w_adj, adj = construct_sp_matrices(weights, edges, num_vertices)
    w_adj = sp.csr_matrix((weights, edges.T), shape=(num_vertices, num_vertices))
    wdegree: NpArrayEdgesFloat = (
        np.asarray(w_adj.sum(axis=1)).flatten().astype(np.float64)
    )
    scores_uv, std_uv = scores_generic_graph(wdegree, edges, weights)
    ids2erase = cond_edges2erase(scores_uv, std_uv, thresh=param)
    return ids2erase, scores_uv, std_uv
```
filter_ig_graph(g: Graph, param: float = 1.28, field: Optional[str] = None) -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]
Filter edges with a high noise score from an igraph graph.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
g | Graph | igraph.Graph to be filtered | required |
param | float | common values are 1.28, 1.64, and 2.32, approximating one-tailed p-values of 0.1, 0.05, and 0.01 | 1.28 |
field | Optional[str] | edge field to be used for filtering | None |

Returns:

Type | Description |
---|---|
Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]] | (np.array, np.array, np.array) - indices of edges erased - noise score for each edge - standard deviation of the noise score for each edge |

Examples:

```python
import igraph as ig
import edgeseraser as ee

g = ig.Graph.Erdos_Renyi(100, 0.1)
ee.noise_score.filter_ig_graph(g, field=None)
g  # filtered graph
```
Source code in edgeseraser/noise_score.py
````python
def filter_ig_graph(
    g: ig.Graph, param: float = 1.28, field: Optional[str] = None
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat, NpArrayEdgesFloat]:
    """Filter edges with a high noise score from an igraph graph.
    Args:
        g: igraph.Graph
            Graph to be filtered.
        param: float
            >Since this is roughly equivalent to a one-tailed test of
            statistical significance, common values of δ are 1.28, 1.64, and
            2.32, which approximate p-values of 0.1, 0.05, and 0.01
        field: str
            Edge field to be used for filtering.
    Returns:
        (np.array, np.array, np.array)
            - indices of edges erased
            - noise score for each edge
            - standard deviation of noise score for each edge
    Example:
        ```python
        import igraph as ig
        import edgeseraser as ee

        g = ig.Graph.Erdos_Renyi(100, 0.1)
        ee.noise_score.filter_ig_graph(g, field=None)
        g  # filtered graph
        ```
    """
    edges, weights, num_vertices, opts = ig_extract(g, field)
    ids2erase, scores_uv, std_uv = filter_generic_graph(
        num_vertices, edges, weights, param=param
    )
    ig_erase(g, ids2erase)
    return ids2erase, scores_uv, std_uv
````
filter_nx_graph(g: Union[networkx.classes.graph.Graph, networkx.classes.digraph.DiGraph], param: float = 1.28, field: Optional[str] = None, remap_labels: bool = False, save_scores: bool = False) -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]
Filter edges with a high noise score from a networkx graph.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
g | Union[networkx.classes.graph.Graph, networkx.classes.digraph.DiGraph] | networkx graph to be filtered | required |
param | float | common values are 1.28, 1.64, and 2.32, approximating one-tailed p-values of 0.1, 0.05, and 0.01 | 1.28 |
field | Optional[str] | edge field to be used for filtering | None |
remap_labels | bool | if True, the labels of the graph will be remapped to consecutive integers | False |
save_scores | bool | if True, the noise scores will be saved in the graph | False |

Returns:

Type | Description |
---|---|
Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]] | (np.array, np.array, np.array) - indices of edges erased - noise score for each edge - standard deviation of the noise score for each edge |

Examples:

```python
import networkx as nx
import edgeseraser as ee

g = nx.erdos_renyi_graph(100, 0.1)
ee.noise_score.filter_nx_graph(g)
g  # filtered graph
```
Source code in edgeseraser/noise_score.py
````python
def filter_nx_graph(
    g: Union[nx.Graph, nx.DiGraph],
    param: float = 1.28,
    field: Optional[str] = None,
    remap_labels: bool = False,
    save_scores: bool = False,
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat, NpArrayEdgesFloat]:
    """Filter edges with a high noise score from a networkx graph.
    Args:
        g: networkx.Graph
            Graph to be filtered.
        param: float
            >Since this is roughly equivalent to a one-tailed test of
            statistical significance, common values of δ are 1.28, 1.64, and
            2.32, which approximate p-values of 0.1, 0.05, and 0.01
        field: str
            Edge field to be used for filtering.
        remap_labels: bool (default: False)
            If True, the labels of the graph will be remapped to consecutive integers.
        save_scores: bool (default: False)
            If True, the noise scores will be saved in the graph.
    Returns:
        (np.array, np.array, np.array)
            - indices of edges erased
            - noise score for each edge
            - standard deviation of noise score for each edge
    Example:
        ```python
        import networkx as nx
        import edgeseraser as ee

        g = nx.erdos_renyi_graph(100, 0.1)
        ee.noise_score.filter_nx_graph(g)
        g  # filtered graph
        ```
    """
    edges, weights, num_vertices, opts = nx_extract(g, remap_labels, field)
    ids2erase, scores_uv, std_uv = filter_generic_graph(
        num_vertices, edges, weights, param=param
    )
    if save_scores:
        params: NpArrayEdgesFloat = scores_uv - param * std_uv
        nx.set_edge_attributes(
            g,
            {
                (u, v): {"score": score, "std": std, "score-std": r}
                for u, v, score, std, r in zip(
                    edges[:, 0],
                    edges[:, 1],
                    scores_uv,
                    std_uv,
                    params,
                )
            },
        )
    nx_erase(g, edges[ids2erase], opts)
    return ids2erase, scores_uv, std_uv
````
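A sketch of reading back the saved attributes (graph and parameters are illustrative):

```python
import networkx as nx
import edgeseraser as ee

g = nx.erdos_renyi_graph(100, 0.2, seed=1)
ee.noise_score.filter_nx_graph(g, param=1.64, save_scores=True)
nx.get_edge_attributes(g, "score")  # noise score per surviving edge
```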
noisy_scores(st_u: ndarray, st_v: ndarray, vol: float, w: ndarray) -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.float64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]
Get the noise score and its standard deviation for each edge.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
st_u | ndarray | strength of each edge's source vertex | required |
st_v | ndarray | strength of each edge's target vertex | required |
vol | float | volume of the graph | required |
w | ndarray | edge weights | required |

Returns:

Type | Description |
---|---|
(np.array, np.array) | **scores_uv** noise score for each edge; **std_uv** standard deviation of the noise score for each edge |
Source code in edgeseraser/noise_score.py
```python
def noisy_scores(
    st_u: NpArrayEdgesFloat, st_v: NpArrayEdgesFloat, vol: float, w: NpArrayEdgesFloat
) -> Tuple[NpArrayEdgesFloat, NpArrayEdgesFloat]:
    """
    Get noise score and the std for each edge
    Args:
        st_u: np.array
            strength for each vertex
        st_v: np.array
            strength for each vertex
        vol: float
            volume of the graph
        w: np.array
            edge weights
    Returns:
        (np.array, np.array):
            - **scores_uv**
                Noise score for each edge.
            - **std_uv**
                standard deviation of noise score for each edge
    """
    st_prod_uv = np.multiply(st_u, st_v)
    mean_prior_prob = st_prod_uv / (vol) ** 2.0
    kappa = vol / st_prod_uv
    score = (kappa * w - 1) / (kappa * w + 1)
    var_prior_prob = (
        (1 / vol**2.0)
        * (st_prod_uv * (vol - st_u) * (vol - st_v))
        / (vol**2 * (vol - 1))
    )
    alpha_prior = (mean_prior_prob**2 / var_prior_prob) * (
        1 - mean_prior_prob
    ) - mean_prior_prob
    beta_prior = (
        (mean_prior_prob / var_prior_prob) * (1 - mean_prior_prob**2.0)
        - 1
        + mean_prior_prob
    )
    alpha_post = alpha_prior + w
    beta_post = vol - w + beta_prior
    expected_puv = alpha_post / (alpha_post + beta_post)
    variance_uv = expected_puv * (1 - expected_puv) * vol
    d = 1 / (st_prod_uv) - vol * ((st_u + st_v) / st_prod_uv**2)
    variance_cuv = variance_uv * (2 * (kappa + w * d) / (kappa * w + 1) ** 2.0) ** 2.0
    sdev_cuv = variance_cuv**0.5
    return score, sdev_cuv
```
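To summarize the code above: with endpoint strengths $s_u, s_v$ and graph volume $V$, it sets $\kappa = V/(s_u s_v)$ and returns the score $(\kappa w - 1)/(\kappa w + 1)$, which is positive exactly when the edge weight $w$ exceeds the null expectation $s_u s_v / V$; the standard deviation comes from propagating the variance of the Beta-posterior edge probability through the same transformation.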
scores_generic_graph(wdegree: ndarray, edges: ndarray, weights: ndarray) -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.float64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]
Compute noise corrected edge weights for a sparse graph.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
wdegree | ndarray | weighted degree of each vertex | required |
edges | ndarray | edges of the graph | required |
weights | ndarray | edge weights of the graph | required |

Returns:

Type | Description |
---|---|
(np.array, np.array) | **scores_uv** noise corrected edge weights; **std_uv** standard deviation of the noise corrected edge weights |
Source code in edgeseraser/noise_score.py
```python
def scores_generic_graph(
    wdegree: NpArrayEdgesFloat, edges: NpArrayEdges, weights: NpArrayEdgesFloat
) -> Tuple[NpArrayEdgesFloat, NpArrayEdgesFloat]:
    """Compute noise corrected edge weights for a sparse graph.
    Args:
        wdegree: np.ndarray
            Weighted degree of each vertex.
        edges: np.array
            Edges of the graph.
        weights: np.array
            Edge weights of the graph.
    Returns:
        (np.array, np.array):
            - **scores_uv**
                Noise corrected edge weights.
            - **std_uv**
                Standard deviation of noise corrected edge weights.
    """
    vol = wdegree.sum()
    st_u = wdegree[edges[:, 0]]
    st_v = wdegree[edges[:, 1]]
    scores = st_u * st_v
    ids_0 = np.argwhere(scores == 0)
    st_u[ids_0] = 1
    st_v[ids_0] = 1
    scores_uv, std_uv = noisy_scores(st_u, st_v, vol, weights)
    scores_uv[ids_0] = 0
    std_uv[ids_0] = 0.0
    return scores_uv, std_uv
```
polya
cond_edges2erase(alphas: ndarray, thresh: float = 0.1) -> ndarray
Parameters:

Name | Type | Description | Default |
---|---|---|---|
alphas | ndarray | edge scores | required |
thresh | float | Between 0 and 1. | 0.1 |

Returns:

Type | Description |
---|---|
np.array | indices of edges to be erased |
Source code in edgeseraser/polya.py
```python
def cond_edges2erase(alphas: NpArrayEdgesFloat, thresh: float = 0.1) -> NpArrayEdgesIds:
    """
    Args:
        alphas: np.array
            edge scores
        thresh: float
            Between 0 and 1.
    Returns:
        np.array:
            indices of edges to be erased
    """
    ids2erase: NpArrayEdgesIds = np.argwhere(alphas > thresh).flatten().astype("int64")
    return ids2erase
```
filter_generic_graph(num_vertices: int, edges: ndarray, weights: ndarray, thresh: float = 0.4, a: float = 1, apt_lvl: int = 10, is_directed: bool = False, eps: float = 1e-20, optimization: Literal['lru-nb', 'lru-nb-szuszik', 'lru-nbf', 'lru-py-nb', 'nb'] = 'lru-nb-szuszik') -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]
Filter the graph using the Pólya-based method.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
num_vertices | int | number of vertices | required |
edges | ndarray | edges | required |
weights | ndarray | edge weights | required |
thresh | float | edges with probability above thresh are erased | 0.4 |
a | float | 0 is the Binomial distribution; 1 makes the filter behave like the Disparity filter | 1 |
apt_lvl | int | | 10 |
is_directed | bool | | False |
eps | float | | 1e-20 |
optimization | Literal['lru-nb', 'lru-nb-szuszik', 'lru-nbf', 'lru-py-nb', 'nb'] | cache strategy | 'lru-nb-szuszik' |

Returns:

Type | Description |
---|---|
Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]] | (np.array, np.array) - indices of edges to be erased - probability for each edge |
Source code in edgeseraser/polya.py
```python
def filter_generic_graph(
    num_vertices: int,
    edges: NpArrayEdges,
    weights: NpArrayEdgesFloat,
    thresh: float = 0.4,
    a: float = 1,
    apt_lvl: int = 10,
    is_directed: bool = False,
    eps: float = 1e-20,
    optimization: OptionsForCache = "lru-nb-szuszik",
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat]:
    """Filter the graph using the Pólya-based method.
    Args:
        num_vertices: int
            number of vertices
        edges: np.array
            edges
        weights: np.array
            edge weights
        thresh: float
        a: float
        apt_lvl: int
        is_directed: bool
        eps: float
    Returns:
        (np.array, np.array)
            - indices of edges to be erased
            - probability for each edge
    """
    p = scores_generic_graph(
        num_vertices,
        edges,
        weights,
        a=a,
        apt_lvl=apt_lvl,
        is_directed=is_directed,
        eps=eps,
        optimization=optimization,
    )
    ids2erase = cond_edges2erase(p, thresh=thresh)
    return ids2erase, p
```
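A hypothetical toy call (integer-valued weights, as the method expects):

```python
import numpy as np
from edgeseraser.polya import filter_generic_graph

edges = np.array([[0, 1], [1, 2], [2, 0], [0, 3]])
weights = np.array([4.0, 1.0, 2.0, 7.0])
ids2erase, probs = filter_generic_graph(4, edges, weights, thresh=0.4, a=1.0)
```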
filter_ig_graph(g: Graph, thresh: float = 0.5, field: Optional[str] = None, a: float = 2, apt_lvl: int = 10, optimization: Literal['lru-nb', 'lru-nb-szuszik', 'lru-nbf', 'lru-py-nb', 'nb'] = 'lru-nb-szuszik') -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]
Filter edges from an igraph graph using the Pólya-Urn filter.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
g | Graph | igraph graph to be filtered | required |
thresh | float | | 0.5 |
field | Optional[str] | edge weight field | None |
a | float | 0 is the Binomial distribution; 1 makes the filter behave like the Disparity filter | 2 |
apt_lvl | int | | 10 |
optimization | Literal['lru-nb', 'lru-nb-szuszik', 'lru-nbf', 'lru-py-nb', 'nb'] | cache strategy | 'lru-nb-szuszik' |

Returns:

Type | Description |
---|---|
(np.array, np.array) | - indices of edges erased - probability for each edge |
Source code in edgeseraser/polya.py
```python
def filter_ig_graph(
    g: ig.Graph,
    thresh: float = 0.5,
    field: Optional[str] = None,
    a: float = 2,
    apt_lvl: int = 10,
    optimization: OptionsForCache = "lru-nb-szuszik",
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat]:
    """Filter edges from an igraph graph using the Pólya-Urn filter.
    Args:
        g: ig.Graph
            graph to be filtered
        thresh: float
        field: str
        a: float
            0 is the Binomial distribution,
            1 the filter will behave like the Disparity filter.
        apt_lvl: int
    Returns:
        (np.array, np.array)
            - indices of edges erased
            - probability for each edge
    """
    edges, weights, num_vertices, opts = ig_extract(g, field)
    is_directed: bool = opts["is_directed"]
    ids2erase, probs = filter_generic_graph(
        num_vertices,
        edges,
        weights,
        is_directed=is_directed,
        a=a,
        apt_lvl=apt_lvl,
        thresh=thresh,
        optimization=optimization,
    )
    ig_erase(g, ids2erase)
    return ids2erase, probs
```
filter_nx_graph(g: Union[networkx.classes.graph.Graph, networkx.classes.digraph.DiGraph], thresh: float = 0.5, field: Optional[str] = None, a: float = 2, apt_lvl: int = 10, remap_labels: bool = False, save_scores: bool = False, optimization: Literal['lru-nb', 'lru-nb-szuszik', 'lru-nbf', 'lru-py-nb', 'nb'] = 'lru-nb-szuszik') -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]
Filter edges from a networkx graph using the Pólya-Urn filter.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
g | Union[networkx.classes.graph.Graph, networkx.classes.digraph.DiGraph] | networkx graph to be filtered | required |
thresh | float | | 0.5 |
field | Optional[str] | edge weight field | None |
a | float | 0 is the Binomial distribution; 1 makes the filter behave like the Disparity filter | 2 |
apt_lvl | int | | 10 |
remap_labels | bool | if True, the labels of the nodes are remapped to consecutive integers | False |
save_scores | bool | if True, the scores of the edges are saved in the graph | False |
optimization | Literal['lru-nb', 'lru-nb-szuszik', 'lru-nbf', 'lru-py-nb', 'nb'] | cache strategy | 'lru-nb-szuszik' |

Returns:

Type | Description |
---|---|
Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]] | (np.array, np.array) - indices of edges erased - probability for each edge |
Source code in edgeseraser/polya.py
```python
def filter_nx_graph(
    g: Union[nx.Graph, nx.DiGraph],
    thresh: float = 0.5,
    field: Optional[str] = None,
    a: float = 2,
    apt_lvl: int = 10,
    remap_labels: bool = False,
    save_scores: bool = False,
    optimization: OptionsForCache = "lru-nb-szuszik",
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat]:
    """Filter edges from a networkx graph using the Pólya-Urn filter.
    Args:
        g: networkx.Graph
            graph to be filtered
        thresh: float
        field: str
        a: float
            0 is the Binomial distribution,
            1 the filter will behave like the Disparity filter.
        apt_lvl: int
        remap_labels: bool
            If True, the labels of the nodes are remapped to consecutive integers.
        save_scores: bool (default: False)
            If True, the scores of the edges are saved in the graph.
    Returns:
        (np.array, np.array)
            - indices of edges erased
            - probability for each edge
    """
    edges, weights, num_vertices, opts = nx_extract(g, remap_labels, field)
    is_directed: bool = opts["is_directed"]
    ids2erase, probs = filter_generic_graph(
        num_vertices,
        edges,
        weights,
        is_directed=is_directed,
        a=a,
        apt_lvl=apt_lvl,
        thresh=thresh,
        optimization=optimization,
    )
    if save_scores:
        nx.set_edge_attributes(
            g,
            {
                (u, v): {"prob": prob}
                for u, v, prob in zip(edges[:, 0], edges[:, 1], probs)
            },
        )
    nx_erase(g, edges[ids2erase], opts)
    return ids2erase, probs
```
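A minimal sketch with saved probabilities (the random integer weights are an assumption):

```python
import networkx as nx
import numpy as np
import edgeseraser as ee

rng = np.random.default_rng(0)
g = nx.erdos_renyi_graph(100, 0.3, seed=0)
for u, v in g.edges():
    g[u][v]["weight"] = int(rng.integers(1, 20))

ee.polya.filter_nx_graph(g, thresh=0.5, field="weight", save_scores=True)
nx.get_edge_attributes(g, "prob")  # probability per surviving edge
```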
scores_generic_graph(num_vertices: int, edges: ndarray, weights: ndarray, a: float = 1, apt_lvl: int = 10, is_directed: bool = False, eps: float = 1e-20, optimization: Literal['lru-nb', 'lru-nb-szuszik', 'lru-nbf', 'lru-py-nb', 'nb'] = 'lru-nb-szuszik') -> ndarray
Compute the probability for each edge using the Pólya-based method for a generic weighted graph.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
num_vertices | int | number of vertices | required |
edges | ndarray | edges | required |
weights | ndarray | edge weights | required |
a | float | | 1 |
apt_lvl | int | | 10 |
is_directed | bool | | False |
eps | float | | 1e-20 |
optimization | Literal['lru-nb', 'lru-nb-szuszik', 'lru-nbf', 'lru-py-nb', 'nb'] | OptionsForCache | 'lru-nb-szuszik' |

Returns:

Type | Description |
---|---|
np.array | edge scores (probability values) |
Source code in edgeseraser/polya.py
```python
def scores_generic_graph(
    num_vertices: int,
    edges: NpArrayEdges,
    weights: NpArrayEdgesFloat,
    a: float = 1,
    apt_lvl: int = 10,
    is_directed: bool = False,
    eps: float = 1e-20,
    optimization: OptionsForCache = "lru-nb-szuszik",
) -> NpArrayEdgesFloat:
    """Compute the probability for each edge using the Pólya-based method for
    a generic weighted graph.
    Args:
        num_vertices: int
            number of vertices
        edges: np.array
            edges
        weights: np.array
            edge weights
        a: float
        apt_lvl: int
        is_directed: bool
        eps: float
        optimization: OptionsForCache
    Returns:
        np.array:
            edge scores. Probability values
    """
    w_adj, adj = construct_sp_matrices(
        weights, edges, num_vertices, is_directed=is_directed
    )

    def calc_degree(x: csr_matrix, i: int) -> NpArrayEdgesFloat:
        return np.asarray(x.sum(axis=i)).flatten()

    ids_out = edges[:, 0]
    ids_in = edges[:, 1]
    wdegree_out = calc_degree(w_adj, 1)[ids_out]
    wdegree_in = calc_degree(w_adj, 0)[ids_in]
    degree_out = calc_degree(adj, 1)[ids_out]
    degree_in = calc_degree(adj, 0)[ids_in]
    if np.mod(weights, 1).sum() > eps:
        # non integer weights
        apt_lvl = 0
    if optimization == "lru-nb":
        cache_obj = NbComputePolyaCacheDict()
    elif optimization == "lru-nb-szuszik":
        cache_obj = NbComputePolyaCacheSzuszik()
    elif optimization == "lru-nbf":
        cache_obj = NbGammaLnCache()
    else:
        cache_obj = None
    p_in = polya_cdf(
        wdegree_in,
        degree_in,
        weights,
        a,
        apt_lvl,
        optimization=optimization,
        cache_obj=cache_obj,
    )
    p_out = polya_cdf(
        wdegree_out,
        degree_out,
        weights,
        a,
        apt_lvl,
        optimization=optimization,
        cache_obj=cache_obj,
    )
    p: NpArrayEdgesFloat = np.minimum(p_in, p_out)
    return p
```
polya_tools
special
numba_tools
compute_polya_pdf(x: ndarray, w: ndarray, k: ndarray, a: float) -> ndarray
Parameters:

Name | Type | Description | Default |
---|---|---|---|
x | ndarray | array of integer weights | required |
w | ndarray | weighted degree | required |
k | ndarray | degree | required |
a | float | | required |

Returns:

Type | Description |
---|---|
ndarray | np.array Pólya PDF evaluated at each value of x |
Source code in edgeseraser/polya_tools/numba_tools.py
```python
@njit(
    nb_float64[:](nb_float64[:], nb_float64, nb_float64, nb_float64),
    nogil=True,
)
def compute_polya_pdf(
    x: NpArrayEdgesFloat, w: NpArrayEdgesFloat, k: NpArrayEdgesFloat, a: float
) -> NpArrayEdgesFloat:
    """
    Args:
        x: np.array
            array of integer weights
        w: float
            weighted degree
        k: float
            degree
        a: float
    Returns:
        np.array
    """
    a_inv = 1.0 / a
    b = (k - 1.0) * a_inv
    ones = np.ones(x.shape[0], dtype=np.float64)
    p: NpArrayEdgesFloat = np.exp(
        nbgammaln_parallel(w + ones)
        + nbbetaln_parallel(x + a_inv, w - x + b)
        - nbgammaln_parallel(x + ones)
        - nbgammaln_parallel(w - x + ones)
        - nbbetaln_parallel(a_inv * ones, b * ones)
    )
    return p
```
integer_cdf_lru_nb(wdegree: ndarray, degree: ndarray, weights: ndarray, a: float, cache_obj) -> ndarray
Compute the probability of the integer weight distribution using numba JIT with an LRU cache object.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
wdegree | ndarray | edge weighted degrees | required |
degree | ndarray | vertex degrees | required |
weights | ndarray | edge weights | required |
a | float | | required |
cache_obj | | cache object used to memoize the per-edge computation | required |

Returns:

Type | Description |
---|---|
np.array | Probability values for each edge with integer weights |
Source code in edgeseraser/polya_tools/numba_tools.py
```python
@njit(fastmath=True, nogil=True, error_model="numpy")
def integer_cdf_lru_nb(
    wdegree: NpArrayEdgesFloat,
    degree: NpArrayEdgesFloat,
    weights: NpArrayEdgesFloat,
    a: float,
    cache_obj,
) -> NpArrayEdgesFloat:
    """Compute the probability of the integer weight distribution using
    numba JIT with an LRU cache object.
    Args:
        wdegree: np.array
            edge weighted degrees
        degree: np.array
            vertex degrees
        weights: np.array
            edge weights
        a: float
    Returns:
        np.array:
            Probability values for each edge with integer weights
    """
    n = wdegree.shape[0]
    p = np.zeros(n, dtype=np.float64)
    for i in range(n):
        wi = int(weights[i])
        if wi < 2:
            p[i] = 0.0
            continue
        wd = wdegree[i]
        d = degree[i]
        p[i] = cache_obj.call(wi, wd, d, a)
    return p
```
integer_cdf_lru_nb_f(wdegree: ndarray, degree: ndarray, weights: ndarray, a: float, cache_obj) -> ndarray
Compute the probability of the integer weight distribution using numba JIT with an LRU cache for the log-gamma terms.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
wdegree | ndarray | edge weighted degrees | required |
degree | ndarray | vertex degrees | required |
weights | ndarray | edge weights | required |
a | float | | required |
cache_obj | object | cache object used to memoize the log-gamma evaluations | required |

Returns:

Type | Description |
---|---|
np.array | Probability values for each edge with integer weights |
Source code in edgeseraser/polya_tools/numba_tools.py
```python
@njit(fastmath=True, nogil=True, error_model="numpy")
def integer_cdf_lru_nb_f(
    wdegree: NpArrayEdgesFloat,
    degree: NpArrayEdgesFloat,
    weights: NpArrayEdgesFloat,
    a: float,
    cache_obj,
) -> NpArrayEdgesFloat:
    """Compute the probability of the integer weight distribution using
    numba JIT with an LRU cache for the log-gamma terms.
    Args:
        wdegree: np.array
            edge weighted degrees
        degree: np.array
            vertex degrees
        weights: np.array
            edge weights
        a: float
        cache_obj: object
    Returns:
        np.array:
            Probability values for each edge with integer weights
    """
    a_inv = 1 / a
    n = wdegree.shape[0]
    p = np.zeros(n, dtype=np.float64)
    for i in range(n):
        wi = int(weights[i])
        if wi < 2:
            p[i] = 0.0
            continue
        wd = wdegree[i]
        d = degree[i]
        b = (d - 1.0) * a_inv
        betaln2 = (
            cache_obj.callp(a_inv) + cache_obj.callp(b) - cache_obj.callp(a_inv + b)
        )
        gammaln1 = cache_obj.callp(wd + 1.0)
        gammalnbetaln1 = cache_obj.callp(wd + a_inv + b)
        r = 0.0
        x = 0.0
        for _ in range(0, wi):
            z1 = x + a_inv
            z2 = wd - x + b
            z3 = x + 1.0
            z4 = wd - x + 1.0
            v1 = cache_obj.callp(z1)
            v2 = cache_obj.callp(z2)
            v3 = cache_obj.callp(z3)
            v4 = cache_obj.callp(z4)
            betaln1 = v1 + v2 - gammalnbetaln1
            r += np.exp(gammaln1 - v3 - v4 + betaln1 - betaln2)
            x += 1
        p[i] = 1 - r
    return p
```
integer_cdf_nb(wdegree: ndarray, degree: ndarray, weights: ndarray, a: float) -> ndarray
Compute the probability of the integer weight distribution using numba JIT.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
wdegree | ndarray | edge weighted degrees | required |
degree | ndarray | vertex degrees | required |
weights | ndarray | edge weights | required |
a | float | | required |

Returns:

Type | Description |
---|---|
np.array | Probability values for each edge with integer weights |
Source code in edgeseraser/polya_tools/numba_tools.py
```python
@njit(fastmath=True, nogil=True, error_model="numpy")
def integer_cdf_nb(
    wdegree: NpArrayEdgesFloat,
    degree: NpArrayEdgesFloat,
    weights: NpArrayEdgesFloat,
    a: float,
) -> NpArrayEdgesFloat:
    """Compute the probability of the integer weight distribution using numba JIT.
    Args:
        wdegree: np.array
            edge weighted degrees
        degree: np.array
            vertex degrees
        weights: np.array
            edge weights
        a: float
    Returns:
        np.array:
            Probability values for each edge with integer weights
    """
    n = wdegree.shape[0]
    p = np.zeros(n, dtype=np.float64)
    for i in range(n):
        wi = int(weights[i])
        if wi < 2:
            p[i] = 0.0
            continue
        wd = wdegree[i]
        d = degree[i]
        x = np.arange(0, wi, dtype=np.float64)
        polya_pdf = compute_polya_pdf(x, wd, d, a)
        p[i] = 1 - np.sum(polya_pdf)
    return p
```
szuszik(a, b)
Szuszik's pairing algorithm maps a pair of natural numbers to a unique natural number: $\mathcal{N}\times\mathcal{N}\mapsto\mathcal{N}$.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
a | int | first integer | required |
b | int | second integer | required |

Returns:

Type | Description |
---|---|
int | Szuszik's algorithm result |
Source code in edgeseraser/polya_tools/numba_tools.py
```python
@njit(nb_int64(nb_int64, nb_int64))
def szuszik(a, b):
    """
    Szuszik's pairing algorithm is a map from a pair of natural numbers to a unique
    natural number.
    $\\mathcal{N}\\times\\mathcal{N}\\mapsto\\mathcal{N}$.
    Args:
        a: int
            first integer
        b: int
            second integer
    Returns:
        int
            Szuszik's algorithm result
    """
    if a != max(a, b):
        map_key = pow(b, 2) + a
    else:
        map_key = pow(a, 2) + a + b
    return map_key
```
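For example, the pairing is order-sensitive and collision-free:

```python
from edgeseraser.polya_tools.numba_tools import szuszik

szuszik(2, 3)  # 3**2 + 2 = 11     (b is the larger value)
szuszik(3, 2)  # 3**2 + 3 + 2 = 14 (a is the larger value)
```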
statistics
compute_polya_pdf_approx(w: ndarray, n: ndarray, k: ndarray, a: float = 0.0) -> ndarray
Parameters:

Name | Type | Description | Default |
---|---|---|---|
w | ndarray | edge weights | required |
n | ndarray | weighted degree (strength) | required |
k | ndarray | degree | required |
a | float | (default: 0) | 0.0 |

Returns:

Type | Description |
---|---|
ndarray | np.array |
Source code in edgeseraser/polya_tools/statistics.py
```python
def compute_polya_pdf_approx(
    w: NpArrayEdgesFloat, n: NpArrayEdgesFloat, k: NpArrayEdgesFloat, a: float = 0.0
) -> NpArrayEdgesFloat:
    """
    Args:
        w: np.array
            edge weights
        n: np.array
            weighted degree (strength)
        k: np.array
            degree
        a: float (default: 0)
    Returns:
        np.array
    """
    if a == 0.0:
        prob_success: NpArrayEdgesFloat = 1.0 / k
        p: NpArrayEdgesFloat = stats.binom.cdf(w, n, prob_success)
        return p
    a_inv = 1 / a
    b = (k - 1) * a_inv
    if a == 1.0:
        p = (1 - w / n) ** (k - 1)
    else:
        p = 1 / gamma(a_inv) * ((1 - w / n) ** (b)) * ((w * k / n * a) ** (a_inv - 1))
    return p
```
integer_cdf_lru_py_native(wdegree: ndarray, degree: ndarray, weights: ndarray, a: float) -> ndarray
Compute the probability of the integer weight distribution using Python's native lru_cache.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
wdegree | ndarray | edge weighted degrees | required |
degree | ndarray | vertex degrees | required |
weights | ndarray | edge weights | required |
a | float | | required |

Returns:

Type | Description |
---|---|
np.array | Probability values for each edge with integer weights |
Source code in edgeseraser/polya_tools/statistics.py
```python
def integer_cdf_lru_py_native(
    wdegree: NpArrayEdgesFloat,
    degree: NpArrayEdgesFloat,
    weights: NpArrayEdgesFloat,
    a: float,
) -> NpArrayEdgesFloat:
    """Compute the probability of the integer weight distribution using
    Python's native lru_cache.
    Args:
        wdegree: np.array
            edge weighted degrees
        degree: np.array
            vertex degrees
        weights: np.array
            edge weights
        a: float
    Returns:
        np.array:
            Probability values for each edge with integer weights
    """
    n = wdegree.shape[0]
    p = np.zeros(n, dtype=np.float64)
    for i in range(n):
        wi = int(weights[i])
        if wi < 2:
            p[i] = 0.0
            continue
        wd = wdegree[i]
        d = degree[i]
        p[i] = polya_cdf_lru_py_native(wi, wd, d, a)
    return p
```
polya_cdf(wdegree: ndarray, degree: ndarray, weights: ndarray, a: float, apt_lvl: float, eps: float = 1e-20, check_consistency: bool = False, optimization: Literal['lru-nb', 'lru-nb-szuszik', 'lru-nbf', 'lru-py-nb', 'nb'] = 'lru-nb', cache_obj = None) -> ndarray
Parameters:

Name | Type | Description | Default |
---|---|---|---|
wdegree | ndarray | edge weighted degrees | required |
degree | ndarray | vertex degrees | required |
weights | ndarray | edge weights | required |
a | float | | required |
apt_lvl | float | int | required |
eps | float | | 1e-20 |
check_consistency | bool | (default: False) | False |
optimization | Literal['lru-nb', 'lru-nb-szuszik', 'lru-nbf', 'lru-py-nb', 'nb'] | | 'lru-nb' |
cache_obj | | cache object used by the numba implementations | None |

Returns:

Type | Description |
---|---|
np.array | Probability values for each edge |
Source code in edgeseraser/polya_tools/statistics.py
```python
def polya_cdf(
    wdegree: NpArrayEdgesFloat,
    degree: NpArrayEdgesFloat,
    weights: NpArrayEdgesFloat,
    a: float,
    apt_lvl: float,
    eps: float = 1e-20,
    check_consistency: bool = False,
    optimization: Literal[
        "lru-nb", "lru-nb-szuszik", "lru-nbf", "lru-py-nb", "nb"
    ] = "lru-nb",
    cache_obj=None,
) -> NpArrayEdgesFloat:
    """
    Args:
        wdegree: np.array
            edge weighted degrees
        degree: np.array
            vertex degrees
        weights: np.array
            edge weights
        a: float
        apt_lvl: int
        eps: float
        check_consistency: bool (default: False)
        optimization: OptionsForCache
        cache_obj: object
    Returns:
        np.array:
            Probability values for each edge
    """
    k_high: NpArrayEdgesBool = degree > 1
    k = degree[k_high]
    w = weights[k_high]
    s = wdegree[k_high]
    scores = np.zeros_like(weights)
    if a == 0:
        p = compute_polya_pdf_approx(w, s, k)
        return p
    a_inv = 1.0 / a
    b = (k - 1) * a_inv
    if check_consistency:
        diff_strength = s - w
        assert np.all(diff_strength >= 0)
    idx = (
        (s - w >= apt_lvl * (b + 1))
        * (w >= apt_lvl * np.maximum(a_inv, 1))
        * (s >= apt_lvl * np.maximum(k * a_inv, 1))
        * (k >= apt_lvl * (a - 1 + eps))
    )
    p = np.zeros(w.shape[0])
    idx_true = np.argwhere(idx).flatten()
    if len(idx_true) > 0:
        p[idx_true] = compute_polya_pdf_approx(
            w[idx_true], s[idx_true], k[idx_true], a=a
        )
    non_zero_idx = np.argwhere(~idx).flatten()
    non_zero_idx = non_zero_idx[(s[~idx] > 0) & (k[~idx] > 1)]
    if len(non_zero_idx) > 0:
        if optimization == "lru-nb" or optimization == "lru-nb-szuszik":
            if cache_obj is None:
                raise ValueError("Cache object is None")
            vals = integer_cdf_lru_nb(
                s[non_zero_idx],
                k[non_zero_idx],
                w[non_zero_idx],
                a=a,
                cache_obj=cache_obj,
            )
        elif optimization == "lru-nbf":
            vals = integer_cdf_lru_nb_f(
                s[non_zero_idx],
                k[non_zero_idx],
                w[non_zero_idx],
                a=a,
                cache_obj=cache_obj,
            )
        elif optimization == "lru-py-nb":
            vals = integer_cdf_lru_py_native(
                s[non_zero_idx], k[non_zero_idx], w[non_zero_idx], a
            )
        elif optimization == "nb":
            vals = integer_cdf_nb(s[non_zero_idx], k[non_zero_idx], w[non_zero_idx], a)
        p[non_zero_idx] = vals
    scores[k_high] = p
    return scores
```
polya_cdf_lru_py_native(size, w, k, a)
Compute the Pólya-Urn CDF for integer weights.
The results are cached using Python's lru_cache.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
size | int | size of the distribution | required |
w | float | weighted degree | required |
k | float | degree | required |
a | float | | required |

Returns:

Type | Description |
---|---|
float | between 0 and 1 |
Source code in edgeseraser/polya_tools/statistics.py
```python
@lru_cache(None)
def polya_cdf_lru_py_native(size, w, k, a):
    """Compute the Pólya-Urn CDF for integer weights.
    The results are cached using Python's lru_cache.
    Args:
        size: int
            size of the distribution
        w: float
            weighted degree
        k: float
            degree
        a: float
    Returns:
        float:
            between 0 and 1
    """
    x = np.arange(0, size, dtype=np.float64)
    polya_pdf = compute_polya_pdf(x, w, k, a)
    return 1 - np.sum(polya_pdf)
```