###Libuv

libuv是一个高性能事件驱动的程序库,采用了异步, 事件驱动编程风格,提供了一套事件循环和基于IO通知回调函数。

Libuv Hello World示例程序:

1
2
3
4
5
6
7
8
9
10
11
#include <stdio.h>
#include <uv.h>

int main() {
uv_loop_t *loop = uv_loop_new();

printf("Now quitting.\n");
uv_run(loop, UV_RUN_DEFAULT);

return 0;
}

TCP

TCP拥塞控制机制

拥塞的发生与其不可避免

拥塞发生的主要原因在于网络能够提供的资源不足以满足用户的需求,这些资源包括缓存空间、链路带宽容量和中间节点的处理能力。由于互联网的设计机制导致其缺乏“接纳控制”能力,因此在网络资源不足时不能限制用户数量,而只能靠降低服务质量来继续为用户服务,也就是“尽力而为”的服务。
但是,是不是说只要增加网络资源,就能避免拥塞呢?答案却是否定的!拥塞虽然是由于网络资源的稀缺引起的,但单纯增加资源并不能避免拥塞的发生。例如增加缓存空间到一定程度时,只会加重拥塞,而不是减轻拥塞,这是因为当数据包经过长时间排队完成转发时,它们很可能早已超时,从而引起源端超时重发,而这些数据包还会继续传输到下一路由器,从而浪费网络资源,加重网络拥塞。事实上,缓存空间不足导致的丢包更多的是拥塞的“症状”而非原因。另外,增加链路带宽及提高处理能力也不能解决拥塞问题。
例如:我们有四台主机ABCD连接路由器R,所有链路带宽都是1Gbps,如果A和B同时向C以1Gbps的速率发送数据,则路由器R的输入速率为2Gbps,而输出速率只能为1Gbps,从而产生拥塞。避免拥塞的方法只能是控制AB的速率,例如,都是0.5Gbps,但是,这只是一种情况,倘若D也向R发送数据,且速率为1Gbps,那么,我们先前的修正又是不成立的,可见,拥塞其实是一个动态问题,我们没有办法用一个静态方案去解决,从这个意义上来说,拥塞是不可避免的。

流量控制

早期的TCP协议只有基于窗口的流控制(flow control)机制,我们简单介绍一下,并分析其不足。
在TCP中,为了实现可靠性,发送方发出一个数据段之后要等待接受方相应的确认信息,而不是直接发送下一个分组。具体的技术是采用滑动窗口,以便通信双方能够充分利用带宽。滑动窗口允许发送方在收到接收方的确认之前发送多个数据段。窗口大小决定了在收到目的地确认之前,一次可以传送的数据段的最大数目。窗口大小越大,主机一次可以传输的数据段就越多。当主机传输窗口大小数目的数据段后,就必须等收到确认,才可以再传下面的数据段。例如,若视窗的大小为 1,则传完数据段后,都必须经过确认,才可以再传下一个数据段;当窗口大小等于3时,发送方可以一次传输3个数据段,等待对方确认后,再传输下面三个数据段。

窗口的大小在通信双方连接期间是可变的,通信双方可以通过协商动态地修改窗口大小。在TCP的每个确认中,除了指出希望收到的下一个数据段的序列号之外,还包括一个窗口通告,通告中指出了接收方还能再收多少数据段(我们可以把通告看成接收缓冲区大小)。如果通告值增大,窗口大小也相应增大;通告值减小,窗口大小也相应减小。但是我们可以发现,接收端并没有特别合适的方法来判断当前网络是否拥塞,因为它只是被动得接收,不像发送端,当发出一个数据段后,会等待对方得确认信息,如果超时,就可以认为网络已经拥塞了。所以,改变窗口大小的唯一根据,就是接收端缓冲区的大小了。

流量控制作为接受方管理发送方发送数据的方式,用来防止接受方可用的数据缓存空间的溢出。流控制是一种局部控制机制,其参与者仅仅是发送方和接收方,它只考虑了接收端的接收能力,而没有考虑到网络的传输能力;而拥塞控制则注重于整体,其考虑的是整个网络的传输能力,是一种全局控制机制。正因为流控制的这种局限性,从而导致了拥塞崩溃现象的发生。

重传

