Source code for template.runner.process_activation.activation

import json
import logging
import os
import uuid
from collections import OrderedDict
from copy import deepcopy
from datetime import datetime

import numpy as np
from torch.autograd import Variable
from torchvision.utils import save_image
from tqdm import tqdm


class Activation:
    def __init__(self, log_folder, model_name, dataset, process_size, save_cover, no_cuda):
        """
        Activation class

        Parameters
        ----------
        log_folder : string
            The DeepDIVA common log_folder path.
        model_name : string
            Name of the model.
        dataset : torch.utils.data.DataLoader
            Dataset prepared by DeepDIVA, iterated as (image, label) batches.
        process_size : int
            Number of items (pictures of the dataset) to process.
        save_cover : boolean
            Whether to save cover images for the processed classes and items.
        no_cuda : bool
            Specify whether to use the GPU or not.
        """
        self.log_folder = os.path.realpath(os.path.join(log_folder, 'activations'))
        self.cover_folder = os.path.join(self.log_folder, 'cover')
        self.data_folder = os.path.join(self.log_folder, 'data')
        self.model_name = model_name
        self.dataset = dataset
        self.sample_image = None
        self.process_size = process_size
        self.save_cover = save_cover
        self.no_cuda = no_cuda
        self.store = OrderedDict()
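    # A minimal construction sketch (illustrative only; `val_loader` and the
    # log path below are assumptions, not part of this module):
    #
    #   activation = Activation(log_folder='./log', model_name='CNN_basic',
    #                           dataset=val_loader, process_size=10,
    #                           save_cover=True, no_cuda=False)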
    def init(self, model):
        """
        Initialize the internal storage according to the given model and
        create the custom folders on disk.

        Parameters
        ----------
        model : torch.nn.Module
            Initialized PyTorch model.

        Returns
        -------
        None
        """
        logging.info('Creating activation directories')
        os.mkdir(self.log_folder)
        os.mkdir(self.data_folder)
        if self.save_cover:
            os.mkdir(self.cover_folder)

        logging.info('Init manifest structure')

        # Get a sample image from the dataset. Wrap it in a Variable in both
        # the CUDA and the CPU case so the layers always receive a Variable.
        self.sample_image = next(enumerate(self.dataset))[1][0]
        if not self.no_cuda:
            self.sample_image = self.sample_image.cuda()
        self.sample_image = Variable(self.sample_image)

        # Extract the model's shape
        shape = Activation._capture_activations(model, self.sample_image, self.no_cuda, False)

        # Init values in the global store
        self.store['datetime'] = datetime.now().strftime("%A, %d. %B %Y %I:%M%p")
        self.store['version'] = 2
        self.store['model'] = OrderedDict()
        self.store['model']['name'] = self.model_name
        self.store['model']['layers'] = shape
        self.store['items'] = OrderedDict()
        self.store['epochs'] = OrderedDict()
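    # Usage note (an assumption inferred from the code above, not stated in
    # the original source): os.mkdir() raises FileExistsError if the
    # 'activations' folder already exists, so init() should be called exactly
    # once per run, before resolve_items() and add_epoch().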
    def resolve_items(self):
        """
        Prepare all the items to process, create and save covers for the
        items (if requested) and build the single/class/general item
        structures in the internal storage.

        Parameters
        ----------
        None

        Returns
        -------
        None
        """
        logging.info('Resolving process items and class')
        single = OrderedDict()
        classe = OrderedDict()
        general = OrderedDict()

        general[0] = OrderedDict()
        general[0]['key'] = uuid.uuid4().hex
        general[0]['class'] = -1
        general[0]['size'] = 0
        if self.save_cover:
            pass  # default icon ? how to ?

        for i, (image, label) in enumerate(self.dataset):
            # break policy
            if i >= self.process_size:
                break

            item_index = str(i)
            image = image[0]
            label = int(Variable(label)[0])

            single[item_index] = OrderedDict()
            single[item_index]['key'] = uuid.uuid4().hex
            single[item_index]['class'] = label
            single[item_index]['size'] = 1
            if self.save_cover:
                cover_name = uuid.uuid4().hex + '.jpg'
                single[item_index]['cover'] = cover_name
                save_image(image, os.path.join(self.cover_folder, cover_name))

            if label not in classe:
                classe[label] = OrderedDict()
                classe[label]['key'] = uuid.uuid4().hex
                classe[label]['class'] = label
                classe[label]['size'] = 1
                if self.save_cover:
                    cover_name = uuid.uuid4().hex + '.jpg'
                    classe[label]['cover'] = cover_name
                    save_image(image, os.path.join(self.cover_folder, cover_name))
            else:
                classe[label]['size'] += 1

            general[0]['size'] += 1

        self.store['items']['single'] = single
        self.store['items']['class'] = classe
        self.store['items']['general'] = general
        self._save()
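    # Sketch of the resulting manifest 'items' structure (illustrative
    # values; the 'key' fields are random hex digests in practice):
    #
    #   "items": {
    #     "single":  {"0": {"key": "ab12...", "class": 7, "size": 1}, ...},
    #     "class":   {"7": {"key": "cd34...", "class": 7, "size": 3}, ...},
    #     "general": {"0": {"key": "ef56...", "class": -1, "size": 10}}
    #   }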
    def add_epoch(self, epoch_number, epoch_accuracy, model):
        """
        Collect, compute and save all activation data (and mean activation
        data) for a given epoch.

        Parameters
        ----------
        epoch_number : int
            Epoch number of the processing.
        epoch_accuracy : float
            Epoch accuracy retrieved from the last training.
        model : torch.nn.Module
            Trained PyTorch model.

        Returns
        -------
        None
        """
        logging.info('Processing images for epoch {}'.format(epoch_number))

        # Create epoch folder
        epoch_name = 'epoch' + str(epoch_number)
        epoch_folder = os.path.join(self.data_folder, epoch_name)
        os.mkdir(epoch_folder)

        # Create epoch entry in manifest
        self.store['epochs'][epoch_number] = OrderedDict()
        self.store['epochs'][epoch_number]['number'] = epoch_number
        self.store['epochs'][epoch_number]['accuracy'] = epoch_accuracy
        self.store['epochs'][epoch_number]['folder'] = os.path.join('/', epoch_name)
        self.store['epochs'][epoch_number]['datetime'] = datetime.now().strftime("%A, %d. %B %Y %I:%M%p")

        # Collect activations
        activations = self._process(model)

        # Prepare class/general mean
        classes = OrderedDict()
        general = OrderedDict()
        for i in activations:
            item_info = self._get_item_info('single', i)
            index = item_info['class']

            # Accumulate the per-class sums
            if index not in classes:
                classes[index] = deepcopy(activations[i])
            else:
                for lkey, lval in classes[index]['layers'].items():
                    if 'filters' in lval:
                        for fkey, fval in lval['filters'].items():
                            classes[index]['layers'][lkey]['filters'][fkey] = (
                                fval + activations[i]['layers'][lkey]['filters'][fkey]
                            )

            # Accumulate the general (whole run) sum
            if 0 not in general:
                general[0] = deepcopy(activations[i])
            else:
                for lkey, lval in general[0]['layers'].items():
                    if 'filters' in lval:
                        for fkey, fval in lval['filters'].items():
                            general[0]['layers'][lkey]['filters'][fkey] = (
                                fval + activations[i]['layers'][lkey]['filters'][fkey]
                            )

        # Divide the accumulated sums by the item counts to obtain means
        for ckey, cval in classes.items():
            item_info = self._get_item_info('class', ckey)
            for lkey, lval in cval['layers'].items():
                if 'filters' in lval:
                    for fkey, fval in lval['filters'].items():
                        classes[ckey]['layers'][lkey]['filters'][fkey] = (
                            classes[ckey]['layers'][lkey]['filters'][fkey] / item_info['size']
                        )

        for gkey, gval in general.items():
            item_info = self._get_item_info('general', gkey)
            for lkey, lval in gval['layers'].items():
                if 'filters' in lval:
                    for fkey, fval in lval['filters'].items():
                        general[gkey]['layers'][lkey]['filters'][fkey] = (
                            general[gkey]['layers'][lkey]['filters'][fkey] / item_info['size']
                        )

        # Write per-item, per-class and general activation files
        for i in activations:
            item_info = self._get_item_info('single', i)
            with open(os.path.join(epoch_folder, item_info['key'] + '.json'), 'w') as out:
                json.dump(activations[i], out, indent=2)

        for i in classes:
            item_info = self._get_item_info('class', i)
            with open(os.path.join(epoch_folder, item_info['key'] + '.json'), 'w') as out:
                json.dump(classes[i], out, indent=2)

        for i in general:
            item_info = self._get_item_info('general', i)
            with open(os.path.join(epoch_folder, item_info['key'] + '.json'), 'w') as out:
                json.dump(general[i], out, indent=2)

        self._save()
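    # The class/general values written above are plain arithmetic means:
    #
    #   mean[layer][f] = (1 / size) * sum over items of activation[layer][f]
    #
    # where `size` is the item count recorded for that class (or for the
    # whole run) by resolve_items().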
    def _save(self):
        """
        Write the global internal storage to disk as manifest.json.

        Parameters
        ----------
        None

        Returns
        -------
        None
        """
        manifest_path = os.path.join(self.log_folder, 'manifest.json')
        with open(manifest_path, 'w') as out:
            json.dump(self.store, out, indent=2)

    def _get_item_info(self, store_type, index):
        return self.store['items'][store_type][index]

    def _process(self, model):
        images = OrderedDict()
        pbar = tqdm(enumerate(self.dataset), total=self.process_size, ncols=100, leave=False)
        for i, (image, label) in pbar:
            # break policy
            if i >= self.process_size:
                break

            # Move the data to CUDA if requested
            if not self.no_cuda:
                image = image.cuda()
                label = label.cuda()
            image = Variable(image)
            label = Variable(label)

            input_index = str(i)

            layers = Activation._capture_activations(model, image, self.no_cuda, True)

            # Store activations
            images[input_index] = OrderedDict()
            images[input_index]['layers'] = layers
        return images

    @staticmethod
    def _capture_activations(model, data_input, no_cuda, store_filters=True):
        store = OrderedDict()
        if not no_cuda:
            model = model.module
        for l, layer in enumerate(model.children()):
            data_input = layer(data_input)
            layer_dim = data_input.dim()
            layer_name = str(l + 1)
            store[layer_name] = OrderedDict()
            if not store_filters:
                store[layer_name]['type'] = str(layer)  # TODO: store something cleaner
                store[layer_name]['dim'] = layer_dim
                store[layer_name]['size'] = data_input.size()[1]
            else:
                numpy_filter = np.array([])
                store[layer_name]['filters'] = OrderedDict()
                if layer_dim == 4:
                    # dimension 0 is the mini-batch
                    for f in range(0, data_input.size()[1]):
                        # mean activation of the filter's feature map
                        fa = data_input[0, f].data.permute(0, 1).cpu().numpy()
                        numpy_filter = np.append(numpy_filter, np.mean(fa))
                elif layer_dim == 2:
                    for f in range(0, data_input.size()[1]):
                        numpy_filter = np.append(numpy_filter, float(data_input[0, f].data))
                else:
                    # is that even possible?
                    numpy_filter = np.append(numpy_filter, 0)

                # normalize data along the layer
                if numpy_filter.min() < 0:
                    numpy_filter -= numpy_filter.min()
                numpy_filter *= 1 / numpy_filter.max()

                # store
                for index, value in np.ndenumerate(numpy_filter):
                    store[layer_name]['filters'][str(index[0] + 1)] = float(value)
        return store
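
# --- Usage sketch (not part of the original module) ---
# A minimal end-to-end run on CPU. The tiny Sequential model and the MNIST
# loader are illustrative stand-ins for the DeepDIVA model and dataset; the
# model's children must be applicable in sequence, as _capture_activations
# assumes.
if __name__ == '__main__':
    import torch.nn as nn
    from torch.utils.data import DataLoader
    from torchvision import datasets, transforms

    model = nn.Sequential(
        nn.Conv2d(1, 4, kernel_size=3),  # -> 4-dim activations (dim == 4)
        nn.ReLU(),
        nn.Flatten(),
        nn.Linear(4 * 26 * 26, 10),      # -> 2-dim activations (dim == 2)
    )
    loader = DataLoader(
        datasets.MNIST('mnist', download=True, transform=transforms.ToTensor()),
        batch_size=1)

    os.makedirs('log', exist_ok=True)  # parent of the 'activations' folder
    activation = Activation(log_folder='log', model_name='demo',
                            dataset=loader, process_size=4,
                            save_cover=False, no_cuda=True)
    activation.init(model)
    activation.resolve_items()
    activation.add_epoch(0, 0.0, model)  # writes log/activations/data/epoch0/*.json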