By jiangchenhao, 31 December, 2025
Forums

【金山文档 | WPS云文档】 翻转单倍型的染色体和注释
https://www.kdocs.cn/l/ccchWmXeeNF7

背景

为什么要做这件事?因为我发现单倍型的基因组,有一些染色体方向是反着的,即雄树单倍型染内部出现了同源染色体的方向相反的情况,雌树单倍型基因组内部也出现了方向相反的情况。因为nucmer-syri做基因组比对的时候,对于方向相反的染色体会出现报错,如果我不翻转染色体,我就不能获取准确的基因组间共线性信息,在计算等位基因表达时也会受影响,那么翻转了染色体,gff文件当然也要翻转。

传送门:https://www.kdocs.cn/l/cuJn5iNl2fM3

方法

先翻转fasta文件和gff文件

翻转代码:

#!/usr/bin/env python3
import sys, os, argparse
from Bio import SeqIO

"""
Flip whole-chromosome orientation for FASTA and GFF
and rename flipped chromosomes with '_flip' suffix.
"""

# -------------------------------------------------
# arguments
# -------------------------------------------------
p = argparse.ArgumentParser()
p.add_argument("genome_fa")
p.add_argument("gff")
p.add_argument("flip_list")
p.add_argument("--out-prefix", default=None)
p.add_argument("--out-dir", default=".")
p.add_argument("--flip-suffix", default="_flip",
               help="Suffix added to flipped chromosome IDs (default: _flip)")
args = p.parse_args()

os.makedirs(args.out_dir, exist_ok=True)

# infer prefix
if args.out_prefix:
    prefix = args.out_prefix
else:
    prefix = os.path.splitext(os.path.basename(args.genome_fa))[0] + ".fixed"

out_fa  = os.path.join(args.out_dir, f"{prefix}.fa")
out_gff = os.path.join(args.out_dir, f"{prefix}.gff3")

# -------------------------------------------------
# 1. read flip chromosome list
# -------------------------------------------------
flip_chr = set()
with open(args.flip_list) as f:
    for l in f:
        if l.strip() and not l.startswith("#"):
            flip_chr.add(l.strip())

# -------------------------------------------------
# 2. read genome fasta
# -------------------------------------------------
seqs = {}
chr_len = {}

for rec in SeqIO.parse(args.genome_fa, "fasta"):
    seqs[rec.id] = rec
    chr_len[rec.id] = len(rec.seq)

# -------------------------------------------------
# 3. write corrected FASTA
# -------------------------------------------------
with open(out_fa, "w") as fo_all:
    for cid, rec in seqs.items():

        if cid in flip_chr:
            new_id = cid + args.flip_suffix
            new_rec = rec[:]
            new_rec.id = new_id
            new_rec.name = new_id
            new_rec.description = ""
            new_rec.seq = rec.seq.reverse_complement()

            # per-chromosome fasta
            chr_fa = os.path.join(
                args.out_dir, f"{prefix}.{new_id}.fa"
            )
            with open(chr_fa, "w") as fo_chr:
                SeqIO.write(new_rec, fo_chr, "fasta")

        else:
            new_rec = rec

        SeqIO.write(new_rec, fo_all, "fasta")

# -------------------------------------------------
# 4. process GFF
# -------------------------------------------------
def flip_strand(s):
    if s == "+": return "-"
    if s == "-": return "+"
    return s

with open(out_gff, "w") as fo_gff, open(args.gff) as fgff:
    for l in fgff:
        if l.startswith("#"):
            fo_gff.write(l)
            continue

        c = l.rstrip().split("\t")
        if len(c) < 9:
            fo_gff.write(l)
            continue

        chrom  = c[0]
        start  = int(c[3])
        end    = int(c[4])
        strand = c[6]

        if chrom in flip_chr:
            L = chr_len[chrom]
            c[0] = chrom + args.flip_suffix
            c[3] = str(L - end + 1)
            c[4] = str(L - start + 1)
            c[6] = flip_strand(strand)

        fo_gff.write("\t".join(c) + "\n")

