By shenyijun, 30 May, 2026

【金山文档 | WPS云文档】 基因家族鉴定与质控批量处理脚本 https://www.kdocs.cn/l/csXgw8tIPHso

GFI3.py功能:

1. 指定输入目录

2. 递归搜索输入目录下所有 .pep 文件(包括子文件夹)

3. 分别运行 PF01397 和 PF03936 的 hmmsearch

4. 提取两个结构域命中的 ID

5. 取交集

6. 提取 TPS 蛋白序列

7. 指定输出目录

8. 保留输入目录原有子文件夹结构

 

目录示例:

输入目录:

input/

├── Ang/

│ ├── Hbrasiliensis.pep

│ └── Arabidopsis.pep

├── Gym/

│ └── Ginkgo.pep

 

输出目录:

output/

├── Ang/

│ ├── Hbrasiliensis_tps.fasta

│ └── Arabidopsis_tps.fasta

├── Gym/

│ └── Ginkgo_tps.fasta

需要软件:

hmmsearch

seqkit

运行方法:python GFI3.py -i 输入目录 -o 输出目录

例如:python GFI3.py -i /data/final_anno -o /data/TPS

import os
import glob
import argparse
import subprocess
=========================
参数设置
=========================
PF01397 = "PF01397.hmm"
PF03936 = "PF03936.hmm"
E_VALUE = "1e-5"
THREADS = "8"
=========================
命令行参数
=========================
parser = argparse.ArgumentParser(
description="TPS family identification pipeline"
)
parser.add_argument(
"-i",
"--input",
required=True,
help="输入目录"
)
parser.add_argument(
"-o",
"--output",
required=True,
help="输出目录"
)
args = parser.parse_args()
input_dir = os.path.abspath(args.input)
output_dir = os.path.abspath(args.output)
=========================
检查输入目录
=========================
if not os.path.exists(input_dir):
raise FileNotFoundError(f"输入目录不存在: {input_dir}")
=========================
检查 HMM 文件
=========================
if not os.path.exists(PF01397):
raise FileNotFoundError(f"{PF01397} 不存在")
if not os.path.exists(PF03936):
raise FileNotFoundError(f"{PF03936} 不存在")
=========================
创建输出目录
=========================
os.makedirs(output_dir, exist_ok=True)
=========================
搜索所有 pep 文件
=========================
pep_pattern = os.path.join(input_dir, "**", "*.pep")
pep_files = glob.glob(pep_pattern, recursive=True)
if len(pep_files) == 0:
raise FileNotFoundError("没有找到 .pep 文件")
print(f"\n共找到 {len(pep_files)} 个 .pep 文件待处理\n")
=========================
开始处理
=========================
for pep in pep_files:
try:
# 文件名(不含后缀)
species = os.path.splitext(os.path.basename(pep))[0]
# 相对路径
rel_path = os.path.relpath(os.path.dirname(pep), input_dir)
# 输出子目录
out_subdir = os.path.join(output_dir, rel_path)
os.makedirs(out_subdir, exist_ok=True)
print(f"========== 开始处理 {species} ==========")
print(f"输入文件: {pep}")
print(f"输出目录: {out_subdir}")
# =========================
# 输出文件
# =========================
pf03936_out = os.path.join(
out_subdir,
f"{species}PF03936.out"
)
pf01397_out = os.path.join(
out_subdir,
f"{species}PF01397.out"
)
pf03936_id = os.path.join(
out_subdir,
f"{species}PF03936.id.txt"
)
pf01397_id = os.path.join(
out_subdir,
f"{species}PF01397.id.txt"
)
tps_id = os.path.join(
out_subdir,
f"{species}_tps.id.txt"
)
tps_fasta = os.path.join(
out_subdir,
f"{species}_tps.fasta"
)
# =========================
# 1. hmmsearch PF03936
# =========================
cmd1 = (
f"hmmsearch "
f"--cpu {THREADS} "
f"--domE {E_VALUE} "
f"--domtblout {pf03936_out} "
f"{PF03936} {pep}"
)
print("运行命令:", cmd1)
subprocess.run(
cmd1,
shell=True,
check=True
)
# =========================
# 2. hmmsearch PF01397
# =========================
cmd2 = (
f"hmmsearch "
f"--cpu {THREADS} "
f"--domE {E_VALUE} "
f"--domtblout {pf01397_out} "
f"{PF01397} {pep}"
)
print("运行命令:", cmd2)
subprocess.run(
cmd2,
shell=True,
check=True
)
# =========================
# 3. 提取 PF03936 ID
# =========================
cmd3 = (
f"grep -v '^#' {pf03936_out} "
f"| awk '{{print $1}}' "
f"| sort -u > {pf03936_id}"
)
print("运行命令:", cmd3)
subprocess.run(
cmd3,
shell=True,
check=True
)
# =========================
# 4. 提取 PF01397 ID
# =========================
cmd4 = (
f"grep -v '^#' {pf01397_out} "
f"| awk '{{print $1}}' "
f"| sort -u > {pf01397_id}"
)
print("运行命令:", cmd4)
subprocess.run(
cmd4,
shell=True,
check=True
)
# =========================
# 5. 取交集
# =========================
cmd5 = (
f"grep -Fxf "
f"{pf01397_id} "
f"{pf03936_id} "
f"> {tps_id}"
)
print("运行命令:", cmd5)
subprocess.run(
cmd5,
shell=True,
check=True
)
# =========================
# 6. 提取 TPS 蛋白序列
# =========================
cmd6 = (
f"seqkit grep "
f"-f {tps_id} "
f"{pep} "
f"-o {tps_fasta}"
)
print("运行命令:", cmd6)
subprocess.run(
cmd6,
shell=True,
check=True
)
print(f"{species} 处理完成")
print(f"TPS文件: {tps_fasta}\n")
except subprocess.CalledProcessError as e:
print(f"\n处理 {species} 时出错: {e}")
if e.stderr:
print("错误输出:", e.stderr)
continue
print("\n========== 全部处理完成 ==========")

