第 31 章

多模式匹配与AC自动机

第三十一章:多模式匹配与 AC 自动机

上一章我们掌握了单模式匹配:在文本中找一个模式串。但现实世界经常需要同时找多个模式串——敏感词过滤要匹配成千上万个违禁词,入侵检测系统要检测数百条攻击签名,杀毒软件要匹配数十万条病毒特征码。

如果对每个模式分别跑一次 KMP,k 个模式的总时间是 O(k * (n + m))——模式越多越慢。有没有办法一次扫描文本,同时匹配所有模式

AC 自动机(Aho-Corasick Automaton)就是答案。它的核心思想:把多个模式串构建成一个 Trie,然后像 KMP 一样在 Trie 上加失配指针。一次扫描文本即可同时匹配所有模式,时间复杂度 O(n + m_total + z),其中 z 是匹配总数。


Level 1 · 你需要知道的

31.1 为什么需要多模式匹配

场景对比

场景 模式数量 文本长度 单模式方案 AC 自动机方案
敏感词过滤 10,000 1,000 10M 次操作 1K 次操作
网络 IDS 500 1,500 750K 次操作 1.5K 次操作
病毒扫描 100,000 10^6 10^11 次操作 10^6 次操作

差距是惊人的。AC 自动机的扫描时间与模式数量无关——只取决于文本长度和匹配数量。

31.2 AC 自动机 = Trie + 失配指针

三个组成部分

  1. Trie(字典树):把所有模式串插入一棵 Trie
  2. 失配指针(Failure Link):类似 KMP 的 next 数组,当匹配失败时告诉我们跳到哪里
  3. 输出链接(Output Link):记录在当前状态可以输出哪些完整的模式串

直觉理解

想象你同时用多把尺子(模式串)在文本上滑动。AC 自动机等价于把所有尺子叠在一起(Trie),当某把尺子失配时,不需要从头开始——失配指针告诉你另一把尺子的哪个位置恰好可以继续。

31.3 构建过程(BFS)

第一步:构建 Trie

class AhoCorasick:
    def __init__(self):
        # 每个节点用一个列表/字典表示其子节点
        # goto[state][char] = next_state
        self.goto = [{}]  # 状态 0 是根节点
        self.fail = [0]   # 失配指针
        self.output = [[]]  # 每个状态的输出(匹配的模式串)
        self.state_count = 1
    
    def _add_pattern(self, pattern: str, index: int):
        """将一个模式串插入 Trie"""
        state = 0
        for ch in pattern:
            if ch not in self.goto[state]:
                # 创建新节点
                self.goto.append({})
                self.fail.append(0)
                self.output.append([])
                self.goto[state][ch] = self.state_count
                self.state_count += 1
            state = self.goto[state][ch]
        # 标记结束状态
        self.output[state].append(index)

第二步:BFS 构建失配指针

失配指针的含义:如果当前在状态 s(代表已匹配的字符串为 u),fail[s] 指向的是 Trie 中能匹配 u 的最长真后缀的那个节点。

    def _build_failure(self):
        """BFS 构建失配指针"""
        from collections import deque
        queue = deque()
        
        # 初始化:根节点的直接子节点的失配指针指向根
        for ch, next_state in self.goto[0].items():
            self.fail[next_state] = 0
            queue.append(next_state)
        
        # BFS
        while queue:
            curr = queue.popleft()
            for ch, next_state in self.goto[curr].items():
                queue.append(next_state)
                
                # 找 fail 指针:沿着 curr 的 fail 链向上找
                fallback = self.fail[curr]
                while fallback != 0 and ch not in self.goto[fallback]:
                    fallback = self.fail[fallback]
                
                self.fail[next_state] = self.goto[fallback].get(ch, 0)
                
                # 如果 fail 指针指向的是自己,修正为根
                if self.fail[next_state] == next_state:
                    self.fail[next_state] = 0
                
                # 合并输出:当前节点的输出 = 自身输出 + fail 指向节点的输出
                self.output[next_state] = (
                    self.output[next_state] + 
                    self.output[self.fail[next_state]]
                )
            
    def build(self, patterns: list[str]):
        """构建完整的 AC 自动机"""
        for i, pattern in enumerate(patterns):
            self._add_pattern(pattern, i)
        self._build_failure()

