import gc
import pickle
from typing import List
from ..base import BaseDocIndexer as BDI
from ...proto import gnes_pb2
class RocksDBIndexer(BDI):
    def __init__(self, data_path: str,
                 drop_raw_data: bool = False,
                 drop_chunk_blob: bool = False,
                 read_only: bool = False,
                 *args, **kwargs):
        """A key-value document indexer backed by RocksDB.

        :param data_path: path of the RocksDB database directory
        :param drop_raw_data: if set, clear each document's ``raw_data`` field before storing it
        :param drop_chunk_blob: if set, clear the ``blob`` field of every chunk before storing
        :param read_only: if set, open the database in read-only mode
        """
        super().__init__(*args, **kwargs)
        self.data_path = data_path
        self.drop_raw_data = drop_raw_data
        self.drop_chunk_blob = drop_chunk_blob
        self.read_only = read_only
        # leftover kwargs are applied as rocksdb.Options attributes in post_init()
        self.kwargs = kwargs
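
    # A minimal construction sketch (the path and flag values are
    # illustrative, not prescribed by this class):
    #
    #   indexer = RocksDBIndexer('/tmp/doc.db', drop_chunk_blob=True)
    #
    # dropping raw payloads keeps the store compact when only chunk
    # metadata is needed at query time.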

    def post_init(self):
        import rocksdb

        # defaults tuned for many small key-value pairs
        opts = rocksdb.Options()
        opts.create_if_missing = True
        opts.max_open_files = 300000
        opts.write_buffer_size = 67108864  # 64 MB memtable
        opts.max_write_buffer_number = 3
        opts.target_file_size_base = 67108864  # 64 MB SST files
        opts.table_factory = rocksdb.BlockBasedTableFactory(
            filter_policy=rocksdb.BloomFilterPolicy(10),
            block_cache=rocksdb.LRUCache(2 * (1024 ** 3)),  # 2 GB block cache
            block_cache_compressed=rocksdb.LRUCache(500 * (1024 ** 2)))  # 500 MB compressed cache

        # any leftover constructor kwargs override the defaults above
        for key, value in self.kwargs.items():
            setattr(opts, key, value)

        self._db = rocksdb.DB(self.data_path, opts, read_only=self.read_only)
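
    # Because leftover kwargs are set verbatim on ``rocksdb.Options``, tuning
    # parameters can be passed through the constructor without subclassing.
    # Illustrative sketch (the attribute must exist on ``rocksdb.Options``):
    #
    #   import rocksdb
    #   indexer = RocksDBIndexer('/tmp/doc.db',
    #                            compression=rocksdb.CompressionType.lz4_compression)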

    @BDI.update_counter
    def add(self, keys: List[int], docs: List['gnes_pb2.Document'], *args, **kwargs):
        import rocksdb
        write_batch = rocksdb.WriteBatch()
        for k, d in zip(keys, docs):
            key_bytes = pickle.dumps(k)
            # optionally strip bulky payloads before serializing
            if self.drop_raw_data:
                d.ClearField('raw_data')
            if self.drop_chunk_blob:
                for c in d.chunks:
                    c.ClearField('blob')
            value_bytes = d.SerializeToString()
            write_batch.put(key_bytes, value_bytes)
        # one atomic, synced write for the whole batch
        self._db.write(write_batch, sync=True)
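
    # Usage sketch: keys and documents are zipped positionally, so both lists
    # must have equal length. ``doc_id`` below is an assumed field of the
    # Document proto, shown for illustration only:
    #
    #   doc = gnes_pb2.Document()
    #   doc.doc_id = 42
    #   indexer.add([42], [doc])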

    def query(self, keys: List[int], *args, **kwargs) -> List['gnes_pb2.Document']:
        query_keys = [pickle.dumps(k) for k in keys]
        values = self._db.multi_get(query_keys)
        docs = []
        for k in query_keys:
            v = values[k]
            if v is None:
                # missing key: keep the output positionally aligned with the input
                docs.append(None)
            else:
                _doc = gnes_pb2.Document()
                _doc.ParseFromString(v)
                docs.append(_doc)
        return docs
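
    # The returned list is positionally aligned with ``keys``; a miss yields
    # ``None`` instead of being skipped (values here are illustrative):
    #
    #   indexer.query([1, 2, 999])   # -> [<Document>, <Document>, None]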

    def scan(self, reversed_scan: bool = False):
        """Iterate over the whole store, yielding ``(doc_id, document)`` pairs."""
        iterator = self._db.iterkeys()
        if reversed_scan:
            iterator.seek_to_last()
            iterator = reversed(iterator)
        else:
            iterator.seek_to_first()
        for key_bytes in iterator:
            doc_id = pickle.loads(key_bytes)
            value_bytes = self._db.get(key_bytes)
            pb_doc = gnes_pb2.Document()
            pb_doc.ParseFromString(value_bytes)
            yield doc_id, pb_doc
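
    # Scan streams the whole store without materializing it, e.g. for a
    # re-indexing pass (sketch):
    #
    #   for doc_id, doc in indexer.scan(reversed_scan=True):
    #       ...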

    def close(self):
        super().close()
        # python-rocksdb exposes no explicit close(); dropping the handle and
        # forcing a collection releases the database lock
        try:
            del self._db
        except AttributeError:
            pass
        gc.collect()
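

if __name__ == '__main__':
    # Minimal end-to-end sketch, runnable only where the ``rocksdb`` package
    # and the compiled GNES protos are importable. It assumes the base class
    # triggers post_init() on construction, as GNES components normally do;
    # otherwise call indexer.post_init() once before use. The path is
    # illustrative.
    indexer = RocksDBIndexer('/tmp/doc.db')

    doc = gnes_pb2.Document()
    indexer.add([1], [doc])

    # key 2 was never stored, so it comes back as None
    print(indexer.query([1, 2]))

    for doc_id, pb_doc in indexer.scan():
        print(doc_id, pb_doc.ByteSize())

    indexer.close()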