一、前言
核心步骤:
- 生成多序列比对的一致性序列(Consensus Sequence)
- 计算序列相似性:BLAST 局部比对 + K-mer 无比对算法
- Wilcoxon 符号秩检验验证差异显著性
- 可视化图表
二、配置路径与基础环境
#配置库和文件路径
import subprocess
import pandas as pd
import re
import uuid
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib as mpl
import seaborn as sns
from pathlib import Path
from Bio import SeqIO
from scipy.stats import wilcoxon
BASE_DIR = Path("/work/malefast/result")
PROMOTER_DIR = Path("/work/malefast/result/TF_Result/promoter_fasta")
TF_BASE_DIR = BASE_DIR / "TF_Result"
TF_BASE_DIR.mkdir(exist_ok=True)
TF_PROMOTER_DIR = TF_BASE_DIR / "promoter_fasta"
TF_PROMOTER_DIR.mkdir(exist_ok=True)
TF_RESULT_DIR = TF_BASE_DIR / "PM_sequence_similarity"
TF_RESULT_DIR.mkdir(exist_ok=True)三、生成一致性序列(Consensus)
一致性序列:多序列中每个位置出现频率最高的碱基组成的代表序列,用于后续雌雄序列对比。
- 自动忽略缺失碱基
- - 长度与输入序列完全一致
def majority_consensus(records):
if len(records) == 0:
return None
length = len(records[0].seq)
consensus = []
for i in range(length):
bases = [rec.seq[i] for rec in records if rec.seq[i] != "-"]
if bases:
consensus.append(max(set(bases), key=bases.count))
else:
consensus.append("-")
return "".join(consensus)四、BLAST 序列相似性计算
调用blastn计算两条一致性序列的局部相似性,过滤短比对、低相似度序列,保证结果可靠性。
#一致性序列相似性identity(用blastn计算两条序列的local similarity,返回best HSP identity(0–1),不满足条件返回 None)
def blast_identity(seq1, seq2, tmp_prefix,
min_len=100, min_id=70):
fa1 = f"{tmp_prefix}_1.fa"
fa2 = f"{tmp_prefix}_2.fa"
with open(fa1, "w") as f:
f.write(f">seq1\n{seq1}\n")
with open(fa2, "w") as f:
f.write(f">seq2\n{seq2}\n")
cmd = (
f"blastn -query {fa1} -subject {fa2} "
f"-outfmt '6 length pident bitscore' "
f"-max_hsps 1 -max_target_seqs 1"
)
try:
res = subprocess.check_output(cmd, shell=True, text=True).strip()
except subprocess.CalledProcessError:
return None, "blast_error"
finally:
for f in (fa1, fa2):
if Path(f).exists():
Path(f).unlink()
if not res:
return None, "no_blast_hit"
aln_len, pid, bits = res.split()
aln_len = int(aln_len)
pid = float(pid)
if aln_len < min_len:
return None, f"alignment_too_short(<{min_len})"
if pid < min_id:
return None, f"identity_too_low(<{min_id})"
return pid / 100.0, "OK"五、k-mer 无比对相似性计算
不依赖序列比对,基于 k-mer 交集计算全局相似性,适合长序列、存在插入缺失的场景。
# alignment-free similarity(k-mer based)
def kmer_similarity(seq1, seq2, k=5):
def get_kmers(seq, k):
return {seq[i:i+k] for i in range(len(seq) - k + 1) if "N" not in seq[i:i+k] and "-" not in seq[i:i+k]}
kmers1 = get_kmers(seq1, k)
kmers2 = get_kmers(seq2, k)
if not kmers1 or not kmers2:
return None
intersection = len(kmers1 & kmers2)
union = len(kmers1 | kmers2)
return intersection / union六、主流程 - 批量处理基因对
遍历所有基因对文件夹,生成一致性序列→计算相似性→保存结果,支持断点续跑(已生成的一致性序列不重复计算)。
# 获取所有基因对文件夹
pair_dirs = sorted([
d for d in PROMOTER_DIR.iterdir()
if d.is_dir() and d.name.endswith("_2000bp")
])
print(f"Total gene pairs: {len(pair_dirs)}")
results = []
blast_failed = [] # 记录计算失败的基因对
# 批量处理每个基因对
for i, pair_dir in enumerate(pair_dirs, 1):
print(f"[{i}/{len(pair_dirs)}] Processing: {pair_dir.name}")
gene1, gene2, _ = pair_dir.name.split("_", 2)
# 定义序列文件路径
g1_male_fa = pair_dir / f"{gene1}_male.fa"
g1_female_fa = pair_dir / f"{gene1}_female.fa"
g2_male_fa = pair_dir / f"{gene2}_male.fa"
g2_female_fa = pair_dir / f"{gene2}_female.fa"
# 跳过文件缺失的基因对
if not all([g1_male_fa.exists(), g1_female_fa.exists(),
g2_male_fa.exists(), g2_female_fa.exists()]):
continue
# 读取FASTA序列
g1_male = list(SeqIO.parse(g1_male_fa, "fasta"))
g1_female = list(SeqIO.parse(g1_female_fa, "fasta"))
g2_male = list(SeqIO.parse(g2_male_fa, "fasta"))
g2_female = list(SeqIO.parse(g2_female_fa, "fasta"))
# 生成雌雄一致性序列
g1_m_cons = majority_consensus(g1_male)
g2_m_cons = majority_consensus(g2_male)
g1_f_cons = majority_consensus(g1_female)
g2_f_cons = majority_consensus(g2_female)
# 跳过生成失败的序列
if None in (g1_m_cons, g2_m_cons, g1_f_cons, g2_f_cons):
continue
# 保存一致性序列(断点续跑:已存在则跳过)
consensus_files = {
f"{gene1}_male_consensus.fa": g1_m_cons,
f"{gene2}_male_consensus.fa": g2_m_cons,
f"{gene1}_female_consensus.fa": g1_f_cons,
f"{gene2}_female_consensus.fa": g2_f_cons
}
for fname, seq in consensus_files.items():
fpath = pair_dir / fname
if not fpath.exists():
with open(fpath, "w") as f:
f.write(f">{fname.split('.')[0]}\n{seq}\n")
# 计算相似性
# gene1 雌雄对比
g1_id, g1_reason = blast_identity(g1_m_cons, g1_f_cons, tmp_prefix=pair_dir / f"g1_{uuid.uuid4()}")
g1_kmer = kmer_similarity(g1_m_cons, g1_f_cons)
# gene2 雌雄对比
g2_id, g2_reason = blast_identity(g2_m_cons, g2_f_cons, tmp_prefix=pair_dir / f"g2_{uuid.uuid4()}")
g2_kmer = kmer_similarity(g2_m_cons, g2_f_cons)
# 记录失败的基因对
if g1_id is None or g2_id is None:
blast_failed.append({
"pair": pair_dir.name,
"gene1_status": "OK" if g1_id else g1_reason,
"gene2_status": "OK" if g2_id else g2_reason
})
continue
# 保存成功结果
results.append({
"pair": pair_dir.name,
"gene1_identity": g1_id,
"gene2_identity": g2_id,
"gene1_kmer_similarity": g1_kmer,
"gene2_kmer_similarity": g2_kmer
})七、结果整理与统计检验
计算序列分化值(1 - 相似度),用 Wilcoxon 检验判断雌雄序列是否显著分化。
#输出记录similarity失败的基因对的文件
if blast_failed:
failed_df = pd.DataFrame(blast_failed)
failed_df.to_csv(
TF_RESULT_DIR / "blast_similarity_filtered_pairs.csv",
index=False
)
#保存成功计算similarity的结果
df = pd.DataFrame(results)
#deviation
df["gene1_deviation"] = 1 - df["gene1_identity"]
df["gene2_deviation"] = 1 - df["gene2_identity"]
# global deviation(k-mer based)
df["gene1_kmer_deviation"] = 1 - df["gene1_kmer_similarity"]
df["gene2_kmer_deviation"] = 1 - df["gene2_kmer_similarity"]
df.to_csv(TF_RESULT_DIR / "gene_pair_similarity_with_kmer.csv", index=False)
#合并identity
all_identity = pd.concat([
df["gene1_identity"],
df["gene2_identity"]
])
#deviation
all_deviation = 1 - all_identity
#Wilcoxon signed-rank test (检验identity是否显著偏离1.0)
stat, p = wilcoxon(all_deviation)
#alignment-free global analysis
all_kmer = pd.concat([
df["gene1_kmer_similarity"],
df["gene2_kmer_similarity"]
]).dropna()
all_kmer_dev = 1 - all_kmer
stat_kmer, p_kmer = wilcoxon(all_kmer_dev)
kmer_test_df = pd.DataFrame([{
"test": "Wilcoxon_signed_rank_kmer",
"n": len(all_kmer),
"statistic": stat_kmer,
"p_value": p_kmer,
"mean_similarity": all_kmer.mean(),
"median_similarity": all_kmer.median(),
"mean_deviation": all_kmer_dev.mean(),
"median_deviation": all_kmer_dev.median()
}])
kmer_test_df.to_csv(TF_RESULT_DIR / "kmer_similarity_test.csv", index=False)
#保存结果
test_df = pd.DataFrame([{
"test": "Wilcoxon_signed_rank",
"n": len(all_identity),
"statistic": stat,
"p_value": p,
"mean_identity": all_identity.mean(),
"median_identity": all_identity.median(),
"mean_deviation": all_deviation.mean(),
"median_deviation": all_deviation.median()
}])
test_df.to_csv(TF_RESULT_DIR / "identity_vs_1_test.csv", index=False)八、结果可视化
绘制序列分化分布直方图,直观展示雌雄启动子分化模式。
#数据重塑为长格式以便绘图
df_long = pd.melt(
df,
id_vars="pair",
value_vars=["gene1_identity", "gene2_identity"],
var_name="Sex",
value_name="Identity"
)
df_long["Sex"] = df_long["Sex"].map({
"gene1_identity": "Gene1",
"gene2_identity": "Gene2"
})
#seaborn 箱线图
mpl.rcParams['pdf.fonttype'] = 42
plt.rcParams['font.family'] = 'DejaVu Sans' #修改字体
plt.figure(figsize=(6,6))
sns.histplot(all_deviation, bins=50, kde=True)
plt.axvline(0.0, linestyle="--") #0表示无divergence
plt.xlabel("Promoter divergence (1 - identity)")
plt.ylabel("Frequency")
plt.title("Distribution of promoter divergence between sexes")
plt.tight_layout()
plt.savefig(TF_RESULT_DIR / "deviation_distribution.pdf", dpi=300)
plt.close()
plt.figure(figsize=(6,6))
sns.histplot(all_kmer_dev, bins=50, kde=True)
plt.axvline(0.0, linestyle="--")
plt.xlabel("Global promoter divergence (k-mer)")
plt.ylabel("Frequency")
plt.title("Alignment-free promoter divergence between sexes")
plt.tight_layout()
plt.savefig(TF_RESULT_DIR / "kmer_deviation_distribution.pdf", dpi=300)
plt.close()
print("DONE")九、输出结果说明(完整结果目录结构--清晰树状版)
/work/malefast/result/
└── TF_Result/ # 转录因子分析总目录
├── promoter_fasta/ # 原始+生成的序列目录(输入+中间文件)
│ ├── geneA_geneB_2000bp/ # 基因对文件夹(示例)
│ │ ├── geneA_male.fa # 原始雄性序列
│ │ ├── geneA_female.fa # 原始雌性序列
│ │ ├── geneB_male.fa
│ │ ├── geneB_female.fa
│ │ ├── geneA_male_consensus.fa # 脚本生成:雄性一致性序列
│ │ ├── geneA_female_consensus.fa # 脚本生成:雌性一致性序列
│ │ ├── geneB_male_consensus.fa
│ │ └── geneB_female_consensus.fa
│ │
│ └── geneX_geneY_2000bp/ # 其他基因对文件夹(结构同上)
│
└── PM_sequence_similarity/ # 最终结果目录(核心输出)
├── gene_pair_similarity_with_kmer.csv # 所有基因对相似性+分化值
├── blast_similarity_filtered_pairs.csv # BLAST失败基因对及原因
├── identity_vs_1_test.csv # BLAST相似度统计检验
├── kmer_similarity_test.csv # k-mer相似度统计检验
├── deviation_distribution.pdf # 雌雄分化分布图(BLAST)
└── kmer_deviation_distribution.pdf # 雌雄分化分布图(k-mer)金山文档链接:https://www.kdocs.cn/l/che14t5VOV6H