1、一旦收到确认,发送方关闭重发定时器并且将数据片的备份从重发队列中删除。发送方如果在规定的时间内没有收到数据确认,就重传该数据。
2、当TCP超时并重传时,它不一定要重传同样的报文段,相反,TCP允许进行重新分组而发送一个较大的报文段,这将有助于提高性能(当然,这个较大的报文段不能够超过接收方声明的MSS)。在协议中这是允许的,因为TCP是使用字节序号而不是报文段序号来进行识别它所要发送的数据和进行确认。
3、重发定时器
(1) 每一次一个包含数据的包被发送(包括重发),如果该定时器没有运行则启动它,使得它在RTO秒之后超时(按照当前的RTO值)。
(2) 当所有的发出数据都被确认之后,关闭该重发定时器。
(3) 当接收到一个ACK确认一个新的数据,重新启动该重发定时器,使得它在RTO秒之后超时(按照当前的RTO值)

TCP拥塞控制机制

TCP的拥塞控制由4个核心算法组成:“慢启动”(Slow Start)、“拥塞避免”(Congestion voidance)、“快速重传 ”(Fast Retransmit)、“快速恢复”(Fast Recovery)。具体的流程图可以参见:http://www.eventhelix.com/RealtimeMantra/Networking/。发送端叫做client,接收端叫做server,每个segment长度为512字节,阻塞窗口长度为cwnd(简化起见,下面以segment为单位),sequence number为seq_num,acknowledges number为ack_num。通常情况下,TCP每接收到两个segment,发送一个ack。

-- 慢启动
早期开发的TCP应用在启动一个连接时会向网络中发送大量的数据包,这样很容易导致路由器缓存空间耗尽,网络发生拥塞,使得TCP连接的吞吐量急剧下降。由于TCP源端一开始并不知道网络资源当前的利用状况,因此新建立的TCP连接不能一开始就发送大量数据,而只能逐步增加每次发送的数据量,以避免上述现象的发生,这里有一个“学习”的过程。
假设client要发送5120字节到server,慢启动过程如下:
1.初始状态,cwnd=1,seq_num=1;client发送第一个segment;
2.server接收到512字节(一个segment),回应ack_num=513;
3.client接收ack(513),cwnd=1+1=2;现在可以一次发送2个数据段而不必等待ack
4.server接收到2个segment,回应ack_num=513+512*2=1537
5.client接收ack(1537),cwnd=2+1;一次发送3个数据段
6.server接收到3个segment,回应2个ack,分别为ack_num=1537+1024=2561和ack_num=2561+512=3073
7.client接收ack(2561)和ack(3073),cwnd=3+2=5;一次可以发送5个数据段,但是只用4个就满足要求了
8.server接收到4个segment,回应2个ack,分别为4097,5121
9.已经发送5120字节,任务完成!

总结一下:
当建立新的TCP连接时,拥塞窗口(congestion window,cwnd)初始化为一个数据包大小。源端按cwnd大小发送数据,每收到一个ACK确认,cwnd就增加一个数据包发送量。

-- 拥塞避免
可以想象,如果按上述慢启动的逻辑继续下去而不加任何控制的话,必然会发生拥塞,引入一个慢启动阈值ssthresh的概念,当cwnd<ssthresh的时候,tcp处于慢启动状态,否则,进入拥塞避免阶段。通常,ssthresh初始化为
64 Kbytes。
当cwnd = 64947 + 512 = 65459,进入拥塞避免阶段,假设此时seq_num = _101024:
1.client一次发送cwnd,但是先考虑头两个segment
2.server回应ack_num = 102048
3.client接收到ack(102048),cwnd = 65459 + [(512 * 512) /65459] = 65459 + 4 = 65463,也就是说,每接到一个ack,cwnd只增加4个字节。
4.client发送一个segment,并开启ack timer,等待server对这个segment的ack,如果超时,则认为网络已经处于拥塞状态,则重设慢启动阀值ssthresh=当前cwnd/2=65463/2=32731,并且,立刻把cwnd设为1,很极端的处理!
5.此时,cwnd<ssthresh,所以,恢复到慢启动状态。

总结一下:
如果当前cwnd达到慢启动阀值,则试探性的发送一个segment,如果server超时未响应,TCP认为网络能力下降,必须降低慢启动阀值,同时,为了避免形势恶化,干脆采取极端措施,把发送窗口降为1,个人感觉应该有更好的方法。

