Source code for spatialtis.spatial.ncd_markers

from __future__ import annotations

import numpy as np
import pandas as pd
import scipy
from anndata import AnnData
from ast import literal_eval
from scipy.stats import mannwhitneyu
from spatialtis_core import neighbor_components
from typing import Dict, List

from import AnalysisBase
from spatialtis.utils import doc, read_exp

[docs]@doc def NCD_marker(data: AnnData, selected_markers: List[str] | np.ndarray = None, importance_cutoff: float = 0.5, layer_key: str = None, tree_kwargs: Dict = None, test_method: str = "mannwhitneyu", pval: float = 0.01, export_key: str = "ncd_marker", **kwargs, ): """Identify neighbor cells dependent marker This method tells you the dependency between markers and its neighbor cell type. The dependency is calculated by building a gradiant boosting tree (in here lightgbm) to determine the feature importance. A statistic test and fold change will be calculated for importance markers and its neighbor cells, the fold change is between marker with cell type at / not at the neighborhood. Parameters ---------- data : {adata} importance_cutoff : float, default: 0.5 Threshold to determine the feature markers. selected_markers : {selected_markers} layer_key : {layer_key} tree_kwargs : dict The keyword arguments that pass to the boosting tree class, (Default: n_jobs=-1, random_state=0). test_method : str, default: 'mannwhitneyu' which test method to use, anything from `scipy.stats <>`_. pval : {pval} export_key : {export_key} **kwargs : {analysis_kwargs} """ try: from lightgbm import LGBMRegressor except ImportError: raise ImportError("lightgbm is not installed, please try `pip install lightgbm`.") ab = AnalysisBase(data, display_name="NCD Markers", export_key=export_key, **kwargs) ab.check_neighbors() ab.check_cell_type() tree_kwargs_ = {"n_jobs": -1, "random_state": 0, "importance_type": "gain"} if tree_kwargs is not None: for k, v in tree_kwargs.items(): tree_kwargs_[k] = v markers = ab.selected_markers(selected_markers) markers_mask = ab.markers_col.isin(markers) neighbors = [literal_eval(n) for n in data.obsm[ab.neighbors_key]] labels = data.obs[ab.cell_id_key] cell_types = data.obs[ab.cell_type_key] col, comps = neighbor_components( neighbors, labels.tolist(), cell_types.tolist() ) neigh_comp = pd.DataFrame( data=comps, columns=col, index=pd.MultiIndex.from_frame( data.obs[[ab.cell_type_key, ab.cell_id_key]], names=["type", "id"], ), ) results_data = [] # For markers in different cell types with np.errstate(divide="ignore"): for t, x in neigh_comp.groupby(level=["type"]): exp_ix = x.index.to_frame()["id"] exp = read_exp(data[exp_ix, markers_mask], layer_key) for i, y in enumerate(exp): # copy it to prevent memory peak according to lightgbm reg = LGBMRegressor(**tree_kwargs_).fit(x, y.copy()) weights = np.asarray(reg.feature_importances_) weights = weights / weights.sum() max_ix = np.argmax(weights) max_weight = weights[max_ix] max_type = col[max_ix] if max_weight > importance_cutoff: nx = x.copy() # add expression data to dataframe to allow cutting afterwards nx["exp"] = y # cells with max_type at neighbors at_neighbor = (nx.iloc[:, max_ix] != 0) at_neighbor_exp = nx[at_neighbor]["exp"].to_numpy() non_at_neighbor_exp = nx[~at_neighbor]["exp"].to_numpy() at_sum = at_neighbor_exp.sum() non_at_sum = non_at_neighbor_exp.sum() if (at_sum > 0) & (non_at_sum > 0): test_result = getattr(scipy.stats, test_method).__call__( at_neighbor_exp, non_at_neighbor_exp ) pvalue = test_result.pvalue if pvalue < pval: at_mean = at_neighbor_exp.mean() non_at_mean = non_at_neighbor_exp.mean() log2_fc = np.log2(at_mean / non_at_mean) results_data.append([t, markers[i], max_type, max_weight, log2_fc, pvalue, ]) ab.result = pd.DataFrame( data=results_data, columns=[ "cell_type", "marker", "neighbor_type", "dependency", "log2_FC", "pval", ], )