Source code for matrixprofile.algorithms.snippets

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

range = getattr(__builtins__, 'xrange', range)
# end of py2 compatability boilerplate

import numpy as np

from matrixprofile import core
from matrixprofile.algorithms.mpdist import mpdist_vector


[docs]def snippets(ts, snippet_size, num_snippets=2, window_size=None): """ The snippets algorithm is used to summarize your time series by identifying N number of representative subsequences. If you want to identify typical patterns in your time series, then this is the algorithm to use. Parameters ---------- ts : array_like The time series. snippet_size : int The size of snippet desired. num_snippets : int, Default 2 The number of snippets you would like to find. window_size : int, Default (snippet_size / 2) The window size. Returns ------- list : snippets A list of snippets as dictionary objects with the following structure. >>> { >>> index: the index of the snippet, >>> snippet: the snippet values, >>> neighbors: the starting indices of all subsequences similar to the current snippet >>> fraction: fraction of the snippet >>> } """ ts = core.to_np_array(ts).astype('d') time_series_len = len(ts) n = len(ts) if not isinstance(snippet_size, int) or snippet_size < 4: raise ValueError('snippet_size must be an integer >= 4') if n < (2 * snippet_size): raise ValueError('Time series is too short relative to snippet length') if not window_size: window_size = int(np.floor(snippet_size / 2)) if window_size >= snippet_size: raise ValueError('window_size must be smaller than snippet_size') # pad end of time series with zeros num_zeros = int(snippet_size * np.ceil(n / snippet_size) - n) ts = np.append(ts, np.zeros(num_zeros)) # compute all profiles indices = np.arange(0, len(ts) - snippet_size, snippet_size) distances = [] for j, i in enumerate(indices): distance = mpdist_vector(ts, ts[i:(i + snippet_size - 1)], int(window_size)) distances.append(distance) distances = np.array(distances) # find N snippets snippets = [] minis = np.inf total_min = None for n in range(num_snippets): minims = np.inf for i in range(len(indices)): s = np.sum(np.minimum(distances[i, :], minis)) if minims > s: minims = s index = i minis = np.minimum(distances[index, :], minis) actual_index = indices[index] snippet = ts[actual_index:actual_index + snippet_size] snippet_distance = distances[index] snippets.append({ 'index': actual_index, 'snippet': snippet, 'distance': snippet_distance }) if isinstance(total_min, type(None)): total_min = snippet_distance else: total_min = np.minimum(total_min, snippet_distance) # compute the fraction of each snippet for snippet in snippets: mask = (snippet['distance'] <= total_min) # create a key "neighbors" for the snippet dict, # and store all the time series indices for the data represented by a snippet (arr[mask]) arr = np.arange(len(mask)) # max_index indicates the length of a profile, which is (n-m) in the Snippets paper) max_index = time_series_len - snippet_size # since 'ts' is padded with 0 before calculate the MPdist profile # all parts of the profile that are out of range [0, n-m] cannot be used as neighboring snippet indices snippet['neighbors'] = list(filter(lambda x : x <= max_index, arr[mask])) # Add the last m time series indices into the neighboring snippet indices if max_index in snippet['neighbors']: last_m_indices = list(range(max_index+1, time_series_len)) snippet['neighbors'].extend(last_m_indices) snippet['fraction'] = mask.sum() / (len(ts) - snippet_size) total_min = total_min - mask del snippet['distance'] return snippets