为什么用 BFS? 失配指针的计算需要已知父节点的失配指针。BFS 保证我们按层处理——层数越浅的节点先处理,它们的失配指针已经就绪,可以供更深的节点使用。

31.4 匹配过程

    def search(self, text: str) -> list[tuple[int, int]]:
        """
        在文本中搜索所有模式串
        返回 [(结束位置, 模式串索引), ...]
        """
        results = []
        state = 0
        
        for i, ch in enumerate(text):
            # 当前状态没有 ch 的转移时,沿 fail 链回退
            while state != 0 and ch not in self.goto[state]:
                state = self.fail[state]
            
            # 尝试转移
            state = self.goto[state].get(ch, 0)
            
            # 检查输出
            if self.output[state]:
                for pattern_idx in self.output[state]:
                    results.append((i, pattern_idx))
        
        return results

完整使用示例

# 构建 AC 自动机
patterns = ["he", "she", "his", "hers"]
ac = AhoCorasick()
ac.build(patterns)

# 搜索文本
text = "ushers"
matches = ac.search(text)

# 输出匹配结果
for end_pos, pattern_idx in matches:
    pattern = patterns[pattern_idx]
    start_pos = end_pos - len(pattern) + 1
    print(f"Found '{pattern}' at position {start_pos}-{end_pos}")

# 输出:
# Found 'she' at position 1-3
# Found 'he' at position 2-3
# Found 'hers' at position 2-5

时间复杂度分析

空间复杂度:O(m_total * |Σ|)(如果用数组实现),或 O(m_total)(如果用字典实现)。

31.5 逐步演示

让我们用模式集 {"he", "she", "his", "hers"} 完整走一遍。

Trie 结构

根(0) → 'h' → (1) → 'e' → (2, 输出"he") → 'r' → (6) → 's' → (7, 输出"hers")
                    → 'i' → (3) → 's' → (4, 输出"his")
     → 's' → (5) → 'h' → (8) → 'e' → (9, 输出"she")

失配指针

fail[1] = 0   (h 的最长真后缀在 Trie 中不存在)
fail[2] = 0   (he 的真后缀 "e" 不在 Trie 中)
fail[3] = 0   (hi 的真后缀 "i" 不在 Trie 中)
fail[4] = 5   (his 的真后缀 "s" 在 Trie 中, 对应状态 5)
fail[5] = 0   (s 的真后缀为空)
fail[6] = 0   (her 的真后缀 "r", "er" 都不在 Trie 中)
fail[7] = 0   (hers 的真后缀...)
fail[8] = 1   (sh 的真后缀 "h" 在 Trie 中, 对应状态 1)
fail[9] = 2   (she 的真后缀 "he" 在 Trie 中, 对应状态 2)
             → output[9] 合并了 output[2] = ["he"]
             → 所以 output[9] = ["she", "he"]

匹配 text = "ushers"

i=0, ch='u': state=0, 无 'u' 转移 → state 保持 0
i=1, ch='s': state=0 → goto[0]['s'] = 5 → state=5, 无输出
i=2, ch='h': state=5 → goto[5]['h'] = 8 → state=8, 无输出
i=3, ch='e': state=8 → goto[8]['e'] = 9 → state=9
             output[9] = ["she"(idx=1), "he"(idx=0)]
             → 匹配 "she" 结束于位置 3
             → 匹配 "he" 结束于位置 3
i=4, ch='r': state=9, 无 'r' 转移
             → fail[9] = 2, goto[2] 有 'r' 吗?有!goto[2]['r'] = 6
             → 等等,我们需要先回退再检查
             state = fail[9] = 2, goto[2] 有 'r' → state=6, 无输出
