Skip to content

Support for batch HBase data acquisition #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,21 @@ Get a row by key:
row = table.get('00001')
print(row)
exit()
Get multiple rows by keys:

.. code-block:: python

import hbase

zk = 'sis3.ustcdm.org:2181,sis4.ustcdm.org:2181'

if __name__ == '__main__':
with hbase.ConnectionPool(zk).connect() as conn:
table = conn['mytest']['videos']
row_kv = table.mget(['00001', '00002', '00003'])
print(row_kv)
exit()


Scan a table:

Expand Down
126 changes: 125 additions & 1 deletion hbase/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@

import collections
import time

from concurrent import futures
from concurrent.futures.thread import ThreadPoolExecutor
from . import filters
from . import region as _region
from .. import protobuf
from .. import services
from ..exceptions import *
from ..conf import thread_pool_size, fail_task_retry

DEFAULT_FAMILY = 'cf'

Expand Down Expand Up @@ -144,6 +146,8 @@ def __init__(self, zkquorum, zk_master_path=None, zk_region_path=None):

self._master_service = services.MasterService(zkquorum, zk_master_path)
self._region_manager = _region.RegionManager(zkquorum, zk_region_path)
self.fail_retrys = fail_task_retry
self.pool = ThreadPoolExecutor(max_workers=thread_pool_size)

def __enter__(self):
return self
Expand All @@ -162,6 +166,8 @@ def close(self):
self._region_manager.close()
self._region_manager = None

self.pool.shutdown()

def namespaces(self):
"""List all namespaces.

Expand Down Expand Up @@ -723,6 +729,124 @@ def _wait_for_proc(self, proc_id, sleep):
else:
break

def mget(self, table, keys, columns=None, filter_=None):
    """Get multiple rows by their row keys, fetching them concurrently.

    One GetRequest is built per distinct key and submitted to the
    connection's thread pool.  Keys whose request fails are retried up
    to ``self.fail_retrys`` times before being dropped from the result.

    Args:
        table (str): Table name.
        keys (tuple[str]|list[str]): Row keys to fetch.  Duplicate keys
            are collapsed, so each key is requested at most once.
        columns (tuple[str]|list[str]): Columns to fetch, each in the
            form 'family:qualifier'.  None fetches all columns.
        filter_ (filters.Filter): Filter object.

    Returns:
        dict: Mapping of row key -> row object for every key that was
        fetched successfully.  Keys that still fail after all retries
        are absent from the mapping.

    Raises:
        RegionError
        RequestError

        TransportError
        ZookeeperProtocolError
        ServiceProtocolError
        NoSuchZookeeperNodeError

    """
    pb_reqs = {}
    for key in keys:
        region = self._region_manager.get_region(table, key)
        region_service = self._region_manager.get_service(region)
        pb_req = protobuf.GetRequest()

        pb_req.region.type = 1
        pb_req.region.value = region.name.encode()

        pb_get = pb_req.get
        pb_get.row = key.encode()

        if columns is not None:
            qualifier_dict = collections.defaultdict(list)
            for column in columns:
                try:
                    family, qualifier = column.split(':')
                except (ValueError, AttributeError):
                    # NOTE: the previous form "except ValueError or
                    # AttributeError" only caught ValueError.
                    raise RequestError(
                        'Invalid column name. {family}:{qualifier} expected, got %s.' % column
                    )
                qualifier_dict[family.encode()].append(qualifier.encode())
            for family, qualifiers in qualifier_dict.items():
                pb_column = pb_get.column.add()
                pb_column.family = family
                pb_column.qualifier.extend(qualifiers)

        if filter_ is not None:
            pb_filter = pb_get.filter
            pb_filter.name = filter_.name
            pb_filter.serialized_filter = filter_.serialize()
        # setdefault keeps the first request built for a duplicated key.
        pb_reqs.setdefault(key, (region_service, pb_req))

    def _async_run(reqs):
        """Submit every request to the pool; return (results, failed keys)."""
        results = {}
        fails = []

        def _request(service, req):
            try:
                resp = service.request(req)
            except RegionError:
                raise RequestError('request to %r failed.' % service)
            return self._cells_to_row(resp.result.cell)

        tasks = {
            self.pool.submit(_request, service, req): key
            for key, (service, req) in reqs.items()
        }
        for future in futures.as_completed(tasks):
            key = tasks[future]
            try:
                results[key] = future.result()
            except Exception:
                # Collect the key for a bounded retry below instead of
                # failing the whole batch.
                fails.append(key)
        return results, fails

    results, fails = _async_run(pb_reqs)

    # Retry failed keys a bounded number of times.  The previous loop
    # never incremented its counter, so persistent failures spun forever.
    retries = 0
    while retries < self.fail_retrys and fails:
        retries += 1
        time.sleep(3)  # give the region servers a moment before retrying
        retry_reqs = {k: pb_reqs[k] for k in fails}
        recovered, fails = _async_run(retry_reqs)
        # Merge re-fetched rows into the accumulated results.
        results.update(recovered)
    return results

def get(self,
table,
key,
Expand Down
16 changes: 10 additions & 6 deletions hbase/client/region.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,15 +245,16 @@ def _remove_from_cache(self, region_or_meta_key):
pass

def _region_lookup(self, meta_key):
#Fix from https://github.com/3601314/hbase-python/issues/3
column = protobuf.Column()
column.family = b'info'
req = protobuf.GetRequest()
req.get.row = meta_key.encode()
req.get.column.extend([column])
req.get.closest_row_before = True
req = protobuf.ScanRequest()
req.scan.column.extend([column])
req.scan.start_row = meta_key.encode()
req.scan.reversed = True
req.region.type = 1
req.region.value = b'hbase:meta,,1'

req.number_of_rows = 1
try:
resp = self._meta_service.request(req)
except exceptions.RegionError:
Expand All @@ -264,7 +265,10 @@ def _region_lookup(self, meta_key):
break
except exceptions.RegionError:
continue
cells = resp.result.cell
cells = []
for result in resp.results:
cells = result.cell
break
if len(cells) == 0:
return None

Expand Down
2 changes: 2 additions & 0 deletions hbase/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
num_threads_per_conn = 5
num_tasks_per_conn = 100

thread_pool_size = 10
fail_task_retry = 3

# Modify This Configuration on use zk path
class Conf:
Expand Down