qc3.py功能:

1. 指定输入目录

2. 指定输出目录

3. 递归检索输入目录下所有子文件夹中的 *_tps.fasta

4. 保留原目录结构

5. 仅保留长度 >=350 aa 的蛋白序列

6. 使用 seqkit rmdup -s 去除完全重复序列

7. 输出文件命名:

原名_qc2.fasta

 

例如:

输入目录:

input/

├── Ang/

│ └── Hbrasiliensis_tps.fasta

├── Gym/

│ └── Ginkgo_tps.fasta

 

输出目录:

output/

├── Ang/

│ └── Hbrasiliensis_tps_qc2.fasta

├── Gym/

│ └── Ginkgo_tps_qc2.fasta

 

需要软件:seqkit

运行方式:python qc2.py -i 输入目录 -o 输出目录

例如:python qc2.py -i /data/TPS -o /data/TPS_QC2

import os
import glob
import argparse
import subprocess
=========================
长度过滤参数
=========================
MIN_LEN = 350
=========================
命令行参数
=========================
parser = argparse.ArgumentParser(
description="Batch filter and deduplicate TPS fasta sequences"
)
parser.add_argument(
"-i",
"--input",
required=True,
help="输入目录"
)
parser.add_argument(
"-o",
"--output",
required=True,
help="输出目录"
)
args = parser.parse_args()
input_dir = os.path.abspath(args.input)
output_dir = os.path.abspath(args.output)
=========================
检查输入目录
=========================
if not os.path.exists(input_dir):
raise FileNotFoundError(f"输入目录不存在: {input_dir}")
=========================
创建输出目录
=========================
os.makedirs(output_dir, exist_ok=True)
=========================
搜索 fasta 文件
=========================
pattern = os.path.join(input_dir, "**", "*_tps.fasta")
fasta_files = glob.glob(pattern, recursive=True)
if len(fasta_files) == 0:
raise FileNotFoundError("没有找到 *_tps.fasta 文件")
print(f"\n共找到 {len(fasta_files)} 个文件待处理\n")
=========================
开始处理
=========================
for fasta in fasta_files:
try:
# 文件名
basename = os.path.basename(fasta)
# 不带后缀文件名
name = os.path.splitext(basename)[0]
# 获取相对路径
rel_path = os.path.relpath(
os.path.dirname(fasta),
input_dir
)
# 输出子目录
out_subdir = os.path.join(
output_dir,
rel_path
)
os.makedirs(out_subdir, exist_ok=True)
# 临时过滤文件
temp_filtered = os.path.join(
out_subdir,
f"{name}_temp.fasta"
)
# 最终输出文件
output_file = os.path.join(
out_subdir,
f"{name}_qc2.fasta"
)
# 重复序列详情文件
dup_detail = os.path.join(
out_subdir,
f"{name}_duplicated.detail.txt"
)
print(f"正在处理: {fasta}")
print(f"输出目录: {out_subdir}")
# =========================
# 1. 长度过滤
# =========================
cmd1 = (
f"seqkit seq "
f"-m {MIN_LEN} "
f"{fasta} "
f"-o {temp_filtered}"
)
print("运行命令:", cmd1)
subprocess.run(
cmd1,
shell=True,
check=True
)
# =========================
# 2. 去除重复序列
# =========================
cmd2 = (
f"seqkit rmdup "
f"-s "
f"-D {dup_detail} "
f"{temp_filtered} "
f"-o {output_file}"
)
print("运行命令:", cmd2)
subprocess.run(
cmd2,
shell=True,
check=True
)
# =========================
# 3. 删除临时文件
# =========================
if os.path.exists(temp_filtered):
os.remove(temp_filtered)
print(f"处理完成: {output_file}\n")
except subprocess.CalledProcessError as e:
print(f"\n处理失败: {fasta}")
print(e)
continue
print("========== 全部处理完成 ==========")