# -------------------------------------------------
# summary
# -------------------------------------------------
sys.stderr.write(
    f"[Done]\n"
    f"FASTA(all): {out_fa}\n"
    f"GFF3     : {out_gff}\n"
    f"Flipped chromosomes renamed with suffix: {args.flip_suffix}\n"
)

校验代码:

先用gffread提取蛋白质序列和密码子序列,然后将翻转前的蛋白质文件与翻转后的蛋白质文件进行比较。
确保翻转后的gff文件是正确的。

(base) root@b27b26bddcc5:/project1/bin# cat diff_fasta.py
#!/usr/bin/env python3
from Bio import SeqIO
import sys

def load_fasta_dict(fasta_file):
    """加载 FASTA 文件到字典 {seq_id: sequence}"""
    seq_dict = {}
    for record in SeqIO.parse(fasta_file, "fasta"):
        seq_id = record.id
        seq = str(record.seq).upper()  # 统一转大写,避免大小写差异
        if seq_id in seq_dict:
            print(f"⚠️ 警告: 重复的序列ID '{seq_id}',后一条将被忽略", file=sys.stderr)
            continue
        seq_dict[seq_id] = seq
    return seq_dict

def compare_fasta(fileA, fileB):
    """比较两个FASTA文件的同名序列"""
    dictA = load_fasta_dict(fileA)
    dictB = load_fasta_dict(fileB)

    # 检查共有ID
    common_ids = set(dictA.keys()) & set(dictB.keys())
    only_in_A = set(dictA.keys()) - set(dictB.keys())
    only_in_B = set(dictB.keys()) - set(dictA.keys())

    # 比对共有ID的序列
    mismatches = []
    for seq_id in common_ids:
        if dictA[seq_id] != dictB[seq_id]:
            mismatches.append(seq_id)

    # 输出结果
    print("\n===== 比对结果 =====")
    print(f"文件A中的序列数量: {len(dictA)}")
    print(f"文件B中的序列数量: {len(dictB)}")
    print(f"共有的序列ID数量: {len(common_ids)}")
    print(f"仅在文件A中的序列ID: {len(only_in_A)}")
    print(f"仅在文件B中的序列ID: {len(only_in_B)}")
    print(f"序列内容不一致的ID数量: {len(mismatches)}")

    if mismatches:
        print("\n❌ 以下序列ID的内容不一致 (前10个):")
        for seq_id in mismatches[:10]:
            print(f"{seq_id}: A长度={len(dictA[seq_id])}, B长度={len(dictB[seq_id])}")
    else:
        print("\n✅ 所有共有序列ID的内容完全一致")

    if only_in_A:
        print("\n⚠️ 仅在文件A中的序列ID (前10个):", ", ".join(list(only_in_A)[:10]))
    if only_in_B:
        print("\n⚠️ 仅在文件B中的序列ID (前10个):", ", ".join(list(only_in_B)[:10]))

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("用法: python compare_fasta.py fileA.fasta fileB.fasta", file=sys.stderr)
        sys.exit(1)

    compare_fasta(sys.argv[1], sys.argv[2])

操作:

nohup flip_genome_and_check_pep_cds_diff.py TG_male_hap1.fasta male_hap1.gff3 filp_male_hap1.txt --out-dir ./ &
nohup flip_genome_and_check_pep_cds_diff.py TG_male_hap2.fasta male_hap2.gff3 filp_male_hap2.txt --out-dir ./ &
nohup flip_genome_and_check_pep_cds_diff.py female_hap1_chr.fasta female_hap1.gff3 filp_female_hap1.txt --out-dir ./ &
nohup flip_genome_and_check_pep_cds_diff.py female_hap2_chr.fasta female_hap2.gff3 filp_female_hap2.txt --out-dir ./ &

gffread提取蛋白质

