Hadoop RPC 工作机制

网络通信模块是分布式系统的底层基础,支撑上层分布式环境下复杂的进程间通信。远程过程调用(RPC,Remote Procedure Call)是一种常见的分布式网络通信协议。RPC 允许运行与一台机器的程序调用另一台机器的子程序,同时将网络细节屏蔽起来,大大简化了分布式程序的开发。

Hadoop RPC 简介

Hadoop 实现了自己的 RPC 通信协议,是上层分布式子系统(HDFS、MapReduce、HBase 等)公用的网络通信模块。Hadoop RPC 具有以下特点:

  • 透明性。所有 RPC 框架的基本特性,对用户屏蔽了网络通信过程。
  • 高性能。Hadoop 各个子系统均采用 Master/Slave 架构,Master 作为一个 RPC Server,负责处理所有 Slave 发送的请求,需要能够高效的处理多个并发 RPC 请求。
  • 可控性。JDK 自带的 RPC 框架(RMI)过于重量级,用户可控之处太少,如:网络连接、超时和缓存等难以修改。因此 Hadoop 实现了轻量级的可控性更强的 RPC 框架。

Hadoop RPC 架构

Hadoop RPC 与其他 RPC 框架一样主要由四个部分组成:序列化层、函数调用层、网络传输层、服务端处理框架。

  • 序列化层,将结构化数据转换为字节流,便于通过网络传输或进行持久化。
  • 函数调用层,定位要调用的函数并执行函数。Hadoop RPC 采用 Java 反射和动态代理实现函数调用。
  • 网络传输层,描述了 Client 和 Server 之间消息传输的方式。Hadoop RPC 采用基于 TCP/IP 的 Socket 机制。
  • 服务端处理框架,可以抽象为网络I/O模型,直接决定了服务器端的并发处理能力。常见的有:阻塞式I/O、非阻塞式I/O、事件驱动I/O等,Hadoop RPC 采用基于 Reactor 设计模式的事件驱动I/O模型。

Hadoop RPC 总体架构如图,自下而上分为两层:第一层是基于 Java NIO 实现的 Client/Server 通信模型;第二层是供上层应用调用的 RPC 接口。

Hadoop RPC

Hadoop RPC 使用

首先定义 RPC 协议,RPC 协议是客户端和服务端的通信接口,定义了服务器端对外提供的服务接口。

interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol {
    // 版本号,默认情况下不同版本号的 Client 和 Server 不能通信
    public static final long versionId = 1L;
    String echo(String value) throws IOException;
    int add(int v1, int v2) throwd IOException;
}

实现 RPC 协议,Hadoop RPC 协议通常是一个 Java 接口。

public static class ClientProtocolImpl implemenets ClientProtocol {
    public long getProtocolVersion(String protocol,
                                   long clientVersion) throws IOException {
        return ClientProtocol.versionId;
    }

    public String echo(String value) throws IOException {
        return value;
    }
    public int add(int v1, int v2) throwd IOException {
        return v1 + v2;
    }
}

构造并启动 RPC Server,使用 getServer() 方法构造 RPC Server,并启动

// serverHost 和 serverPort 表示服务器的 host 和端口
// numHandlers 表示服务器端处理请求的线程数
server = RPC.getServer(new ClientProtocolImpl(), serverHost, serverPort,
    numHandlers, false, conf);

server.start();    

构造 RPC CLient,并发送 RPC 请求。使用 getProxy() 方法构造客户端代理,通过代理对象调用远程服务器端方法

proxy = (ClientProtocol) RPC.getProxy(ClientProtocol.class, new ClientProtocolImpl(),
    ClientProtocol.versionId, addr, conf);

int result = proxy.add(5, 6);
String echoResult = proxy.echo("hello");

经过上面四个步骤,便利用 Hadoop RPC 构建了一个简单的 Client/Server 网络模型。

深入理解 Hadoop RPC

Hadoop RPC 主要由三个类组成:RPC、Client、Server,分别对应接口、客户端实现、服务器端实现。

ipc.RPC

