# Data pipelines

Author: Philipp Rudiger

### What is a data pipeline?

Effectively it is a DAG which starts with some dataset and applies various transforms to the data. In the case of HoloViews, each node in the DAG may also have semantic meaning, e.g. an element type, or add additional metadata, e.g. labels or options. Each transform can have parameters, which can either be static or be driven by some dynamic value.

### What implementations of data pipelines exist in HoloViews?

#### `DynamicMap`

Initially the `DynamicMap` was designed as a simple mechanism to take some inputs, in the form of key dimensions, pass them to a callback and return some HoloViews object:

```python
def callback(time):
    return hv.Image(ds.select(time=time))

hv.DynamicMap(callback, kdims=['time']).redim.range(time=(0, 10))
```

However, key dimensions were limited: they could only be driven by sliders and dropdowns. Instead we wanted to drive them by arbitrary parameters or even inputs from other plots, so we designed `Streams`:

```python
def callback(time, x_range, y_range):
    return hv.Image(ds.select(time=time, x=x_range, y=y_range))

hv.DynamicMap(callback, kdims=['time'], streams=[RangeXY()]).redim.range(time=(0, 10))
```

This meant that a `DynamicMap` could now be driven by almost anything, but it also complicated things because users now had to understand both `kdims` and `streams` and how they map to the callback arguments.

Next I introduced the `Dynamic` utility, which would take some operation and apply it to the output of another `DynamicMap`; this way you could actually build a pipeline or a DAG. Streams could then be used to inject dynamic parameter values into operations that were part of this DAG, building a dynamic pipeline.

```python
def callback(time, x_range, y_range):
    return hv.Image(ds.select(time=time, x=x_range, y=y_range))

dmap = hv.DynamicMap(callback, kdims=['time'], streams=[RangeXY()]).redim.range(time=(0, 10))

# Stream.define returns a new Stream class, which is then instantiated
Kernel = hv.streams.Stream.define('Kernel', sigma=0.1)
smoothed_dmap = smooth_operation(dmap, streams=[Kernel()])
```

We could now apply operations on a `DynamicMap`, and the concept of a pipeline or DAG first became a reality in HoloViews. However, `DynamicMap` + `Streams` + `Operations` were all pretty obscure concepts to a newcomer, so we set about making this easier: instead of requiring streams you could inject parameters, instead of `Operations` you could use simple functions, and instead of using `Dynamic` you could use `.apply`.

```python
time = pn.widgets.DateSlider(name='Time', start=..., end=...)
sigma = pn.widgets.FloatSlider(name='sigma', start=..., end=...)
stream = RangeXY()

@pn.depends(time=time, x_range=stream.param.x_range, y_range=stream.param.y_range)
def callback(time, x_range, y_range):
    return hv.Image(ds.select(time=time, x=x_range, y=y_range))

smoothed_dmap = hv.DynamicMap(callback).apply(smooth_function, sigma=sigma)
```

Now we could build the pipeline or DAG by specifying an initial callback, which could use `kdims`, `streams` or `pn.depends`, and then chain simple functions, which could be driven dynamically by parameter values or widgets. This allowed for more explicit specification of where the different dynamic inputs to our callbacks came from, and users no longer needed to know about `DynamicMap.kdims` and how they relate to a callback, about the `streams` argument and how the stream parameters map to callback kwargs, or about `Dynamic` and/or `Operation` and how to chain functions to generate a pipeline.
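Since `.apply` returns another `DynamicMap`, these stages can themselves be chained to grow the DAG one node at a time. A minimal sketch of that idea, reusing `callback`, `smooth_function` and `sigma` from above (`contour_function` and the `levels` widget are hypothetical names, not an existing API):

```python
# Hypothetical widget driving a further pipeline stage
levels = pn.widgets.IntSlider(name='levels', start=1, end=10)

# Each .apply call adds a node to the DAG; thanks to DynamicMap memoization,
# a stage only re-evaluates when one of its own dynamic inputs changes.
smoothed = hv.DynamicMap(callback).apply(smooth_function, sigma=sigma)
contoured = smoothed.apply(contour_function, levels=levels)
```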
### `dim` transform

Much more recently we introduced `dim` transforms. These express operations on the data and allow users to use NumPy/Pandas/XArray APIs without having to learn our APIs for performing reductions, selections etc. By also adding support for dynamic parameters, a user no longer even has to write a function or operation; they can just express their dynamic transformation using APIs they are familiar with and end up with a dynamic pipeline or DAG.

Let's take the example of computing the rolling mean and outliers on a Curve. Previously you would have to write a full operation to handle this:

```python
class rolling_outlier_std(Operation, RollingBase):

    center = param.Boolean(default=True, doc="""
        Whether to set the x-coordinate at the center or right edge
        of the window.""")

    min_periods = param.Integer(default=None, allow_None=True, doc="""
        Minimum number of observations in window required to have a
        value (otherwise result is NaN).""")

    rolling_window = param.Integer(default=10, doc="""
        The window size over which to operate.""")

    sigma = param.Number(default=2.0, doc="""
        Minimum sigma before a value is considered an outlier.""")

    def _roll_kwargs(self):
        return {'window': self.p.rolling_window,
                'center': self.p.center,
                'min_periods': self.p.min_periods}

    def _process_layer(self, element, key=None):
        ys = element.dimension_values(1)

        # Calculate the variation in the distribution of the residual
        avg = pd.Series(ys).rolling(**self._roll_kwargs()).mean()
        residual = ys - avg
        std = pd.Series(residual).rolling(**self._roll_kwargs()).std()

        # Get indices of outliers
        with np.errstate(invalid='ignore'):
            outliers = (np.abs(residual) > std * self.p.sigma).values
        return element[outliers].clone(new_type=Scatter)

    def _process(self, element, key=None):
        return element.map(self._process_layer, Element)


class rolling(Operation, RollingBase):
    """
    Applies a function over a rolling window.
    """

    window_type = param.ObjectSelector(default=None, allow_None=True, objects=[
        'boxcar', 'triang', 'blackman', 'hamming', 'bartlett', 'parzen',
        'bohman', 'blackmanharris', 'nuttall', 'barthann', 'kaiser',
        'gaussian', 'general_gaussian', 'slepian'],
        doc="The shape of the window to apply")

    function = param.Callable(default=np.mean, doc="""
        The function to apply over the rolling window.""")

    def _process_layer(self, element, key=None):
        xdim = element.kdims[0].name
        df = PandasInterface.as_dframe(element)
        df = df.set_index(xdim).rolling(win_type=self.p.window_type,
                                        **self._roll_kwargs())
        if self.p.window_type is None:
            kwargs = {'raw': True} if pandas_version >= '0.23.0' else {}
            rolled = df.apply(self.p.function, **kwargs)
        else:
            if self.p.function is np.mean:
                rolled = df.mean()
            elif self.p.function is np.sum:
                rolled = df.sum()
            else:
                raise ValueError("Rolling window function only supports "
                                 "mean and sum when custom window_type is supplied")
        return element.clone(rolled.reset_index())

    def _process(self, element, key=None):
        return element.map(self._process_layer, Element)


window = pn.widgets.IntSlider(start=0, end=10)
sigma = pn.widgets.FloatSlider(start=0, end=3)

window_stream = ParamStream(window, rename={'value': 'rolling_window'})
sigma_stream = ParamStream(sigma, rename={'value': 'sigma'})

curve = hv.Curve(some_timeseries)
rolled = rolling(curve, streams=[window_stream])
outliers = rolling_outlier_std(curve, streams=[window_stream, sigma_stream])
rolled * outliers
```

While now you can express this transformation entirely using `dim` transforms:

```python
window = pn.widgets.IntSlider(start=0, end=10)
sigma = pn.widgets.FloatSlider(start=0, end=3)

avg = dim('y').pd.rolling(window=window).mean()
std = dim('y').pd.rolling(window=window).std()
outliers = (np.abs(dim('y') - avg) > std * sigma)

curve = hv.Curve(some_timeseries)
curve.apply.transform(y=avg) * curve.apply.select(outliers).apply(hv.Scatter)
```

Thanks to the ability to lazily express arbitrary operations and method calls, even a complex data transformation pipeline can be written in just a few lines of code.

### .dataset and .pipeline

The `.dataset` and `.pipeline` attributes on HoloViews elements are simply ways of capturing the data transformation pipelines expressed by both the `DynamicMap` and the `dim` transform approach. Together they encapsulate the original data along with all the transformations required to generate the final plot.

### Summary

In this history of data pipelines in HoloViews we can observe several trends:

* Avoid custom classes a user has to write as much as possible
* Simplify the way a user provides dynamic parameters
* Bring the data transformation closer to the data and avoid new/custom APIs, i.e. stick with what the user knows

At the same time certain features have turned out to be essential:

* Need to store the initial data
* Need a way to represent all the transformations that make up the DAG
* Need to cache/memoize intermediate steps to avoid having to reevaluate the entire pipeline every time anything changes

Only the `DynamicMap` has all three of these properties; the other two approaches to expressing a pipeline are therefore eventually converted to this final approach.

## What approaches exist in Panel?

Panel also has a reactive model, i.e. one where a function depends on a number of parameters and emits a new object whenever one of these parameters changes; indeed, a `param.depends` annotated function can be evaluated by both HoloViews and Panel.
Panel, however, is missing two crucial features: functions cannot easily be chained, and they do not have memoization, so even if you could chain them it would be tremendously inefficient. Should we allow chaining `param.depends` functions like this (repeating the earlier HoloViews based example)?

```python
time = pn.widgets.DateSlider(name='Time', start=..., end=...)
sigma = pn.widgets.FloatSlider(name='sigma', start=..., end=...)
stream = RangeXY()

@pn.depends(time=time, x_range=stream.param.x_range, y_range=stream.param.y_range)
def callback(time, x_range, y_range):
    return hv.Image(ds.select(time=time, x=x_range, y=y_range))

@pn.depends(callback, sigma)
def smooth(img, sigma):
    return smooth_function(img, sigma=sigma)
```

## What about `interactive`?

Recently I have been playing around with a new API, which we have dubbed `interactive`, that operates directly on some underlying dataset, e.g. a pandas `DataFrame` or an xarray `Dataset`.

```python
time = pn.widgets.IntSlider(start=0, end=20)

xr_ds.interactive.isel(time=time).hvplot()
```

This goes even further in addressing the three main goals we have been optimizing for as the HoloViews APIs evolved:

1. no custom classes,
2. dynamic parameters can be injected directly, and
3. no new APIs: users get to use the exact APIs they are already familiar with from their work with the underlying data library, be it pandas, dask or xarray.

However, in implementing it I came across issues and questions, the main one being: "Do I implement it on existing APIs in HoloViews or simply borrow ideas?" Do I use a `DynamicMap`? Then I always have to wrap the object in a HoloViews datastructure, which may not be appropriate. Do I build it solely on `dim` transforms and the equivalent of `.dataset` and `.pipeline`? Then I need to reimplement the intermediate nodes of the pipeline to be able to cache/memoize the intermediate computations.

## Going forward

Is there a generalized way to express these kinds of DAGs or pipelines without HoloViews specific details? Do I rewrite `interactive` by heavily borrowing ideas from HoloViews? Do we rewrite parts of HoloViews internals based on a more generic DAG/pipeline system?
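As one way to frame that last question, here is a minimal sketch of what a generic, library-agnostic pipeline node might look like. It is a thought experiment built around the three requirements from the summary (store the initial data, represent the DAG, memoize intermediate steps), not an existing API; all names are hypothetical:

```python
class Node:
    """A hypothetical, library-agnostic pipeline node: it stores the
    transform that produced it and memoizes its computed value."""

    def __init__(self, func, inputs=(), params=None):
        self.func = func             # the transform this node applies
        self.inputs = inputs         # upstream Node objects
        self.params = params or {}   # static or widget-driven parameters
        self._cache = None           # (key, value) of the last evaluation

    def _current_params(self):
        # Resolve widget-like parameters to their current values
        return {k: getattr(v, 'value', v) for k, v in self.params.items()}

    def evaluate(self):
        params = self._current_params()
        args = tuple(node.evaluate() for node in self.inputs)
        # Keying on object identity avoids comparing large datasets;
        # upstream nodes return their cached object when unchanged
        key = (tuple(id(a) for a in args), tuple(sorted(params.items())))
        if self._cache is None or self._cache[0] != key:
            # Only recompute when an input object or a parameter changed
            self._cache = (key, self.func(*args, **params))
        return self._cache[1]

    def apply(self, func, **params):
        # Chaining grows the DAG without evaluating anything yet
        return Node(func, inputs=(self,), params=params)
```

In this framing, a root node wrapping the raw dataset plays the role of `.dataset`, the chain of `Node` objects plays the role of `.pipeline`, and the per-node cache supplies the memoization that both chained `param.depends` functions and a pure `dim`-transform implementation currently lack.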