Source code for spatialtis.spatial.network

import pandas as pd
from anndata import AnnData
from scipy.spatial.distance import euclidean
from typing import Any, Dict

from spatialtis.abc import AnalysisBase
from spatialtis.utils import col2adata, doc
from spatialtis.utils import try_import


[docs]@doc def cell_community(data: AnnData, resolution: float = 0.05, partition_type: Any = None, partition_kwargs: Dict = None, export_key: str = "community_id", **kwargs, ): """Spatial communities detection Here we use Leiden graph cluster algorithm Parameters ---------- data : {adata} resolution : float, default: 0.05 Control the process of partition. partition_type : The leidenalg partition type. partition_kwargs : Pass to leidenalg.find_partition. export_key : {export_key} **kwargs : {analysis_kwargs} """ ab = AnalysisBase(data, display_name="Cell community", export_key=export_key, **kwargs) # import leidenalg # import igraph as ig leidenalg = try_import("leidenalg") ig = try_import("igraph", install_name="python-igraph") ab.check_neighbors() if partition_type is None: partition_type = leidenalg.CPMVertexPartition if partition_kwargs is None: partition_kwargs = {"resolution_parameter": resolution} else: partition_kwargs = {"resolution_parameter": 0.05, **partition_kwargs} graphs = [] track_ix = [] sub_comm = [] for roi_name, labels, neighbors, points in ab.iter_roi(fields=['neighbors', 'centroid']): vertices = [] edge_mapper = {} for i, (x, y) in zip(labels, points): vertices.append({"name": i, "x": x, "y": y}) edge_mapper[i] = (x, y) graph_edges = [] for k, vs in zip(labels, neighbors): if len(vs) > 0: for v in vs: if k < v: distance = euclidean(edge_mapper[k], edge_mapper[v]) graph_edges.append( {"source": k, "target": v, "weight": distance} ) graph = ig.Graph.DictList(vertices, graph_edges) part = leidenalg.find_partition(graph, partition_type, **partition_kwargs) sub_comm += part.membership graphs.append(graph) track_ix.append(roi_name) sub_comm = pd.Series(sub_comm, index=data.obs.index) col2adata(sub_comm, data, ab.export_key) ab.stop_timer()