远程过程调用

原本是写一篇Apache Thrift in HiveServer,改写JDBC连接Hive相关应用的推文,因为HiveServer是使用Thrift提供服务创建网络RPC的多种语言客户端;单独拿出来说,使用Thrift也可以轻松构建RPC服务器,是轻量级的跨语言的远程服务调用框架。说到远程过程调用,感觉又要解释很多,所以就先上个前菜,说一说远程过程调用(RPC);并加了一份佐料:关于JDBC连接Hive的实现。

远程过程调用(RPC)

照例搬了wiki的解释:

远程过程调用是分布式计算的客户端-服务器(Client/Server)的例子,它简单而又广受欢迎。远程过程调用总是由客户端对服务器发出一个执行若干过程请求,并用客户端提供的参数。执行结果将返回给客户端。

为了允许不同的客户端均能访问服务器,许多标准化的RPC框架应运而生。其中大部分采用接口描述语言(Interface Description Language,IDL),方便跨平台的远程过程调用。

举一个栗子:

1
2
3
4
5
6
7
8
9
10
public void invoke(){
String param1 = "hello";
String param2 = "world";
String result = appendStr(param1, param2);
System.out.println("result:" + result);
}

public String appendStr(String param1,String param2){
return param1+" "+param2;
}

这是本地函数调用代码,调用方法和被调用的方法都在一个程序内部,是属于进程内的调用。CPU在执行invoke方法的时候(称为调用方法),会去执行被调用的appendStr这个方法(称为被调用方法),执行完成后,切换回来继续执行后续的代码。对于调用方法而言,执行被调用方法时会阻塞,直到被调用方法执行完毕。画一个简单的过程调用图如下:

接下来看一下RPC调用(Thrift)的栗子🌰:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void invoke() throws TTransportException {
TCLIService.Client client = getClient("127.0.0.1", 8088);
String param1 = "hello";
String param2 = "world";
String result = client.appendStr(param1, param2);
System.out.println("result:" + result);
}

// get thrift client
public static TCLIService.Client getClient(String host,int port) throws TTransportException {
// host port timeout
TSocket tSocket = new TSocket(host, port,10000);
TTransport transport = new TFramedTransport(tSocket);
transport.open();
TProtocol protocol = new TBinaryProtocol(transport);
TCLIService.Client client = new TCLIService.Client(protocol);
return client;
}

这是一个进程间的调用,调用方法和被调用方法不在一个进程,甚至不是相同的服务,或者不同的服务器。进程之间的调用需要通过网络来传输数据,调用方法在执行RPC调用时会阻塞知道调用结果返回结果菜继续执行后续代码。过程调用图如下:

总结:RPC是一种通过网络从远程计算机程序上请求服务的方法。

延伸:Apache Thrift在JDBC的应用

一般来说,JDBC连接数据源开发的步骤:加载驱动类->创建数据库连接->创建statement->执行SQL语句->处理结果。前段时间研究的JDBC连接Hive数据源,底层也是采用Thrift(源码在:org.apache.hive.jdbc)。前面初始化的连接会得到TCLIService.Iface clientTSessionHandle sessionHandle。具体的执行execute(String sql)代码如下,我分为三步:检查是否满足执行SQL的条件,初始化查询条件并异步执行SQL,等待执行得到结果。

检查条件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public boolean execute(String sql) throws SQLException {
// statement是否被关闭,如果关闭报SQLException:Can't execute after statement has been closed
checkConnection("execute");
// 如果stmtHandle不为空,则关闭stmtHandle
closeClientOperation();
/**
* 初始话标志状态
* 如是否已经取消执行(isCancelled)=false,sql语句查询是否关闭(isQueryClosed)=false
* 是否生成查询日志(isLogBeingGenerated)=true,sql是否已经提交执行(isExecuteStatementFailed)=false
*/
initFlags();

...

}

初始化查询条件并执行SQL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public boolean execute(String sql) throws SQLException {
...

// 初始话 statement request,可以维持sessionhandle,添加sql执行
TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);

// 设置异步执行
execReq.setRunAsync(true);
execReq.setConfOverlay(sessConf);
//获取reentrant lock,同一个时间点只能被一个线程锁持有,确保提交执行唯一
transportLock.lock();
try {
// client执行SQL
TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
// 验证是否成功提交
Utils.verifySuccessWithInfo(execResp.getStatus());
// execResp 获取OperationHandle
stmtHandle = execResp.getOperationHandle();
// 正常执行->改变sql是否已经提交执行的状态为false
isExecuteStatementFailed = false;
} catch (SQLException eS) {
// 异常执行:改变sql是否已经提交执行的状态为true
isExecuteStatementFailed = true;
throw eS;
} catch (Exception ex) {
isExecuteStatementFailed = true;
throw new SQLException(ex.toString(), "08S01", ex);
} finally {
// 异常退出:释放锁区域代码
transportLock.unlock();
}
...
}

等待执行得到结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@Override
public boolean execute(String sql) throws SQLException {
...

// 正常执行 拿到状态handle
TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle);
// 操作是否完成的状态,初始化状态false
boolean operationComplete = false;
TGetOperationStatusResp statusResp;

// while循环,通过操作是否完成的状态,手动维护一个阻塞队列,如果执行完成则退出
while (!operationComplete) {
try {
// 获取reentrant lock,确保执行过程状态
transportLock.lock();
try {
// 拿到执行状态
statusResp = client.GetOperationStatus(statusReq);
} finally {
// 释放锁
transportLock.unlock();
}

Utils.verifySuccessWithInfo(statusResp.getStatus());
// 获取状态,做出相应的回应
if (statusResp.isSetOperationState()) {
switch (statusResp.getOperationState()) {
case CLOSED_STATE:
// 执行完成,改变状态,退出while循环
case FINISHED_STATE:
operationComplete = true;
break;
// 执行取消
case CANCELED_STATE:
// 01000 -> warning
throw new SQLException("Query was cancelled", "01000");
// 错误状态
case ERROR_STATE:
// Get the error details from the underlying exception
throw new SQLException(statusResp.getErrorMessage(),
statusResp.getSqlState(), statusResp.getErrorCode());
// 未知错误状态
case UKNOWN_STATE:
throw new SQLException("Unknown query", "HY000");
// 正在初始化,正在pending,正在运行(退出switch循环),继续while循环
case INITIALIZED_STATE:
case PENDING_STATE:
case RUNNING_STATE:
break;
}
}
} catch (SQLException e) {
// 执行过程中异常,获取执行日志false
isLogBeingGenerated = false;
throw e;
} catch (Exception e) {
// 执行过程中异常,获取执行日志false
isLogBeingGenerated = false;
throw new SQLException(e.toString(), "08S01", e);
}
}
// 执行完成,获取执行日志false
isLogBeingGenerated = false;

// 流程到这里,说明已经执行完成,可以拿到结果,没有结果返回false
if (!stmtHandle.isHasResultSet()) {
return false;
}
// 有结果,获取结果,并封装成ResultSet全局变量
resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle)
.setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
.setScrollable(isScrollableResultset).setTransportLock(transportLock)
.build();
// 有结果,返回ture
return true;
}

以上就是JDBC连接Hive数据源,并执行SQL的代码。但是,如果想要一个会话窗口维持一个Session设置,如设置执行队列,设置Job Name等,采用JDBC方式是不可取的。所以我们可以将Thrift底层从JDBC剥离出来,把初始化过程中的TSessionHandle sessionHandle暴露,自己手动维护起来就可以了。

完。