nohup gffread TG_male_hap2.fixed.gff3 -g TG_male_hap2.fixed.fa -y TG_male_hap2_pep_fixed.fa &
nohup gffread TG_male_hap1.fixed.gff3 -g TG_male_hap1.fixed.fa -y TG_male_hap1_pep_fixed.fa &
nohup gffread female_hap1.gff3 -g female_hap1_chr.fasta -y female_hap1_pep_fixed.fa &
nohup gffread female_hap2.gff3 -g female_hap2_chr.fasta -y female_hap2_pep_fixed.fa &

前后的蛋白质序列比对

  542  diff_fasta.py male_hap2.pep TG_male_hap2_pep_fixed.fa
  543  diff_fasta.py male_hap1.pep TG_male_hap1_pep_fixed.fa
  544  diff_fasta.py female_hap1.pep female_hap1_pep_fixed.fa
  545  diff_fasta.py female_hap2.pep female_hap2_pep_fixed.fa

完全一致。

以下是完整的集成代码

我发现自己手动提取蛋白质、比较太费劲了,非常容易出错,那么把这部分集成到代码里面吧。

(base) root@b27b26bddcc5:/project1/bin# cat flip_genome_and_check_pep_cds_diff.py
#!/usr/bin/env python3
import sys, os, argparse, subprocess
from Bio import SeqIO

"""
Flip whole-chromosome orientation for FASTA and GFF
Generate CDS/PEP before and after flipping using gffread
and automatically compare FASTA results (CDS + PEP)
"""

# =================================================
# arguments
# =================================================
p = argparse.ArgumentParser()
p.add_argument("genome_fa")
p.add_argument("gff")
p.add_argument("flip_list")
p.add_argument("--out-prefix", default=None)
p.add_argument("--out-dir", default=".")
p.add_argument("--flip-suffix", default="_flip")
p.add_argument("--gffread", default="gffread")
args = p.parse_args()

os.makedirs(args.out_dir, exist_ok=True)

# =================================================
# prefix
# =================================================
if args.out_prefix:
    prefix = args.out_prefix
else:
    prefix = os.path.splitext(os.path.basename(args.genome_fa))[0] + ".fixed"

out_fa  = os.path.join(args.out_dir, f"{prefix}.fa")
out_gff = os.path.join(args.out_dir, f"{prefix}.gff3")

# =================================================
# gffread
# =================================================
def run_gffread(genome, gff, out_cds, out_pep):
    cmd = [args.gffread, gff, "-g", genome, "-x", out_cds, "-y", out_pep]
    sys.stderr.write("[CMD] " + " ".join(cmd) + "\n")
    subprocess.check_call(cmd)

# =================================================
# diff_fasta (集成版)
# =================================================
def load_fasta_dict(fasta_file):
    seq_dict = {}
    for record in SeqIO.parse(fasta_file, "fasta"):
        seq_id = record.id
        seq = str(record.seq).upper()
        if seq_id in seq_dict:
            sys.stderr.write(f"[WARN] duplicated ID ignored: {seq_id}\n")
            continue
        seq_dict[seq_id] = seq
    return seq_dict

def compare_fasta(fileA, fileB, label, out_prefix):
    dictA = load_fasta_dict(fileA)
    dictB = load_fasta_dict(fileB)

    common_ids = set(dictA) & set(dictB)
    only_in_A  = set(dictA) - set(dictB)
    only_in_B  = set(dictB) - set(dictA)

    mismatches = []
    for i in common_ids:
        if dictA[i] != dictB[i]:
            mismatches.append(i)

    # 写 summary
    summary_file = out_prefix + f".{label}.summary.txt"
    with open(summary_file, "w") as fo:
        fo.write(f"A: {fileA}\n")
        fo.write(f"B: {fileB}\n\n")
        fo.write(f"A seqs: {len(dictA)}\n")
        fo.write(f"B seqs: {len(dictB)}\n")
        fo.write(f"Common IDs: {len(common_ids)}\n")
        fo.write(f"Only in A: {len(only_in_A)}\n")
        fo.write(f"Only in B: {len(only_in_B)}\n")
        fo.write(f"Mismatched sequences: {len(mismatches)}\n")

    # 写 mismatch list
    mismatch_file = out_prefix + f".{label}.mismatch.list"
    if mismatches:
        with open(mismatch_file, "w") as fo:
            for i in mismatches:
                fo.write(i + "\n")

    # log
    if mismatches or only_in_A or only_in_B:
        sys.stderr.write(
            f"[DIFF-{label}] ❌ NOT identical "
            f"(mismatch={len(mismatches)}, onlyA={len(only_in_A)}, onlyB={len(only_in_B)})\n"
        )
    else:
        sys.stderr.write(f"[DIFF-{label}] ✅ IDENTICAL\n")

