用代码和示例详解 Kahn 算法的拓扑排序

算法
用代码和示例详解 Kahn 算法的拓扑排序
Shot by Tuur Tisseghem

通过超人穿衣顺序的趣味类比、交互式可视化、逐步 C++ 实现,以及基于逐层 BFS 的并行执行扩展,学习 Kahn 算法的拓扑排序。

开场

欢迎来到裤子最后穿星球!在这个星球上,你必须把裤子放在所有其他衣物之后再穿。我们不在乎你是不是把袜子穿在鞋子外面,还是像憨豆先生那样换内裤。只要确保穿裤子是你准备出门过程中的最后一步就行。

我们的任务是开发一款 App,教那些不守规矩的人——比如超人 🦸‍♂️,他通常把内裤穿在裤子外面——正确的穿衣顺序,这样他们就能按规矩来了。

首先,让我们来盘点一下经典超人日常穿搭的装备清单:

  • 🔵 一件蓝色紧身上衣
  • 👖 一条蓝色紧身裤
  • 🧣 一件红色披风
  • 🦸‍♂️ 胸前的”S”盾牌标志
  • 🩲 红色三角裤穿在裤子里面
  • 👢 红色靴子
  • 🟨 一条黄色腰带

我们可以按照人类穿衣的一般顺序来定义这些装备之间的关系。

比如说,他只有在上半身的紧身衣已经穿好的前提下,才能披上红色披风。

我们用这种方式来表示这个依赖关系:

python
      ('紧身上衣', '红色披风')
    

你可以把这个简洁的符号表达式理解为以下这段完全直白、绝对不会过于复杂的指令:

“如果你想穿上你的红色披风,并让它正确且稳妥地搭在你的肩膀上——按照人们通常期望披风搭在肩上的方式——那么在你伸手去拿它之前,在你把它拎起来之前,在你把它甩到背后或尝试把它整齐地围在脖子上之前,你需要先把一件衣服从头套上,把一只胳膊伸进一个袖子、另一只胳膊伸进另一个袖子,把面料拉到胸前和背后,沿身体两侧抚平,理顺所有褶皱,检查它在两肩是否平整,并确认它没有扭曲、堆叠或只穿了一半——换句话说,你需要先穿好你的蓝色紧身上衣。”

非常简单。

我们可以不断重复这个过程,直到超人通过了在线媒体平台审核机器人的检测,同时 DC Comics 也拿到了授权费——简单来说,就是穿得足够多以免被审核,但又足够像超人以至于可能会惹上版权麻烦。

超级英雄穿衣算法

我们在很早之前就介绍了这种语法,表示你先穿上 紧身上衣,然后才能穿 红色披风

python
      ('紧身上衣', '红色披风')
    

这很明显——因为如果你不按这个顺序来,你就会卡住。所以一定存在某件衣服是我们可以毫无顾虑直接穿上的。我们把它当作我们的起点——穿衣顺序的出发点。

因此,第一步是找出所有一开始就不依赖任何东西的衣物:

python
      ('紧身上衣', '红色披风')
('红色三角裤', '紧身裤')
('紧身上衣', '盾牌标志')
('红色靴子', '紧身裤')
    

从上面的列表可以看到,紧身上衣红色三角裤红色靴子 可以不用多想就直接穿——它们不依赖任何其他东西。 让我们建一个待穿清单:

shell
       紧身上衣
 红色三角裤
 红色靴子
    

你想从哪个开始都行,但按照惯例,我们按自然顺序来。所以先从 紧身上衣 开始,看看接下来会怎样。

打个勾——我们已经穿上了 紧身上衣

shell
       紧身上衣
 红色三角裤
 红色靴子
    

现在我们有更多选择了。因为有些衣物依赖于 紧身上衣,比如 红色披风盾牌标志,它们现在可以穿了。 让我们把它们加到选择列表的末尾:

shell
       紧身上衣
 红色三角裤
 红色靴子
 红色披风
 盾牌标志
    

我们重复这个过程。接下来,我们要穿 红色三角裤,打个勾:

shell
       紧身上衣
 红色三角裤
 红色靴子
 红色披风
 盾牌标志
    

现在检查是否有新的选项可以加入列表。因为 紧身裤 依赖于 红色三角裤,所以它们现在可用了。 把它们加到列表末尾:

shell
       紧身上衣
 红色三角裤
 红色靴子
 红色披风
 盾牌标志
 紧身裤
    

