Google Percolator

Percolator is Google’s internal-only system used to make incremental updates to the Google search index. Google published its architecture in 2010. Percolator achieved a 50% reduction in the delay between page crawling and page availability in the search index, in comparison to the MapReduce-based batch processing system it replaced.

Percolator is designed on top of BigTable, Google’s original wide-column NoSQL store first introduced to the world in 2006. It adds cross-shard ACID transactions using a two-phase commit protocol on top of BigTable’s single row atomicity. This enhancement was necessary because the process of updating an index is now divided into multiple concurrent transactions.

Isolation Levels & Time Tracking

Percolator provides Snapshot Isolation, implemented using MVCC and a monotonically increasing timestamp allocated by a Timestamp Oracle. Every transaction requires contacting this Oracle twice, thus making the scalability and availability of this component a significant concern.

Practical Implications

As highlighted in the Percolator paper itself, it’s design is not suitable for an OLTP database where user-facing transactions have to be processed with low latency.


倒排索引是Google搜索引擎中最为关键的技术之一。应对海量数据时,高效的索引创建和索引的实时更新都是必须解决的难题。Google设计了MapReduece系统解决了海量数据索引创建的问题,但并没有解决增量数据的实时更新问题。

因此,Google设计Percolator的初衷是:支持海量数据存储、并行随机读写、跨行事务的分布式数据库。

由于Percolator构建在不支持跨行事务的BigTable之上,基于BigTable达到Percolator的设计目标便是其要解决的核心问题。Percolator 提供了跨行、跨表的、基于快照隔离的ACID事务。

notify列仅仅是一个hint值(可能是个bool值),表示是否需要触发通知。

ack列是一个简单的时间戳值,表示最近执行通知的观察者的开始时间。

data列是KV结构,key是时间戳,value是真实数据,包含多个entry。

write列包含的是写记录,也是KV结构,key是时间戳,value是各个时间戳下曾经写入的值。

lock列也是KV结构,key是时间戳,value是锁的内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class Transaction {
struct Write{ Row row; Column: col; string value;};
vector<Write> writes_;
int start_ts_;

Transaction():start_ts_(orcle.GetTimestamp()) {}
void Set(Write w) {writes_.push_back(w);}
bool Get(Row row, Column c, string* value) {
while(true) {
bigtable::Txn = bigtable::StartRowTransaction(row);
// Check for locks that signal concurrent writes.
if (T.Read(row, c+"locks", [0, start_ts_])) {
// There is a pending lock; try to clean it and wait
BackoffAndMaybeCleanupLock(row, c);
continue;
}
}

// Find the latest write below our start_timestamp.
latest_write = T.Read(row, c+"write", [0, start_ts_]);
if(!latest_write.found()) return false; // no data
int data_ts = latest_write.start_timestamp();
*value = T.Read(row, c+"data", [data_ts, data_ts]);
return true;
}
// prewrite tries to lock cell w, returning false in case of conflict.
bool Prewrite(Write w, Write primary) {
Column c = w.col;
bigtable::Txn T = bigtable::StartRowTransaction(w.row);

// abort on writes after our start stimestamp ...
if (T.Read(w.row, c+"write", [start_ts_, max])) return false;
// ... or locks at any timestamp.
if (T.Read(w.row, c+"lock", [0, max])) return false;

T.Write(w.row, c+"data", start_ts_, w.value);
T.Write(w.row, c+"lock", start_ts_,
{primary.row, primary.col}); // The primary's location.
return T.Commit();
}
bool Commit() {
Write primary = write_[0];
vector<Write> secondaries(write_.begin() + 1, write_.end());
if (!Prewrite(primary, primary)) return false;
for (Write w : secondaries)
if (!Prewrite(w, primary)) return false;

int commit_ts = orcle.GetTimestamp();

// Commit primary first.
Write p = primary;
bigtable::Txn T = bigtable::StartRowTransaction(p.row);
if (!T.Read(p.row, p.col+"lock", [start_ts_, start_ts_]))
return false; // aborted while working
T.Write(p.row, p.col+"write", commit_ts,
start_ts_); // Pointer to data written at start_ts_
T.Erase(p.row, p.col+"lock", commit_ts);
if(!T.Commit()) return false; // commit point

// Second phase: write our write records for secondary cells.
for (Write w:secondaries) {
bigtable::write(w.row, w.col+"write", commit_ts, start_ts_);
bigtable::Erase(w.row, w.col+"lock", commit_ts);
}
return true;
}
}; // class Transaction

