Source code for xchrom.tl._utils

import os
from pathlib import Path
from typing import Union, Literal
import pandas as pd
import h5py
import anndata
import scanpy as sc
from Bio import SeqIO
from sklearn.metrics import roc_auc_score, average_precision_score
import numpy as np
from tqdm import tqdm
from .._utils import make_bed_seqs_from_df, dna_1hot_2vec


[docs]def calc_auc_pr( true_matrix:np.array, pred_matrix:np.array, mtype: Literal['overall', 'percell', 'perpeak'] = 'overall' ) -> dict: """ Calculate the AUROC and AUPRC metrics for given true and predicted matrix. Shape of true_matrix and pred_matrix must be consistent, cells x peaks. Parameters ---------- true_matrix: np.array The true label matrix (cells x peaks), 0/1 matrix, 0 for no signal, 1 for peak signal. pred_matrix: np.array, The predicted probability matrix (cells x peaks), 0-1 matrix for peak signal. mtype: Literal['overall', 'percell', 'perpeak'] = 'overall' The type of metric to calculate, optional 'overall', 'percell', 'perpeak' Examples -------- >>> auc, prc = calc_auc_pr(true_matrix, pred_matrix, mtype='overall') >>> print(f"The AUROC is {auc:.4f}, The AUPRC is {prc:.4f}") """ # check the shape of the matrix assert true_matrix.shape == pred_matrix.shape, "The shape of the true matrix and predicted matrix are not consistent" if mtype == 'overall': # calculate the overall metrics y_true = true_matrix.ravel() y_pred = pred_matrix.ravel() return { 'auroc': roc_auc_score(y_true, y_pred), 'auprc': average_precision_score(y_true, y_pred) } elif mtype == 'percell': # calculate the metrics per cell aurocs, auprcs = [], [] for i in range(pred_matrix.shape[0]): y_true = true_matrix[i, :] # skip the cell with only one class if len(np.unique(y_true)) > 1: aurocs.append(roc_auc_score(y_true, pred_matrix[i, :])) auprcs.append(average_precision_score(y_true, pred_matrix[i, :])) return { 'auroc': np.mean(aurocs) if aurocs else 0, 'auprc': np.mean(auprcs) if auprcs else 0, 'n_cells': len(aurocs) } elif mtype == 'perpeak': # calculate the metrics per peak aurocs, auprcs = [], [] for j in range(pred_matrix.shape[1]): y_true = true_matrix[:, j] # skip the peak with only one class if len(np.unique(y_true)) > 1: aurocs.append(roc_auc_score(y_true, pred_matrix[:, j])) auprcs.append(average_precision_score(y_true, pred_matrix[:, j])) return { 'auroc': np.mean(aurocs) if aurocs else 0, 'auprc': np.mean(auprcs) if auprcs else 0, 'n_peaks': len(aurocs) } else: raise ValueError("metrics type must be 'overall', 'percell' or 'perpeak'")
[docs]def calc_nsls_score( ad_rna:anndata.AnnData, ad_atac:anndata.AnnData, n:int = 100, label:str = 'celltype', test_cells:Union[list, np.ndarray] = None, use_rep_rna:str = 'X_pca', use_rep_atac:str = 'X_pca' ): """ Calculate the cluster metrics of scATAC data, including the number of shared neighbors and labels. Parameters ---------- ad_rna: anndata.AnnData scRNA-seq data, used to calculate scRNA cell neighborhoods, which should have been processed with scanpy or others. Need to have raw cell represenation from scRNA-seq data, such as 'X_pca'. ad_atac: anndata.AnnData scATAC-seq raw or predicted data, used to calculate scATAC cell neighborhoods Must contain cell types, or clustering results from paired scRNA-seq data to calculate label scores n: int The number of neighbors in different scales, such as 100, 50, 10. label: str The key name of the cell type labels, default is 'celltype', or 'leiden' from scRNA-seq data. Should be assighed to ad_atac.obs[label] test_cells: list The cells to be computed, if None, all cells will be computed. use_rep_rna: str The key name of the scRNA cells dimension reduction, default is 'X_pca', to compute and generate scRNA neighbors list, which can be regarded as a genuine neighbor relationship. use_rep_atac: str The key name of the scATAC cells dimension reduction, default is 'X_pca' from scanpy(scanpy.tl.pca), to compute and generate scATAC neighbors list. Returns ------- the ratio of shared neighbors: float The number of shared neighbors divided by the number of neighbors. the ratio of shared labels: float The number of shared labels divided by the number of neighbors. Data Requirements ----------------- ad_rna.obsm must contain: - Cell dimension reduction (default: 'X_pca') for computing neighborhoods from scRNA-seq data ad_atac.obsm must contain: - Cell dimension reduction (default: 'X_pca') for computing neighborhoods from scRNA-seq data ad_atac.obs must contain: - Cell type labels (default: 'celltype') for label consistency evaluation,which can be from true cell type labels or paired scRNA-seq clustering results Examples -------- >>> # Calculate the cluster metrics of scATAC data, including the number of shared neighbors and labels. >>> ns, ls = calc_nsls_score(ad_rna, ad_atac, n=100, label='celltype', test_cells=None, use_rep_rna='X_pca', use_rep_atac='X_pca') >>> print(f"The number of shared neighbors: {ns:.4f}, The number of shared labels: {ls:.4f}") """ sc.pp.neighbors(ad_rna, n_neighbors=n+1, use_rep=use_rep_rna) m_RNA_neighbors = [i.indices for i in ad_rna.obsp['distances']] # scRNA neighbors index sc.pp.neighbors(ad_atac, n_neighbors=n+1, use_rep=use_rep_atac) m_ATAC_neighbors = [i.indices for i in ad_atac.obsp['distances']] # scATAC neighbors index # if test_cells is not None, only compute the cells in test_cells if test_cells is not None: # only keep the neighbors of test_cells test_RNA_neighbors = [m_RNA_neighbors[i] for i in test_cells] test_ATAC_neighbors = [m_ATAC_neighbors[i] for i in test_cells] # Calculate the number of shared neighbors n_shared_neighbors = np.mean([len(np.intersect1d(i, j)) for i, j in zip(test_RNA_neighbors, test_ATAC_neighbors)]) # calculate the number of shared labels neighbor_label = ad_atac.obs[label].values[np.concatenate(test_ATAC_neighbors, axis=0)] cell_label = ad_atac.obs[label].values[np.repeat(test_cells, [len(m_ATAC_neighbors[i]) for i in test_cells])] n_shared_labels = (neighbor_label == cell_label).sum() / len(test_cells) else: # if test_cells is not None, use the global data n_shared_neighbors = np.mean([len(np.intersect1d(i, j)) for i, j in zip(m_RNA_neighbors, m_ATAC_neighbors)]) neighbor_label = ad_atac.obs[label].values[np.concatenate(m_ATAC_neighbors, axis=0)] cell_label = ad_atac.obs[label].values[np.repeat(np.arange(len(m_ATAC_neighbors)), [len(j) for j in m_ATAC_neighbors])] n_shared_labels = (neighbor_label == cell_label).sum() / len(m_ATAC_neighbors) # calculate UMAP sc.tl.umap(ad_atac) return n_shared_neighbors/n, n_shared_labels/n
[docs]def calc_pca( ad:anndata.AnnData, max_value:float = None, n_comps:int = None ) -> anndata.AnnData: """ Calculate the PCA of the given data, and save the PCA components in .obsm['X_pca']. Parameters ---------- ad: anndata.AnnData The data to calculate the PCA. max_value: float = None The maximum value of the data. n_comps: int = None The number of components to calculate. Returns ------- anndata.AnnData, With the PCA components in .obsm['X_pca']. Examples -------- >>> ad = calc_pca(ad, max_value=10, n_comps=32) >>> print(f"The PCA components are saved in ad.obsm['X_pca']") """ sc.pp.normalize_total(ad, target_sum=1e4) sc.pp.log1p(ad) sc.pp.highly_variable_genes(ad) ad = ad[:, ad.var.highly_variable] sc.pp.scale(ad,max_value=max_value) sc.tl.pca(ad,n_comps = n_comps) return ad
[docs]def bed_to_fasta( bed_input:Union[str, Path, pd.DataFrame], fasta_file:Union[str, Path], output_file:Union[str, Path], seq_len:int = 1344, stranded:bool = False ): """ Extract sequences from BED file and write to FASTA file. Parameters ---------- bed_input: str, Path, or DataFrame The path to the BED file, or a pandas DataFrame with 'chr', 'start', 'end' columns. fasta_file: Union[str, Path] The path to the reference genome FASTA file. output_file: Union[str, Path] The path to the output FASTA file. seq_len: int, default 1344 The length of the sequences to extract. stranded: bool, default False Whether to consider strand information. Returns ------- tuple (seqs, coords) - The list of sequences and coordinates. Examples -------- # Convert BED file to FASTA file seqs, coords = write_fasta("peaks.bed", "genome.fasta", "output.fasta", seq_len=1344) # Convert BED DataFrame to FASTA file seqs, coords = write_fasta(bed_df, "genome.fasta", "output.fasta", seq_len=1344) # Consider strand information seqs, coords = write_fasta("peaks.bed", "genome.fasta", "output.fasta", seq_len=1344, stranded=True) """ print(f"Extracting sequences from BED file...") # print(f"BED input: {bed_input}") print(f"Reference genome: {fasta_file}") print(f"Sequence length: {seq_len}") print(f"Consider strand information: {stranded}") # Call make_bed_seqs_from_df function to extract sequences seqs, coords = make_bed_seqs_from_df( input_bed=bed_input, fasta_file=fasta_file, seq_len=seq_len, stranded=stranded ) print(f"Successfully extracted {len(seqs)} sequences") # Write to FASTA file print(f"Writing to FASTA file: {output_file}") with open(output_file, 'w') as out: for coord, seq in zip(coords, seqs): if len(coord) == 3: chrom, start, end = coord header = f">{chrom}:{start}-{end}" elif len(coord) == 4: chrom, start, end, strand = coord header = f">{chrom}:{start}-{end}({strand})" else: # Fallback handling, if the coordinate format is not standard header = f">seq_{coords.index(coord)}" out.write(header + "\n") out.write(seq + "\n") print(f"FASTA file saved to: {output_file}") print(f"Number of sequences written: {len(seqs)}") return seqs, coords
def fasta_to_h5( input_fasta:Union[str, Path], seq_len:int = 1344 ): """ Encode the sequences in the FASTA file into HDF5 format. Parameters ---------- input_fasta: Union[str, Path] The path to the FASTA file. seq_len: int The length of the sequences. Returns ------- None """ # Convert "motif.fasta" → "motif.h5" output_h5 = os.path.splitext(input_fasta)[0] + ".h5" records = list(SeqIO.parse(input_fasta, "fasta")) num_seqs = len(records) dna_array = np.zeros((num_seqs, seq_len), dtype="int8") for i, record in enumerate(records): seq = str(record.seq).upper() dna_array[i] = dna_1hot_2vec(seq, seq_len) with h5py.File(output_h5, "w") as f: f.create_dataset("X", data=dna_array, dtype="int8") dt = h5py.string_dtype(encoding='utf-8') ids = [record.id for record in records] f.create_dataset("ids", data=np.array(ids, dtype=dt)) def generate_h5_files( motif_dir:Union[str, Path] ): """ Generate HDF5 files from FASTA files. Parameters ---------- motif_dir: Union[str, Path] The path to the directory containing the FASTA files. Returns ------- tuple (h5_files, motif_names) Examples -------- >>> h5_files, motif_names = generate_h5_files("motifs") >>> print(h5_files) >>> print(motif_names) """ fasta_files = sorted([f for f in os.listdir(motif_dir) if f.endswith('.fasta')]) h5_files = [] motif_names = [] for fasta_file in tqdm(fasta_files, desc="Generating H5 files"): fasta_path = os.path.join(motif_dir, fasta_file) h5_path = os.path.splitext(fasta_path)[0] + ".h5" try: fasta_to_h5(fasta_path) h5_files.append(h5_path) motif_names.append(os.path.splitext(fasta_file)[0]) except Exception as e: print(f"Error processing {fasta_file}: {str(e)}") continue return h5_files, motif_names def shuffle_sequences(sequences, k=2, seed=10): """ Shuffle the sequences while keeping the k-mer frequency. Need to install fasta_ushuffle first. Parameters ---------- sequences: list Input sequence list k: int Length of k-mer to keep (default 2=dinucleotide frequency) seed: int Random seed Returns ------- list: Shuffled sequences list """ import subprocess import tempfile import os import shutil import sys # Try multiple ways to find fasta_ushuffle fasta_ushuffle_path = None # Method 1: Use shutil.which with current PATH fasta_ushuffle_path = shutil.which('fasta_ushuffle') # Method 2: Check current Python environment's bin directory if fasta_ushuffle_path is None: # Get the directory where current Python executable is located python_dir = os.path.dirname(sys.executable) potential_path = os.path.join(python_dir, 'fasta_ushuffle') if os.path.exists(potential_path) and os.access(potential_path, os.X_OK): fasta_ushuffle_path = potential_path # Method 3: Check CONDA_PREFIX environment variable if fasta_ushuffle_path is None: conda_env = os.environ.get('CONDA_PREFIX') if conda_env: potential_path = os.path.join(conda_env, 'bin', 'fasta_ushuffle') if os.path.exists(potential_path) and os.access(potential_path, os.X_OK): fasta_ushuffle_path = potential_path # Method 4: Try to infer conda environment from Python path if fasta_ushuffle_path is None: python_path = sys.executable # Check if we're in a conda environment (path contains /envs/ or /miniconda3/bin) if '/envs/' in python_path or '/miniconda3/bin' in python_path or '/anaconda3/bin' in python_path: # Extract environment root path if '/envs/' in python_path: env_root = python_path.split('/bin/python')[0] else: env_root = os.path.dirname(python_path) potential_path = os.path.join(env_root, 'fasta_ushuffle') if os.path.exists(potential_path) and os.access(potential_path, os.X_OK): fasta_ushuffle_path = potential_path # Method 5: Try common system paths as last resort if fasta_ushuffle_path is None: common_paths = [ '/usr/local/bin/fasta_ushuffle', '/usr/bin/fasta_ushuffle', '/opt/conda/bin/fasta_ushuffle' ] for path in common_paths: if os.path.exists(path) and os.access(path, os.X_OK): fasta_ushuffle_path = path break if fasta_ushuffle_path is None: raise Exception( f"fasta_ushuffle not found in current Python environment.\n" f"Current Python: {sys.executable}\n" f"Please install fasta_ushuffle in your current conda environment:\n" f"conda install bioconda::fasta_ushuffle" ) print(f"Using fasta_ushuffle from: {fasta_ushuffle_path}") # create temporary input file with tempfile.NamedTemporaryFile(mode='w', suffix='.fasta', delete=False) as f: for i, seq in enumerate(sequences): f.write(f">seq_{i}\n{seq.upper()}\n") input_file = f.name try: # Use full path and inherit current environment cmd = f"{fasta_ushuffle_path} -k {k} -seed {seed} < {input_file}" # Get current environment and ensure PATH is properly set env = os.environ.copy() result = subprocess.run( cmd, shell=True, capture_output=True, text=True, check=True, env=env, executable='/bin/bash' # Explicitly use bash ) # parse FASTA output shuffled_sequences = [] current_seq = "" for line in result.stdout.strip().split('\n'): if line.startswith('>'): if current_seq: shuffled_sequences.append(current_seq) current_seq = "" else: current_seq += line.strip() if current_seq: shuffled_sequences.append(current_seq) return shuffled_sequences except subprocess.CalledProcessError as e: raise Exception(f"fasta_ushuffle failed: {e.stderr}. Install with: conda install bioconda::fasta_ushuffle") except FileNotFoundError: raise Exception("fasta_ushuffle not found. Please install it first: conda install bioconda::fasta_ushuffle") finally: try: os.unlink(input_file) except: pass def read_meme_motifs(meme_file): """ Read MEME format motif file Parameters ---------- meme_file: str MEME format motif file path Returns ------- motifs: list motif information list, each element is a dict containing motif name and pwm """ motifs = [] current_motif = None pwm_lines = [] reading_pwm = False with open(meme_file, 'r') as f: for line in f: line = line.strip() # remove leading and trailing whitespace if line.startswith('MOTIF'): # save previous motif if current_motif is not None and pwm_lines: pwm_matrix = np.array([list(map(float, line.split())) for line in pwm_lines]) current_motif['pwm'] = pwm_matrix motifs.append(current_motif) # start new motif parts = line.split() # default split by whitespace (including space, tab, newline, etc.) if len(parts) >= 2: motif_name = parts[2] # clean motif name, remove parentheses motif_name = motif_name.replace('(', '').replace(')', '') if '_' in motif_name: motif_name = motif_name.split('_')[0] current_motif = {'name': motif_name} pwm_lines = [] reading_pwm = False elif line.startswith('letter-probability matrix'): reading_pwm = True pwm_lines = [] elif reading_pwm and line and not line.startswith('URL'): # check if it is a number line try: values = list(map(float, line.split())) if len(values) == 4: # A, C, G, T pwm_lines.append(line) except: reading_pwm = False # process the last motif if current_motif is not None and pwm_lines: pwm_matrix = np.array([list(map(float, line.split())) for line in pwm_lines]) current_motif['pwm'] = pwm_matrix motifs.append(current_motif) return motifs def generate_motif_sequences(pwm, n_sequences=1000, seed=10): """ Generate motif sequences based on PWM. Parameters ---------- pwm: np.array Position weight matrix (length x 4), 4 columns correspond to A,C,G,T n_sequences: int Number of sequences to generate seed: int Random seed Returns ------- sequences: list Generated motif sequences list """ np.random.seed(seed) bases = ['A', 'C', 'G', 'T'] motif_seqs = [] # Normalize PWM to ensure each row sums to 1 (fix floating point precision issues) pwm_normalized = pwm / pwm.sum(axis=1, keepdims=True) for _ in range(n_sequences): seq = '' for pos in range(pwm_normalized.shape[0]): # select base based on PWM probability base_idx = np.random.choice(4, p=pwm_normalized[pos]) seq += bases[base_idx] motif_seqs.append(seq) return motif_seqs def insert_motif_to_background(background_seqs, motif_seqs): """ Insert motif sequences into the center of background sequences Parameters ---------- background_seqs: list background sequences list motif_seqs: list motif sequences list Returns ------- inserted_seqs: list Inserted sequences list """ inserted_seqs = [] motif_len = len(motif_seqs[0]) for i, bg_seq in enumerate(background_seqs): if i < len(motif_seqs): # calculate insertion position seq_len = len(bg_seq) left_coord = seq_len // 2 - motif_len // 2 # 插入motif left_part = bg_seq[:left_coord] right_part = bg_seq[left_coord + motif_len:] inserted_seq = left_part + motif_seqs[i] + right_part inserted_seqs.append(inserted_seq) else: # if motif sequence is not enough, repeat use motif_idx = i % len(motif_seqs) seq_len = len(bg_seq) left_coord = seq_len // 2 - motif_len // 2 left_part = bg_seq[:left_coord] right_part = bg_seq[left_coord + motif_len:] inserted_seq = left_part + motif_seqs[motif_idx] + right_part inserted_seqs.append(inserted_seq) return inserted_seqs
[docs]def generate_tf_activity_data( bed_file: Union[str, Path], input_fasta: Union[str, Path], motif_file: Union[str, Path], output_dir: Union[str, Path], n_samples: int = 1000, seq_len: int = 1344, n_motif_instances: int = 1000, seed: int = 10 ): """ Prepare motif data and background sequences for TF activity calculation Parameters ---------- bed_file: Union[str, Path] BED file path, containing peak regions input_fasta: Union[str, Path] Reference genome FASTA file path motif_file: Union[str, Path] MEME format motif file path output_dir: Union[str, Path] Output directory path for the generated data n_samples: int, default 1000 Number of sampled peaks seq_len: int, default 1344 Sequence length n_motif_instances: int, default 1000 Number of instances to generate for each motif seed: int, default 10 Random seed Returns ------- tuple (background_fasta_path, motif_dir_path) - background sequence file path and motif directory path Examples -------- >>> bg_fasta, motif_dir = prepare_motif_data( ... bed_file="peaks.bed", ... input_fasta="hg38.fa", ... motif_file="motifs.meme", ... output_dir="./motif_fasta", ... n_samples=1000, ... seed=10 ... ) """ print("=== Start preparing motif data ===") # create output directory output_dir = Path(output_dir) output_dir.mkdir(exist_ok=True, parents=True) # 1. Read BED file and sample print(f"1. Read BED file: {bed_file}") bed_df = pd.read_csv(bed_file, sep='\t', header=None, names=['chr', 'start', 'end'] + [f'col{i}' for i in range(3, 10)]) # adjust region size to 1344bp, center bed_df['center'] = (bed_df['start'] + bed_df['end']) // 2 bed_df['start'] = bed_df['center'] - seq_len // 2 bed_df['end'] = bed_df['center'] + seq_len // 2 bed_df = bed_df[['chr', 'start', 'end']] # random sampling np.random.seed(seed) if len(bed_df) > n_samples: sampled_indices = np.random.choice(len(bed_df), n_samples, replace=False) bed_df = bed_df.iloc[sampled_indices].reset_index(drop=True) print(f"Sampled {len(bed_df)} regions from {bed_file}") # 2. Extract background sequences print("2. Extract background sequences...") ref_fasta_path = output_dir / "ref_peaks1000.fasta" seqs, _ = bed_to_fasta(bed_df, input_fasta, ref_fasta_path, seq_len) print(f"Extracted {len(seqs)} sequences, saved to {ref_fasta_path}") # 3. Generate shuffled background sequences print("3. Generate shuffled background sequences...") shuffled_seqs = shuffle_sequences(seqs, seed=seed) background_fasta_path = output_dir / "shuffled_peaks.fasta" with open(background_fasta_path, 'w') as f: for i, seq in enumerate(shuffled_seqs): f.write(f">shuffled_seq_{i}\n") f.write(seq + "\n") print(f"Dinucleotide shuffled {len(shuffled_seqs)} sequences, saved to {background_fasta_path}") # 4. Read motif file (meme format) print("4. Read motif file(meme format)...") motifs = read_meme_motifs(motif_file) # list of dicts, each dict contains motif name and pwm print(f"Read {len(motifs)} motifs from {motif_file}") # 5. Generate motif inserted sequences print("5. Generate motif inserted sequences...") motif_output_dir = output_dir / "shuffled_peaks_motifs" motif_output_dir.mkdir(exist_ok=True) for motif in tqdm(motifs, desc="Processing motifs"): try: # generate motif sequence instances motif_instances = generate_motif_sequences( motif['pwm'], n_motif_instances, seed=seed ) # insert into background sequences inserted_seqs = insert_motif_to_background(shuffled_seqs, motif_instances) # write to fasta file motif_fasta_path = motif_output_dir / f"{motif['name']}.fasta" with open(motif_fasta_path, 'w') as f: for i, seq in enumerate(inserted_seqs): f.write(f">motif_{motif['name']}_{i}\n") f.write(seq + "\n") except Exception as e: print(f"Error processing motif {motif['name']}: {str(e)}") continue print("=== motif data preparation completed ===") print(f"Background sequence file: {background_fasta_path}") print(f"Motif inserted sequence directory: {motif_output_dir}") return str(background_fasta_path), str(motif_output_dir)