AC 自动机的原理及实现.

在最近水论坛的时候,我发现了一个网友友问的这样一个问题:

数千万篇文章,寻找其中包含成语的句子。成语有数万条。 目前没有对文章内容建立过全文索引,鉴于这个事情是一次性的,为此搞个索引可能也成本过高。暂时的解决方案是,把成语都放在一条 re.compile(‘乌合之众|鸡犬相闻|…’)里面去搜索文章,但效率总觉得不理想。 求教,是否可能有更高效的解决方案。

我写过爬虫,也做过相关的一些算法,对于数据匹配无非就是正则表达式了,毕竟爬的数据量都还不大,效率方面的感知不强。

但是正则的缺点就是效率低,所以这个有大规模数据匹配需求的网友就遇到了低效的问题。这时,底下的一条评论靠着简单的一个词解决了这个问题:

AC 自动机。”

AC 自动机是什么?

AC 自动机的原理简单来说,就是根据输入的多个模式串去建立一个树模型,然后再根据这个模型作字符串匹配。那它具体是什么呢?

在聊 AC 自动机之前,我们还需要了解两个基本算法:KMPtrie 树

KMP

KMP 算法是一种改进的字符串匹配算法,能够高效地进行串匹配。

传统的一种字符串匹配算法是类似于滑窗一样地进行逐个比对,如下:

T: b a a b a b c
   x
W: a b a
Wrong.

T: b a a b a b c
     √ x
W:   a b a
Wrong.

T: b a a b a b c
       √ √ √
W:     a b a
Right.

后来有人发现了,这样匹配多没效率啊,不如匹配错误的时候,跳过一些字符,可以减少匹配计算量。

怎么跳呢?于是就有人就想出了解决办法:在模式串中通过算法找出长度最大且相等前后缀,然后构建出一组失配指针,从而哪一处匹配错误了,就根据 数组中对应的值向右移几步。

这个数组又如何生成呢?方法如下,假设我们有一串 'ababa'

子串 前缀 后缀 取交集 最长相等前后缀长度
a $\Phi$ $\Phi$ $\Phi\cap\Phi=\Phi$ 0
ab {a} {b} $\lbrace a\rbrace\cap\lbrace b\rbrace=\Phi$ 0
aba {a, ab} {a, ba} $\lbrace a, ab\rbrace\cap\lbrace a, ba\rbrace={a}$ 1
abab {a, ab, aba} {b, ab, bab} $\lbrace a, ab, aba\rbrace\cap\lbrace b, ab, bab\rbrace=\lbrace a, ab\rbrace$ 2
ababa {a, ab, aba, abab} {a, ba, aba, baba} $\lbrace a, ab, aba, abab\rbrace\cap\lbrace a, ba, aba, baba\rbrace=\lbrace a, aba\rbrace$ 3

所以由上表的算法,最后可以计算出来一个数组:[0, 0, 1, 2, 3],这就是部分匹配值表(Partial Match Table, PMT)

为了后续编程的方便,我们还需要将该表整体右移,最左端填充 -1,得到最终的数组 [-1, 0, 0, 1, 2],这就是 next 数组。它的含义是字符串当前匹配错误的元素的下一个元素所对应的模式串的位置。我们再举一个例子:

T: a b c a b a b a
       x
W: a b a b a
 [-1 0 0 1 2]
Wrong. 此时模式串中出错位置为 a,对应 next 数组的 0(相同位置),所以匹配出错的 c 元素下一个 a 元素对应的模式串新的位置为 0。

T: a b c a b a b a
       x
W:     a b a b a
     [-1 0 0 1 2]
Wrong. 此时从 c 开始(上一次错误的地方)重新匹配模式串,出错位对应 -1,所以 c 对应的位置为 -1,即模式串往前移一位。

T: a b c a b a b a
         √ √ √ √ √
W:       a b a b a
       [-1 0 0 1 2]
Right.
至此匹配成功。

同上例,当匹配字符串出错时,我们就可以找到模式串相应位置对应的 next 数组值 $n$,然后向后滑动 $n$ 的地方继续匹配。

简单来说,就是“按块匹配”,所以往右移的时候也是“按块右移”,从而不用像以前一样一格一格右移匹配,减少了很多的工作量。

虽然 KMP 相较传统检索效率高,但是效率并不是最高的,如今各种编辑器的查找功能大多采用的是 Boyer-Moore 算法,见这里

trie 树

trie 树又称前缀树或字典树,是一种有序树,用于保存关联数组,其中的键通常是字符串。与二叉查找树不同,键不是直接保存在节点中,二是由节点在树中的位置决定。一个节点的所有子孙都有相同的前缀,也就是这个节点对应的字符串,而根节点对应空字符串。一般情况下,不是所有的节点都有对应的值,只有叶子节点和部分内部节点所对应的键才有相关的值。