不断重复这个过程,直到列表无法再扩展且所有衣物都已穿上。然后我们就得到了一个线性顺序,告诉用户应该如何穿上每一件。

以下是逐步的伪代码:

  1. 找到用户可以立即穿上的衣物。
  2. 将它们加入待办列表。
  3. 穿上一件,然后检查是否有其他依赖于它的衣物现在可以直接穿上(即没有剩余依赖)。如果有,就加入待办列表。
  4. 不断取出和追加,直到列表为空。

现在,让我们看看如何将其转化为编程语言。

从想法到实现:Kahn 算法详解

让我们用以下有向边来演示一个示例:

python
      ('A', 'B')
('A', 'C')
('B', 'D')
('C', 'E')
('D', 'F')
('E', 'F')
('F', 'G')
('H', 'I')
('I', 'G')
    

在上述数据结构中,每个元素——括号内的一组——表示一条有向边。括号内的第一项是起始节点,第二项是结束节点。

计算每个节点的入度

我们要找出哪些节点可以在没有依赖的情况下被处理。为此,我们遍历列表并跟踪计数。

以第一个元素 A -> B 为例。我们在依赖追踪器中增加 B 的计数,意味着在 B 可以运行之前,还需要再解决一个节点。

拖动下面的滑块来跟进。图的上半部分显示了我们正在处理的有向边——从第一条黄色高亮的边开始。下半部分是依赖追踪器,随着计数增加而更新。

0
step

最终,任何计数为 0 的节点都可以自由执行。继续滑动查看完整的追踪过程。

顺便一提,在图论中,这个计数器叫做入度,它表示一个节点有多少条入边——也就是在处理该节点之前必须解决多少个依赖。

1
step

初始化队列

毫无疑问,我们找出所有依赖数为 0 的节点并将它们加入队列。

下面的布局做了些更新以展示更多过程。顶部现在显示每个节点的入度(从底部移上来了),而底部显示最终结果——节点的执行顺序,在我们的例子中就是穿衣顺序。

处理队列中的节点

现在我们把这个和超级英雄穿衣的例子联系起来。在每一步中,我们选取一个没有依赖的节点,将其加入结果列表,并更新所有依赖于它的节点的计数器。完成这些更改后,我们对下一个可用节点重复同样的过程。

听起来可能有点多,让我来带你过一遍:

在下面的第一步中,我们关注入度列表中第一个没有依赖的节点(黄色高亮)。向前滑动——你将节点 A 加入结果列表,因为它可以在没有更多依赖的情况下执行。

0
step

接下来,我们反映依赖关系的变化。我们找出哪些节点受到了影响。因为我们这里并没有严格按照机器的逐步操作来走,所以我们可以快速目测结果——你会在边列表中看到受影响的边以黄色高亮显示。

再多一步:我们将受影响节点的计数器减 1,以表示依赖关系已更新(如入度映射中所高亮的那样)。

说清楚一点,这是你脑海中应该有的画面:我们现在有了更多 0 依赖的节点。从这里开始,重复你刚才学到的步骤——这就是完整过程的展开。

在每次滑动之前,试着预测接下来会发生什么。想想哪个节点会被选中,依赖关系会如何变化,更新后的结果会是什么样。

4
step

Kahn 算法的 C++ 实现

“少一点魔法,多一点 C++。“终于到了实现部分。

这是我们之前用的测试输入:

python
      ('A', 'B')
('A', 'C')
('B', 'D')
('C', 'E')
('D', 'F')
('E', 'F')
('F', 'G')
('H', 'I')
('I', 'G')
    

现在让我们把每条有向边变成一”对”——在 C++ 里,这个”对”就是 std::pair,然后把它们全部放进一个 std::vector 中。

cpp
      #include <utility>
#include <vector>

int main() {
  // 定义有向无环图(DAG)的有向边
  std::vector<std::pair<char, char>> edges{{'A', 'B'}, {'A', 'C'}, {'B', 'D'},
                                           {'C', 'E'}, {'D', 'F'}, {'E', 'F'},
                                           {'F', 'G'}, {'H', 'I'}, {'I', 'G'}};
}
    

你还记得,我们需要一个依赖计数器。让我们为此创建一个 std::unordered_map

cpp
      #include <cstddef>
#include <unordered_map>
#include <utility>
#include <vector>

