Source code for matrixprofile.algorithms.pairwise_dist

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

range = getattr(__builtins__, 'xrange', range)
# end of py2 compatability boilerplate

import numpy as np

from matrixprofile import core
from matrixprofile.algorithms.mpdist import mpdist

def compute_dist(args):
    Helper function to parallelize pairwise distance calculation.

    args : tuple
        The arguments to pass to the mpdist calculation.
    values : tuple
        The kth index and distance.
    k = args[0]
    distance = mpdist(args[1], args[2], args[3], threshold=args[4])

    return (k, distance)

[docs]def pairwise_dist(X, window_size, threshold=0.05, n_jobs=1): """ Utility function to compute all pairwise distances between the timeseries using MPDist. Note ---- scipy.spatial.distance.pdist cannot be used because they do not allow for jagged arrays, however their code was used as a reference in creating this function. Parameters ---------- X : array_like An array_like object containing time series to compute distances for. window_size : int The window size to use in computing the MPDist. threshold : float The threshold used to compute MPDist. n_jobs : int Number of CPU cores to use during computation. Returns ------- Y : np.ndarray Returns a condensed distance matrix Y. For each :math:`i` and :math:`j` (where :math:`i<j<m`),where m is the number of original observations. The metric ``dist(u=X[i], v=X[j])`` is computed and stored in entry ``ij``. """ if not core.is_array_like(X): raise ValueError('X must be array_like!') # identify shape based on iterable or np.ndarray.shape m = 0 if isinstance(X, np.ndarray) and len(X.shape) == 2: m = X.shape[0] else: m = len(X) dm = np.empty((m * (m - 1)) // 2, dtype=np.double) k = 0 if n_jobs == 1: for i in range(0, m - 1): for j in range(i + 1, m): dm[k] = mpdist(X[i], X[j], window_size, threshold=threshold, n_jobs=n_jobs) k = k + 1 else: args = [] for i in range(0, m - 1): for j in range(i + 1, m): args.append((k, X[i], X[j], window_size, threshold)) k = k + 1 with core.mp_pool()(n_jobs) as pool: results =, args) # put results in the matrix for result in results: dm[result[0]] = result[1] return dm