通过超人穿衣顺序的趣味类比、交互式可视化、逐步 C++ 实现,以及基于逐层 BFS 的并行执行扩展,学习 Kahn 算法的拓扑排序。
开场
欢迎来到裤子最后穿星球!在这个星球上,你必须把裤子放在所有其他衣物之后再穿。我们不在乎你是不是把袜子穿在鞋子外面,还是像憨豆先生那样换内裤。只要确保穿裤子是你准备出门过程中的最后一步就行。
我们的任务是开发一款 App,教那些不守规矩的人——比如超人 🦸♂️,他通常把内裤穿在裤子外面——正确的穿衣顺序,这样他们就能按规矩来了。
首先,让我们来盘点一下经典超人日常穿搭的装备清单:
- 🔵 一件蓝色紧身上衣
- 👖 一条蓝色紧身裤
- 🧣 一件红色披风
- 🦸♂️ 胸前的”S”盾牌标志
- 🩲 红色三角裤穿在裤子里面
- 👢 红色靴子
- 🟨 一条黄色腰带
我们可以按照人类穿衣的一般顺序来定义这些装备之间的关系。
比如说,他只有在上半身的紧身衣已经穿好的前提下,才能披上红色披风。
我们用这种方式来表示这个依赖关系:
('紧身上衣', '红色披风')
你可以把这个简洁的符号表达式理解为以下这段完全直白、绝对不会过于复杂的指令:
“如果你想穿上你的红色披风,并让它正确且稳妥地搭在你的肩膀上——按照人们通常期望披风搭在肩上的方式——那么在你伸手去拿它之前,在你把它拎起来之前,在你把它甩到背后或尝试把它整齐地围在脖子上之前,你需要先把一件衣服从头套上,把一只胳膊伸进一个袖子、另一只胳膊伸进另一个袖子,把面料拉到胸前和背后,沿身体两侧抚平,理顺所有褶皱,检查它在两肩是否平整,并确认它没有扭曲、堆叠或只穿了一半——换句话说,你需要先穿好你的蓝色紧身上衣。”
非常简单。
我们可以不断重复这个过程,直到超人通过了在线媒体平台审核机器人的检测,同时 DC Comics 也拿到了授权费——简单来说,就是穿得足够多以免被审核,但又足够像超人以至于可能会惹上版权麻烦。
超级英雄穿衣算法
我们在很早之前就介绍了这种语法,表示你先穿上 紧身上衣,然后才能穿 红色披风。
('紧身上衣', '红色披风')
这很明显——因为如果你不按这个顺序来,你就会卡住。所以一定存在某件衣服是我们可以毫无顾虑直接穿上的。我们把它当作我们的起点——穿衣顺序的出发点。
因此,第一步是找出所有一开始就不依赖任何东西的衣物:
('紧身上衣', '红色披风')
('红色三角裤', '紧身裤')
('紧身上衣', '盾牌标志')
('红色靴子', '紧身裤')
从上面的列表可以看到,紧身上衣、红色三角裤 和 红色靴子 可以不用多想就直接穿——它们不依赖任何其他东西。
让我们建一个待穿清单:
○ 紧身上衣
○ 红色三角裤
○ 红色靴子
你想从哪个开始都行,但按照惯例,我们按自然顺序来。所以先从 紧身上衣 开始,看看接下来会怎样。
打个勾——我们已经穿上了 紧身上衣:
✔ 紧身上衣
○ 红色三角裤
○ 红色靴子
现在我们有更多选择了。因为有些衣物依赖于 紧身上衣,比如 红色披风 和 盾牌标志,它们现在可以穿了。
让我们把它们加到选择列表的末尾:
✔ 紧身上衣
○ 红色三角裤
○ 红色靴子
○ 红色披风
○ 盾牌标志
我们重复这个过程。接下来,我们要穿 红色三角裤,打个勾:
✔ 紧身上衣
✔ 红色三角裤
○ 红色靴子
○ 红色披风
○ 盾牌标志
现在检查是否有新的选项可以加入列表。因为 紧身裤 依赖于 红色三角裤,所以它们现在可用了。
把它们加到列表末尾:
✔ 紧身上衣
✔ 红色三角裤
○ 红色靴子
○ 红色披风
○ 盾牌标志
○ 紧身裤
不断重复这个过程,直到列表无法再扩展且所有衣物都已穿上。然后我们就得到了一个线性顺序,告诉用户应该如何穿上每一件。
以下是逐步的伪代码:
- 找到用户可以立即穿上的衣物。
- 将它们加入待办列表。
- 穿上一件,然后检查是否有其他依赖于它的衣物现在可以直接穿上(即没有剩余依赖)。如果有,就加入待办列表。
- 不断取出和追加,直到列表为空。
现在,让我们看看如何将其转化为编程语言。
从想法到实现:Kahn 算法详解
让我们用以下有向边来演示一个示例:
('A', 'B')
('A', 'C')
('B', 'D')
('C', 'E')
('D', 'F')
('E', 'F')
('F', 'G')
('H', 'I')
('I', 'G')
在上述数据结构中,每个元素——括号内的一组——表示一条有向边。括号内的第一项是起始节点,第二项是结束节点。
计算每个节点的入度
我们要找出哪些节点可以在没有依赖的情况下被处理。为此,我们遍历列表并跟踪计数。
以第一个元素 A -> B 为例。我们在依赖追踪器中增加 B 的计数,意味着在 B 可以运行之前,还需要再解决一个节点。
拖动下面的滑块来跟进。图的上半部分显示了我们正在处理的有向边——从第一条黄色高亮的边开始。下半部分是依赖追踪器,随着计数增加而更新。
最终,任何计数为 0 的节点都可以自由执行。继续滑动查看完整的追踪过程。
顺便一提,在图论中,这个计数器叫做入度,它表示一个节点有多少条入边——也就是在处理该节点之前必须解决多少个依赖。
初始化队列
毫无疑问,我们找出所有依赖数为 0 的节点并将它们加入队列。
下面的布局做了些更新以展示更多过程。顶部现在显示每个节点的入度(从底部移上来了),而底部显示最终结果——节点的执行顺序,在我们的例子中就是穿衣顺序。
处理队列中的节点
现在我们把这个和超级英雄穿衣的例子联系起来。在每一步中,我们选取一个没有依赖的节点,将其加入结果列表,并更新所有依赖于它的节点的计数器。完成这些更改后,我们对下一个可用节点重复同样的过程。
听起来可能有点多,让我来带你过一遍:
在下面的第一步中,我们关注入度列表中第一个没有依赖的节点(黄色高亮)。向前滑动——你将节点 A 加入结果列表,因为它可以在没有更多依赖的情况下执行。
接下来,我们反映依赖关系的变化。我们找出哪些节点受到了影响。因为我们这里并没有严格按照机器的逐步操作来走,所以我们可以快速目测结果——你会在边列表中看到受影响的边以黄色高亮显示。
再多一步:我们将受影响节点的计数器减 1,以表示依赖关系已更新(如入度映射中所高亮的那样)。
说清楚一点,这是你脑海中应该有的画面:我们现在有了更多 0 依赖的节点。从这里开始,重复你刚才学到的步骤——这就是完整过程的展开。
在每次滑动之前,试着预测接下来会发生什么。想想哪个节点会被选中,依赖关系会如何变化,更新后的结果会是什么样。
Kahn 算法的 C++ 实现
“少一点魔法,多一点 C++。“终于到了实现部分。
这是我们之前用的测试输入:
('A', 'B')
('A', 'C')
('B', 'D')
('C', 'E')
('D', 'F')
('E', 'F')
('F', 'G')
('H', 'I')
('I', 'G')
现在让我们把每条有向边变成一”对”——在 C++ 里,这个”对”就是 std::pair,然后把它们全部放进一个 std::vector 中。
#include <utility>
#include <vector>
int main() {
// 定义有向无环图(DAG)的有向边
std::vector<std::pair<char, char>> edges{{'A', 'B'}, {'A', 'C'}, {'B', 'D'},
{'C', 'E'}, {'D', 'F'}, {'E', 'F'},
{'F', 'G'}, {'H', 'I'}, {'I', 'G'}};
}
你还记得,我们需要一个依赖计数器。让我们为此创建一个 std::unordered_map:
#include <cstddef>
#include <unordered_map>
#include <utility>
#include <vector>
int main() {
// 定义有向无环图(DAG)的有向边
std::vector<std::pair<char, char>> edges{{'A', 'B'}, {'A', 'C'}, {'B', 'D'},
{'C', 'E'}, {'D', 'F'}, {'E', 'F'},
{'F', 'G'}, {'H', 'I'}, {'I', 'G'}};
// 入度映射:节点 -> 入边数量
std::unordered_map<char, std::size_t> in_degree;
}
接下来,我们需要找出当某个节点的依赖发生变化时,应该通知哪些节点。与其每次依赖更新时都扫描所有节点,不如把这个信息存在一个邻居映射中。这样我们就能精确知道该通知谁。
#include <cstddef>
#include <set>
#include <unordered_map>
#include <utility>
#include <vector>
int main() {
// 定义有向无环图(DAG)的有向边
std::vector<std::pair<char, char>> edges{{'A', 'B'}, {'A', 'C'}, {'B', 'D'},
{'C', 'E'}, {'D', 'F'}, {'E', 'F'},
{'F', 'G'}, {'H', 'I'}, {'I', 'G'}};
// 入度映射:节点 -> 入边数量
std::unordered_map<char, std::size_t> in_degree;
// 邻接表:节点 -> 邻居集合
std::unordered_map<char, std::set<char>> neighbors;
}
值得注意的是,我们使用 std::set 来存储邻居节点,以避免重复项——以防有人不小心多次加入相同的边。
现在该填入我们之前讨论的所有信息了。
对于每条边 u -> v,我们增加 v 的入度,因为 v 依赖于 u。同时我们记录 v 是 u 的邻居,这样一旦 u 被推入结果列表,我们就可以遍历 u 的邻居并减少它们对应的入度计数器。
我们只添加了几行代码,它们做的正是这些——仅此而已。所以别害怕。慢慢看一遍。;)
#include ...
int main() {
...
// 入度映射:节点 -> 入边数量
std::unordered_map<char, std::size_t> in_degree;
// 邻接表:节点 -> 邻居集合
std::unordered_map<char, std::set<char>> neighbors;
// 第 1 步:构建图
for (const auto &edge : edges) {
char u = edge.first;
char v = edge.second;
// 将 v 加入 u 的邻接表
neighbors[u].insert(v);
// 增加 v 的入度
in_degree[v]++;
}
}
有个小细节需要确认:每个节点都需要出现在 in_degree 映射中——即使那些没有入边的节点——这样在排序过程中我们就不会遗漏任何节点。
例如,假设我们有以下输入:
# u -> v
('A', 'B')
('A', 'C')
如果我们只将 v 加入 in_degree 映射,最终只会有两个条目:B 和 C。这意味着 A 丢失了。这看起来微不足道,但很重要:A 实际上是我们算法的起点——一个 0 依赖的节点。
所以我们在循环中多加一个分支:
#include ...
int main() {
...
// 入度映射:节点 -> 入边数量
std::unordered_map<char, std::size_t> in_degree;
// 邻接表:节点 -> 邻居集合
std::unordered_map<char, std::set<char>> neighbors;
// 第 1 步:构建图
for (const auto &edge : edges) {
char u = edge.first;
char v = edge.second;
// 将 v 加入 u 的邻接表
neighbors[u].insert(v);
// 增加 v 的入度
in_degree[v]++;
// 确保 u 也在入度映射中
if (in_degree.find(u) == in_degree.end()) {
in_degree[u] = 0;
}
}
}
第 1 步完成——接下来是第 2 步。找出所有入度为 0 的节点,意味着它们已准备好被执行。我们将它们存入一个待办列表——在这里用队列实现:
#include ...
#include <queue>
int main(){
...
// 入度映射:节点 -> 入边数量
std::unordered_map<char, std::size_t> in_degree;
// 第 1 步:构建图
...
// 第 2 步:用所有入度为 0 的节点初始化队列
std::queue<char> queue;
for (const auto &[node, degree] : in_degree) {
if (degree == 0) {
queue.emplace(node);
}
}
}
只要队列不为空,我们就取出一个节点——就像从待办清单中选一项——然后把它加入结果:
#include ...
int main(){
...
// 第 1 步:构建图
...
// 第 2 步:用所有入度为 0 的节点初始化队列
...
// 第 3 步:执行 Kahn 算法进行拓扑排序
std::vector<char> result;
while (!queue.empty()) {
char node = queue.front();
queue.pop();
result.push_back(node);
}
}
然后我们通过减少对应节点的计数器来通知它们:
#include ...
int main(){
...
// 第 1 步:构建图
...
// 第 2 步:用所有入度为 0 的节点初始化队列
...
// 第 3 步:执行 Kahn 算法进行拓扑排序
std::vector<char> result;
while (!queue.empty()) {
char node = queue.front();
queue.pop();
result.push_back(node);
// 对每个邻居,减少其入度
for (char neighbor : neighbors[node]) {
--in_degree[neighbor];
}
}
}
一旦某个计数器达到 0,我们就把该节点重新加入待办列表以进行后续处理。从这里开始,算法就一直运行下去了:
#include ...
int main(){
...
// 第 1 步:构建图
...
// 第 2 步:用所有入度为 0 的节点初始化队列
...
// 第 3 步:执行 Kahn 算法进行拓扑排序
std::vector<char> result;
while (!queue.empty()) {
char node = queue.front();
queue.pop();
result.push_back(node);
// 对每个邻居,减少其入度
for (char neighbor : neighbors[node]) {
--in_degree[neighbor];
if (--in_degree[neighbor] == 0) {
// 如果入度变为 0,加入队列
queue.push(neighbor);
}
}
}
}
最后,我们打印结果。
#include ...
#include <iostream>
int main(){
...
// 第 1 步:构建图
...
// 第 2 步:用所有入度为 0 的节点初始化队列
...
// 第 3 步:执行 Kahn 算法进行拓扑排序
...
// 第 4 步:打印拓扑排序结果
std::cout << "Topological Sort Order:" << std::endl;
for (std::size_t i = 0; i < result.size(); ++i) {
std::cout << result[i];
if (i < result.size() - 1) {
std::cout << " -> ";
}
}
std::cout << std::endl;
return 0;
}
别忘了在最后 return 0 哦。:)
Topological Sort Order:
H -> A -> I -> B -> C -> D -> E -> F -> G
以下是完整代码,把所有内容整合在一起:
#include <cstddef>
#include <iostream>
#include <queue>
#include <set>
#include <unordered_map>
#include <utility>
#include <vector>
int main() {
// 定义有向无环图(DAG)的有向边
std::vector<std::pair<char, char>> edges{{'A', 'B'}, {'A', 'C'}, {'B', 'D'},
{'C', 'E'}, {'D', 'F'}, {'E', 'F'},
{'F', 'G'}, {'H', 'I'}, {'I', 'G'}};
// 入度映射:节点 -> 入边数量
std::unordered_map<char, std::size_t> in_degree;
// 邻接表:节点 -> 邻居集合
std::unordered_map<char, std::set<char>> neighbors;
// 第 1 步:构建图
for (const auto &edge : edges) {
char u = edge.first;
char v = edge.second;
// 将 v 加入 u 的邻接表
neighbors[u].insert(v);
// 增加 v 的入度
in_degree[v]++;
// 确保 u 也在入度映射中
if (in_degree.find(u) == in_degree.end()) {
in_degree[u] = 0;
}
}
// 第 2 步:用所有入度为 0 的节点初始化队列
std::queue<char> queue;
for (const auto &[node, degree] : in_degree) {
if (degree == 0) {
queue.emplace(node);
}
}
// 第 3 步:执行 Kahn 算法进行拓扑排序
std::vector<char> result;
while (!queue.empty()) {
char node = queue.front();
queue.pop();
result.push_back(node);
// 对每个邻居,减少其入度
for (char neighbor : neighbors[node]) {
if (--in_degree[neighbor] == 0) {
// 如果入度变为 0,加入队列
queue.push(neighbor);
}
}
}
// 第 4 步:打印拓扑排序结果
std::cout << "Topological Sort Order:" << std::endl;
for (std::size_t i = 0; i < result.size(); ++i) {
std::cout << result[i];
if (i < result.size() - 1) {
std::cout << " -> ";
}
}
std::cout << std::endl;
return 0;
}
使用 Kahn 算法的并行拓扑排序
现在,如果你的节点可以安全地并行执行(例如,不同于某些像 OpenGL 这样的有状态 API,其中某些调用并非线程安全),我们可以稍微调整第 3 步。我们不再生成单一的扁平结果,而是将节点分组成”层”。
如你所见,我们不再使用一个 vector 来存结果,而是使用一个 vector 的 vector,叫做 layers。
#include ...
int main(){
...
// 第 1 步:构建图
...
// 第 2 步:用所有入度为 0 的节点初始化队列
...
// 第 3 步:逐层拓扑排序(Kahn 算法)
std::vector<char> result;
std::vector<std::vector<char>> layers;
while (!queue.empty()) {
}
// 第 4 步:打印拓扑排序结果
...
return 0;
}
我们不再每次从待办列表中取一项,而是批量处理——因为它们都有 0 个依赖,可以同时执行。下一批将只包含在本次迭代中被加入待办列表的节点。
听起来有点复杂?让我们分解一下。
我们先创建一个叫 layer 的结果列表,然后遍历当前队列中的所有节点:
#include ...
int main(){
...
// 第 1 步:构建图
...
// 第 2 步:用所有入度为 0 的节点初始化队列
...
// 第 3 步:逐层拓扑排序(Kahn 算法)
std::vector<std::vector<char>> layers;
while (!queue.empty()) {
std::vector<char> layer;
std::size_t size = queue.size();
// 处理当前层的所有节点
for (std::size_t i = 0; i < size; ++i) {
}
}
// 第 4 步:打印拓扑排序结果
...
return 0;
}
size 变量告诉我们当前队列中有多少节点。别忘了——在这个循环中可能会有新节点被加入队列。这正是我们先保存这个数字的原因:我们只想处理在本次迭代开始时已经在队列中的节点。
在循环内部,我们做的事情和之前一样——只是这次我们把当前批次收集到 layer 中。
例如,第一层可能看起来像这样:
Layer 0: H A
这意味着这些节点完全没有依赖——它们是合法的起点。因为它们共享相同的属性,我们可以在同一批次中并行执行它们。
接下来,我们找出哪些节点在下一批中变得可用,并将它们加入待办列表:
#include ...
int main(){
...
// 第 1 步:构建图
...
// 第 2 步:用所有入度为 0 的节点初始化队列
...
// 第 3 步:逐层拓扑排序(Kahn 算法)
std::vector<char> result;
std::vector<std::vector<char>> layers;
while (!queue.empty()) {
std::size_t size = queue.size();
std::vector<char> layer;
// 处理当前层的所有节点
for (std::size_t i = 0; i < size; ++i) {
char node = queue.front();
queue.pop();
layer.push_back(node);
for (char neighbor : neighbors[node]) {
if (--in_degree[neighbor] == 0) {
queue.push(neighbor);
}
}
}
// 存储当前层
layers.push_back(layer);
}
// 第 4 步:打印拓扑排序结果
...
return 0;
}
然后在第 4 步中,我们更新输出以逐层打印:
#include ...
int main(){
...
// 第 1 步:构建图
...
// 第 2 步:用所有入度为 0 的节点初始化队列
...
// 第 3 步:逐层拓扑排序(Kahn 算法)
std::vector<std::vector<char>> layers;
...
// 第 4 步:打印拓扑排序结果
for (std::size_t i = 0; i < layers.size(); ++i) {
std::cout << "Layer " << i << ": ";
for (char node : layers[i]) {
std::cout << node << " ";
}
std::cout << std::endl;
}
return 0;
}
输出可能看起来像这样:
Layer 0: H A
Layer 1: I B C
Layer 2: D E
Layer 3: F
Layer 4: G
你可以理解为:我们将一层中的所有节点并行执行,然后再进入下一层。
与之前的结果相比:
Topological Sort Order:
H -> A -> I -> B -> C -> D -> E -> F -> G
我们有效地将一个 9 步的执行压缩为 5 个批次,在可以并行的情况下,这可以显著提升性能。
希望这篇详解能让你清楚地了解 Kahn 算法的工作原理——以及如何将其扩展用于并行执行。试着修改边或者玩玩这个例子,看看层级会如何变化。