-- 快速重传和快速恢复
前面讲过标准的重传,client会等待RTO时间再重传,但有时候,不必等这么久也可以判断需要重传,例如:client一次发送8个segment,seq_num起始值为100000,但是由于网络原因,100512丢失,其他的正常,则server会响应4个ack(100512)(为什么呢,tcp会把接收到的其他segment缓存起来,ack_num必须是连续的),这时候,client接收到四个重复的ack,它完全有理由判断100512丢失,进而重传,而不必傻等RTO时间了。这就是快速重传。
那么,什么是快速恢复呢?我们通常认为client接收到3个重复的ack后,就会开始快速重传,但是,如果还有更多的重复ack呢,如何处理?这就是快速恢复要做的,事实上,我们可以把快速恢复看作是快速重传的后续处理,它不是一种单独存在的形态。

以下是具体的流程:
假设此时cwnd=70000,client发送4096字节到server,也就是8个segment,起始seq_num = _100000:

  1. client发送seq_num = _100000
  2. seq_num =100512的segment丢失
  3. client发送seq_num = _101024
  4. server接收到两个segment,它意识到100512丢失,先把收到的这两个segment缓存起来
  5. server回应一个ack(100512),表示它还期待这个segment
  6. client发送seq_num = _101536
  7. server接收到一个segment,它判断不是100512,依旧把收到的这个segment缓存起来,并回应ack(100512)
  8. 以下同6、7,直到client收到3个ack(100512),进入快速重发阶段:
  9. 重设慢启动阀值ssthresh=当前cwnd/2=70000/2=35000
  10. client发送seq_num = 100512

    以下,进入快速恢复阶段:
    11.重设cwnd = ssthresh + 3 segments =35000 + 3512 = 36536,之所以要加3,是因为我们已经接收到3个ack(100512)了,根据前面说的,每接收到一个ack,cwnd加1
    12.client接收到第四个、第五个ack(100512),cwnd=36536+2
    512=37560
    13.server接收到100512,响应ack_num = _104096
    14.此时,cwnd>ssthresh,进入拥塞避免阶段。

【思考】:为什么通常clieng每接收到一个ack,会把cwnd增加一个segment呢?
这是基于“管道”模型(pipe model)的“数据包守恒”的原则(conservation of packets principle),即同一时刻在网络中传输的数据包数量是恒定的,只有当“旧”数据包离开网络后,才能发送“新”数据包进入网络。如果发送方收到一个ACK,则认为已经有一个数据包离开了网络,于是将拥塞窗口加1。如果“数据包守恒”原则能够得到严格遵守,那么网络中将很少会发生拥塞;本质上,拥塞控制的目的就是找到违反该原则的地方并进行修正。

五.联想
想想看,能不能把TCP解决拥塞的方法应用到交通拥塞呢?
我们有两个原则:一是拥塞不可避免,单纯增加资源并不能避免拥塞的发生,只能用动态的方法加以解决;二是数据包守恒原则。政府花费很大资金修路,并不能避免堵车,只能从源头控制,例如首先限制车辆进入主路,根据实际情况,再慢慢增加每一个路口的车流量,但是,当达到一个阀值,增加速度要放缓,并不时探测整个主路的拥堵情况,如果情况危急,立刻封闭半个路口,并将车流量降到最低,也就是重新回复到慢启动状态。

Mesos

Mesos

试想,可否整合数据中心中的所有资源,并将它们放在一个大的虚拟池里,代替单独的物理服务器;然后开放诸如CPU、内存和I/O这些基本资源而不是虚拟机?

同样,可否把应用程序拆分成小的、隔离的任务单位,从而根据数据中心应用的需求,从虚拟数据中心池中动态分配任务资源?就像操作系统将PC的处理器和RAM放入资源池,使其可以为不同的进程协调分配和释放资源。

进一步讲,我们可以把Mesos作为操作系统内核,然后将数据中心看为PC。这也是正是我想说的:Mesos正在改变数据中心,它让真正的SDDC(软件定义数据中心)成为现实


Mesos 被定义为一个分布式系统内核。它和 Linux 内核设计原则相同,只是设计在不同的抽象层级上。

Mesos 运行在服务器集群上并且通过 API 的形式给诸如 Hadoop,Spark 等应用提供资源管理、任务调度等功能。

Architect