int main() {
  // 定义有向无环图(DAG)的有向边
  std::vector<std::pair<char, char>> edges{{'A', 'B'}, {'A', 'C'}, {'B', 'D'},
                                           {'C', 'E'}, {'D', 'F'}, {'E', 'F'},
                                           {'F', 'G'}, {'H', 'I'}, {'I', 'G'}};

  // 入度映射:节点 -> 入边数量
  std::unordered_map<char, std::size_t> in_degree;
}
    

接下来,我们需要找出当某个节点的依赖发生变化时,应该通知哪些节点。与其每次依赖更新时都扫描所有节点,不如把这个信息存在一个邻居映射中。这样我们就能精确知道该通知谁。

cpp
      #include <cstddef>
#include <set>
#include <unordered_map>
#include <utility>
#include <vector>

int main() {
  // 定义有向无环图(DAG)的有向边
  std::vector<std::pair<char, char>> edges{{'A', 'B'}, {'A', 'C'}, {'B', 'D'},
                                           {'C', 'E'}, {'D', 'F'}, {'E', 'F'},
                                           {'F', 'G'}, {'H', 'I'}, {'I', 'G'}};

  // 入度映射:节点 -> 入边数量
  std::unordered_map<char, std::size_t> in_degree;
  // 邻接表:节点 -> 邻居集合
  std::unordered_map<char, std::set<char>> neighbors;
}
    

值得注意的是,我们使用 std::set 来存储邻居节点,以避免重复项——以防有人不小心多次加入相同的边。

现在该填入我们之前讨论的所有信息了。

对于每条边 u -> v,我们增加 v 的入度,因为 v 依赖于 u。同时我们记录 vu 的邻居,这样一旦 u 被推入结果列表,我们就可以遍历 u 的邻居并减少它们对应的入度计数器。

我们只添加了几行代码,它们做的正是这些——仅此而已。所以别害怕。慢慢看一遍。;)

cpp
      #include ...

int main() {
  ...
  // 入度映射:节点 -> 入边数量
  std::unordered_map<char, std::size_t> in_degree;
  // 邻接表:节点 -> 邻居集合
  std::unordered_map<char, std::set<char>> neighbors;

  // 第 1 步:构建图
  for (const auto &edge : edges) {
    char u = edge.first;
    char v = edge.second;
    // 将 v 加入 u 的邻接表
    neighbors[u].insert(v);
    // 增加 v 的入度
    in_degree[v]++;
  }
}
    

有个小细节需要确认:每个节点都需要出现在 in_degree 映射中——即使那些没有入边的节点——这样在排序过程中我们就不会遗漏任何节点。

例如,假设我们有以下输入:

python
      # u -> v
('A', 'B')
('A', 'C')
    

如果我们只将 v 加入 in_degree 映射,最终只会有两个条目:BC。这意味着 A 丢失了。这看起来微不足道,但很重要:A 实际上是我们算法的起点——一个 0 依赖的节点。

所以我们在循环中多加一个分支:

cpp
      #include ...

int main() {
  ...
  // 入度映射:节点 -> 入边数量
  std::unordered_map<char, std::size_t> in_degree;
  // 邻接表:节点 -> 邻居集合
  std::unordered_map<char, std::set<char>> neighbors;

  // 第 1 步:构建图
  for (const auto &edge : edges) {
    char u = edge.first;
    char v = edge.second;
    // 将 v 加入 u 的邻接表
    neighbors[u].insert(v);
    // 增加 v 的入度
    in_degree[v]++;
    // 确保 u 也在入度映射中
    if (in_degree.find(u) == in_degree.end()) {
      in_degree[u] = 0;
    }
  }
}
    

第 1 步完成——接下来是第 2 步。找出所有入度为 0 的节点,意味着它们已准备好被执行。我们将它们存入一个待办列表——在这里用队列实现:

cpp
      #include ...
#include <queue>

int main(){
  ...
  // 入度映射:节点 -> 入边数量
  std::unordered_map<char, std::size_t> in_degree;

  // 第 1 步:构建图
  ...

  // 第 2 步:用所有入度为 0 的节点初始化队列
  std::queue<char> queue;
  for (const auto &[node, degree] : in_degree) {
    if (degree == 0) {
      queue.emplace(node);
    }
  }
}
    

只要队列不为空,我们就取出一个节点——就像从待办清单中选一项——然后把它加入结果:

cpp
      #include ...

int main(){
  ...
  // 第 1 步:构建图
  ...
  // 第 2 步:用所有入度为 0 的节点初始化队列
  ...

  // 第 3 步:执行 Kahn 算法进行拓扑排序
  std::vector<char> result;
  while (!queue.empty()) {
    char node = queue.front();
    queue.pop();
    result.push_back(node);
  }
}
    

