# Lessons learned - historic webhook processing
This document lists problems faced and solutions found - a.k.a. lessons learned - while processing historic webhook data from HubSpot.
All work developed in this POC is on branch `feature/historic-webhook-processing-377`.
The HubSpot workflows used to generate the data for it are:
* https://app.hubspot.com/workflows/355484/platform/flow/119669123/edit → contacts
* https://app.hubspot.com/workflows/355484/platform/flow/119011671/edit → deals
## The challenge
Parquets generated from webhook payloads have a complex schema structure.
This is the schema we get (represented in json form):
```json=
{
"properties": {
"<property_name_1>": {
"versions": [
{
"value": (string),
"timestamp": (long)
}
]
},
"<property_name_2>": {
"versions": [
{
"value": (string),
"timestamp": (long)
}
]
},
...,
"<property_name_1280>": {
"versions": [
{
"value": (string),
"timestamp": (long)
}
]
}
}
}
```
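This is the schema we want to generate: one row per property version, with the columns `object_id`, `property_name`, `property_value` and `timestamp`.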

That is, we need to explode the **versions** array to get its **value** and **timestamp** keys. Easy, right?
Should be. The problem is that this array is deeply nested within each one of the 1000+ property name keys, and we also need to have these property names in our result.
Welcome to the jungle. I'll be your guide.
(when we make this a documentary film, please start playing [this song](https://www.youtube.com/watch?v=o1tj2zJ2Wvg&t=5s) in this part)
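To make the target shape concrete, here is a minimal plain-Python sketch (no Spark involved) that flattens a single payload in the nested schema above into one row per property version. The function name `flatten_versions` and the sample payload are made up for illustration, and reading the object id from `properties.hs_object_id.value` is an assumption borrowed from the Spark code in section 2.

```python
def flatten_versions(payload):
    """Turn one nested payload into flat rows, one per property version."""
    properties = payload["properties"]
    # Assumption: the object id lives in properties.hs_object_id.value.
    object_id = properties["hs_object_id"]["value"]
    rows = []
    for property_name, content in properties.items():
        # Each property carries its full history in the "versions" array.
        for version in content.get("versions", []):
            rows.append({
                "object_id": object_id,
                "property_name": property_name,
                "property_value": version["value"],
                "timestamp": version["timestamp"],
            })
    return rows


# Hypothetical payload with two properties; all values are invented.
sample_payload = {
    "properties": {
        "hs_object_id": {
            "value": "101",
            "versions": [{"value": "101", "timestamp": 1600000000000}],
        },
        "email": {
            "versions": [
                {"value": "new@example.com", "timestamp": 1600000200000},
                {"value": "old@example.com", "timestamp": 1600000100000},
            ]
        },
    }
}

for row in flatten_versions(sample_payload):
    print(row)
```

Doing this for one payload is trivial; the rest of this document is about doing the same thing in Spark for 1000+ properties and hundreds of thousands of objects.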
## The problems (and, hopefully, solutions)
### 1. Writing properties sequentially takes too long
#### Situation
This first approach of processing consists of:
1. Read a dataframe with all contacts from rawmones
2. Extract a list of property names from this dataframe
3. For each property name, query the dataframe, exploding the `properties.<property_name>.versions` array, which produces a dataframe in the desired format, for one property at a time
4. Write this resulting dataframe to that property's partition in refined skynyrd
5. Repeat until all properties on the list are written
#### Problem
This would be a good solution if we didn't have 1000+ properties and hundreds of thousands of objects to go through. The thing is, writing one property at a time takes a long time and doesn't make good use of EMR's memory, which stays idle for most of the writing.
#### Solution found
Let's extract properties sequentially, but write them in parallel!
### 2. Writing properties in parallel causes OutOfMemory error
#### Situation
This second approach is a direct solution to the first problem found. Here's what we do:
1. Read a dataframe with all contacts from rawmones
2. Extract a list of property names from this dataframe
3. For each property name, query the dataframe, exploding the `properties.<property_name>.versions` array, which produces a dataframe in the desired format, for one property at a time
4. **UnionAll resulting dataframes, outputting a single dataframe with all properties in the desired format**
5. Write this resulting dataframe in refined skynyrd
#### Problem
This causes an OutOfMemory error.
What is "this", exactly? In this operation, we build a list of dataframes, one for each extracted property, and then union all these dataframes into one final object to be written to refined-skynyrd.
But Spark evaluates queries lazily, meaning they don't really run until their results are actually needed. If we have 1280 properties, we end up with 1280 derived dataframes, each one a query over the original dataframe, and all of them run at the same time, on the write operation.
The result? Out of memory error.
#### Solution
Two solutions:
1. Make better use of the memory available to Spark.
* We can achieve this by letting EMR maximize the resources allocated to Spark. References:
* https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-performance.html
* https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#emr-spark-maximizeresourceallocation
* This can be done by adding the following to EMR's software settings
```json
{
"classification":"spark",
"properties":{
"maximizeResourceAllocation":"true"
}
}
```
2. Improve how we query the df for each property.
* This was achieved by running a more efficient query to extract all properties, instead of one query per property. This way we don't end up with an out of memory error when writing the result of many queries (nor take a long time to extract and write one property at a time).
* The solution was to combine a lateral view with unpivoting the Spark dataframe. References:
* https://stackoverflow.com/questions/36876959/sparksql-can-i-explode-two-different-variables-in-the-same-query
* https://sparkbyexamples.com/pyspark/pyspark-pivot-and-unpivot-dataframe/
* We had to replace this code:
```python=
from functools import reduce

from pyspark.sql.functions import col, explode, from_unixtime, lit, to_timestamp


def explode_properties(properties, df):
    """
    Extracts each of the properties from all objects in dataframe.
    Args:
        properties: name of properties to be extracted
        df: dataframe with the raw webhook objects
    """
    # `logger` is assumed to be configured elsewhere in the module.
    logger.warning("Extracting properties from parquet.")
    # One exploded dataframe per property; nothing runs until the final write.
    exploded_properties_dfs_list = [explode_property(property_name, df) for property_name in properties]
    return reduce(lambda final_df, single_df: final_df.union(single_df), exploded_properties_dfs_list)


def explode_property(property_name, df):
    # explode() yields one row per entry of the versions array, in a column named "col".
    return df.select(col("properties.hs_object_id.value").alias("object_id"),
                     lit(property_name).alias("property_name"),
                     explode(f"properties.{property_name}.versions")) \
        .select("object_id", "property_name", col("col.value").alias("property_value"), col("col.timestamp"),
                to_timestamp(from_unixtime(col("col.timestamp") / 1000)).alias("datetime"))
```
with this code:
```python=
from pyspark.sql.functions import expr


def explode_properties(properties):
    """
    Extracts each of the properties from all objects in dataframe.
    Args:
        properties: name of properties to be extracted
    """
    explode_query = mount_explode_query(properties)
    # `sc` is assumed to be the active SparkSession, with `rawmones` registered as a view.
    exploded_rawmones = sc.sql(explode_query)
    # `repartition` is a project helper defined elsewhere.
    exploded_rawmones = repartition(exploded_rawmones)
    unpivot_df = unpivot_property_columns(exploded_rawmones, properties)
    return unpivot_df


def unpivot_property_columns(exploded_rawmones, properties):
    # stack(n, ...) turns each (name, value, timestamp) triple into its own row.
    unpivot_list = [f"'{property_name}', {property_name}, {property_name}_timestamp" for property_name in properties]
    unpivot_expr = f"stack({len(properties)}, {', '.join(unpivot_list)}) as (property_name, property_value, timestamp)"
    unpivot_df = exploded_rawmones.select("object_id", expr(unpivot_expr)) \
        .where("property_value is not null")
    return unpivot_df


def mount_explode_query(properties):
    # One lateral view per property, so a single query explodes every versions array.
    # Note: chained lateral views emit one row per combination of exploded versions.
    lateral_views = [
        f"lateral view explode(properties.{property_name}.versions) exploded_{property_name} as {property_name}" for
        property_name in properties]
    select_statements = [
        f"{property_name}.value as {property_name}, {property_name}.timestamp as {property_name}_timestamp" for
        property_name in properties]
    complete_query = f"select properties.hs_object_id.value as object_id, {','.join(select_statements)} from rawmones {' '.join(lateral_views)}"
    return complete_query
```
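To see what the query builder produces, the sketch below rebuilds the same two strings for a pair of hypothetical property names (`email` and `lifecyclestage`) and prints them, without needing a Spark session. The helper names `build_explode_query` and `build_stack_expr` are illustrative stand-ins for the functions above, not part of the branch.

```python
def build_explode_query(properties, table="rawmones"):
    """Build one SQL query with a lateral view per property."""
    lateral_views = " ".join(
        f"lateral view explode(properties.{name}.versions) exploded_{name} as {name}"
        for name in properties
    )
    select_columns = ", ".join(
        f"{name}.value as {name}, {name}.timestamp as {name}_timestamp"
        for name in properties
    )
    return (
        f"select properties.hs_object_id.value as object_id, {select_columns} "
        f"from {table} {lateral_views}"
    )


def build_stack_expr(properties):
    """Build the stack() expression that unpivots the wide columns into rows."""
    triples = ", ".join(f"'{name}', {name}, {name}_timestamp" for name in properties)
    return f"stack({len(properties)}, {triples}) as (property_name, property_value, timestamp)"


# Hypothetical property names, used only to preview the generated strings.
example_properties = ["email", "lifecyclestage"]
print(build_explode_query(example_properties))
print(build_stack_expr(example_properties))
```

With 1000+ properties the first string grows to thousands of clauses, which is what leads to the query plan issue described in section 4.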
### 3. Too many partitions are difficult to write
#### Situation
Ok, so we managed to query the data we need in an efficient manner. Now we need to write it to S3. We want the resulting S3 bucket to be partitioned by property name and by object id. This way, in the future, we can update only the objects that changed, since each object_id holds its own entire history.
#### Problem
We end up with a couple million partitions here. Spark is known to struggle with both too few and too many partitions.
#### Solution
It's time to find a workaround. Or, in this case, workarounds!
1. Repartition data before sensitive operations
This is mostly important to do before we start working with the dataframe. Reference: https://luminousmen.com/post/spark-tips-partition-tuning
2. Increase spark maxResultSize
This is counterintuitive, but executing a SQL statement over a large number of partitions requires a lot of driver memory, even if nothing is explicitly collected back to the driver.
Reference: https://issues.apache.org/jira/browse/SPARK-12837
This can be achieved by setting:
```json
{"classification":"spark-defaults","properties":{"spark.driver.maxResultSize":"0"}}
```
The '0' here means unlimited result size.
### 4. Complex query
#### Situation
The query to extract all properties at once is a complex one, with thousands of statements.
#### Problem
Naturally, this query generates a complex query plan. Spark reports that `spark.sql.optimizer.maxIterations` was exceeded.
#### Solutions
1. Increase spark.sql.optimizer.maxIterations
```json
{"classification":"spark-defaults","properties":{"spark.sql.optimizer.maxIterations":"400"}}
```
2. Divide the work in batches of properties
We can divide processing into batches of ~100 properties in order to limit the complexity of each query.
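
A minimal sketch of the batching idea, in plain Python. The `batch_properties` helper and the generated property names are hypothetical; only the batch size of 100 comes from the text above.

```python
def batch_properties(properties, batch_size=100):
    """Yield consecutive slices of the property list, each with at most batch_size names."""
    for start in range(0, len(properties), batch_size):
        yield properties[start:start + batch_size]


# Hypothetical property names standing in for the real list.
all_properties = [f"property_{i}" for i in range(1, 1281)]
batches = list(batch_properties(all_properties))
print(len(batches), len(batches[0]), len(batches[-1]))  # prints: 13 100 80
```

Each batch could then be passed to `explode_properties` and written on its own, so that no single query contains more than about 100 lateral views.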
### 5. Overwriting everything
#### Situation
When writing data to refined-skynyrd, we only want to overwrite data for the object_ids we are writing at the moment, not everything that is under the contacts/ (or deals/) folder.
#### Problem
By default, on the overwrite mode, spark overwrites everything.
#### Solution
Set partition overwrite mode to dynamic, a setting available since Spark 2.3.
```json
{"classification":"spark-defaults","properties":{"spark.sql.sources.partitionOverwriteMode":"dynamic"}}
```
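
As a rough sketch of how this setting combines with the write itself, the function below sets the mode on the session and then writes in overwrite mode with the two partition columns. The function name `write_changed_partitions` and its parameters are hypothetical; `spark` is assumed to be a SparkSession and `df` a dataframe that already has `property_name` and `object_id` columns.

```python
def write_changed_partitions(spark, df, output_path):
    """Overwrite only the partitions that have rows in df, leaving the others untouched."""
    # Same setting as the spark-defaults JSON above, applied at session level instead.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    (
        df.write
        .mode("overwrite")
        .partitionBy("property_name", "object_id")
        .parquet(output_path)
    )
```

With the default static mode, the same call would first clear everything under `output_path`; in dynamic mode only the `property_name=.../object_id=...` folders present in `df` are replaced.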
## Ideas
Right now I think the biggest issue is that we're trying to write too many partitions, so avoiding partitioning by object_id could improve things.
Here's what I think needs to change in the flow for that to work. Wherever I write "contacts" can be any other webhook entity name.
1. Change the historic webhook workflows to send historic data to an alternative api gateway endpoint (e.g. /historic_contacts, instead of /contacts)
2. Add this endpoint as valid in the coordinator lambda
3. This will make the metadata lambda write data in dynamo with a different entity type.
4. The transient_to_raw transformer can be the same - just run it with "historic_contacts" as parameter instead of "contacts", so it reads the correct data from dynamo and stores it as parquet in a separate folder from the incremental flow
5. Run the historic raw_to_refined transformer. It'll write data partitioning only by property_name on refined-skynyrd, on folder historic_webhooks.
6. Run the historic refined_to_the_cure transformer. It should read all historic data from refined and finally partition it by property_name and object_id (in that order) in non-historic folder at the_cure. This is the part that makes both flows converge, so it's super important.