hiveserver2 python client

一个hiveserver2 python客户端的例子,大部分代码来自于hue。

忽略了一些必要的判断,只是做一个简单的例子。
需要安装thrift以及把hive-0.10.0-cdh4.3.0/src/service/src/gen/thrift/gen-py目录拷贝到项目目录中。
thrift文件在hive-0.10.0-cdh4.3.0/src/service/if/TCLIService.thrift

hiveserver2默认要求通过sasl客户端连接;如果客户端没有使用sasl,前端的表现是连接之后没有任何反应,后端hiveserver2会出现如下错误日志:

java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
	at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219)

如果不用sasl客户端,可以设置如下参数(参考stackoverflow):

<property><name>hive.server2.authentication</name><value>NOSASL</value></property>
#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
import sys
import os
 
 
# Make the Thrift-generated modules in the "gen-py" directory next to this
# file importable; the TCLIService imports below rely on this path entry.
cur_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(cur_dir, "gen-py"))
 
from TCLIService import TCLIService
from TCLIService.ttypes import TOpenSessionReq, TGetTablesReq, TFetchResultsReq,\
  TStatusCode, TGetResultSetMetadataReq, TGetColumnsReq, TType,\
  TExecuteStatementReq, TGetOperationStatusReq, TFetchOrientation,\
  TCloseSessionReq, TGetSchemasReq, TGetLogReq, TCancelOperationReq
 
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
 
class HiveServerTColumnValue:
  """Wrap a Thrift column value and expose whichever typed field is set."""

  def __init__(self, tcolumn_value):
    # Keep the raw Thrift struct; decoding is deferred to ``val``.
    self.column_value = tcolumn_value

  @property
  def val(self):
    """Return the payload of the first populated typed field, else None.

    Fields are probed in a fixed order (bool, byte, i16, i32, i64, double,
    string); the first one that is not None wins and its ``.value`` is
    returned.
    """
    # TODO get index from schema
    probe_order = (
      'boolVal',
      'byteVal',
      'i16Val',
      'i32Val',
      'i64Val',
      'doubleVal',
      'stringVal',
    )
    for field_name in probe_order:
      holder = getattr(self.column_value, field_name)
      if holder is not None:
        return holder.value
    return None
 
class HiveServerClient(object):
    """Minimal HiveServer2 Thrift client over a plain buffered socket.

    Typical use: call ``connect()`` once, then ``execute_statement()``.
    The session is opened lazily by ``call()`` on the first request.
    Call ``close()`` when finished to release the session and the socket.
    """

    # User name sent when call() opens the session lazily.
    user = 'fatkun'
    # Handle of the open session; None until the first call().
    session_handle = None
    # Thrift client and transport created by connect(); None while disconnected.
    _client = None
    _transport = None

    def connect(self, host='localhost', port=10000):
        """Open a buffered binary-protocol connection to HiveServer2.

        Args:
            host: Server host name; defaults to the original 'localhost'.
            port: Server port; defaults to the original 10000.
        """
        sock = TSocket.TSocket(host, port)
        transport = TTransport.TBufferedTransport(sock)
        protocol = TBinaryProtocol.TBinaryProtocol(transport)
        client = TCLIService.Client(protocol)
        transport.open()
        # Store the handles only after open() succeeded; the transport is
        # kept so that close() can release the socket later.
        self._transport = transport
        self._client = client

    def close(self):
        """Close the session (if one was opened) and the transport.

        Safe to call more than once; the transport is closed even when the
        CloseSession request itself raises.
        """
        try:
            if self._client is not None and self.session_handle is not None:
                close_req = TCloseSessionReq(sessionHandle=self.session_handle)
                self._client.CloseSession(close_req)
        finally:
            self.session_handle = None
            self._client = None
            if self._transport is not None:
                self._transport.close()
                self._transport = None

    def open_session(self, username):
        """Open a session for ``username`` and return its session handle.

        The raw OpenSession response is printed as a debugging aid.
        """
        req = TOpenSessionReq(username=username, configuration={})
        res = self._client.OpenSession(req)
        session_handle = res.sessionHandle
        # Parenthesised single-argument print behaves the same on Python 2.
        print(res)
        return session_handle

    def call(self, fn, req, status=TStatusCode.SUCCESS_STATUS):
        """Invoke the Thrift method ``fn`` with ``req`` and return its response.

        Opens the session on first use and fills in ``req.sessionHandle``
        when the request has that field and it is still None.

        Args:
            fn: Bound Thrift client method to invoke.
            req: Request struct passed to ``fn``.
            status: Expected status code. Accepted for interface
                compatibility; the response status is NOT checked here.
        """
        if self.session_handle is None:
            self.session_handle = self.open_session(self.user)

        if hasattr(req, 'sessionHandle') and req.sessionHandle is None:
            req.sessionHandle = self.session_handle

        return fn(req)

    def execute_statement(self, statement, max_rows=100):
        """Execute ``statement`` and return ``(fetch_response, schema)``.

        Args:
            statement: Statement text sent to the server.
            max_rows: Maximum number of rows requested from the fetch.
        """
        req = TExecuteStatementReq(statement=statement, confOverlay={})
        res = self.call(self._client.ExecuteStatement, req)

        return self.fetch_result(res.operationHandle, max_rows=max_rows)

    def fetch_result(self, operation_handle, orientation=TFetchOrientation.FETCH_NEXT, max_rows=100):
        """Fetch rows for ``operation_handle`` plus result-set metadata.

        Returns:
            A ``(fetch_response, schema)`` tuple; ``schema`` is None when the
            operation handle reports that it has no result set.
        """
        fetch_req = TFetchResultsReq(operationHandle=operation_handle, orientation=orientation, maxRows=max_rows)
        res = self.call(self._client.FetchResults, fetch_req)

        # Metadata is only requested when the operation produced a result set.
        if operation_handle.hasResultSet:
            meta_req = TGetResultSetMetadataReq(operationHandle=operation_handle)
            schema = self.call(self._client.GetResultSetMetadata, meta_req)
        else:
            schema = None

        return res, schema
 
def main():
    """Run a demo query against HiveServer2 and print each column value."""
    hive = HiveServerClient()
    hive.connect()

    # Issue the blocking-query SET first; its result is discarded.
    hive.execute_statement(statement='SET hive.server2.blocking.query=true')

    query = 'select name from test'
    fetched, _schema = hive.execute_statement(query)

    # One output line per column value, row by row.
    for record in fetched.results.rows:
        for cell in record.colVals:
            print(HiveServerTColumnValue(cell).val)
 
 
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == '__main__':
	main()



fatkun

没有评论


You can leave the first : )



发表评论

电子邮件地址不会被公开。