Source code for util.evaluation.metrics.apk

# Utils
import datetime
import logging
import time
import types
import numpy as np

[docs]def apk(query, predicted, k='full'): """ Computes the average precision@k. Parameters ---------- query : int Query label. predicted : List(int) Ordered list where each element is a label. k : str or int If int, cutoff for retrieval is set to K If str, 'full' means cutoff is til the end of predicted 'auto' means cutoff is set to number of relevant queries. Example: query = 0 predicted = [0, 0, 1, 1, 0] if k == 'full', then k is set to 5 if k == 'auto', then k is set to num of 'query' values in 'predicted', i.e., k=3 as there as 3 of them in 'predicted' Returns ------- float Average Precision@k """ assert (len(predicted) > 0) # Count the number of relevant items that could be retrieved num_hits = np.sum(predicted == query) if num_hits == 0: return 0 # Resolve k in case is not a number if k == 'auto': k = num_hits elif k == 'full': k = len(predicted) else: assert isinstance(k, int) assert (k > 0) and (k <= len(predicted)) # Truncate the list to the number of desired elements which gets taken into account predicted = np.array(predicted[:k]) # Non-vectorized version. # score = 0.0 # The score is the precision@i integrated over i=1:k # num_hits = 0.0 # # for i, p in enumerate(predicted): # if p == query: # num_hits += 1.0 # score += num_hits / (i + 1.0) # # return score / k_or_num_hits # Make an empty array for relevant queries. relevant = np.zeros(len(predicted)) # Find all locations where the predicted value matches the query and vice-versa. hit_locs = (predicted == query) non_hit_locs = np.logical_not(hit_locs) # Set all `hit_locs` to be 1. [0,0,0,0,0,0] -> [0,1,0,1,0,1] relevant[hit_locs] = 1 # Compute the sum of all elements till the particular element. [0,1,0,1,0,1] -> [0,1,1,2,2,3] relevant = np.cumsum(relevant) # Set all `non_hit_locs` to be zero. [0,1,1,2,2,3] -> [0,1,0,2,0,3] relevant[non_hit_locs] = 0 # Divide element-wise by [0/1,1/2,0/3,2/4,0/5,3/6] and sum the array. score = np.sum(np.divide(relevant, np.arange(1, relevant.shape[0] + 1))) return score / min(k, num_hits)
[docs]def mapk(query, predicted, k=None, workers=1): """Compute the mean Average Precision@K. Parameters ---------- query : list List of queries. predicted : list of list, or generator to list of lists Predicted responses for each query. Supports chunking with slices in the first dimension. k : str or int If int, cutoff for retrieval is set to `k` If str, 'full' means cutoff is til the end of predicted 'auto' means cutoff is set to number of relevant queries. For e.g., `query` = 0 `predicted` = [0, 0, 1, 1, 0] if `k` == 'full', then `k` is set to 5 if `k` == 'auto', then `k` is set to num of `query` values in `predicted`, i.e., `k`=3 as there as 3 of them in `predicted`. workers : int Number of parallel workers used to compute the AP@k Returns ------- float The mean average precision@K. dict{label, float} The per class mean averages precision @k """ # If distances come from pairwise_distances_chunked they must be flattened # since apk operates on a per-row basis. if type(predicted) is types.GeneratorType: predicted = [row for nested in predicted for row in nested] results = np.array([apk(q, p, k) for q, p in zip(query, predicted)]) per_class_mapk = {str(l): np.mean(np.array(results)[np.where(query == l)[0]]) for l in np.unique(query)} return np.mean(results), per_class_mapk
# The overhead of the pool is killing any possible speedup. # In order to make this parallel (if ever needed) one should create a Process class which swallows # 1/`workers` part of `vals`, such that only `workers` threads are created. # if workers == 1: # return np.mean([_apk(q, p, k) for q, p in zip(query, predicted)]) # with Pool(workers) as pool: # vals = [[q, p, k] for q, p in zip(query, predicted)] # aps = pool.starmap(_apk, vals) # return np.mean(aps)
[docs]def compute_mapk(distances, labels, k, workers=None): """Convenience function to convert a grid of pairwise distances to predicted elements, to evaluate mean average precision (at K). Parameters ---------- distances : ndarray A numpy array containing pairwise distances between all elements labels : list Ground truth labels for every element k : int Maximum number of predicted elements Returns ------- float The mean average precision@K. dict{label, float} The per class mean averages precision @k """ def chunked_sorting(distances): '''Sorts a _chunked_ pairwise distance matrix. Parameters ---------- distances : generator of ndarray A generator yielding numpy arrays containing pairwise distances between a subset of all elements. Suitable for combination with sklearn.metrics.pairwise_distances_chunked which slices the matrix along the first dimenstion (i.e. one can iterate over entire rows easily). Returns ------- A generator of sorted chunks of the input matrix. ''' for i, chunk in enumerate(distances): # Fetch the index of the lowest `max_count` (k) elements if k != 'full': ind = np.argpartition(chunk, max_count - 1)[:, :max_count] # Find the sorting sequence according to the shortest chunk selected from `ind` ssd = np.argsort(np.array(chunk)[np.arange(chunk.shape[0])[:, None], ind], axis=1) # Consequently sort `ind` ind = ind[np.arange(ind.shape[0])[:, None], ssd] # Now `ind` contains the sorted indexes of the lowest `max_count` (k) elements else: # If we're in full mode, just to the sorting directly ind = np.argsort(chunk) # Resolve the labels of the elements referred by `ind` # sorted_predictions = [list(labels[row][1:]) for row in ind] sorted_predictions = np.empty(shape=(chunk.shape[0], chunk.shape[1]-1), for i, row in enumerate(ind): sorted_predictions[i, :] = labels[row][1:] yield sorted_predictions # In case of non-chunked input we wrap to ensure uniform treatment if type(distances) is not types.GeneratorType: distances = [distances] # Resolve k k = k if k == 'auto' or k == 'full' else int(k) # Reduce the size of distances that would anyway not be used afterwards. This makes sorting them faster. max_count = k if k == 'full': max_count = len(labels) if k == 'auto': # Take the highest frequency in the labels i.e. the highest possible 'auto' value for all entries max_count = np.max(np.unique(labels, return_counts=True)[1]) # Do lazy sorting of the distance matrix sorted_predictions_chunked = chunked_sorting(distances) if workers is None: workers = 16 if k == 'auto' or k == 'full' else 1 return mapk(labels, sorted_predictions_chunked, k, workers)