memmove

contributed by < AndybnACT >

tags: interview

A basic memmove

To implement a memmove, we need to consider whether the input parameters alias (i.e. the memory blocks overlap). If there is an overlap and the destination address is higher than the source, the copy from src to dst must proceed from high to low addresses; otherwise, we move the memory block from low to high. For example, with src = buf, dest = buf + 4, and n = 8, a forward copy would overwrite source bytes 4-7 before they are read.

Then, we can implement the memmove function. Note that we copy data byte by byte so the routine works for objects of any size (e.g. char or int arrays).

#include &lt;stddef.h&gt;

void *memmove_bs1(void *dest, const void *src, size_t n)
{
    char *d = (char *) dest;
    const char *s = (const char *) src;
    if (d >= s && d < s + n) { /* overlap: copy from high to low address */
        for (size_t i = n; i > 0; i--)
            d[i - 1] = s[i - 1];
    } else { /* copy from low to high address */
        for (size_t i = 0; i < n; i++)
            d[i] = s[i];
    }
    return dest;
}
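A quick sanity check, as a minimal sketch of ours (buffer contents are arbitrary), exercises both the overlapping and the disjoint path:

#include &lt;assert.h&gt;
#include &lt;string.h&gt;

int main(void)
{
    char buf[16] = "abcdefghijklmno";
    memmove_bs1(buf + 4, buf, 8);             /* overlapping, dest > src */
    assert(memcmp(buf + 4, "abcdefgh", 8) == 0);

    char out[8];
    memmove_bs1(out, buf, 8);                 /* disjoint buffers */
    assert(memcmp(out, buf, 8) == 0);
    return 0;
}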

memmove with block size of register width

Copying data byte by byte does not seem efficient at first glance, since a general-purpose register on x86-64 holds a QWORD (8 bytes) that can be loaded in a single instruction. Thus, if the boundaries are properly handled, we may copy data in 8-byte chunks and fall back to byte-by-byte copying for the remaining bytes at the end.

void *memmove_bs8(void *dest, const void *src, size_t n)
{
    char *d = (char *) dest;
    const char *s = (const char *) src;
    const ptrdiff_t step = sizeof(size_t);

    if (d >= s && d < s + n) { /* overlap: copy from high to low address */
        ptrdiff_t remain = (ptrdiff_t) n - step;
        for (; remain >= 0; remain -= step)
            *(size_t *)(d + remain) = *(const size_t *)(s + remain);
        /* bytes [0, remain + step) are still pending */
        for (remain += step; remain > 0; remain--)
            d[remain - 1] = s[remain - 1];
    } else {
        size_t pos = 0;
        for (; pos + sizeof(size_t) <= n; pos += sizeof(size_t))
            *(size_t *)(d + pos) = *(const size_t *)(s + pos);
        for (; pos < n; pos++) /* trailing bytes */
            d[pos] = s[pos];
    }
    return dest;
}
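Before benchmarking, it is worth verifying the chunked version against glibc's memmove over many sizes and shifts; a minimal sketch of ours (buffer size and ranges are arbitrary):

#include &lt;assert.h&gt;
#include &lt;string.h&gt;

int main(void)
{
    char ref[128], out[128];
    for (size_t n = 0; n <= 64; n++) {
        for (size_t shift = 1; shift <= 16; shift++) {
            for (size_t i = 0; i < 128; i++)
                ref[i] = out[i] = (char) i;
            memmove(ref + shift, ref, n);     /* reference behaviour */
            memmove_bs8(out + shift, out, n); /* implementation under test */
            assert(memcmp(ref, out, 128) == 0);
        }
    }
    return 0;
}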

memmove with sse3 on x86_64

Furthermore, we may utilize the SIMD units on modern CPUs. With the help of the compiler intrinsics _mm_storeu_si128() and _mm_lddqu_si128(), we can move 16 bytes at a time. According to the Intel Intrinsics Guide:

  • _mm_storeu_si128: Store 128-bits of integer data from a into memory. mem_addr does not need to be aligned on any particular boundary.
  • _mm_lddqu_si128: Load 128-bits of integer data from unaligned memory into dst. This intrinsic may perform better than _mm_loadu_si128 when the data crosses a cache line boundary.
#include &lt;immintrin.h&gt;

void *memmove_sse(void *dest, const void *src, size_t n)
{
    char *d = (char *) dest;
    const char *s = (const char *) src;
    const ptrdiff_t chunk = 16;

    if (d >= s && d < s + n) { /* overlap: copy from high to low address */
        ptrdiff_t remain = (ptrdiff_t) n - chunk;
        for (; remain >= 0; remain -= chunk)
            _mm_storeu_si128((__m128i *)(d + remain),
                             _mm_lddqu_si128((const __m128i *)(s + remain)));
        /* bytes [0, remain + chunk) are still pending */
        for (remain += chunk; remain > 0; remain--)
            d[remain - 1] = s[remain - 1];
    } else {
        size_t pos = 0;
        for (; pos + chunk <= n; pos += chunk)
            _mm_storeu_si128((__m128i *)(d + pos),
                             _mm_lddqu_si128((const __m128i *)(s + pos)));
        for (; pos < n; pos++) /* trailing bytes */
            d[pos] = s[pos];
    }
    return dest;
}
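The intrinsics above are declared in immintrin.h and require SSE3 support; with gcc or clang, something along these lines should build it (the file name is just for illustration):

$ gcc -O2 -msse3 -c memmove_sse.c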

To infinity and beyond: memmove with avx2

Surprisingly, we found that the aligned avx2 version, operating on 32-byte blocks, performs 23% better than the unaligned sse3 one and therefore beats the glibc implementation. However, both unaligned avx2 and aligned sse3 perform about 0.3% slower than unaligned sse3.

void *memmove_avx(void *dest, const void *src, size_t n)
{
    char *d = (char *) dest;
    const char *s = (const char *) src;
    const ptrdiff_t chunk = 32;

    if (d >= s && d < s + n) { /* overlap: copy from high to low address */
        ptrdiff_t remain = (ptrdiff_t) n - chunk;
        /* copy tail bytes until the store address is 32-byte aligned */
        while (remain >= 0 && ((size_t)(d + remain) & 0x1F)) {
            d[remain + chunk - 1] = s[remain + chunk - 1];
            remain--;
        }
        /* note: the aligned load assumes src shares dest's 32-byte alignment */
        for (; remain >= 0; remain -= chunk)
            _mm256_store_si256((__m256i *)(d + remain),
                               _mm256_load_si256((const __m256i *)(s + remain)));
        remain += chunk; /* bytes [0, remain) are still pending */
        if (remain > 0)
            memmove_bs8(d, s, remain);
    } else {
        size_t pos = 0;
        /* copy head bytes until the store address is 32-byte aligned */
        while (pos < n && ((size_t)(d + pos) & 0x1F)) {
            d[pos] = s[pos];
            pos++;
        }
        for (; pos + chunk <= n; pos += chunk)
            _mm256_store_si256((__m256i *)(d + pos),
                               _mm256_load_si256((const __m256i *)(s + pos)));
        if (pos < n) /* trailing bytes */
            memmove_bs8(d + pos, s + pos, n - pos);
    }
    return dest;
}
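The aligned _mm256_store_si256/_mm256_load_si256 pair faults on addresses that are not 32-byte aligned, so for the aligned-load path to be valid the two benchmark buffers must share the same 32-byte alignment. A minimal sketch with C11's aligned_alloc, compiled with -mavx2 (whether the benchmark allocates its buffers exactly this way is an assumption on our part):

#include &lt;stdlib.h&gt;

char *A, *DST; /* benchmark source and destination buffers */

void alloc_buffers(void)
{
    /* aligned_alloc requires the size to be a multiple of the alignment */
    A   = aligned_alloc(32, 65536);
    DST = aligned_alloc(32, 65536);
}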

Performance Comparison

We compare the implemented functions against the one from glibc (2.30). To eliminate external disturbances as much as possible, we use the following boot parameters:

$ cat /proc/cmdline 
\\boot\vmlinuz-linux416-base ro root=/dev/sda3 isolcpus=0  initrd=boot\initramfs-linux416-base.img

In addition, turning off turbo boost and selecting the performance scaling governor are necessary:

$ sudo sh -c "echo 1 > /sys/devices/system/cpu/intel_pstate/no_turbo"
$ sudo sh -c "echo performance > /sys/devices/system/cpu/cpu$CPUID/cpufreq/scaling_governor"

Last but not least, turning off ASLR lets processes have the same view of memory across executions:

$ sudo sh -c "echo 0 > /proc/sys/kernel/randomize_va_space"

functions

To select a proper test function during these tests, we declare an array of function pointers as below.

void *(*__memmove_ptr[CLASS])(void *, const void *, size_t) = {
    memmove,
    memmove_bs1,
    memmove_bs8,
    memmove_sse,
};

timing

The adopted clock source is CLOCK_PROCESS_CPUTIME_ID, which claims to have 1 ns resolution on my machine.

clock_getres(CLOCK_PROCESS_CPUTIME_ID, &t);
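For completeness, the reported resolution can be printed as follows (a small sketch of ours):

struct timespec t;
clock_getres(CLOCK_PROCESS_CPUTIME_ID, &t);
printf("resolution: %ld ns\n", t.tv_nsec); /* 1 ns on this machine */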

Then, we do the following inside our test loop to get the execution time. This code stays correct even when tv_nsec wraps around, since a negative nanosecond difference is compensated by the extra second counted in tv_sec (e.g. t1 = 1.999999900 s and t2 = 2.000000100 s gives dt = 1000000000 + (100 - 999999900) = 200 ns).

clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &t1);
__memmove_ptr[j](DST, A, bytes[i]);
clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &t2);
dt = (int64_t)(t2.tv_sec - t1.tv_sec);
dt *= 1000000000;
dt += (int64_t)(t2.tv_nsec - t1.tv_nsec);

testing

We run 2000 measurements in total for each class of functions, with memmove sizes ranging from 0 to 65536 bytes.

#define NR_MEASURE 2000
#define DROP 500

We drop the first and last 500 measurements so that cache effects from neighbouring runs do not interfere with each other.

if (k > DROP && k < NR_MEASURE - DROP) { t_push(ctx + i, dt, j); }
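Putting the fragments together, the surrounding test loop presumably looks roughly like this (NR_SIZES and the exact nesting are our assumptions; the other names follow the fragments above):

for (int i = 0; i < NR_SIZES; i++) {              /* sizes under test */
    for (int j = 0; j < CLASS; j++) {             /* function class */
        for (int k = 0; k < NR_MEASURE; k++) {
            clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &t1);
            __memmove_ptr[j](DST, A, bytes[i]);
            clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &t2);
            dt = (int64_t)(t2.tv_sec - t1.tv_sec);
            dt *= 1000000000;
            dt += (int64_t)(t2.tv_nsec - t1.tv_nsec);
            if (k > DROP && k < NR_MEASURE - DROP)
                t_push(ctx + i, dt, j);           /* keep the middle samples */
        }
    }
}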

The measurement then goes into t_push, which updates the running statistics.

/* code copied from DUDECT */
void t_push(t_ctx *ctx, double x, uint8_t class)
{
    assert(class == 0 || class == 1 || class == 2 || class == 3);
    ctx->n[class]++;
    /* Welford's method for computing online variance
     * in a numerically stable way. */
    double delta = x - ctx->mean[class];
    ctx->mean[class] = ctx->mean[class] + delta / ctx->n[class];
    ctx->m2[class] = ctx->m2[class] + delta * (x - ctx->mean[class]);
}
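From these running statistics, the mean is available directly and the sample variance can be recovered from m2; an accessor along these lines (the function name is ours, not DUDECT's) would do:

double t_variance(t_ctx *ctx, uint8_t class)
{
    /* m2 accumulates the sum of squared deviations from the mean */
    return ctx->m2[class] / (ctx->n[class] - 1);
}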

Result

Environment

$ gcc --version
gcc (Arch Linux 9.2.1+20200130-2) 9.2.1 20200130
$ clang --version
clang version 9.0.1 
Target: x86_64-pc-linux-gnu
Thread model: posix
$ uname -a
Linux archlinux 4.16.1-base #3 SMP PREEMPT Sun Jul 28 21:54:28 CST 2019 x86_64 GNU/Linux
$ cat /proc/cpuinfo
...
Intel(R) Core(TM) i5-6400 CPU @ 2.70GHz
...

To draw a tentative conclusion:
First, and interestingly, a byte-by-byte copying routine does not necessarily perform slower than the others if the compiler applies the right optimizations.
Second, the unaligned sse3 intrinsics perform even better than the aligned ones.
Third, we can beat glibc with the help of avx2. Unfortunately, I don't have a machine supporting avx512; perhaps it would take us one step further.

move_zeros

How embarrassing it is to get stuck on a question tagged easy on LeetCode during an interview. Well, I hope that we won't make the same mistake again. Here is the problem:

Move Zeros:

Given an array nums, write a function to move all 0's to the end of it while maintaining the relative order of the non-zero elements.

Example:

Input: [0,1,0,3,12]
Output: [1,3,12,0,0]

Solution:

With a hint from the interviewer, here is my solution:

void move_zeros(int *nums, size_t n)
{
    for (size_t i = 0; i < n; i++) {
        if (nums[i] == 0) {
            size_t j;
            int tmp;
            for (j = i + 1; j < n && nums[j] == 0; j++)
                ; /* reaches first non-zero */
            if (j == n - 1)
                break;
            tmp = nums[i];
            nums[i] = nums[j];
            nums[j] = tmp;
        }
    }
}

This is still not correct; we should make the following modification:

--- a/diffa
+++ b/diffb
@@ -6,7 +6,7 @@ void move_zeros(int *nums, size_t n)
             int tmp;
             for (j = i + 1; j < n && nums[j] == 0; j++)
                 ; /* reaches first non-zero */
-            if (j == n - 1)
+            if (j == n)
                 break;
             tmp = nums[i];
             nums[i] = nums[j];

If we don't want nested for-loops, we can simply do:

void move_zeros(int *nums, size_t n)
{
    size_t last = 0;
    for (size_t i = 0; i < n; i++) {
        if (nums[i] != 0)
            nums[last++] = nums[i];
    }
    /* equivalently: memset(nums + last, 0, (n - last) * sizeof(int)); */
    for (size_t i = last; i < n; i++)
        nums[i] = 0;
}
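Running the single-pass version on the example input reproduces the expected output:

#include &lt;stdio.h&gt;

int main(void)
{
    int nums[] = {0, 1, 0, 3, 12};
    move_zeros(nums, 5);
    for (size_t i = 0; i < 5; i++)
        printf("%d ", nums[i]); /* prints: 1 3 12 0 0 */
    printf("\n");
    return 0;
}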