i=5, ch='s': state=6 → goto[6]['s'] = 7 → state=7
             output[7] = ["hers"(idx=3)]
             → 匹配 "hers" 结束于位置 5

最终结果:在 "ushers" 中找到 "she"@1, "he"@2, "hers"@2。


Level 2 · 它是怎么运行的

31.6 AC 自动机完整优化实现

上面的基础实现在失配时需要沿 fail 链逐步回退。工程实践中,我们可以预计算所有转移(类似 DFA),消除运行时的回退:

class AhoCorasickOptimized:
    """优化版 AC 自动机:预计算所有转移,匹配时无需沿 fail 链回退"""
    
    def __init__(self, patterns: list[str]):
        self.patterns = patterns
        self.num_states = 1
        # 用固定大小数组(假设 ASCII)或字典
        # 这里用字典以支持任意字符
        self.goto_table = [{}]  # goto_table[state][char] = next_state
        self.failure = [0]
        self.output_list = [[]]
        
        self._build(patterns)
    
    def _build(self, patterns: list[str]):
        # 第一步:构建 Trie
        for idx, pattern in enumerate(patterns):
            state = 0
            for ch in pattern:
                if ch not in self.goto_table[state]:
                    self.goto_table.append({})
                    self.failure.append(0)
                    self.output_list.append([])
                    self.goto_table[state][ch] = self.num_states
                    self.num_states += 1
                state = self.goto_table[state][ch]
            self.output_list[state].append(idx)
        
        # 第二步:BFS 构建 failure 指针 + 完整转移表
        from collections import deque
        queue = deque()
        
        # 对根节点的所有可能字符,设定默认转移
        # 根节点遇到 Trie 中没有的字符,转移到自身
        for ch, s in self.goto_table[0].items():
            self.failure[s] = 0
            queue.append(s)
        
        while queue:
            u = queue.popleft()
            for ch, v in self.goto_table[u].items():
                queue.append(v)
                # 计算 failure[v]
                f = self.failure[u]
                while f != 0 and ch not in self.goto_table[f]:
                    f = self.failure[f]
                self.failure[v] = self.goto_table[f].get(ch, 0)
                if self.failure[v] == v:
                    self.failure[v] = 0
                # 合并输出
                self.output_list[v] = (
                    self.output_list[v] + self.output_list[self.failure[v]]
                )
    
    def search(self, text: str) -> list[tuple[int, str]]:
        """
        搜索文本,返回 (结束位置, 模式串) 列表
        """
        results = []
        state = 0
        
        for i, ch in enumerate(text):
            # 沿 failure 链找到能转移的状态
            while state != 0 and ch not in self.goto_table[state]:
                state = self.failure[state]
            state = self.goto_table[state].get(ch, 0)
            
            # 检查输出
            for idx in self.output_list[state]:
                pattern = self.patterns[idx]
                start = i - len(pattern) + 1
                results.append((start, pattern))
        
        return results

31.7 敏感词过滤系统实战

需求:给定一组敏感词列表和用户输入文本,将所有敏感词替换为 ***

class SensitiveWordFilter:
    """基于 AC 自动机的敏感词过滤器"""
    
    def __init__(self, sensitive_words: list[str]):
        self.ac = AhoCorasickOptimized(sensitive_words)
    
    def filter_text(self, text: str) -> str:
        """将文本中的敏感词替换为 ***"""
        matches = self.ac.search(text)
        if not matches:
            return text
        
        # 按起始位置排序,合并重叠区间
        matches.sort(key=lambda x: x[0])
        intervals = []
        for start, pattern in matches:
            end = start + len(pattern)
            if intervals and start <= intervals[-1][1]:
                intervals[-1] = (intervals[-1][0], max(intervals[-1][1], end))
            else:
                intervals.append((start, end))
        
        # 替换
        result = []
        prev_end = 0
        for start, end in intervals:
            result.append(text[prev_end:start])
            result.append('*' * (end - start))
            prev_end = end
        result.append(text[prev_end:])
        
        return ''.join(result)
    
    def contains_sensitive(self, text: str) -> bool:
        """快速判断文本是否包含敏感词"""
        state = 0
        for ch in text:
            while state != 0 and ch not in self.ac.goto_table[state]:
                state = self.ac.failure[state]
            state = self.ac.goto_table[state].get(ch, 0)
            if self.ac.output_list[state]:
                return True
        return False


