算法4 并查集

爱因斯坦说过:”如果你不能给一个六岁小孩解释清楚,那你并不真的懂。“,所以我准备写一系列关于算法的文章。

我是通过《算法 第4版》这本书和它的配套网站Coursera课程学习的,当时的学习过程实在有点曲折,首先是由于翻译的原因书中的许多描述非常啰嗦,另外就是Coursera课程的中文字幕是机器翻译的,还不如硬啃英文。但是本着“我真的懂”的信念,我想尽量让你们不看这些书和课程,更加轻松的学习到《算法 第4版》中的相关知识。

配套网站中的资源还是非常有用的,如果我重新构造这些东西将耗费大量的时间,并且没有合适的位置存放这些资源,所以接下来的文章我都准备沿用配套网站中的资源;

  1. 安装编程环境:WindowsMacLinux
  2. 资源文件(主要是测试数据):打包下载按需

《算法 第4版》这本书中,我们接触到的第一个算法是Union-Find,也称为并查集,用于解决动态连通性问题。以这个算法作为开篇是有原因的,该算法本身非常简单,拥有多个改进算法,可以渐进式的教学。另外就是该算法有足够的趣味性,能解决实际问题,也为我们了解开发一个算法的基本步骤:

  1. 对问题进行建模;
  2. 尝试找到一个算法解决它;
  3. 算法是否足够快,是否符合内存要求;
  4. 如果不是,找出问题所在;
  5. 重复以上步骤直到结果令人满意。

动态连通性问题

假设存在这样一个模型,可以录一对对象 a-b,表示a和b是相连的;后续可以继续录入更多对对象,c-db-c……

而这个模型的作用是在任意时刻可以查询任意两个对象是否连通,而“动态”的意思则是原本非连通的两个对象,可能在某个时刻后连通,例如上面a和d原本是没有连通的,但是在录入b-c后它们就连通了。

动态连通性问题存在许多应用:

  • 计算机网络中,某台计算机是否需要架设线一条新的线路才能和另一台计算机连接;
  • 社交网络中,两个人是否存某种联系;
  • 电子芯片中,两个触点是否可以连通;

以上应用都可能是超大规模的问题,如何快速的查询两个对象是否连通或者连通两个对象?

大规模问题

步骤1:建模

数学模型

动态连通性问题很容易让人联想到集合论中的等价关系,等价关系是非空集合上的一个二元关系,主要用于划分集合,将一个较大的集合划分成多个子集,而每个子集称为”等价类“,选取每类的代表元素来降低问题的复杂度。同时满足自反性、对称性、传递性的二元关系才能被称为等价关系。在这里,我们有必要证明动态连通性具有等价关系,以表示我们的建模是符合数学规律的。

假设p和q相连中的“相连”是一个二元关系:

  • 自反性:p和p是相连的;
  • 对称性:p和q相连的,那么q和p也是相连的;
  • 传递性:p和q相连,q和r相连,那么p和r也是相连的。

现在我们可以将集合中对象划分成多个等价类,查询两个对象是否相连即查询他们是否属于同一个等价类,连通两个对象即合并两个等价类。

这也是“并查集”名字的由来,因为它的原理正是通过集合的“合并”、“查询”实现的。

编程模型

为了方便描述,下面我们将对象称为“触点”,将相连称为“连接”,将等价类称为“连通分量”或直接称为“分量”。

设计数据类型

  1. 用数组表示集合;
  2. 数组索引表示触点的id;
  3. 数组元素表示分量的id;

具有相同分量id的所有触点之间存在等价关系,即他们都是相连的。

设计API

1
2
3
4
5
6
7
8
9
10
class UF {
// 构造包含N个触点的并查集
UF(int N)

// 连通两个分量
void union(int p, int q)

// 查询分量名称
int find(int p)
}

测试客户端

测试数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
~> cat tinyUF.txt
10
4 3
3 8
6 5
9 4
2 1
8 9
5 0
7 2
6 1
1 0
6 7

我们读取文件第一个整数,表示问题规模。后续读取一对整数,查询他们是否相连。如果不相连则连通他们,并输出到控制台。