(内容来自维基百科 - Trie

建立模型的算法不做过多的介绍,详情见这里

trie 树的算法也很直观,简单来说,就是根据输入的模式串在一个区域内建立了路标;然后每次从特定的起点出发,对上了就根据路标去下一个地点,对不上就结束,以此实现匹配来找到目的地。

OK,让我们继续

在了解了上面的两个算法之后,AC 自动机就十分容易理解了。

这里我们需要介绍一个概念: fail 指针,即一个地方的字符若如果匹配失败,就会跳到 fail 指针所指向的地方。比如 humanmad 这两个模式串,如果匹配了 huma 下一个不是 n,那么就跳到 mad 中的 a 处继续匹配。由此可见,这样的机制可以大大减小匹配的时间,但是在构建模型方面相比也会更加复杂

而 AC 自动机就是在 trie 树的基础上添加了许多的 fail 指针,实现了一个更加快速高效的模型。

问题的后续

当然,这个问题后来也聚集了多方大佬来讨论,于是也看到了一些有用的评论:

直接套 AC 自动机, 专门搞敏感词过滤和发现的, 现成的库就挺快了, 貌似是 C 写的给 py 调;

套正则, 四五万字的 “或” 条件, 还不如直接用 Python 字符串的 for 和 in。

qsnow6

之前在过滤 black list 时找过相关的资料,长文本的搜索适合上 AC 自动机。短文本建议直接 x in s 就完事了,Python 编译器做了优化,效率更快,写起来也方便。

lithbitren

说起来,当时找 ac 自动机解决方案还是有坑的,py 的对于复杂算法的模板不多,csdn 上搜到的 py 的 ac 自动机是有问题,建树竟然用 remove 首元素来处理队列,让队列处理在 c 层面的时间复杂度变成了 $O(N^2)$,小规模的屏蔽词过滤看不出啥,大规模词语处理时间会陡增,40 万词要处理将近四分钟,不过改数据解构用 dequepopleft 大概就好了。另外搜到过 2 个 pipy 上的开源库,一个是 py2 时代的,一个是重叠的关键字词输出会有问题,最后才决定手撸的。。

下文的代码也是用的 remove,所以速度相对会慢一些。

最后题主也出面反映解决了这个问题:

v2zero

@fishCatcher 感动,一个词解决一个大难题。业余写代码,自己也实现不了这种东西,网上找了个 Python 实现。原来预计一周多的运行时间,瞬间就缩减到一天了。

Python 实现 AC 自动机

from collections import defaultdict


class TrieNode(object):
    def __init__(self, value=None):
        self.value = value  # 值
        self.fail = None  # fail指针
        self.tail = 0  # 尾标志:标志为i表示第i个模式串串尾,默认为0
        self.children = {}  # 子节点,{value:TrieNode}


class Trie(object):
    def __init__(self, words):
    """
    初始化部分
    """
        self.root = TrieNode()  # 根节点
        self.count = 0  # 模式串个数
        self.words = words
        for word in words:
            self._add(word)
        self.ac_automation()

    def _add(self, sequence):
        """
        模型中加入字符串
        :param sequence: 字符串
        """
        self.count += 1
        cur_node = self.root
        for item in sequence:
            if item not in cur_node.children:
                child = TrieNode(value=item)  # 插入结点
                cur_node.children[item] = child
                cur_node = child
            else:
                cur_node = cur_node.children[item]
        cur_node.tail = self.count

    def ac_automation(self):
        """
        用 fail 指针搭建失败的路径
        """
        queue = [self.root]
        while len(queue):  # BFS遍历字典树
            temp_node = queue[0]
            queue.remove(temp_node)  # 取出队首元素
            for value in temp_node.children.values():
                if temp_node == self.root:
                    value.fail = self.root  # 根的子结点fail指向根自己
                else:
                    p = temp_node.fail  # 转到 fail 指针
                    while p:
                        if value.value in p.children:
                            value.fail = p.children[value.value]  # 若结点值在该结点的子结点中,则将fail指向该结点的对应子结点
                            break
                        # 转到fail指针继续回溯
                        p = p.fail
                    # 若为None,表示当前结点值在之前都没出现过,则其fail指向根结点
                    if not p:
                        value.fail = self.root
                # 将当前结点的所有子结点加到队列中
                queue.append(value)

    def search(self, text):
        """
        模式匹配
        :param text: 长文本
        """
        p = self.root
        # 记录匹配起始位置下标
        start_index = 0
        # 成功匹配结果集
        rst = defaultdict(list)
        for i in range(len(text)):
            single_char = text[i]
            while single_char not in p.children and p is not self.root:
                p = p.fail
            # 有一点瑕疵,原因在于匹配子串的时候,若字符串中部分字符由两个匹配词组成,此时后一个词的前缀下标不会更新
            # 这是由于KMP算法本身导致的,目前与下文循环寻找所有匹配词存在冲突
            # 但是问题不大,因为其标记的位置均为匹配成功的字符
            if single_char in p.children and p is self.root:
                start_index = i
            # 若找到匹配成功的字符结点,则指向那个结点,否则指向根结点
            if single_char in p.children:
                p = p.children[single_char]
            else:
                start_index = i
                p = self.root
            temp = p
            while temp is not self.root:
                # 尾标志为0不处理,但是tail需要-1从而与敏感词字典下标一致
                # 循环原因在于,有些词本身只是另一个词的后缀,也需要辨识出来
                if temp.tail:
                    rst[self.words[temp.tail - 1]].append((start_index, i))
                temp = temp.fail
        return rst


if __name__ == "__main__":
    test_words = ["你好", "我"]
    test_text = "你好我好大家好,我是 Zee。"
    model = Trie(test_words)
    print(str(model.search(test_text)))