多模式匹配与AC自动机
第三十一章:多模式匹配与 AC 自动机
上一章我们掌握了单模式匹配:在文本中找一个模式串。但现实世界经常需要同时找多个模式串——敏感词过滤要匹配成千上万个违禁词,入侵检测系统要检测数百条攻击签名,杀毒软件要匹配数十万条病毒特征码。
如果对每个模式分别跑一次 KMP,k 个模式的总时间是 O(k * (n + m))——模式越多越慢。有没有办法一次扫描文本,同时匹配所有模式?
AC 自动机(Aho-Corasick Automaton)就是答案。它的核心思想:把多个模式串构建成一个 Trie,然后像 KMP 一样在 Trie 上加失配指针。一次扫描文本即可同时匹配所有模式,时间复杂度 O(n + m_total + z),其中 z 是匹配总数。
Level 1 · 你需要知道的
31.1 为什么需要多模式匹配
场景对比
| 场景 | 模式数量 | 文本长度 | 单模式方案 | AC 自动机方案 |
|---|---|---|---|---|
| 敏感词过滤 | 10,000 | 1,000 | 10M 次操作 | 1K 次操作 |
| 网络 IDS | 500 | 1,500 | 750K 次操作 | 1.5K 次操作 |
| 病毒扫描 | 100,000 | 10^6 | 10^11 次操作 | 10^6 次操作 |
差距是惊人的。AC 自动机的扫描时间与模式数量无关——只取决于文本长度和匹配数量。
31.2 AC 自动机 = Trie + 失配指针
三个组成部分
- Trie(字典树):把所有模式串插入一棵 Trie
- 失配指针(Failure Link):类似 KMP 的 next 数组,当匹配失败时告诉我们跳到哪里
- 输出链接(Output Link):记录在当前状态可以输出哪些完整的模式串
直觉理解
想象你同时用多把尺子(模式串)在文本上滑动。AC 自动机等价于把所有尺子叠在一起(Trie),当某把尺子失配时,不需要从头开始——失配指针告诉你另一把尺子的哪个位置恰好可以继续。
31.3 构建过程(BFS)
第一步:构建 Trie
class AhoCorasick:
def __init__(self):
# 每个节点用一个列表/字典表示其子节点
# goto[state][char] = next_state
self.goto = [{}] # 状态 0 是根节点
self.fail = [0] # 失配指针
self.output = [[]] # 每个状态的输出(匹配的模式串)
self.state_count = 1
def _add_pattern(self, pattern: str, index: int):
"""将一个模式串插入 Trie"""
state = 0
for ch in pattern:
if ch not in self.goto[state]:
# 创建新节点
self.goto.append({})
self.fail.append(0)
self.output.append([])
self.goto[state][ch] = self.state_count
self.state_count += 1
state = self.goto[state][ch]
# 标记结束状态
self.output[state].append(index)
第二步:BFS 构建失配指针
失配指针的含义:如果当前在状态 s(代表已匹配的字符串为 u),fail[s] 指向的是 Trie 中能匹配 u 的最长真后缀的那个节点。
def _build_failure(self):
"""BFS 构建失配指针"""
from collections import deque
queue = deque()
# 初始化:根节点的直接子节点的失配指针指向根
for ch, next_state in self.goto[0].items():
self.fail[next_state] = 0
queue.append(next_state)
# BFS
while queue:
curr = queue.popleft()
for ch, next_state in self.goto[curr].items():
queue.append(next_state)
# 找 fail 指针:沿着 curr 的 fail 链向上找
fallback = self.fail[curr]
while fallback != 0 and ch not in self.goto[fallback]:
fallback = self.fail[fallback]
self.fail[next_state] = self.goto[fallback].get(ch, 0)
# 如果 fail 指针指向的是自己,修正为根
if self.fail[next_state] == next_state:
self.fail[next_state] = 0
# 合并输出:当前节点的输出 = 自身输出 + fail 指向节点的输出
self.output[next_state] = (
self.output[next_state] +
self.output[self.fail[next_state]]
)
def build(self, patterns: list[str]):
"""构建完整的 AC 自动机"""
for i, pattern in enumerate(patterns):
self._add_pattern(pattern, i)
self._build_failure()
为什么用 BFS? 失配指针的计算需要已知父节点的失配指针。BFS 保证我们按层处理——层数越浅的节点先处理,它们的失配指针已经就绪,可以供更深的节点使用。
31.4 匹配过程
def search(self, text: str) -> list[tuple[int, int]]:
"""
在文本中搜索所有模式串
返回 [(结束位置, 模式串索引), ...]
"""
results = []
state = 0
for i, ch in enumerate(text):
# 当前状态没有 ch 的转移时,沿 fail 链回退
while state != 0 and ch not in self.goto[state]:
state = self.fail[state]
# 尝试转移
state = self.goto[state].get(ch, 0)
# 检查输出
if self.output[state]:
for pattern_idx in self.output[state]:
results.append((i, pattern_idx))
return results
完整使用示例
# 构建 AC 自动机
patterns = ["he", "she", "his", "hers"]
ac = AhoCorasick()
ac.build(patterns)
# 搜索文本
text = "ushers"
matches = ac.search(text)
# 输出匹配结果
for end_pos, pattern_idx in matches:
pattern = patterns[pattern_idx]
start_pos = end_pos - len(pattern) + 1
print(f"Found '{pattern}' at position {start_pos}-{end_pos}")
# 输出:
# Found 'she' at position 1-3
# Found 'he' at position 2-3
# Found 'hers' at position 2-5
时间复杂度分析
- 构建 Trie:O(m_total),m_total 是所有模式串长度之和
- 构建失配指针:O(m_total * |Σ|)(最坏情况),通常 O(m_total)
- 匹配:O(n + z),n 是文本长度,z 是匹配总数
空间复杂度:O(m_total * |Σ|)(如果用数组实现),或 O(m_total)(如果用字典实现)。
31.5 逐步演示
让我们用模式集 {"he", "she", "his", "hers"} 完整走一遍。
Trie 结构
根(0) → 'h' → (1) → 'e' → (2, 输出"he") → 'r' → (6) → 's' → (7, 输出"hers")
→ 'i' → (3) → 's' → (4, 输出"his")
→ 's' → (5) → 'h' → (8) → 'e' → (9, 输出"she")
失配指针
fail[1] = 0 (h 的最长真后缀在 Trie 中不存在)
fail[2] = 0 (he 的真后缀 "e" 不在 Trie 中)
fail[3] = 0 (hi 的真后缀 "i" 不在 Trie 中)
fail[4] = 5 (his 的真后缀 "s" 在 Trie 中, 对应状态 5)
fail[5] = 0 (s 的真后缀为空)
fail[6] = 0 (her 的真后缀 "r", "er" 都不在 Trie 中)
fail[7] = 0 (hers 的真后缀...)
fail[8] = 1 (sh 的真后缀 "h" 在 Trie 中, 对应状态 1)
fail[9] = 2 (she 的真后缀 "he" 在 Trie 中, 对应状态 2)
→ output[9] 合并了 output[2] = ["he"]
→ 所以 output[9] = ["she", "he"]
匹配 text = "ushers"
i=0, ch='u': state=0, 无 'u' 转移 → state 保持 0
i=1, ch='s': state=0 → goto[0]['s'] = 5 → state=5, 无输出
i=2, ch='h': state=5 → goto[5]['h'] = 8 → state=8, 无输出
i=3, ch='e': state=8 → goto[8]['e'] = 9 → state=9
output[9] = ["she"(idx=1), "he"(idx=0)]
→ 匹配 "she" 结束于位置 3
→ 匹配 "he" 结束于位置 3
i=4, ch='r': state=9, 无 'r' 转移
→ fail[9] = 2, goto[2] 有 'r' 吗?有!goto[2]['r'] = 6
→ 等等,我们需要先回退再检查
state = fail[9] = 2, goto[2] 有 'r' → state=6, 无输出
i=5, ch='s': state=6 → goto[6]['s'] = 7 → state=7
output[7] = ["hers"(idx=3)]
→ 匹配 "hers" 结束于位置 5
最终结果:在 "ushers" 中找到 "she"@1, "he"@2, "hers"@2。
Level 2 · 它是怎么运行的
31.6 AC 自动机完整优化实现
上面的基础实现在失配时需要沿 fail 链逐步回退。工程实践中,我们可以预计算所有转移(类似 DFA),消除运行时的回退:
class AhoCorasickOptimized:
"""优化版 AC 自动机:预计算所有转移,匹配时无需沿 fail 链回退"""
def __init__(self, patterns: list[str]):
self.patterns = patterns
self.num_states = 1
# 用固定大小数组(假设 ASCII)或字典
# 这里用字典以支持任意字符
self.goto_table = [{}] # goto_table[state][char] = next_state
self.failure = [0]
self.output_list = [[]]
self._build(patterns)
def _build(self, patterns: list[str]):
# 第一步:构建 Trie
for idx, pattern in enumerate(patterns):
state = 0
for ch in pattern:
if ch not in self.goto_table[state]:
self.goto_table.append({})
self.failure.append(0)
self.output_list.append([])
self.goto_table[state][ch] = self.num_states
self.num_states += 1
state = self.goto_table[state][ch]
self.output_list[state].append(idx)
# 第二步:BFS 构建 failure 指针 + 完整转移表
from collections import deque
queue = deque()
# 对根节点的所有可能字符,设定默认转移
# 根节点遇到 Trie 中没有的字符,转移到自身
for ch, s in self.goto_table[0].items():
self.failure[s] = 0
queue.append(s)
while queue:
u = queue.popleft()
for ch, v in self.goto_table[u].items():
queue.append(v)
# 计算 failure[v]
f = self.failure[u]
while f != 0 and ch not in self.goto_table[f]:
f = self.failure[f]
self.failure[v] = self.goto_table[f].get(ch, 0)
if self.failure[v] == v:
self.failure[v] = 0
# 合并输出
self.output_list[v] = (
self.output_list[v] + self.output_list[self.failure[v]]
)
def search(self, text: str) -> list[tuple[int, str]]:
"""
搜索文本,返回 (结束位置, 模式串) 列表
"""
results = []
state = 0
for i, ch in enumerate(text):
# 沿 failure 链找到能转移的状态
while state != 0 and ch not in self.goto_table[state]:
state = self.failure[state]
state = self.goto_table[state].get(ch, 0)
# 检查输出
for idx in self.output_list[state]:
pattern = self.patterns[idx]
start = i - len(pattern) + 1
results.append((start, pattern))
return results
31.7 敏感词过滤系统实战
需求:给定一组敏感词列表和用户输入文本,将所有敏感词替换为 ***。
class SensitiveWordFilter:
"""基于 AC 自动机的敏感词过滤器"""
def __init__(self, sensitive_words: list[str]):
self.ac = AhoCorasickOptimized(sensitive_words)
def filter_text(self, text: str) -> str:
"""将文本中的敏感词替换为 ***"""
matches = self.ac.search(text)
if not matches:
return text
# 按起始位置排序,合并重叠区间
matches.sort(key=lambda x: x[0])
intervals = []
for start, pattern in matches:
end = start + len(pattern)
if intervals and start <= intervals[-1][1]:
intervals[-1] = (intervals[-1][0], max(intervals[-1][1], end))
else:
intervals.append((start, end))
# 替换
result = []
prev_end = 0
for start, end in intervals:
result.append(text[prev_end:start])
result.append('*' * (end - start))
prev_end = end
result.append(text[prev_end:])
return ''.join(result)
def contains_sensitive(self, text: str) -> bool:
"""快速判断文本是否包含敏感词"""
state = 0
for ch in text:
while state != 0 and ch not in self.ac.goto_table[state]:
state = self.ac.failure[state]
state = self.ac.goto_table[state].get(ch, 0)
if self.ac.output_list[state]:
return True
return False
# 使用示例
filter_sys = SensitiveWordFilter(["暴力", "赌博", "色情", "毒品"])
print(filter_sys.filter_text("这篇文章涉及暴力和赌博内容"))
# 输出: "这篇文章涉及**和**内容"
print(filter_sys.contains_sensitive("正常的文章内容"))
# 输出: False
工程考量
- 大小写不敏感匹配:预处理时将模式和文本都转小写
- Unicode 支持:Python 的字符串天然支持 Unicode,但需要注意全角/半角转换
- 增量更新:如果敏感词列表动态变化,需要重建 AC 自动机。可以用定时重建策略
- 变体检测:用户会用各种方式规避过滤(插空格、谐音字等),需要预处理阶段标准化
31.8 AC 自动机与 KMP 的关系
KMP 是 AC 自动机的特例
当模式集只包含一个模式串时,AC 自动机退化为 KMP:
- Trie 退化为一条链(单个模式串的每个字符)
- 失配指针退化为 KMP 的 next 数组
- BFS 退化为线性扫描
形式化对应
| KMP | AC 自动机 |
|---|---|
| 模式串的字符序列 | Trie 中从根到叶的路径 |
| next[j](前缀函数) | fail[state](失配指针) |
| 匹配时 j 的增减 | 状态在 Trie 中的转移和回退 |
| 一个模式 → O(n+m) | k 个模式 → O(n + m_total + z) |
直觉:KMP 是"一维"的(沿一条链回退),AC 自动机是"多维"的(在整棵 Trie 上通过 fail 链跳转)。
31.9 DFA 化优化
在高性能场景中,可以把 AC 自动机的匹配过程转化为纯 DFA 查表——预计算每个 (state, char) 对的完整转移,消除运行时的 fail 链遍历:
def build_dfa(ac: AhoCorasickOptimized, charset: str) -> list[dict]:
"""
将 AC 自动机转化为完整 DFA
对每个 (state, char) 预计算下一个状态
"""
num_states = ac.num_states
dfa = [{} for _ in range(num_states)]
from collections import deque
queue = deque([0])
visited = {0}
while queue:
state = queue.popleft()
for ch in charset:
# 模拟 fail 链查找
s = state
while s != 0 and ch not in ac.goto_table[s]:
s = ac.failure[s]
next_state = ac.goto_table[s].get(ch, 0)
dfa[state][ch] = next_state
if next_state not in visited:
visited.add(next_state)
queue.append(next_state)
return dfa
DFA 化后的匹配变成简单的查表操作:
def dfa_search(dfa, output_list, text):
"""DFA 化后的匹配:每个字符恰好一次查表"""
state = 0
results = []
for i, ch in enumerate(text):
state = dfa[state].get(ch, 0)
if output_list[state]:
for idx in output_list[state]:
results.append((i, idx))
return results
DFA 化的代价:空间从 O(m_total) 增加到 O(m_total * |Σ|)。对于 ASCII 字符集(|Σ|=256),如果有 10 万个状态,需要 25.6M 个转移条目——可接受。对于 Unicode(|Σ|=65536+),通常用字典或压缩表。
Level 3 · 规范怎么定义的
31.10 Aho & Corasick 1975 年论文
历史背景
Alfred Aho 和 Margaret Corasick 在 1975 年发表 "Efficient String Matching: An Aid to Bibliographic Search"(Communications of the ACM, Vol. 18, No. 6)。这篇论文的动机来自贝尔实验室的文献检索系统——需要在大量文档中同时搜索多个关键词。
论文的核心贡献
-
定理 1:给定模式集 P = {p_1, ..., p_k},可以在 O(sum|p_i|) 时间内构建一个确定有限自动机 M,使得 M 处理长度为 n 的文本的时间为 O(n),加上输出匹配结果的时间 O(z)。
-
算法描述:论文精确描述了三个阶段:
- goto 构建:将所有模式插入 Trie,定义 goto 函数
- failure 构建:BFS 计算每个状态的失配转移
- output 合并:通过 failure 链合并输出集
-
复杂度证明:论文用摊还分析证明了线性时间——关键在于 fail 链的总长度是有界的。
与 Unix 工具的关系
Aho 是 Unix 工具 egrep 和 fgrep 的作者。fgrep(fixed string grep)的内部实现正是 AC 自动机。当你在命令行执行 fgrep -f patterns.txt text.txt 时,底层就在运行本章描述的算法。
awk 的名字中的 "A" 也指 Alfred Aho——他是 awk 的三位创建者之一。
31.11 Wu-Manber 算法简介
动机
AC 自动机在模式数量极大但单个模式较短时表现优异。但当模式串较长且字母表较大时,类似 Boyer-Moore 的"跳跃"策略可能更快。Wu-Manber 算法(1994)正是将 Boyer-Moore 的思想推广到多模式匹配。
核心思想
Sun Wu 和 Udi Manber 在 "A Fast Algorithm for Multi-Pattern Searching"(Technical Report TR-94-17, University of Arizona)中提出:
- 取所有模式的最短长度为 m_min
- 用一个哈希表(shift table)记录长度为 B 的文本块(通常 B=2 或 3)的跳跃距离
- 如果文本当前位置的 B 字符块不匹配任何模式的后缀,可以安全跳过 m_min - B + 1 位
与 AC 自动机的对比
| 特性 | AC 自动机 | Wu-Manber |
|---|---|---|
| 最佳场景 | 短模式、大量模式 | 长模式、大字母表 |
| 最坏复杂度 | O(n + z) | O(n * m) |
| 平均复杂度 | O(n + z) | O(n * B / m_min) |
| 空间 | O(m_total) | O( |
| 实际应用 | fgrep、IDS | agrep(近似匹配) |
Wu-Manber 的优势:在模式长度 >= 5 且字母表 >= 256 时,平均性能可以超过 AC 自动机——因为它能跳过大量文本字符。但它的最坏情况比 AC 自动机差。
31.12 AC 自动机的正确性证明
引理 1:Trie 的正确性
如果文本 text[i..j] 恰好等于某个模式 p,则从 Trie 根节点出发,依次输入 text[i], text[i+1], ..., text[j],最终到达的状态 s 满足 p 在 output[s] 中。
证明:Trie 的构建过程保证了这一点——每个模式串对应根到某状态的唯一路径。
引理 2:failure 链的正确性
对于任意状态 s(代表字符串 u),failure[s] 指向的状态代表 u 的最长真后缀 v,且 v 是 Trie 中某条路径的前缀。
证明:BFS 的层序遍历保证了计算 failure[v] 时,所有深度更小的状态的 failure 已经正确计算。归纳可证。
定理:AC 自动机找到所有匹配
在文本上运行 AC 自动机后,所有在文本中出现的模式串都会被报告。
证明概要:
- 如果模式 p 在 text[i..j] 出现,则处理完 text[j] 后,当前状态 s 代表的字符串以 p 为后缀
- 通过 failure 链的传递性,output[s] 中包含 p(直接或通过 failure 链合并)
- output 的合并在构建阶段完成,保证了完整性
31.13 复杂度的严格证明
构建阶段:O(m_total * |Σ|)
Trie 的构建是 O(m_total)(每个字符一次插入)。failure 的 BFS 中,每个状态出队一次、每条边访问一次:O(m_total)。但 failure 计算中的 while 回退需要额外分析——类似 KMP 的摊还分析,每个状态沿 failure 链回退的总次数不超过该状态的深度,总回退不超过 O(m_total)。
匹配阶段:O(n + z)
- 外层循环:n 次(每个文本字符一次)
- while 回退:类似 KMP 的摊还分析,state 的"深度"每次外层循环最多增加 1(成功转移),while 每次让深度减小。总回退次数不超过 n。
- 输出收集:O(z)
因此总时间 = O(n + z)。
Level 4 · 边界与陷阱
31.14 AC 自动机在实际工程中的应用
Web 应用防火墙(WAF)
WAF 需要在每个 HTTP 请求中检测数百条 SQL 注入、XSS 攻击签名。例如 ModSecurity 的核心规则集包含数百条攻击模式:
# WAF 攻击签名示例
attack_patterns = [
"union select",
"' or 1=1",
"<script>",
"javascript:",
"../../../etc/passwd",
"cmd.exe",
"; drop table",
"exec xp_",
]
class WAF:
def __init__(self, signatures: list[str]):
# 不区分大小写
self.ac = AhoCorasickOptimized([s.lower() for s in signatures])
self.signatures = signatures
def inspect(self, request_body: str) -> tuple[bool, list[str]]:
"""
检查请求体是否包含攻击签名
返回 (是否拦截, 匹配到的签名列表)
"""
matches = self.ac.search(request_body.lower())
if matches:
matched_sigs = list(set(sig for _, sig in matches))
return True, matched_sigs
return False, []
入侵检测系统(IDS)
Snort 和 Suricata 等网络 IDS 使用 AC 自动机的变体来检测网络流量中的恶意特征。它们需要处理:
- 每秒数 GB 的流量
- 数万条规则/签名
- 实时响应(微秒级延迟)
这些系统通常使用 DFA 化的 AC 自动机 + SIMD 加速来达到线速处理。
入侵检测中的多模式匹配流水线
网络数据包 → 协议解析 → 内容提取 → AC 自动机匹配 → 规则引擎 → 告警/拦截
↑
预编译的 DFA
(启动时加载)
31.15 性能优化技巧
1. 压缩存储
对于大字符集(如 UTF-8),用完整的 |Σ| 大小数组不现实。常用策略:
- 字典存储:Python 中自然的选择,但有哈希开销
- 双数组 Trie(Double-Array Trie):Aoe(1989)提出的紧凑 Trie 表示,查找 O(1) 且内存紧凑
- 分层映射:先用字符类映射(如 ASCII 0-127 直接索引,>127 用哈希)
class CompactAC:
"""使用字符类压缩的 AC 自动机"""
def __init__(self, patterns: list[str]):
# 分析字符集,建立字符到类别的映射
charset = set()
for p in patterns:
charset.update(p)
self.char_map = {ch: i for i, ch in enumerate(sorted(charset))}
self.num_classes = len(self.char_map)
# 用二维数组替代字典
# goto_array[state * num_classes + char_class] = next_state
# 这比字典查找快得多
self._build(patterns)
2. 缓存友好的内存布局
AC 自动机的性能瓶颈通常是内存访问模式。优化策略:
- 将经常一起访问的数据(goto、output、failure)紧凑排列
- 使用 BFS 序给状态编号——这保证相邻编号的状态在 Trie 中也相邻,有利于缓存预取
- 热点状态(根附近、高频访问)放在内存前端
3. SIMD 加速
现代 CPU 的 SIMD 指令(SSE4.2 的 PCMPISTRI/PCMPISTRM)可以一次比较 16 字节:
// 伪代码:利用 SIMD 一次处理 16 字节
for each 16-byte block of text:
if any byte matches interesting character set:
fall back to AC automaton for that region
else:
skip entire block (all bytes are "boring")
这种"预过滤 + 精确匹配"的混合策略在实际 IDS 中广泛使用,可以将吞吐量提高 5-10 倍。
4. 失配指针的优化——suffix link 压缩
对于深层节点,沿 failure 链回退可能经过很多中间节点。如果这些中间节点没有输出,可以直接跳到有输出的最近祖先——这就是"字典后缀链接"(dictionary suffix link)优化:
# 在构建时预计算 dict_suffix_link
# dict_suffix_link[s] = 沿 failure 链最近的有输出的状态
def build_dict_suffix_links(ac):
"""预计算字典后缀链接"""
dict_link = [0] * ac.num_states
from collections import deque
queue = deque()
for ch, s in ac.goto_table[0].items():
dict_link[s] = 0
queue.append(s)
while queue:
u = queue.popleft()
for ch, v in ac.goto_table[u].items():
queue.append(v)
f = ac.failure[v]
if ac.output_list[f]:
dict_link[v] = f
else:
dict_link[v] = dict_link[f]
return dict_link
31.16 面试中的 AC 自动机
面试出现频率
AC 自动机在一般面试中出现概率较低(属于高级话题),但在以下场景中会出现:
- 系统设计面试:设计敏感词过滤系统、URL 黑名单检测
- 算法面试(高级别):LeetCode Hard 的字符串匹配变体
- 后端/基础设施岗位:讨论 IDS、WAF 的实现原理
面试中如何展示 AC 自动机知识
- 先说清楚它解决什么问题:"在 O(n + z) 时间内完成多模式匹配"
- 说明核心结构:"Trie + failure 指针,failure 指针的含义是最长后缀匹配"
- 与 KMP 的关系:"KMP 是 AC 自动机在单模式下的特例"
- 复杂度:"构建 O(m_total),匹配 O(n + z)"
- 实际应用:"fgrep、IDS/WAF、敏感词过滤"
相关面试题
-
设计敏感词过滤系统:
- 核心:AC 自动机
- 扩展:如何处理变体(同音字、拆字、空格插入)
- 规模:10万敏感词、每秒10万次请求
-
单词搜索 II(LeetCode #212): 虽然这道题通常用 Trie + DFS 解,但 AC 自动机是更系统的解法思路。
-
实现一个简单的正则表达式引擎: 多个固定字符串的"或"匹配 (a|b|c) 本质上就是 AC 自动机。
31.17 AC 自动机的局限性
不适用的场景
-
通配符匹配:AC 自动机只能精确匹配,不支持
*或?。需要用 NFA 或其他方法。 -
近似匹配:搜索"与模式编辑距离 <= k 的子串"——AC 自动机无法直接处理。需要结合 DP(如 Myers 的位并行算法)。
-
非常长的单一模式:如果只有一个超长模式(如 100MB),AC 自动机的空间开销不划算,直接用 KMP 或 Boyer-Moore。
-
动态模式集:如果模式集频繁增删,每次都要重建 AC 自动机,开销大。可以用多个小 AC 自动机分批管理。
与正则表达式引擎的关系
固定字符串匹配: KMP / Boyer-Moore
多固定字符串匹配: AC 自动机
含通配符: NFA 模拟
完整正则: NFA → DFA 转换(Thompson 构造 + 子集构造)
AC 自动机是正则表达式引擎的一个特化——当所有模式都是固定字符串时,AC 自动机是最优解。
31.18 实际系统中的 AC 自动机变体
Commentz-Walter 算法(1979)
把 Boyer-Moore 的右到左比较思想与 AC 自动机结合。在失配时可以实现大幅跳跃。适合模式较长的多模式匹配。
Set-BM(Set Boyer-Moore)
针对多个长模式的优化。先用最短模式的长度确定文本扫描窗口,再在窗口内应用 BM 跳跃策略。
Aho-Corasick + 位并行
结合 AC 自动机和位并行(bit-parallel)技术,利用 CPU 的字长一次处理多个转移。在模式总数 <= 64 时特别高效。
实际产品中的选择
| 产品 | 算法 | 原因 |
|---|---|---|
| Snort 3 | AC + Intel Hyperscan | 需要线速,硬件加速 |
| ClamAV | AC + 前缀过滤 | 病毒签名数量大 |
| Cloudflare WAF | AC + SIMD | 低延迟要求 |
| grep (GNU) | BM + AC 混合 | 单/多模式自适应 |
| Python re | NFA → DFA(pyahocorasick 扩展) | 通用性 + 性能 |
本章小结
-
AC 自动机 = Trie + 失配指针:把多模式匹配的时间从 O(k * n) 降到 O(n + z),与模式数量无关。
-
构建过程:先插入所有模式到 Trie,再用 BFS 计算每个节点的 failure 指针(指向最长后缀匹配),最后合并 output 链。
-
与 KMP 的关系:AC 自动机是 KMP 从一维(单链)到多维(Trie)的推广。失配指针就是 KMP next 数组的多模式版本。
-
Aho & Corasick 1975 年论文:奠定了多模式精确匹配的理论基础,直接催生了 Unix fgrep 工具。
-
工程实践:WAF、IDS、敏感词过滤是核心应用场景。性能优化包括 DFA 化、双数组 Trie、SIMD 预过滤、字典后缀链接。
-
面试要点:理解"为什么需要多模式匹配"、能描述构建过程(Trie → BFS failure → output 合并)、知道时间复杂度 O(n + m_total + z) 的由来。