1
2
3
4
5
6
7
8
9
10
11
12
public void main(String[] args) {
int n = StdIn.readInt();
UF uf = new UF(n);
while(!StdIn.isEmpty()) {
int p = StdIn.readInt();
int q = StdIn.reqdInt();
if (uf.find(p) != uf.find(q)) {
uf.union(p, q);
StdOut.println(p + " " + q);
}
}
}

步骤2-5:迭代

QuickFind

我们的第一目标应该是找出一个可行的算法,而不是最优的算法,并尝试实现它。按照我们一开始的设计,算法应该是这样的:

  1. 数组索引表示触点id;
  2. 数组元素表示分量id;
  3. 分量id相同触点表示它们是相连的;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class QuickFindUF() {

private int[] ids;

// 初始状态每个触点可以表示大小为1的分量,即分量id = 触点id
public QuickFindUF(int n) {
ids = new int[n];
for(int i = 0; i < n; i++) {
ids[i] = i;
}
}

// 查询触点所在的分量的id
public int find(int p) {
return ids[p];
}

// 连接两个触点,即合并分量
public void union(int p, int q) {
int pid = find(p);
int qid = find(q);
// 遍历数组,将分量p的id改为分量q的id,表示合并两个分量
for(int i = 0; i < ids.length; i++) {
if (ids[i] == pid) {
ids[p] = qid;
}
}
}
}
find() union()
1(常数级) N (线性级)

对于N个触点M条连接的规模,quickfind整体的时间复杂度:~ NM,平方级。

QuickUnion

quick-find算法效率低的主要原因在于,合并两个分量代价太高,每次合并都需要遍历整个数组,我们尝试改进:

  • 数组的索引依旧用来表示触点id;
  • 数组的元素不再表示分量id,而是表示与它连接的另一个触点的id。这样可以将数组变成森林,森林中的每棵树则表示分量,树上的叶子表示触点;
  • 通过任意触点可以查找到树的根节点(索引和元素相同);
  • 根节点相同即表示两个触点是相连的;
  • 合并两个分量只需要合并两棵树,即连接他们的根节点。

QuickUnion

QuickUnion

QuickUnion

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class QuickUnionUF {

private int[] ids;

public QuickUnionUF(int n) {
ids = new int[n];
for(int i = 0; i < n; i++)
ids[i] = i;
}

public int find(int p) {
while(p != ids[p])
p = ids[p];
return p;
}

public void union(int p, int q) {
int rootQ = find(p);
int rootP = find(q);
ids[rootP] = rootQ;
}
}
find() union()
1~N 1

对于N个触点M条连接的规模,quickunion整体的时间复杂度:< NM,接近平方级。

quick-union算法整体上要比quick-find更快,因为find的方法是次线性的,不需要每次遍历完整个数组。但是存在一种情况让它的性能变为平方级:

假设输入的整数对是1-02-03-04-0... 那么将得到一棵非常高的树,只有树干没有叶子。这种情况下,find方法的成本是平方级增长的:

3+5+7+...+2n-1 ~ N^2

所以quick-union算法的时间成本取决于森林中的树的高度

WeightedQuickUnion

quick-union算法的时间成本是取决于森林中的树的高度的,那么我们只需要避免出现一棵非常高的树就可以大大增加quick-union算法的性能。

这个改进非常非常简单,只需要在合并两棵树时判断树的“大小”,永远只将较小的树归并到较大的树即可。

为此,我们的代价只是使用多一个数组用于记录分量大小。

思考:这里为什么记录的是树的大小而非树的高度?如何记录树的高度?

实际上对于加权的quick-union算法,判断树的高度和树的大小的性能差距非常小。但是后续有一项优化叫做“路径压缩”,这会让树的高度发生变化,但是树的大小不变,这使得记录树的高度变得非常困难,得不偿失。

