By jiangchenhao, 31 March, 2026
Forums

【金山文档 | WPS云文档】 香榧基因组内假基因的识别Blat版本

猫言猫语

这一部分工作是为了香榧中的SD事件鉴定所做的,参考文献是针叶树基因组文章。

已知我们香榧的基因组没有做假基因的预测,现在必须要我从预测。

阅读了几个文献后选择了这个预印本上的方法:

https://github.com/chhhhai/EasyPseudogene

理由是发文单位是清华大学的,我觉得他水平靠谱一些。这个工作在海洋哺乳类动物上进行了预测(经过测试后此工具不适配我的研究目的)。

针叶树那篇文章的做法是使用了基于blat比对的方法,谢剑波在2019年有一篇文章也使用了类似的方法,但是在有封装好的包的情况下,我倾向于使用这些封装好的软件。

EasyPseudogene

环境配置与准备

docker run -itd --name jiangchenhao_pseudogene -v /data3/jiangchenhao/Hap_genome_jch_rename:/project1/data nucmer_and_pav:1.0
mamba create -n easypseudo -c conda-forge -c bioconda \
mmseqs2 miniprot samtools wise2 python=3.10
git clone https://github.com/chhhhai/EasyPseudogene.git
cd EasyPseudogene

chmod +x bin/easypseudogene
chmod +x scripts/*.sh
export PATH=$PATH:$(pwd)/bin

测试后pass此工具,考虑自行写流程

经在92服务器上测试后,我发现此工具仅用于检测基因组中已经注释的基因中的假基因比例,而不是检测全基因组的假基因,因此他不符合我的目标,所以要重新来过。参考一千个针叶树基因组文章重新写一下流程,基本的思路是这样的:

protein coding gene
        ↓
      CDS
        ↓
BLAT against intergenic region
        ↓
filter (coverage ≥50%, identity ≥50%)
        ↓
check exon junction
        ↓
classify pseudogene

exon junction这一步骤的目的是为了检测exon是连续的还是不连续的,在那篇文章中,研究者认为如果exon是连续的,那么就意味着基因组

在做了这一步之后,还需要结合TE以确定是否是TE包藏的?我想不需要,先通过 exon junction确认假基因是不是LTR包藏产生的,那么,需要先对每个转录本内含子的数量进行计数,我想,有的转录本不包括外显子和内含子。

做exon比对的时候,当我得到了比对结果之后,需要屏蔽掉基因组上的真基因。

脚本的输入和输出

最原始的输入: gff文件,fasta文件

最终的输出:假基因的坐标和类型

对于每一个基因组: 对于每一个基因,我们用cds生成他对应的假基因输出,然后用blat按照阈值(coverage 50%,identity 50%),比对到对应的基因组上,对于输出,保留query_id,query, 比对次序(用来编号),坐标区间。在这一步基础上,将整个gff文件上的全部exon的位置作为白名单,只要有重叠,就视为和真基因重叠了。

现在,为了进行香榧中的SD事件的鉴定,我想先进行假基因的预测,这一工作参考了我们之前讨论过的针叶树基因组工作,请你根据我的这个设定,先考虑一个代码的框架:
最原始的输入: gff文件,fasta文件
最终的输出:假基因的坐标和类型
对于每一个基因组: 对于每一个基因,我们用cds生成他对应的假基因输出,然后用blat按照阈值(coverage 50%,identity 50%),比对到对应的基因组上,对于输出,保留query_id,query, 比对次序(用来编号),坐标区间。在这一步基础上,将整个gff文件上的全部exon的位置作为白名单,只要有重叠,就视为和真基因重叠了。

起容器

由于进行blat比对和解析 blat 比对的 psl 文件的代码我已经有了,存放在之前重复章老师工作所使用的容器里,所以将容器打包为镜像,并重新起一个新的容器。由于这部分操作可能会跑较长的时间,因此我在89服务器上进行了部署。

docker run -itd \
  --name jiangchenhao_pseudogene \
  -v /home/jiangchenhao/data/genome_data/genome_data:/project1/data \
  --restart unless-stopped \
  -it \
  jiangchenhao_blat:1.0

老版本的blat代码

 #!/usr/bin/env python3
import subprocess
from pathlib import Path

def run_blat(query_fa, target_fa, out_psl, min_identity=50):
    """
    对 target_fa 用 query_fa 进行 BLAT 比对,输出 PSL 文件
    """
    out_psl = Path(out_psl)
    out_psl.parent.mkdir(parents=True, exist_ok=True)

    cmd = [
        "blat",
        "-t=dnax",
        "-q=prot",
        str(target_fa),
        str(query_fa),
        str(out_psl),
        f"-minIdentity={min_identity}"
    ]
    print("[BLAT] " + " ".join(cmd))
    subprocess.run(cmd, check=True)
    print(f"[✔] BLAT 输出 → {out_psl}")
    return out_psl


# 如果直接运行此脚本可以测试
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="BLAT 比对")
    parser.add_argument("--query", required=True)
    parser.add_argument("--target", required=True)
    parser.add_argument("--out", required=True)
    args = parser.parse_args()
    run_blat(args.query, args.target, args.out)

输入是两个文件,pep文件和target到的基因组文件,

所以我只需要指定pep文件作为输入好了,至于target到什么,其实无所谓?

估计十几万的假基因应该是有的。

下一个是解析输入的代码,明天再搞吧。

所以第一个步骤是把一个物种的pep blat到这个物种的基因组上:

(base) root@d282c0c8544a:/project1/bin# cat run_blat.py 
#!/usr/bin/env python3
import subprocess
from pathlib import Path
import multiprocessing
import os

def run_cmd(cmd, step_name):
    print(f"[RUN] {step_name}")
    print(" ".join(cmd))
    subprocess.run(cmd, check=True)
    print(f"[✔] {step_name} 完成\n")


def check_file(path):
    return Path(path).exists() and Path(path).stat().st_size > 0


def split_fasta(fasta, out_dir, n_parts):
    """
    按序列数平均拆分 fasta
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # 统计序列
    seqs = []
    with open(fasta) as f:
        current = []
        for line in f:
            if line.startswith(">"):
                if current:
                    seqs.append("".join(current))
                current = [line]
            else:
                current.append(line)
        if current:
            seqs.append("".join(current))

    total = len(seqs)
    per_part = (total // n_parts) + 1

    files = []
    for i in range(n_parts):
        chunk = seqs[i*per_part:(i+1)*per_part]
        if not chunk:
            continue
        out_fa = out_dir / f"chunk_{i}.fa"
        with open(out_fa, "w") as f:
            f.write("".join(chunk))
        files.append(out_fa)

    return files


def run_blat_chunk(args):
    genome, chunk_fa, out_psl = args
    if check_file(out_psl):
        print(f"[SKIP] {out_psl}")
        return

    cmd = [
        "blat",
        "-t=dnax",
        "-q=prot",
        str(genome),
        str(chunk_fa),
        str(out_psl),
        "-minIdentity=50"
    ]
    subprocess.run(cmd, check=True)


def run_pipeline(gff, genome, prefix, threads):
    gff = Path(gff)
    genome = Path(genome)

    cds_fa = Path(f"{prefix}.cds.fa")
    pep_fa = Path(f"{prefix}.pep.fa")
    blat_psl = Path(f"{prefix}.psl")

    split_dir = Path(f"{prefix}_chunks")
    psl_dir = Path(f"{prefix}_psl_parts")
    psl_dir.mkdir(exist_ok=True)

    # -------------------------------
    # 1️⃣ gffread 提取 CDS
    # -------------------------------
    if not check_file(cds_fa):
        run_cmd([
            "gffread", str(gff),
            "-g", str(genome),
            "-x", str(cds_fa)
        ], "提取 CDS")
    else:
        print(f"[SKIP] {cds_fa}")

    # -------------------------------
    # 2️⃣ 翻译蛋白
    # -------------------------------
    if not check_file(pep_fa):
        run_cmd([
            "gffread", str(gff),
            "-g", str(genome),
            "-y", str(pep_fa)
        ], "翻译蛋白")
    else:
        print(f"[SKIP] {pep_fa}")

    # -------------------------------
    # 3️⃣ 拆分 pep.fa
    # -------------------------------
    chunk_files = list(split_dir.glob("chunk_*.fa"))
    if not chunk_files:
        print("[RUN] 拆分 fasta")
        chunk_files = split_fasta(pep_fa, split_dir, threads)
        print(f"[✔] 拆分完成,共 {len(chunk_files)} 份\n")
    else:
        print(f"[SKIP] 已存在拆分文件 ({len(chunk_files)} 份)")

    # -------------------------------
    # 4️⃣ 并行 BLAT
    # -------------------------------
    tasks = []
    for chunk in chunk_files:
        out_psl = psl_dir / (chunk.stem + ".psl")
        tasks.append((genome, chunk, out_psl))

    print(f"[RUN] 并行 BLAT ({threads} 线程)")
    with multiprocessing.Pool(threads) as pool:
        pool.map(run_blat_chunk, tasks)
    print("[✔] BLAT 完成\n")

    # -------------------------------
    # 5️⃣ 合并 PSL
    # -------------------------------
    if not check_file(blat_psl):
        with open(blat_psl, "w") as out:
            for p in sorted(psl_dir.glob("*.psl")):
                with open(p) as f:
                    out.write(f.read())
        print(f"[✔] 合并 PSL → {blat_psl}")
    else:
        print(f"[SKIP] {blat_psl}")

    print("\n🎉 Pipeline 完成!")


# CLI
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="并行 BLAT pipeline")
    parser.add_argument("--gff", required=True)
    parser.add_argument("--genome", required=True)
    parser.add_argument("--prefix", required=True)
    parser.add_argument("--threads", type=int, default=4)

    args = parser.parse_args()

    run_pipeline(args.gff, args.genome, args.prefix, args.threads)

这个版本的代码目前跑的是cds+基因组

但是内存占用太高了,

而且跑的非常慢,要想一个解决办法:

100多天,这得什么时候能跑的完?

所以这是不行的。还是得想个办法。