Source code for xchrom.pp._utils

import numpy as np
import anndata
import h5py
import time
from typing import Tuple
from pathlib import Path
from typing import Union, Literal
from .._utils import make_bed_seqs_from_df, dna_1hot_2vec

def split_test(
    cellids: np.ndarray,
    peakids: np.ndarray,
    seed: int = 20,
    ratio: float = 0.9
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    split cellids and peakids into train/test sets
    
    Parameters
    ----------
    cellids: np.ndarray
        A 1D array of cell ids, to be split into train/test sets
    peakids: np.ndarray
        A 1D array of peak ids, to be split into train/test sets
    seed: int, default is 20
        random seed for reproducibility
    ratio: float, default is 0.9
        ratio of train set, the rest will be used for testing
        
    Returns
    -------
    trainval_cell: np.ndarray
        A 1D array of cell ids, to be used for training and validation
    test_cell: np.ndarray
        A 1D array of cell ids, to be used for testing
    trainval_peak: np.ndarray
        A 1D array of peak ids, to be used for training and validation
    test_peak: np.ndarray
        A 1D array of peak ids, to be used for testing
    """
    np.random.seed(seed=seed)
    test_cell = np.random.choice(cellids,int(len(cellids) * (1 - ratio)),replace=False)
    trainval_cell = np.setdiff1d(cellids, test_cell)
    test_peak = np.random.choice(peakids,int(len(peakids) * (1 - ratio)),replace=False)
    trainval_peak = np.setdiff1d(peakids, test_peak)
    return trainval_cell,test_cell,trainval_peak,test_peak

def make_h5_sparse(
    tmp_ad: anndata.AnnData, 
    h5_name: Union[str, Path], 
    input_fasta: Union[str, Path], 
    seq_len: int = 1344, 
    batch_size: int = 1000
    ):
    """
    save the sequence of peaks to h5 file,which is used to generate train/test data for XChrom model training
    
    Parameters
    ----------
    tmp_ad: anndata.AnnData
        anndata object, must have .var['chr','start','end']
    h5_name: Union[str, Path]
        name of the h5 file, which will be saved in the current directory
    input_fasta: Union[str, Path]
        path to the genome fasta file
    seq_len: int, default is 1344
        length of the sequence
    batch_size: int, default is 1000
        how many peaks to process at a time
    """
    h5_name = Path(h5_name)
    input_fasta = Path(input_fasta)
    t0 = time.time()
    n_peaks = tmp_ad.shape[1]
    bed_df = tmp_ad.var.loc[:,['chr','start','end']] # bed file
    bed_df.index = np.arange(bed_df.shape[0])
    # n_batch = int(np.floor(n_peaks/batch_size))
    n_batch = max(1, int(np.ceil(n_peaks / batch_size)))
    # batches = np.array_split(np.arange(n_peaks), n_batch) # split all peaks to process in batches
    batches = np.array_split(np.arange(n_peaks), n_batch) if n_peaks > 0 else []
    
    # create h5 file 
    # X is a matrix of n_peaks * 1344, which is the sequence of peaks
    f = h5py.File(h5_name, "w")
    ds_X = f.create_dataset("X",(n_peaks, seq_len),dtype="int8")

    # save to h5 file
    for i in range(len(batches)):
        idx = batches[i]
        # write X to h5 file
        seqs_dna,_ = make_bed_seqs_from_df(
            bed_df.iloc[idx,:],
            fasta_file=input_fasta,
            seq_len=seq_len,
        )
        dna_array_dense = [dna_1hot_2vec(x) for x in seqs_dna]
        dna_array_dense = np.array(dna_array_dense)
        ds_X[idx] = dna_array_dense
        t1 = time.time()
        total = t1-t0
        # print('process %d peaks takes %.1f s' %(i*batch_size, total))
    f.close()

[docs]def filter_multiome_data( ad_rna: anndata.AnnData, ad_atac: anndata.AnnData, filter_ratio: float = 0.05, species: Literal['mouse', 'human'] = 'human', min_genes: int = 0, min_cells: int = 0 ) -> Tuple[anndata.AnnData, anndata.AnnData]: """ Filter multiome RNA and ATAC data based on expression/accessibility thresholds and chromosomes Parameters ---------- ad_rna: anndata.AnnData RNA expression data (cells x genes) ad_atac: anndata.AnnData ATAC accessibility data (cells x peaks) filter_ratio: float, default is 0.05 Minimum ratio of cells in which a gene/peak should be expressed/accessible species: str, default is 'mouse' Species name to determine chromosome filtering. Supports 'mouse' and 'human' min_genes: int, default is 0 Minimum number of genes expressed per cell min_cells: int, default is 0 Minimum number of cells expressing each gene Returns ------- ad_rna_filtered: anndata.AnnData Filtered RNA data ad_atac_filtered: anndata.AnnData Filtered ATAC data Examples -------- >>> import xchrom as xc >>> ad_rna, ad_atac = xc.pp.filter_multiome_data( ... ad_rna=ad_rna, ... ad_atac=ad_atac, ... species='mouse', ... filter_ratio=0.05, ... min_genes=0, ... min_cells=0 ... ) """ import scanpy as sc # Make copies to avoid modifying original data ad_rna_filtered = ad_rna.copy() ad_atac_filtered = ad_atac.copy() # Basic filtering sc.pp.filter_cells(ad_rna_filtered, min_genes=min_genes) sc.pp.filter_genes(ad_rna_filtered, min_cells=min_cells) sc.pp.filter_cells(ad_atac_filtered, min_genes=min_genes) sc.pp.filter_genes(ad_atac_filtered, min_cells=min_cells) # Filter based on expression/accessibility ratio thres = int(ad_atac_filtered.shape[0] * filter_ratio) ad_rna_filtered = ad_rna_filtered[:, ad_rna_filtered.var['n_cells'] > thres] ad_atac_filtered = ad_atac_filtered[:, ad_atac_filtered.var['n_cells'] > thres] # Filter peaks by chromosome if species.lower() == 'mouse': # Mouse: chr1-chr19, chrX, chrY chrs = ['chr' + str(i) for i in range(1, 20)] + ['chrX', 'chrY'] elif species.lower() == 'human': # Human: chr1-chr22, chrX, chrY chrs = ['chr' + str(i) for i in range(1, 23)] + ['chrX', 'chrY'] else: raise ValueError(f"Unsupported species: {species}. Supported species are 'mouse' and 'human'") # Check if chromosome column exists in ATAC data if 'chr' in ad_atac_filtered.var.columns: ad_atac_filtered = ad_atac_filtered[:, ad_atac_filtered.var['chr'].isin(chrs)] else: print("Warning: 'chr' column not found in ATAC data. Skipping chromosome filtering.") print(f"RNA data after filtering: {ad_rna_filtered}") print(f"ATAC data after filtering: {ad_atac_filtered}") return ad_rna_filtered, ad_atac_filtered