Source code for matrixprofile.algorithms.skimp

# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

range = getattr(__builtins__, 'xrange', range)
# end of py2 compatability boilerplate


# handle Python 2/3 Iterable import
try:
    from collections.abc import Iterable
except ImportError:
    from collections import Iterable

import math

import warnings

from matplotlib import pyplot as plt
import numpy as np

from matrixprofile import core
from matrixprofile.algorithms.mpx import mpx


def split(lower_bound, upper_bound, middle):
    """
    Helper function to split the indices for BFS.
    """
    if lower_bound == middle:
        L = None
        R = [middle + 1, upper_bound]
    elif upper_bound == middle:
        L = [lower_bound, middle - 1]
        R = None
    else:
        L = [lower_bound, middle - 1]
        R = [middle + 1, upper_bound]
    
    return (L, R)


def binary_split(n):
    """
    Create a breadth first search for indices 0..n.

    Parameters
    ----------
    n : int
        The length of indices.

    Returns
    -------
    array_like :
        The indices to iterate to perform BFS.

    """
    # having length of 1 causes infinite loop and it just doesn't really
    # make much sense.
    # just return it
    if n < 2:
        return [0,]

    index = []
    intervals = []
    
    # always begin by exploring the first integer
    index.append(0)
    
    # after exploring first integer, split interval 2:n
    intervals.append([1, n - 1])
    
    while len(intervals) > 0:
        interval = intervals.pop(0)
        lower_bound = interval[0]
        upper_bound = interval[1]
        middle = int(math.floor((lower_bound + upper_bound) / 2))
        index.append(middle)
        
        if lower_bound == upper_bound:
            continue
        else:
            L, R = split(lower_bound, upper_bound, middle)
            
            if L is not None:
                intervals.append(L)
            
            if R is not None:
                intervals.append(R)
    
    return index


