原本是写一篇Apache Thrift in HiveServer,改写JDBC连接Hive相关应用的推文,因为HiveServer是使用Thrift提供服务创建网络RPC的多种语言客户端;单独拿出来说,使用Thrift也可以轻松构建RPC服务器,是轻量级的跨语言的远程服务调用框架。说到远程过程调用,感觉又要解释很多,所以就先上个前菜,说一说远程过程调用(RPC);并加了一份佐料:关于JDBC连接Hive的实现。
远程过程调用(RPC) 照例搬了wiki的解释:
远程过程调用是分布式计算的客户端-服务器(Client/Server)的例子,它简单而又广受欢迎。远程过程调用总是由客户端对服务器发出一个执行若干过程请求,并用客户端提供的参数。执行结果将返回给客户端。
为了允许不同的客户端均能访问服务器,许多标准化的RPC框架应运而生。其中大部分采用接口描述语言(Interface Description Language,IDL),方便跨平台的远程过程调用。
举一个栗子:
1 2 3 4 5 6 7 8 9 10 public void invoke () { String param1 = "hello" ; String param2 = "world" ; String result = appendStr(param1, param2); System.out.println("result:" + result); } public String appendStr (String param1,String param2) { return param1+" " +param2; }
这是本地函数调用代码,调用方法和被调用的方法都在一个程序内部,是属于进程内的调用。CPU在执行invoke
方法的时候(称为调用方法
),会去执行被调用的appendStr
这个方法(称为被调用方法
),执行完成后,切换回来继续执行后续的代码。对于调用方法而言,执行被调用方法时会阻塞,直到被调用方法执行完毕。画一个简单的过程调用图如下:
接下来看一下RPC调用(Thrift)的栗子🌰:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void invoke () throws TTransportException { TCLIService.Client client = getClient("127.0.0.1" , 8088 ); String param1 = "hello" ; String param2 = "world" ; String result = client.appendStr(param1, param2); System.out.println("result:" + result); } public static TCLIService.Client getClient (String host,int port) throws TTransportException { TSocket tSocket = new TSocket(host, port,10000 ); TTransport transport = new TFramedTransport(tSocket); transport.open(); TProtocol protocol = new TBinaryProtocol(transport); TCLIService.Client client = new TCLIService.Client(protocol); return client; }
这是一个进程间的调用,调用方法和被调用方法不在一个进程,甚至不是相同的服务,或者不同的服务器。进程之间的调用需要通过网络来传输数据,调用方法在执行RPC调用时会阻塞知道调用结果返回结果菜继续执行后续代码。过程调用图如下:
总结:RPC是一种通过网络从远程计算机程序上请求服务的方法。
延伸:Apache Thrift在JDBC的应用 一般来说,JDBC连接数据源开发的步骤:加载驱动类->创建数据库连接->创建statement->执行SQL语句->处理结果。前段时间研究的JDBC连接Hive数据源,底层也是采用Thrift(源码在:org.apache.hive.jdbc)。前面初始化的连接会得到TCLIService.Iface client
和 TSessionHandle sessionHandle
。具体的执行execute(String sql)
代码如下,我分为三步:检查是否满足执行SQL的条件,初始化查询条件并异步执行SQL,等待执行得到结果。
检查条件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Override public boolean execute (String sql) throws SQLException { checkConnection("execute" ); closeClientOperation(); initFlags(); ... }
初始化查询条件并执行SQL
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Override public boolean execute (String sql) throws SQLException { ... TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql); execReq.setRunAsync(true ); execReq.setConfOverlay(sessConf); transportLock.lock(); try { TExecuteStatementResp execResp = client.ExecuteStatement(execReq); Utils.verifySuccessWithInfo(execResp.getStatus()); stmtHandle = execResp.getOperationHandle(); isExecuteStatementFailed = false ; } catch (SQLException eS) { isExecuteStatementFailed = true ; throw eS; } catch (Exception ex) { isExecuteStatementFailed = true ; throw new SQLException(ex.toString(), "08S01" , ex); } finally { transportLock.unlock(); } ... }
等待执行得到结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 @Override public boolean execute (String sql) throws SQLException { ... TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle); boolean operationComplete = false ; TGetOperationStatusResp statusResp; while (!operationComplete) { try { transportLock.lock(); try { statusResp = client.GetOperationStatus(statusReq); } finally { transportLock.unlock(); } Utils.verifySuccessWithInfo(statusResp.getStatus()); if (statusResp.isSetOperationState()) { switch (statusResp.getOperationState()) { case CLOSED_STATE: case FINISHED_STATE: operationComplete = true ; break ; case CANCELED_STATE: throw new SQLException("Query was cancelled" , "01000" ); case ERROR_STATE: throw new SQLException(statusResp.getErrorMessage(), statusResp.getSqlState(), statusResp.getErrorCode()); case UKNOWN_STATE: throw new SQLException("Unknown query" , "HY000" ); case INITIALIZED_STATE: case PENDING_STATE: case RUNNING_STATE: break ; } } } catch (SQLException e) { isLogBeingGenerated = false ; throw e; } catch (Exception e) { isLogBeingGenerated = false ; throw new SQLException(e.toString(), "08S01" , e); } } isLogBeingGenerated = false ; if (!stmtHandle.isHasResultSet()) { return false ; } resultSet = new HiveQueryResultSet.Builder(this ).setClient(client).setSessionHandle(sessHandle) .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize) .setScrollable(isScrollableResultset).setTransportLock(transportLock) .build(); return true ; }
以上就是JDBC连接Hive数据源,并执行SQL的代码。但是,如果想要一个会话窗口维持一个Session设置,如设置执行队列,设置Job Name等,采用JDBC方式是不可取的。所以我们可以将Thrift底层从JDBC剥离出来,把初始化过程中的TSessionHandle sessionHandle
暴露,自己手动维护起来就可以了。
完。