import numpy as np
from itertools import product

from ..base import BaseBinaryEncoder
from ...helper import batching

[docs]class QuantizerEncoder(BaseBinaryEncoder): batch_size = 2048 def __init__(self, dim_per_byte: int, cluster_per_byte: int = 255, upper_bound: int = 10000, lower_bound: int = -10000, partition_method: str = 'average', *args, **kwargs): super().__init__(*args, **kwargs) assert 1 < cluster_per_byte <= 255, 'cluster number should >1 and <= 255 (0 is reserved for NOP)' self.dim_per_byte = dim_per_byte self.num_clusters = cluster_per_byte self.upper_bound = upper_bound self.lower_bound = lower_bound self.partition_method = partition_method self.centroids = self._get_centroids() def _get_centroids(self): """ calculate centroids for quantizer two kinds of divide methods are supported now: average, random average: split the space averagely and centroids of clusters lie on the corner of sub-space random: randomly pick points and treat them as centroids of clusters Variable Explaination: num_sample_per_dim: number of points to be sample on each dimension """ if self.upper_bound < self.lower_bound: raise ValueError("upper bound is smaller than lower bound") centroids = [] num_sample_per_dim = np.ceil(pow(self.num_clusters, 1 / self.dim_per_byte)).astype(np.uint8) if self.partition_method == 'average': axis_point = np.linspace(self.lower_bound, self.upper_bound, num=num_sample_per_dim + 1, endpoint=False, retstep=False, dtype=None)[1:] coordinates = np.tile(axis_point, (self.dim_per_byte, 1)) elif self.partition_method == 'random': coordinates = np.random.uniform(self.lower_bound, self.upper_bound, size=[self.dim_per_byte, num_sample_per_dim]) else: raise NotImplementedError for item in product(*coordinates): centroids.append(list(item)) return centroids[:self.num_clusters]
[docs] @batching def encode(self, vecs: np.ndarray, *args, **kwargs) -> np.ndarray: self._check_bound(vecs) num_bytes = self._get_num_bytes(vecs) x = np.reshape(vecs, [vecs.shape[0], num_bytes, 1, self.dim_per_byte]) x = np.sum(np.square(x - self.centroids), -1) # start from 1 x = np.argmax(-x, 2) + 1 return np.array(x, dtype=np.uint8)
def _get_num_bytes(self, vecs: np.ndarray): num_dim = vecs.shape[1] assert num_dim % self.dim_per_byte == 0 and num_dim >= (num_dim % self.dim_per_byte), \ 'input dimension (=%d) should be divided by dim_per_byte (=%d)!' % ( num_dim, self.dim_per_byte) return int(num_dim / self.dim_per_byte) @staticmethod def _get_max_min_value(vecs): return np.amax(vecs, axis=None), np.amin(vecs, axis=None) def _check_bound(self, vecs): max_value, min_value = self._get_max_min_value(vecs) if self.upper_bound < max_value: raise Warning("upper bound (=%.3f) is smaller than max value of input data (=%.3f), you should choose" "a bigger value for upper bound" % (self.upper_bound, max_value)) if self.lower_bound > min_value: raise Warning("lower bound (=%.3f) is bigger than min value of input data (=%.3f), you should choose" "a smaller value for lower bound" % (self.lower_bound, min_value)) if (self.upper_bound - self.lower_bound) >= 10 * (max_value - min_value): raise Warning("(upper bound - lower_bound) (=%.3f) is 10 times larger than (max value - min value) " "(=%.3f) of data, maybe you should choose a suitable bound" % ((self.upper_bound - self.lower_bound), (max_value - min_value)))