Source code for skmultilearn.adapt.mlaram

# copyright @Fernando Benites
from builtins import object
from builtins import range

import numpy
import scipy.sparse
from scipy.sparse import issparse

from ..base import MLClassifierBase


class Neuron(object):
    """An implementation of a neuron for MLARAM

    Parameters
    ----------
    vc : array
        neuron's assigned vector
    label : numpy.ndarray
        label assignment vector associated with the neuron
    """

    def __init__(self, vc, label):
        # vector must be in complement form
        self.vc = vc
        self.label = label


def _get_label_combination_representation(label_assignment_binary_indicator_list):
    # the byte string of the nonzero label indices is a hashable key
    return label_assignment_binary_indicator_list.nonzero()[0].tobytes()

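# A minimal sketch (hypothetical, not part of the original module) of how the
# helper above is used: the byte string of the nonzero label indices acts as
# a hashable dictionary key, so samples sharing a label combination map to
# the same prototype list.
def _demo_label_combination_key():
    key_a = _get_label_combination_representation(numpy.array([1, 0, 1]))
    key_b = _get_label_combination_representation(numpy.array([1, 0, 1]))
    assert key_a == key_b  # identical label sets yield identical keys
    return key_a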

def _get_label_vector(y, i):
    if issparse(y):
        return numpy.squeeze(numpy.asarray(y[i].todense()))
    return y[i]

def _concatenate_with_negation(row):
    ones = numpy.ones(row.shape)
    if issparse(row):
        return scipy.sparse.hstack((row, ones - row))
    else:
        # dense case: concatenate along axis 1 for a 2-D (matrix) row,
        # along axis 0 for a 1-D row
        return numpy.concatenate((row, ones - row), int(len(row.shape) != 1))

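# A minimal sketch (hypothetical, not part of the original module) of the
# complement coding above: a dense row [0.2, 0.7] becomes
# [0.2, 0.7, 0.8, 0.3], i.e. the input followed by its complement, which is
# the form in which fuzzy-ART-style prototypes are stored.
def _demo_complement_coding():
    fc = _concatenate_with_negation(numpy.array([0.2, 0.7]))
    # fc -> array([0.2, 0.7, 0.8, 0.3])
    return fc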
def _normalize_input_space(X):
    x_max = X.max()
    x_min = X.min()
    # min-max scale to [0, 1]; assumes x_max > x_min when rescaling is needed
    if x_min < 0 or x_max > 1:
        return numpy.multiply(X - x_min, 1 / (x_max - x_min))
    return X

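# A minimal sketch (hypothetical, not part of the original module) of the
# min-max scaling above: inputs outside [0, 1] are rescaled so the smallest
# value maps to 0 and the largest to 1; data already in [0, 1] is returned
# unchanged.
def _demo_normalization():
    X = numpy.array([[2.0, 10.0], [4.0, 6.0]])
    # (X - 2) / 8 -> [[0.0, 1.0], [0.25, 0.5]]
    return _normalize_input_space(X)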

class MLARAM(MLClassifierBase):
    """HARAM: A Hierarchical ARAM Neural Network for Large-Scale Text Classification

    This method aims at increasing the classification speed by adding an
    extra ART layer for clustering learned prototypes into large clusters.
    In this case the activation of all prototypes can be replaced by the
    activation of a small fraction of them, leading to a significant
    reduction of the classification time.

    Parameters
    ----------
    vigilance : float (default is 0.9)
        parameter for adaptive resonance theory networks; controls how large
        a hyperbox can be. At 1 the hyperboxes are smallest (no compression),
        at 0 a single hyperbox can cover the whole range. Normally set
        between 0.8 and 0.999; it is dataset dependent. It governs the
        creation of prototypes and therefore the training of the network.
    threshold : float (default is 0.02)
        controls how many prototypes participate in the prediction; can be
        changed for the testing phase.
    neurons : list
        the neurons in the network

    References
    ----------
    Published work available `here`_.

    .. _here: http://dx.doi.org/10.1109/ICDMW.2015.14

    .. code :: bibtex

        @INPROCEEDINGS{7395756,
            author={F. Benites and E. Sapozhnikova},
            booktitle={2015 IEEE International Conference on Data Mining Workshop (ICDMW)},
            title={HARAM: A Hierarchical ARAM Neural Network for Large-Scale Text Classification},
            year={2015},
            pages={847-854},
            doi={10.1109/ICDMW.2015.14},
            ISSN={2375-9259},
            month={Nov},
        }

    Examples
    --------
    Here's an example code with a 5% threshold and vigilance of 0.95:

    .. code :: python

        from skmultilearn.adapt import MLARAM

        classifier = MLARAM(threshold=0.05, vigilance=0.95)
        classifier.fit(X_train, y_train)
        prediction = classifier.predict(X_test)
    """

    def __init__(self, vigilance=0.9, threshold=0.02, neurons=None):
        super(MLARAM, self).__init__()

        if neurons is not None:
            self.neurons = neurons
        else:
            self.neurons = []
        self.vigilance = vigilance
        self.threshold = threshold

        self.copyable_attrs += ["neurons", "vigilance", "threshold"]

    def reset(self):
        """Resets the labels and neurons"""
        self._labels = []
        self.neurons = []

    def fit(self, X, y):
        """Fit classifier with training data

        Parameters
        ----------
        X : numpy.ndarray or scipy.sparse
            input features, can be a dense or sparse matrix of size
            :code:`(n_samples, n_features)`
        y : numpy.ndarray or scipy.sparse
            {0,1} binary indicator matrix with label assignments

        Returns
        -------
        skmultilearn.MLARAMfast.MLARAM
            fitted instance of self
        """
        self._labels = []
        self._allneu = ""
        self._online = 1
        self._alpha = 0.0000000000001

        is_sparse_x = issparse(X)

        label_combination_to_class_map = {}
        # FIXME: we should support dense matrices natively
        if isinstance(X, numpy.matrix):
            X = numpy.asarray(X)
        if isinstance(y, numpy.matrix):
            y = numpy.asarray(y)
        is_more_dimensional = int(len(X[0].shape) != 1)
        X = _normalize_input_space(X)

        y_0 = _get_label_vector(y, 0)

        if len(self.neurons) == 0:
            neuron_vc = _concatenate_with_negation(X[0])
            self.neurons.append(Neuron(neuron_vc, y_0))
            start_index = 1
            label_combination_to_class_map[
                _get_label_combination_representation(y_0)
            ] = [0]
        else:
            start_index = 0

        # denotes the class enumerator for label combinations
        last_used_label_combination_class_id = 0

        for row_no, input_vector in enumerate(X[start_index:], start_index):
            label_assignment_vector = _get_label_vector(y, row_no)

            fc = _concatenate_with_negation(input_vector)
            activationn = [0] * len(self.neurons)
            activationi = [0] * len(self.neurons)
            label_combination = _get_label_combination_representation(
                label_assignment_vector
            )

            if label_combination in label_combination_to_class_map:
                fcs = fc.sum()
                for class_number in label_combination_to_class_map[label_combination]:
                    if issparse(self.neurons[class_number].vc):
                        minnfs = self.neurons[class_number].vc.minimum(fc).sum()
                    else:
                        minnfs = numpy.minimum(self.neurons[class_number].vc, fc).sum()

                    activationi[class_number] = minnfs / fcs
                    activationn[class_number] = minnfs / self.neurons[class_number].vc.sum()

            if numpy.max(activationn) == 0:
                last_used_label_combination_class_id += 1
                self.neurons.append(Neuron(fc, label_assignment_vector))
                label_combination_to_class_map.setdefault(label_combination, []).append(
                    len(self.neurons) - 1
                )
                continue

            inds = numpy.argsort(activationn)
            indc = numpy.where(numpy.array(activationi)[inds[::-1]] > self.vigilance)[0]

            if indc.shape[0] == 0:
                self.neurons.append(Neuron(fc, label_assignment_vector))
                label_combination_to_class_map.setdefault(label_combination, []).append(
                    len(self.neurons) - 1
                )
                continue

            winner = inds[::-1][indc[0]]
            if issparse(self.neurons[winner].vc):
                self.neurons[winner].vc = self.neurons[winner].vc.minimum(fc)
            else:
                self.neurons[winner].vc = numpy.minimum(self.neurons[winner].vc, fc)

            # 1 if winner neuron won a given label, 0 if not
            labels_won_indicator = numpy.zeros(y_0.shape, dtype=y_0.dtype)
            labels_won_indicator[label_assignment_vector.nonzero()] = 1
            self.neurons[winner].label += labels_won_indicator

        return self

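    # A worked example (illustrative, not part of the original code) of the
    # resonance test in fit above: candidates are ranked by activationn
    # (|min(w, fc)| / |w|) and the winner is the highest-ranked prototype
    # whose activationi (|min(w, fc)| / |fc|) exceeds the vigilance. For
    # w = [0.2, 0.7, 0.8, 0.3] and fc = [0.3, 0.6, 0.7, 0.4]:
    #     min(w, fc) = [0.2, 0.6, 0.7, 0.3], which sums to 1.8,
    #     activationi = 1.8 / 2.0 = 0.9 and activationn = 1.8 / 2.0 = 0.9.
    # With vigilance = 0.95 the test fails and a new neuron is created; with
    # vigilance = 0.85 the prototype resonates and shrinks to min(w, fc).
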
    def predict(self, X):
        """Predict labels for X

        Parameters
        ----------
        X : numpy.ndarray or scipy.sparse.csc_matrix
            input features of shape :code:`(n_samples, n_features)`

        Returns
        -------
        numpy.ndarray of int
            binary indicator matrix with label assignments with shape
            :code:`(n_samples, n_labels)`
        """
        result = []
        # FIXME: we should support dense matrices natively
        if isinstance(X, numpy.matrix):
            X = numpy.asarray(X)
        ranks = self.predict_proba(X)
        for rank in ranks:
            sorted_rank_arg = numpy.argsort(-rank)
            diffs = -numpy.diff([rank[k] for k in sorted_rank_arg])

            indcutt = numpy.where(diffs == diffs.max())[0]
            if len(indcutt.shape) == 1:
                indcut = indcutt[0] + 1
            else:
                indcut = indcutt[0, -1] + 1
            label = numpy.zeros(rank.shape)

            label[sorted_rank_arg[0:indcut]] = 1

            result.append(label)

        return numpy.array(result)

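    # Illustrative example (not part of the original code) of the gap-based
    # cutoff used in predict above: for a rank vector [0.5, 0.45, 0.1] the
    # drops between consecutive sorted ranks are [0.05, 0.35]; the largest
    # drop comes after the second entry, so indcut = 2 and labels 0 and 1
    # are set in the indicator vector.
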
    def predict_proba(self, X):
        """Predict probabilities of label assignments for X

        Parameters
        ----------
        X : numpy.ndarray or scipy.sparse.csc_matrix
            input features of shape :code:`(n_samples, n_features)`

        Returns
        -------
        array of arrays of float
            matrix with label assignment probabilities of shape
            :code:`(n_samples, n_labels)`
        """
        # FIXME: we should support dense matrices natively
        if isinstance(X, numpy.matrix):
            X = numpy.asarray(X)
        if issparse(X):
            if X.getnnz() == 0:
                return
        elif len(X) == 0:
            return

        is_matrix = int(len(X[0].shape) != 1)
        X = _normalize_input_space(X)

        all_ranks = []
        neuron_vectors = [n1.vc for n1 in self.neurons]
        if any(map(issparse, neuron_vectors)):
            all_neurons = scipy.sparse.vstack(neuron_vectors)
            # can't add a constant to a sparse matrix in scipy
            all_neurons_sum = all_neurons.sum(1).A
        else:
            all_neurons = numpy.vstack(neuron_vectors)
            all_neurons_sum = all_neurons.sum(1)

        all_neurons_sum += self._alpha

        for row_number, input_vector in enumerate(X):
            fc = _concatenate_with_negation(input_vector)

            if issparse(fc):
                activity = (fc.minimum(all_neurons).sum(1) / all_neurons_sum).squeeze().tolist()
            else:
                activity = (numpy.minimum(fc, all_neurons).sum(1) / all_neurons_sum).squeeze().tolist()

            if is_matrix:
                activity = activity[0]

            # rank the neurons by activation, most active first
            sorted_activity = numpy.argsort(activity)[::-1]
            winner = sorted_activity[0]
            activity_difference = activity[winner] - activity[sorted_activity[-1]]
            largest_activity = 1
            par_t = self.threshold

            for i in range(1, len(self.neurons)):
                activity_change = (activity[winner] - activity[sorted_activity[i]]) / activity[winner]
                if activity_change > par_t * activity_difference:
                    break

                largest_activity += 1

            rbsum = sum([activity[k] for k in sorted_activity[0:largest_activity]])
            rank = activity[winner] * self.neurons[winner].label
            activated = []
            activity_among_activated = []
            activated.append(winner)
            activity_among_activated.append(activity[winner])

            for i in range(1, largest_activity):
                rank += activity[sorted_activity[i]] * self.neurons[sorted_activity[i]].label
                activated.append(sorted_activity[i])
                activity_among_activated.append(activity[sorted_activity[i]])

            rank /= rbsum
            all_ranks.append(rank)

        return numpy.array(all_ranks)
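
# A minimal usage sketch under assumed toy data (the arrays below are made up
# for illustration, not taken from the library's examples): two similar
# samples share label 0 and a third sample carries label 1.
if __name__ == "__main__":
    X_toy = numpy.array([[0.10, 0.90], [0.15, 0.85], [0.90, 0.10]])
    y_toy = numpy.array([[1, 0], [1, 0], [0, 1]])

    classifier = MLARAM(vigilance=0.95, threshold=0.02)
    classifier.fit(X_toy, y_toy)
    # Expected to recover the training assignments for this separable toy set.
    print(classifier.predict(X_toy))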