记录树的高度的方式:只有相同大小的树合并时,树的高度才会增加。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class WeightedQuickUnionUF {

private int[] ids;
private int[] sz;

public WeightQuickUnionUF(int n) {
ids = new int[n];
sz = new int[n];
for(int i = 0; i < n; i++) {
ids[i] = i;
sz[i] = 1;
}
}

// 代码和quick-union相同
public int find(int p) {
while(p != ids[p])
p = ids[p];
return p;
}

public void union(int p, int q) {
int rootP = find(p);
int rootQ = find(q);
// 判断分量大小,永远只将小的分量合并到大的分量中,可以尽量避免树的高度增加。
if (sz[rootP] < sz[rootQ]) {
ids[rootP] = rootQ;
sz[rootQ] += sz[rootP];
} else {
ids[rootQ] = rootP;
sz[rootP] += sz[rootQ];
}
}
}

每当两棵树合并时,树的大小翻倍,一棵树的大小最大只能为N。所以 2*2*2*...*2 <= N,即树的高度最多为 logN ;

find() union()
logN 1

对于N个触点M条连接的规模,WeightedQuickUnion整体的时间复杂度:~ MlogN,线性对数级。

QuickUnion + Path Compression

还有一种基于quick-union算法的改进,即路径压缩。在每次查找分量的同时压缩路劲,压缩的方式有两种,分别是Two-pass和One-pass。

Two-pass Compression

这种路径压缩方式是将让森林完全扁平化,树的高度为1,所有叶子节点直接和根节点相连。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public int find(int p) {
int op = p;
// 先找到根节点
while(ids[p] != p)
p = ids[p];

// 重新遍历一次进行路径压缩。
int parent;
while(ids[op] != p) {
parent = ids[op];
// 将叶子节点直接连接到根节点
ids[op] = p;
op = parent;
}

return p;
}

下图演示了find(9)的路径压缩过程:

Two-pass
find() union()
logN 1

对于N个触点M条连接的规模,WeightedQuickUnion整体的时间复杂度:~ MlogN​,线性对数级。

One-pass Compression

One-pass实现起来更加简单,每次查找时将节点连接到前一个父节点,从而使得路径减半,只需要增加一行代码,但是他的性能和Two-pass是一样的。

1
2
3
4
5
6
7
public int find(int p) {
while(ids[p] != p) {
ids[p] = ids[ids[p]];
p = ids[p];
}
retrun p;
}
One-pass

WeightedQuickUnion + Path Compression

路径压缩加权的quick-union算法可以说是动态连通性问题的最优解了。想要分析它的性能非常困难,但是Hopcroft-Ulman和Tarjan已经证明了find方法的成本为log*N,表示使N变为1时需要取对数的次数,称为迭代对数。可以认为,他在自然界中的值最大为5。

N log*N(2为底)
1 0
2 1
4 2
16 3
65536 4
2^65536 5

这使得find方法的成本非常非常接近常数级,但还达不到常数级。对于N个触点M条连接的规模,算法整体的时间复杂度:~ Mlog*N

性能对比

动态连通性各个算法的性能对比如下(N个对象M条连接时):

算法 最坏情况下
quick-find M N
quick-union M N
weighted QU N + M log N (+N是创建size数组)
QU + path compression N + M log N(+N是创建size数组)
weighted QU + path compression N + M log* N(+N是创建size数组)

应用:渗滤(Percolation)

渗滤模型主要用于科学领域中判断某些液体是否可以渗透过某些材料,并且估算出一个渗滤阈值。或者用在游戏中的物理引擎,判断水流是否可以流通等场景。

定义N * N 的网格,每个网格开放的(白色块)概率为p,关闭的(黑色块)概率为 1 - p。如果顶部的网格和底部的网格可以连通,则表示这个系统是渗滤的。

percolation

下面是20 * 20的网格中,概率为p时的一些情况

percolation

当p小于 0.4时,系统几乎都是阻塞的;当p大于0.8时,系统几乎都是渗滤的;而p在0.6左右时,都有可能。

现在我们需要估计出一个精确渗滤阈值p* (用作科学参考,例如某些材料的密度)

  • p > p* 时,几乎可以断定是渗滤的。
  • p < p* 时,几乎可以断定是阻断的。

percolation probability

没有一个数学公式能给出答案,但是通过计算机进行大量的模拟运算后得出这个值为0.593。