然后我们通过减少对应节点的计数器来通知它们:

cpp
      #include ...

int main(){
  ...
  // 第 1 步:构建图
  ...
  // 第 2 步:用所有入度为 0 的节点初始化队列
  ...

  // 第 3 步:执行 Kahn 算法进行拓扑排序
  std::vector<char> result;
  while (!queue.empty()) {
    char node = queue.front();
    queue.pop();
    result.push_back(node);
    // 对每个邻居,减少其入度
    for (char neighbor : neighbors[node]) {
      --in_degree[neighbor];
    }
  }
}
    

一旦某个计数器达到 0,我们就把该节点重新加入待办列表以进行后续处理。从这里开始,算法就一直运行下去了:

cpp
      #include ...

int main(){
  ...
  // 第 1 步:构建图
  ...
  // 第 2 步:用所有入度为 0 的节点初始化队列
  ...

  // 第 3 步:执行 Kahn 算法进行拓扑排序
  std::vector<char> result;
  while (!queue.empty()) {
    char node = queue.front();
    queue.pop();
    result.push_back(node);
    // 对每个邻居,减少其入度
    for (char neighbor : neighbors[node]) {
      --in_degree[neighbor];
      if (--in_degree[neighbor] == 0) {
        // 如果入度变为 0,加入队列
        queue.push(neighbor);
      }
    }
  }
}
    

最后,我们打印结果。

cpp
      #include ...
#include <iostream>

int main(){
  ...
  // 第 1 步:构建图
  ...
  // 第 2 步:用所有入度为 0 的节点初始化队列
  ...
  // 第 3 步:执行 Kahn 算法进行拓扑排序
  ...

  // 第 4 步:打印拓扑排序结果
  std::cout << "Topological Sort Order:" << std::endl;
  for (std::size_t i = 0; i < result.size(); ++i) {
    std::cout << result[i];
    if (i < result.size() - 1) {
      std::cout << " -> ";
    }
  }
  std::cout << std::endl;
  return 0;
}
    

别忘了在最后 return 0 哦。:)

python
      Topological Sort Order:
H -> A -> I -> B -> C -> D -> E -> F -> G
    

以下是完整代码,把所有内容整合在一起:

cpp
      #include <cstddef>
#include <iostream>
#include <queue>
#include <set>
#include <unordered_map>
#include <utility>
#include <vector>

int main() {
  // 定义有向无环图(DAG)的有向边
  std::vector<std::pair<char, char>> edges{{'A', 'B'}, {'A', 'C'}, {'B', 'D'},
                                           {'C', 'E'}, {'D', 'F'}, {'E', 'F'},
                                           {'F', 'G'}, {'H', 'I'}, {'I', 'G'}};

  // 入度映射:节点 -> 入边数量
  std::unordered_map<char, std::size_t> in_degree;

  // 邻接表:节点 -> 邻居集合
  std::unordered_map<char, std::set<char>> neighbors;

  // 第 1 步:构建图
  for (const auto &edge : edges) {
    char u = edge.first;
    char v = edge.second;

    // 将 v 加入 u 的邻接表
    neighbors[u].insert(v);

    // 增加 v 的入度
    in_degree[v]++;

    // 确保 u 也在入度映射中
    if (in_degree.find(u) == in_degree.end()) {
      in_degree[u] = 0;
    }
  }

  // 第 2 步:用所有入度为 0 的节点初始化队列
  std::queue<char> queue;
  for (const auto &[node, degree] : in_degree) {
    if (degree == 0) {
      queue.emplace(node);
    }
  }

  // 第 3 步:执行 Kahn 算法进行拓扑排序
  std::vector<char> result;
  while (!queue.empty()) {
    char node = queue.front();
    queue.pop();
    result.push_back(node);

    // 对每个邻居,减少其入度
    for (char neighbor : neighbors[node]) {
      if (--in_degree[neighbor] == 0) {
        // 如果入度变为 0,加入队列
        queue.push(neighbor);
      }
    }
  }

  // 第 4 步:打印拓扑排序结果
  std::cout << "Topological Sort Order:" << std::endl;
  for (std::size_t i = 0; i < result.size(); ++i) {
    std::cout << result[i];
    if (i < result.size() - 1) {
      std::cout << " -> ";
    }
  }
  std::cout << std::endl;

  return 0;
}
    

使用 Kahn 算法的并行拓扑排序

