Source code for matrixprofile.preprocess

# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

range = getattr(__builtins__, 'xrange', range)
# end of py2 compatibility boilerplate

# Third-party imports
import numpy as np

# Project imports
from matrixprofile import core


def validate_preprocess_kwargs(preprocessing_kwargs):
    """
    Validates the keyword arguments intended for the preprocess function and
    fills in defaults for the ones that were not supplied.

    Parameters
    ----------
    preprocessing_kwargs : dict-like or None or False
        A dictionary object to store keyword arguments for the preprocess function.
        It can also be None/False/{}/"".

    Returns
    -------
    valid_kwargs : dict-like or None
        A dictionary with the keys 'window', 'impute_method', 'impute_direction'
        and 'add_noise'. Keys missing from the input take the defaults
        4, 'mean', 'forward' and True respectively.
        Returns None if the input preprocessing_kwargs is None/False/{}/"".

    Raises
    ------
    ValueError
        If preprocessing_kwargs is not dict-like or None.
        If gets invalid key(s) for preprocessing_kwargs.
        If gets invalid value(s) for preprocessing_kwargs['window'], preprocessing_kwargs['impute_method']
        preprocessing_kwargs['impute_direction'] and preprocessing_kwargs['add_noise'].

    """
    # Any falsy input (None, False, {}, "") means there is nothing to validate.
    if not preprocessing_kwargs:
        return None

    if not isinstance(preprocessing_kwargs, dict):
        raise ValueError("The parameter 'preprocessing_kwargs' is not dict like!")

    valid_preprocessing_kwargs_keys = {'window', 'impute_method', 'impute_direction', 'add_noise'}

    if not set(preprocessing_kwargs.keys()).issubset(valid_preprocessing_kwargs_keys):
        raise ValueError('invalid key(s) for preprocessing_kwargs! '
                         'valid key(s) should include ' + str(valid_preprocessing_kwargs_keys))

    methods = ['mean', 'median', 'min', 'max']
    directions = ['forward', 'fwd', 'f', 'backward', 'bwd', 'b']

    # Start from the defaults and overwrite each entry that passes validation.
    valid_kwargs = {'window': 4,
                    'impute_method': 'mean',
                    'impute_direction': 'forward',
                    'add_noise': True}

    if 'window' in preprocessing_kwargs:
        window = preprocessing_kwargs['window']
        # bool is a subclass of int, so True/False would slip through a plain
        # isinstance(..., int) check and be used as a window of 1/0.
        if isinstance(window, bool) or not isinstance(window, int):
            raise ValueError("The value for preprocessing_kwargs['window'] is not an integer!")
        valid_kwargs['window'] = window

    if 'impute_method' in preprocessing_kwargs:
        impute_method = preprocessing_kwargs['impute_method']
        if impute_method not in methods:
            raise ValueError('invalid imputation method! valid include options: ' + ', '.join(methods))
        valid_kwargs['impute_method'] = impute_method

    if 'impute_direction' in preprocessing_kwargs:
        impute_direction = preprocessing_kwargs['impute_direction']
        if impute_direction not in directions:
            raise ValueError('invalid imputation direction! valid include options: ' + ', '.join(directions))
        valid_kwargs['impute_direction'] = impute_direction

    if 'add_noise' in preprocessing_kwargs:
        add_noise = preprocessing_kwargs['add_noise']
        if not isinstance(add_noise, bool):
            raise ValueError("The value for preprocessing_kwargs['add_noise'] is not a boolean value!")
        valid_kwargs['add_noise'] = add_noise

    return valid_kwargs


def is_subsequence_constant(subsequence):
    """
    Checks whether every value of a time series subsequence equals its first value.

    Parameters
    ----------
    subsequence : array_like
        The time series subsequence to analyze.

    Returns
    -------
    is_constant : bool
        True when all elements compare equal to the first element, False otherwise.

    Raises
    ------
    ValueError
        If subsequence is not array like.

    """
    if core.is_array_like(subsequence):
        values = core.to_np_array(subsequence)
        first_value = values[0]
        return np.all(values == first_value)

    raise ValueError('subsequence is not array like!')


def add_noise_to_series(series):
    """
    Returns a copy of the given time series with a small random offset added
    to every element.

    Parameters
    ----------
    series : array_like
        The time series subsequence that noise is added to.

    Returns
    -------
    temp : array_like
        The time series subsequence with noise added. The input is not modified.

    Raises
    ------
    ValueError
        If series is not array like.

    """
    if core.is_array_like(series):
        values = np.copy(core.to_np_array(series))
        # One independent offset per element, drawn uniformly between 0 and 0.0000009.
        return values + np.random.uniform(0, 0.0000009, size=len(values))

    raise ValueError('series is not array like!')
