# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
range = getattr(__builtins__, 'xrange', range)
# end of py2 compatability boilerplate
# handle Python 2/3 Iterable import
try:
from collections.abc import Iterable
except ImportError:
from collections import Iterable
import os
import numpy as np
from matplotlib import pyplot as plt
from matplotlib.lines import Line2D
from matrixprofile import core
def __combine(a, b):
"""
Helper function to combine lists or a list and an object.
"""
output = []
if isinstance(a, list) and isinstance(b, list):
output = a + b
elif isinstance(a, list):
output = a
output.append(b)
elif isinstance(b, list):
output = b
output.append(a)
return output
def is_visualizable(obj):
"""
Helper function to determine if the passed in object can be visualized or
not based on the data structure.
Parameters
----------
obj : Object
The object to test.
Returns
-------
list : figures
A list of matplotlib figures.
"""
return core.is_mp_obj(obj) or core.is_pmp_obj(obj) or core.is_stats_obj(obj)
[docs]def visualize(profile):
"""
Automatically creates plots for the provided data structure. In some cases
many plots are created. For example, when a MatrixProfile is passed with
corresponding motifs and discords, the matrix profile, discords and motifs
will be plotted.
Parameters
----------
profile : dict_like
A MatrixProfile, Pan-MatrixProfile or Statistics data structure.
Returns
-------
list : figures
A list of matplotlib figures.
"""
figures = []
if not is_visualizable(profile):
raise ValueError('MatrixProfile, Pan-MatrixProfile or Statistics data structure expected!')
# plot MP
if core.is_mp_obj(profile):
figures = __combine(figures, plot_mp(profile))
if 'cmp' in profile and len(profile['cmp']) > 0:
figures = __combine(figures, plot_cmp_mp(profile))
if 'av' in profile and len(profile['av']) > 0:
figures = __combine(figures, plot_av_mp(profile))
if 'motifs' in profile and len(profile['motifs']) > 0:
figures = __combine(figures, plot_motifs_mp(profile))
if 'discords' in profile and len(profile['discords']) > 0:
figures = __combine(figures, plot_discords_mp(profile))
# plot PMP
if core.is_pmp_obj(profile):
figures = __combine(figures, plot_pmp(profile))
if 'motifs' in profile and len(profile['motifs']) > 0:
figures = __combine(figures, plot_motifs_pmp(profile))
if 'discords' in profile and len(profile['discords']) > 0:
figures = __combine(figures, plot_discords_pmp(profile))
# plot stats
if core.is_stats_obj(profile):
figures = __combine(figures, plot_stats(profile))
return figures
def plot_stats(profile):
"""
Plots the given Statistics data structure provided.
Parameters
----------
profile : dict
The dict structure from a Statistics algorithm.
Returns
-------
matplotlib.Figure : figure
The matplotlib figure object.
"""
fig, ax = plt.subplots(2, 1, figsize=(15, 7))
ts = profile.get('ts')
ax[0].plot(ts, label='Time Series', c='black')
for k, v in profile.items():
if k.startswith('moving'):
ax[1].plot(v, label=k)
fig.legend(loc="upper right", bbox_to_anchor=(1.11, 0.97))
fig.tight_layout()
return fig
def plot_pmp(profile):
"""
Plots the given Pan-MatrixProfile data structure provided.
Parameters
----------
profile : dict
The dict structure from a PMP algorithm.
Returns
-------
matplotlib.Figure : figure
The matplotlib figure object.
"""
pmp = profile.get('pmp', None)
fig, ax = plt.subplots(1, 1, figsize=(10, 10))
depth = 256
test = np.ceil(pmp * depth) / depth
test[test > 1] = 1
ax.imshow(test, interpolation=None, aspect='auto')
ax.invert_yaxis()
ax.set_title('Pan-MatrixProfile')
ax.set_xlabel('Profile Index')
ax.set_ylabel('Window Size')
ax.set_aspect(1, 'box')
fig.tight_layout()
return fig
def plot_mp(profile):
"""
Plots the matrix profile given the appropriate data structure.
Parameters
----------
profile : dict_like
The matrix profile to plot.
Returns
-------
matplotlib.Figure : figure
The matplotlib figure object.
"""
plot_count = 0
data = profile.get('data', None)
ts = None
query = None
if data:
ts = data.get('ts', None)
query = data.get('query', None)
mp = profile.get('mp', None)
lmp = profile.get('lmp', None)
rmp = profile.get('rmp', None)
for val in [ts, query, mp, lmp, rmp]:
if core.is_array_like(val):
plot_count += 1
if plot_count < 1:
raise ValueError("Object passed has nothing to plot!")
w = profile.get('w', None)
if not isinstance(w, int):
raise ValueError("Expecting window size!")
current = 0
fig, axes = plt.subplots(plot_count, 1, sharex=True, figsize=(15, 7))
if not isinstance(axes, Iterable):
axes = [axes,]
# plot the original ts
if core.is_array_like(ts):
axes[current].plot(np.arange(len(ts)), ts)
axes[current].set_ylabel('Data')
current += 1
# plot the original query
if core.is_array_like(query):
axes[current].plot(np.arange(len(query)), query)
axes[current].set_ylabel('Query')
current += 1
# plot matrix profile
if core.is_array_like(mp):
mp_adj = np.append(mp, np.zeros(w - 1) + np.nan)
axes[current].plot(np.arange(len(mp_adj)), mp_adj)
axes[current].set_ylabel('Matrix Profile')
axes[current].set_title('Window Size {}'.format(w))
current += 1
# plot left matrix profile
if core.is_array_like(lmp):
mp_adj = np.append(lmp, np.zeros(w - 1) + np.nan)
axes[current].plot(np.arange(len(mp_adj)), mp_adj)
axes[current].set_ylabel('Left Matrix Profile')
axes[current].set_title('Window Size {}'.format(w))
current += 1
# plot left matrix profile
if core.is_array_like(rmp):
mp_adj = np.append(rmp, np.zeros(w - 1) + np.nan)
axes[current].plot(np.arange(len(mp_adj)), mp_adj)
axes[current].set_ylabel('Right Matrix Profile')
axes[current].set_title('Window Size {}'.format(w))
current += 1
fig.tight_layout()
return fig
def plot_cmp_mp(profile):
"""
Plot corrected matrix profile for a MatrixProfile data structure.
Parameters
----------
profile : dict_like
The matrix profile object to plot.
Returns
-------
matplotlib.Figure : figure
The matplotlib figure object.
"""
cmp = profile['cmp']
w = profile['w']
fig, ax = plt.subplots(1, 1, figsize=(15, 7))
cmp_adj = np.append(cmp, np.zeros(w - 1) + np.nan)
ax.plot(np.arange(len(cmp_adj)), cmp_adj)
ax.set_ylabel('Corrected Matrix Profile')
ax.set_title('Window Size {}'.format(w))
fig.tight_layout()
return fig
def plot_av_mp(profile):
"""
Plot the annotation vector for a MatrixProfile data structure.
Parameters
----------
profile : dict_like
The matrix profile object to plot.
Returns
-------
matplotlib.Figure : figure
The matplotlib figure object.
"""
av = profile['av']
w = profile['w']
fig, ax = plt.subplots(1, 1, figsize=(15, 7))
av_adj = np.append(av, np.zeros(w - 1) + np.nan)
ax.plot(np.arange(len(av_adj)), av_adj)
ax.set_ylabel('Annotation Vector')
ax.set_title('Window Size {}'.format(w))
fig.tight_layout()
return fig
def plot_discords_mp(profile):
"""
Plot discords for a MatrixProfile data structure.
Parameters
----------
profile : dict_like
The matrix profile object to plot.
Returns
-------
matplotlib.Figure : figure
The matplotlib figure object.
"""
mp = profile['mp']
w = profile['w']
discords = profile['discords']
data = profile.get('data', None)
if data:
ts = data.get('ts', None)
mp_adjusted = np.append(mp, np.full(w + 1, np.nan))
fig, axes = plt.subplots(3, 1, sharex=True, figsize=(15, 7), gridspec_kw={'height_ratios': [25, 5, 25]})
pos = axes[1].imshow([mp_adjusted,], aspect='auto', cmap='coolwarm')
axes[1].set_yticks([])
axes[0].plot(np.arange(len(ts)), ts)
axes[0].set_ylabel('Data')
axes[2].plot(np.arange(len(mp_adjusted)), mp_adjusted)
axes[2].set_ylabel('Matrix Profile')
axes[2].set_title('Window Size {}'.format(w))
for idx in discords:
axes[2].plot(idx, mp_adjusted[idx], c='r', marker='*', lw=0, markersize=10)
fig.subplots_adjust(right=0.8)
cbar_ax = fig.add_axes([1, 0.46, 0.01, 0.1])
fig.colorbar(pos, orientation='vertical', cax=cbar_ax, use_gridspec=True)
lines = [
Line2D([0], [0], color='red', marker='*', lw=0),
Line2D([0], [0], color='blue'),
]
fig.legend(lines, ['Discord', 'MP'], bbox_to_anchor=(1.07, 0.44))
fig.tight_layout()
return fig
def plot_discords_pmp(profile):
"""
Plot discords for the given Pan-MatrixProfile data structure.
Parameters
----------
profile : dict_like
The pmp object to plot.
Returns
-------
matplotlib.Figure : figure
The matplotlib figure object.
"""
discord_figures = []
for discord in profile['discords']:
mp_idx = discord[0]
idx = discord[1]
w = profile['windows'][mp_idx]
data = profile.get('data', None)
if data:
ts = data.get('ts', None)
mp_adjusted = profile['pmp'][mp_idx]
# mp_adjusted = np.append(mp, np.full(w + 1, np.nan))
fig, axes = plt.subplots(3, 1, sharex=True, figsize=(15, 7), gridspec_kw={'height_ratios': [25, 5, 25]})
pos = axes[1].imshow([mp_adjusted,], aspect='auto', cmap='coolwarm')
axes[1].set_yticks([])
axes[0].plot(np.arange(len(ts)), ts)
axes[0].set_ylabel('Data')
axes[2].plot(np.arange(len(mp_adjusted)), mp_adjusted)
axes[2].set_ylabel('Matrix Profile')
# for idx in discords:
axes[2].plot(idx, mp_adjusted[idx], c='r', marker='*', lw=0, markersize=10)
axes[2].set_title('Window Size = {}'.format(w))
fig.subplots_adjust(right=0.8)
cbar_ax = fig.add_axes([1, 0.46, 0.01, 0.1])
fig.colorbar(pos, orientation='vertical', cax=cbar_ax, use_gridspec=True)
lines = [
Line2D([0], [0], color='red', marker='*', lw=0),
Line2D([0], [0], color='blue'),
]
fig.legend(lines, ['Discord', 'MP'], bbox_to_anchor=(1.07, 0.44))
fig.tight_layout()
discord_figures.append(fig)
return discord_figures
def plot_motifs_mp(profile):
"""
Plot motifs given a MatrixProfile data structure.
Parameters
----------
profile : dict_like
The matrix profile object to plot.
Returns
-------
list : figures
A list of matplotlib figure objects.
"""
figures = []
w = profile['w']
motifs = profile['motifs']
data = profile.get('data', None)
if data:
ts = data.get('ts', None)
fig, axes = plt.subplots(len(motifs), 2, figsize=(15, 7), sharey='row', sharex='col')
if len(motifs) == 1:
axes = [axes,]
pair_num = 1
for ax_row, motif in zip(axes, motifs):
first = True
for ax, idx in zip(ax_row, motif['motifs']):
subquery = ts[idx:idx + w]
indices = np.arange(len(subquery))
ax.plot(indices, subquery)
ax.set_title('Index Start {}'.format(idx))
if first:
ax.set_ylabel('Motif {}'.format(pair_num))
first = False
pair_num += 1
fig.tight_layout()
figures.append(fig)
fig, axes = plt.subplots(len(motifs), 1, figsize=(15, 7), sharey='row', sharex='col')
if len(motifs) == 1:
axes = [axes,]
pair_num = 1
for ax, motif in zip(axes, motifs):
ax.plot(np.arange(len(ts)), ts)
for idx in motif['motifs']:
subquery = ts[idx:idx + w]
indices = np.arange(idx, idx + w)
ax.plot(indices, subquery, c='r')
ax.set_ylabel('Motif {}'.format(pair_num))
for idx in motif['neighbors']:
subquery = ts[idx:idx + w]
indices = np.arange(idx, idx + w)
ax.plot(indices, subquery, c='black')
pair_num += 1
lines = [
Line2D([0], [0], color='blue'),
Line2D([0], [0], color='red'),
Line2D([0], [0], color='black')
]
fig.legend(lines, ['Data', 'Motif', 'Neighbor'], bbox_to_anchor=(1.08, 0.975))
fig.tight_layout()
figures.append(fig)
return figures
def plot_motifs_pmp(profile):
"""
Plot motifs given a Pan-MatrixProfile data structure.
Parameters
----------
profile : dict_like
The pan matrix profile object to plot.
Returns
-------
list : figures
A list of matplotlib figure objects.
"""
motif_figures = []
motifs = profile.get('motifs')
data = profile.get('data', None)
windows = profile.get('windows', None)
if data:
ts = data.get('ts', None)
fig, axes = plt.subplots(len(motifs), 2, figsize=(15, 7), sharey='row', sharex='col')
if len(motifs) == 1:
axes = [axes,]
pair_num = 1
for ax_row, motif in zip(axes, motifs):
first = True
for ax, motif_loc in zip(ax_row, motif['motifs']):
w = windows[motif_loc[0]]
idx = motif_loc[1]
subquery = ts[idx:idx + w]
indices = np.arange(len(subquery))
ax.plot(indices, subquery)
ax.set_title('Index Start {}, Window Size {}'.format(idx, w))
if first:
ax.set_ylabel('Motif {}'.format(pair_num))
first = False
pair_num += 1
fig.tight_layout()
motif_figures.append(fig)
fig, axes = plt.subplots(len(motifs), 1, figsize=(15, 7), sharey='row', sharex='col')
if len(motifs) == 1:
axes = [axes,]
pair_num = 1
for ax, motif in zip(axes, motifs):
ax.plot(np.arange(len(ts)), ts)
for motif_loc in motif['motifs']:
w = windows[motif_loc[0]]
idx = motif_loc[1]
subquery = ts[idx:idx + w]
indices = np.arange(idx, idx + w)
ax.plot(indices, subquery, c='r')
ax.set_title('Window Size {}'.format(w))
ax.set_ylabel('Motif {}'.format(pair_num))
for neigh_loc in motif['neighbors']:
w = windows[neigh_loc[0]]
idx = neigh_loc[1]
subquery = ts[idx:idx + w]
indices = np.arange(idx, idx + w)
ax.plot(indices, subquery, c='black')
pair_num += 1
lines = [
Line2D([0], [0], color='blue'),
Line2D([0], [0], color='red'),
Line2D([0], [0], color='black')
]
fig.legend(lines, ['Data', 'Motif', 'Neighbor'], bbox_to_anchor=(1.08, 0.975))
fig.tight_layout()
motif_figures.append(fig)
return motif_figures
def plot_snippets(snippets, ts):
"""
Plot snippets for the given Snippets data structure.
Parameters
----------
snippets : list
A list of snippets as dictionary objects to plot.
ts : array_like
The time series.
Returns
-------
list : figures
A list of matplotlib figures.
"""
figures = []
for i in range(len(snippets)):
snippet_id = str(i+1)
snippet_start = snippets[i]['index']
snippet_end = snippets[i]['index']+len(snippets[i]['snippet'])
snippet_data = snippets[i]['snippet']
# Create a plot for current snippets
fig, ax = plt.subplots(1, 1, sharex=True, figsize=(15,5))
ax.plot(ts)
ax.set_title('Snippet-'+snippet_id, size=12)
ax.set_ylabel('Data')
flag = 1
# Get intervals for the given neighboring snippet indices
neighbors = snippets[i]['neighbors']
intervals = []
for i in range(len(neighbors)):
if i == 0:
intervals.append(neighbors[i])
if i == len(neighbors)-1:
intervals.append(neighbors[i])
break
if (neighbors[i+1] - neighbors[i]) != 1:
intervals.append(neighbors[i])
intervals.append(neighbors[i+1])
step = 2
intervals = [intervals[i:i+step] for i in range(0, len(intervals), step)]
# Plot the neighboring snippets
for interval in intervals:
start = interval[0]
end = interval[1]
if flag:
ax.plot(np.arange(start,end+1),ts[start:end+1], c = "orange"
,label = "Subsequences Represented by Snippet-"+ snippet_id)
flag = 0
else:
ax.plot(np.arange(start,end+1),ts[start:end+1],"orange")
# Plot the snippet
ax.plot(np.arange(snippet_start,snippet_end), snippet_data, c = "red"
, label = 'Snippet-'+ snippet_id)
plt.legend(loc="upper right")
fig.tight_layout()
figures.append(fig)
return figures