现在,如果你的节点可以安全地并行执行(例如,不同于某些像 OpenGL 这样的有状态 API,其中某些调用并非线程安全),我们可以稍微调整第 3 步。我们不再生成单一的扁平结果,而是将节点分组成”层”。

如你所见,我们不再使用一个 vector 来存结果,而是使用一个 vectorvector,叫做 layers

cpp
      #include ...

int main(){
  ...
  // 第 1 步:构建图
  ...
  // 第 2 步:用所有入度为 0 的节点初始化队列
  ...

  // 第 3 步:逐层拓扑排序(Kahn 算法)
  std::vector<char> result;
  std::vector<std::vector<char>> layers;
  while (!queue.empty()) {
  }

  // 第 4 步:打印拓扑排序结果
  ...

  return 0;
}
    

我们不再每次从待办列表中取一项,而是批量处理——因为它们都有 0 个依赖,可以同时执行。下一批将只包含在本次迭代中被加入待办列表的节点。

听起来有点复杂?让我们分解一下。

我们先创建一个叫 layer 的结果列表,然后遍历当前队列中的所有节点:

cpp
      #include ...

int main(){
  ...
  // 第 1 步:构建图
  ...
  // 第 2 步:用所有入度为 0 的节点初始化队列
  ...

  // 第 3 步:逐层拓扑排序(Kahn 算法)
  std::vector<std::vector<char>> layers;
  while (!queue.empty()) {
    std::vector<char> layer;
    std::size_t size = queue.size();
    // 处理当前层的所有节点
    for (std::size_t i = 0; i < size; ++i) {
    }
  }

  // 第 4 步:打印拓扑排序结果
  ...

  return 0;
}
    

size 变量告诉我们当前队列中有多少节点。别忘了——在这个循环中可能会有新节点被加入队列。这正是我们先保存这个数字的原因:我们只想处理在本次迭代开始时已经在队列中的节点。

在循环内部,我们做的事情和之前一样——只是这次我们把当前批次收集到 layer 中。

例如,第一层可能看起来像这样:

python
      Layer 0: H A
    

这意味着这些节点完全没有依赖——它们是合法的起点。因为它们共享相同的属性,我们可以在同一批次中并行执行它们。

接下来,我们找出哪些节点在下一批中变得可用,并将它们加入待办列表:

cpp
      #include ...

int main(){
  ...
  // 第 1 步:构建图
  ...
  // 第 2 步:用所有入度为 0 的节点初始化队列
  ...

  // 第 3 步:逐层拓扑排序(Kahn 算法)
  std::vector<char> result;
  std::vector<std::vector<char>> layers;
  while (!queue.empty()) {
    std::size_t size = queue.size();
    std::vector<char> layer;
    // 处理当前层的所有节点
    for (std::size_t i = 0; i < size; ++i) {
      char node = queue.front();
      queue.pop();
      layer.push_back(node);
      for (char neighbor : neighbors[node]) {
        if (--in_degree[neighbor] == 0) {
          queue.push(neighbor);
        }
      }
    }
    // 存储当前层
    layers.push_back(layer);
  }

  // 第 4 步:打印拓扑排序结果
  ...

  return 0;
}
    

然后在第 4 步中,我们更新输出以逐层打印:

cpp
      #include ...

int main(){
  ...
  // 第 1 步:构建图
  ...
  // 第 2 步:用所有入度为 0 的节点初始化队列
  ...

  // 第 3 步:逐层拓扑排序(Kahn 算法)
  std::vector<std::vector<char>> layers;
  ...

  // 第 4 步:打印拓扑排序结果
  for (std::size_t i = 0; i < layers.size(); ++i) {
    std::cout << "Layer " << i << ": ";
    for (char node : layers[i]) {
      std::cout << node << " ";
    }
    std::cout << std::endl;
  }

  return 0;
}
    

输出可能看起来像这样:

python
      Layer 0: H A
Layer 1: I B C
Layer 2: D E
Layer 3: F
Layer 4: G
    

你可以理解为:我们将一层中的所有节点并行执行,然后再进入下一层。

与之前的结果相比:

python
      Topological Sort Order:
H -> A -> I -> B -> C -> D -> E -> F -> G
    

我们有效地将一个 9 步的执行压缩为 5 个批次,在可以并行的情况下,这可以显著提升性能。

希望这篇详解能让你清楚地了解 Kahn 算法的工作原理——以及如何将其扩展用于并行执行。试着修改边或者玩玩这个例子,看看层级会如何变化。