Mesos实现了两级调度架构,第一级调度是Master的守护进程,管理Mesos集群中所有节点上运行的Slave守护进程。第二级调度由被称作Framework的组件组成。Framework包括调度器(Scheduler)和执行器(Executor)进程,其中每个节点上都会运行执行器。Mesos能和不同类型的Framework通信,每种Framework由相应的应用集群管理。

Mesos采用了master/slave结构,master做得尽可能地轻量级,其上面所有的元数据可通过各个slave重新注册而进行重构,故很容易通过zookeeper解决该单点故障问题。

Mesos 基本术语:

  1. Mesos Master : 主要负责管理各个Framework和Slave,并将slave上的资源分配给各个Framework
  2. Mesos Slave : 负责管理本节点上的各个Mesos Task,比如:为各个Executor分配资源
  3. Framework : Hadoop,Spark等计算框架,通过MesosSchedulerDiver接入Mesos
  4. Executor : 执行器 Mesos Slave上,用于启动计算框架中的Task

Mesos Master 协调全部的Slave,并确定每个节点的可用资源,
聚合计算跨节点的所有可用资源的报告,然后向注册到MasterFramework(作为Master的客户端)发出资源邀约。

Framework可以根据应用程序的需求,选择接受或拒绝来自Master的资源邀约。一旦接受邀约,Master即协调FrameworkSlave,调度参与节点上任务,并在容器中执行,以使多种类型的任务,比如Hadoop和Cassandra,可以在同一个节点上同时运行。

Mesos Framework Example

  1. Slave 1向Master汇报其空闲资源:4个CPU、4GB内存。
  2. Master触发分配策略模块,得到的反馈是Framework 1要请求全部可用资源。
  3. Master向Framework 1发送资源邀约,描述了Slave 1上的可用资源。
  4. Framework的调度器(Scheduler)响应Master,需要在Slave上运行两个任务,第一个任务分配<2 1="" cpus,="" gb="" ram="">资源,第二个任务分配<1 2="" cpus,="" gb="" ram="">资源。
  5. Master向Slave下发任务,分配适当的资源给Framework的任务执行器(Executor),接下来由执行器启动这两个任务(如图中虚线框所示)。

资源分配

资源邀约,即由Master向注册其上的Framework发送资源邀约。每次资源邀约包含一份Slave节点上可用的CPU、RAM等资源的列表。 Master提供这些资源给它的Framework,是基于分配策略的。

分配策略对所有的Framework普遍适用,同时适用于特定的Framework。 Framework可以拒绝资源邀约,如果它不满足要求,若此,资源邀约随即可以发给其他Framework。

由Mesos管理的应用程序通常运行短周期的任务,因此这样可以快速释放资源,缓解Framework的资源饥饿; Slave定期向Master报告其可用资源,以便Master能够不断产生新的资源邀约。每个Fraamework过滤不满足要求的资源邀约、Master主动废除给定周期内一直没有被接受的邀约。

Mesos实现了公平共享和严格优先级分配模块, 确保大部分用例的最佳资源共享。

如何作出资源邀约的决定是由资源分配模块实现的,该模块存在于Master之中。资源分配模块确定Framework接受资源邀约的顺序,与此同时,确保在本性贪婪的Framework之间公平地共享资源。

在同质环境中,使用最多的公平份额分配算法之一是最大最小公平算法(max-min fairness)。最大最小公平算法算法将最小的资源分配最大化,并将其提供给用户,确保每个用户都能获得公平的资源份额,以满足其需求所需的资源。

然而,在跨数据中心调度资源并且是异构的资源需求时,资源分配将会更加困难。默认情况下,Mesos包括一个严格优先级的资源分配模块和一个改良的公平份额资源分配模块。严格优先级模块实现的算法给定Framework的优先级,使其总是接收并接受足以满足其任务要求的资源邀约。这保证了关键应用在Mesos中限制动态资源份额上的开销,但是会潜在其他Framework饥饿的情况。

由于这些原因,大多数用户默认使用DRF(主导资源公平算法 Dominant Resource Fairness),这是Mesos中更适合异质环境的改良公平份额算法。

DRF的目标是确保每一个用户,即Mesos中的Framework,在异质环境中能够接收到其最需资源的公平份额。Framework的主导资源是其最需的资源类型(CPU、内存等),在资源邀约中以可用资源百分比的形式展示。


容错

Mesos的优势之一便是将容错设计到架构之中,并以可扩展的分布式系统的方式来实现。

Master

