Author: Philipp Rudiger
Effectively, a HoloViews pipeline is a DAG which starts with some dataset and applies various transforms to the data. In the case of HoloViews, each node in the DAG may also have semantic meaning, e.g. an element type, or add additional metadata, e.g. labels or options. Each transform can have parameters, which can either be static or be driven by some dynamic value.
## DynamicMap
Initially, `DynamicMap` was designed as a simple mechanism to take some inputs, in the form of key dimensions, pass them to a callback and return some HoloViews object:
```python
def callback(time):
    return hv.Image(ds.select(time=time))

hv.DynamicMap(callback, kdims=['time']).redim.range(time=(0, 10))
```
However, key dimensions were limited: they could only be driven by sliders and dropdowns. We instead wanted to drive them with arbitrary parameters, or even with inputs from other plots, so we designed `Streams`:
```python
def callback(time, x_range, y_range):
    return hv.Image(ds.select(time=time, x=x_range, y=y_range))

hv.DynamicMap(callback, kdims=['time'], streams=[RangeXY()]).redim.range(time=(0, 10))
```
This meant that a `DynamicMap` could now be driven by almost anything, but it also complicated things, because users would now have to understand both `kdims` and `streams` and how they map to the callback arguments.
Next I introduced the `Dynamic` utility, which would take some operation and apply it to the output of another `DynamicMap`; this way you could actually build a pipeline or a DAG. Streams could then be used to inject dynamic parameter values into operations that were part of this DAG, building a dynamic pipeline.
```python
def callback(time, x_range, y_range):
    return hv.Image(ds.select(time=time, x=x_range, y=y_range))

dmap = hv.DynamicMap(callback, kdims=['time'], streams=[RangeXY()]).redim.range(time=(0, 10))

# Stream.define returns a new Stream class; an instance of it drives
# the sigma parameter of the (Operation-based) smooth_operation
Kernel = hv.streams.Stream.define('Kernel', sigma=0.1)
smoothed_dmap = smooth_operation(dmap, streams=[Kernel()])
```
We could now apply operations on a `DynamicMap`, and the concept of a pipeline or DAG first became a reality in HoloViews. However, `DynamicMap` + `Streams` + `Operations` were all pretty obscure concepts to a newcomer, so we set about making this easier: instead of requiring streams you could inject parameters, instead of `Operations` you could use simple functions, and instead of using `Dynamic` you could use `.apply`.
```python
time = pn.widgets.DateSlider(name='Time', start=..., end=...)
sigma = pn.widgets.FloatSlider(name='sigma', start=..., end=...)
stream = RangeXY()

@pn.depends(time=time, x_range=stream.param.x_range, y_range=stream.param.y_range)
def callback(time, x_range, y_range):
    return hv.Image(ds.select(time=time, x=x_range, y=y_range))

smoothed_dmap = hv.DynamicMap(callback).apply(smooth_function, sigma=sigma)
```
Now we could build the pipeline or DAG by specifying an initial callback, which could use `kdims`, `streams` or `pn.depends`, and then chain simple functions, which could be driven dynamically by parameter values or widgets. This allowed for a more explicit specification of where the different dynamic inputs to our callbacks came from, and users no longer needed to know about `DynamicMap.kdims` and how they relate to a callback, about the `streams` argument and how the stream parameters map to callback kwargs, or about `Dynamic` and/or `Operation` and how to chain functions to generate a pipeline.
## dim transforms

Much more recently we introduced `dim` transforms. These express operations on the data and allow users to use NumPy/Pandas/XArray APIs without having to learn our APIs for performing reductions, selections etc. By also adding support for dynamic parameters, a user does not even have to write a function or operation anymore; they can just express their dynamic transformation using APIs they are already familiar with and end up with a dynamic pipeline or DAG. Let's take the example of computing the rolling mean and outliers on a `Curve`.
Previously you would have to write a full operation to handle this:
```python
import numpy as np
import pandas as pd
import param

# Imports roughly as in HoloViews itself; exact paths may vary by version
from holoviews import Scatter
from holoviews.core import Element, Operation
from holoviews.core.data import PandasInterface
from holoviews.core.util import pandas_version
from holoviews.operation.timeseries import RollingBase


class rolling_outlier_std(Operation, RollingBase):

    center = param.Boolean(default=True, doc="""
        Whether to set the x-coordinate at the center or right edge
        of the window.""")

    min_periods = param.Integer(default=None, allow_None=True, doc="""
        Minimum number of observations in window required to have a
        value (otherwise result is NaN).""")

    rolling_window = param.Integer(default=10, doc="""
        The window size over which to operate.""")

    sigma = param.Number(default=2.0, doc="""
        Minimum sigma before a value is considered an outlier.""")

    def _roll_kwargs(self):
        return {'window': self.p.rolling_window,
                'center': self.p.center,
                'min_periods': self.p.min_periods}

    def _process_layer(self, element, key=None):
        ys = element.dimension_values(1)
        # Calculate the variation in the distribution of the residual
        avg = pd.Series(ys).rolling(**self._roll_kwargs()).mean()
        residual = ys - avg
        std = pd.Series(residual).rolling(**self._roll_kwargs()).std()
        # Get indices of outliers
        with np.errstate(invalid='ignore'):
            outliers = (np.abs(residual) > std * self.p.sigma).values
        return element[outliers].clone(new_type=Scatter)

    def _process(self, element, key=None):
        return element.map(self._process_layer, Element)


class rolling(Operation, RollingBase):
    """
    Applies a function over a rolling window.
    """

    window_type = param.ObjectSelector(default=None, allow_None=True,
        objects=['boxcar', 'triang', 'blackman', 'hamming', 'bartlett',
                 'parzen', 'bohman', 'blackmanharris', 'nuttall',
                 'barthann', 'kaiser', 'gaussian', 'general_gaussian',
                 'slepian'], doc="The shape of the window to apply")

    function = param.Callable(default=np.mean, doc="""
        The function to apply over the rolling window.""")

    def _process_layer(self, element, key=None):
        xdim = element.kdims[0].name
        df = PandasInterface.as_dframe(element)
        df = df.set_index(xdim).rolling(win_type=self.p.window_type,
                                        **self._roll_kwargs())
        if self.p.window_type is None:
            kwargs = {'raw': True} if pandas_version >= '0.23.0' else {}
            rolled = df.apply(self.p.function, **kwargs)
        else:
            if self.p.function is np.mean:
                rolled = df.mean()
            elif self.p.function is np.sum:
                rolled = df.sum()
            else:
                raise ValueError("Rolling window function only supports "
                                 "mean and sum when custom window_type is supplied")
        return element.clone(rolled.reset_index())

    def _process(self, element, key=None):
        return element.map(self._process_layer, Element)
```
```python
window = pn.widgets.IntSlider(start=0, end=10)
sigma = pn.widgets.FloatSlider(start=0, end=3)

# Wrap the widgets in streams, renaming 'value' to the operation's parameter
window_stream = ParamStream(window, rename={'value': 'rolling_window'})
sigma_stream = ParamStream(sigma, rename={'value': 'sigma'})

curve = hv.Curve(some_timeseries)
rolled = rolling(curve, streams=[window_stream])
outliers = rolling_outlier_std(curve, streams=[window_stream, sigma_stream])

rolled * outliers
```
Now you can instead express this transformation entirely using `dim` transforms:
```python
window = pn.widgets.IntSlider(start=0, end=10)
sigma = pn.widgets.FloatSlider(start=0, end=3)

# Lazily expressed rolling statistics, driven directly by the widgets
avg = dim('y').pd.rolling(window=window).mean()
std = dim('y').pd.rolling(window=window).std()
outliers = np.abs(dim('y') - avg) > std * sigma

curve = hv.Curve(some_timeseries)
curve.apply.transform(y=avg) * curve.apply.select(outliers).apply(hv.Scatter)
```
Thanks to the ability to lazily express arbitrary operations and method calls, even a complex data transformation pipeline can be written in just a few lines of code.
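To make the laziness concrete (a small sketch, assuming only a toy `Curve`), a `dim` expression is just an object that records the computation; nothing is evaluated until it is applied to an element:

```python
import numpy as np
import holoviews as hv
from holoviews import dim

curve = hv.Curve(np.random.randn(100).cumsum())

# Building the expression performs no computation
norm = (dim('y') - dim('y').min()) / (dim('y').max() - dim('y').min())

# Evaluation only happens when the expression is applied to an element
values = norm.apply(curve)
```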
The `.dataset` and `.pipeline` attributes on HoloViews elements are simply ways of capturing the data transformation pipelines expressed by both the `DynamicMap` and the `dim` transform approach. Together they encapsulate the original data along with all the transformations required to generate the final plot.
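To illustrate (a sketch, assuming a HoloViews version where these attributes exist), re-running the captured pipeline on the original dataset reconstructs the element:

```python
import holoviews as hv

curve = hv.Curve([(0, 0), (1, 1), (2, 4)])
doubled = curve.transform(y=hv.dim('y') * 2)

doubled.dataset   # the original, untransformed data
doubled.pipeline  # the chain of steps that produced `doubled`

# Applying the captured pipeline to the dataset reproduces the element
reconstructed = doubled.pipeline(doubled.dataset)
```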
In this history of data pipelines in HoloViews we can observe several trends:

- we have moved away from custom classes (streams, operations) toward plain functions, and finally toward the APIs of the underlying data libraries themselves;
- dynamic parameters have gone from being indirectly mapped onto callback arguments (via `kdims` and `streams`) to being injected directly where they are used;
- each iteration has required users to learn fewer HoloViews-specific concepts.

At the same time, certain features turn out to be essential:

- a reactive model, so that outputs update whenever a dynamic input changes;
- the ability to chain transforms into a pipeline or DAG;
- memoization, so that intermediate results are not needlessly recomputed.

Only the `DynamicMap` has all three of these properties; the other two approaches to express a pipeline are therefore eventually converted to this final approach.
Panel also has a reactive model, i.e. one where a function depends on a number of parameters and emits a new object whenever one of these parameters changes; indeed, a `param.depends`-annotated function can be evaluated by both HoloViews and Panel.
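For example (a minimal sketch; `load_image` is a hypothetical function), the same annotated function can back both a Panel layout and a `DynamicMap`:

```python
import panel as pn
import holoviews as hv

time = pn.widgets.IntSlider(name='time', start=0, end=10)

@pn.depends(time=time)
def view(time):
    return hv.Image(load_image(time))  # load_image is hypothetical

pn.Column(time, view)  # Panel re-renders the output when the slider moves
hv.DynamicMap(view)    # HoloViews evaluates the same function dynamically
```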
Panel, however, is missing the other two crucial features: functions cannot easily be chained, and they have no memoization, so even if you could chain them it would be tremendously inefficient.
Should we allow chaining `param.depends` functions like this (repeating the earlier HoloViews-based example)?
```python
time = pn.widgets.DateSlider(name='Time', start=..., end=...)
sigma = pn.widgets.FloatSlider(name='sigma', start=..., end=...)
stream = RangeXY()

@pn.depends(time=time, x_range=stream.param.x_range, y_range=stream.param.y_range)
def callback(time, x_range, y_range):
    return hv.Image(ds.select(time=time, x=x_range, y=y_range))

@pn.depends(callback, sigma)
def smooth(img, sigma):
    return smooth_function(img, sigma=sigma)
```
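To make such chaining efficient, each stage would also need memoization, so that e.g. dragging the `sigma` slider does not re-run `callback`. A minimal sketch of the idea (hypothetical, not an existing Panel API): cache each function's result keyed on its current input values:

```python
def memoize(func):
    """Hypothetical helper: only re-evaluate when the inputs change."""
    cache = {}
    def wrapper(*args):
        if args not in cache:  # assumes hashable inputs
            cache[args] = func(*args)
        return cache[args]
    return wrapper
```

With per-stage caches like this, re-evaluating the downstream `smooth` would reuse the cached image as long as `time`, `x_range` and `y_range` are unchanged.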
## interactive?

Recently I have been playing around with a new API, which we have dubbed `interactive`, which operates directly on some underlying dataset, e.g. a pandas `DataFrame` or an xarray `Dataset`:
```python
time = pn.widgets.IntSlider(start=0, end=20)

xr_ds.interactive.isel(time=time).hvplot()
```
This goes even further toward addressing the three main goals we have been optimizing for as the HoloViews APIs evolved: 1. no custom classes, 2. dynamic parameters can be injected directly, and 3. no new APIs; users get to use the exact APIs they are already familiar with from their work with the underlying data library, be it pandas, dask or xarray.
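For example (a sketch, assuming `xr_ds` is an xarray `Dataset` with `time` and `lat` coordinates), a whole chain of ordinary xarray method calls can be driven by widgets:

```python
import panel as pn

time = pn.widgets.IntSlider(start=0, end=20)
q = pn.widgets.FloatSlider(start=0, end=1, step=0.1)

# Plain xarray method chaining, with widgets substituted for static values
xr_ds.interactive.isel(time=time).quantile(q, dim='lat').hvplot()
```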
However, in implementing it I came across issues and questions, the main one being: do I implement it on top of the existing APIs in HoloViews or simply borrow ideas?
Do I use a `DynamicMap`? Then I always have to wrap the object in a HoloViews data structure, which may not be appropriate. Do I build it solely on `dim` transforms and the equivalent of `.dataset` and `.pipeline`? Then I need to reimplement the intermediate nodes of the pipeline to be able to cache/memoize the intermediate computations.
- Is there a generalized way to express these kinds of DAGs or pipelines without HoloViews-specific details?
- Do I rewrite `interactive` by heavily borrowing ideas from HoloViews?
- Do we rewrite parts of HoloViews' internals based on a more generic DAG/pipeline system?
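To make the first question concrete, here is a minimal sketch (purely illustrative, not an existing API) of what a generic, HoloViews-agnostic pipeline node with chaining and memoization might look like:

```python
class Node:
    """Illustrative sketch of a generic pipeline node: applies `func` to
    the output of an upstream node and memoizes on its dynamic inputs."""

    def __init__(self, func, source=None, **params):
        self.func = func      # the transform this node applies
        self.source = source  # upstream node, or None for the root
        self.params = params  # dynamic inputs, e.g. Panel widgets
        self._key = None      # input values at the last evaluation
        self._value = None    # memoized output

    def _inputs(self):
        # Current values of this node's dynamic inputs (assumes a .value
        # attribute, as on Panel widgets) plus those of everything upstream
        own = tuple((k, p.value) for k, p in sorted(self.params.items()))
        return own + (self.source._inputs() if self.source else ())

    def evaluate(self):
        key = self._inputs()
        if key != self._key:  # recompute only if some input changed
            args = (self.source.evaluate(),) if self.source else ()
            kwargs = {k: p.value for k, p in self.params.items()}
            self._value = self.func(*args, **kwargs)
            self._key = key
        return self._value

    def apply(self, func, **params):
        # Chaining builds the DAG: each call returns a downstream node
        return Node(func, source=self, **params)
```

A pipeline would then read something like `Node(load_frame, time=time).apply(smooth_function, sigma=sigma)` (with `load_frame` hypothetical), and each `evaluate()` call would reuse every intermediate result whose inputs have not changed.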