【金山文档 | WPS云文档】 香榧基因组内假基因的识别Blat版本
猫言猫语
这一部分工作是为了香榧中的SD事件鉴定所做的,参考文献是针叶树基因组文章。
已知我们香榧的基因组没有做假基因的预测,现在必须要我从预测。
阅读了几个文献后选择了这个预印本上的方法:
https://github.com/chhhhai/EasyPseudogene
理由是发文单位是清华大学的,我觉得他水平靠谱一些。这个工作在海洋哺乳类动物上进行了预测(经过测试后此工具不适配我的研究目的)。
针叶树那篇文章的做法是使用了基于blat比对的方法,谢剑波在2019年有一篇文章也使用了类似的方法,但是在有封装好的包的情况下,我倾向于使用这些封装好的软件。
EasyPseudogene
环境配置与准备
docker run -itd --name jiangchenhao_pseudogene -v /data3/jiangchenhao/Hap_genome_jch_rename:/project1/data nucmer_and_pav:1.0mamba create -n easypseudo -c conda-forge -c bioconda \
mmseqs2 miniprot samtools wise2 python=3.10git clone https://github.com/chhhhai/EasyPseudogene.git
cd EasyPseudogene
chmod +x bin/easypseudogene
chmod +x scripts/*.sh
export PATH=$PATH:$(pwd)/bin测试后pass此工具,考虑自行写流程
经在92服务器上测试后,我发现此工具仅用于检测基因组中已经注释的基因中的假基因比例,而不是检测全基因组的假基因,因此他不符合我的目标,所以要重新来过。参考一千个针叶树基因组文章重新写一下流程,基本的思路是这样的:
protein coding gene
↓
CDS
↓
BLAT against intergenic region
↓
filter (coverage ≥50%, identity ≥50%)
↓
check exon junction
↓
classify pseudogeneexon junction这一步骤的目的是为了检测exon是连续的还是不连续的,在那篇文章中,研究者认为如果exon是连续的,那么就意味着基因组
在做了这一步之后,还需要结合TE以确定是否是TE包藏的?我想不需要,先通过 exon junction确认假基因是不是LTR包藏产生的,那么,需要先对每个转录本内含子的数量进行计数,我想,有的转录本不包括外显子和内含子。
做exon比对的时候,当我得到了比对结果之后,需要屏蔽掉基因组上的真基因。
脚本的输入和输出
最原始的输入: gff文件,fasta文件
最终的输出:假基因的坐标和类型
对于每一个基因组: 对于每一个基因,我们用cds生成他对应的假基因输出,然后用blat按照阈值(coverage 50%,identity 50%),比对到对应的基因组上,对于输出,保留query_id,query, 比对次序(用来编号),坐标区间。在这一步基础上,将整个gff文件上的全部exon的位置作为白名单,只要有重叠,就视为和真基因重叠了。
现在,为了进行香榧中的SD事件的鉴定,我想先进行假基因的预测,这一工作参考了我们之前讨论过的针叶树基因组工作,请你根据我的这个设定,先考虑一个代码的框架:
最原始的输入: gff文件,fasta文件
最终的输出:假基因的坐标和类型
对于每一个基因组: 对于每一个基因,我们用cds生成他对应的假基因输出,然后用blat按照阈值(coverage 50%,identity 50%),比对到对应的基因组上,对于输出,保留query_id,query, 比对次序(用来编号),坐标区间。在这一步基础上,将整个gff文件上的全部exon的位置作为白名单,只要有重叠,就视为和真基因重叠了。
起容器
由于进行blat比对和解析 blat 比对的 psl 文件的代码我已经有了,存放在之前重复章老师工作所使用的容器里,所以将容器打包为镜像,并重新起一个新的容器。由于这部分操作可能会跑较长的时间,因此我在89服务器上进行了部署。
docker run -itd \
--name jiangchenhao_pseudogene \
-v /home/jiangchenhao/data/genome_data/genome_data:/project1/data \
--restart unless-stopped \
-it \
jiangchenhao_blat:1.0老版本的blat代码
#!/usr/bin/env python3
import subprocess
from pathlib import Path
def run_blat(query_fa, target_fa, out_psl, min_identity=50):
"""
对 target_fa 用 query_fa 进行 BLAT 比对,输出 PSL 文件
"""
out_psl = Path(out_psl)
out_psl.parent.mkdir(parents=True, exist_ok=True)
cmd = [
"blat",
"-t=dnax",
"-q=prot",
str(target_fa),
str(query_fa),
str(out_psl),
f"-minIdentity={min_identity}"
]
print("[BLAT] " + " ".join(cmd))
subprocess.run(cmd, check=True)
print(f"[✔] BLAT 输出 → {out_psl}")
return out_psl
# 如果直接运行此脚本可以测试
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="BLAT 比对")
parser.add_argument("--query", required=True)
parser.add_argument("--target", required=True)
parser.add_argument("--out", required=True)
args = parser.parse_args()
run_blat(args.query, args.target, args.out)输入是两个文件,pep文件和target到的基因组文件,
所以我只需要指定pep文件作为输入好了,至于target到什么,其实无所谓?
估计十几万的假基因应该是有的。
下一个是解析输入的代码,明天再搞吧。
所以第一个步骤是把一个物种的pep blat到这个物种的基因组上:
(base) root@d282c0c8544a:/project1/bin# cat run_blat.py
#!/usr/bin/env python3
import subprocess
from pathlib import Path
import multiprocessing
import os
def run_cmd(cmd, step_name):
print(f"[RUN] {step_name}")
print(" ".join(cmd))
subprocess.run(cmd, check=True)
print(f"[✔] {step_name} 完成\n")
def check_file(path):
return Path(path).exists() and Path(path).stat().st_size > 0
def split_fasta(fasta, out_dir, n_parts):
"""
按序列数平均拆分 fasta
"""
out_dir = Path(out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
# 统计序列
seqs = []
with open(fasta) as f:
current = []
for line in f:
if line.startswith(">"):
if current:
seqs.append("".join(current))
current = [line]
else:
current.append(line)
if current:
seqs.append("".join(current))
total = len(seqs)
per_part = (total // n_parts) + 1
files = []
for i in range(n_parts):
chunk = seqs[i*per_part:(i+1)*per_part]
if not chunk:
continue
out_fa = out_dir / f"chunk_{i}.fa"
with open(out_fa, "w") as f:
f.write("".join(chunk))
files.append(out_fa)
return files
def run_blat_chunk(args):
genome, chunk_fa, out_psl = args
if check_file(out_psl):
print(f"[SKIP] {out_psl}")
return
cmd = [
"blat",
"-t=dnax",
"-q=prot",
str(genome),
str(chunk_fa),
str(out_psl),
"-minIdentity=50"
]
subprocess.run(cmd, check=True)
def run_pipeline(gff, genome, prefix, threads):
gff = Path(gff)
genome = Path(genome)
cds_fa = Path(f"{prefix}.cds.fa")
pep_fa = Path(f"{prefix}.pep.fa")
blat_psl = Path(f"{prefix}.psl")
split_dir = Path(f"{prefix}_chunks")
psl_dir = Path(f"{prefix}_psl_parts")
psl_dir.mkdir(exist_ok=True)
# -------------------------------
# 1️⃣ gffread 提取 CDS
# -------------------------------
if not check_file(cds_fa):
run_cmd([
"gffread", str(gff),
"-g", str(genome),
"-x", str(cds_fa)
], "提取 CDS")
else:
print(f"[SKIP] {cds_fa}")
# -------------------------------
# 2️⃣ 翻译蛋白
# -------------------------------
if not check_file(pep_fa):
run_cmd([
"gffread", str(gff),
"-g", str(genome),
"-y", str(pep_fa)
], "翻译蛋白")
else:
print(f"[SKIP] {pep_fa}")
# -------------------------------
# 3️⃣ 拆分 pep.fa
# -------------------------------
chunk_files = list(split_dir.glob("chunk_*.fa"))
if not chunk_files:
print("[RUN] 拆分 fasta")
chunk_files = split_fasta(pep_fa, split_dir, threads)
print(f"[✔] 拆分完成,共 {len(chunk_files)} 份\n")
else:
print(f"[SKIP] 已存在拆分文件 ({len(chunk_files)} 份)")
# -------------------------------
# 4️⃣ 并行 BLAT
# -------------------------------
tasks = []
for chunk in chunk_files:
out_psl = psl_dir / (chunk.stem + ".psl")
tasks.append((genome, chunk, out_psl))
print(f"[RUN] 并行 BLAT ({threads} 线程)")
with multiprocessing.Pool(threads) as pool:
pool.map(run_blat_chunk, tasks)
print("[✔] BLAT 完成\n")
# -------------------------------
# 5️⃣ 合并 PSL
# -------------------------------
if not check_file(blat_psl):
with open(blat_psl, "w") as out:
for p in sorted(psl_dir.glob("*.psl")):
with open(p) as f:
out.write(f.read())
print(f"[✔] 合并 PSL → {blat_psl}")
else:
print(f"[SKIP] {blat_psl}")
print("\n🎉 Pipeline 完成!")
# CLI
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="并行 BLAT pipeline")
parser.add_argument("--gff", required=True)
parser.add_argument("--genome", required=True)
parser.add_argument("--prefix", required=True)
parser.add_argument("--threads", type=int, default=4)
args = parser.parse_args()
run_pipeline(args.gff, args.genome, args.prefix, args.threads)
这个版本的代码目前跑的是cds+基因组
但是内存占用太高了,
而且跑的非常慢,要想一个解决办法:
100多天,这得什么时候能跑的完?
所以这是不行的。还是得想个办法。