Source code for spatialtis.spatial.ncd_markers

from typing import Dict, Optional

import numpy as np
import pandas as pd
from anndata import AnnData
from lightgbm import LGBMRegressor
from scipy.stats import mannwhitneyu

from spatialtis.abc import AnalysisBase
from spatialtis.spatial.utils import NeighborsNotFoundError, normalize
from spatialtis.typing import Array, Number
from spatialtis.utils import doc
from spatialtis.utils.log import pbar_iter

try:
    import neighborhood_analysis as na
except ImportError:
    raise ImportError("Try pip install neighborhood_analysis")


[docs]@doc class NCDMarkers(AnalysisBase): """Identify neighbor cells dependent marker This method tells you the dependency and correlation between markers and its neighbor cell type. The dependency is calculated by building a gradiant boosting tree (in here XGBoost) to determine the feature importance. And the the spearman correlation is calculated. A reasonable std cutoff should be set, the marker expression need to have certain degree of variance. Args: data: {adata} exp_std_cutoff: Standard deviation, threshold to filter out markers that are not variant enough pval: {pval} selected_markers: {selected_markers} layers_key: {layers_key} tree_kwargs: {tree_kwargs} **kwargs: {analysis_kwargs} """ def __init__( self, data: AnnData, use_cell_type: bool = False, importance_cutoff: Number = 0.5, exp_std_cutoff: Number = 1.0, pval: Number = 0.01, selected_markers: Optional[Array] = None, layers_key: Optional[str] = None, tree_kwargs: Optional[Dict] = None, **kwargs, ): super().__init__(data, task_name="NCDMarkers", **kwargs) if not self.neighbors_exists: raise NeighborsNotFoundError("Run `find_neighbors` first before continue.") if self.cell_type_key not in self.data.obs.keys(): use_cell_type = False tree_kwargs_ = {"n_jobs": -1, "random_state": 0, "importance_type": "gain"} if tree_kwargs is not None: for k, v in tree_kwargs.items(): tree_kwargs_[k] = v if use_cell_type: result_data = [] neighbors = self.get_neighbors_ix_map() cent_cells = list(neighbors.keys()) df = self.data.obs.set_index(self.neighbors_ix_key) cell_types = df[self.cell_type_key] cent_type = cell_types[cent_cells] ix, col, data = na.neighbor_components( neighbors, dict(zip(df.index, cell_types)) ) neigh_comp = pd.DataFrame(data=data, index=ix, columns=col) markers, exp_matrix, cut_data = self.get_exp_matrix_fraction( markers=selected_markers, layers_key=layers_key, ) if len(markers) > 0: for t, g in pbar_iter( pd.DataFrame( {"cent_cell": cent_cells, "cent_type": cent_type, } ).groupby("cent_type"), desc="NCD Markers", ): cents = g["cent_cell"].values meta = ( cut_data.obs.reset_index(drop=True) .reset_index() .set_index(self.neighbors_ix_key) ) exp_ix = meta.loc[cents]["index"].values exp = exp_matrix[exp_ix] neigh_types = neigh_comp.loc[cents] cent_exp = exp.T for ix, m in enumerate(markers): y = cent_exp[ix].copy() max_type, max_weight, lo2_fc, pvalue = max_contri_marker( neigh_types, y, exp_std_cutoff, tree_kwargs_, importance_cutoff, pval, ) if (max_type is not None) & (max_type != t): result_data.append( [t, m, max_type, max_weight, lo2_fc, pvalue] ) self.result = pd.DataFrame( data=result_data, columns=[ "cell_type", "marker", "neighbor_type", "dependency", "log2_FC", "pvalue", ], ) else: results_data = [] neighbors = self.get_neighbors_ix_map() cent_cells = list(neighbors.keys()) df = self.data.obs.set_index(self.neighbors_ix_key) markers, exp_matrix, data = self.get_exp_matrix_fraction( markers=selected_markers, layers_key=layers_key, neighbors_ix=cent_cells, ) if len(markers) > 0: ix, col, data = na.neighbor_components( neighbors, dict(zip(df.index, df[self.cell_type_key])) ) neigh_comp = pd.DataFrame(data=data, index=ix, columns=col) neigh_types = neigh_comp.loc[cent_cells] cent_exp = exp_matrix.T for ix, m in enumerate( pbar_iter(markers, desc="NCD Markers", ) ): y = cent_exp[ix].copy() max_type, max_weight, log2_fc, pvalue = max_contri_marker( neigh_types, y, exp_std_cutoff, tree_kwargs_, importance_cutoff, pval, ) if max_type is not None: results_data.append([m, max_type, max_weight, log2_fc, pvalue]) self.result = pd.DataFrame( data=results_data, columns=["marker", "neighbor_type", "dependency", "log2_FC", "pvalue"], )
def max_contri_marker(x, y, exp_std_cutoff, tree_kw, importance_cutoff, pval): if np.std(y) > exp_std_cutoff: reg = LGBMRegressor(**tree_kw).fit(x, y) weights = np.asarray(reg.feature_importances_) weights = weights / weights.sum() if weights.mean() < 1: max_ix = np.argmax(weights) max_weight = weights[max_ix] max_type = x.columns[max_ix] if max_weight > importance_cutoff: x = x.copy() x["y"] = y at_neighbor = x.iloc[:, max_ix] != 0 at_neighbor_exp = normalize(x[at_neighbor]["y"].to_numpy()) non_at_neighbor_exp = normalize(x[~at_neighbor]["y"].to_numpy()) u, pvalue = mannwhitneyu(at_neighbor_exp, non_at_neighbor_exp) log2_fc = np.log2(at_neighbor_exp.mean() / non_at_neighbor_exp.mean()) if pvalue < pval: return max_type, max_weight, log2_fc, pvalue return None, None, None, None