# 使用示例
filter_sys = SensitiveWordFilter(["暴力", "赌博", "色情", "毒品"])
print(filter_sys.filter_text("这篇文章涉及暴力和赌博内容"))
# 输出: "这篇文章涉及**和**内容"

print(filter_sys.contains_sensitive("正常的文章内容"))
# 输出: False

工程考量

  1. 大小写不敏感匹配:预处理时将模式和文本都转小写
  2. Unicode 支持:Python 的字符串天然支持 Unicode,但需要注意全角/半角转换
  3. 增量更新:如果敏感词列表动态变化,需要重建 AC 自动机。可以用定时重建策略
  4. 变体检测:用户会用各种方式规避过滤(插空格、谐音字等),需要预处理阶段标准化

31.8 AC 自动机与 KMP 的关系

KMP 是 AC 自动机的特例

当模式集只包含一个模式串时,AC 自动机退化为 KMP:

形式化对应

KMP AC 自动机
模式串的字符序列 Trie 中从根到叶的路径
next[j](前缀函数) fail[state](失配指针)
匹配时 j 的增减 状态在 Trie 中的转移和回退
一个模式 → O(n+m) k 个模式 → O(n + m_total + z)

直觉:KMP 是"一维"的(沿一条链回退),AC 自动机是"多维"的(在整棵 Trie 上通过 fail 链跳转)。

31.9 DFA 化优化

在高性能场景中,可以把 AC 自动机的匹配过程转化为纯 DFA 查表——预计算每个 (state, char) 对的完整转移,消除运行时的 fail 链遍历:

def build_dfa(ac: AhoCorasickOptimized, charset: str) -> list[dict]:
    """
    将 AC 自动机转化为完整 DFA
    对每个 (state, char) 预计算下一个状态
    """
    num_states = ac.num_states
    dfa = [{} for _ in range(num_states)]
    
    from collections import deque
    queue = deque([0])
    visited = {0}
    
    while queue:
        state = queue.popleft()
        for ch in charset:
            # 模拟 fail 链查找
            s = state
            while s != 0 and ch not in ac.goto_table[s]:
                s = ac.failure[s]
            next_state = ac.goto_table[s].get(ch, 0)
            dfa[state][ch] = next_state
            if next_state not in visited:
                visited.add(next_state)
                queue.append(next_state)
    
    return dfa

DFA 化后的匹配变成简单的查表操作:

def dfa_search(dfa, output_list, text):
    """DFA 化后的匹配:每个字符恰好一次查表"""
    state = 0
    results = []
    for i, ch in enumerate(text):
        state = dfa[state].get(ch, 0)
        if output_list[state]:
            for idx in output_list[state]:
                results.append((i, idx))
    return results

DFA 化的代价:空间从 O(m_total) 增加到 O(m_total * |Σ|)。对于 ASCII 字符集(|Σ|=256),如果有 10 万个状态,需要 25.6M 个转移条目——可接受。对于 Unicode(|Σ|=65536+),通常用字典或压缩表。


Level 3 · 规范怎么定义的

31.10 Aho & Corasick 1975 年论文

历史背景

Alfred Aho 和 Margaret Corasick 在 1975 年发表 "Efficient String Matching: An Aid to Bibliographic Search"(Communications of the ACM, Vol. 18, No. 6)。这篇论文的动机来自贝尔实验室的文献检索系统——需要在大量文档中同时搜索多个关键词。