模拟方式为初始化n * n 个阻断的格子,然后随机开放一个格子直到这个网格渗滤并计算开发格子的占比。将这个模拟运行上百万次,就可以得到一个比较可靠的阈值p*,这种方式也称为蒙特卡罗模拟。

实现分析

初始化一个n*n大小的数组,用索引表示渗滤系统中的格子。

percolation array

percolation union

如何确定这个系统是渗滤的?只需要在顶部和底部新增两个虚拟的格子,如果他们是可以连通的,那么表示这个系统的渗滤的。

percolation vitrual site

那么怎么模拟开放格子?开放格子然后连通上下左右的其他开放位即可。

percolation open site

代码实现

这个渗滤模型是Coursera上的一个作业,可以到相关页面下载项目,提供了测试代码和许多有意思的测试数据。

下面代码在coursera上的得分是96,失去的4分是因为内存使用过多(超了10~20%),用了两个并查集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import edu.princeton.cs.algs4.WeightedQuickUnionUF;

public class Percolation {

private int[] flags;
private int n;
private int openSitesCount;
private int vt;
private int vb;
private WeightedQuickUnionUF uf;
private WeightedQuickUnionUF uf2;

// creates n-by-n grid, with all sites initially blocked
public Percolation(int n) {
if (n <= 0) {
throw new IllegalArgumentException("N must be greater than zero.");
}
this.n = n;
flags = new int[n * n];
// n * n is the top virtual site, n * n + 1 is the bottom virtual site.
uf = new WeightedQuickUnionUF(n * n + 2);
uf2 = new WeightedQuickUnionUF(n * n + 1);
vt = n * n;
vb = vt + 1;
}

private int index(int row, int col) {
return (row - 1) * n + col - 1;
}

private void checkIndex(int row, int col) {
if (row <= 0 || row > n || col <= 0 || col > n) {
throw new IllegalArgumentException("Invalid row or col.");
}
}

// opens the site (row, col) if it is not open already
public void open(int row, int col) {
checkIndex(row, col);
int c = index(row, col);
if (flags[c] == 1) {
return;
}
flags[c] = 1;
openSitesCount++;

// union to top site
int top = index(row - 1, col);
if (row == 1) {
uf.union(vt, c);
uf2.union(vt, c);
}
else if (flags[top] == 1) {
uf.union(top, c);
uf2.union(top, c);
}

// union to left site
int left = index(row, col - 1);
if (col > 1 && flags[left] == 1) {
uf.union(left, c);
uf2.union(left, c);
}

// union to right site
int right = index(row, col + 1);
if (col < n && flags[right] == 1) {
uf.union(right, c);
uf2.union(right, c);
}

// union to bottom site
int bottom = index(row + 1, col);
if (row == n) {
uf.union(vb, c);
}
else if (flags[bottom] == 1) {
uf.union(bottom, c);
uf2.union(bottom, c);
}
}

// is the site (row, col) open?
public boolean isOpen(int row, int col) {
checkIndex(row, col);
return flags[index(row, col)] == 1;
}

// is the site (row, col) full?
public boolean isFull(int row, int col) {
checkIndex(row, col);
int i = index(row, col);
return uf2.find(i) == uf2.find(vt);
}

// returns the number of open sites
public int numberOfOpenSites() {
return openSitesCount;
}

// does the system percolate?
public boolean percolates() {
return uf.find(vt) == uf.find(vb);
}

// test client (optional)
public static void main(String[] args) {

}
}

为什么使用了两个并查集?

因为这个模型中要求有个isFull()的方法,用来查询某个格子是否可以连通到顶部的格子。

如果只使用一个并查集会导致isFull方法出问题,因为当系统已经是渗滤的情况下顶部的虚拟位和底部的虚拟位是等价的,使得“连接到底部”和“连接到顶部”这两个条件是等价的,导致isFull方法返回了错误的结果。

所以我使用了第二个并查集,它没有添加底部的虚拟位,单独用它来判断isFull。我没有想到更好的解决方案,如果实际使用中不需要isFull判断,否则性能能上升一个台阶。