背景
起因是我重复章老师的流程,把他的流程定性为了弱的PAV流程。而我们现在新增的补充内容又需要等位基因对这个东西。后来看了赤松单倍型基因组的文章,我觉得它的方法值得参考。
好的,现在看看这篇赤松单倍型基因组文章,它是如何鉴定PAV基因的: Allelic variations between two haplotypes We identified a total of 6,915,720 and 6,937,699 variations in 31,277 and 31,243 gene regions of P. densiflora HA and HB, respectively (Extended Data Fig. 8a). Similar to heterozygous plant genomes27, 29,492 (52%) and 29,411 (52%) genes in P. densiflora HA and HB had nonsynonymous, indel and/or structural variations (SVs) in the coding sequences (Extended Data Fig. 8a). This suggests that the amino acid alteration resulting from these variations led to the generation of gene imbalances between the two haplotypes. Next, we identified 2,634 and 2,616 PAV genes in HA and HB, respectively, and 1,324 allele-specific expressed (ASE) genes in both haplotypes in addition to 40,275 common allelic genes (Extended Data Fig. 8b and Supplementary Table 6). Gene ontology (GO) descriptions and domain repertoires revealed that the majority of PAV and ASE genes have potential involvement in biological processes relevant to biotic and abiotic stresses, including TFs and disease resistance genes containing MYB, AP2 and leucine-rich repeat domains (Extended Data Fig. 8c,d). We then verified the sequences of several PAV TFs and their corresponding regions in the P. densiflora genome (Extended Data Fig. 9a). We observed frameshift mutations leading to the generation of premature stop codons in MADS box, NAC, MYB, NF-YA and AP2 TFs in certain haplotypes, therefore indicating the haplotype-specific presence of these genes in the opposite haplotype (Extended Data Fig. 9a). Validation experiments of representative ASE genes revealed significant haplotype-preferred expression, consistent with the expression pattern observed in the RNA-seq data (Extended Data Fig. 9b). Utilization of orthologous genes from model plants accelerates the identification of important trait-related genes in relatively less-characterized plant species such as gymnosperms28. We detected 2,344 and 2,367 functional orthologous genes (FOGs) of Arabidopsis in HA and HB, respectively, as potential candidates for functional genes in P. densiflora (Extended Data Fig. 9c and Supplementary Table 7). Among them, FOGs consisted of 173 PAV genes, including 75 in HA and 98 in HB, 69 ASE genes and 2,200 common alleles, respectively (Extended Data Fig. 9c). This indicates that there are 144 distinct FOGs in HA and 167 in HB. GO analysis detected diverse cellular functions and multicopy gene families among these FOGs, including protein kinases, leucine-rich repeats and TFs (Extended Data Fig. 9d). Our findings emphasize that haplotype-resolved genome assembly enables the identification and characterization of allelic variation and haplotype-specific patterns of gene expression in the large diploid genome of P. densiflora
Identification of allelic variation and PAV and ASE genes
between P. densiflora haplotypes
To identify genomic variation between haplotypes, we used the
nucmer program in MUMmer version 4.0 (ref. 79) to align the HA and
HB genomes using default parameters. SVs were reported using the
‘show-diff’ function in both HA and HB genomes. SNPs and indels were
also identified using the ‘show-snps’ function with default parameters.
Moreover, SNPs and indels within gene regions were annotated using
SnpEff version 5.1 (ref. 80), assuming an upstream and downstream
length of 2 kb (-ud 2000).
To classify PAV and ASE genes, we first defined allelic genes in
P. densiflora HA and HB based on reciprocal BLAST best matches and
the genomic location of genes.
When the gene was annotated in only one haplotype, it was classified as a PAV if counterpart genomic regions did not contain its allelic gene after verification using Exonerate version 2.2.0 (ref. 81) (–model protein2genome–percent 50).
For ASE genes, we conducted transcriptome analysis across various tissues of P. densiflora and categorized them by organs including (1) stem for immature
stem-derived cambium, mature stem-derived cambium, immature
stem-derived developing xylem, mature stem-derived developing
xylem and immature whole stem, (2) leaf for young needle, (3) root for
main root and (4) shoot apical meristem82. RNA-seq expression profiles
were obtained following the New Tuxedo83 protocol. Differentially
expressed genes in each tissue were identified using DESeq2 (ref. 84) in
the R module with |log2 (fold change)| > 1. Finally, ASE genes are defined
as genes with alleles differentially expressed in at least one tissue.让AI翻译一下:
✅ 这篇文章是如何鉴定 PAV 基因的?
PAV 基因 = 在一个单倍型(HA)中存在,而在另一个单倍型(HB)中完全缺失的基因。
要做到这一点,他们做了以下 三步:
① 两个单倍型进行基因比对(allelic gene identification)
他们有 HA 基因集 和 HB 基因集。
要判断基因是“等位基因”还是“单倍型特有”,第一步就是把两边的基因互相比对:
- 用蛋白质比对(一般是 BLASTP / Diamond)
- 用基因组序列比对确认位置一致性
- 根据比对覆盖度、identity、互为 best hit 判断是否属于 同一个等位基因对(allelic pair)
如果确认两个基因是 1:1 共线并且序列相似 → 等位基因(common allelic genes)
如果 HA 的一个基因在 HB 中找不到合理的 allele → 候选 PAV
这就是他们得到:
- 40,275 个 common allelic gene pairs
- 2634 / 2616 个 PAV 基因(HA/HB)
② 对候选 PAV 区域在另一单倍型中做“基因区段验证”
为了防止:
- 基因预测错误
- 假缺失(实际基因在但 annotation 没写出来)
- 共线性断裂导致误判缺失
他们对候选 PAV 基因做了 genome-level validation:
We then verified the sequences of several PAV TFs and their corresponding regions in the P. densiflora genome (Extended Data Fig. 9a). We observed frameshift mutations leading to the generation of premature stop codons… therefore indicating haplotype-specific presence of these genes in the opposite haplotype.
也就是说:
- 用 nucmer/minimap2 比对 HA 基因所在的基因组区域到 HB
- 确认 HB 的同源区域是否真的 没有完整的 CDS
- 有些区域存在,但是因为 frameshift / stop codon 导致基因失活 → 他们也算作 haplotype-specific variation
所以他们的 PAV 不是纯粹的“完全缺失”,而是“功能上缺失”。
③ 将 PAV 基因与表达 + 功能做进一步分析
例如:
- PAV TF 验证
- ASE(allele-specific expression)与 PAV 结合
- FOG(functional orthologue)中哪些也属于 PAV
说明这些 PAV 点不仅是结构差异,还会影响表达与功能。
香榧中重复
输入:
- 两个 haplotype 的蛋白质序列 FASTA(query & target)
- 两 haplotype 的 GFF 注释文件
- 共线性 block 文件(SyRI/MCScanX 输出)
- 已有 RBH 等位基因对列表
- 可选:基因组 DNA 序列(用于 rescue)
流程:
┌───────────────────────────────┐
│ 1. 全局 RBH 判定(Strict RBH) │
└───────────────────────────────┘
|
v
- 对 query haplotype 的每个基因执行 BLASTp 对 target haplotype 进行搜索
- 解析 BLAST 输出,保留满足阈值的 hit(evalue / PID / coverage)
- 互惠检验(Reciprocal Best Hit, RBH):
├─ 双向最佳匹配成立 → 严格 RBH 等位基因对
└─ 否 → 进入 PAV 检验
- 输出:初步 RBH 等位基因对列表
┌───────────────────────────────┐
│ 2. PAV 检验与 block-rescue │
└───────────────────────────────┘
|
v
遍历每个 query 基因:
├─ 已在 RBH 等位列表中 → 跳过
└─ 否:
|
v
定位 query 基因所在共线性 block:
├─ 在 target block 中查找注释基因:
│ ├─ 找到基因:
│ │ ├─ 同源且未在 RBH 对 → 新等位基因对,记录
│ │ └─ 已在 RBH 或无法判定 → 进入 protein→genome rescue
│ └─ 未找到基因 → 进入 protein→genome rescue
|
v
Protein→Genome Rescue(BLAT / Exonerate):
├─ query 蛋白质序列比对 target DNA:
│ ├─ 无匹配 → 真实 PAV
│ ├─ 匹配序列突变(frameshift / premature stop) → 真实 PAV
│ └─ 完整蛋白质序列 → 假阳性(非 PAV)
└─ 输出 rescue 结果
┌───────────────────────────────┐
│ 3. 输出 │
└───────────────────────────────┘
- 严格 RBH 等位基因对列表
- 新发现的等位基因对(block rescue)
- 真实 PAV 基因列表
- 假阳性基因列表(可选标记)现在我已经允许用这个代码获得一个区间在另外一个区间上的投影。
我现在剩下要做的就是,输入PAV基因列表+两个基因组的gff文件+pep文件?+blocks_tsv文件+储存着每个染色体fasta序列的文件。
然后用下面这个逻辑来筛选PAV基因。
最好把我blat比对的代码给它,让他用来做参考。
https://www.kdocs.cn/l/cuJn5iNl2fM3
不知道这样喂给AI它能不能听懂嘞?但是不要紧,我们先试试。
一共是有6个代码,这六个代码也要用于待会的操作。
extract_hap_intervals.py
extract_region.py
filter_hited_result.py
main_homology_pipeline.py
parse_psl_to_protein.py
run_blat.py┌──────────────────────────────────────────────────────────┐
│ Allelic gene identification & PAV validation │
│ based on RBH, synteny blocks and rescue alignment │
└──────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 0. Input datasets │
│ -------------------------------------------------------- │
│ - Genome A protein sequences (pep.fa) │
│ - Genome B protein sequences (pep.fa) │
│ - Genome A annotation (GFF3) │
│ - Genome B annotation (GFF3) │
│ - Genome A chromosome FASTA │
│ - Genome B chromosome FASTA │
│ - SyRI-derived syntenic blocks (blocks.tsv) │
└──────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 1. Strict RBH construction (protein-based) │
│ -------------------------------------------------------- │
│ - BLASTP: Genome A → Genome B │
│ - BLASTP: Genome B → Genome A │
│ - Parameters (example): │
│ e-value ≤ 1e-5 │
│ coverage ≥ 0.6 │
│ keep best hit only │
│ - Reciprocal best hit filtering │
└──────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 2. Initial gene classification │
│ -------------------------------------------------------- │
│ - Genes in RBH pairs → strict allelic genes │
│ - Remaining genes → candidate PAV genes │
└──────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 3. PAV gene inspection (gene-by-gene) │
│ -------------------------------------------------------- │
│ For each candidate PAV gene (query): │
└──────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 3a. Check RBH table │
│ -------------------------------------------------------- │
│ - If gene already in RBH → skip │
│ - Else → continue │
└──────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 3b. Syntenic block projection │
│ -------------------------------------------------------- │
│ - Locate query gene genomic coordinates │
│ - Identify query block in blocks.tsv │
│ - Project to corresponding target block │
│ (including non-collinear / nested blocks if present) │
└──────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 3c. Target block gene search (annotation-based) │
│ -------------------------------------------------------- │
│ - Scan target GFF within block coordinates │
│ - If annotated gene(s) found: │
│ ├─ If gene already in RBH list → in_RBH │
│ └─ Else → NEW ALLELIC GENE (block-supported) │
│ - If no annotated gene found → rescue step │
└──────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 3d. Protein → Genome rescue alignment │
│ -------------------------------------------------------- │
│ Triggered when: │
│ - No annotated gene in target block │
│ - Or gene model unreliable │
│ │
│ Steps: │
│ - Extract query protein sequence │
│ - Extract target block genomic DNA │
│ - BLAT (protein vs genome) │
│ - Parse PSL output │
│ - Reconstruct CDS coordinates │
│ - Translate to protein │
│ - Filter low-quality hits (identity, coverage, frameshift)│
└──────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 3e. Rescue result interpretation │
│ -------------------------------------------------------- │
│ - No valid protein recovered → TRUE PAV │
│ - Protein truncated / frameshifted → TRUE PAV │
│ - Intact protein recovered → FALSE PAV │
│ (hidden or unannotated allelic gene) │
└──────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 4. Final output │
│ -------------------------------------------------------- │
│ - Strict RBH allelic gene pairs │
│ - Newly identified allelic pairs (synteny rescue) │
│ - True PAV gene list │
│ - False-positive PAV list (optional) │
│ - Block ID and coordinates for each gene │
└──────────────────────────────────────────────────────────┘Part_1_双向Blast模块
这一步骤就是为了做一个简单的RBH比对,先把PAV基因和都存在的blast簇找出来。
那么是不是任意的两个基因比对结果,我都有必要进行共线性检验呢?应该是的,确实也很有必要。这里是RBH模块
(base) root@b27b26bddcc5:/project1/bin# cat pav_00_rbh.py
#!/usr/bin/env python3
import argparse
import subprocess
from pathlib import Path
from typing import Dict, Tuple, List, Set
class StrictRBHAnalyzer:
def __init__(self, work_dir: str):
self.work_dir = Path(work_dir)
self.work_dir.mkdir(parents=True, exist_ok=True)
def _path(self, name: str) -> Path:
return self.work_dir / name
# ---------------- system ----------------
def _run(self, cmd: str):
print(f"[RUN] {cmd}")
subprocess.run(cmd, shell=True, check=True)
# ---------------- BLAST ----------------
def make_blast_db(self, fasta: str, prefix: str) -> Path:
db_prefix = self._path(prefix)
if (db_prefix.with_suffix(".pin")).exists():
print(f"[SKIP] BLAST DB exists: {db_prefix}")
return db_prefix
self._run(
f"makeblastdb -in {fasta} -dbtype prot -out {db_prefix}"
)
return db_prefix
def run_blastp(
self,
query: str,
db: Path,
outname: str,
evalue: float,
threads: int,
) -> Path:
out = self._path(outname)
if out.exists() and out.stat().st_size > 0:
print(f"[SKIP] BLAST output exists: {out}")
return out
outfmt = (
"6 qseqid sseqid pident length "
"qlen slen evalue bitscore"
)
self._run(
f"blastp "
f"-query {query} "
f"-db {db} "
f"-out {out} "
f"-evalue {evalue} "
f"-num_threads {threads} "
f"-max_target_seqs 1000 "
f"-outfmt '{outfmt}'"
)
if not out.exists() or out.stat().st_size == 0:
raise RuntimeError(f"BLAST failed or empty output: {out}")
return out
# ---------------- parse ----------------
def parse_best_hits(
self, blast_file: Path, coverage: float
) -> Dict[str, Tuple[str, float]]:
"""
返回:query -> (best_subject, bitscore)
"""
best: Dict[str, Tuple[str, float]] = {}
with open(blast_file) as f:
for line in f:
qid, sid, pid, aln, qlen, slen, e, score = line.split()
aln = float(aln)
qlen = float(qlen)
slen = float(slen)
score = float(score)
cov = min(aln / qlen, aln / slen)
if cov < coverage:
continue
if qid not in best or score > best[qid][1]:
best[qid] = (sid, score)
if not best:
raise RuntimeError(f"No valid BLAST hits after filtering: {blast_file}")
return best
# ---------------- RBH ----------------
def find_strict_rbh(
self,
b12: Dict[str, Tuple[str, float]],
b21: Dict[str, Tuple[str, float]],
) -> Set[Tuple[str, str]]:
rbh = set()
for g1, (g2, _) in b12.items():
if g2 in b21 and b21[g2][0] == g1:
rbh.add((g1, g2))
return rbh
# ---------------- FASTA ----------------
def get_ids(self, fasta: str) -> Set[str]:
ids = set()
with open(fasta) as f:
for line in f:
if line.startswith(">"):
ids.add(line[1:].split()[0])
return ids
# ---------------- main ----------------
def analyze(
self,
fasta1: str,
fasta2: str,
label1: str,
label2: str,
evalue: float,
coverage: float,
prefix: str,
threads: int,
):
print("=== Build BLAST DB ===")
db1 = self.make_blast_db(fasta1, f"{prefix}_{label1}_db")
db2 = self.make_blast_db(fasta2, f"{prefix}_{label2}_db")
print("=== Run BLAST ===")
b12 = self.run_blastp(
fasta1, db2, f"{prefix}_{label1}_to_{label2}.blast",
evalue, threads
)
b21 = self.run_blastp(
fasta2, db1, f"{prefix}_{label2}_to_{label1}.blast",
evalue, threads
)
print("=== Parse BLAST ===")
best12 = self.parse_best_hits(b12, coverage)
best21 = self.parse_best_hits(b21, coverage)
print("=== Find strict RBH ===")
rbh = self.find_strict_rbh(best12, best21)
rbh_file = self._path(f"{prefix}_strict_RBH.txt")
with open(rbh_file, "w") as f:
f.write(f"{label1}\t{label2}\n")
for a, b in sorted(rbh):
f.write(f"{a}\t{b}\n")
print(f" Strict RBH pairs: {len(rbh)}")
print("=== PAV ===")
g1 = self.get_ids(fasta1)
g2 = self.get_ids(fasta2)
paired1 = {x for x, _ in rbh}
paired2 = {y for _, y in rbh}
pav1 = sorted(g1 - paired1)
pav2 = sorted(g2 - paired2)
pav1_file = self._path(f"{prefix}_{label1}_PAV.txt")
pav2_file = self._path(f"{prefix}_{label2}_PAV.txt")
pav1_file.write_text("\n".join(pav1))
pav2_file.write_text("\n".join(pav2))
print(f" {label1} PAV genes: {len(pav1)}")
print(f" {label2} PAV genes: {len(pav2)}")
# ---------------- CLI ----------------
def main():
ap = argparse.ArgumentParser("Strict RBH-based PAV analysis")
ap.add_argument("fasta1")
ap.add_argument("fasta2")
ap.add_argument("--label1", default="HA")
ap.add_argument("--label2", default="HB")
ap.add_argument("--evalue", type=float, default=1e-5)
ap.add_argument("--coverage", type=float, default=0.5)
ap.add_argument("--threads", type=int, default=4)
ap.add_argument("--work_dir", default="pav_workdir")
ap.add_argument("--out_prefix", default="pav")
args = ap.parse_args()
StrictRBHAnalyzer(args.work_dir).analyze(
args.fasta1,
args.fasta2,
args.label1,
args.label2,
args.evalue,
args.coverage,
args.out_prefix,
args.threads,
)
if __name__ == "__main__":
main()这个思路还是不太对,我们还是一对一对计算比较合适
To classify PAV and ASE genes, we first defined allelic genes in
P. densiflora HA and HB based on reciprocal BLAST best matches and
the genomic location of genes.因为我们有四个单倍型基因组,我们可以逐个单倍型计算PAV基因,这样应该也行的。
目前来看,这个代码计算出来的等位基因比较少,PAV基因比较多,后者达到了每个单倍型12000左右的数量,和赤松单倍型基因组那篇文章相差太多了,最好的办法就是把这些PAV基因mapping到共线性位置上,查看是否真正存在基因。
这里的输出目前是这样:
# head step1_strict_RBH.txt
A B
TgFa01G00010-mRNA1 TgFb01G19070-mRNA1
TgFa01G00030-mRNA1 TgFb01G11200-mRNA1
TgFa01G00040-mRNA1 TgFb01G02590-mRNA1
TgFa01G00050-mRNA1 TgFb01G00910-mRNA1
TgFa01G00060-mRNA1 TgFb01G31220-mRNA1
TgFa01G00070-mRNA1 TgFb01G10850-mRNA1
TgFa01G00080-mRNA1 TgFb01G06320-mRNA1
TgFa01G00090-mRNA1 TgFb01G12160-mRNA1
TgFa01G00100-mRNA1 TgFb01G20280-mRNA1(blastp) root@b27b26bddcc5:/project1/data/pav_results/Fe_hap1_vs_Fe_hap2/step1_rbh# head step1_A_PAV.txt
TgFa01G00020-mRNA1
TgFa01G00120-mRNA1
TgFa01G00130-mRNA1
TgFa01G00150-mRNA1
TgFa01G00200-mRNA1
TgFa01G00220-mRNA1
TgFa01G00250-mRNA1
TgFa01G00280-mRNA1
TgFa01G00290-mRNA1
TgFa01G00320-mRNA1
Part_2_syri_vcf 解析模块
这个代码的目标是从syri输出的vcf文件解析生成block文件,目前这个代码既能够被命令行调用,也能被导入到其他的代码里面,所以它是可以被封装的。
(base) root@b27b26bddcc5:/project1/bin# cat pav_02_scan_syri_vcf_list.py
#!/usr/bin/env python3
"""
Scan SyRI VCF, filter structural / block records,
and extract block table including ALL non-collinear regions (HDR, INS, DEL, etc.)
修复HDR被错误标记为ShV的问题,精确识别变异类型并过滤短变异
"""
import os
import sys
import argparse
from typing import Dict, Tuple, List
def parse_info(info_str: str) -> Dict[str, str]:
"""解析 VCF INFO 字段为字典"""
info = {}
if not info_str or info_str == ".":
return info
for item in info_str.split(";"):
if "=" in item:
k, v = item.split("=", 1)
info[k] = v
return info
def parse_syri_chromosomes(vcf_path: str) -> Tuple[str, str]:
"""从 VCF 中解析染色体对信息,如果解析到'.'则跳过"""
with open(vcf_path) as f:
for line in f:
if line.startswith("#"):
continue
fields = line.rstrip().split("\t")
info = parse_info(fields[7])
ref_chr = fields[0]
qry_chr = info.get("ChrB", ".")
# 如果解析到 '.',跳过
if ref_chr != "." and qry_chr != ".":
return ref_chr, qry_chr
# fallback: 根据文件名猜测
basename = os.path.basename(vcf_path)
if "_vs_" in basename:
a, b = basename.split("_vs_", 1)
return a, b.split(".")[0]
# 如果没找到有效信息,返回默认
return "RefChr", "QueryChr"
# 增强的结构变异识别函数
def is_struct_variant(fields: List[str]) -> bool:
"""判定是否为结构变异记录"""
alt = fields[4]
ref = fields[3]
vid = fields[2].lower()
info = parse_info(fields[7])
# 1. 所有包含在<>中的ALT都是结构变异
if alt.startswith("<") and alt.endswith(">"):
return True
# 2. VarType标记为SR
if info.get("VarType") == "SR":
return True
# 3. ID前缀是结构变异类型
struct_prefixes = ("hdr", "del", "ins", "inv", "cpg", "cpl", "notal", "dup", "tra", "synd", "synal")
if any(vid.startswith(prefix) for prefix in struct_prefixes):
return True
# 4. 长度变化超过50bp (作为后备条件)
ref_len = len(ref)
alt_len = len(alt)
if abs(alt_len - ref_len) >= 50:
return True
return False
def calculate_sv_length(block_type: str, ref_start: str, ref_end: str, qry_start: str, qry_end: str) -> int:
"""计算结构变异的长度"""
try:
# 对于SYN和SYNAL区块,使用参考序列长度
if block_type in ["SYN", "SYNAL"]:
return int(ref_end) - int(ref_start) + 1
# 对于其他类型,根据变异类型计算长度
ref_len = int(ref_end) - int(ref_start) + 1
qry_len = int(qry_end) - int(qry_start) + 1
if block_type in ["INS", "DUP"]:
return max(qry_len - ref_len, qry_len) # 插入/重复长度
elif block_type == "DEL":
return max(ref_len - qry_len, ref_len) # 缺失长度
elif block_type == "INV":
return max(ref_len, qry_len) # 倒位长度
elif block_type == "HDR":
return ref_len # HDR长度等于参考序列长度
else:
return max(ref_len, qry_len) # 默认取最大值
except (ValueError, TypeError):
return 0 # 如果坐标无效,返回0长度
def extract_blocks(input_vcf: str) -> List[str]:
"""抽取所有结构变异区块,过滤短变异"""
blocks = []
ref_chr, qry_chr = parse_syri_chromosomes(input_vcf)
with open(input_vcf) as f:
for line in f:
if line.startswith("#"):
continue
fields = line.rstrip().split("\t")
if not is_struct_variant(fields):
continue
chrom = fields[0]
pos = fields[1]
vid = fields[2]
alt = fields[4]
info = parse_info(fields[7])
# 1. 优先从ID中推断变异类型
vid_lower = vid.lower()
block_type = "UNK"
if vid_lower.startswith("hdr"):
block_type = "HDR"
elif vid_lower.startswith("del"):
block_type = "DEL"
elif vid_lower.startswith("ins"):
block_type = "INS"
elif vid_lower.startswith("inv"):
block_type = "INV"
elif vid_lower.startswith("dup"):
block_type = "DUP"
elif vid_lower.startswith("cpg"):
block_type = "CPG"
elif vid_lower.startswith("cpl"):
block_type = "CPL"
elif vid_lower.startswith("notal"):
block_type = "NOTAL"
elif vid_lower.startswith("synal"):
block_type = "SYNAL"
elif vid_lower.startswith("synd"):
block_type = "SYND"
elif vid_lower.startswith("syn"):
block_type = "SYN"
# 2. 次优先从ALT字段获取类型
if block_type == "UNK" and alt.startswith("<") and alt.endswith(">"):
block_type = alt[1:-1].upper() # 去掉<>,如HDR
# 3. 最后使用VarType字段
if block_type == "UNK" and "VarType" in info:
block_type = info["VarType"]
# 确保变异类型总是大写
block_type = block_type.upper()
# 处理坐标
ref_start_val = pos
ref_end_val = info.get("END", ref_start_val)
qry_chr_local = info.get("ChrB", qry_chr)
qry_start_val = info.get("StartB", "Na")
qry_end_val = info.get("EndB", "Na")
# 特殊处理:当StartB存在但EndB不存在时
if qry_start_val != "Na" and qry_end_val == "Na":
qry_end_val = qry_start_val
# 计算变异长度并过滤短变异
sv_length = calculate_sv_length(
block_type,
ref_start_val,
ref_end_val,
qry_start_val,
qry_end_val
)
# 过滤长度<50的变异(SYN和SYNAL除外)
if sv_length < 50 and block_type not in ["SYN", "SYNAL", "SYND"]:
continue
# 添加区块信息
blocks.append(
"\t".join([
vid,
block_type,
info.get("Parent", "."),
chrom,
ref_start_val,
ref_end_val,
qry_chr_local,
qry_start_val,
qry_end_val
])
)
return blocks
def filter_vcf(input_vcf: str) -> str:
"""过滤VCF,保留所有结构变异"""
out_lines = []
with open(input_vcf) as f:
for line in f:
if line.startswith("#"):
out_lines.append(line)
continue
fields = line.rstrip().split("\t")
if is_struct_variant(fields):
out_lines.append(line)
return "".join(out_lines)
def main():
parser = argparse.ArgumentParser(
description="Filter SyRI VCF and extract block table (correctly handles HDR marked as ShV)"
)
parser.add_argument("input_vcf")
parser.add_argument("--out-vcf", help="Filtered VCF output")
parser.add_argument("--out-block", help="Block table TSV output")
args = parser.parse_args()
# 1. 过滤 VCF
filtered_vcf = filter_vcf(args.input_vcf)
if args.out_vcf:
with open(args.out_vcf, "w") as f:
f.write(filtered_vcf)
print(f"[INFO] Filtered VCF saved to {args.out_vcf}", file=sys.stderr)
# 2. 抽取 block 表(包含长度过滤)
blocks = extract_blocks(args.input_vcf)
if args.out_block:
with open(args.out_block, "w") as f:
f.write(
"\t".join([
"block_id", "block_type", "block_parent",
"ref_chr", "ref_start", "ref_end",
"qry_chr", "qry_start", "qry_end"
]) + "\n"
)
for line in blocks:
f.write(line + "\n")
print(f"[INFO] Block table saved to {args.out_block} ({len(blocks)} blocks)", file=sys.stderr)
ref_chr, qry_chr = parse_syri_chromosomes(args.input_vcf)
print(f"[INFO] REF={ref_chr} QUERY={qry_chr}", file=sys.stderr)
if __name__ == "__main__":
main()现在展示输出的block文件
(base) root@b27b26bddcc5:/project1/data/Chr_split/Chr_ALN4/Ma_hap1_Chr03_flip_vs_Ma_hap2_Chr03_flip# head Ma_hap1_Chr03_flip_vs_Ma_hap2_Chr03_flipsyri.vcf -n 50
##fileformat=VCFv4.3
##fileDate=20251226
##source=syri
##contig=<ID=Ma_hap1_Chr03_flip,length=1266512730>
##ALT=<ID=SYN,Description="Syntenic region">
##ALT=<ID=INV,Description="Inversion">
##ALT=<ID=TRANS,Description="Translocation">
##ALT=<ID=INVTR,Description="Inverted Translocation">
##ALT=<ID=DUP,Description="Duplication">
##ALT=<ID=INVDP,Description="Inverted Duplication">
##ALT=<ID=SYNAL,Description="Syntenic alignment">
##ALT=<ID=INVAL,Description="Inversion alignment">
##ALT=<ID=TRANSAL,Description="Translocation alignment">
##ALT=<ID=INVTRAL,Description="Inverted Translocation alignment">
##ALT=<ID=DUPAL,Description="Duplication alignment">
##ALT=<ID=INVDPAL,Description="Inverted Duplication alignment">
##ALT=<ID=HDR,Description="Highly diverged regions">
##ALT=<ID=INS,Description="Insertion in non-reference genome">
##ALT=<ID=DEL,Description="Deletion in non-reference genome">
##ALT=<ID=CPG,Description="Copy gain in non-reference genome">
##ALT=<ID=CPL,Description="Copy loss in non-reference genome">
##ALT=<ID=SNP,Description="Single nucleotide polymorphism">
##ALT=<ID=TDM,Description="Tandem repeat">
##ALT=<ID=NOTAL,Description="Not Aligned region">
##INFO=<ID=END,Number=1,Type=Integer,Description="End position on reference genome">
##INFO=<ID=ChrB,Number=1,Type=String,Description="Chromosome ID on the non-reference genome">
##INFO=<ID=StartB,Number=1,Type=Integer,Description="Start position on non-reference genome">
##INFO=<ID=EndB,Number=1,Type=Integer,Description="End position on non-reference genome">
##INFO=<ID=Parent,Number=1,Type=String,Description="ID of the parent SR">
##INFO=<ID=VarType,Number=1,Type=String,Description="SR for structural arrangements, ShV for short variants, missing otherwise">
##INFO=<ID=DupType,Number=1,Type=String,Description="Copy gain or loss in the non-reference genome">
##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT sample
Ma_hap1_Chr03_flip 1 SYNAL1 N <SYNAL> . PASS END=2999732;ChrB=Ma_hap2_Chr03_flip;StartB=1;EndB=2999771;Parent=SYN1;VarType=.;DupType=. GT 1
Ma_hap1_Chr03_flip 1 SYN1 N <SYN> . PASS END=4901654;ChrB=Ma_hap2_Chr03_flip;StartB=1;EndB=4862229;Parent=.;VarType=SR;DupType=- GT 1
Ma_hap1_Chr03_flip 52819 SNP24287 C G . PASS END=52819;ChrB=Ma_hap2_Chr03_flip;StartB=52819;EndB=52819;Parent=SYN1;VarType=ShV;DupType=. GT 1
Ma_hap1_Chr03_flip 55593 SNP24288 T C . PASS END=55593;ChrB=Ma_hap2_Chr03_flip;StartB=55593;EndB=55593;Parent=SYN1;VarType=ShV;DupType=. GT 1
Ma_hap1_Chr03_flip 163411 SNP24289 T C . PASS END=163411;ChrB=Ma_hap2_Chr03_flip;StartB=163411;EndB=163411;Parent=SYN1;VarType=ShV;DupType=. GT 1
Ma_hap1_Chr03_flip 234802 SNP24290 G A . PASS END=234802;ChrB=Ma_hap2_Chr03_flip;StartB=234802;EndB=234802;Parent=SYN1;VarType=ShV;DupType=. GT 1
Ma_hap1_Chr03_flip 241548 SNP24291 G A . PASS END=241548;ChrB=Ma_hap2_Chr03_flip;StartB=241548;EndB=241548;Parent=SYN1;VarType=ShV;DupType=. GT 1
Ma_hap1_Chr03_flip 370805 SNP24292 A G . PASS END=370805;ChrB=Ma_hap2_Chr03_flip;StartB=370805;EndB=370805;Parent=SYN1;VarType=ShV;DupType=. GT 1
Ma_hap1_Chr03_flip 442033 SNP24293 C T . PASS END=442033;ChrB=Ma_hap2_Chr03_flip;StartB=442033;EndB=442033;Parent=SYN1;VarType=ShV;DupType=. GT 1
Ma_hap1_Chr03_flip 442476 SNP24294 T C . PASS END=442476;ChrB=Ma_hap2_Chr03_flip;StartB=442476;EndB=442476;Parent=SYN1;VarType=ShV;DupType=. GT 1
Ma_hap1_Chr03_flip 447992 SNP24295 T C . PASS END=447992;ChrB=Ma_hap2_Chr03_flip;StartB=447992;EndB=447992;Parent=SYN1;VarType=ShV;DupType=. GT 1
Ma_hap1_Chr03_flip 826023 SNP24296 C T . PASS END=826023;ChrB=Ma_hap2_Chr03_flip;StartB=826023;EndB=826023;Parent=SYN1;VarType=ShV;DupType=. GT 1
Ma_hap1_Chr03_flip 1261857 SNP24297 T G . PASS END=1261857;ChrB=Ma_hap2_Chr03_flip;StartB=1261857;EndB=1261857;Parent=SYN1;VarType=ShV;DupType=. GT 1
Ma_hap1_Chr03_flip 1476540 SNP24298 G A . PASS END=1476540;ChrB=Ma_hap2_Chr03_flip;StartB=1476540;EndB=1476540;Parent=SYN1;VarType=ShV;DupType=. GT 1
Ma_hap1_Chr03_flip 1491997 SNP24299 T C . PASS END=1491997;ChrB=Ma_hap2_Chr03_flip;StartB=1491997;EndB=1491997;Parent=SYN1;VarType=ShV;DupType=. GT 1
Ma_hap1_Chr03_flip 1599278 SNP24300 C T . PASS END=1599278;ChrB=Ma_hap2_Chr03_flip;StartB=1599278;EndB=1599278;Parent=SYN1;VarType=ShV;DupType=. GT 1
Ma_hap1_Chr03_flip 1660761 SNP24301 C G . PASS END=1660761;ChrB=Ma_hap2_Chr03_flip;StartB=1660761;EndB=1660761;Parent=SYN1;VarType=ShV;DupType=. GT 1第一步,我通过nucmer注释四个单倍型之间的共线性block,包括结构变异类型的block。
这个流程,根据我目前的观察,一周左右其实就可以跑完了,没有想象中那么久。
第二步,我将PART1得到的基因簇比回到四个单倍型的物理位置上,查看是否处于一个相同的block块,如果处于同源的block块,我先认定它们属于等位基因对,然后我再根据结构变异具体处于什么结构类型的block块上。
好的,现在我想请你给我写一个脚本,输入为syri比对得到vcf文件,输出为过滤了短变异的两个文件,一个是有1v1共线性的block文件(只要有1比1关系,就算是1v1共线性的block文件,倒位,易位,甚至这个严格参数下实际上是1比1的dup也在内),一个是没有共线性的block文件,均以vcf作为格式。
然后,我希望你能写第二个脚本:即把这两个文件作为库,然后把基因的坐标作为query,以bed文件的格式输入,对于输入的区间,找到它在另外的染色体上的对应block,能做到吗?这个过程中要注意区分正向的block和反向的block,比如倒位,可以吗?
第三步,跑完转录组数据之后看表达量。
Part_3_基因block搜索
对于一个给定的基因区间,在输出的block文件里找到对应的同源染色体block。
对于这个block,先查询gff上是否有注释基因,如果注释了是不是同源基因,如果有同源基因,这个同源基因是不是在已有的等位基因对里?如果不在等位基因对里,判定为一个新的等位基因对。
如果以上逻辑走不通,则把蛋白质序列作为query,把dna序列作为target,进行blat比对,提取序列后拼接翻译为蛋白质,如果没有蛋白质序列,直接认定为真实的PAV基因,如果有蛋白质序列,但是突变了,则认定为真实的PAV基因,如果这个蛋白质序列是完好的,则认定为假阳性,它不是一个真实的PAV基因。
Project一个区间在同源染色体上的投影区间pav_03_project_gene_blocks.py
(base) root@b27b26bddcc5:/project1/bin# cat pav_03_project_gene_blocks.py
#!/usr/bin/env python3
import pandas as pd
ONE_TO_ONE_TYPES = {
"SYN", "INV", "TRANS", "INVTR",
"DUP", "INVDP",
"SYNAL", "INVAL", "TRANSAL", "INVTRAL",
"DUPAL", "INVDPAL",
"HDR"
}
NON_COLLINEAR_TYPES = {
"DEL", "INS", "NOTAL", "CPG", "CPL", "shortVariant"
}
def parse_vcf_block_ids(vcf_file, min_len=50):
"""
解析 VCF 文件,返回 block 列表
block = (start, end, is_collinear, svtype, ID, Parent)
"""
blocks = []
with open(vcf_file) as fin:
for l in fin:
if l.startswith("#"):
continue
f = l.rstrip().split("\t")
pos = int(f[1])
alt = f[4]
info_str = f[7]
block_id = f[2] if f[2] != "." else None
# 获取 SVTYPE
svtype = alt[1:-1] if alt.startswith("<") and alt.endswith(">") else "shortVariant"
# 解析 INFO 获取 Parent
info = {}
for x in info_str.split(";"):
if "=" in x:
k, v = x.split("=", 1)
info[k] = v
parent_id = info.get("Parent")
# 获取 END
end = int(info.get("END", pos))
svlen = end - pos + 1
if svlen < min_len:
continue
is_collinear = svtype in ONE_TO_ONE_TYPES
blocks.append((pos, end, is_collinear, svtype, block_id, parent_id))
return blocks
def find_gene_block_ids(gene_coord, blocks):
"""
找出 gene 落入的所有 block 的 ID 和 Parent
返回:
target_starts, target_ends, is_collinear_list, svtype_list, block_id_list, parent_id_list
"""
g_start, g_end = gene_coord
target_starts, target_ends, is_collinear_list, svtype_list, block_id_list, parent_id_list = [], [], [], [], [], []
for b_start, b_end, is_collinear, svtype, block_id, parent_id in blocks:
# 判断基因是否落入 block
if g_start <= b_end and g_end >= b_start:
target_starts.append(b_start)
target_ends.append(b_end)
is_collinear_list.append("YES" if is_collinear else "NO")
svtype_list.append(svtype)
block_id_list.append(block_id)
parent_id_list.append(parent_id)
if not target_starts:
return [None], [None], ["NO"], [None], [None], [None]
return target_starts, target_ends, is_collinear_list, svtype_list, block_id_list, parent_id_list
def project_gene_blocks(pav_df, gff_dict, chr_pair_index, min_len=50):
"""
四单倍型互比投影基因到 block,输出 block_id 和 Parent
"""
hap_labels = pav_df.columns[1:]
results = []
for idx, row in pav_df.iterrows():
cluster = row[0]
for q_hap in hap_labels:
q_gene = row[q_hap]
if q_gene == "NA":
continue
q_chr = gff_dict[q_gene]["chr"]
q_start, q_end = gff_dict[q_gene]["start"], gff_dict[q_gene]["end"]
for t_hap in hap_labels:
if t_hap == q_hap:
continue
t_gene = row[t_hap]
t_chr = gff_dict[t_gene]["chr"] if t_gene != "NA" else t_hap
# 找染色体对
chr_pair = f"{q_chr}vs{t_chr}"
if chr_pair not in chr_pair_index:
chr_pair = f"{t_chr}vs{q_chr}"
if chr_pair not in chr_pair_index:
results.append([
q_hap, q_gene, t_hap, t_chr,
q_start, q_end,
[None], [None], ["NO"],
[None], [None], [None]
])
continue
vcf_1v1, vcf_nc = chr_pair_index[chr_pair]
if vcf_1v1 != "NA":
blocks = parse_vcf_block_ids(vcf_1v1, min_len=min_len)
t_starts, t_ends, is_collinear_list, svtype_list, block_ids, parent_ids = \
find_gene_block_ids((q_start, q_end), blocks)
query_block_type = ["1v1"] * len(t_starts)
else:
t_starts = t_ends = block_ids = parent_ids = svtype_list = [None]
is_collinear_list = ["NO"]
query_block_type = [None]
# 每个基因可能对应多个 block,展开多行
for i in range(len(t_starts)):
results.append([
q_hap, q_gene, t_hap, t_chr,
q_start, q_end,
t_starts[i], t_ends[i],
is_collinear_list[i], query_block_type[i], svtype_list[i], block_ids[i], parent_ids[i]
])
df_res = pd.DataFrame(results, columns=[
"query_hap", "query_gene", "target_hap", "target_chr",
"query_start", "query_end", "target_start", "target_end",
"is_collinear", "query_block_type", "target_block_type", "target_block_id", "target_block_parent"
])
return df_res(base) root@b27b26bddcc5:/project1/data/Chr_split/Chr_ALN4/Ma_hap1_Chr03_flip_vs_Ma_hap2_Chr03_flip# cat del.txt
input_interval used_block_level used_block_ids used_block_types source_coords target_chrom target_coords orientation
Ma_hap1_Chr03_flip:1-8000 child SYNAL1 SYNAL Ma_hap1_Chr03_flip:1-8000 Ma_hap2_Chr03_flip 1-8000 +很好,输出是映射出来了的
映射区间到多个block文件上
现在有一个小问题,我有一个染色体区间,比如Fe_hap1_chr1:1-8000,当我只有一个block文件Fe_hap1_chr1_vs_Fe_hap2_chr1.txt的时候,我当然知道要在这个block文件上找映射,但是当我涉及到的block文件越来越多的时候,怎么办?这个时候我们把一系列block文件生成一个txt表,不直接全局合并 block,而是保留按染色体对拆分的 block 文件,单个区间需要时再去对应文件查找。
目前表格的思路是这样:
block_file ref_chr qry_chr
chr1_ref_vs_chrA1.blocks.tsv chr1_ref chrA1
chr2_ref_vs_chrA2.blocks.tsv chr2_ref chrA2
...pav_04_project_interval.py
(base) root@b27b26bddcc5:/project1/bin# cat pav_04_project_interval.py
#!/usr/bin/env python3
"""
Project a genomic interval to target chromosome(s) using precomputed block files.
Supports:
1. Command-line usage
2. External import and function call
3. Multiple block files per reference chromosome
"""
import pandas as pd
import sys
import argparse
from pathlib import Path
# 将 pav_03_project_gene_blocks.py 所在路径加入 sys.path
sys.path.insert(0, "/project1/bin")
from pav_03_project_gene_blocks import get_projected_block_info
def load_block_index(index_file: str) -> pd.DataFrame:
"""
Load block index TSV.
Must contain columns: block_file, ref_chr, qry_chr
"""
df = pd.read_csv(index_file, sep="\t")
if not set(['block_file','ref_chr','qry_chr']).issubset(df.columns):
raise ValueError("Index TSV must contain columns: block_file, ref_chr, qry_chr")
return df
def project_interval(interval: str, index_df: pd.DataFrame):
"""
Project a single interval.
interval: string "chr:start-end"
index_df: block index dataframe
Returns a list of projection dicts (one per matching block file)
"""
chrom, _ = interval.split(":")
# 找到所有对应 block 文件
matched_files = index_df[index_df['ref_chr'] == chrom]['block_file'].tolist()
results = []
for bf in matched_files:
if not Path(bf).exists():
continue
res = get_projected_block_info(bf, interval)
if res:
res['block_file'] = bf # 记录对应 block 文件
results.append(res)
return results
def project_intervals_from_list(intervals: list, index_df: pd.DataFrame):
"""
Project multiple intervals.
Returns a list of dicts.
"""
all_results = []
for interval in intervals:
proj_res = project_interval(interval, index_df)
all_results.extend(proj_res)
return all_results
def main():
parser = argparse.ArgumentParser(description="Project genomic interval(s) using precomputed block files")
parser.add_argument("index_file", help="TSV file listing block_file, ref_chr, qry_chr")
parser.add_argument("--interval", help="Single interval to project, e.g., chr1:1000-5000")
parser.add_argument("--interval_list", help="File with one interval per line")
parser.add_argument("--out", help="Output TSV file")
args = parser.parse_args()
index_df = load_block_index(args.index_file)
intervals = []
if args.interval:
intervals.append(args.interval)
if args.interval_list:
with open(args.interval_list) as f:
intervals.extend([line.strip() for line in f if line.strip()])
if not intervals:
print("No intervals provided.", file=sys.stderr)
sys.exit(1)
results = project_intervals_from_list(intervals, index_df)
if args.out:
if results:
df = pd.DataFrame(results)
df.to_csv(args.out, sep="\t", index=False)
print(f"[INFO] Projection results saved to {args.out}", file=sys.stderr)
else:
print("[INFO] No projection results found.", file=sys.stderr)
else:
# 打印到 stdout
for r in results:
print("\t".join(str(r[k]) for k in r.keys()))
# 支持外部调用
def project_interval_external(interval: str, index_file: str):
"""
External callable function.
Returns a list of dicts, each dict corresponds to one matching block file projection.
"""
index_df = load_block_index(index_file)
return project_interval(interval, index_df)
if __name__ == "__main__":
main()基于染色体映射的gff筛选 pav_05_block_allele_gff_check.py
这个代码的用处是,对于一个给定的一对潜在的PAV文件,对于上面的每一个基因,找到它们在同源染色体上的对应Blocks,先进行gff核对。
如果这个区间上注释了基因,并且blastp比对通过,√新的潜在的等位基因。
(还需要加一层,这个区间上的这个基因不在已有的RBH上,如果在对应的RBH上,则归类为--rbh_file )
如果这个区间上没有注释基因,进入下一轮PAV比对。
pav_05_block_allele_gff_check.py
(base) root@b27b26bddcc5:/project1/bin# cat pav_05_block_allele_gff_check.py
#!/usr/bin/env python3
import argparse
from pathlib import Path
import subprocess
import tempfile
import pandas as pd
import sys
import logging
import re
sys.path.insert(0, "/project1/bin")
from pav_04_project_interval import project_interval_external
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# --------------------------------------------------
# 新功能:基于RBH的target_gene过滤
# --------------------------------------------------
def load_existing_targets(rbh_file):
"""加载RBH文件中已有的target_gene集合"""
existing_targets = set()
try:
if rbh_file:
df = pd.read_csv(rbh_file, sep='\t')
# 处理可能的列名变体
if 'HB' in df.columns:
existing_targets.update(df['HB'].tolist())
elif 'target_gene' in df.columns:
existing_targets.update(df['target_gene'].tolist())
logging.info(f"Loaded {len(existing_targets)} existing target genes from RBH file")
except Exception as e:
logging.warning(f"Error loading RBH file: {e}")
return existing_targets
####我觉得这串代码有问题,他没必要看基因总数啊
# --------------------------------------------------
# 基础工具
# --------------------------------------------------
def read_list(path):
with open(path) as f:
return [l.strip() for l in f if l.strip()]
def overlap(a1, a2, b1, b2):
return not (a2 < b1 or b2 < a1)
def extract_pep(pid, pep, out):
with open(pep) as f, open(out, "w") as o:
w = False
for line in f:
if line.startswith(">"):
w = pid in line
if w:
o.write(line)
def run_blastp(q, t, out, threads):
subprocess.run([
"blastp",
"-query", str(q),
"-subject", str(t),
"-outfmt", "6 qseqid sseqid pident length qlen",
"-num_threads", str(threads)
], stdout=open(out, "w"), check=True)
def blast_ok(tsv, pid=30, cov=0.3):
with open(tsv) as f:
for l in f:
_, _, p, lq, ql = l.strip().split("\t")
if float(p) >= pid and int(lq)/int(ql) >= cov:
return True, float(p), int(lq)/int(ql)
return False, ".", "."
# --------------------------------------------------
# GFF
# --------------------------------------------------
def load_gff(gff):
genes = []
g2p = {}
with open(gff) as f:
for l in f:
if l.startswith("#"):
continue
c = l.rstrip().split("\t")
chrom, typ, s, e, attr = c[0], c[2], int(c[3]), int(c[4]), c[8]
if typ == "gene":
gid = next(x[3:] for x in attr.split(";") if x.startswith("ID="))
genes.append((chrom, s, e, gid))
elif typ == "mRNA":
mid = next(x[3:] for x in attr.split(";") if x.startswith("ID="))
pid = next(x[7:] for x in attr.split(";") if x.startswith("Parent="))
g2p.setdefault(pid, []).append(mid)
return genes, g2p
# --------------------------------------------------
# 主逻辑 - 加入RBH过滤
# --------------------------------------------------
def process(pavs, gff_s, pep_s, gff_t, pep_t, block_index,
extend, tmp, allelic, potential, threads, existing_targets):
src_genes, g2p = load_gff(gff_s)
tgt_genes, tg2p = load_gff(gff_t)
for prot in pavs:
hit_gene = None
for c, s, e, g in src_genes:
if prot in g2p.get(g, []):
hit_gene = (c, s, e)
break
if not hit_gene:
potential.append({
"pav_protein": prot,
"block_hit": "no",
"block_file": ".",
"block_level": ".",
"block_ids": ".",
"block_types": ".",
"reason": "protein_not_in_gff",
"src_chr": ".",
"src_start": ".",
"src_end": ".",
"tgt_chr": ".",
"tgt_start": ".",
"tgt_end": "."
})
continue
src_chr, src_s, src_e = hit_gene
interval = f"{src_chr}:{src_s}-{src_e}"
projs = project_interval_external(interval, block_index)
if not projs:
potential.append({
"pav_protein": prot,
"block_hit": "no",
"block_file": ".",
"block_level": ".",
"block_ids": ".",
"block_types": ".",
"reason": "no_block_projection",
"src_chr": src_chr,
"src_start": src_s,
"src_end": src_e,
"tgt_chr": ".",
"tgt_start": ".",
"tgt_end": "."
})
continue
for p in projs:
try:
ps, pe = map(int, p["projected_coords"].split("-"))
except Exception:
continue
ps -= extend
pe += extend
tgt_chr = p["target_chrom"]
block_level = p.get("used_block_level", ".")
block_info_str = p.get("child_blocks_info" if block_level == "child" else "parent_block_info", "")
ids, types = [], []
if block_info_str and block_info_str != ".":
pattern = r'([A-Za-z0-9_]+)\((\d+)-(\d+),([A-Z]+)\)'
matches = re.findall(pattern, block_info_str)
for match in matches:
bid = match[0]
typ = match[3]
ids.append(bid)
types.append(typ)
block_ids = ";".join(ids) if ids else "."
block_types = ";".join(types) if types else "."
block_file = p.get("block_file", ".")
logging.info(f"Block info parsed: level={block_level}, ids={ids}, types={types}, str={block_info_str}")
hits = [
g for g in tgt_genes
if g[0] == tgt_chr and overlap(ps, pe, g[1], g[2])
]
if not hits:
potential.append({
"pav_protein": prot,
"block_hit": "yes",
"block_level": block_level,
"block_ids": block_ids,
"block_types": block_types,
"block_file": block_file,
"reason": "no_gene_in_target_block",
"src_chr": src_chr,
"src_start": src_s,
"src_end": src_e,
"tgt_chr": tgt_chr,
"tgt_start": ps,
"tgt_end": pe
})
continue
qfa = tmp / f"{prot}.fa"
extract_pep(prot, pep_s, qfa)
found = False
for _, _, _, tg in hits:
for tp in tg2p.get(tg, [tg]):
# ============================= 新过滤逻辑 =============================
# 如果target_gene已在RBH文件中存在,直接跳过作为PAV
if tp in existing_targets:
potential.append({
"pav_protein": prot,
"target_gene": tp,
"block_hit": "yes",
"block_level": block_level,
"block_ids": block_ids,
"block_types": block_types,
"block_file": block_file,
"reason": "already_in_rbh",
"src_chr": src_chr,
"src_start": src_s,
"src_end": src_e,
"tgt_chr": tgt_chr,
"tgt_start": ps,
"tgt_end": pe
})
found = True
continue
# ===================================================================
tfa = tmp / f"{tp}.fa"
extract_pep(tp, pep_t, tfa)
out = tmp / f"{prot}_vs_{tp}.tsv"
run_blastp(qfa, tfa, out, threads)
ok, pid, cov = blast_ok(out)
if ok:
allelic.append({
"pav_protein": prot,
"target_gene": tp,
"block_hit": "yes",
"block_level": block_level,
"block_ids": block_ids,
"block_types": block_types,
"block_file": block_file,
"src_chr": src_chr,
"src_start": src_s,
"src_end": src_e,
"tgt_chr": tgt_chr,
"tgt_start": ps,
"tgt_end": pe,
"identity": pid,
"coverage": cov
})
found = True
if not found:
potential.append({
"pav_protein": prot,
"block_hit": "yes",
"block_level": block_level,
"block_ids": block_ids,
"block_types": block_types,
"block_file": block_file,
"reason": "non_homolog",
"src_chr": src_chr,
"src_start": src_s,
"src_end": src_e,
"tgt_chr": tgt_chr,
"tgt_start": ps,
"tgt_end": pe
})
# --------------------------------------------------
# main - 加入RBH文件参数
# --------------------------------------------------
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--pav_list", required=True)
ap.add_argument("--gff_list", required=True)
ap.add_argument("--pep_list", required=True)
ap.add_argument("--block_index", required=True)
ap.add_argument("--outdir", required=True)
ap.add_argument("--extend", type=int, default=1000)
ap.add_argument("--threads", type=int, default=4)
# 新参数:RBH文件路径
ap.add_argument("--rbh_file", help="Reciprocal Best Hit file for existing target genes")
args = ap.parse_args()
ref_pav, qry_pav = args.pav_list.split(",")
ref_gff, qry_gff = args.gff_list.split(",")
ref_pep, qry_pep = args.pep_list.split(",")
# 加载已有的target_gene集合
existing_targets = load_existing_targets(args.rbh_file)
allelic, potential = [], []
with tempfile.TemporaryDirectory() as d:
tmp = Path(d)
process(read_list(ref_pav), ref_gff, ref_pep,
qry_gff, qry_pep, args.block_index,
args.extend, tmp, allelic, potential, args.threads, existing_targets)
process(read_list(qry_pav), qry_gff, qry_pep,
ref_gff, ref_pep, args.block_index,
args.extend, tmp, allelic, potential, args.threads, existing_targets)
out = Path(args.outdir)
out.mkdir(exist_ok=True)
pd.DataFrame(allelic).to_csv(out/"allelic_pairs.tsv", sep="\t", index=False)
pd.DataFrame(potential).to_csv(out/"potential_PAV.tsv", sep="\t", index=False)
if __name__ == "__main__":
main()
输出表格格式如下:
pav_protein block_hit block_file block_level block_ids block_types reason src_chr src_start src_end tgt_chr tgt_start tgt_end target_gene
TgFa01G00120-mRNA1 no . . . . no_block_projection Fe_hap1_Chr01 1603491117 1603492235 . . .
TgFa01G00130-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child HDR47337 HDR already_in_rbh Fe_hap1_Chr01 1500540090 1500540467 Fe_hap2_Chr01 1499636564 1499638941 TgFb01G22240-mRNA1
TgFa01G00150-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child SYNAL35474 SYNAL no_gene_in_target_block Fe_hap1_Chr01 1672845211 1672847952 Fe_hap2_Chr01 1668497645 1668502386
TgFa01G00200-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child HDR12927 HDR no_gene_in_target_block Fe_hap1_Chr01 474576912 474577511 Fe_hap2_Chr01 475452873 475455472
TgFa01G00220-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child SYNAL1722 SYNAL non_homolog Fe_hap1_Chr01 58920443 58922592 Fe_hap2_Chr01 60905604 60909753
TgFa01G00250-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child SYNAL4985 SYNAL already_in_rbh Fe_hap1_Chr01 200457691 200492201 Fe_hap2_Chr01 203790682 203827192 TgFb01G20720-mRNA1
TgFa01G00280-mRNA1 no . . . . no_block_projection Fe_hap1_Chr01 1761010781 1761011160 . . .
TgFa01G00290-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child HDR29313 HDR no_gene_in_target_block Fe_hap1_Chr01 912416620 912416964 Fe_hap2_Chr01 897987334 897989678
TgFa01G00320-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child SYNAL31427 SYNAL already_in_rbh Fe_hap1_Chr01 1520421040 1520421381 Fe_hap2_Chr01 1517358786 1517361127 TgFb01G21750-mRNA1
TgFa01G00340-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child TRANSAL45454TRANSAL already_in_rbh Fe_hap1_Chr01 502596068 502596865 Fe_hap2_Chr01 135235066 135237861 TgFb01G08910-mRNA1
TgFa01G00390-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child SYNAL166 SYNAL no_gene_in_target_block Fe_hap1_Chr01 4411932 4412267 Fe_hap2_Chr01 4846350 4848685
TgFa01G00480-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child CPL40806 CPL no_gene_in_target_block Fe_hap1_Chr01 1295651184 1295651492 Fe_hap2_Chr01 1299538485 1299540793
TgFa01G00490-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child HDR3954 HDRalready_in_rbh Fe_hap1_Chr01 154397080 154397418 Fe_hap2_Chr01 157490498 157492836 TgFb01G13670-mRNA1
TgFa01G00530-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child HDR51925 HDR no_gene_in_target_block Fe_hap1_Chr01 1669263079 1669263502 Fe_hap2_Chr01 1666898444 1666900867
TgFa01G00670-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child HDR38750 HDR no_gene_in_target_block Fe_hap1_Chr01 1241379680 1241380663 Fe_hap2_Chr01 1247227722 1247230705
TgFa01G00700-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child SYNAL15223 SYNAL no_gene_in_target_block Fe_hap1_Chr01 553673488 553674186 Fe_hap2_Chr01 556161580 556164278
TgFa01G00710-mRNA1 no . . . . no_block_projection Fe_hap1_Chr01 186105508 186106385 . . .
TgFb01G00010-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv parent INV1224 INVnon_homolog Fe_hap2_Chr01 1099732705 1099733820 Fe_hap1_Chr01 1116217087 1116220202
TgFb01G00090-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child SYNAL30602 SYNAL non_homolog Fe_hap2_Chr01 1485932026 1485937272 Fe_hap1_Chr01 1487651003 1487658249
TgFb01G00100-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child SYNAL5658 SYNAL no_gene_in_target_block Fe_hap2_Chr01 228111956 228117444 Fe_hap1_Chr01 224832319 224839807
TgFb01G00180-mRNA1 no . . . . no_block_projection Fe_hap2_Chr01 1600639357 1600640240 . . .
TgFb01G00220-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child HDR48740 HDR no_gene_in_target_block Fe_hap2_Chr01 1550334445 1550335368 Fe_hap1_Chr01 1553732282 1553735205
TgFb01G00250-mRNA1 no . . . . no_block_projection Fe_hap2_Chr01 278704854 278705493 . . .
TgFb01G00270-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child SYNAL32253 SYNAL no_gene_in_target_block Fe_hap2_Chr01 1547464482 1547464700 Fe_hap1_Chr01 1551201455 1551203673
TgFb01G00310-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child HDR50976 HDR no_gene_in_target_block Fe_hap2_Chr01 1628302761 1628303078 Fe_hap1_Chr01 1632873891 1632876208
TgFb01G00420-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child CPG9884 CPGno_gene_in_target_block Fe_hap2_Chr01 382165853 382166683 Fe_hap1_Chr01 380599079 380601909
TgFb01G00430-mRNA1 yes /project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv child HDR12487 HDR no_gene_in_target_block Fe_hap2_Chr01 462744863 462745587 Fe_hap1_Chr01 461702163 461704887
TgFb01G00520-mRNA1 no . . . . no_block_projection Fe_hap2_Chr01 246348213 246348830 . . .
TgFb01G00570-mRNA1 no . . . . no_block_projection Fe_hap2_Chr01 110247794 110248165 . . .基于基因组blat比对的蛋白质
第一步,在05输出的PAV表格中, 有5种基因,分别是no_gene_in_target_block;non_homolog;no_block_projection;already_in_rbh;protein_not_in_gff
第五种事实上是不存在的。我们要考虑的是前面四种。这四种中,already_in_rbh直接鉴定为Copy-PAV,no_block_projection
直接鉴定为非共线性region的PAV non_homolog和no_gene_in_target_block是要进行下一步验证的。所以第一步是把这个输出文件给提取出来。
第二步,对于需要验证的基因,我们需要从输入的表格中提取tgt_chr,tgt_start,tgt_end,用samtools切片,作为tagret,然后根据蛋白质文件从pep文件中提取蛋白质序列。
pav_06_blat_check.py \
--candidates /project1/data/Chr_split/hap1_test/potential_PAV.tsv \
--pep_file_A /project1/data/Chr_fixed/female_hap1_longest_pep.fa \
--pep_file_B /project1/data/Chr_fixed/female_hap2_longest_pep.fa \
--genome_fa_A /project1/data/Chr_fixed/female_hap1_chr_fixed.fa \
--genome_fa_B /project1/data/Chr_fixed/female_hap2_chr_fixed.fa \
--output_dir ./validation_results \
--min_identity 80 \
--min_coverage 0.6(base) root@b27b26bddcc5:/project1/bin# cat pav_06_blat_check4.py
#!/usr/bin/env python3
import argparse
import subprocess
import os
import pandas as pd
from pathlib import Path
from Bio import SeqIO
from Bio.Seq import Seq
import tempfile
import sys
import re
import shutil
import logging
import threading
import queue
import time
from collections import defaultdict
import glob
import concurrent.futures
import traceback
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)
# -------------------------------
# 辅助函数
# -------------------------------
def determine_haplotype(chrom_name, gene_id=None):
"""从染色体名称或基因ID推断单倍型"""
if isinstance(chrom_name, str):
chrom_name = chrom_name.lower()
if 'hap1' in chrom_name:
return 'A'
elif 'hap2' in chrom_name:
return 'B'
elif 'a' in chrom_name:
return 'A'
elif 'b' in chrom_name:
return 'B'
if gene_id:
if gene_id.startswith('TgFa') or 'GAP' in gene_id:
return 'A'
elif gene_id.startswith('TgFb') or 'GBP' in gene_id:
return 'B'
raise ValueError(f"无法推断单倍型: chrom={chrom_name}, gene={gene_id}")
def extract_genome_region(genome_fa, chrom, start, end, output_fa):
"""使用samtools提取基因组区域"""
# 检查染色体是否存在于基因组索引
chrom_found = False
chrom_to_use = chrom
try:
with open(genome_fa + ".fai") as f:
for line in f:
ref_chrom = line.split("\t")[0]
if ref_chrom == chrom:
chrom_found = True
break
if ref_chrom.endswith(chrom) or chrom.endswith(ref_chrom):
chrom_to_use = ref_chrom
chrom_found = True
break
except FileNotFoundError:
raise ValueError(f"基因组索引文件不存在: {genome_fa}.fai")
if not chrom_found:
raise ValueError(f"在基因组文件中找不到染色体: {chrom}")
# 执行提取
cmd = f"samtools faidx {genome_fa} {chrom_to_use}:{start}-{end} > {output_fa}"
result = subprocess.run(cmd, shell=True, stderr=subprocess.PIPE)
if result.returncode != 0 or not os.path.exists(output_fa) or os.path.getsize(output_fa) == 0:
raise RuntimeError(f"提取失败: {chrom}:{start}-{end} | 错误: {result.stderr.decode()}")
return output_fa
def extract_protein(pep_file, prot_id, output_fa):
"""从蛋白文件中提取特定蛋白序列"""
# 处理可能的ID变体
id_variants = [
prot_id,
prot_id.split('-')[0],
prot_id.replace('-mRNA', '_mRNA'),
prot_id.replace('_mRNA', '-mRNA'),
prot_id.split('_')[0],
prot_id.split('.')[0],
prot_id.split(':')[0]
]
# 精确匹配
for record in SeqIO.parse(pep_file, "fasta"):
if any(variant == record.id for variant in id_variants):
SeqIO.write(record, output_fa, "fasta")
return True
# 宽松匹配
for record in SeqIO.parse(pep_file, "fasta"):
if any(variant in record.id for variant in id_variants):
SeqIO.write(record, output_fa, "fasta")
return True
# 尝试匹配描述
for record in SeqIO.parse(pep_file, "fasta"):
if any(variant in record.description for variant in id_variants):
SeqIO.write(record, output_fa, "fasta")
return True
return False
def run_blat(query_fa, target_fa, out_psl, min_identity=80):
"""运行BLAT比对"""
cmd = [
"blat",
"-t=dnax",
"-q=prot",
target_fa,
query_fa,
out_psl,
f"-minIdentity={min_identity}"
]
result = subprocess.run(cmd, stderr=subprocess.PIPE)
if result.returncode != 0 or not os.path.exists(out_psl) or os.path.getsize(out_psl) < 100:
error_msg = result.stderr.decode() if result.stderr else "未知错误"
raise RuntimeError(f"BLAT失败: {error_msg}")
return out_psl
# ====================================================
# PSL解析和CDS提取逻辑(优化内存使用)
# ====================================================
def parse_psl_line(line):
"""解析单行PSL记录"""
cols = line.strip().split("\t")
if len(cols) < 21:
return None
block_count = int(cols[17])
block_sizes = list(map(int, cols[18].strip(",").split(",")))
t_starts = list(map(int, cols[20].strip(",").split(",")))
return {
"matches": int(cols[0]),
"misMatches": int(cols[1]),
"qName": cols[9],
"qSize": int(cols[10]),
"strand": cols[8],
"tName": cols[13],
"tSize": int(cols[14]),
"tStart": int(cols[15]),
"tEnd": int(cols[16]),
"blockCount": block_count,
"blockSizes": block_sizes,
"tStarts": t_starts
}
def parse_psl_to_list(psl_file):
"""解析PSL文件为记录列表"""
records = []
try:
with open(psl_file) as fh:
header_count = 0
for line in fh:
# 跳过表头行
if line.startswith("psLayout") or line.startswith("match") or line.startswith("-"):
header_count += 1
continue
if not line.strip():
continue
rec = parse_psl_line(line)
if rec:
records.append(rec)
# 如果只有表头没有实际记录
if header_count > 0 and len(records) == 0:
logger.info(f"检测到空PSL文件: {psl_file} (只有表头)")
except Exception as e:
logger.error(f"解析PSL文件错误: {e}")
return records
def extract_cds_from_psl(psl, target_fa):
"""从PSL记录中提取CDS序列(使用目标区域文件)"""
t_name = psl["tName"]
strand = psl["strand"]
t_size = psl["tSize"]
# 加载目标区域序列
try:
# 由于目标区域文件很小,直接加载到内存
target_seq = next(SeqIO.parse(target_fa, "fasta")).seq
except Exception as e:
logger.error(f"加载目标序列失败: {e}")
return None
# 处理负链比对:转换每个block的坐标
if strand.endswith('-') or strand == '-': # 负链比对
# 转换每个block的起始位置
converted_starts = [t_size - (ts + aa_size*3) for ts, aa_size in zip(psl["tStarts"], psl["blockSizes"])]
else: # 正链比对
converted_starts = psl["tStarts"]
# 提取序列
cds_parts = []
try:
for aa_size, ts in zip(psl["blockSizes"], converted_starts):
dna_size = aa_size * 3 # 氨基酸 → 核苷酸
exon_seq = target_seq[ts : ts + dna_size]
# 如果是负链,立即反向互补
if strand.endswith('-') or strand == '-':
exon_seq = exon_seq.reverse_complement()
cds_parts.append(exon_seq)
return Seq("".join(str(seq) for seq in cds_parts))
except Exception as e:
logger.error(f"提取CDS失败: {e}")
return None
def write_psl_hits_to_fasta(psl_hits, target_fa, out_file, prefix="hit"):
"""将PSL匹配结果写入FASTA文件(CDS+蛋白质)"""
with open(out_file, "w") as fh:
for hit_idx, hit in enumerate(psl_hits, 1):
try:
cds_seq = extract_cds_from_psl(hit, target_fa)
# 跳过无效序列
if not cds_seq or len(cds_seq) == 0:
logger.warning(f"跳过无效CDS序列: {hit['qName']}-hit{hit_idx}")
continue
# 翻译蛋白质
try:
protein_seq = cds_seq.translate(to_stop=False)
except Exception as e:
logger.warning(f"翻译失败: {hit['qName']}-hit{hit_idx} | {e}")
continue
# 写入CDS
header = f"{prefix}|{hit['qName']}|{hit['tName']}|hit{hit_idx}"
fh.write(f">{header}_CDS\n{str(cds_seq)}\n")
# 写入蛋白质
fh.write(f">{header}_PROT\n{str(protein_seq)}\n")
logger.info(f"成功写入匹配: {header}")
except Exception as e:
logger.error(f"处理匹配时出错: {e}")
# ====================================================
# 蛋白质过滤函数
# ====================================================
def filter_hits(in_fa, out_prot_fa, out_cds_fa):
"""
精确匹配 hit 编号进行过滤:
1. 去除蛋白中包含 '*' 的序列
2. 去除蛋白起始氨基酸不是 'M' 的序列
3. 去除的 PROT,同时必须过滤掉对应的 CDS
"""
# 先按 hit 编号分组: { (prefix, qname, tname, hitX): {"CDS": seq, "PROT": seq} }
grouped = {}
query_records = [] # 保存 query
for rec in SeqIO.parse(in_fa, "fasta"):
if rec.id.startswith("query|"):
query_records.append(rec)
continue
parts = rec.id.split("|")
if len(parts) < 4:
continue
prefix = parts[0]
qname = parts[1]
tname = parts[2]
hit_full = parts[3]
if "_" not in hit_full:
continue
hit_id, seq_type = hit_full.split("_")
key = (prefix, qname, tname, hit_id)
grouped.setdefault(key, {})
grouped[key][seq_type] = rec
# ===== 过滤逻辑 =====
passed_prot = []
passed_cds = []
for key, seqs in grouped.items():
prot = seqs.get("PROT", None)
cds = seqs.get("CDS", None)
if prot is None or cds is None:
continue
seq_str = str(prot.seq)
if "*" in seq_str:
continue
if not seq_str.startswith("M"):
continue
passed_prot.append(prot)
passed_cds.append(cds)
# ===== 写入过滤后的文件 =====
# query 放在首位
SeqIO.write(query_records + passed_prot, out_prot_fa, "fasta")
SeqIO.write(passed_cds, out_cds_fa, "fasta")
# ====================================================
# BLAT结果处理函数(优化内存使用)
# ====================================================
def process_blat_results(psl_file, target_fa, query_prot_seq, min_identity=0.95, output_dir=None, temp_dir=None):
"""
BLAT后处理(优化内存使用)
1. 快速检测空PSL文件
2. 解析PSL获取匹配位置
3. 直接从目标区域文件提取CDS
4. 蛋白质级验证
"""
# 1. 快速检测空PSL文件
if os.path.exists(psl_file):
file_size = os.path.getsize(psl_file)
# 如果文件小于200字节(只有表头),直接返回空结果
if file_size < 200:
logger.info(f"检测到空PSL文件(文件大小{file_size}字节): {psl_file}")
return [], 0
# 2. 解析PSL文件
psl_hits = parse_psl_to_list(psl_file)
num_psl_hits = len(psl_hits)
if num_psl_hits == 0:
logger.info(f"无有效PSL匹配: {psl_file}")
return [], num_psl_hits
# 3. 创建临时文件(放在专门的临时文件目录)
try:
# 确保临时目录存在
os.makedirs(temp_dir, exist_ok=True)
# 主临时文件路径
temp_out = tempfile.NamedTemporaryFile(
mode="w",
delete=False,
suffix=".fa",
dir=temp_dir
).name
# 写入查询蛋白序列
with open(temp_out, "w") as tmp_file:
if query_prot_seq:
tmp_file.write(f">query|{query_prot_seq.id}\n{str(query_prot_seq.seq).rstrip('*')}\n")
# 写入匹配序列 - 直接使用目标区域文件
write_psl_hits_to_fasta(psl_hits, target_fa, temp_out)
# 4. 过滤有效基因
filtered_prot = tempfile.NamedTemporaryFile(
mode="w",
delete=False,
suffix="_prot.fa",
dir=temp_dir
).name
filtered_cds = tempfile.NamedTemporaryFile(
mode="w",
delete=False,
suffix="_cds.fa",
dir=temp_dir
).name
# 执行过滤
filter_hits(temp_out, filtered_prot, filtered_cds)
# 读取有效基因
valid_genes = []
for rec in SeqIO.parse(filtered_prot, "fasta"):
# 跳过查询序列
if rec.id.startswith("query|"):
continue
valid_genes.append(rec)
# 保存所有蛋白质序列到永久文件
if output_dir and valid_genes:
# 创建蛋白质输出目录
protein_dir = Path(output_dir) / "valid_proteins"
protein_dir.mkdir(parents=True, exist_ok=True)
# 获取基因ID
gene_id = "unknown"
if query_prot_seq:
gene_id = query_prot_seq.id.split("|")[0] if "|" in query_prot_seq.id else query_prot_seq.id
# 为每个基因单独保存蛋白质文件
prot_file = protein_dir / f"{gene_id}_valid_prot.fa"
with open(prot_file, "w") as f:
SeqIO.write(valid_genes, f, "fasta")
logger.info(f"保存 {len(valid_genes)} 条蛋白质序列到: {prot_file}")
# 不再删除临时文件(根据用户要求)
logger.info(f"临时文件已保留在: {temp_dir}")
logger.info(f"过滤后保留 {len(valid_genes)} 个有效基因模型")
return valid_genes, num_psl_hits
except Exception as e:
logger.error(f"处理BLAT结果时出错: {e}")
return [], num_psl_hits
# ====================================================
# 并行处理函数(使用线程池)
# ====================================================
def process_gene_row(row, args, output_dir, temp_dir):
"""处理单个基因行(线程安全)"""
stats = defaultdict(int)
result = {}
prot_id = row['pav_protein']
reason = row['reason']
logger.info(f"处理基因 {prot_id} ({reason})...")
try:
# 1. 直接分类的基因类型
if reason == "already_in_rbh":
result = {
"pav_protein": prot_id,
"reason": reason,
"classification": "Copy-PAV",
"src_hap": "N/A",
"tgt_hap": "N/A",
"blat_file": "",
"valid_genes": 0,
"validation_result": "direct_classification"
}
stats["copy_pav"] = 1
logger.info("→ 直接分类: Copy-PAV")
elif reason == "no_block_projection":
result = {
"pav_protein": prot_id,
"reason": reason,
"classification": "PAV (non-syntenic)",
"src_hap": "N/A",
"tgt_hap": "N/A",
"blat_file": "",
"valid_genes": 0,
"validation_result": "direct_classification"
}
stats["non_syntenic_pav"] = 1
logger.info("→ 直接分类: PAV (non-syntenic)")
# 2. 需要验证的基因类型
else:
# 推断源单倍型
try:
src_hap = determine_haplotype(row['src_chr'], prot_id)
except ValueError:
src_hap = determine_haplotype(None, prot_id)
# 目标区域处理
try:
tgt_hap = determine_haplotype(row['tgt_chr'])
tgt_chr = row['tgt_chr']
except ValueError:
# 回退策略: 假设相反单倍型
tgt_hap = "B" if src_hap == "A" else "A"
tgt_chr = row['src_chr'].replace("hap1", "hap2") if "hap1" in row['src_chr'] else row['src_chr'].replace("hap2", "hap1")
logger.info(f"单倍型: 源={src_hap}, 目标={tgt_hap}")
# 获取对应的蛋白质和基因组文件
pep_file = args.pep_file_A if src_hap == "A" else args.pep_file_B
genome_fa = args.genome_fa_A if tgt_hap == "A" else args.genome_fa_B
# 确保临时目录存在
os.makedirs(temp_dir, exist_ok=True)
# 提取查询蛋白序列(放在临时文件目录)
query_fa = os.path.join(temp_dir, f"{prot_id}_query.fa")
if not extract_protein(pep_file, prot_id, query_fa):
# 尝试匹配基本ID
base_id = prot_id.split('_')[0].split('-')[0].split('.')[0]
if not extract_protein(pep_file, base_id, query_fa):
logger.error(f"在单倍型 {src_hap} 中未找到蛋白 {prot_id} 或 {base_id}")
result = {
"pav_protein": prot_id,
"reason": reason,
"classification": "unknown",
"src_hap": src_hap,
"tgt_hap": tgt_hap,
"blat_file": "",
"valid_genes": 0,
"validation_result": "query_not_found"
}
stats["query_not_found"] = 1
return result, stats
# 读取查询蛋白序列
try:
with open(query_fa, "r") as f:
query_prot = next(SeqIO.parse(f, "fasta"))
except Exception:
query_prot = None
# 提取目标基因组区域(放在临时文件目录)
target_fa = os.path.join(temp_dir, f"{prot_id}_target.fa")
try:
extract_genome_region(
genome_fa,
tgt_chr,
row['tgt_start'],
row['tgt_end'],
target_fa
)
logger.info(f"目标区域提取成功: {tgt_chr}:{row['tgt_start']}-{row['tgt_end']} -> {target_fa}")
except Exception as e:
logger.error(f"提取目标区域失败: {e}")
result = {
"pav_protein": prot_id,
"reason": reason,
"classification": "unknown",
"src_hap": src_hap,
"tgt_hap": tgt_hap,
"blat_file": "",
"valid_genes": 0,
"validation_result": "extract_failed"
}
stats["extract_failed"] = 1
return result, stats
# 运行BLAT(结果放在临时文件目录)
psl_file = os.path.join(temp_dir, f"{prot_id}.psl")
try:
logger.info("运行BLAT比对...")
run_blat(query_fa, target_fa, psl_file, args.min_identity)
logger.info(f"BLAT完成,结果保存至: {psl_file}")
except Exception as e:
logger.error(f"BLAT运行失败: {e}")
result = {
"pav_protein": prot_id,
"reason": reason,
"classification": "unknown",
"src_hap": src_hap,
"tgt_hap": tgt_hap,
"blat_file": str(psl_file),
"valid_genes": 0,
"validation_result": "blat_error"
}
stats["blat_failed"] = 1
return result, stats
# BLAT后处理:使用提取的目标区域文件
try:
logger.info("处理BLAT结果,验证基因模型...")
valid_genes, num_psl_hits = process_blat_results(
psl_file,
target_fa, # 使用提取的目标区域文件
query_prot,
min_identity=args.min_identity,
output_dir=output_dir,
temp_dir=temp_dir # 传递临时目录
)
valid_count = len(valid_genes)
# 根据验证结果分类
if valid_count > 0:
classification = "invalid (present in target)"
validation_result = "validation_failed"
stats["invalid_pav"] = 1
logger.info(f"→ 验证失败: 找到 {valid_count} 个有效基因模型")
else:
if num_psl_hits > 0:
classification = "invalid (no valid gene)"
validation_result = "no_valid_genes"
stats["no_valid_genes"] = 1
logger.info("→ 验证失败: 有匹配但无有效基因模型")
else:
classification = "PAV (validated)"
validation_result = "validated"
stats["validated_pav"] = 1
logger.info("→ 验证成功: PAV (无任何BLAT匹配)")
# 记录结果
result = {
"pav_protein": prot_id,
"reason": reason,
"classification": classification,
"src_hap": src_hap,
"tgt_hap": tgt_hap,
"blat_file": str(psl_file),
"valid_genes": valid_count,
"validation_result": validation_result
}
except Exception as e:
logger.error(f"处理失败: {e}")
result = {
"pav_protein": prot_id,
"reason": reason,
"classification": "unknown",
"src_hap": src_hap,
"tgt_hap": tgt_hap,
"blat_file": str(psl_file),
"valid_genes": 0,
"validation_result": "cds_extract_failed"
}
stats["cds_extract_failed"] = 1
except ValueError as e:
logger.error(f"单倍型推断失败: {e}")
result = {
"pav_protein": prot_id,
"reason": reason,
"classification": "unknown",
"src_hap": "unknown",
"tgt_hap": "unknown",
"blat_file": "",
"valid_genes": 0,
"validation_result": "hap_infer_failed"
}
stats["hap_infer_failed"] = 1
except Exception as e:
logger.error(f"处理基因时发生意外错误: {e}")
result = {
"pav_protein": prot_id,
"reason": reason,
"classification": "unknown",
"src_hap": "unknown",
"tgt_hap": "unknown",
"blat_file": "",
"valid_genes": 0,
"validation_result": "unexpected_error"
}
stats["unexpected_error"] = 1
# 不再删除临时文件(根据用户要求)
logger.info(f"所有临时文件保留在: {temp_dir}")
return result, stats
# ====================================================
# 主函数(使用线程池)
# ====================================================
def main():
parser = argparse.ArgumentParser(description="PAV基因分类与验证流程")
parser.add_argument("--candidates", required=True, help="候选基因TSV文件")
parser.add_argument("--pep_file_A", required=True, help="单倍型A的蛋白FASTA文件")
parser.add_argument("--pep_file_B", required=True, help="单倍型B的蛋白FASTA文件")
parser.add_argument("--genome_fa_A", required=True, help="单倍型A的基因组FASTA文件")
parser.add_argument("--genome_fa_B", required=True, help="单倍型B的基因组FASTA文件")
parser.add_argument("--output_dir", required=True, help="输出目录")
parser.add_argument("--min_identity", type=int, default=80, help="BLAT最小序列同一性阈值")
parser.add_argument("--threads", type=int, default=4, help="并行处理线程数")
args = parser.parse_args()
# 准备输出目录
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# 创建临时文件目录
temp_dir = output_dir / "temp_files"
temp_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"所有临时文件将保存在: {temp_dir}")
# 创建蛋白质输出目录
protein_dir = output_dir / "valid_proteins"
protein_dir.mkdir(parents=True, exist_ok=True)
# 读取候选基因
try:
candidates = pd.read_csv(args.candidates, sep="\t")
except Exception as e:
logger.error(f"读取候选基因文件失败: {e}")
sys.exit(1)
# 检查必要的列
required_cols = ["pav_protein", "reason", "src_chr", "src_start", "src_end", "tgt_chr", "tgt_start", "tgt_end"]
missing_cols = [col for col in required_cols if col not in candidates.columns]
if missing_cols:
raise ValueError(f"输入文件缺少必要的列: {', '.join(missing_cols)}")
# 初始化结果容器
results_container = []
stats_container = defaultdict(int)
stats_container["total"] = len(candidates)
# 创建进度计数器
processed_counter = 0
total_counter = len(candidates)
lock = threading.Lock()
def process_and_track(row):
"""处理单个基因并更新进度"""
nonlocal processed_counter
try:
result, stats = process_gene_row(
row,
args,
str(output_dir),
str(temp_dir) # 传递临时目录
)
except Exception as e:
logger.error(f"处理基因行时发生严重错误: {e}")
result = {
"pav_protein": row.get('pav_protein', 'unknown'),
"reason": row.get('reason', 'unknown'),
"classification": "crash",
"src_hap": "unknown",
"tgt_hap": "unknown",
"blat_file": "",
"valid_genes": 0,
"validation_result": "fatal_error"
}
stats = {"fatal_errors": 1}
with lock:
results_container.append(result)
for key, value in stats.items():
stats_container[key] += value
processed_counter += 1
progress = processed_counter / total_counter * 100
logger.info(f"进度: {processed_counter}/{total_counter} ({progress:.1f}%) - 完成处理 {row['pav_protein']}")
logger.info(f"开始处理 {len(candidates)} 个候选基因,使用 {args.threads} 线程...")
# 使用线程池处理
with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
# 提交所有任务
futures = [executor.submit(process_and_track, row) for _, row in candidates.iterrows()]
# 等待所有任务完成
for future in concurrent.futures.as_completed(futures):
try:
future.result() # 获取结果(如果有异常会抛出)
except Exception as e:
logger.error(f"任务执行异常: {e}")
logger.info("所有基因处理完成")
# 保存结果
result_df = pd.DataFrame(results_container)
result_file = output_dir / "pav_classification_results.tsv"
result_df.to_csv(result_file, sep="\t", index=False)
# 将defaultdict转为普通字典
stats_dict = dict(stats_container)
# 保存详细统计信息
stats_file = output_dir / "processing_stats.txt"
with open(stats_file, "w") as sf:
sf.write("===== PAV基因分类统计 =====\n")
sf.write(f"总候选基因: {stats_dict.get('total', 0)}\n")
sf.write(f"Copy-PAV (already_in_rbh): {stats_dict.get('copy_pav', 0)}\n")
sf.write(f"非共线性PAV (no_block_projection): {stats_dict.get('non_syntenic_pav', 0)}\n")
sf.write(f"验证通过的PAV: {stats_dict.get('validated_pav', 0)}\n")
sf.write(f"验证未通过的PAV: {stats_dict.get('invalid_pav', 0) + stats_dict.get('no_valid_genes', 0)}\n")
sf.write("---- 验证过程统计 ----\n")
sf.write(f"蛋白序列未找到: {stats_dict.get('query_not_found', 0)}\n")
sf.write(f"区域提取失败: {stats_dict.get('extract_failed', 0)}\n")
sf.write(f"单倍型推断失败: {stats_dict.get('hap_infer_failed', 0)}\n")
sf.write(f"BLAT运行失败: {stats_dict.get('blat_failed', 0)}\n")
sf.write(f"CDS提取失败: {stats_dict.get('cds_extract_failed', 0)}\n")
sf.write(f"有匹配但无有效基因: {stats_dict.get('no_valid_genes', 0)}\n")
sf.write(f"致命错误: {stats_dict.get('fatal_errors', 0)}\n")
# 打印统计信息
logger.info("\n" + "="*40)
logger.info("PAV基因分类统计")
logger.info("="*40)
logger.info(f"总候选基因: {stats_dict.get('total', 0)}")
logger.info(f"Copy-PAV (already_in_rbh): {stats_dict.get('copy_pav', 0)}")
logger.info(f"非共线性PAV (no_block_projection): {stats_dict.get('non_syntenic_pav', 0)}")
logger.info(f"验证通过的PAV: {stats_dict.get('validated_pav', 0)}")
logger.info(f"验证未通过的PAV: {stats_dict.get('invalid_pav', 0) + stats_dict.get('no_valid_genes', 0)}")
logger.info("-"*30)
logger.info(f"蛋白序列未找到: {stats_dict.get('query_not_found', 0)}")
logger.info(f"区域提取失败: {stats_dict.get('extract_failed', 0)}")
logger.info(f"单倍型推断失败: {stats_dict.get('hap_infer_failed', 0)}")
logger.info(f"BLAT运行失败: {stats_dict.get('blat_failed', 0)}")
logger.info(f"CDS提取失败: {stats_dict.get('cds_extract_failed', 0)}")
logger.info(f"有匹配但无有效基因: {stats_dict.get('no_valid_genes', 0)}")
logger.info(f"致命错误: {stats_dict.get('fatal_errors', 0)}")
logger.info("="*40)
logger.info(f"结果文件: {result_file}")
logger.info(f"统计文件: {stats_file}")
logger.info(f"所有蛋白质序列保存至: {protein_dir}")
logger.info(f"所有临时文件保存至: {temp_dir}")
# 不再清理残留临时文件
logger.info("程序执行完成")
if __name__ == "__main__":
main()结束了,接下来就是找一个main把这些代码给导入进去了
汇总部分,不过目前还缺少一个等位基因的结构变异区注释,现在所有的PAV基因已经都注释上了,这很好嘛。
main_pav_srcipts.py
(base) root@b27b26bddcc5:/project1/bin# cat main_pav.py
#!/usr/bin/env python3
"""
PAV Gene Analysis Pipeline - Main Integration Script
Usage: pav_pipeline.py [options]
"""
import argparse
import subprocess
import sys
from pathlib import Path
def run_command(cmd, step_name):
"""Execute a shell command with error handling"""
print(f"Running {step_name}: {cmd}")
try:
subprocess.run(cmd, shell=True, check=True)
except subprocess.CalledProcessError as e:
print(f"Error in {step_name}: {e}")
sys.exit(1)
def main():
parser = argparse.ArgumentParser(description='PAV Gene Analysis Pipeline')
parser.add_argument('--pepA', required=True, help='Protein FASTA for haplotype A')
parser.add_argument('--pepB', required=True, help='Protein FASTA for haplotype B')
parser.add_argument('--gffA', required=True, help='GFF file for haplotype A')
parser.add_argument('--gffB', required=True, help='GFF file for haplotype B')
parser.add_argument('--genomeA', required=True, help='Genome FASTA for haplotype A')
parser.add_argument('--genomeB', required=True, help='Genome FASTA for haplotype B')
parser.add_argument('--block_index', required=True, help='Block index file for projection')
parser.add_argument('--outdir', required=True, help='Output directory')
parser.add_argument('--threads', type=int, default=4, help='Number of threads')
parser.add_argument('--evalue', type=float, default=1e-5, help='BLAST evalue cutoff (step1)')
parser.add_argument('--coverage', type=float, default=0.5, help='BLAST coverage cutoff (step1)')
parser.add_argument('--min_identity', type=int, default=80, help='Minimum identity for BLAT (step3)')
parser.add_argument('--extend', type=int, default=1000, help='Extend length for block projection (step2)')
args = parser.parse_args()
# Create output directories for each step
outdir = Path(args.outdir)
step1_dir = outdir / "step1_rbh"
step2_dir = outdir / "step2_block_check"
step3_dir = outdir / "step3_blat_check"
step1_dir.mkdir(parents=True, exist_ok=True)
step2_dir.mkdir(parents=True, exist_ok=True)
step3_dir.mkdir(parents=True, exist_ok=True)
# Step 1: Reciprocal BLAST Best Hits (RBH) analysis
step1_cmd = (
f"pav_00_rbh.py {args.pepA} {args.pepB} "
f"--label1 A --label2 B "
f"--work_dir {step1_dir} "
f"--out_prefix step1 "
f"--threads {args.threads} "
f"--evalue {args.evalue} "
f"--coverage {args.coverage}"
)
run_command(step1_cmd, "Step 1 (RBH analysis)")
# Step 2: Block-based allele verification
step2_cmd = (
f"pav_05_block_allele_gff_check.py "
f"--pav_list {step1_dir}/step1_A_PAV.txt,{step1_dir}/step1_B_PAV.txt "
f"--gff_list {args.gffA},{args.gffB} "
f"--pep_list {args.pepA},{args.pepB} "
f"--block_index {args.block_index} "
f"--outdir {step2_dir} "
f"--extend {args.extend} "
f"--threads {args.threads} "
f"--rbh_file {step1_dir}/step1_strict_RBH.txt"
)
run_command(step2_cmd, "Step 2 (Block and GFF check)")
# Step 3: BLAT-based validation and classification
potential_pav_file = step2_dir / "potential_PAV.tsv"
if not potential_pav_file.exists():
print(f"ERROR: Step 2 output file not found: {potential_pav_file}")
sys.exit(1)
step3_cmd = (
f"pav_06_blat_check.py "
f"--candidates {potential_pav_file} "
f"--pep_file_A {args.pepA} "
f"--pep_file_B {args.pepB} "
f"--genome_fa_A {args.genomeA} "
f"--genome_fa_B {args.genomeB} "
f"--output_dir {step3_dir} "
f"--min_identity {args.min_identity} "
f"--threads {args.threads}"
)
run_command(step3_cmd, "Step 3 (BLAT validation)")
print("\n" + "="*80)
print("PAV ANALYSIS PIPELINE COMPLETED SUCCESSFULLY!")
print("="*80)
print(f"Final results directory: {step3_dir}")
print(f"- Classification results: {step3_dir}/pav_classification_results.tsv")
print(f"- Processing stats: {step3_dir}/processing_stats.txt")
print(f"- Valid protein sequences: {step3_dir}/valid_proteins/")
print("\nNext steps: Review the classification results and statistics")
if __name__ == "__main__":
main()准备blocks index文件,这个文件要切割一下。不过问题不大。
block_file ref_chr qry_chr
/project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Fe_hap2_Chr01syri.blocks.tsv Fe_hap1_Chr01 Fe_hap2_Chr01
/project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Ma_hap1_Chr01_flipsyri.blocks.tsv Fe_hap1_Chr01 Ma_hap1_Chr01_flip
/project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap2_Chr01_vs_Ma_hap1_Chr01_flipsyri.blocks.tsv Fe_hap2_Chr01 Ma_hap1_Chr01_flip
/project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr01_vs_Ma_hap2_Chr01_flipsyri.blocks.tsv Fe_hap1_Chr01 Ma_hap2_Chr01_flip
/project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap2_Chr01_vs_Ma_hap2_Chr01_flipsyri.blocks.tsv Fe_hap2_Chr01 Ma_hap2_Chr01_flip
/project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Ma_hap1_Chr01_flip_vs_Ma_hap2_Chr01_flipsyri.blocks.tsv Ma_hap1_Chr01_flip Ma_hap2_Chr01_flip
/project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr04_flip_vs_Fe_hap2_Chr04_flipsyri.blocks.tsv Fe_hap1_Chr04_flip Fe_hap2_Chr04_flip
/project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap1_Chr04_flip_vs_Ma_hap1_Chr04_flipsyri.blocks.tsv Fe_hap1_Chr04_flip Ma_hap1_Chr04_flip
/project1/data/Chr_split/Chr_ALN4_all_filted_vcf_dir/Fe_hap2_Chr04_flip_vs_Ma_hap1_Chr04_flipsyri.blocks.tsv Fe_hap2_Chr04_flip Ma_hap1_Chr04_flip、四个单倍型只有6种排列组合方式,问题不大。
还是有问题的,我找了个代码专门来做拆分,要不然光这一步也挺烦人的。
awk -F'\t' '
# -----------------------------
# 读入 FAI 文件,建立染色体集合
# -----------------------------
BEGIN {
# female hap1
while((getline line < "female_hap1_chr_fixed.fa.fai") > 0){
split(line, a, "\t"); hap1_f[a[1]] = 1
}
# female hap2
while((getline line < "female_hap2_chr_fixed.fa.fai") > 0){
split(line, a, "\t"); hap2_f[a[1]] = 1
}
# male hap1
while((getline line < "TG_male_hap1_fixed_Chr.fa.fai") > 0){
split(line, a, "\t"); hap1_m[a[1]] = 1
}
# male hap2
while((getline line < "TG_male_hap2_fixed_Chr.fa.fai") > 0){
split(line, a, "\t"); hap2_m[a[1]] = 1
}
}
# -----------------------------
# 处理 blocks.tsv
# -----------------------------
NR==1 {header=$0; next}
{
ref = $2
qry = $3
# 去掉 _flip 以便去重
ref_clean = ref; gsub(/_flip/, "", ref_clean)
qry_clean = qry; gsub(/_flip/, "", qry_clean)
combos = ""
# 精确匹配 FAI 文件
if((ref in hap1_f && qry in hap2_f) || (ref in hap2_f && qry in hap1_f)) combos="Fe_hap1_vs_Fe_hap2.tsv"
else if((ref in hap1_f && qry in hap1_m) || (ref in hap1_m && qry in hap1_f)) combos="Fe_hap1_vs_Ma_hap1.tsv"
else if((ref in hap1_f && qry in hap2_m) || (ref in hap2_m && qry in hap1_f)) combos="Fe_hap1_vs_Ma_hap2.tsv"
else if((ref in hap2_f && qry in hap1_m) || (ref in hap1_m && qry in hap2_f)) combos="Fe_hap2_vs_Ma_hap1.tsv"
else if((ref in hap2_f && qry in hap2_m) || (ref in hap2_m && qry in hap2_f)) combos="Fe_hap2_vs_Ma_hap2.tsv"
else if((ref in hap1_m && qry in hap2_m) || (ref in hap2_m && qry in hap1_m)) combos="Ma_hap1_vs_Ma_hap2.tsv"
if(combos){
# 写表头(只写一次)
if(!seen_header[combos]++){print header > combos}
# 无序组合去重
if(ref_clean < qry_clean) key = ref_clean "|" qry_clean
else key = qry_clean "|" ref_clean
if(!seen_pair[combos,key]++){print $0 >> combos}
}
}
' blocks.tsv