【金山文档 | WPS云文档】 翻转单倍型的染色体和注释
https://www.kdocs.cn/l/ccchWmXeeNF7
背景
为什么要做这件事?因为我发现单倍型的基因组,有一些染色体方向是反着的,即雄树单倍型染内部出现了同源染色体的方向相反的情况,雌树单倍型基因组内部也出现了方向相反的情况。因为nucmer-syri做基因组比对的时候,对于方向相反的染色体会出现报错,如果我不翻转染色体,我就不能获取准确的基因组间共线性信息,在计算等位基因表达时也会受影响,那么翻转了染色体,gff文件当然也要翻转。
传送门:https://www.kdocs.cn/l/cuJn5iNl2fM3
方法
先翻转fasta文件和gff文件
翻转代码:
#!/usr/bin/env python3
import sys, os, argparse
from Bio import SeqIO
"""
Flip whole-chromosome orientation for FASTA and GFF
and rename flipped chromosomes with '_flip' suffix.
"""
# -------------------------------------------------
# arguments
# -------------------------------------------------
p = argparse.ArgumentParser()
p.add_argument("genome_fa")
p.add_argument("gff")
p.add_argument("flip_list")
p.add_argument("--out-prefix", default=None)
p.add_argument("--out-dir", default=".")
p.add_argument("--flip-suffix", default="_flip",
help="Suffix added to flipped chromosome IDs (default: _flip)")
args = p.parse_args()
os.makedirs(args.out_dir, exist_ok=True)
# infer prefix
if args.out_prefix:
prefix = args.out_prefix
else:
prefix = os.path.splitext(os.path.basename(args.genome_fa))[0] + ".fixed"
out_fa = os.path.join(args.out_dir, f"{prefix}.fa")
out_gff = os.path.join(args.out_dir, f"{prefix}.gff3")
# -------------------------------------------------
# 1. read flip chromosome list
# -------------------------------------------------
flip_chr = set()
with open(args.flip_list) as f:
for l in f:
if l.strip() and not l.startswith("#"):
flip_chr.add(l.strip())
# -------------------------------------------------
# 2. read genome fasta
# -------------------------------------------------
seqs = {}
chr_len = {}
for rec in SeqIO.parse(args.genome_fa, "fasta"):
seqs[rec.id] = rec
chr_len[rec.id] = len(rec.seq)
# -------------------------------------------------
# 3. write corrected FASTA
# -------------------------------------------------
with open(out_fa, "w") as fo_all:
for cid, rec in seqs.items():
if cid in flip_chr:
new_id = cid + args.flip_suffix
new_rec = rec[:]
new_rec.id = new_id
new_rec.name = new_id
new_rec.description = ""
new_rec.seq = rec.seq.reverse_complement()
# per-chromosome fasta
chr_fa = os.path.join(
args.out_dir, f"{prefix}.{new_id}.fa"
)
with open(chr_fa, "w") as fo_chr:
SeqIO.write(new_rec, fo_chr, "fasta")
else:
new_rec = rec
SeqIO.write(new_rec, fo_all, "fasta")
# -------------------------------------------------
# 4. process GFF
# -------------------------------------------------
def flip_strand(s):
if s == "+": return "-"
if s == "-": return "+"
return s
with open(out_gff, "w") as fo_gff, open(args.gff) as fgff:
for l in fgff:
if l.startswith("#"):
fo_gff.write(l)
continue
c = l.rstrip().split("\t")
if len(c) < 9:
fo_gff.write(l)
continue
chrom = c[0]
start = int(c[3])
end = int(c[4])
strand = c[6]
if chrom in flip_chr:
L = chr_len[chrom]
c[0] = chrom + args.flip_suffix
c[3] = str(L - end + 1)
c[4] = str(L - start + 1)
c[6] = flip_strand(strand)
fo_gff.write("\t".join(c) + "\n")
# -------------------------------------------------
# summary
# -------------------------------------------------
sys.stderr.write(
f"[Done]\n"
f"FASTA(all): {out_fa}\n"
f"GFF3 : {out_gff}\n"
f"Flipped chromosomes renamed with suffix: {args.flip_suffix}\n"
)校验代码:
先用gffread提取蛋白质序列和密码子序列,然后将翻转前的蛋白质文件与翻转后的蛋白质文件进行比较。
确保翻转后的gff文件是正确的。
(base) root@b27b26bddcc5:/project1/bin# cat diff_fasta.py
#!/usr/bin/env python3
from Bio import SeqIO
import sys
def load_fasta_dict(fasta_file):
"""加载 FASTA 文件到字典 {seq_id: sequence}"""
seq_dict = {}
for record in SeqIO.parse(fasta_file, "fasta"):
seq_id = record.id
seq = str(record.seq).upper() # 统一转大写,避免大小写差异
if seq_id in seq_dict:
print(f"⚠️ 警告: 重复的序列ID '{seq_id}',后一条将被忽略", file=sys.stderr)
continue
seq_dict[seq_id] = seq
return seq_dict
def compare_fasta(fileA, fileB):
"""比较两个FASTA文件的同名序列"""
dictA = load_fasta_dict(fileA)
dictB = load_fasta_dict(fileB)
# 检查共有ID
common_ids = set(dictA.keys()) & set(dictB.keys())
only_in_A = set(dictA.keys()) - set(dictB.keys())
only_in_B = set(dictB.keys()) - set(dictA.keys())
# 比对共有ID的序列
mismatches = []
for seq_id in common_ids:
if dictA[seq_id] != dictB[seq_id]:
mismatches.append(seq_id)
# 输出结果
print("\n===== 比对结果 =====")
print(f"文件A中的序列数量: {len(dictA)}")
print(f"文件B中的序列数量: {len(dictB)}")
print(f"共有的序列ID数量: {len(common_ids)}")
print(f"仅在文件A中的序列ID: {len(only_in_A)}")
print(f"仅在文件B中的序列ID: {len(only_in_B)}")
print(f"序列内容不一致的ID数量: {len(mismatches)}")
if mismatches:
print("\n❌ 以下序列ID的内容不一致 (前10个):")
for seq_id in mismatches[:10]:
print(f"{seq_id}: A长度={len(dictA[seq_id])}, B长度={len(dictB[seq_id])}")
else:
print("\n✅ 所有共有序列ID的内容完全一致")
if only_in_A:
print("\n⚠️ 仅在文件A中的序列ID (前10个):", ", ".join(list(only_in_A)[:10]))
if only_in_B:
print("\n⚠️ 仅在文件B中的序列ID (前10个):", ", ".join(list(only_in_B)[:10]))
if __name__ == "__main__":
if len(sys.argv) != 3:
print("用法: python compare_fasta.py fileA.fasta fileB.fasta", file=sys.stderr)
sys.exit(1)
compare_fasta(sys.argv[1], sys.argv[2])操作:
nohup flip_genome_and_check_pep_cds_diff.py TG_male_hap1.fasta male_hap1.gff3 filp_male_hap1.txt --out-dir ./ &
nohup flip_genome_and_check_pep_cds_diff.py TG_male_hap2.fasta male_hap2.gff3 filp_male_hap2.txt --out-dir ./ &
nohup flip_genome_and_check_pep_cds_diff.py female_hap1_chr.fasta female_hap1.gff3 filp_female_hap1.txt --out-dir ./ &
nohup flip_genome_and_check_pep_cds_diff.py female_hap2_chr.fasta female_hap2.gff3 filp_female_hap2.txt --out-dir ./ &gffread提取蛋白质
nohup gffread TG_male_hap2.fixed.gff3 -g TG_male_hap2.fixed.fa -y TG_male_hap2_pep_fixed.fa &
nohup gffread TG_male_hap1.fixed.gff3 -g TG_male_hap1.fixed.fa -y TG_male_hap1_pep_fixed.fa &
nohup gffread female_hap1.gff3 -g female_hap1_chr.fasta -y female_hap1_pep_fixed.fa &
nohup gffread female_hap2.gff3 -g female_hap2_chr.fasta -y female_hap2_pep_fixed.fa &前后的蛋白质序列比对
542 diff_fasta.py male_hap2.pep TG_male_hap2_pep_fixed.fa
543 diff_fasta.py male_hap1.pep TG_male_hap1_pep_fixed.fa
544 diff_fasta.py female_hap1.pep female_hap1_pep_fixed.fa
545 diff_fasta.py female_hap2.pep female_hap2_pep_fixed.fa完全一致。
以下是完整的集成代码
我发现自己手动提取蛋白质、比较太费劲了,非常容易出错,那么把这部分集成到代码里面吧。
(base) root@b27b26bddcc5:/project1/bin# cat flip_genome_and_check_pep_cds_diff.py
#!/usr/bin/env python3
import sys, os, argparse, subprocess
from Bio import SeqIO
"""
Flip whole-chromosome orientation for FASTA and GFF
Generate CDS/PEP before and after flipping using gffread
and automatically compare FASTA results (CDS + PEP)
"""
# =================================================
# arguments
# =================================================
p = argparse.ArgumentParser()
p.add_argument("genome_fa")
p.add_argument("gff")
p.add_argument("flip_list")
p.add_argument("--out-prefix", default=None)
p.add_argument("--out-dir", default=".")
p.add_argument("--flip-suffix", default="_flip")
p.add_argument("--gffread", default="gffread")
args = p.parse_args()
os.makedirs(args.out_dir, exist_ok=True)
# =================================================
# prefix
# =================================================
if args.out_prefix:
prefix = args.out_prefix
else:
prefix = os.path.splitext(os.path.basename(args.genome_fa))[0] + ".fixed"
out_fa = os.path.join(args.out_dir, f"{prefix}.fa")
out_gff = os.path.join(args.out_dir, f"{prefix}.gff3")
# =================================================
# gffread
# =================================================
def run_gffread(genome, gff, out_cds, out_pep):
cmd = [args.gffread, gff, "-g", genome, "-x", out_cds, "-y", out_pep]
sys.stderr.write("[CMD] " + " ".join(cmd) + "\n")
subprocess.check_call(cmd)
# =================================================
# diff_fasta (集成版)
# =================================================
def load_fasta_dict(fasta_file):
seq_dict = {}
for record in SeqIO.parse(fasta_file, "fasta"):
seq_id = record.id
seq = str(record.seq).upper()
if seq_id in seq_dict:
sys.stderr.write(f"[WARN] duplicated ID ignored: {seq_id}\n")
continue
seq_dict[seq_id] = seq
return seq_dict
def compare_fasta(fileA, fileB, label, out_prefix):
dictA = load_fasta_dict(fileA)
dictB = load_fasta_dict(fileB)
common_ids = set(dictA) & set(dictB)
only_in_A = set(dictA) - set(dictB)
only_in_B = set(dictB) - set(dictA)
mismatches = []
for i in common_ids:
if dictA[i] != dictB[i]:
mismatches.append(i)
# 写 summary
summary_file = out_prefix + f".{label}.summary.txt"
with open(summary_file, "w") as fo:
fo.write(f"A: {fileA}\n")
fo.write(f"B: {fileB}\n\n")
fo.write(f"A seqs: {len(dictA)}\n")
fo.write(f"B seqs: {len(dictB)}\n")
fo.write(f"Common IDs: {len(common_ids)}\n")
fo.write(f"Only in A: {len(only_in_A)}\n")
fo.write(f"Only in B: {len(only_in_B)}\n")
fo.write(f"Mismatched sequences: {len(mismatches)}\n")
# 写 mismatch list
mismatch_file = out_prefix + f".{label}.mismatch.list"
if mismatches:
with open(mismatch_file, "w") as fo:
for i in mismatches:
fo.write(i + "\n")
# log
if mismatches or only_in_A or only_in_B:
sys.stderr.write(
f"[DIFF-{label}] ❌ NOT identical "
f"(mismatch={len(mismatches)}, onlyA={len(only_in_A)}, onlyB={len(only_in_B)})\n"
)
else:
sys.stderr.write(f"[DIFF-{label}] ✅ IDENTICAL\n")
# =================================================
# 0. gffread BEFORE
# =================================================
before_cds = os.path.join(args.out_dir, f"{prefix}.before.cds.fa")
before_pep = os.path.join(args.out_dir, f"{prefix}.before.pep.fa")
sys.stderr.write("[STEP] gffread BEFORE flipping\n")
run_gffread(args.genome_fa, args.gff, before_cds, before_pep)
# =================================================
# 1. read flip list
# =================================================
flip_chr = set()
with open(args.flip_list) as f:
for l in f:
if l.strip() and not l.startswith("#"):
flip_chr.add(l.strip())
# =================================================
# 2. read genome
# =================================================
seqs = {}
chr_len = {}
for rec in SeqIO.parse(args.genome_fa, "fasta"):
seqs[rec.id] = rec
chr_len[rec.id] = len(rec.seq)
# =================================================
# 3. write flipped FASTA
# =================================================
with open(out_fa, "w") as fo:
for cid, rec in seqs.items():
if cid in flip_chr:
new_rec = rec[:]
new_rec.id = cid + args.flip_suffix
new_rec.name = new_rec.id
new_rec.description = ""
new_rec.seq = rec.seq.reverse_complement()
else:
new_rec = rec
SeqIO.write(new_rec, fo, "fasta")
# =================================================
# 4. write flipped GFF
# =================================================
def flip_strand(s):
return "+" if s == "-" else "-" if s == "+" else s
with open(out_gff, "w") as fo, open(args.gff) as fg:
for l in fg:
if l.startswith("#"):
fo.write(l)
continue
c = l.rstrip().split("\t")
if len(c) < 9:
fo.write(l)
continue
chrom, start, end, strand = c[0], int(c[3]), int(c[4]), c[6]
if chrom in flip_chr:
L = chr_len[chrom]
c[0] = chrom + args.flip_suffix
c[3] = str(L - end + 1)
c[4] = str(L - start + 1)
c[6] = flip_strand(strand)
fo.write("\t".join(c) + "\n")
# =================================================
# 5. gffread AFTER
# =================================================
after_cds = os.path.join(args.out_dir, f"{prefix}.after.cds.fa")
after_pep = os.path.join(args.out_dir, f"{prefix}.after.pep.fa")
sys.stderr.write("[STEP] gffread AFTER flipping\n")
run_gffread(out_fa, out_gff, after_cds, after_pep)
# =================================================
# 6. DIFF
# =================================================
sys.stderr.write("[STEP] Comparing CDS\n")
compare_fasta(
before_cds,
after_cds,
"cds",
os.path.join(args.out_dir, prefix)
)
sys.stderr.write("[STEP] Comparing PEP\n")
compare_fasta(
before_pep,
after_pep,
"pep",
os.path.join(args.out_dir, prefix)
)
sys.stderr.write("\n[ALL DONE]\n")12_25更新,现在翻转后的结果中,male_hap1的chr02和chr03都是错误的方向。
所以male_hap1的chr02是不用翻转的,male_hap1的chr03是要翻转的。
所以male_hap2的chr02是不要翻转的,male_hap1的chr03是要翻转的。
妈呀,竟然真的有错误。好在另外的染色体是没问题滴。
再次翻转后确认校验无误。