
API documentation

This package implements several filtering methods for directed and undirected graphs.

Edge filtering methods extract the backbone of a graph, or sample its most important edges. You can use edge filtering as a preprocessing step to improve the performance or the results of graph algorithms, or to make a graph visualization more aesthetic.

See the example below for a simple usage of the package.

import networkx as nx
import edgeseraser as ee

g = nx.erdos_renyi_graph(100, 0.4)
ee.noise_score.filter_nx_graph(g)

g # filtered graph

Available methods and details

Method       Description                                          Suitable for                            Limitations/restrictions/details
Noise Score  Filters edges with a high noise score. Paper:1      Directed, Undirected, Weighted          Very good and fast! 4
Disparity    Dirichlet process filter (stick-breaking). Paper:2  Directed, Undirected, Weighted          There is some criticism regarding its use in undirected graphs 3
Pólya-Urn    Filters edges with the Pólya-Urn method. Paper:5    Directed, Undirected, Integer Weighted
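
For a quick comparison, the sketch below applies each method to a copy of the same weighted graph. The thresholds are illustrative rather than recommendations, and the snippet assumes the disparity and polya modules are importable in the same way as noise_score in the example above.

import networkx as nx
import numpy as np
import edgeseraser as ee

rng = np.random.default_rng(42)
g = nx.erdos_renyi_graph(200, 0.1, seed=42)
for u, v in g.edges():
    g[u][v]["weight"] = int(rng.integers(1, 10))  # integer weights suit all three filters

for module, kwargs in [
    (ee.noise_score, {"param": 1.28}),
    (ee.disparity, {"thresh": 0.8}),
    (ee.polya, {"thresh": 0.5}),
]:
    h = g.copy()
    module.filter_nx_graph(h, field="weight", **kwargs)
    print(module.__name__, "->", h.number_of_edges(), "edges kept")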

disparity

cond_edges2erase(alphas: ndarray, thresh: float = 0.1) -> ndarray

Parameters:

    alphas (ndarray): edge scores. Required.
    thresh (float): between 0 and 1. Default: 0.1.

Returns:

    np.array: indices of edges to be erased.

Source code in edgeseraser/disparity.py
def cond_edges2erase(alphas: NpArrayEdgesFloat, thresh: float = 0.1) -> NpArrayEdgesIds:
    """
    Args:
        alphas: np.array
            edge scores
        thresh: float
            Between 0 and 1.
    Returns:
        np.array:
            indices of edges to be erased

    """
    ids2erase: NpArrayEdgesIds = np.argwhere(alphas > thresh).flatten().astype(np.int64)
    return ids2erase
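
For illustration, a toy call with made-up scores:

import numpy as np
from edgeseraser.disparity import cond_edges2erase

alphas = np.array([0.05, 0.5, 0.9])
print(cond_edges2erase(alphas, thresh=0.1))  # -> [1 2]: edges with alpha > 0.1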

filter_generic_graph(num_vertices: int, edges: ndarray, weights: ndarray, thresh: float = 0.8, cond: Literal['or', 'both', 'out', 'in'] = 'or', is_directed: bool = False) -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]

Filter edges from a graph using the disparity filter (Dirichlet process).

Parameters:

    num_vertices (int): number of vertices. Required.
    edges (ndarray): edges of the graph. Required.
    weights (ndarray): edge weights. Required.
    thresh (float): between 0 and 1. Default: 0.8.
    cond (str): one of "out", "in", "both", "or". Default: 'or'.
    is_directed (bool): if True, the graph is considered as directed. Default: False.

Returns:

    (np.array, np.array):
    -  indices of edges to be erased
    -  alphas scores for each edge

Source code in edgeseraser/disparity.py
def filter_generic_graph(
    num_vertices: int,
    edges: NpArrayEdges,
    weights: NpArrayEdgesFloat,
    thresh: float = 0.8,
    cond: Literal["or", "both", "out", "in"] = "or",
    is_directed: bool = False,
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat]:
    """Filter edges from a graph using the disparity filter.
    (Dirichet proccess)

    Args:
        g: networkx.Graph
            graph to be filtered
        thresh: float
            Between 0 and 1.
        cond: str
            "out", "in", "both", "or"
        is_directed: bool
            if True, the graph is considered as directed
    Returns:
        (np.array, np.array)
        -  indices of edges to be erased
        -  alphas scores for each edge


    """
    alphas = scores_generic_graph(
        num_vertices, edges, weights, cond=cond, is_directed=is_directed
    )

    ids2erase = cond_edges2erase(alphas, thresh=thresh)
    return ids2erase, alphas
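
A minimal array-level sketch on a toy triangle graph (toy values, for illustration only):

import numpy as np
from edgeseraser.disparity import filter_generic_graph

edges = np.array([[0, 1], [0, 2], [1, 2]])
weights = np.array([10.0, 1.0, 1.0])
ids2erase, alphas = filter_generic_graph(3, edges, weights, thresh=0.8)
print(ids2erase, alphas)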

filter_ig_graph(g: Graph, thresh: float = 0.8, cond: Literal['or', 'both', 'out', 'in'] = 'or', field: Optional[str] = None) -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]

Filter edges from an igraph instance using the disparity filter (Dirichlet process).

Parameters:

    g (igraph.Graph): graph to be filtered. Required.
    thresh (float): between 0 and 1. Default: 0.8.
    cond (str): one of "out", "in", "both", "or". Default: 'or'.
    field (str or None): field to use for edge weights. Default: None.

Returns:

    (np.array, np.array):
    -  indices of edges erased
    -  alphas scores for each edge

Source code in edgeseraser/disparity.py
def filter_ig_graph(
    g: ig.Graph,
    thresh: float = 0.8,
    cond: Literal["or", "both", "out", "in"] = "or",
    field: Optional[str] = None,
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat]:
    """Filter edges from a igraph instance using the disparity filter.
    (Dirichet proccess)

    Parameters:
        g: igraph.Graph
            graph to be filtered
        thresh: float
            Between 0 and 1.
        cond: str
            "out", "in", "both", "or"
        field: str or None
            field to use for edge weights
    Returns:
        (np.array, np.array)
        -  indices of edges erased
        -  alphas scores for each edge

    """
    assert thresh > 0.0 and thresh < 1.0, "thresh must be between 0 and 1"
    edges, weights, num_vertices, opts = ig_extract(g, field)
    ids2erase, alphas = filter_generic_graph(
        num_vertices,
        edges,
        weights,
        cond=cond,
        is_directed=opts["is_directed"],
        thresh=thresh,
    )
    ig_erase(g, ids2erase)
    return ids2erase, alphas

filter_nx_graph(g: Union[networkx.classes.graph.Graph, networkx.classes.digraph.DiGraph], thresh: float = 0.8, cond: Literal['or', 'both', 'out', 'in'] = 'or', field: Optional[str] = None, remap_labels: bool = False, save_scores: bool = False) -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]

Filter edges from a networkx graph using the disparity filter (Dirichlet process).

Parameters:

    g (networkx.Graph or networkx.DiGraph): graph to be filtered. Required.
    thresh (float): between 0 and 1. Default: 0.8.
    cond (str): one of "out", "in", "both", "or". Default: 'or'.
    field (str or None): edge weight field. Default: None.
    remap_labels (bool): if True, the labels of the graph will be remapped to
        consecutive integers. Default: False.
    save_scores (bool): if True, the scores of the edges will be saved as an
        edge attribute of the graph. Default: False.

Returns:

    (np.array, np.array):
    -  indices of edges erased
    -  alphas scores for each edge

Source code in edgeseraser/disparity.py
def filter_nx_graph(
    g: Union[nx.Graph, nx.DiGraph],
    thresh: float = 0.8,
    cond: Literal["or", "both", "out", "in"] = "or",
    field: Optional[str] = None,
    remap_labels: bool = False,
    save_scores: bool = False,
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat]:
    """Filter edges from a networkx graph using the disparity filter.
    (Dirichet proccess)

    Parameters:
        g: networkx.Graph
            graph to be filtered
        thresh: float
            Between 0 and 1.
        cond: str
            "out", "in", "both", "or"
        field: str
            edge weight field
        remap_labels: bool
            if True, the labels of the graph will be remapped to consecutive integers
        save_scores: bool (default: False)
            if True, the scores of the edges will be saved in the graph attribute
    Returns:
        (np.array, np.array)
        -  indices of edges erased
        -  alphas scores for each edge

    """
    assert thresh > 0.0 and thresh < 1.0, "thresh must be between 0 and 1"
    edges, weights, num_vertices, opts = nx_extract(g, remap_labels, field)
    ids2erase, alphas = filter_generic_graph(
        num_vertices,
        edges,
        weights,
        cond=cond,
        is_directed=opts["is_directed"],
        thresh=thresh,
    )
    if save_scores:
        nx.set_edge_attributes(
            g,
            {(u, v): {"alpha": a} for u, v, a in zip(edges[:, 0], edges[:, 1], alphas)},
        )
    nx_erase(g, edges[ids2erase], opts)
    return ids2erase, alphas
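
A sketch of a typical call with save_scores enabled (random weights and integer node labels assumed):

import networkx as nx
import numpy as np
import edgeseraser as ee

rng = np.random.default_rng(0)
g = nx.erdos_renyi_graph(100, 0.1, seed=0)
for u, v in g.edges():
    g[u][v]["weight"] = rng.lognormal(0.0, 1.0)  # heterogeneous weights
ids2erase, alphas = ee.disparity.filter_nx_graph(
    g, thresh=0.8, field="weight", save_scores=True
)
u, v = next(iter(g.edges()))
print(g[u][v]["alpha"])  # per-edge score stored by save_scores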

scores_generic_graph(num_vertices: int, edges: ndarray, weights: ndarray, cond: Literal['or', 'both', 'out', 'in'] = 'or', is_directed: bool = False) -> ndarray

Parameters:

    num_vertices (int): number of vertices. Required.
    edges (ndarray): edges. Required.
    weights (ndarray): edge weights. Required.
    cond (str): one of "out", "in", "both", "or". Default: 'or'.
    is_directed (bool): if True, the graph is considered as directed. Default: False.

Returns:

    np.array: alphas, the edge scores.

Source code in edgeseraser/disparity.py
def scores_generic_graph(
    num_vertices: int,
    edges: NpArrayEdges,
    weights: NpArrayEdgesFloat,
    cond: Literal["or", "both", "out", "in"] = "or",
    is_directed: bool = False,
) -> NpArrayEdgesFloat:
    """
    Args:
        num_vertices: int
            number of vertices
        edges: np.array
            edges
        weights: np.array
            edge weights
        cond: str
            "out", "in", "both", "or"
    Returns:
        np.array:
            **alphas** edge scores

    """
    w_adj, adj = construct_sp_matrices(
        weights, edges, num_vertices, is_directed=is_directed
    )

    def calc_degree(adj: Any, i: int) -> NpArrayEdgesFloat:
        return np.asarray(adj.sum(axis=i)).flatten().astype(np.float64)

    iin = edges[:, 1]
    iout = edges[:, 0]
    wdegree_out = calc_degree(w_adj, 0)[iout]
    degree_out = calc_degree(adj, 0)[iout]
    wdegree_in = calc_degree(w_adj, 1)[iin]
    degree_in = calc_degree(adj, 1)[iin]
    if cond == "out":
        alphas = stick_break_scores(wdegree_out, degree_out, edges, weights)
    elif cond == "in":
        alphas = stick_break_scores(wdegree_in, degree_in, edges, weights)
    else:
        alphas_out = stick_break_scores(wdegree_out, degree_out, edges, weights)
        alphas_in = stick_break_scores(wdegree_in, degree_in, edges, weights)
        if cond == "both":
            alphas = np.maximum(alphas_out, alphas_in)
        elif cond == "or":
            alphas = np.minimum(alphas_out, alphas_in)

    return alphas

stick_break_scores(wdegree: ndarray, degree: ndarray, edges: ndarray, weights: ndarray) -> ndarray

Calculate the stick-breaking scores for each edge.

Parameters:

    wdegree (ndarray): edge weighted degree. Required.
    degree (ndarray): degree of each vertex. Required.
    edges (ndarray): edges of the graph. Required.
    weights (ndarray): edge weights. Required.

Returns:

    np.array: alphas, stick-breaking scores for each edge.

Source code in edgeseraser/disparity.py
def stick_break_scores(
    wdegree: NpArrayEdgesFloat,
    degree: NpArrayEdgesFloat,
    edges: NpArrayEdges,
    weights: NpArrayEdgesFloat,
) -> NpArrayEdgesFloat:
    """
    Calculate the stick-breaking scores for each edge.

    Args:
        wdegree: np.array
            edge weighted degree
        degree: np.array
            degree of each vertex
        edges: np.array
            edges of the graph
        weights: np.array
            edge weights

    Returns:
        np.array:
            **alphas** stick-breaking scores for each edge
    """
    alphas = np.ones(edges.shape[0])
    ids_d1: NpArrayEdgesBool = degree > 1
    st = weights[ids_d1] / wdegree[ids_d1]
    assert np.all(st <= 1)
    alphas[ids_d1] = (1 - st) ** (degree[ids_d1] - 1)
    return alphas
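
For an edge of weight w incident to a vertex of strength s (weighted degree) and degree k > 1, the score is alpha = (1 - w/s)**(k - 1); a smaller alpha means the edge carries a disproportionately large share of the vertex strength and is more likely to be kept. A quick numeric check with toy values:

import numpy as np
from edgeseraser.disparity import stick_break_scores

edges = np.array([[0, 1], [0, 2]])
weights = np.array([8.0, 2.0])
wdegree = np.array([10.0, 10.0])  # strength of the incident vertex, per edge
degree = np.array([2.0, 2.0])     # degree of the incident vertex, per edge
print(stick_break_scores(wdegree, degree, edges, weights))  # -> [0.2 0.8]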

misc

fast_math

Contains some math functions that are faster than the standard library.

nbbetaln_parallel(z, w)

Compute the LogBeta function.

Algorithm extracted from Numerical Recipes in C [1], chapter 6.1.

Parameters:

    z (np.array): required.
    w (np.array): required.

Returns:

    np.array:

    $\ln|B(z, w)| = \ln\Gamma(z) + \ln\Gamma(w) - \ln\Gamma(z + w)$

Source code in edgeseraser/misc/fast_math.py
@njit(fastmath=True, error_model="numpy")
def nbbetaln_parallel(z, w):
    """Compute the  LogBeta function.

    Algorithm extracted from **Numerical Recipes in C**[1] chapter 6.1


    [1]: http://www.it.uom.gr/teaching/linearalgebra/NumericalRecipiesInC/

    Args:
        z: np.array
        w: np.array
    Returns:
        np.array

    $\\ln|B(z, w)| = \\ln\\Gamma(z) + \\ln\\Gamma(w) - \\ln\\Gamma(z + w)$
    """

    arg = nbgammaln_parallel(z) + nbgammaln_parallel(w) - nbgammaln_parallel(z + w)
    return arg
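
A quick sanity check against scipy.special.betaln (assuming SciPy is available in your environment):

import numpy as np
from scipy.special import betaln
from edgeseraser.misc.fast_math import nbbetaln_parallel

z = np.array([0.5, 2.0, 7.0])
w = np.array([1.5, 3.0, 0.2])
assert np.allclose(nbbetaln_parallel(z, w), betaln(z, w))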

nbgammaln(z: float) -> float

Compute the log of the gamma function.

Algorithm extracted from Numerical Recipes in C [1], chapter 6.1.

Parameters:

    z (float): required.

Returns:

    float: ln Γ(z), as np.float64.

Source code in edgeseraser/misc/fast_math.py
@njit(fastmath=True, nogil=True, error_model="numpy")
def nbgammaln(z: float) -> float:
    """Compute the log of the gamma function.

    Algorithm extracted from **Numerical Recipes in C**[1] chapter 6.1


    [1]: http://www.it.uom.gr/teaching/linearalgebra/NumericalRecipiesInC/

    Args:
        z: np.float64
    Returns:
        np.float64
    """
    coefs = np.array(
        [
            57.1562356658629235,
            -59.5979603554754912,
            14.1360979747417471,
            -0.491913816097620199,
            0.339946499848118887e-4,
            0.465236289270485756e-4,
            -0.983744753048795646e-4,
            0.158088703224912494e-3,
            -0.210264441724104883e-3,
            0.217439618115212643e-3,
            -0.164318106536763890e-3,
            0.844182239838527433e-4,
            -0.261908384015814087e-4,
            0.368991826595316234e-5,
        ]
    )
    y = z
    f = 2.5066282746310005 / z
    tmp = z + 5.24218750000000000
    tmp = (z + 0.5) * np.log(tmp) - tmp
    ser = 0.999999999999997092
    for j in range(14):
        y = y + 1.0
        ser = ser + coefs[j] / y

    out = tmp + np.log(f * ser)
    return out
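
Likewise, a quick check of nbgammaln against scipy.special.gammaln (again assuming SciPy is available):

import numpy as np
from scipy.special import gammaln
from edgeseraser.misc.fast_math import nbgammaln

for z in (0.5, 1.0, 4.2, 100.0):
    assert np.isclose(nbgammaln(z), gammaln(z))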

nbgammaln_parallel(z: ndarray) -> ndarray

Compute the log of the gamma function.

Algorithm extracted from Numerical Recipes in C [1], chapter 6.1.

Parameters:

    z (ndarray): required.

Returns:

    ndarray: ln Γ(z), evaluated element-wise.

Source code in edgeseraser/misc/fast_math.py
@njit(fastmath=True, nogil=True, error_model="numpy", parallel=True)
def nbgammaln_parallel(z: np.ndarray) -> np.ndarray:
    """Compute the log of the gamma function.

    Algorithm extracted from **Numerical Recipes in C**[1] chapter 6.1


    [1]: http://www.it.uom.gr/teaching/linearalgebra/NumericalRecipiesInC/

    Args:
        z: np.array
    Returns:
        np.array

    """
    coefs = np.array(
        [
            57.1562356658629235,
            -59.5979603554754912,
            14.1360979747417471,
            -0.491913816097620199,
            0.339946499848118887e-4,
            0.465236289270485756e-4,
            -0.983744753048795646e-4,
            0.158088703224912494e-3,
            -0.210264441724104883e-3,
            0.217439618115212643e-3,
            -0.164318106536763890e-3,
            0.844182239838527433e-4,
            -0.261908384015814087e-4,
            0.368991826595316234e-5,
        ]
    )
    n = z.shape[0]
    out = np.zeros(n)
    for i in prange(n):
        y = z[i]
        tmp = z[i] + 5.24218750000000000
        tmp = (z[i] + 0.5) * np.log(tmp) - tmp
        ser = 0.999999999999997092
        for j in range(14):
            y = y + 1.0
            ser = ser + coefs[j] / y

        out[i] = tmp + np.log(2.5066282746310005 * ser / z[i])
    return out

noise_score

cond_edges2erase(scores_uv: ndarray, std_uv: ndarray, thresh: float = 1.28) -> ndarray

Filter edges with high noise score.

Parameters:

    scores_uv (ndarray): edge scores. Required.
    std_uv (ndarray): edge standard deviations. Required.
    thresh (float): "Since this is roughly equivalent to a one-tailed test of
        statistical significance, common values of δ are 1.28, 1.64, and 2.32,
        which approximate p-values of 0.1, 0.05, and 0.01." Default: 1.28.

Returns:

    np.array: indices of edges to be erased.

Source code in edgeseraser/noise_score.py
def cond_edges2erase(
    scores_uv: NpArrayEdgesFloat, std_uv: NpArrayEdgesFloat, thresh: float = 1.28
) -> NpArrayEdgesIds:
    """Filter edges with high noise score.

    Args:
        scores_uv: np.array
            edge scores
        std_uv: np.array
            edge standard deviations
        thresh: float
            >Since this is roughly equivalent to a one-tailed test of
            statistical significance, common values of δ are 1.28, 1.64, and
            2.32, which approximate p-values of 0.1, 0.05, and 0.01

    Returns:
        np.array:
        indices of edges to be erased
    """
    ids2erase = np.argwhere(scores_uv <= thresh * std_uv).flatten()
    return ids2erase
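
A toy call with made-up scores; an edge is erased when its score does not exceed thresh times its standard deviation:

import numpy as np
from edgeseraser.noise_score import cond_edges2erase

scores = np.array([1.0, 3.0])
stds = np.array([1.0, 1.0])
print(cond_edges2erase(scores, stds, thresh=1.28))  # -> [0]: only edge 0 is erased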

filter_generic_graph(num_vertices: int, edges: ndarray, weights: ndarray, param: float = 1.28) -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]

Compute noise corrected edge weights for a sparse graph.

Parameters:

    num_vertices (int): required.
    edges (ndarray): edges of the graph. Required.
    weights (ndarray): edge weights of the graph. Required.
    param (float): threshold δ; common values are 1.28, 1.64, and 2.32, which
        approximate p-values of 0.1, 0.05, and 0.01. Default: 1.28.

Returns:

    (np.array, np.array, np.array):
    -  indices of edges to be erased
    -  noise score for each edge
    -  standard deviation of the noise score for each edge

Source code in edgeseraser/noise_score.py
def filter_generic_graph(
    num_vertices: int,
    edges: NpArrayEdges,
    weights: NpArrayEdgesFloat,
    param: float = 1.28,
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat, NpArrayEdgesFloat]:
    """Compute noise corrected edge weights for a sparse graph.

    Args:
        num_vertices: int
        edges: np.array
            Edges of the graph.
        weights: np.array
            Edge weights of the graph.
        param: float
            >Since this is roughly equivalent to a one-tailed test of
            statistical significance, common values of δ are 1.28, 1.64, and
            2.32, which approximate p-values of 0.1, 0.05, and 0.01

    Returns:
        (np.array, np.array, np.array)
        -   indices of edges to be erased
        -   noise score for each edge
        -   standard deviation of noise score for each edge

    """
    # check if the graph is complete
    w_adj, adj = construct_sp_matrices(weights, edges, num_vertices)
    w_adj = sp.csr_matrix((weights, edges.T), shape=(num_vertices, num_vertices))
    wdegree: NpArrayEdgesFloat = (
        np.asarray(w_adj.sum(axis=1)).flatten().astype(np.float64)
    )

    scores_uv, std_uv = scores_generic_graph(wdegree, edges, weights)
    ids2erase = cond_edges2erase(scores_uv, std_uv, thresh=param)
    return ids2erase, scores_uv, std_uv

filter_ig_graph(g: Graph, param: float = 1.28, field: Optional[str] = None) -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]

Filter edges with a high noise score from an igraph graph.

Parameters:

    g (igraph.Graph): graph to be filtered. Required.
    param (float): threshold δ; common values are 1.28, 1.64, and 2.32, which
        approximate p-values of 0.1, 0.05, and 0.01. Default: 1.28.
    field (str or None): edge field to be used for filtering. Default: None.

Returns:

    (np.array, np.array, np.array):
    -  indices of edges erased
    -  noise score for each edge
    -  standard deviation of the noise score for each edge

Examples:

import igraph as ig
import edgeseraser as ee

g = ig.Graph.Erdos_Renyi(100, 0.1)
ee.noise_score.filter_ig_graph(g, field=None)

g # filtered graph
Source code in edgeseraser/noise_score.py
def filter_ig_graph(
    g: ig.Graph, param: float = 1.28, field: Optional[str] = None
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat, NpArrayEdgesFloat]:
    """Filter edge with high noise score from a igraph graph.

    Args:
        g: igraph.Graph
            Graph to be filtered.
        param: float
            >Since this is roughly equivalent to a one-tailed test of
            statistical significance, common values of δ are 1.28, 1.64, and
            2.32, which approximate p-values of 0.1, 0.05, and 0.01
        field: str
            Edge field to be used for filtering.
    Returns:
        (np.array, np.array, np.array)
        -   indices of edges erased
        -   noise score for each edge
        -   standard deviation of noise score for each edge

    Example:
        ```python
        import igraph as ig
        import edgeseraser as ee

        g = ig.Graph.Erdos_Renyi(100, 0.1)
        ee.noise_score.filter_ig_graph(g, field=None)

        g # filtered graph
        ```

    """
    edges, weights, num_vertices, opts = ig_extract(g, field)
    ids2erase, scores_uv, std_uv = filter_generic_graph(
        num_vertices, edges, weights, param=param
    )
    ig_erase(g, ids2erase)

    return ids2erase, scores_uv, std_uv

filter_nx_graph(g: Union[networkx.classes.graph.Graph, networkx.classes.digraph.DiGraph], param: float = 1.28, field: Optional[str] = None, remap_labels: bool = False, save_scores: bool = False) -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]

Filter edges with a high noise score from a networkx graph.

Parameters:

    g (networkx.Graph or networkx.DiGraph): graph to be filtered. Required.
    param (float): threshold δ; common values are 1.28, 1.64, and 2.32, which
        approximate p-values of 0.1, 0.05, and 0.01. Default: 1.28.
    field (str or None): edge field to be used for filtering. Default: None.
    remap_labels (bool): if True, the labels of the graph will be remapped to
        consecutive integers. Default: False.
    save_scores (bool): if True, the noise scores will be saved in the graph.
        Default: False.

Returns:

    (np.array, np.array, np.array):
    -  indices of edges erased
    -  noise score for each edge
    -  standard deviation of the noise score for each edge

Examples:

import networkx as nx
import edgeseraser as ee

g = nx.erdos_renyi_graph(100, 0.1)
ee.noise_score.filter_nx_graph(g)

g # filtered graph
Source code in edgeseraser/noise_score.py
def filter_nx_graph(
    g: Union[nx.Graph, nx.DiGraph],
    param: float = 1.28,
    field: Optional[str] = None,
    remap_labels: bool = False,
    save_scores: bool = False,
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat, NpArrayEdgesFloat]:
    """Filter edge with high noise score from a networkx graph.

    Args:
        g: networkx.Graph
            Graph to be filtered.
        param: float
            >Since this is roughly equivalent to a one-tailed test of
            statistical significance, common values of δ are 1.28, 1.64, and
            2.32, which approximate p-values of 0.1, 0.05, and 0.01
        field: str
            Edge field to be used for filtering.
        remap_labels: bool (default: False)
            If True, the labels of the graph will be remapped to consecutive
            integers.
        save_scores: bool (default: False)
            If True, the noise scores will be saved in the graph.
    Returns:
        (np.array, np.array, np.array)
        -   indices of edges erased
        -   noise score for each edge
        -   standard deviation of noise score for each edge

    Example:
        ```python
        import networkx as nx
        import edgeseraser as ee

        g = nx.erdos_renyi_graph(100, 0.1)
        ee.noise_score.filter_nx_graph(g)

        g # filtered graph
        ```

    """
    edges, weights, num_vertices, opts = nx_extract(g, remap_labels, field)
    ids2erase, scores_uv, std_uv = filter_generic_graph(
        num_vertices, edges, weights, param=param
    )
    if save_scores:
        params: NpArrayEdgesFloat = scores_uv - param * std_uv
        nx.set_edge_attributes(
            g,
            {
                (u, v): {"score": score, "std": std, "score-std": r}
                for u, v, score, std, r in zip(
                    edges[:, 0],
                    edges[:, 1],
                    scores_uv,
                    std_uv,
                    params,
                )
            },
        )
    nx_erase(g, edges[ids2erase], opts)

    return ids2erase, scores_uv, std_uv
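
A sketch with save_scores enabled (random weights, for illustration):

import networkx as nx
import numpy as np
import edgeseraser as ee

rng = np.random.default_rng(1)
g = nx.erdos_renyi_graph(100, 0.1, seed=1)
for u, v in g.edges():
    g[u][v]["weight"] = float(rng.integers(1, 20))
ids2erase, scores, stds = ee.noise_score.filter_nx_graph(
    g, param=1.64, field="weight", save_scores=True
)
u, v = next(iter(g.edges()))
print(g[u][v]["score"], g[u][v]["std"], g[u][v]["score-std"])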

noisy_scores(st_u: ndarray, st_v: ndarray, vol: float, w: ndarray) -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.float64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]

Get the noise score and its standard deviation for each edge.

Parameters:

    st_u (ndarray): strength of the source vertex of each edge. Required.
    st_v (ndarray): strength of the target vertex of each edge. Required.
    vol (float): volume of the graph. Required.
    w (ndarray): edge weights. Required.

Returns:

    (np.array, np.array):
    -  scores_uv: noise score for each edge
    -  std_uv: standard deviation of the noise score for each edge
Source code in edgeseraser/noise_score.py
def noisy_scores(
    st_u: NpArrayEdgesFloat, st_v: NpArrayEdgesFloat, vol: float, w: NpArrayEdgesFloat
) -> Tuple[NpArrayEdgesFloat, NpArrayEdgesFloat]:
    """
    Get the noise score and its standard deviation for each edge.

    Args:
        st_u: np.array
            strength of the source vertex of each edge
        st_v: np.array
            strength of the target vertex of each edge
        vol: float
            volume of the graph
        w: np.array
            edge weights

    Returns:
        (np.array, np.array):
        -   **scores_uv**
            Noise score for each edge.
        -   **std_uv**
            standard deviation of noise score for each edge

    """
    st_prod_uv = np.multiply(st_u, st_v)

    mean_prior_prob = st_prod_uv / (vol) ** 2.0
    kappa = vol / st_prod_uv
    score = (kappa * w - 1) / (kappa * w + 1)
    var_prior_prob = (
        (1 / vol**2.0)
        * (st_prod_uv * (vol - st_u) * (vol - st_v))
        / (vol**2 * (vol - 1))
    )

    alpha_prior = (mean_prior_prob**2 / var_prior_prob) * (
        1 - mean_prior_prob
    ) - mean_prior_prob

    beta_prior = (
        (mean_prior_prob / var_prior_prob) * (1 - mean_prior_prob**2.0)
        - 1
        + mean_prior_prob
    )

    alpha_post = alpha_prior + w
    beta_post = vol - w + beta_prior
    expected_puv = alpha_post / (alpha_post + beta_post)
    variance_uv = expected_puv * (1 - expected_puv) * vol
    d = 1 / (st_prod_uv) - vol * ((st_u + st_v) / st_prod_uv**2)
    variance_cuv = variance_uv * (2 * (kappa + w * d) / (kappa * w + 1) ** 2.0) ** 2.0
    sdev_cuv = variance_cuv**0.5

    return score, sdev_cuv

scores_generic_graph(wdegree: ndarray, edges: ndarray, weights: ndarray) -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.float64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]

Compute noise corrected edge weights for a sparse graph.

Parameters:

    wdegree (ndarray): weighted degree of each vertex. Required.
    edges (ndarray): edges of the graph. Required.
    weights (ndarray): edge weights of the graph. Required.

Returns:

    (np.array, np.array):
    -  scores_uv: noise-corrected edge weights
    -  std_uv: standard deviation of the noise-corrected edge weights
Source code in edgeseraser/noise_score.py
def scores_generic_graph(
    wdegree: NpArrayEdgesFloat, edges: NpArrayEdges, weights: NpArrayEdgesFloat
) -> Tuple[NpArrayEdgesFloat, NpArrayEdgesFloat]:
    """Compute noise corrected edge weights for a sparse graph.

    Args:
        wdegree: np.ndarray
            Weight degree of each vertex.
        edges: np.array
            Edges of the graph.
        weights: np.array
            Edge weights of the graph.

    Returns:
        (np.array, np.array):
        -   **scores_uv**
            Noise corrected edge weights.
        -   **std_uv**
            Standard deviation of noise corrected edge weights.

    """

    vol = wdegree.sum()

    st_u = wdegree[edges[:, 0]]
    st_v = wdegree[edges[:, 1]]
    scores = st_u * st_v
    ids_0 = np.argwhere(scores == 0)
    st_u[ids_0] = 1
    st_v[ids_0] = 1
    scores_uv, std_uv = noisy_scores(st_u, st_v, vol, weights)
    scores_uv[ids_0] = 0
    std_uv[ids_0] = 0.0
    return scores_uv, std_uv

polya

cond_edges2erase(alphas: ndarray, thresh: float = 0.1) -> ndarray

Parameters:

    alphas (ndarray): edge scores. Required.
    thresh (float): between 0 and 1. Default: 0.1.

Returns:

    np.array: indices of edges to be erased.

Source code in edgeseraser/polya.py
def cond_edges2erase(alphas: NpArrayEdgesFloat, thresh: float = 0.1) -> NpArrayEdgesIds:
    """
    Args:
        alphas: np.array
            edge scores
        thresh: float
            Between 0 and 1.
    Returns:
        np.array:
            indices of edges to be erased

    """
    ids2erase: NpArrayEdgesIds = np.argwhere(alphas > thresh).flatten().astype("int64")
    return ids2erase

filter_generic_graph(num_vertices: int, edges: ndarray, weights: ndarray, thresh: float = 0.4, a: float = 1, apt_lvl: int = 10, is_directed: bool = False, eps: float = 1e-20, optimization: Literal['lru-nb', 'lru-nb-szuszik', 'lru-nbf', 'lru-py-nb', 'nb'] = 'lru-nb-szuszik') -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]

Filter the graph using the Pólya-based method.

Parameters:

    num_vertices (int): number of vertices. Required.
    edges (ndarray): edges. Required.
    weights (ndarray): edge weights. Required.
    thresh (float): default: 0.4.
    a (float): default: 1.
    apt_lvl (int): default: 10.
    is_directed (bool): default: False.
    eps (float): default: 1e-20.
    optimization (str): one of "lru-nb", "lru-nb-szuszik", "lru-nbf",
        "lru-py-nb", "nb". Default: 'lru-nb-szuszik'.

Returns:

    (np.array, np.array):
    -  indices of edges to be erased
    -  probability for each edge

Source code in edgeseraser/polya.py
def filter_generic_graph(
    num_vertices: int,
    edges: NpArrayEdges,
    weights: NpArrayEdgesFloat,
    thresh: float = 0.4,
    a: float = 1,
    apt_lvl: int = 10,
    is_directed: bool = False,
    eps: float = 1e-20,
    optimization: OptionsForCache = "lru-nb-szuszik",
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat]:
    """Filter the graph using the Pólya-based method.

    Args:
        num_vertices: int
            number of vertices
        edges: np.array
            edges
        weights: np.array
            edge weights
        thresh: float
        a: float
        apt_lvl: int
        is_directed: bool
        eps: float

    Returns:
        (np.array, np.array)
        -  indices of edges to be erased
        -  probability for each edge

    """
    p = scores_generic_graph(
        num_vertices,
        edges,
        weights,
        a=a,
        apt_lvl=apt_lvl,
        is_directed=is_directed,
        eps=eps,
        optimization=optimization,
    )

    ids2erase = cond_edges2erase(p, thresh=thresh)
    return ids2erase, p

filter_ig_graph(g: Graph, thresh: float = 0.5, field: Optional[str] = None, a: float = 2, apt_lvl: int = 10, optimization: Literal['lru-nb', 'lru-nb-szuszik', 'lru-nbf', 'lru-py-nb', 'nb'] = 'lru-nb-szuszik') -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]

Filter edges from an igraph graph using the Pólya-Urn filter.

Parameters:

    g (ig.Graph): graph to be filtered. Required.
    thresh (float): default: 0.5.
    field (str or None): default: None.
    a (float): with a = 0 the model reduces to the binomial distribution; with
        a = 1 the filter behaves like the disparity filter. Default: 2.
    apt_lvl (int): default: 10.

Returns:

    (np.array, np.array):
    -  indices of edges erased
    -  probability for each edge

Source code in edgeseraser/polya.py
def filter_ig_graph(
    g: ig.Graph,
    thresh: float = 0.5,
    field: Optional[str] = None,
    a: float = 2,
    apt_lvl: int = 10,
    optimization: OptionsForCache = "lru-nb-szuszik",
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat]:
    """Filter edges from a igraph using the Pólya-Urn filter.

    Parameters:
        g: ig.Graph
            graph to be filtered
        thresh: float
        field: str
        a: float
            with a=0 the model reduces to the binomial distribution;
            with a=1 the filter behaves like the disparity filter.
        apt_lvl: int

     Return:
        (np.array, np.array)
        -  indices of edges erased
        -  probability for each edge
    """

    edges, weights, num_vertices, opts = ig_extract(g, field)
    is_directed: bool = opts["is_directed"]
    ids2erase, probs = filter_generic_graph(
        num_vertices,
        edges,
        weights,
        is_directed=is_directed,
        a=a,
        apt_lvl=apt_lvl,
        thresh=thresh,
        optimization=optimization,
    )
    ig_erase(g, ids2erase)
    return ids2erase, probs

filter_nx_graph(g: Union[networkx.classes.graph.Graph, networkx.classes.digraph.DiGraph], thresh: float = 0.5, field: Optional[str] = None, a: float = 2, apt_lvl: int = 10, remap_labels: bool = False, save_scores: bool = False, optimization: Literal['lru-nb', 'lru-nb-szuszik', 'lru-nbf', 'lru-py-nb', 'nb'] = 'lru-nb-szuszik') -> Tuple[numpy.ndarray[Any, numpy.dtype[numpy.int64]], numpy.ndarray[Any, numpy.dtype[numpy.float64]]]

Filter edges from a networkx graph using the Pólya-Urn filter.

Parameters:

    g (networkx.Graph or networkx.DiGraph): graph to be filtered. Required.
    thresh (float): default: 0.5.
    field (str or None): default: None.
    a (float): with a = 0 the model reduces to the binomial distribution; with
        a = 1 the filter behaves like the disparity filter. Default: 2.
    apt_lvl (int): default: 10.
    remap_labels (bool): if True, the labels of the nodes are remapped to
        consecutive integers. Default: False.
    save_scores (bool): if True, the scores of the edges are saved in the
        graph. Default: False.

Returns:

    (np.array, np.array):
    -  indices of edges erased
    -  probability for each edge

Source code in edgeseraser/polya.py
def filter_nx_graph(
    g: Union[nx.Graph, nx.DiGraph],
    thresh: float = 0.5,
    field: Optional[str] = None,
    a: float = 2,
    apt_lvl: int = 10,
    remap_labels: bool = False,
    save_scores: bool = False,
    optimization: OptionsForCache = "lru-nb-szuszik",
) -> Tuple[NpArrayEdgesIds, NpArrayEdgesFloat]:
    """Filter edges from a networkx graph using the Pólya-Urn filter.

    Parameters:
        g: networkx.Graph
            graph to be filtered
        thresh: float
        field: str
        a: float
            with a=0 the model reduces to the binomial distribution;
            with a=1 the filter behaves like the disparity filter.
        apt_lvl: int
        remap_labels: bool
            If True, the labels of the nodes are remapped to consecutive integers.
        save_scores: bool (default: False)
            If True, the scores of the edges are saved in the graph.

    Returns:
        (np.array, np.array)
        -  indices of edges erased
        -  probability for each edge

    """
    edges, weights, num_vertices, opts = nx_extract(g, remap_labels, field)
    is_directed: bool = opts["is_directed"]
    ids2erase, probs = filter_generic_graph(
        num_vertices,
        edges,
        weights,
        is_directed=is_directed,
        a=a,
        apt_lvl=apt_lvl,
        thresh=thresh,
        optimization=optimization,
    )
    if save_scores:
        nx.set_edge_attributes(
            g,
            {
                (u, v): {"prob": prob}
                for u, v, prob in zip(edges[:, 0], edges[:, 1], probs)
            },
        )

    nx_erase(g, edges[ids2erase], opts)
    return ids2erase, probs
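
A sketch of a typical call; note the integer edge weights, which this filter expects:

import networkx as nx
import numpy as np
import edgeseraser as ee

rng = np.random.default_rng(0)
g = nx.erdos_renyi_graph(100, 0.1, seed=0)
for u, v in g.edges():
    g[u][v]["weight"] = int(rng.integers(1, 30))
ids2erase, probs = ee.polya.filter_nx_graph(g, thresh=0.5, field="weight", a=2)  # first call triggers numba JIT compilation
print(len(ids2erase), "edges removed;", g.number_of_edges(), "remain")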

scores_generic_graph(num_vertices: int, edges: ndarray, weights: ndarray, a: float = 1, apt_lvl: int = 10, is_directed: bool = False, eps: float = 1e-20, optimization: Literal['lru-nb', 'lru-nb-szuszik', 'lru-nbf', 'lru-py-nb', 'nb'] = 'lru-nb-szuszik') -> ndarray

Compute the probability for each edge using the Pólya-based method for a generic weighted graph.

Parameters:

    num_vertices (int): number of vertices. Required.
    edges (ndarray): edges. Required.
    weights (ndarray): edge weights. Required.
    a (float): default: 1.
    apt_lvl (int): default: 10.
    is_directed (bool): default: False.
    eps (float): default: 1e-20.
    optimization (str): one of "lru-nb", "lru-nb-szuszik", "lru-nbf",
        "lru-py-nb", "nb". Default: 'lru-nb-szuszik'.

Returns:

    np.array: edge scores (probability values).

Source code in edgeseraser/polya.py
def scores_generic_graph(
    num_vertices: int,
    edges: NpArrayEdges,
    weights: NpArrayEdgesFloat,
    a: float = 1,
    apt_lvl: int = 10,
    is_directed: bool = False,
    eps: float = 1e-20,
    optimization: OptionsForCache = "lru-nb-szuszik",
) -> NpArrayEdgesFloat:
    """Compute the probability for each edge using the Pólya-based method for
    a generic weighted graph.

    Args:
        num_vertices: int
            number of vertices
        edges: np.array
            edges
        weights: np.array
            edge weights
        a: float
        apt_lvl: int
        is_directed: bool
        eps: float
        optimization: OptionsForCache

    Returns:
        np.array:
            edge scores. Probability values

    """
    w_adj, adj = construct_sp_matrices(
        weights, edges, num_vertices, is_directed=is_directed
    )

    def calc_degree(x: csr_matrix, i: int) -> NpArrayEdgesFloat:
        return np.asarray(x.sum(axis=i)).flatten()

    ids_out = edges[:, 0]
    ids_in = edges[:, 1]
    wdegree_out = calc_degree(w_adj, 1)[ids_out]
    wdegree_in = calc_degree(w_adj, 0)[ids_in]
    degree_out = calc_degree(adj, 1)[ids_out]
    degree_in = calc_degree(adj, 0)[ids_in]

    if np.mod(weights, 1).sum() > eps:
        # non integer weights
        apt_lvl = 0
    if optimization == "lru-nb":
        cache_obj = NbComputePolyaCacheDict()
    elif optimization == "lru-nb-szuszik":
        cache_obj = NbComputePolyaCacheSzuszik()
    elif optimization == "lru-nbf":
        cache_obj = NbGammaLnCache()
    else:
        cache_obj = None
    p_in = polya_cdf(
        wdegree_in,
        degree_in,
        weights,
        a,
        apt_lvl,
        optimization=optimization,
        cache_obj=cache_obj,
    )
    p_out = polya_cdf(
        wdegree_out,
        degree_out,
        weights,
        a,
        apt_lvl,
        optimization=optimization,
        cache_obj=cache_obj,
    )

    p: NpArrayEdgesFloat = np.minimum(p_in, p_out)
    return p

polya_tools

numba_tools

compute_polya_pdf(x: ndarray, w: ndarray, k: ndarray, a: float) -> ndarray

Parameters:

    x (ndarray): array of integer weights. Required.
    w (float): weighted degree. Required.
    k (float): degree. Required.
    a (float): required.

Returns:

    np.array

Source code in edgeseraser/polya_tools/numba_tools.py
@njit(
    nb_float64[:](nb_float64[:], nb_float64, nb_float64, nb_float64),
    nogil=True,
)
def compute_polya_pdf(
    x: NpArrayEdgesFloat, w: NpArrayEdgesFloat, k: NpArrayEdgesFloat, a: float
) -> NpArrayEdgesFloat:
    """

    Args:
        x: np.array
            array of integer weights
        w: float
            weighted degree
        k: float
            degree
        a: float
    Returns:
        np.array

    """
    a_inv = 1.0 / a
    b = (k - 1.0) * a_inv
    ones = np.ones(x.shape[0], dtype=np.float64)
    p: NpArrayEdgesFloat = np.exp(
        nbgammaln_parallel(w + ones)
        + nbbetaln_parallel(x + a_inv, w - x + b)
        - nbgammaln_parallel(x + ones)
        - nbgammaln_parallel(w - x + ones)
        - nbbetaln_parallel(a_inv * ones, b * ones)
    )

    return p

integer_cdf_lru_nb(wdegree: ndarray, degree: ndarray, weights: ndarray, a: float, cache_obj) -> ndarray

Compute the probability of the integer weight distribution using numba JIT.

Parameters:

    wdegree (ndarray): edge weighted degrees. Required.
    degree (ndarray): vertex degrees. Required.
    weights (ndarray): edge weights. Required.
    a (float): required.
    cache_obj: cache object used to memoize per-edge results. Required.

Returns:

    np.array: probability values for each edge with integer weights.

Source code in edgeseraser/polya_tools/numba_tools.py
@njit(fastmath=True, nogil=True, error_model="numpy")
def integer_cdf_lru_nb(
    wdegree: NpArrayEdgesFloat,
    degree: NpArrayEdgesFloat,
    weights: NpArrayEdgesFloat,
    a: float,
    cache_obj,
) -> NpArrayEdgesFloat:
    """Compute the prob of the integer weight distribution using numba JIT and parallelization.

    Args:
        wdegree: np.array
            edge weighted degrees
        degree: np.array
            vertex degrees
        weights: np.array
            edge weights
        a: float
    Returns:
        np.array:
            Probability values for each edge with integer weights

    """
    n = wdegree.shape[0]
    p = np.zeros(n, dtype=np.float64)
    for i in range(n):
        wi = int(weights[i])
        if wi < 2:
            p[i] = 0.0
            continue
        wd = wdegree[i]
        d = degree[i]
        p[i] = cache_obj.call(wi, wd, d, a)
    return p

integer_cdf_lru_nb_f(wdegree: ndarray, degree: ndarray, weights: ndarray, a: float, cache_obj) -> ndarray

Compute the probability of the integer weight distribution using numba JIT.

Parameters:

    wdegree (ndarray): edge weighted degrees. Required.
    degree (ndarray): vertex degrees. Required.
    weights (ndarray): edge weights. Required.
    a (float): required.
    cache_obj (object): required.

Returns:

    np.array: probability values for each edge with integer weights.

Source code in edgeseraser/polya_tools/numba_tools.py
@njit(fastmath=True, nogil=True, error_model="numpy")
def integer_cdf_lru_nb_f(
    wdegree: NpArrayEdgesFloat,
    degree: NpArrayEdgesFloat,
    weights: NpArrayEdgesFloat,
    a: float,
    cache_obj,
) -> NpArrayEdgesFloat:
    """Compute the prob of the integer weight distribution using numba JIT and parallelization.

    Args:
        wdegree: np.array
            edge weighted degrees
        degree: np.array
            vertex degrees
        weights: np.array
            edge weights
        a: float
        cache_obj: object
    Returns:
        np.array:
            Probability values for each edge with integer weights

    """
    a_inv = 1 / a
    n = wdegree.shape[0]
    p = np.zeros(n, dtype=np.float64)

    for i in range(n):
        wi = int(weights[i])
        if wi < 2:
            p[i] = 0.0
            continue
        wd = wdegree[i]
        d = degree[i]
        b = (d - 1.0) * a_inv
        betaln2 = (
            cache_obj.callp(a_inv) + cache_obj.callp(b) - cache_obj.callp(a_inv + b)
        )
        gammaln1 = cache_obj.callp(wd + 1.0)
        gammalnbetaln1 = cache_obj.callp(wd + a_inv + b)
        r = 0.0
        x = 0.0
        for _ in range(0, wi):
            z1 = x + a_inv
            z2 = wd - x + b
            z3 = x + 1.0
            z4 = wd - x + 1.0
            v1 = cache_obj.callp(z1)
            v2 = cache_obj.callp(z2)
            v3 = cache_obj.callp(z3)
            v4 = cache_obj.callp(z4)

            betaln1 = v1 + v2 - gammalnbetaln1
            r += np.exp(gammaln1 - v3 - v4 + betaln1 - betaln2)
            x += 1
        p[i] = 1 - r
    return p

integer_cdf_nb(wdegree: ndarray, degree: ndarray, weights: ndarray, a: float) -> ndarray

Compute the probability of the integer weight distribution using numba JIT.

Parameters:

    wdegree (ndarray): edge weighted degrees. Required.
    degree (ndarray): vertex degrees. Required.
    weights (ndarray): edge weights. Required.
    a (float): required.

Returns:

    np.array: probability values for each edge with integer weights.

Source code in edgeseraser/polya_tools/numba_tools.py
@njit(fastmath=True, nogil=True, error_model="numpy")
def integer_cdf_nb(
    wdegree: NpArrayEdgesFloat,
    degree: NpArrayEdgesFloat,
    weights: NpArrayEdgesFloat,
    a: float,
) -> NpArrayEdgesFloat:
    """Compute the prob of the integer weight distribution using numba JIT and parallelization.

    Args:
        wdegree: np.array
            edge weighted degrees
        degree: np.array
            vertex degrees
        weights: np.array
            edge weights
        a: float
    Returns:
        np.array:
            Probability values for each edge with integer weights

    """
    n = wdegree.shape[0]
    p = np.zeros(n, dtype=np.float64)
    for i in range(n):
        wi = int(weights[i])
        if wi < 2:
            p[i] = 0.0
            continue
        wd = wdegree[i]
        d = degree[i]
        x = np.arange(0, wi, dtype=np.float64)
        polya_pdf = compute_polya_pdf(x, wd, d, a)
        p[i] = 1 - np.sum(polya_pdf)
    return p

szuszik(a, b)

Szuszik's pairing algorithm is a map from a pair of natural numbers to a unique natural number: $\mathcal{N} \times \mathcal{N} \mapsto \mathcal{N}$.

Parameters:

    a (int): first integer. Required.
    b (int): second integer. Required.

Returns:

    int: Szuszik's algorithm result.

Source code in edgeseraser/polya_tools/numba_tools.py
@njit(nb_int64(nb_int64, nb_int64))
def szuszik(a, b):
    """
    Szuszik's pairing algorithm is a map from a pair of natural numbers to a unique
    natural number.
        $\\mathcal{N} \\times \\mathcal{N} \\mapsto \\mathcal{N}$.

    Args:
        a: int
            first integer
        b: int
            second integer
    Returns:
        int
            Szuszik's algorithm result

    """
    if a != max(a, b):
        map_key = pow(b, 2) + a
    else:
        map_key = pow(a, 2) + a + b

    return map_key
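
Because the mapping is injective over ordered pairs, the result can serve as a compact dictionary key (the "lru-nb-szuszik" cache option is named after it). A small uniqueness check:

from edgeseraser.polya_tools.numba_tools import szuszik

keys = {szuszik(a, b) for a in range(100) for b in range(100)}
assert len(keys) == 100 * 100  # every ordered pair gets a distinct key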

statistics

compute_polya_pdf_approx(w: ndarray, n: ndarray, k: ndarray, a: float = 0.0) -> ndarray

Parameters:

    w (ndarray): edge weights. Required.
    n (ndarray): required.
    k (ndarray): degree. Required.
    a (float): default: 0.0.

Returns:

    np.array

Source code in edgeseraser/polya_tools/statistics.py
def compute_polya_pdf_approx(
    w: NpArrayEdgesFloat, n: NpArrayEdgesFloat, k: NpArrayEdgesFloat, a: float = 0.0
) -> NpArrayEdgesFloat:
    """

    Args:
        w: np.array
            edge weights
        n: np.array
        k: np.array
            degree
        a: float (default: 0)
    Returns:
        np.array

    """
    if a == 0.0:
        prob_success: NpArrayEdgesFloat = 1.0 / k
        p: NpArrayEdgesFloat = stats.binom.cdf(w, n, prob_success)
        return p
    a_inv = 1 / a
    b = (k - 1) * a_inv
    if a == 1.0:
        p = (1 - w / n) ** (k - 1)
    else:
        p = 1 / gamma(a_inv) * ((1 - w / n) ** (b)) * ((w * k / n * a) ** (a_inv - 1))

    return p
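
As the code shows, a = 0 falls back to the binomial CDF and a = 1 reproduces the disparity filter's stick-breaking form; a toy check of the a = 1 branch:

import numpy as np
from edgeseraser.polya_tools.statistics import compute_polya_pdf_approx

w = np.array([5.0])
n = np.array([20.0])
k = np.array([4.0])
print(compute_polya_pdf_approx(w, n, k, a=1.0))  # (1 - 5/20)**(4 - 1) = 0.421875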

integer_cdf_lru_py_native(wdegree: ndarray, degree: ndarray, weights: ndarray, a: float) -> ndarray

Compute the probability of the integer weight distribution, caching results with Python's lru_cache.

Parameters:

    wdegree (ndarray): edge weighted degrees. Required.
    degree (ndarray): vertex degrees. Required.
    weights (ndarray): edge weights. Required.
    a (float): required.

Returns:

    np.array: probability values for each edge with integer weights.

Source code in edgeseraser/polya_tools/statistics.py
def integer_cdf_lru_py_native(
    wdegree: NpArrayEdgesFloat,
    degree: NpArrayEdgesFloat,
    weights: NpArrayEdgesFloat,
    a: float,
) -> NpArrayEdgesFloat:
    """Compute the prob of the integer weight distribution using numba JIT and parallelization.

    Args:
        wdegree: np.array
            edge weighted degrees
        degree: np.array
            vertex degrees
        weights: np.array
            edge weights
        a: float
    Returns:
        np.array:
            Probability values for each edge with integer weights

    """
    n = wdegree.shape[0]
    p = np.zeros(n, dtype=np.float64)
    for i in range(n):
        wi = int(weights[i])
        if wi < 2:
            p[i] = 0.0
            continue
        wd = wdegree[i]
        d = degree[i]

        p[i] = polya_cdf_lru_py_native(wi, wd, d, a)
    return p

polya_cdf(wdegree: ndarray, degree: ndarray, weights: ndarray, a: float, apt_lvl: float, eps: float = 1e-20, check_consistency: bool = False, optimization: Literal['lru-nb', 'lru-nb-szuszik', 'lru-nbf', 'lru-py-nb', 'nb'] = 'lru-nb', cache_obj = None) -> ndarray

Parameters:

    wdegree (ndarray): edge weighted degrees. Required.
    degree (ndarray): vertex degrees. Required.
    weights (ndarray): edge weights. Required.
    a (float): required.
    apt_lvl (int): required.
    eps (float): default: 1e-20.
    check_consistency (bool): default: False.
    optimization (str): one of "lru-nb", "lru-nb-szuszik", "lru-nbf",
        "lru-py-nb", "nb". Default: 'lru-nb'.
    cache_obj: default: None.

Returns:

    np.array: probability values for each edge.

Source code in edgeseraser/polya_tools/statistics.py
def polya_cdf(
    wdegree: NpArrayEdgesFloat,
    degree: NpArrayEdgesFloat,
    weights: NpArrayEdgesFloat,
    a: float,
    apt_lvl: float,
    eps: float = 1e-20,
    check_consistency: bool = False,
    optimization: Literal[
        "lru-nb", "lru-nb-szuszik", "lru-nbf", "lru-py-nb", "nb"
    ] = "lru-nb",
    cache_obj=None,
) -> NpArrayEdgesFloat:
    """
    Args:
        wdegree: np.array
            edge weighted degrees
        degree: np.array
            vertex degrees
        weights: np.array
            edge weights
        a: float
        apt_lvl: int
        eps: float
        check_consistency: bool (default: False)
    Returns:
        np.array:
            Probability values for each edge

    """
    k_high: NpArrayEdgesBool = degree > 1
    k = degree[k_high]
    w = weights[k_high]
    s = wdegree[k_high]
    scores = np.zeros_like(weights)
    if a == 0:
        p = compute_polya_pdf_approx(w, s, k)
        return p

    a_inv = 1.0 / a
    b = (k - 1) * a_inv
    if check_consistency:
        diff_strength = s - w
        assert np.all(diff_strength >= 0)
    idx = (
        (s - w >= apt_lvl * (b + 1))
        * (w >= apt_lvl * np.maximum(a_inv, 1))
        * (s >= apt_lvl * np.maximum(k * a_inv, 1))
        * (k >= apt_lvl * (a - 1 + eps))
    )
    p = np.zeros(w.shape[0])
    idx_true = np.argwhere(idx).flatten()
    if len(idx_true) > 0:
        p[idx_true] = compute_polya_pdf_approx(
            w[idx_true], s[idx_true], k[idx_true], a=a
        )
    non_zero_idx = np.argwhere(~idx).flatten()
    non_zero_idx = non_zero_idx[(s[~idx] > 0) & (k[~idx] > 1)]
    if len(non_zero_idx) > 0:
        if optimization == "lru-nb" or optimization == "lru-nb-szuszik":
            if cache_obj is None:
                raise ValueError("Cache object is None")
            vals = integer_cdf_lru_nb(
                s[non_zero_idx],
                k[non_zero_idx],
                w[non_zero_idx],
                a=a,
                cache_obj=cache_obj,
            )
        elif optimization == "lru-nbf":
            vals = integer_cdf_lru_nb_f(
                s[non_zero_idx],
                k[non_zero_idx],
                w[non_zero_idx],
                a=a,
                cache_obj=cache_obj,
            )
        elif optimization == "lru-py-nb":
            vals = integer_cdf_lru_py_native(
                s[non_zero_idx], k[non_zero_idx], w[non_zero_idx], a
            )
        elif optimization == "nb":
            vals = integer_cdf_nb(s[non_zero_idx], k[non_zero_idx], w[non_zero_idx], a)
        p[non_zero_idx] = vals

    scores[k_high] = p
    return scores

polya_cdf_lru_py_native(size, w, k, a)

Compute the Pólya-Urn for integer weights

The results are cached using python's lru_cache.

Parameters:

    size (int): size of the distribution. Required.
    w (float): weighted degree. Required.
    k (float): degree. Required.
    a (float): required.

Returns:

    float: between 0 and 1.

Source code in edgeseraser/polya_tools/statistics.py
@lru_cache(None)
def polya_cdf_lru_py_native(size, w, k, a):
    """Compute the Pólya-Urn for integer weights

    The results are cached using python's lru_cache.

    Args:
        size: int
            size of the distribution
        w: float
            weighted degree
        k: float
            degree
        a: float
    Returns:
        float:
            between 0 and 1

    """
    x = np.arange(0, size, dtype=np.float64)
    polya_pdf = compute_polya_pdf(x, w, k, a)
    return 1 - np.sum(polya_pdf)
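
A toy call with illustrative values: the probability mass above an integer edge weight of 4 at a vertex with strength 10 and degree 3, using a = 1:

from edgeseraser.polya_tools.statistics import polya_cdf_lru_py_native

p = polya_cdf_lru_py_native(4, 10.0, 3.0, 1.0)
print(p)  # 1 minus the Pólya pmf summed over weights 0..3; repeated calls hit the cache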