Source code for matrixprofile.transform

# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

range = getattr(__builtins__, 'xrange', range)
# end of py2 compatability boilerplate

import numpy as np

from matrixprofile import core


[docs]def apply_av(profile, av="default", custom_av=None): """ Utility function that returns a MatrixProfile data structure with a calculated annotation vector that has been applied to correct the matrix profile. Parameters ---------- profile : dict A MatrixProfile structure. av : str, Default = "default" The type of annotation vector to apply. custom_av : array_like, Default = None Custom annotation vector (will only be applied if av is "custom"). Returns ------- dict : profile A MatrixProfile data structure with a calculated annotation vector and a corrected matrix profile. Raises ------ ValueError If profile is not a MatrixProfile data structure. If custom_av parameter is not array-like when using a custom av. If av paramter is invalid. If lengths of annotation vector and matrix profile are different. If values in annotation vector are outside [0.0, 1.0]. """ if not core.is_mp_obj(profile): raise ValueError('apply_av expects profile as an MP data structure') temp_av = None av_type = None if av == "default": temp_av = make_default_av(profile['data']['ts'], profile['w']) av_type = av elif av == "complexity": temp_av = make_complexity_av(profile['data']['ts'], profile['w']) av_type = av elif av == "meanstd": temp_av = make_meanstd_av(profile['data']['ts'], profile['w']) av_type = av elif av == "clipping": temp_av = make_clipping_av(profile['data']['ts'], profile['w']) av_type = av elif av == "custom": try: temp_av = core.to_np_array(custom_av) except ValueError: raise ValueError('apply_av expects custom_av to be array-like') av_type = av else: raise ValueError("av parameter is invalid") if len(temp_av) != len(profile['mp']): raise ValueError("Lengths of annotation vector and mp are different") if (temp_av < 0.0).any() or (temp_av > 1.0).any(): raise ValueError("Annotation vector values must be between 0 and 1") max_val = np.max(profile['mp']) temp_cmp = profile['mp'] + (np.ones(len(temp_av)) - temp_av) * max_val profile['cmp'] = temp_cmp profile['av'] = temp_av profile['av_type'] = av_type return profile
[docs]def make_default_av(ts, window): """ Utility function that returns an annotation vector filled with 1s (should not change the matrix profile). Parameters ---------- ts : array_like The time series. window : int The specific window size used to compute the MatrixProfile. Returns ------- np.array : av An annotation vector. Raises ------ ValueError If ts is not a list or np.array. If ts is not one-dimensional. If window is not an integer. """ try: ts = core.to_np_array(ts) except ValueError: raise ValueError('make_default_av expects ts to be array-like') if not core.is_one_dimensional(ts): raise ValueError('make_default_av expects ts to be one-dimensional') if not isinstance(window, int): raise ValueError('make_default_av expects window to be an integer') av = np.ones(len(ts) - window + 1) return av
[docs]def make_complexity_av(ts, window): """ Utility function that returns an annotation vector where values are based on the complexity estimation of the signal. Parameters ---------- ts : array_like The time series. window : int The specific window size used to compute the MatrixProfile. Returns ------- np.array : av An annotation vector. Raises ------ ValueError If ts is not a list or np.array. If ts is not one-dimensional. If window is not an integer. """ try: ts = core.to_np_array(ts) except ValueError: raise ValueError('make_complexity_av expects ts to be array-like') if not core.is_one_dimensional(ts): raise ValueError('make_complexity_av expects ts to be one-dimensional') if not isinstance(window, int): raise ValueError('make_complexity_av expects window to be an integer') av = np.zeros(len(ts) - window + 1) for i in range(len(av)): ce = np.sum(np.diff(ts[i: i + window]) ** 2) av[i] = np.sqrt(ce) max_val, min_val = np.max(av), np.min(av) if max_val == 0: av = np.zeros(len(av)) else: av = (av - min_val) / max_val return av
[docs]def make_meanstd_av(ts, window): """ Utility function that returns an annotation vector where values are set to 1 if the standard deviation is less than the mean of standard deviation. Otherwise, the values are set to 0. Parameters ---------- ts : array_like The time series. window : int The specific window size used to compute the MatrixProfile. Returns ------- np.array : av An annotation vector. Raises ------ ValueError If ts is not a list or np.array. If ts is not one-dimensional. If window is not an integer. """ try: ts = core.to_np_array(ts) except ValueError: raise ValueError('make_meanstd_av expects ts to be array-like') if not core.is_one_dimensional(ts): raise ValueError('make_meanstd_av expects ts to be one-dimensional') if not isinstance(window, int): raise ValueError('make_meanstd_av expects window to be an integer') av = np.zeros(len(ts) - window + 1) std = core.moving_std(ts, window) mu = np.mean(std) for i in range(len(av)): if std[i] < mu: av[i] = 1 return av
[docs]def make_clipping_av(ts, window): """ Utility function that returns an annotation vector such that subsequences that have more clipping have less importance. Parameters ---------- ts : array_like The time series. window : int The specific window size used to compute the MatrixProfile. Returns ------- np.array : av An annotation vector. Raises ------ ValueError If ts is not a list or np.array. If ts is not one-dimensional. If window is not an integer. """ try: ts = core.to_np_array(ts) except ValueError: raise ValueError('make_clipping_av expects ts to be array-like') if not core.is_one_dimensional(ts): raise ValueError('make_clipping_av expects ts to be one-dimensional') if not isinstance(window, int): raise ValueError('make_clipping_av expects window to be an integer') av = np.zeros(len(ts) - window + 1) max_val, min_val = np.max(ts), np.min(ts) for i in range(len(av)): num_clip = 0.0 for j in range(window): if ts[i + j] == max_val or ts[i + j] == min_val: num_clip += 1 av[i] = num_clip min_val = np.min(av) av -= min_val max_val = np.max(av) if max_val == 0: av = np.zeros(len(av)) else: av = 1 - av / max_val return av