def impute_missing(ts, window, method='mean', direction='forward'):
    """
    Imputes missing (NaN or infinite) data in a time series using a statistic
    computed over a sliding window.

    Parameters
    ----------
    ts : array_like
        The time series to be handled.
    window : int
        The window size to compute the mean/median/minimum value/maximum value.
    method : string, Default = 'mean'
        A string indicating the data imputation method, which should be
        'mean', 'median', 'min' or 'max'.
    direction : string, Default = 'forward'
        A string indicating the data imputation direction, which should be
        'forward', 'fwd', 'f', 'backward', 'bwd', 'b'. If the direction is
        forward, we use previous data for imputation; if the direction is
        backward, we use subsequent data for imputation.

    Returns
    -------
    temp : array_like
        A copy of the time series with missing data imputed.

    Raises
    ------
    ValueError
        If ts is not array like.
        If method or direction is not one of the supported options.
        If window is not an integer.

    """
    reducers = {
        'mean': np.mean,
        'median': np.median,
        'min': np.min,
        'max': np.max
    }
    directions = ['forward', 'fwd', 'f', 'backward', 'bwd', 'b']

    if not core.is_array_like(ts):
        raise ValueError('ts is not array like!')

    if method not in reducers:
        raise ValueError('invalid imputation method! valid include options: {}'.format(', '.join(reducers.keys())))

    if direction not in directions:
        raise ValueError('invalid imputation direction! valid include options: ' + ', '.join(directions))

    if not isinstance(window, int):
        raise ValueError("window is not an integer!")

    temp = np.copy(core.to_np_array(ts))
    nan_infs = core.nan_inf_indices(temp)
    func = reducers[method]

    # A missing first (last) value is replaced by the first (last) non-missing
    # value of the series, so both ends are valid before the windows are walked.
    for edge in (0, -1):
        if np.isnan(temp[edge]) or np.isinf(temp[edge]):
            temp[edge] = temp[~nan_infs][edge]
            nan_infs = core.nan_inf_indices(temp)

    window_starts = None
    last_start = len(temp) - window + 1

    if direction.startswith('f'):
        # Forward: window starts run from 0 upwards.
        window_starts = range(last_start)
    elif direction.startswith('b'):
        # Backward: window starts run from len(temp) - window + 1 down to 1.
        window_starts = range(last_start, 0, -1)

    for begin in window_starts:
        span = slice(begin, begin + window)
        missing = nan_infs[span]

        if not np.any(missing):
            continue

        # Fill the missing entries of this window with the statistic of its
        # non-missing entries (temp[span] is a view, so this writes into temp).
        temp[span][missing] = func(temp[span][~missing])
        # Update nan_infs after array 'temp' is changed
        nan_infs = core.nan_inf_indices(temp)

    return temp
def preprocess(ts, window, impute_method='mean', impute_direction='forward', add_noise=True):
    """
    Preprocesses the given time series by imputing missing data and,
    optionally, adding noise to constant windows.

    Parameters
    ----------
    ts : array_like
        The time series to be preprocessed.
    window : int
        The window size to compute the mean/median/minimum value/maximum value.
        The same size is used for the sliding windows that are checked for
        constant values.
    impute_method : string, Default = 'mean'
        A string indicating the data imputation method, which should be
        'mean', 'median', 'min' or 'max'.
    impute_direction : string, Default = 'forward'
        A string indicating the data imputation direction, which should be
        'forward', 'fwd', 'f', 'backward', 'bwd', 'b'. If the direction is
        forward, we use previous data for imputation; if the direction is
        backward, we use subsequent data for imputation.
    add_noise : bool, Default = True
        A boolean value indicating whether noise needs to be added into the
        time series.

    Returns
    -------
    temp : array_like
        The time series after being preprocessed. The input is not modified.

    Raises
    ------
    ValueError
        If ts is not array like.

    """
    if not core.is_array_like(ts):
        raise ValueError('ts is not array like!')

    temp = np.copy(core.to_np_array(ts))

    # impute missing
    temp = impute_missing(temp, window, method=impute_method, direction=impute_direction)

    # handle constant values
    if add_noise:
        for start in range(len(temp) - window + 1):
            end = start + window

            if is_subsequence_constant(temp[start:end]):
                # Assigning floats into an integer/boolean array truncates them,
                # which would silently discard the noise. Promote to float the
                # first time noise actually has to be written.
                if temp.dtype.kind in ('b', 'i', 'u'):
                    temp = temp.astype(float)

                temp[start:end] = add_noise_to_series(temp[start:end])

    return temp