Mesos使用热备份来实现 Master 节点集合。一个Master节点与多个备用节点运行在同一集群中,并由Zookeeper来监控。Zookeeper会监控Master集群中所有的节点,并在Master节点发生故障时管理新Master的选举。

Mesos的状态信息实际上驻留在Framework调度器和Slave节点集合之中。当一个新的Master当选后,Zookeeper会通知Framework和选举后的Slave节点集合,以便使其在新的Master上注册。彼时,新的 Master可以根据Framework和Slave节点集合发送过来的信息,重建内部状态。

Slave

当Slave节点上的进程失败时,可以让执行器/任务继续运行,并为那个Slave进程重新连接那台Slave节点上运行的执行器/任务。当任务执行时,Slave会将任务的监测点元数据存入本地磁盘。

如果Slave进程失败,任务会继续运行,当Master重新启动Slave进程后,因为此时没有可以响应的消息,所以重新启动的Slave进程会使用检查点数据来恢复状态,并重新与执行器/任务连接。

Framework调度器

Framework调度器的容错是通过Framework将调度器注册2份或者更多份到Master来实现。当一个调度器发生故障时,Master会通知另一个调度来接管。

执行器/任务

Master会向分配任务的Framework调度器汇报执行器/任务失败,并允许调度器根据其配置策略在任务失败时做出相应的处理。

通常情况下,Framework在接收并接受来自Master的相应的资源邀约后,会在新的Slave节点上重新启动任务。


Configuration

Reference :

  1. Apache Mesos
  2. Mesos架构与去哪儿的统一框架实践
  3. 深入浅出Mesos(一):为软件定义数据中心而生的操作系统
  4. 深入浅出Mesos(二):Mesos的体系结构和工作流
  5. 深入浅出Mesos(三):持久化存储和容错
  6. 深入浅出Mesos(四):Mesos的资源分配
  7. 深入浅出Mesos(五):成功的开源社区
  8. Mesos社区与生态
  9. Return of the Borg: How Twitter Rebuilt Google’s Secret Weapon
    10.Dominant Resource Fairness: Fair Allocation of Multiple Resource Types

Storm

Storm

Trident

Trident是在storm基础上,一个以realtime 计算为目标的高度抽象。 它在提供处理大吞吐量数据能力的同时,也提供了低延时分布式查询和有状态流式处理的能力。Tident提供了 joins, aggregations, grouping, functions, 以及 filters等能力。除此之外,Trident 还提供了一些专门的原语,从而在基于数据库或者其他存储的前提下来应付有状态的递增式处理。

让我们一起来看一个Trident的例子。在这个例子中,我们主要做了两件事情:

  1. 从一个流式输入中读取语句病计算每个单词的个数

  2. 提供查询给定单词列表中每个单词当前总数的功能

因为这只是一个例子,我们会从如下这样一个无限的输入流中读取语句作为输入:

1
2
3
4
5
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"),
new Values("how many apples can you eat"),spout.setCycle(true);

这个spout会循环输出列出的那些语句到sentence stream当中,下面的代码会以这个stream作为输入并计算每个单词的个数:

1
2
3
4
5
6
TridentTopology topology = new TridentTopology();        
TridentState wordCounts =topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.parallelismHint(6);

让我们一起来读一下这段代码。我们首先创建了一个TridentTopology对象。

TridentTopology类相应的接口来构造Trident计算过程中的所有内容。

我们在调用了TridentTopology类的newStream方法时,传入了一个spout对象,spout对象会从外部读取数据并输出到当前topology当中,从而在topology中创建了一个新的数据流。在这个例子中,我们使用了上面定义的FixedBatchSpout对象。

输入数据源同样也可以是如Kestrel或者Kafka这样的队列服务。

Trident会再Zookeeper中保存一小部分状态信息来追踪数据的处理情况,而在代码中我们指定的字符串“spout1”就是Zookeeper中用来存储metadata信息的Znode节点

Trident在处理输入stream的时候会把输入转换成若干个tuple的batch来处理。比如说,输入的sentence stream可能会被拆分成如下的batch:

一般来说,这些小的batch中的tuple可能会在数千或者数百万这样的数量级,这完全取决于你的输入的吞吐量。

