一、前言
核心步骤:
- 从 VCF 文件自动区分雌雄样本,生成分组列表
- 基于 GFF 注释精准提取基因上游 2000bp 启动子序列
- 调用 FIMO 工具预测转录因子结合位点(支持 PlantTFDB、JASPAR 数据库)
- 统计雌雄样本 TF 结合位点数目、TF 家族分布
- 计算 Jaccard/Bray-Curtis/JS 距离量化雌雄结合模式分化
- 采用 Mann-Whitney U 检验、Wilcoxon 检验验证差异显著性
- 自动生成小提琴图、火山图等可视化结果
- 全程质控:记录缺失基因、异常短启动子序列
二、环境配置与文件路径
#配置库和文件路径
import subprocess
import pandas as pd
import re
import gzip
import csv
import uuid
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib as mpl
import seaborn as sns
import numpy as np
from pathlib import Path
from scipy.stats import mannwhitneyu
from scipy.stats import wilcoxon
from pyfaidx import Fasta
from collections import Counter
from scipy.spatial.distance import braycurtis, jensenshannon
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
# 结果目录
BASE_DIR = Path("/work/malefast/result")
TF_BASE_DIR = BASE_DIR / "TF_Result"
TF_PROMOTER_DIR = TF_BASE_DIR / "promoter_fasta"
TF_RESULT_DIR = TF_BASE_DIR / "tf_binding"
TF_BASE_DIR.mkdir(exist_ok=True)
TF_PROMOTER_DIR.mkdir(exist_ok=True)
TF_RESULT_DIR.mkdir(exist_ok=True)
# 输入文件
PLANTTFDB_MOTIF = Path("/work/malefast/data/PlantTFDB.meme")
JASPAR_PLANT_MOTIF = Path("/work/malefast/data/JASPAR_plants.meme")
VCF_FILE = Path("/work/malefast/data/2_female.clean.vcf.gz")
FASTA_FILE = Path("/work/malefast/data/Tgra.chr.fa")
GFF_FILE = Path("/work/malefast/data/Tgra.chr.no_fragment.gff")
GPINFO_FILE = Path("/work/malefast/data/cs_1vs1.csv")
# 异常记录
missing_genes = []
bad_promoters = []三、雌雄样本解析
从 VCF 文件头部自动识别样本名称,F 开头为雌性,M 开头为雄性,并生成样本列表文件。
def parse_sex_samples(vcf_file):
with gzip.open(vcf_file, "rt") as f:
for line in f:
if line.startswith("#CHROM"):
header = line.strip().split("\t")
samples = header[9:]
break
female = [s for s in samples if s.startswith("F")]
male = [s for s in samples if s.startswith("M") and not s.startswith("MF")]
return female, male
female_samples, male_samples = parse_sex_samples(VCF_FILE)
def write_sample_list(samples, out_file):
with open(out_file, "w") as f:
for s in samples:
f.write(s + "\n")
female_list = BASE_DIR / "female.samples"
male_list = BASE_DIR / "male.samples"
write_sample_list(female_samples, female_list)
write_sample_list(male_samples, male_list)四、基因注释信息解析
解析 GFF 文件,提取基因层级的染色体、位置、链信息,为启动子提取提供精准坐标。
def load_gene_info(gff_file):
gene_coords = {}
with open(gff_file) as f:
for line in f:
if line.startswith("#"):
continue
fields = line.strip().split("\t")
if len(fields) < 9:
continue
chrom, source, feature, start, end, _, strand, _, attrs = fields
if feature != "gene":
continue
start = int(start) - 1
end = int(end)
attr_dict = dict(
item.split("=", 1)
for item in attrs.split(";") if "=" in item
)
for key in ("ID", "Name", "gene_id"):
if key in attr_dict:
m = re.search(r"(PTG\d+L\.\d+)", attr_dict[key])
if m:
gene_coords[m.group(1)] = (chrom, start, end, strand)
break
return gene_coords五、启动子序列提取与质控
根据基因正负链自动生成反向互补序列,批量提取 2000bp 启动子,同时记录空序列、短序列等异常信息。
# 负链序列反向互补
def revcomp(seq):
comp = str.maketrans("ACGTacgt", "TGCAtgca")
return seq.translate(comp)[::-1]
# 序列缓存
fasta_cache = {}
for sample in male_samples:
path = BASE_DIR / f"{sample}_male.fa"
fasta_cache[f"male_{sample}"] = Fasta(str(path))
for sample in female_samples:
path = BASE_DIR / f"{sample}_female.fa"
fasta_cache[f"female_{sample}"] = Fasta(str(path))
# 提取启动子
def extract_promoter(sample_name, gene, gene_info_coords, upstream=2000):
chrom, start, end, strand = gene_info_coords[gene]
fa = fasta_cache[sample_name]
if chrom not in fa:
return ""
chrom_len = len(fa[chrom])
if strand == "+":
p_start = max(0, start - upstream)
p_end = start
seq = fa[chrom][p_start:p_end].seq
else:
p_start = max(0, end - 1)
p_end = min(chrom_len, end - 1 + upstream)
seq = fa[chrom][p_start:p_end].seq
seq = revcomp(seq)
return str(seq)
# 批量提取并质控
gene_info_coords = load_gene_info(GFF_FILE)
gene_pairs = pd.read_csv(GPINFO_FILE).dropna(subset=["gene1", "gene2"])
PROMOTER_SIZES = [2000]
for _, row in gene_pairs.iterrows():
gene1, gene2 = row["gene1"], row["gene2"]
pair_id = f"{gene1}_{gene2}"
if gene1 not in gene_info_coords or gene2 not in gene_info_coords:
missing_genes.append({"pair": pair_id})
continue
for upstream in PROMOTER_SIZES:
pair_dir = TF_PROMOTER_DIR / f"{pair_id}_{upstream}bp"
pair_dir.mkdir(exist_ok=True)
for gene in (gene1, gene2):
for sex, samples in (("male", male_samples), ("female", female_samples)):
out_fa = pair_dir / f"{gene}_{sex}.fa"
if out_fa.exists():
continue
with open(out_fa, "w") as out:
for s in samples:
sample_key = f"{sex}_{s}"
seq = extract_promoter(sample_key, gene, gene_info_coords, upstream=upstream)
seq_len = len(seq)
if seq_len == 0 or seq_len < 50:
bad_promoters.append({
"pair": pair_id, "gene": gene, "sex": sex, "sample": s,
"promoter_bp": upstream, "promoter_len": seq_len, "note": "empty" if seq_len == 0 else "short"
})
out.write(f">{gene}_{sex}_{s}\n{seq}\n")六、转录因子结合位点预测(FIMO)
使用 MEME 套件 FIMO 工具并行预测TF 结合位点,支持双数据库,可开关是否重新运行。
def run_fimo(motif_file, fasta_file, out_dir):
cmd = [
"fimo", "--verbosity", "1", "--thresh", "1e-4",
"--max-stored-scores", "100000", "--motif-pseudo", "0.1",
"--oc", str(out_dir), str(motif_file), str(fasta_file)
]
subprocess.run(cmd, check=True, timeout=3600)
def run_fimo_job(args):
motif, fasta, out_dir = args
if (out_dir / "fimo.txt").exists():
return
try:
run_fimo(motif, fasta, out_dir)
except subprocess.TimeoutExpired:
print(f"[TIMEOUT] {fasta}")
# 构建任务
jobs = []
for fasta in TF_PROMOTER_DIR.rglob("*2000bp/*.fa"):
if fasta.stat().st_size < 10:
continue
for motif in [PLANTTFDB_MOTIF, JASPAR_PLANT_MOTIF]:
out_dir = TF_RESULT_DIR / fasta.parent.name / fasta.stem / motif.stem
jobs.append((motif, fasta, out_dir))
if __name__ == "__main__":
RUN_FIMO = False
if RUN_FIMO and len(jobs) > 0:
with ThreadPoolExecutor(max_workers=8) as executor:
future_to_job = {executor.submit(run_fimo_job, job): job for job in jobs}
for future in as_completed(future_to_job):
try:
future.result()
except Exception as e:
print(f"[ERROR] FIMO failed: {e}")七、雌雄 TF 结合差异分析
解析 FIMO 结果,统计 TF 家族数目,计算分化指数、差异倍数、显著性 P 值,保存完整结果表。
# 解析TF家族
def parse_fimo_tf_family(fimo_txt):
try:
df = pd.read_csv(fimo_txt, sep="\t", comment=None)
except:
return Counter()
if df.empty:
return Counter()
df.columns = [c.lstrip("#").strip() for c in df.columns]
motif_col = "motif_id" if "motif_id" in df.columns else "pattern name"
df["TF_family"] = df[motif_col].astype(str).str.extract(r"(.+?)(?:_\d+)?$")
return Counter(df["TF_family"])
# 距离计算
def jaccard_distance(c1, c2):
s1, s2 = set(c1.keys()), set(c2.keys())
return 1 - len(s1 & s2) / len(s1 | s2) if s1 or s2 else 0.0
def freq_vector_distance(c1, c2):
keys = sorted(set(c1) | set(c2))
if not keys:
return {"braycurtis": 0.0, "jsd": 0.0}
v1 = np.array([c1.get(k, 0) for k in keys], dtype=float)
v2 = np.array([c2.get(k, 0) for k in keys], dtype=float)
v1 /= v1.sum()
v2 /= v2.sum()
return {"braycurtis": braycurtis(v1, v2), "jsd": jensenshannon(v1, v2, base=2)}
# 批量统计雌雄差异
results = []
pair_dirs = list(TF_RESULT_DIR.glob("*_2000bp"))
for pair_dir in pair_dirs:
gene1, gene2, _ = pair_dir.name.split("_")
for motif_db in ["PlantTFDB", "JASPAR_plants"]:
for gene in [gene1, gene2]:
f_male = pair_dir / f"{gene}_male" / motif_db / "fimo.txt"
f_female = pair_dir / f"{gene}_female" / motif_db / "fimo.txt"
if not f_male.exists() or not f_female.exists():
continue
c_male = parse_fimo_tf_family(f_male)
c_female = parse_fimo_tf_family(f_female)
dist = freq_vector_distance(c_male, c_female)
gain, loss = len(set(c_male)-set(c_female)), len(set(c_female)-set(c_male))
# 统计检验
male_counts = [v for v in c_male.values()]
female_counts = [v for v in c_female.values()]
p_gene = mannwhitneyu(male_counts, female_counts).pvalue if male_counts and female_counts else 1.0
results.append({
"pair": pair_dir.name, "gene": gene, "motif_db": motif_db,
"jaccard": jaccard_distance(c_male, c_female),
"braycurtis": dist["braycurtis"], "jsd": dist["jsd"],
"gain": gain, "loss": loss, "p_value_gene": p_gene
})
df_div = pd.DataFrame(results)八、显著性检验与可视化
使用 Wilcoxon 检验验证整体差异,自动生成小提琴图(分化分布)与火山图(显著差异基因)。
# 统计检验
stat_results = []
for motif_db in df_div["motif_db"].unique():
df_db = df_div[df_div.motif_db == motif_db]
for upstream in PROMOTER_SIZES:
df_group = df_db[df_db.promoter == upstream]
for metric in ["jaccard", "braycurtis", "jsd"]:
values = df_group[metric].dropna()
stat, p = wilcoxon(values)
stat_results.append({
"motif_db": motif_db, "metric": metric, "p_value": p
})
pd.DataFrame(stat_results).to_csv(TF_BASE_DIR / "male_vs_female_MWU_by_promoter.csv", index=False)
# 小提琴图
sns.violinplot(data=df_group, y="jaccard", color="skyblue")
plt.savefig(TF_BASE_DIR / f"TF_jaccard_TOTAL_violin_{motif_db}_{upstream}bp.pdf")
plt.close()
# 火山图
df_group["neglog10p"] = -np.log10(df_group["p_value_gene"].replace(0, 1e-300))
sns.scatterplot(data=df_group, x="log2FC", y="neglog10p", hue="significant")
plt.axhline(-np.log10(0.05), linestyle="--")
plt.axvline(1, linestyle="--")
plt.axvline(-1, linestyle="--")
plt.savefig(TF_BASE_DIR / f"volcano_{motif_db}_{upstream}bp.pdf")
plt.close()九、质控与最终输出
脚本自动输出缺失基因、异常启动子质控表,以及雌雄 TF 结合分化总表。
# 质控文件
if missing_genes:
pd.DataFrame(missing_genes).to_csv(BASE_DIR / "missing_genes.csv", index=False)
if bad_promoters:
pd.DataFrame(bad_promoters).to_csv(TF_BASE_DIR / "promoter_length_qc.csv", index=False)
# 核心结果
df_div.to_csv(TF_BASE_DIR / "TF_divergence_pair_level.csv", index=False)
print("DONE")十、结果文件目录结构
/work/malefast/result/
├── female.samples # 雌性样本列表
├── male.samples # 雄性样本列表
├── missing_genes.csv # 缺失基因记录
└── TF_Result/ # 转录因子分析主目录
├── promoter_fasta/ # 启动子序列文件
├── tf_binding/ # FIMO预测原始结果
├── promoter_length_qc.csv # 启动子序列质控
├── TF_divergence_pair_level.csv # 雌雄TF差异总表(核心)
├── male_vs_female_MWU_by_promoter.csv # 统计检验结果
├── TF_jaccard_violin.pdf # 分化分布小提琴图
└── volcano_plot.pdf # 雌雄差异火山图金山文档链接:https://www.kdocs.cn/l/cdwtVm5E0XLc