如果事务在它的开始时间戳之后看见另一个写记录,它会取消;这是“写-写”冲突,也就是快照隔离机制所重点保护的情况。

如果事务在任意时间戳看见另一个锁,它也取消;如果看到的锁在我们的开始时间戳之前,可能提交的事务已经提交了却因为某种原因推迟了锁的释放,但是这种情况可能性不大,保险起见所以取消。

缺点

清理无效行的开销

Percolator系统还需要一组purge后台进程,把那些没有任何事务可以看到的行版本清除掉,以及清除掉那些被删除掉的行。由于行的所有版本都是存储在big table系统的数据表中的,所以后台的purge 任务也会对BigTable系统构成很大的负载。这个问题是PostgreSQL也有的问题。Innodb没有这个问题是因为数据表上面永远只有最新版本的行,老版本的行是通过undo日志临时生成的。Innodb Undo日志集中存放在undo表空间中,清理的代价要低很多。

Percolator 是 Google 的上一代分布式事务解决方案,构建在 BigTable 之上,用于网页索引更新的业务。原理总体来说就是一个经过优化的二阶段提交的实现,进行了一个二级锁的优化。

读写事务

  1. 事务提交前,在客户端 buffer 所有的 update/delete 操作。
  2. Prewrite 阶段:

首先在所有行的写操作中选出一个作为 primary,其他的为 secondaries

PrewritePrimary: 对 primaryRow 写入 L 列(上锁),L 列中记录本次事务的开始时间戳。写入 L 列前会检查:

  1. 是否已经有别的客户端已经上锁。
  2. 是否在本次事务开始时间之后,检查 W 列,是否有更新 [startTs, +Inf) 的写操作已经提交 (Conflict)。

在这两种种情况下会返回事务冲突。否则,就成功上锁。将行的内容写入 row 中,时间戳设置为 startTs。

将 primaryRow 的锁上好了以后,进行 secondaries 的 prewrite 流程:

  1. 类似 primaryRow 的上锁流程,只不过锁的内容为事务开始时间及 primaryRow 的 Lock 的信息。
  2. 检查的事项同 primaryRow 的一致。

当锁成功写入后,写入 row,时间戳设置为 startTs。

以上 Prewrite 流程任何一步发生错误,都会进行回滚:删除 Lock,删除版本为 startTs 的数据。

当 Prewrite 完成以后,进入 Commit 阶段,当前时间戳为 commitTs,且 commitTs> startTs :

  1. commit primary:写入 W 列新数据,时间戳为 commitTs,内容为 startTs,表明数据的最新版本是 startTs 对应的数据。
  2. 删除L列。

如果 primary row 提交失败的话,全事务回滚,回滚逻辑同 prewrite。如果 commit primary 成功,则可以异步的 commit secondaries, 流程和 commit primary 一致, 失败了也无所谓。

事务中的读操作

  1. 检查该行是否有 L 列,时间戳为 [0, startTs],如果有,表示目前有其他事务正占用此行,如果这个锁已经超时则尝试清除,否则等待超时或者其他事务主动解锁。注意此时不能直接返回老版本的数据,否则会发生幻读的问题。
  2. 读取至 startTs 时该行最新的数据,方法是:读取 W 列,时间戳为 [0, startTs], 获取这一列的值,转化成时间戳 t, 然后读取此列于 t 版本的数据内容。

由于锁是分两级的,primary 和 seconary,只要 primary 的行锁去掉,就表示该事务已经成功 提交,这样的好处是 secondary 的 commit 是可以异步进行的,只是在异步提交进行的过程中 ,如果此时有读请求,可能会需要做一下锁的清理工作。


Reference:

  1. Implementing Distributed Transactions the Google Way: Percolator vs. Spanner
  2. Google Percolator 事务模型的利弊分析
  3. percolator:在线增量处理系统 中文翻译
  4. Google Percolator 的事务模型
  5. Percolator简单翻译与个人理解
  6. Percolator 和 TiDB 事务算法