论文的核心贡献

  1. 定理 1:给定模式集 P = {p_1, ..., p_k},可以在 O(sum|p_i|) 时间内构建一个确定有限自动机 M,使得 M 处理长度为 n 的文本的时间为 O(n),加上输出匹配结果的时间 O(z)。

  2. 算法描述:论文精确描述了三个阶段:

    • goto 构建:将所有模式插入 Trie,定义 goto 函数
    • failure 构建:BFS 计算每个状态的失配转移
    • output 合并:通过 failure 链合并输出集
  3. 复杂度证明:论文用摊还分析证明了线性时间——关键在于 fail 链的总长度是有界的。

与 Unix 工具的关系

Aho 是 Unix 工具 egrepfgrep 的作者。fgrep(fixed string grep)的内部实现正是 AC 自动机。当你在命令行执行 fgrep -f patterns.txt text.txt 时,底层就在运行本章描述的算法。

awk 的名字中的 "A" 也指 Alfred Aho——他是 awk 的三位创建者之一。

31.11 Wu-Manber 算法简介

动机

AC 自动机在模式数量极大但单个模式较短时表现优异。但当模式串较长且字母表较大时,类似 Boyer-Moore 的"跳跃"策略可能更快。Wu-Manber 算法(1994)正是将 Boyer-Moore 的思想推广到多模式匹配。

核心思想

Sun Wu 和 Udi Manber 在 "A Fast Algorithm for Multi-Pattern Searching"(Technical Report TR-94-17, University of Arizona)中提出:

  1. 取所有模式的最短长度为 m_min
  2. 用一个哈希表(shift table)记录长度为 B 的文本块(通常 B=2 或 3)的跳跃距离
  3. 如果文本当前位置的 B 字符块不匹配任何模式的后缀,可以安全跳过 m_min - B + 1 位

与 AC 自动机的对比

