【金山文档 | WPS云文档】 旁系同源基因对在 CDS、DNA 全长及 Promoter区域序列Divergence计算 https://www.kdocs.cn/l/cqploUR8tNpw
用于批量计算旁系同源基因对在 CDS、DNA 全长及 Promoter(2kb)区域的序列divergence(1-Identity),支持 GFF 解析与负链反向互补处理。
1. 输入文件准备
.fa: 基因组 FASTA 文件。.gff: 基因注释文件。.csv: 旁系同源基因对列表,包含gene1,gene2两列。
2.环境依赖
需要安装 pyfaidx, biopython, pandas, numpy
3. 核心代码
import re
import gc
import numpy as np
import pandas as pd
from pathlib import Path
from pyfaidx import Fasta
from Bio.Align import PairwiseAligner
#1. 配置路径与参数
BASE_DIR = Path("./result/HG_A2")
HG_divergence_DIR = BASE_DIR / "HG_divergence"
FASTA_FILE = Path("./data/Tgra.chr.fa")
GFF_FILE = Path("./data/Tgra.chr.no_fragment.gff")
GENE_PAIR_FILE = Path("./data/cs_1vs1.csv")
# 创建输出目录
HG_divergence_DIR.mkdir(parents=True, exist_ok=True)
#2. 核心函数定义
def revcomp(seq):
"""DNA序列反向互补"""
return seq.translate(str.maketrans("ACGTacgt", "TGCAtgca"))[::-1]
def load_gene_coords(gff_file):
"""解析GFF提取基因坐标"""
gene_coords = {}
with open(gff_file) as f:
for line in f:
if line.startswith("#") or not line.strip(): continue
fields = line.strip().split("\t")
if fields[2] != "gene": continue
chrom, start, end, strand, attributes = fields[0], int(fields[3])-1, int(fields[4]), fields[6], fields[8]
# 提取ID (适配PTG或evm格式)
m = re.search(r"ID=([^;]+)", attributes)
if m:
gene_id = m.group(1).replace("evm.TU.", "")
gene_coords[gene_id] = (chrom, start, end, strand)
return gene_coords
def load_cds_coords(gff_file):
"""解析GFF提取CDS外显子坐标"""
cds_coords = {}
with open(gff_file) as f:
for line in f:
if line.startswith("#") or not line.strip(): continue
fields = line.strip().split("\t")
if fields[2] != "CDS": continue
chrom, start, end, strand, attr = fields[0], int(fields[3])-1, int(fields[4]), fields[6], fields[8]
m = re.search(r"Parent=([^;]+)", attr)
if m:
gene_id = m.group(1).replace("evm.TU.", "")
cds_coords.setdefault(gene_id, []).append((chrom, start, end, strand))
return cds_coords
def get_cds_seq(gene_id, cds_dict, genome_fasta):
"""根据外显子坐标拼接完整的CDS序列"""
if gene_id not in cds_dict: return None
regions = cds_dict[gene_id]
strand = regions[0][3]
# 根据正负链调整外显子拼接顺序
regions_sorted = sorted(regions, key=lambda x: x[1], reverse=(strand == "-"))
seq_parts = [genome_fasta[r[0]][r[1]:r[2]].seq for r in regions_sorted]
full_seq = "".join(seq_parts)
return revcomp(full_seq) if strand == "-" else full_seq
def get_promoter_seq(gene_id, gene_dict, genome_fasta, length=2000):
"""提取转录起始位点上游的Promoter序列"""
if gene_id not in gene_dict: return None
chrom, s, e, strand = gene_dict[gene_id]
if strand == "+":
seq = genome_fasta[chrom][max(0, s-length):s].seq
else:
seq = revcomp(genome_fasta[chrom][e:e+length].seq)
return seq
def identity_calc(seq1, seq2, aligner):
"""全局比对并计算序列一致性"""
if not seq1 or not seq2: return np.nan
try:
aln = aligner.align(seq1, seq2)[0]
# 计算匹配数(排除掉两端及中间的所有Gap)
target, query = str(aln[0]), str(aln[1])
matches = sum(1 for a, b in zip(target, query) if a == b and a != "-")
valid_len = sum(1 for a, b in zip(target, query) if not (a == "-" and b == "-"))
return matches / valid_len if valid_len > 0 else np.nan
except:
return np.nan
#3. 执行主流程
# 加载参考基因组和坐标
print("[INFO] Loading Genome and GFF...")
genome = Fasta(str(FASTA_FILE), sequence_always_upper=True)
gene_coords = load_gene_coords(GFF_FILE)
cds_coords = load_cds_coords(GFF_FILE)
# 初始化比对器
aligner = PairwiseAligner(mode="global")
aligner.match_score, aligner.mismatch_score = 1, -1
aligner.open_gap_score, aligner.extend_gap_score = -2, -0.5
# 读取基因对列表
gene_pairs = pd.read_csv(GENE_PAIR_FILE).dropna().drop_duplicates()
results = []
total = len(gene_pairs)
print(f"[INFO] Processing {total} gene pairs...")
for i, row in gene_pairs.iterrows():
g1, g2 = row['gene1'], row['gene2']
# 提取序列
cds1, cds2 = get_cds_seq(g1, cds_coords, genome), get_cds_seq(g2, cds_coords, genome)
pr1, pr2 = get_promoter_seq(g1, gene_coords, genome), get_promoter_seq(g2, gene_coords, genome)
# 计算分歧度 (1 - Identity)
cds_div = 1 - identity_calc(cds1, cds2, aligner)
pr_div = 1 - identity_calc(pr1, pr2, aligner)
results.append({
"gene1": g1, "gene2": g2,
"cds_div": cds_div,
"promoter_div": pr_div
})
if (i+1) % 10 == 0:
print(f" Progress: {i+1}/{total}", end="\r")
gc.collect() # 及时释放内存
# 保存结果
df_div = pd.DataFrame(results).dropna()
output_path = HG_divergence_DIR / "divergence_results.csv"
df_div.to_csv(output_path, index=False)
print(f"\n[DONE] Divergence calculated. Results saved to: {output_path}")4.输出文件
divergence.csv: 包含 gene1, gene2, cds_div, dna_div, promoter_div 的详细divergence表。