# =================================================
# 0. gffread BEFORE
# =================================================
before_cds = os.path.join(args.out_dir, f"{prefix}.before.cds.fa")
before_pep = os.path.join(args.out_dir, f"{prefix}.before.pep.fa")

sys.stderr.write("[STEP] gffread BEFORE flipping\n")
run_gffread(args.genome_fa, args.gff, before_cds, before_pep)

# =================================================
# 1. read flip list
# =================================================
flip_chr = set()
with open(args.flip_list) as f:
    for l in f:
        if l.strip() and not l.startswith("#"):
            flip_chr.add(l.strip())

# =================================================
# 2. read genome
# =================================================
seqs = {}
chr_len = {}

for rec in SeqIO.parse(args.genome_fa, "fasta"):
    seqs[rec.id] = rec
    chr_len[rec.id] = len(rec.seq)

# =================================================
# 3. write flipped FASTA
# =================================================
with open(out_fa, "w") as fo:
    for cid, rec in seqs.items():
        if cid in flip_chr:
            new_rec = rec[:]
            new_rec.id = cid + args.flip_suffix
            new_rec.name = new_rec.id
            new_rec.description = ""
            new_rec.seq = rec.seq.reverse_complement()
        else:
            new_rec = rec
        SeqIO.write(new_rec, fo, "fasta")

# =================================================
# 4. write flipped GFF
# =================================================
def flip_strand(s):
    return "+" if s == "-" else "-" if s == "+" else s

with open(out_gff, "w") as fo, open(args.gff) as fg:
    for l in fg:
        if l.startswith("#"):
            fo.write(l)
            continue
        c = l.rstrip().split("\t")
        if len(c) < 9:
            fo.write(l)
            continue

        chrom, start, end, strand = c[0], int(c[3]), int(c[4]), c[6]
        if chrom in flip_chr:
            L = chr_len[chrom]
            c[0] = chrom + args.flip_suffix
            c[3] = str(L - end + 1)
            c[4] = str(L - start + 1)
            c[6] = flip_strand(strand)

        fo.write("\t".join(c) + "\n")

# =================================================
# 5. gffread AFTER
# =================================================
after_cds = os.path.join(args.out_dir, f"{prefix}.after.cds.fa")
after_pep = os.path.join(args.out_dir, f"{prefix}.after.pep.fa")

sys.stderr.write("[STEP] gffread AFTER flipping\n")
run_gffread(out_fa, out_gff, after_cds, after_pep)

# =================================================
# 6. DIFF
# =================================================
sys.stderr.write("[STEP] Comparing CDS\n")
compare_fasta(
    before_cds,
    after_cds,
    "cds",
    os.path.join(args.out_dir, prefix)
)

sys.stderr.write("[STEP] Comparing PEP\n")
compare_fasta(
    before_pep,
    after_pep,
    "pep",
    os.path.join(args.out_dir, prefix)
)

sys.stderr.write("\n[ALL DONE]\n")

12_25更新,现在翻转后的结果中,male_hap1的chr02和chr03都是错误的方向。

所以male_hap1的chr02是不用翻转的,male_hap1的chr03是要翻转的。

所以male_hap2的chr02是不要翻转的,male_hap1的chr03是要翻转的。

妈呀,竟然真的有错误。好在另外的染色体是没问题滴。

再次翻转后确认校验无误。