[docs]def skimp(ts, windows=None, show_progress=False, cross_correlation=False, pmp_obj=None, sample_pct=0.1, n_jobs=1): """ Computes the Pan Matrix Profile (PMP) for the given time series. When the time series is only passed, windows start from 8 and increase by increments of 2 up to length(ts) / 2. Also, the PMP is only computed using 10% of the windows unless sample_pct is set to a different value. Note ---- When windows is explicitly provided, sample_pct no longer takes affect. The MP for all windows provided will be computed. Parameters ---------- ts : array_like The time series. show_progress: bool, default = False Show the progress in percent complete in increments of 5% by printing it out to the console. cross_correlation : bool, default = False Return the MP values as Pearson Correlation instead of Euclidean distance. pmp_obj : dict, default = None Repurpose already computed window sizes with this provided PMP. It should be the output of a PMP algorithm such as skimp or maximum subsequence. sample_pct : float, default = 0.1 (10%) Number of window sizes to compute MPs for. Decimal percent between 0 and 1. n_jobs : int, Default = 1 Number of cpu cores to use. Returns ------- dict : profile A Pan-MatrixProfile data structure. >>> { >>> 'pmp': the pan matrix profile as a 2D array, >>> 'pmpi': the pmp indices, >>> 'data': { >>> 'ts': time series used, >>> }, >>> 'windows': the windows used to compute the pmp, >>> 'sample_pct': the sample percent used, >>> 'metric':The distance metric computed for the pmp, >>> 'algorithm': the algorithm used, >>> 'class': PMP >>> } Raises ------ ValueError : 1. ts is not array_like. 2. windows is not an iterable 3. show_progress is not a boolean. 4. cross_correlation is not a boolean. 5. sample_pct is not between 0 and 1. """ ts = core.to_np_array(ts) n = len(ts) # Argument validation if isinstance(windows, type(None)): start = 8 end = int(math.floor(len(ts) / 2)) windows = range(start, end + 1) if not isinstance(show_progress, bool): raise ValueError('show_progress must be a boolean!') if not isinstance(cross_correlation, bool): raise ValueError('cross_correlation must be a boolean!') if not isinstance(sample_pct, (int, float)) or sample_pct > 1 or sample_pct < 0: raise ValueError('sample_pct must be a decimal between 0 and 1') # create a breath first search index list of our window sizes split_index = binary_split(len(windows)) pmp = np.full((len(split_index), n), np.inf) pmpi = np.full((len(split_index), n), np.nan, dtype='int') idx = np.full(len(split_index), -1) # compute the sample pct index last_index = len(split_index) if sample_pct < 1: last_index = int(np.floor(len(split_index) * sample_pct)) last_index = np.minimum(len(split_index), last_index) pct_shown = {} # compute all matrix profiles for each window size for i in range(last_index): window_size = windows[split_index[i]] # check if we already computed this MP given a passed in PMP if isinstance(pmp_obj, dict): cw = pmp_obj.get('windows', None) w_idx = np.argwhere(cw == window_size) # having the window provided, we simply copy over the data instead # of recomputing it if len(w_idx) == 1: w_idx = w_idx[0][0] pmp[split_index[i], :] = pmp_obj['pmp'][w_idx, :] pmpi[split_index[i], :] = pmp_obj['pmpi'][w_idx, :] continue profile = mpx(ts, window_size, cross_correlation=cross_correlation, n_jobs=n_jobs) mp = profile.get('mp') pi = profile.get('pi') pmp[split_index[i], 0:len(mp)] = mp pmpi[split_index[i], 0:len(pi)] = pi j = split_index[i] while j < last_index and idx[j] != j: idx[j] = split_index[i] j = j + 1 # output the progress if show_progress: pct_complete = round((i / (last_index - 1)) * 100, 2) int_pct = math.floor(pct_complete) if int_pct % 5 == 0 and int_pct not in pct_shown: print('{}% complete'.format(int_pct)) pct_shown[int_pct] = 1 metric = 'euclidean' if cross_correlation: metric = 'pearson' return { 'pmp': pmp, 'pmpi': pmpi, 'data': { 'ts': ts, }, 'windows': np.array(windows), 'sample_pct': sample_pct, 'metric': metric, 'algorithm': 'skimp', 'class': 'PMP' }
[docs]def maximum_subsequence(ts, threshold=0.95, refine_stepsize=0.05, n_jobs=1, include_pmp=False, lower_window=8): """ Finds the maximum subsequence length based on the threshold provided. Note that this threshold is domain specific requiring some knowledge about the underyling time series in question. The subsequence length starts at 8 and iteratively doubles until the maximum correlation coefficient is no longer met. Parameters ---------- ts : array_like The time series to analyze. threshold : float, Default 0.95 The correlation coefficient used as the threshold. It should be between 0 and 1. refine_stepsize : float, Default 0.05 Used in the refinement step to find a more precise upper window. It should be a percentage between 0.01 and 0.99. n_jobs : int, Default = 1 Number of cpu cores to use. include_pmp : bool, default False Include the PanMatrixProfile for the computed windows. lower_window : int, default 8 Lower bound of subsequence length that can be altered if required. Returns ------- obj : With include_pmp=False (default) int : The maximum subsequence length based on the threshold provided. With include_pmp=True dict : A dict containing the upper window, windows and pmp. >>> { >>> 'upper_window': The upper window, >>> 'windows': array_like windows used to compute the pmp, >>> 'pmp': the pan matrix profile as a 2D array, >>> 'pmpi': the pmp indices, >>> } """ windows = np.array([], dtype='int') pearson = np.array([], dtype='d') pmp = [] pmpi = [] ts = core.to_np_array(ts) n = len(ts) correlation_max = np.inf window_size = lower_window max_window = int(np.floor(len(ts) / 2)) def resize(mp, pi, n): """Helper function to resize mp and pi to be aligned with the PMP. Also convert pearson to euclidean.""" mp = core.pearson_to_euclidean(profile['mp'], window_size) infs = np.full(n - mp.shape[0], np.inf) nans = np.full(n - mp.shape[0], np.nan) mp = np.append(mp, infs) pi = np.append(profile['pi'], nans) return (mp, pi) # first perform a wide search by increasing window by 2 in # each iteration while window_size <= max_window: profile = mpx(ts, window_size, cross_correlation=True) mask = ~np.isinf(profile['mp']) correlation_max = np.max(profile['mp'][mask]) windows = np.append(windows, window_size) pearson = np.append(pearson, correlation_max) if include_pmp: mp, pi = resize(profile['mp'], profile['pi'], n) pmp.append(mp) pmpi.append(pi) if correlation_max < threshold: break window_size = window_size * 2 # find last window within threshold and throw away # computations outside of the threshold mask = pearson > threshold pearson = pearson[mask] windows = windows[mask] if len(windows) < 1: warnings.warn('No windows found with given threshold, try to'\ ' set a lower threshold', RuntimeWarning) return np.nan window_size = windows[-1] if include_pmp: pmp = np.vstack(pmp)[mask] pmpi = np.vstack(pmpi)[mask] # refine the upper u by increase by + X% increments test_windows = np.arange(refine_stepsize, 1, step=refine_stepsize) + 1 test_windows = np.append(test_windows, 2) test_windows = np.floor(test_windows * window_size).astype('int') # keep windows divisible by 2 mask = test_windows % 2 == 1 test_windows[mask] = test_windows[mask] + 1 for window_size in test_windows: profile = mpx(ts, window_size, cross_correlation=True) mask = ~np.isinf(profile['mp']) correlation_max = np.max(profile['mp'][mask]) windows = np.append(windows, window_size) pearson = np.append(pearson, correlation_max) if include_pmp: mp, pi = resize(profile['mp'], profile['pi'], n) pmp = np.append(pmp, [mp,], axis=0) pmpi = np.append(pmpi, [pi,], axis=0) if correlation_max < threshold: break if include_pmp: return { 'upper_window': window_size, 'windows': windows, 'pmp': pmp, 'pmpi': pmpi } return window_size