# Homework 2-2 Particle Simulation - OpenMPI
[Website](https://hackmd.io/@gyao/hw2-2/edit)
> Zixi Cai: Designed and implemented processor workload partitioning and the *in-processor* blocking algorithm. Debugged and fine-tuned message passing to achieve optimal performance.
>
> Gang Yao: Designed and implemented the naive binning solution. Debugged message passing synchronization and benchmarked code performance.
>
::: success
:tada: **15.601** seconds for **1000000** particles (1000 steps with 68 KNL Processors)
:::
## Naive Solution
### Simple Binning with the Number of Processors
This solution is a parallel improvement over the original $O(N^2)$ serial solution. For a given number of processors, we split the simulation space into `num_processor` equal narrow bins, then put every particle into its corresponding bin according to its y coordinate. Each processor (identified by an integer `rank`) is assigned one bin to work on. During each iteration of the simulation, every bin, i.e. every process, must be notified of any particles in the neighboring bins that are within interaction range of the current bin's boundary. Here, we use an *even-odd* blocking *send & receive*: first, the even bins send their particles sitting near the boundary to their neighboring bins, while all the odd bins call `MPI_Recv`; then the odd bins send and the even bins receive. Both steps are blocking, so no process proceeds without receiving its neighbors' particles.
We then loop over all the particles in the current bin and call `apply_force()` against all the other particles in the current bin and in the received buffer. Finally, at the end of the step, we call `MPI_Allgather()` and collect the new particle states on process rank 0.
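A minimal sketch of this even-odd exchange, assuming a derived MPI datatype `PARTICLE` for `particle_t` (from the assignment's common header); the buffer names are illustrative, and `MPI_Probe`/`MPI_Get_count` is just one way to size the receive buffer:
```cpp=
#include <mpi.h>
#include <vector>

// Even-odd blocking exchange of boundary particles with the two neighboring
// bins (rank - 1 and rank + 1). In a 1D decomposition the neighbors of an even
// rank are always odd, so the send/receive orders pair up and cannot deadlock.
void exchange_boundary(int rank, int num_procs, MPI_Datatype PARTICLE,
                       const std::vector<particle_t>& ghost_send,
                       std::vector<particle_t>& ghost_recv) {
    ghost_recv.clear();
    for (int neighbor : {rank - 1, rank + 1}) {
        if (neighbor < 0 || neighbor >= num_procs) continue;
        std::vector<particle_t> buf;
        MPI_Status status;
        int n;
        if (rank % 2 == 0) {
            // Even bins send first, then receive.
            MPI_Send(ghost_send.data(), (int)ghost_send.size(), PARTICLE,
                     neighbor, 0, MPI_COMM_WORLD);
            MPI_Probe(neighbor, 0, MPI_COMM_WORLD, &status);
            MPI_Get_count(&status, PARTICLE, &n);
            buf.resize(n);
            MPI_Recv(buf.data(), n, PARTICLE, neighbor, 0,
                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        } else {
            // Odd bins receive first, then send.
            MPI_Probe(neighbor, 0, MPI_COMM_WORLD, &status);
            MPI_Get_count(&status, PARTICLE, &n);
            buf.resize(n);
            MPI_Recv(buf.data(), n, PARTICLE, neighbor, 0,
                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            MPI_Send(ghost_send.data(), (int)ghost_send.size(), PARTICLE,
                     neighbor, 0, MPI_COMM_WORLD);
        }
        ghost_recv.insert(ghost_recv.end(), buf.begin(), buf.end());
    }
}
```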

### Performance Against Naive Serial Solution

| Number of Particles | 100 | 1000 | 10000 | 100000 |
| -------- | -------- | -------- | -------- | -------- |
| Serial Simulation Time (s) | 0.0163891 | 0.937768 | 188.65 | did not finish |
| OpenMPI Simulation Time (s) | 9.81e-06 | 0.0263929 | 0.47382 | 27.0479 |
The parallel MPI $O(n^2)$ solution is roughly a constant factor faster than the naive serial solution.
## Optimized Solution
### Design - Processor Level Partition Strategy
Suppose we have $N$ processors. If we partition the space into $N$ equal stripes, the total communication distance is $2(N-1)$; but if we partition the space into a $\sqrt{N}\times\sqrt{N}$ grid of squares, the total communication distance is $4(\sqrt{N}-1)$. So partitioning into square grids reduces communication time, especially when $N$ is large. However, the square root of the given number of processors, $\sqrt{N}$, is not always an integer, so we implement a trade-off that utilizes as many processors as possible while keeping the grid as square as possible.
Our goal is to have `cols` and `rows` as close as possible to reduce communication, and `cols * rows` as close to `num_procs` as possible so that we do not waste too many processors. In most cases there are trade-offs between the two, and it is tricky to select values that maximize performance. Our strategy is as follows: we impose the constraints `rows >= 1/2 * cols`, so that `rows` and `cols` do not drift too far apart, and `cols * rows <= num_procs`, so that the grid fits in the given number of processors. We then maximize `cols * rows` under these constraints to minimize the number of wasted processors, as sketched below.
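A minimal sketch of this selection (the helper name `choose_grid` is ours; we additionally assume `cols >= rows` here so that the search cannot degenerate back to stripes):
```cpp=
#include <utility>

// Pick (rows, cols) maximizing rows * cols subject to
// rows >= 1/2 * cols and rows * cols <= num_procs.
std::pair<int, int> choose_grid(int num_procs) {
    int best_rows = 1, best_cols = 1;
    for (int rows = 1; rows * rows <= num_procs; ++rows) {
        int cols = num_procs / rows;           // largest cols with rows * cols <= num_procs
        if (cols > 2 * rows) cols = 2 * rows;  // enforce rows >= 1/2 * cols
        if (rows * cols > best_rows * best_cols) {
            best_rows = rows;
            best_cols = cols;
        }
    }
    return {best_rows, best_cols};  // ranks beyond rows * cols stay idle
}
```
For example, with `num_procs = 68` this sketch picks a 6 × 11 grid, so 66 of the 68 ranks are used.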

### Design - Within Processor Blocking
Similar to HW2-1, we partition the space inside every processor block into small blocks whose dimensions are determined by the `cutoff` distance. Each small block holds a linked list of particles, and we compute the updated positions inside a processor block just as in HW2-1. In addition, every small block is assigned a local coordinate and a global coordinate for convenience when computing particle transfers between processors.
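A minimal sketch of the per-block bookkeeping, assuming `particle_t` from the assignment's common header; the field and type names are illustrative:
```cpp=
#include <list>
#include <vector>

// One small block inside a processor's partition. Its side length is chosen
// to be at least `cutoff`, so interacting particles are always in the same
// block or in one of its 8 neighboring blocks.
struct block_t {
    int local_row, local_col;    // position inside this processor's partition
    int global_row, global_col;  // position in the whole simulation space
    std::list<particle_t> parts; // linked list of particles in this block
};

// Each processor owns its partition's blocks, flattened row-major.
std::vector<block_t> blocks;
```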
### Synchronization - Communication between Processors
```cpp=
// Message for the force-computation phase: only coordinates are needed.
struct particle_msg_t {
    double x, y;
};
// Message for the particle-moving phase: everything except ax/ay.
struct particleid_msg_t {
    uint64_t id;
    double x, y;
    double vx, vy;
};
```
We first define the data structures needed for communication between processors. `particle_msg_t` is used to exchange particles between the boundary small blocks of two processor blocks when calling `apply_force()`. These messages come from the four sides of neighboring blocks (North, East, South, West) and from the four corners of the other four neighboring blocks (NE, NW, SE, SW). `particleid_msg_t` is used to exchange messages in the `move()` step: at every step of moving particles, we push the information of the relevant local particles into a message vector and sort it by `particle_t::id`, then do an all-to-all communication so that every processor can update its local blocks accordingly.
First, a word on the design of the data structures used in communication. The basic idea is to exchange only the data that is necessary. In the force-computing phase, each processor's partition needs information about particles near the boundaries of the adjacent partitions, but only their coordinates; the `id` of those particles is not needed for the computation. So we use `particle_msg_t` for this part, which contains only the `x` and `y` of each particle. As for the particle-moving phase, since a processor has to take over particles from other processors with no knowledge of what has happened to them, it needs all of their information except `ax` and `ay`. To this end, we adopt `particleid_msg_t` for this phase of communication.
Next we dive into the details of what happens in the communication of each of the two phases. For each processor's partition, the useful messages involve only the particles in blocks immediately surrounding the partition, i.e. the sides of neighboring partitions in the four horizontal/vertical directions (N, E, S, W) and the corners of neighboring partitions in the four diagonal directions (NE, NW, SE, SW).
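For reference, here is a sketch of how the neighbor rank in each of the 8 directions can be derived from the processor-grid coordinates; the offset arrays and the helper name are illustrative, not lifted from our code:
```cpp=
// Row/column offsets for the 8 directions: N, E, S, W, NE, NW, SE, SW.
const int drow[8] = {-1, 0, 1, 0, -1, -1, 1, 1};
const int dcol[8] = { 0, 1, 0, -1, 1, -1, 1, -1};

// Rank of the neighbor in direction d on a rows x cols processor grid,
// or -1 if that neighbor would fall outside the global boundary.
int neighbor_rank(int my_rank, int rows, int cols, int d) {
    int r = my_rank / cols + drow[d];
    int c = my_rank % cols + dcol[d];
    if (r < 0 || r >= rows || c < 0 || c >= cols) return -1;
    return r * cols + c;
}
```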
The relevant code for computing and applying forces is shown below. In each processor, we iterate over the 8 directions and send/receive particles in the boundaries/corners of each direction to/from the corresponding neighboring processor in that direction.
###### Apply force
```c=
// iterate over 8 directions
for (int d = 0; d < 8; ++d) {
    if (/* receiving partition out of global boundary */) continue;
    // ... prepare particles in local boundary/corner blocks for sending
    MPI_Isend(buffer_send[d], n, PARTICLE_MSG, nextrank, 0, MPI_COMM_MYWORLD, &reqs_send[d]);
    MPI_Irecv(buffer_recv[d], MAX_N_PARTICLES, PARTICLE_MSG, nextrank, 0, MPI_COMM_MYWORLD, &reqs_recv[d]);
}
/////////////////////////////////
// ... Apply force locally
/////////////////////////////////
if (/* current processor partition not on global boundary */) {
    MPI_Waitall(8, reqs_send, statuses_send);
    MPI_Waitall(8, reqs_recv, statuses_recv);
} else { /* current processor partition on global boundary */
    for (int d = 0; d < 8; ++d) {
        if (/* next processor partition out of global boundary */) continue;
        MPI_Wait(&reqs_send[d], &statuses_send[d]);
        MPI_Wait(&reqs_recv[d], &statuses_recv[d]);
    }
}
/////////////////////////////////
// ... Apply force across processors
/////////////////////////////////
```
One of the most important tricks here is that we use the non-blocking send/receive functions `MPI_Isend` and `MPI_Irecv` and only call the wait routines to finish the communication after we have done the intra-partition force computation. Note that the intra-partition force computation usually involves much more work than the inter-partition force computation, and it does not rely on any message exchange. So we can overlap communication with this part of the computation, and the latency of communication is hidden very well.
Another thing to add is that we use `MPI_Waitall` explicitly for partitions that do not sit on the global boundary, because they communicate in all 8 directions. We do this because we reckon `MPI_Waitall` may be faster than applying `MPI_Wait` individually, although we have not verified this.
For the particle-moving phase, we use a different mode of communication. Since we are not convinced that in each step a particle will move no farther than the neighboring partitions (although it seems true), we conservatively adopt all-to-all communication.
One problem is that a processor does not know how many particles will be sent from each of the other processors, so it is hard to allocate space for the potentially incoming messages until some communication has been made. The solution is to first exchange the number of particles about to move from one partition to another, so that each processor can then allocate space accordingly.
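A minimal sketch of that counts exchange; the buffer names are illustrative, while `true_num_procs` and `MPI_COMM_MYWORLD` are the same variables used in the surrounding code:
```cpp=
// Each processor tells every other processor how many particles it is about
// to send. Afterwards, n_recv_from_procs[p] is the number of particles that
// processor p will send us, so receive buffers can be sized exactly.
std::vector<int> n_send_to_procs(true_num_procs, 0);   // filled while preparing messages
std::vector<int> n_recv_from_procs(true_num_procs, 0);
MPI_Alltoall(n_send_to_procs.data(), 1, MPI_INT,
             n_recv_from_procs.data(), 1, MPI_INT, MPI_COMM_MYWORLD);
```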
After "flux" data between each pair is all settled, each processor is ready to receive data from other processors. Here we can still use all-to-all communication, but for most processor pairs, there is no particle exchange. Therefore we use point-to-point communication for pairs that have non-zero amount of particle exchange. We have also tried all-to-all communication for this part since possibility exists that all-to-all functions will outperform stacking individual point-to-point functions, and the result proved that point-to-point communication wins.
The code for this phase is shown below. First we prepare the messages to distribute and complete intra-block moving at the same time. Then the flux data is distributed via the all-to-all communication routine. Next, point-to-point communication for the concrete particle information is executed. Finally, we finalize the particles living in each block of each processor. Synchronization is achieved here by waiting for all processors to receive what they requested.
###### Move particles
```c=
////////// Move & prepare messages //////////
msgs_send.clear();
for (int b = 0; b < Nb; ++b) {
    for (auto it = blocks[b].begin(); it != blocks[b].end(); ) {
        // ... Move particles locally
        msgs_send.push_back(/* message for this neighbor */);
    }
}
std::sort(/* Sort msgs_send by id */);
MPI_Alltoall(/* Send flux messages to all processors */);
std::vector<MPI_Request> reqs;
std::vector<MPI_Status> statuses;
int n_msgs = /* Total amount of incoming particles */;
for (int p = 0, position = 0; p < true_num_procs; ++p) {
    if (n_recv_from_procs[p] == 0) continue;
    reqs.push_back(MPI_Request());
    statuses.push_back(MPI_Status());
    MPI_Irecv(/* Only receive if received particle count is not 0 */);
}
for (int i = 0, j = 0; i < msgs_send.size(); i = j) {
    // ... j advances by the size of each message
    reqs.push_back(MPI_Request());
    statuses.push_back(MPI_Status());
    MPI_Isend(/* Send to corresponding process */);
}
///// Finalize particles inside each block in each processor /////
// ... Update current block
MPI_Waitall(reqs.size(), &reqs[0], &statuses[0]);
// ... Update particles according to other processors' messages
```
One thing we also tried is to use the non-blocking `MPI_Ialltoall` instead of `MPI_Alltoall` and do something else before finishing the communication. But there is not much that can be done in that window: we could merely move the code for finalizing inter-processor moving there, which was not effective. The performance even dropped a little, possibly because the non-blocking routine brings extra overhead.
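For reference, the non-blocking variant we experimented with looks roughly like this (a sketch, reusing the count buffers from the sketch above):
```cpp=
MPI_Request flux_req;
MPI_Ialltoall(n_send_to_procs.data(), 1, MPI_INT,
              n_recv_from_procs.data(), 1, MPI_INT,
              MPI_COMM_MYWORLD, &flux_req);
// ... in principle other work could overlap here, but little of the moving
// phase is independent of the incoming counts ...
MPI_Wait(&flux_req, MPI_STATUS_IGNORE);
```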
### Synchronization - Gathering Result for One Step
To save one step of simulation data, we let every processor whose rank is not 0 send its entire particle list to processor rank 0. Processor 0 then writes its particle list to the output file, as sketched below.
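A minimal sketch of this gather step, assuming `PARTICLE` is a derived datatype for `particle_t` and `local_parts` holds the local particle list (names are illustrative):
```cpp=
if (rank != 0) {
    // Every non-zero rank ships its whole particle list to rank 0.
    MPI_Send(local_parts.data(), (int)local_parts.size(), PARTICLE,
             0, 0, MPI_COMM_MYWORLD);
} else {
    // Rank 0 collects everything, then writes the snapshot to file.
    std::vector<particle_t> all_parts(local_parts);
    for (int p = 1; p < true_num_procs; ++p) {
        MPI_Status status;
        MPI_Probe(p, 0, MPI_COMM_MYWORLD, &status);
        int n;
        MPI_Get_count(&status, PARTICLE, &n);
        size_t offset = all_parts.size();
        all_parts.resize(offset + n);
        MPI_Recv(all_parts.data() + offset, n, PARTICLE, p, 0,
                 MPI_COMM_MYWORLD, MPI_STATUS_IGNORE);
    }
    // ... sort all_parts by id and write them to the output file ...
}
```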
### Performance
#### 68 Processor Performance Scaling with Num Particles

|Number of Particles | Simulation Time (s) | Number of Particles | Simulation Time (s) |
|-|-|-|-|
|2| 0.0706311 | 5000 | 0.174773 |
|10| 0.103798| 10000 | 0.209997 |
|50| 0.141478 |50000| 0.610232|
|100| 0.13156 | 100000 | 1.56002|
|500| 0.141311 | 500000 | 7.57214 |
|1000| 0.143599 | 1000000 | 15.601 |
#### Million Particle Performance Scaling with Num Processors

|Number of Processors | Simulation Time (s) | Number of Processors | Simulation Time (s) |
|-|-|-|-|
|2 | 458.751 | 80 | 15.1393 |
|5 | 228.094 | 100 | 11.8713 |
|10 | 93.9158 | 120 | 11.4629 |
|20 | 47.5527 | 150 | 10.7525 |
|40 | 24.3722 | 200 | 9.0737 |
|60 | 17.3668 | | |
### Breaking Down the Runtime
To see where the time goes and think about potential improvements, we break down each simulation step and time each part (a sketch of the timing pattern follows the list). We divide the whole process into:
1. Prepare messages for inter-processor force computation and initiate send/receive requests (non-blocking).
2. Apply forces locally.
3. Wait for send/receive requests to finish.
4. Apply forces across processors.
5. Compute new positions of particles and prepare messages for moving particles across processors.
6. Distribute flux data via all-to-all communication.
7. Initiate send/receive requests (non-blocking).
8. Finalize moving particles across blocks in the same partition.
9. Wait for send/receive requests to finish.
10. Finalize moving particles across partitions.
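The breakdown is measured with plain wall-clock timers around each part; the pattern is roughly as follows (the accumulator names are ours):
```cpp=
// Accumulate the wall-clock time of each of the 10 parts over all steps.
double part_time[10] = {0.0};

double t0 = MPI_Wtime();
// ... part 1: prepare messages and post Isend/Irecv ...
part_time[0] += MPI_Wtime() - t0;

t0 = MPI_Wtime();
// ... part 2: apply forces locally ...
part_time[1] += MPI_Wtime() - t0;
// ... and so on for parts 3 through 10 ...
```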
We count the time cost of each part on processor 0, with 1000000 particles and 68 processors. To clarify, the timing itself adds a small overhead to the total time. The table below shows the results.
|Step|Simulation Time (ms)|Step|Simulation Time (ms)|
|-|-|-|-|
|1|69.7578|6|368.776|
|2|5976.07|7|12.3496|
|3|37.2818|8|1304.56|
|4|48.844|9|17.0852|
|5|7130.94|10|2.07918|

We can see that the time cost is dominated by steps 2, 5 and 8, all of which are computation inside a processor and hard to improve further from a parallelism standpoint. When $p$ scales up, this computation time drops roughly linearly, since the number of particles each processor needs to handle is the total number of particles divided by $p$, and the complexity of handling $n$ particles serially is $O(n)$ with the blocking strategy.
Second to that is step 6, which is the only all-to-all communication in the whole process. The only way to optimize this part away is to discard all-to-all communication; if we could make some assumption about how far each particle can possibly move, we might be able to use point-to-point communication here as well. When $p$ scales up, this part of the time grows logarithmically, since the complexity of all-to-all communication contains a $\log p$ factor.
Steps 3 and 9 are synchronization, and neither takes much time. Step 3 is cheap because the communication latency of this part is well hidden by computation, and step 9 is cheap probably because the amount of communication initiated in step 7 is small. When $p$ scales up, the synchronization time does not change noticeably.