Source code for matrixprofile.algorithms.mpdist

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

range = getattr(__builtins__, 'xrange', range)
# end of py2 compatability boilerplate

import math

import numpy as np

from matrixprofile import core
from matrixprofile.algorithms.cympx import mpx_ab_parallel as cympx_ab_parallel
from matrixprofile.algorithms.mass2 import mass2


[docs]def mpdist(ts, ts_b, w, threshold=0.05, n_jobs=1): """ Computes the MPDist between the two series ts and ts_b. For more details refer to the paper: Matrix Proļ¬le XII: MPdist: A Novel Time Series Distance Measure to Allow Data Mining in More Challenging Scenarios. Shaghayegh Gharghabi, Shima Imani, Anthony Bagnall, Amirali Darvishzadeh, Eamonn Keogh. ICDM 2018 Parameters ---------- ts : array_like The time series to compute the matrix profile for. ts_b : array_like The time series to compare against. w : int The window size. threshold : float, Default 0.05 The percentile in which the distance is taken from. By default it is set to 0.05 based on empircal research results from the paper. Generally, you should not change this unless you know what you are doing! This value must be a float greater than 0 and less than 1. n_jobs : int, Default = 1 Number of cpu cores to use. Returns ------- float : mpdist The MPDist. """ ts = core.to_np_array(ts).astype('d') ts_b = core.to_np_array(ts_b).astype('d') n_jobs = core.valid_n_jobs(n_jobs) if not core.is_one_dimensional(ts): raise ValueError('ts must be one dimensional!') if not core.is_one_dimensional(ts_b): raise ValueError('ts_b must be one dimensional!') if not isinstance(threshold, float) or threshold <= 0 or threshold >= 1: raise ValueError('threshold must be a float greater than 0 and less'\ ' than 1') mp, mpi, mpb, mpib = cympx_ab_parallel(ts, ts_b, w, 0, n_jobs) mp_abba = np.append(mp, mpb) data_len = len(ts) + len(ts_b) abba_sorted = np.sort(mp_abba[~core.nan_inf_indices(mp_abba)]) distance = np.inf if len(abba_sorted) > 0: upper_idx = int(np.ceil(threshold * data_len)) - 1 idx = np.min([len(abba_sorted) - 1, upper_idx]) distance = abba_sorted[idx] return distance
def mass_distance_matrix(ts, query, w): """ Computes a distance matrix using mass that is used in mpdist_vector algorithm. Parameters ---------- ts : array_like The time series to compute the matrix for. query : array_like The time series to compare against. w : int The window size. Returns ------- array_like : dist_matrix The MASS distance matrix. """ subseq_num = len(query) - w + 1 distances = [] for i in range(subseq_num): distances.append(np.real(mass2(ts, query[i:i + w]))) return np.array(distances) def calculate_mpdist(profile, threshold, data_length): """ Computes the MPDist given a profile, threshold and data length. This is primarily used for MPDist Vector algorithm. Parameters ---------- profile : array_like The profile to calculate the mpdist for. threshold : float The threshold to use in computing the distance. data_length : int The length of the original data. Returns ------- float : mpdist The MPDist. """ dist_loc = int(np.ceil(threshold * data_length)) profile_sorted = np.sort(profile) mask = core.not_nan_inf_indices(profile_sorted) profile_clean = profile_sorted[mask] if len(profile_clean) < 1: distance = np.inf elif len(profile_clean) >= dist_loc: distance = profile_clean[dist_loc] else: distance = np.max(profile_clean) return distance def mpdist_vector(ts, ts_b, w): """ Computes a vector of MPDist measures. Parameters ---------- ts : array_like The time series to compute the matrix for. ts_b : array_like The time series to compare against. w : int The window size. Returns ------- array_like : mpdist_vector The MPDist vector. """ matrix = mass_distance_matrix(ts, ts_b, w) rows, cols = matrix.shape # compute row and column minimums all_right_hist = matrix.min(axis=0) mass_minimums = np.apply_along_axis(core.moving_min, 1, matrix, window=rows) # recreate the matrix profile and compute MPDist mpdist_length = len(ts) - len(ts_b) + 1 right_hist_length = len(ts_b) - w + 1 mpdist_array = np.zeros(mpdist_length) left_hist = np.zeros(right_hist_length) mpdist_array = [] for i in range(mpdist_length): right_hist = all_right_hist[i:right_hist_length + i] left_hist = mass_minimums[:, i] profile = np.append(left_hist, right_hist) mpdist_array.append(calculate_mpdist(profile, 0.05, 2 * len(ts_b))) return np.array(mpdist_array)