Trident提供了一系列非常成熟的批量处理的API来处理这些小batch. 这些API和你在Pig或者Cascading中看到的非常类似, 你可以做group by’s, joins, aggregations, 运行 functions, 执行 filters等等。当然,独立的处理每个小的batch并不是非常有趣的事情,所以Trident提供了很多功能来实现batch之间的聚合的结果并可以将这些聚合的结果存储到内存,Memcached, Cassandra或者是一些其他的存储中。同时,Trident还提供了非常好的功能来查询实时状态。这些实时状态可以被Trident更新,同时它也可以是一个独立的状态源。

回到我们的这个例子中来,spout输出了一个只有单一字段“sentence”的数据流。在下一行,topology使用了Split函数来拆分stream中的每一个tuple,Split函数读取输入流中的“sentence”字段并将其拆分成若干个word tuple。每一个sentence tuple可能会被转换成多个word tuple,比如说”the cow jumped over the moon” 会被转换成6个 “word” tuples. 下面是Split的定义:

1
2
3
4
5
6
7
public class Split extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}}

如你所见,真的很简单。它只是简单的根据空格拆分sentence,并将拆分出的每个单词作为一个tuple输出。

topology的其他部分计算单词的个数并将计算结果保存到了持久存储中。首先,word stream被根据“word”字段进行group操作,然后每一个group使用Count聚合器进行持久化聚合。persistentAggregate会帮助你把一个状态源聚合的结果存储或者更新到存储当中。在这个例子中,单词的数量被保持在内存中,不过我们可以很简单的把这些数据保存到其他的存储当中,如 Memcached, Cassandra等。如果我们要把结果存储到Memcached中,只是简单的使用下面这句话替换掉persistentAggregate就可以,这当中的”serverLocations”是Memcached cluster的主机和端口号列表:

1
2
3
4
5
6
7
8
9
10
.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))        MemcachedState.transactional()
```

persistentAggregate存储的数据就是所有batch聚合的结果。

Trident非常酷的一点就是它是完全容错的,拥有者有且只有一次处理的语义。这就让你可以很轻松的使用Trident来进行实时数据处理。Trident会把状态以某种形式保持起来,当有错误发生时,它会根据需要来恢复这些状态。

persistentAggregate方法会把数据流转换成一个TridentState对象。在这个例子当中,TridentState对象代表了所有的单词的数量。我们会使用这个TridentState对象来实现在计算过程中的进行分布式查询。

下面这部分实现了一个低延时的单词数量的分布式查询。这个查询以一个用空格分割的单词列表为输入,并返回这些单词当天的个数。这些查询是想普通的RPC调用那样被执行的,要说不同的话,那就是他们在后台是并行执行的。下面是执行查询的一个例子:

DRPCClient client = new DRPCClient(“drpc.server.location”, 3772);
System.out.println(client.execute(“words”, “cat dog the man”);
// prints the JSON-encoded result, e.g.: “[[5078]]”

1
2
3
4

如你所见,除了这是并发执行在storm cluster上之外,这看上去就是一个正常的RPC调用。这样的简单查询的延时通常在10毫秒左右。当然,更负责的DRPC调用可能会占用更长的时间,尽管延时很大程度上是取决于你给计算分配了多少资源。

这个分布式查询的实现如下所示:

topology.newDRPCStream(“words”)
.each(new Fields(“args”), new Split(), new Fields(“word”))
.groupBy(new Fields(“word”))
.stateQuery(wordCounts, new Fields(“word”), new MapGet(), new Fields(“count”))
.each(new Fields(“count”), new FilterNull())
.aggregate(new Fields(“count”), new Sum(), new Fields(“sum”));
```

我们仍然是使用TridentTopology对象来创建DRPC stream,并且我们将这个函数命名为“words”。这个函数名会作为第一个参数在使用DRPC Client来执行查询的时候用到。

每个DRPC请求会被当做只有一个tuple的batch来处理。在处理的过程中,以这个输入的单一tuple来表示这个请求。这个tuple包含了一个叫做“args”的字段,在这个字段中保存了客户端提供的查询参数。在这个例子中,这个参数是一个以空格分割的单词列表。

首先,我们使用Splict功能把入参拆分成独立的单词。然后对“word” 进行group by操作,之后就可以使用stateQuery来在上面代码中创建的TridentState对象上进行查询。stateQuery接受一个数据源(在这个例子中,就是我们的topolgoy所计算的单词的个数)以及一个用于查询的函数作为输入。在这个例子中,我们使用了MapGet函数来获取每个单词的出现个数。由于DRPC stream是使用跟TridentState完全同样的group方式(按照“word”字段进行group),每个单词的查询会被路由到TridentState对象管理和更新这个单词的分区去执行。

