Source code for QHyper.problems.community_detection

# This work was supported by the EuroHPC PL infrastructure funded at the
# Smart Growth Operational Programme (2014-2020), Measure 4.2
# under the grant agreement no. POIR.04.02.00-00-D014/20-00


from dataclasses import dataclass, field
from typing import Any, Iterable, Tuple, cast

import networkx as nx
import numpy as np
import sympy
from QHyper.problems.base import Problem
from QHyper.constraint import Constraint
from QHyper.polynomial import Polynomial
from sympy.core.expr import Expr
from QHyper.parser import from_sympy


@dataclass
class Network:
    graph: nx.Graph
    resolution: float = 1.0
    weight: str | None = "weight"
    community: list | None = None
    full_modularity_matrix: np.ndarray = field(init=False)
    generalized_modularity_matrix: np.ndarray = field(init=False)

    def __post_init__(self) -> None:
        if not self.community:
            self.community = [*range(self.graph.number_of_nodes())]
        (
            self.full_modularity_matrix,
            self.generalized_modularity_matrix,
        ) = self.calculate_modularity_matrix()

    def calculate_modularity_matrix(self) -> np.ndarray:
        adj_matrix: np.ndarray = nx.to_numpy_array(self.graph, weight=self.weight)
        in_degree_matrix: np.ndarray = adj_matrix.sum(axis=1)
        out_degree_matrix: np.ndarray = adj_matrix.sum(axis=0)
        m: int = np.sum(adj_matrix)
        full_modularity_matrix = (
            adj_matrix
            - self.resolution * np.outer(in_degree_matrix, out_degree_matrix) / m
        )

        B_bis = full_modularity_matrix[self.community, :]
        B_community = B_bis[:, self.community]
        B_i = np.sum(B_community, axis=1)
        B_j = np.sum(B_community.T, axis=1)
        delta = np.eye(len(self.community), dtype=np.int32)
        B_g = 0.5*( B_community + B_community.T ) - 0.5 * delta * (B_i + B_j)
        return full_modularity_matrix, B_g


class KarateClubNetwork(Network):
    def __init__(self, resolution: float = 1):
        super().__init__(nx.karate_club_graph(), resolution=resolution)


class BrainNetwork(Network):
    def __init__(
        self,
        input_data_dir: str,
        input_data_name: str,
        delimiter: str = "	",
        resolution: int = 1,
    ):
        adj_matrix = np.genfromtxt(
            f"{input_data_dir}/{input_data_name}.csv", delimiter=delimiter
        )
        super().__init__(nx.from_numpy_matrix(adj_matrix), resolution=resolution)


[docs] class CommunityDetectionProblem(Problem): """ Problem class instance - objective function for network community detection Attributes ---------- cases: int number of communities into which the graph shall be divided (default 2) G: networkx graph networkx implementation of graph B: networkx modularity matrix networkx implementation of modularity matrix objective_function : Expression objective function in SymPy syntax constraints : list[Expression] list of problem constraints in SymPy syntax, (default []) variables : int number of qubits in the circuit, equal to number of nodes in the graph """ def __init__( self, network_data: Network, communities: int = 2, one_hot_encoding: bool = True, ) -> None: """ Parameters ---------- network_data: Network representation of a network with graph and modularity matrix communities: int number of communities into which the graph shall be divided (default 2) one_hot_encoding: bool decides if objective function should be encoded to one-hot values """ self.G: nx.Graph = network_data.graph self.one_hot_encoding: bool = one_hot_encoding if one_hot_encoding: self.B: np.ndarray = network_data.full_modularity_matrix else: self.B: np.ndarray = network_data.generalized_modularity_matrix if communities < 1: raise Exception("Number of communities must be greater than or equal to 1") self.community = network_data.community self.cases: int = communities self.resolution: float = network_data.resolution if self.one_hot_encoding: self.variables: tuple[sympy.Symbol] = self._encode_discretes_to_one_hots() self._set_objective_function() self._set_one_hot_constraints(communities) else: self.variables: tuple[ sympy.Symbol ] = self._get_discrete_variable_representation() self._set_objective_function() def _get_discrete_variable_representation( self, ) -> tuple[sympy.Symbol] | Any: return sympy.symbols(" ".join([f"x{i}" for i in range(len(self.community))])) def _set_objective_function(self) -> None: equation: dict[tuple[str, ...], float] = {} for i in range(len(self.B)): for j in range(len(self.B)): x_i, x_j = sympy.symbols(f"x{self.community[i]}"), sympy.symbols( f"x{self.community[j]}" ) if self.one_hot_encoding: if i >= j: continue for case_val in range(self.cases): s_i = str(self._encode_discrete_to_one_hot(x_i, case_val)) s_j = str(self._encode_discrete_to_one_hot(x_j, case_val)) equation[(s_i, s_j)], equation[(s_j, s_i)] = ( self.B[i, j], self.B[j, i], ) else: x_i, x_j = str(x_i), str(x_j) equation[(x_i, x_j)] = self.B[i, j] equation = {key: -1 * val for key, val in equation.items()} self.objective_function = Polynomial(equation) def _encode_discrete_to_one_hot( self, discrete_variable: sympy.Symbol, case_value: int ) -> sympy.Symbol: discrete_id = int(str(discrete_variable)[1:]) id = discrete_id * self.cases + case_value return sympy.symbols(f"s{id}") def _encode_discretes_to_one_hots(self) -> tuple[sympy.Symbol]: one_hots: tuple[sympy.Symbol] = sympy.symbols( " ".join( [ str(self._encode_discrete_to_one_hot(var, case_val)) for var in self._get_discrete_variable_representation() for case_val in range(self.cases) ] ) ) return one_hots
[docs] def iter_variables_cases(self) -> Iterable[Tuple[sympy.Symbol, ...]]: """s -> (s0,s1,s2,...sn-1), (sn,sn+1,sn+2,...s2n-1), ...""" return zip(*[iter(self.variables)] * self.cases)
def _set_one_hot_constraints(self, communities: int) -> None: ONE_HOT_CONST = -1 self.constraints: list[Constraint] = [] # In the case of 1-to-1 mapping between discrete # and binary variable values no one-hot constraints if communities == ONE_HOT_CONST * -1: return dummies: Iterable[Tuple[sympy.Symbol, ...]] for dummies in self.iter_variables_cases(): expression: Expr = cast(Expr, 0) dummy: sympy.Symbol for dummy in dummies: expression += dummy expression += ONE_HOT_CONST self.constraints.append(Constraint(from_sympy(expression))) def decode_solution(self, solution: dict) -> dict: ONE_HOT_VALUE = 1.0 decoded_solution: dict = {} for variable, value in solution.items(): id = int(variable[1:]) if value == ONE_HOT_VALUE: case_value = id % self.cases variable_id = id // self.cases decoded_solution[variable_id] = case_value return self.sort_decoded_solution(decoded_solution) def sort_encoded_solution(self, encoded_solution: dict) -> dict: return { str(k): encoded_solution[str(k)] for k in self.variables if str(k) in encoded_solution } def sort_decoded_solution(self, decoded_solution: dict) -> dict: keyorder = [int(str(v)[1:]) for v in self.variables] return {k: decoded_solution[k] for k in keyorder if k in decoded_solution}