特性 AC 自动机 Wu-Manber
最佳场景 短模式、大量模式 长模式、大字母表
最坏复杂度 O(n + z) O(n * m)
平均复杂度 O(n + z) O(n * B / m_min)
空间 O(m_total) O(
实际应用 fgrep、IDS agrep(近似匹配)

Wu-Manber 的优势:在模式长度 >= 5 且字母表 >= 256 时,平均性能可以超过 AC 自动机——因为它能跳过大量文本字符。但它的最坏情况比 AC 自动机差。

31.12 AC 自动机的正确性证明

引理 1:Trie 的正确性

如果文本 text[i..j] 恰好等于某个模式 p,则从 Trie 根节点出发,依次输入 text[i], text[i+1], ..., text[j],最终到达的状态 s 满足 p 在 output[s] 中。

证明:Trie 的构建过程保证了这一点——每个模式串对应根到某状态的唯一路径。

引理 2:failure 链的正确性

对于任意状态 s(代表字符串 u),failure[s] 指向的状态代表 u 的最长真后缀 v,且 v 是 Trie 中某条路径的前缀。

证明:BFS 的层序遍历保证了计算 failure[v] 时,所有深度更小的状态的 failure 已经正确计算。归纳可证。

定理:AC 自动机找到所有匹配

在文本上运行 AC 自动机后,所有在文本中出现的模式串都会被报告。

证明概要:

31.13 复杂度的严格证明

构建阶段:O(m_total * |Σ|)

Trie 的构建是 O(m_total)(每个字符一次插入)。failure 的 BFS 中,每个状态出队一次、每条边访问一次:O(m_total)。但 failure 计算中的 while 回退需要额外分析——类似 KMP 的摊还分析,每个状态沿 failure 链回退的总次数不超过该状态的深度,总回退不超过 O(m_total)。

匹配阶段:O(n + z)

因此总时间 = O(n + z)。


Level 4 · 边界与陷阱

31.14 AC 自动机在实际工程中的应用

Web 应用防火墙(WAF)

WAF 需要在每个 HTTP 请求中检测数百条 SQL 注入、XSS 攻击签名。例如 ModSecurity 的核心规则集包含数百条攻击模式:

# WAF 攻击签名示例
attack_patterns = [
    "union select",
    "' or 1=1",
    "<script>",
    "javascript:",
    "../../../etc/passwd",
    "cmd.exe",
    "; drop table",
    "exec xp_",
]

class WAF:
    def __init__(self, signatures: list[str]):
        # 不区分大小写
        self.ac = AhoCorasickOptimized([s.lower() for s in signatures])
        self.signatures = signatures
    
    def inspect(self, request_body: str) -> tuple[bool, list[str]]:
        """
        检查请求体是否包含攻击签名
        返回 (是否拦截, 匹配到的签名列表)
        """
        matches = self.ac.search(request_body.lower())
        if matches:
            matched_sigs = list(set(sig for _, sig in matches))
            return True, matched_sigs
        return False, []

入侵检测系统(IDS)

Snort 和 Suricata 等网络 IDS 使用 AC 自动机的变体来检测网络流量中的恶意特征。它们需要处理:

这些系统通常使用 DFA 化的 AC 自动机 + SIMD 加速来达到线速处理。

入侵检测中的多模式匹配流水线

网络数据包 → 协议解析 → 内容提取 → AC 自动机匹配 → 规则引擎 → 告警/拦截
                                         ↑
                                    预编译的 DFA
                                    (启动时加载)

31.15 性能优化技巧

1. 压缩存储

对于大字符集(如 UTF-8),用完整的 |Σ| 大小数组不现实。常用策略:

class CompactAC:
    """使用字符类压缩的 AC 自动机"""
    
    def __init__(self, patterns: list[str]):
        # 分析字符集,建立字符到类别的映射
        charset = set()
        for p in patterns:
            charset.update(p)
        
        self.char_map = {ch: i for i, ch in enumerate(sorted(charset))}
        self.num_classes = len(self.char_map)
        
        # 用二维数组替代字典
        # goto_array[state * num_classes + char_class] = next_state
        # 这比字典查找快得多
        self._build(patterns)

2. 缓存友好的内存布局

AC 自动机的性能瓶颈通常是内存访问模式。优化策略:

3. SIMD 加速

现代 CPU 的 SIMD 指令(SSE4.2 的 PCMPISTRI/PCMPISTRM)可以一次比较 16 字节:

// 伪代码:利用 SIMD 一次处理 16 字节
for each 16-byte block of text:
    if any byte matches interesting character set:
        fall back to AC automaton for that region
    else:
        skip entire block (all bytes are "boring")

这种"预过滤 + 精确匹配"的混合策略在实际 IDS 中广泛使用,可以将吞吐量提高 5-10 倍。

4. 失配指针的优化——suffix link 压缩

对于深层节点,沿 failure 链回退可能经过很多中间节点。如果这些中间节点没有输出,可以直接跳到有输出的最近祖先——这就是"字典后缀链接"(dictionary suffix link)优化:

# 在构建时预计算 dict_suffix_link
# dict_suffix_link[s] = 沿 failure 链最近的有输出的状态
def build_dict_suffix_links(ac):
    """预计算字典后缀链接"""
    dict_link = [0] * ac.num_states
    from collections import deque
    queue = deque()
    
    for ch, s in ac.goto_table[0].items():
        dict_link[s] = 0
        queue.append(s)
    
    while queue:
        u = queue.popleft()
        for ch, v in ac.goto_table[u].items():
            queue.append(v)
            f = ac.failure[v]
            if ac.output_list[f]:
                dict_link[v] = f
            else:
                dict_link[v] = dict_link[f]
    
    return dict_link

31.16 面试中的 AC 自动机

面试出现频率

AC 自动机在一般面试中出现概率较低(属于高级话题),但在以下场景中会出现:

面试中如何展示 AC 自动机知识

  1. 先说清楚它解决什么问题:"在 O(n + z) 时间内完成多模式匹配"
  2. 说明核心结构:"Trie + failure 指针,failure 指针的含义是最长后缀匹配"
  3. 与 KMP 的关系:"KMP 是 AC 自动机在单模式下的特例"
  4. 复杂度:"构建 O(m_total),匹配 O(n + z)"
  5. 实际应用:"fgrep、IDS/WAF、敏感词过滤"

相关面试题

  1. 设计敏感词过滤系统

    • 核心:AC 自动机
    • 扩展:如何处理变体(同音字、拆字、空格插入)
    • 规模:10万敏感词、每秒10万次请求
  2. 单词搜索 II(LeetCode #212): 虽然这道题通常用 Trie + DFS 解,但 AC 自动机是更系统的解法思路。

  3. 实现一个简单的正则表达式引擎: 多个固定字符串的"或"匹配 (a|b|c) 本质上就是 AC 自动机。

31.17 AC 自动机的局限性

不适用的场景

  1. 通配符匹配:AC 自动机只能精确匹配,不支持 *?。需要用 NFA 或其他方法。

  2. 近似匹配:搜索"与模式编辑距离 <= k 的子串"——AC 自动机无法直接处理。需要结合 DP(如 Myers 的位并行算法)。

  3. 非常长的单一模式:如果只有一个超长模式(如 100MB),AC 自动机的空间开销不划算,直接用 KMP 或 Boyer-Moore。

  4. 动态模式集:如果模式集频繁增删,每次都要重建 AC 自动机,开销大。可以用多个小 AC 自动机分批管理。

与正则表达式引擎的关系

固定字符串匹配: KMP / Boyer-Moore
多固定字符串匹配: AC 自动机
含通配符: NFA 模拟
完整正则: NFA → DFA 转换(Thompson 构造 + 子集构造)

AC 自动机是正则表达式引擎的一个特化——当所有模式都是固定字符串时,AC 自动机是最优解。

31.18 实际系统中的 AC 自动机变体

Commentz-Walter 算法(1979)

把 Boyer-Moore 的右到左比较思想与 AC 自动机结合。在失配时可以实现大幅跳跃。适合模式较长的多模式匹配。

Set-BM(Set Boyer-Moore)

针对多个长模式的优化。先用最短模式的长度确定文本扫描窗口,再在窗口内应用 BM 跳跃策略。

Aho-Corasick + 位并行

结合 AC 自动机和位并行(bit-parallel)技术,利用 CPU 的字长一次处理多个转移。在模式总数 <= 64 时特别高效。

实际产品中的选择

产品 算法 原因
Snort 3 AC + Intel Hyperscan 需要线速,硬件加速
ClamAV AC + 前缀过滤 病毒签名数量大
Cloudflare WAF AC + SIMD 低延迟要求
grep (GNU) BM + AC 混合 单/多模式自适应
Python re NFA → DFA(pyahocorasick 扩展) 通用性 + 性能

本章小结

  1. AC 自动机 = Trie + 失配指针:把多模式匹配的时间从 O(k * n) 降到 O(n + z),与模式数量无关。

  2. 构建过程:先插入所有模式到 Trie,再用 BFS 计算每个节点的 failure 指针(指向最长后缀匹配),最后合并 output 链。

  3. 与 KMP 的关系:AC 自动机是 KMP 从一维(单链)到多维(Trie)的推广。失配指针就是 KMP next 数组的多模式版本。

  4. Aho & Corasick 1975 年论文:奠定了多模式精确匹配的理论基础,直接催生了 Unix fgrep 工具。

  5. 工程实践:WAF、IDS、敏感词过滤是核心应用场景。性能优化包括 DFA 化、双数组 Trie、SIMD 预过滤、字典后缀链接。

  6. 面试要点:理解"为什么需要多模式匹配"、能描述构建过程(Trie → BFS failure → output 合并)、知道时间复杂度 O(n + m_total + z) 的由来。

本章评分
4.6  / 5  (3 评分)

💬 留言讨论