RPC 类是对底层客户机/服务器网络模型的封装,以便为开发人员提供方便简洁的编程接口。

RPC 类定义了一个内部类 RPC.Server,继承 Server 抽象,并利用反射机制实现了 call 接口。RPC 类包含一个 ClientCache 类型的成员根据用户提供的 SocketFactory 缓存 Client(重用 Client)。

ipc.RPC

与本地执行反射调用不通的是,RPC 函数调用时(执行 invock 方法),需要将函数调用信息(函数名、参数列表等)打包成可序列化对象 Invocation,通过网络发送给服务器端,服务器端接收到后,根据这些信息在利用反射机制完成函数调用。

ipc.Client

Client 类主要完成的功能是发送远程过程调用信息,并接收执行结果。Client 类内部有两个重要的类:Call 和 Connection。

Call 封装了一个 RPC 请求,包含唯一标识Id、函数调用信息、函数返回信息、错误信息、执行完成标识。Connection 封装了 Client 与每个 Server 之前的连接信息。包括通信连接唯一标识RemoteId、Socket、网络输入输出数据流、RPC请求等。

ipc.Client

ipc.Server

Hadoop 采用 Master/Slave 结构,Master(NameNode、JobTracker)是整个系统的单点,是系统的性能和扩展瓶颈之一。Master 通过 ipc.Server 接收并处理所有 Slave 发送的请求,这要求 ipc.Server 将高并发和可扩展性作为设计目标。

ipc.Server 采用了多种提高并发处理能力的技术,包括:线程池、事件驱动和 Reactor 设计模式等。均采用 JDK 自带的库实现。下面着重介绍 Reactor 设计模式如何提高整体性能。

Reactor 是并发编程中一种基于事件驱动的设计模式,具有以下两个特点:

  1. 通过派发、分离I/O操作事件提高系统的并发性能
  2. 提高粗粒度的并发控制,单线程实现,避免复杂的同步处理

典型的 Reactor 实现原理图如下:

Reactor

主要包括以下几个角色:

  • Reactor,IO事件的派发者
  • Acceptor,接收来自 Client 的连接,建立与 Client 对应的 Handler,并向 Reactor 注册 Handler
  • Handler,与 Client 通信的实体,实现业务的处理,内部会进一步划分为:read、decode、compute、encode、send 等过程,
  • Reader/Sender,为了加速处理速度,通过构建线程池,存放数据处理线程,数据读出后在线程池中等待后续处理即可。因此一般会分离 Handler 的读和写的过程,非别注册为读和写事件由 Reader 和 Sender 处理。

ipc.Server 实现了一个典型的 Reactor 模式,整体架构基本与上述一致。ipc.Server 被划分为三个阶段:接收请求、处理请求和返回结果

ipc.Server

接收请求

接收来自各个客户端的请求,封装为 Call 对象,放入共享队列(callQueue)。其中 Listener 负责监听请求,整个 Server 只有一个 Listener。一旦有新的请求到达,会轮询的方式从线程池中选择一个 Reader 处理。Selector 对象负责监听相关事件。

处理请求

从共享队列中获取 Call 对象,执行对应的函数调用,由多个 Handler 并行完成。Handler 会尝试将结果返回给客户端,但是考虑到某些函数调用返回的结果很大或网络慢等原因,可能很难一次性将结果发送到客户端,Handler 会尝试将后续发送任务交给 Responder 处理。

返回结果

Server 端只有一个 Responder,负责处理 Handler 的结果返回给客户端,Selector 对象负责监听相关事件。

Hadoop RPC 参数

Hadoop RPC 提供了一些可配置参数:

参数 说明
ipc.server.read.threadpool.size Reader 线程数
默认值:1
ipc.server.handler.queue.size 每个 Handler 对应最大 Call 数量,会影响 Call 队列长度
默认值:100
mapred.job.tracker.handler.count
dfs.namenode.service.handler.count
JobTracker 和 NameNode 中 Handler 数量
默认值:10
ipc.client.connect.max.retries 客户端最大重试次数,间隔1s
默认值:10

引用书籍
《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》

Tags:

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注