Source code for sklearn.linear_model.huber

# Authors: Manoj Kumar mks542@nyu.edu
# License: BSD 3 clause

import numpy as np

from scipy import optimize, sparse

from ..base import BaseEstimator, RegressorMixin
from .base import LinearModel
from ..utils import check_X_y
from ..utils import check_consistent_length
from ..utils import axis0_safe_slice
from ..utils.extmath import safe_sparse_dot


def _huber_loss_and_gradient(w, X, y, epsilon, alpha, sample_weight=None):
    """Returns the Huber loss and the gradient.

    Parameters
    ----------
    w : ndarray, shape (n_features + 1,) or (n_features + 2,)
        Feature vector.
        w[:n_features] gives the coefficients
        w[-1] gives the scale factor and if the intercept is fit w[-2]
        gives the intercept factor.

    X : ndarray, shape (n_samples, n_features)
        Input data.

    y : ndarray, shape (n_samples,)
        Target vector.

    epsilon : float
        Robustness of the Huber estimator.

    alpha : float
        Regularization parameter.

    sample_weight : ndarray, shape (n_samples,), optional
        Weight assigned to each sample.

    Returns
    -------
    loss : float
        Huber loss.

    gradient : ndarray, shape (len(w))
        Returns the derivative of the Huber loss with respect to each
        coefficient, intercept and the scale as a vector.
    """
    X_is_sparse = sparse.issparse(X)
    _, n_features = X.shape
    fit_intercept = (n_features + 2 == w.shape[0])
    if fit_intercept:
        intercept = w[-2]
    sigma = w[-1]
    w = w[:n_features]
    n_samples = np.sum(sample_weight)

    # Calculate the values where |y - X'w -c / sigma| > epsilon
    # The values above this threshold are outliers.
    linear_loss = y - safe_sparse_dot(X, w)
    if fit_intercept:
        linear_loss -= intercept
    abs_linear_loss = np.abs(linear_loss)
    outliers_mask = abs_linear_loss > epsilon * sigma

    # Calculate the linear loss due to the outliers.
    # This is equal to (2 * M * |y - X'w -c / sigma| - M**2) * sigma
    outliers = abs_linear_loss[outliers_mask]
    num_outliers = np.count_nonzero(outliers_mask)
    n_non_outliers = X.shape[0] - num_outliers

    # n_sq_outliers includes the weight give to the outliers while
    # num_outliers is just the number of outliers.
    outliers_sw = sample_weight[outliers_mask]
    n_sw_outliers = np.sum(outliers_sw)
    outlier_loss = (2. * epsilon * np.sum(outliers_sw * outliers) -
                    sigma * n_sw_outliers * epsilon ** 2)

    # Calculate the quadratic loss due to the non-outliers.-
    # This is equal to |(y - X'w - c)**2 / sigma**2| * sigma
    non_outliers = linear_loss[~outliers_mask]
    weighted_non_outliers = sample_weight[~outliers_mask] * non_outliers
    weighted_loss = np.dot(weighted_non_outliers.T, non_outliers)
    squared_loss = weighted_loss / sigma

    if fit_intercept:
        grad = np.zeros(n_features + 2)
    else:
        grad = np.zeros(n_features + 1)

    # Gradient due to the squared loss.
    X_non_outliers = -axis0_safe_slice(X, ~outliers_mask, n_non_outliers)
    grad[:n_features] = (
        2. / sigma * safe_sparse_dot(weighted_non_outliers, X_non_outliers))

    # Gradient due to the linear loss.
    signed_outliers = np.ones_like(outliers)
    signed_outliers_mask = linear_loss[outliers_mask] < 0
    signed_outliers[signed_outliers_mask] = -1.0
    X_outliers = axis0_safe_slice(X, outliers_mask, num_outliers)
    sw_outliers = sample_weight[outliers_mask] * signed_outliers
    grad[:n_features] -= 2. * epsilon * (
        safe_sparse_dot(sw_outliers, X_outliers))

    # Gradient due to the penalty.
    grad[:n_features] += alpha * 2. * w

    # Gradient due to sigma.
    grad[-1] = n_samples
    grad[-1] -= n_sw_outliers * epsilon ** 2
    grad[-1] -= squared_loss / sigma

    # Gradient due to the intercept.
    if fit_intercept:
        grad[-2] = -2. * np.sum(weighted_non_outliers) / sigma
        grad[-2] -= 2. * epsilon * np.sum(sw_outliers)

    loss = n_samples * sigma + squared_loss + outlier_loss
    loss += alpha * np.dot(w, w)
    return loss, grad