接下来,我们用FilterNull这个过滤器把从未出现过的单词给去掉,并使用Sum这个聚合器将这些count累加起来。最终,Trident会自动把这个结果发送回等待的客户端。

Trident在如何最大程度的保证执行topogloy性能方面是非常智能的。在topology中会自动的发生两件非常有意思的事情:

1.

Linux

Linux

CPU Load Balancing

所谓CPU负载均衡就是将进程从繁忙的CPU迁移到比较空闲的CPU上,目的是为了提高系统的整体效率。

进程在不同CPU之间迁移通常会影响cache的命中率,例如:

  1. 支持超线程技术(SMT)的CPU内部的虚CPU之间完全共享cache,在它们之间迁移进程对cache几乎没有影响,所以尽量保证它们之间的进程数均衡。

  2. 单个CPU的多个核有各自的L1缓存,但共享L2和L3缓存,在这些核之间迁移进程会导致L1缓存的失效,但是L2和L3缓存不受影响。

  3. 多CPU,每个CPU有各自独享的缓存,在这些CPU之间迁移进程会导致缓存失效。

  4. 多CPU,多内存节点(NUMA),不同CPU对于不同内存结点访问的效率不同,在这些CPU之间迁移进程不仅会影响缓存,而且会出现内存访问效率的问题。

  5. 多CPU+NUMA组成的阵列。

内核根据迁移进程所造成的影响情况设置了不同的调度域(sched_domain),最小一级的调度域是SMT的情况,SMT中的多个虚拟CPU位于同一调度域内;次小一级是单个CPU的多个核的情况,这些核位于同一调度域内;往上是多个CPU的情况以及多CPU+NUMA的情况,最上面就是阵列情况,自下而上所有的sched_domain形成一棵树形结构,越往上迁移进程导致的影响越大,所以越往上内核会增大迁移的阻力。


Virtual Memory

Linux内核对整个系统的物理内存是通过类型为struct page的数组mem_map来管理的。

Linux的内核地址空间大小为1G 用户空间0~3G,内核空间3G~4G。

如果把这1G线性地址空间全部拿来直接一一映射物理内存的话,在内核态的所有进程能使用的物理内存总共最多只有1G,为了能使在内核态的所有进程能使用更多的物理内存,Linux采取了一种变通的形式:它将1G内核线性地址空间分为几部分,第一部分为1G的前896M,这部分内核线性空间与物理内存的0~896M一一映射,后面128M的线性空间拿来动态映射剩下的所有物理内存,由于动态映射的方法不一样,后面的128M又分成了几个部分。

前面896M线性空间对应的物理内存就是所谓的低端物理内存,剩下的物理内存就是高端物理内存。

进程中使用的所有地址都是虚地址,在linux下这个虚地址就是所谓的线性地址。Linux中进程可运行在用户态和内核态,(典型配置情况下)当进程运行在用户态时,它使用的线性地址只能位于0~3G范围内,当进程运行于内核态时,它使用的线性地址地址范围为3G~4G。

为了把线性地址转化为物理地址,每个进程都有自己私有的页目录和页表。

Linux在建立进程页目录时,把用户地址空间的页目录项(0~767项)清空而将内核页目录表(swapper_pg_dir)的第768项到1023项拷贝到进程的页目录表的第768项到1023项中。

由于内核在初始化时也只映射了物理内存的前896M,我们可以知道内核也目录表只能保证第768项开始的224项中有有效映射。从这里我们可以知道,所有的进程都共享了其内核线性地址空间。

当一个进程在内核空间发生缺页故障的时候,这主要发生在访问内核空间动态映射区线性地址,在其处理程序中,就要通过0号进程的页目录(swapper_pg_dir)来同步本进程的内核页目录,实际上就是拷贝0号进程的内核页目录到本进程中(内核页表与进程0共享,故不需要复制)。如果进程0的该地址处的内核页目录也不存在,则出错,具体代码可以参考vmalloc的实现源码。

当进程运行于用户态时,若其需要申请内存空间,内核首先会在其用户线性空间中分配需要的线性地址空间,再通过伙伴分配系统分配物理内存并把分配的物理内存跟用户空间线性地址映射起来,最后再修改进程的页目录项及页表项写入这些映射关系。