class HuberRegressor(LinearModel, RegressorMixin, BaseEstimator):
    """Linear regression model that is robust to outliers.

    The Huber Regressor optimizes the squared loss for the samples where
    ``|(y - X'w) / sigma| < epsilon`` and the absolute loss for the samples
    where ``|(y - X'w) / sigma| > epsilon``, where w and sigma are parameters
    to be optimized. The parameter sigma makes sure that if y is scaled up
    or down by a certain factor, one does not need to rescale epsilon to
    achieve the same robustness. Note that this does not take into account
    the fact that the different features of X may be of different scales.

    This makes sure that the loss function is not heavily influenced by the
    outliers while not completely ignoring their effect.

    Read more in the :ref:`User Guide <huber_regression>`

    .. versionadded:: 0.18

    Parameters
    ----------
    epsilon : float, greater than 1.0, default 1.35
        The parameter epsilon controls the number of samples that should be
        classified as outliers. The smaller the epsilon, the more robust it is
        to outliers.

    max_iter : int, default 100
        Maximum number of iterations that scipy.optimize.fmin_l_bfgs_b
        should run for.

    alpha : float, default 0.0001
        Regularization parameter.

    warm_start : bool, default False
        This is useful if the stored attributes of a previously used model
        has to be reused. If set to False, then the coefficients will
        be rewritten for every call to fit.

    fit_intercept : bool, default True
        Whether or not to fit the intercept. This can be set to False
        if the data is already centered around the origin.

    tol : float, default 1e-5
        The iteration will stop when
        ``max{|proj g_i | i = 1, ..., n}`` <= ``tol``
        where pg_i is the i-th component of the projected gradient.

    Attributes
    ----------
    coef_ : array, shape (n_features,)
        Features got by optimizing the Huber loss.

    intercept_ : float
        Bias.

    scale_ : float
        The value by which ``|y - X'w - c|`` is scaled down.

    n_iter_ : int
        Number of iterations that fmin_l_bfgs_b has run for.
        Not available if SciPy version is 0.9 and below.

    outliers_ : array, shape (n_samples,)
        A boolean mask which is set to True where the samples are identified
        as outliers.

    References
    ----------
    .. [1] Peter J. Huber, Elvezio M. Ronchetti, Robust Statistics
           Concomitant scale estimates, pg 172
    .. [2] Art B. Owen (2006), A robust hybrid of lasso and ridge regression.
           http://statweb.stanford.edu/~owen/reports/hhu.pdf
    """

    def __init__(self, epsilon=1.35, max_iter=100, alpha=0.0001,
                 warm_start=False, fit_intercept=True, tol=1e-05):
        self.epsilon = epsilon
        self.max_iter = max_iter
        self.alpha = alpha
        self.warm_start = warm_start
        self.fit_intercept = fit_intercept
        self.tol = tol

[docs] def fit(self, X, y, sample_weight=None): """Fit the model according to the given training data. Parameters ---------- X : array-like, shape (n_samples, n_features) Training vector, where n_samples in the number of samples and n_features is the number of features. y : array-like, shape (n_samples,) Target vector relative to X. sample_weight : array-like, shape (n_samples,) Weight given to each sample. Returns ------- self : object Returns self. """ X, y = check_X_y( X, y, copy=False, accept_sparse=['csr'], y_numeric=True) if sample_weight is not None: sample_weight = np.array(sample_weight) check_consistent_length(y, sample_weight) else: sample_weight = np.ones_like(y) if self.epsilon < 1.0: raise ValueError( "epsilon should be greater than or equal to 1.0, got %f" % self.epsilon) if self.warm_start and hasattr(self, 'coef_'): parameters = np.concatenate( (self.coef_, [self.intercept_, self.scale_])) else: if self.fit_intercept: parameters = np.zeros(X.shape[1] + 2) else: parameters = np.zeros(X.shape[1] + 1) # Make sure to initialize the scale parameter to a strictly # positive value: parameters[-1] = 1 # Sigma or the scale factor should be non-negative. # Setting it to be zero might cause undefined bounds hence we set it # to a value close to zero. bounds = np.tile([-np.inf, np.inf], (parameters.shape[0], 1)) bounds[-1][0] = np.finfo(np.float64).eps * 10 # Type Error caused in old versions of SciPy because of no # maxiter argument ( <= 0.9). try: parameters, f, dict_ = optimize.fmin_l_bfgs_b( _huber_loss_and_gradient, parameters, args=(X, y, self.epsilon, self.alpha, sample_weight), maxiter=self.max_iter, pgtol=self.tol, bounds=bounds, iprint=0) except TypeError: parameters, f, dict_ = optimize.fmin_l_bfgs_b( _huber_loss_and_gradient, parameters, args=(X, y, self.epsilon, self.alpha, sample_weight), bounds=bounds) if dict_['warnflag'] == 2: raise ValueError("HuberRegressor convergence failed:" " l-BFGS-b solver terminated with %s" % dict_['task'].decode('ascii')) self.n_iter_ = dict_.get('nit', None) self.scale_ = parameters[-1] if self.fit_intercept: self.intercept_ = parameters[-2] else: self.intercept_ = 0.0 self.coef_ = parameters[:X.shape[1]] residual = np.abs( y - safe_sparse_dot(X, self.coef_) - self.intercept_) self.outliers_ = residual > self.scale_ * self.epsilon return self