# Hardware Generator for JPEG Compression
> 方榮彬, 龔祐萱
[GitHub](https://github.com/RBing123/Chisel-JPEG)
## Introduction
### What is JPEG Compression?
[JPEG](https://en.wikipedia.org/wiki/JPEG) compression is a widely used image compression standard designed to reduce the file size of digital images while preserving visual quality. It employs a lossy compression technique, which removes redundant and less noticeable information to achieve high compression ratios. The process involves several key steps: color space transformation, Discrete Cosine Transform (`DCT`) to separate image frequencies, quantization to reduce precision, and Huffman coding to encode the data efficiently. This makes JPEG ideal for applications where storage and bandwidth are critical, such as photography, web graphics, and mobile devices, while maintaining a balance between image quality and file size.
### JPEG compression workflow

1. Input image
* The input is typically an 8-bit grayscale or 24-bit RGB image.
* If the image is in RGB, it is converted to the **YCbCr** color space:
* **Y**: Luminance (brightness)
* **Cb**: Chrominance (blue-difference)
* **Cr**: Chrominance (red-difference)
2. Downsampling (Optional)
* Chrominance components (Cb and Cr) are typically downsampled because the human eye is less sensitive to color differences than to luminance changes.
* Common downsampling ratios:
* 4:4:4: No downsampling (full resolution for Y, Cb, and Cr).
* 4:2:2: Horizontal downsampling by 2.
* 4:2:0: Horizontal and vertical downsampling by 2.
3. Block Splitting
* The image is divided into 8x8 pixel blocks for each channel (Y, Cb, Cr).
* Each block is processed independently in the subsequent steps.
4. Level Shift
* Each pixel value in the 8x8 block is level-shifted by subtracting 128, bringing the range from [0, 255] to [-128, 127].
* This centers the values around zero for more efficient transformation.
5. [Discrete Cosine Transform (DCT)](https://en.wikipedia.org/wiki/Discrete_cosine_transform)
* The DCT is applied to each 8x8 block, transforming spatial domain data into frequency domain data:
$$
F(u, v) = \frac{1}{4} C(u) C(v) \sum_{x=0}^{7} \sum_{y=0}^{7} f(x, y) \cos\left(\frac{(2x+1)u\pi}{16}\right) \cos\left(\frac{(2y+1)v\pi}{16}\right)
$$
where:
$$
C(u) =
\begin{cases}
\frac{1}{\sqrt{2}}, & \text{if } u = 0 \\
1, & \text{if } u \neq 0
\end{cases}
$$
$$
C(v) =
\begin{cases}
\frac{1}{\sqrt{2}}, & \text{if } v = 0 \\
1, & \text{if } v \neq 0
\end{cases}
$$
6. [Quantization](https://en.wikipedia.org/wiki/Quantization_(image_processing))
* The DCT coefficients are divided by a quantization table and rounded to the nearest integer.
* Quantization reduces less visually significant high-frequency components to zero, achieving compression.
7. [Zig-zag scanning](https://zh.wikipedia.org/zh-tw/%E6%9B%B2%E6%8A%98%E6%8E%83%E6%8F%8F)
The quantized 8x8 block is converted into a 1D sequence using a zigzag pattern
8. Differential Pulse Code Modulation (DPCM)
Calculate the difference between the DC values of adjacent blocks to reduce redundant information
$$
DC_{difference}=DC_{current} - DC_{previous}
$$
9. [Run-Length Encoding (RLE)](https://en.wikipedia.org/wiki/Run-length_encoding)
* The 1D sequence is compressed using RLE, which encodes sequences of zero coefficients as a single value.
* Special markers:
* EOB (End of Block): Indicates no more non-zero values in the block.
* ZRL (Zero Run Length): Encodes runs of zeros.
10. [Huffman coding](https://en.wikipedia.org/wiki/Huffman_coding)
Huffman coding is used to compress the data efficiently by assigning shorter codes to more frequent values and longer codes to less frequent values. This is particularly effective for the `DC difference` and `AC coefficients`.
Steps for Huffman Coding:
a. DC Component Encoding:
* The DC difference calculated using DPCM is represented as a pair:
* Size: The number of bits required to encode the amplitude of the DC difference.
* Amplitude: The actual bits of the DC difference.
* The Size is Huffman encoded using a predefined DC Huffman table.
b. AC Component Encoding:
* The AC coefficients are encoded using a combination of:
* Run/Size Pair: Encodes the number of consecutive zero coefficients (Run) followed by the size of the next non-zero coefficient.
* Amplitude: The actual value of the non-zero coefficient.
* Special markers:
* EOB (End of Block): Indicates that all remaining coefficients in the block are zero.
* ZRL (Zero Run Length): Encodes runs of 16 consecutive zeros.
c. Huffman Tables:
* Separate Huffman tables are used for DC and AC components.
* Separate tables are maintained for luminance and chrominance.
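The numbered steps above can be condensed into a small reference sketch for a single 8x8 block. This is an illustrative software model only, not the project's hardware path; the divisor `16` is the DC entry of the standard luminance quantization table:

```python
import math

def dct2d(block):
    """Direct evaluation of the 8x8 DCT-II formula from step 5 (O(n^4), reference only)."""
    def c(k):
        return 1 / math.sqrt(2) if k == 0 else 1.0
    out = [[0.0] * 8 for _ in range(8)]
    for u in range(8):
        for v in range(8):
            s = sum(block[x][y]
                    * math.cos((2 * x + 1) * u * math.pi / 16)
                    * math.cos((2 * y + 1) * v * math.pi / 16)
                    for x in range(8) for y in range(8))
            out[u][v] = 0.25 * c(u) * c(v) * s
    return out

# Steps 4-6 on a flat block of pixel value 138
pixels = [[138] * 8 for _ in range(8)]
shifted = [[p - 128 for p in row] for row in pixels]   # step 4: level shift
coeffs = dct2d(shifted)                                # step 5: DCT
dc = round(coeffs[0][0] / 16)                          # step 6: quantize DC by 16
print(round(coeffs[0][0]), dc)  # 80 5
```

A flat block puts all its energy into the DC coefficient, so every AC coefficient comes out (numerically) zero, which is exactly why the later RLE and Huffman stages compress so well.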
## Implementation
### 0. Prepare Image conversion tool in software
The Python script `utils.py` is designed to convert images of formats like `*.png` or `*.jpg` into the `*.bmp` format, which is compatible with further processing in Chisel.
- [ ] `utils.py`
```python=
# utils.py
import numpy as np
from PIL import Image
import argparse

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Convert image to BMP format')
    parser.add_argument('input_path', type=str, help='Input image path')
    parser.add_argument('--output', '-o', type=str, default='output.bmp',
                        help='Output BMP file path (default: output.bmp)')
    parser.add_argument('--quality', '-q', type=int, default=100,
                        help='Output quality 0-100 (default: 100)')
    args = parser.parse_args()
    # Open image
    img = Image.open(args.input_path)
    # Ensure RGB
    if img.mode != 'RGB':
        img = img.convert('RGB')
    # Store as BMP
    img.save(args.output, "BMP", quality=args.quality)
```
Run the conversion with:
```bash
$ python utils.py {input image} -o {output image} -q {quality}
```
This converts the input image (e.g. `*.jpg`) into a `*.bmp` at the requested quality (the default, 100, is maximum). However, we still need to convert the image from `RGB` space to `YCbCr` space and store each channel in a separate file, using the following function.
```python=
def read_bmp(filepath):
    """
    Read BMP and convert it to YCbCr
    """
    img = Image.open(filepath)
    if img.mode != 'RGB':
        raise ValueError("Image must be in RGB format!")
    ycbcr = img.convert('YCbCr')
    y, cb, cr = ycbcr.split()
    width, height = img.size
    if width % 8 != 0 or height % 8 != 0:
        raise ValueError(f"Image dimensions must be multiples of 8! Current size: {width}x{height}")
    return np.array(y), np.array(cb), np.array(cr)
```
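For reference, `img.convert('YCbCr')` applies the JFIF (full-range BT.601) equations. A standalone sketch of the math, independent of PIL's fixed-point internals:

```python
def rgb_to_ycbcr(r, g, b):
    """JFIF/BT.601 full-range conversion that img.convert('YCbCr') approximates."""
    y  =  0.299 * r + 0.587 * g + 0.114 * b
    cb = -0.168736 * r - 0.331264 * g + 0.5 * b + 128
    cr =  0.5 * r - 0.418688 * g - 0.081312 * b + 128
    # Clamp to the 8-bit range after rounding
    return tuple(min(255, max(0, round(v))) for v in (y, cb, cr))

print(rgb_to_ycbcr(255, 0, 0))  # (76, 85, 255): red has low Y/Cb, saturated Cr
```

A pure gray input maps to `(g, 128, 128)`, which is why the level shift of 128 centers the chroma channels around zero.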
We then split each channel into `8x8` blocks and save them in the format the hardware pipeline consumes.
```python=
import os

def extract_blocks(channel_data):
    h, w = channel_data.shape
    blocks = []
    for i in range(0, h, 8):
        for j in range(0, w, 8):
            block = channel_data[i:i+8, j:j+8]
            block = block.astype(np.int16) - 128
            blocks.append(block)
    return blocks

def save_blocks_for_chisel(blocks, component_name, output_dir="hw_output"):
    os.makedirs(output_dir, exist_ok=True)
    for idx, block in enumerate(blocks):
        filename = f"{output_dir}/{component_name}_block_{idx}.txt"
        with open(filename, 'w') as f:
            for row in block:
                for val in row:
                    f.write(f"{val}\n")
```
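For instance, the helpers above turn a 16x8 channel into two level-shifted blocks of 64 values each, matching the 64-line files the Chisel test harness expects. A pure-Python sketch of the same slicing:

```python
# A flat 16x8 test channel yields two 8x8 blocks, each level-shifted by -128
channel = [[200] * 8 for _ in range(16)]
blocks = [[[px - 128 for px in row] for row in channel[i:i + 8]]
          for i in range(0, 16, 8)]
values_per_block = sum(len(row) for row in blocks[0])
print(len(blocks), values_per_block, blocks[0][0][0])  # 2 64 72
```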
### 1. Read the image in Chisel
In the hardware design, we read the image data through a test harness that loads preprocessed 8x8 blocks from text files:
1. Data loading structure
```scala=
import scala.io.Source

def readDataFromFile(fileName: String): Seq[Seq[Int]] = {
  val lines = Source.fromFile(fileName).getLines().toSeq
  require(lines.size == 64, s"File $fileName must contain 64 lines")
  val data = lines.map(_.toInt)
  data.grouped(8).toSeq
}
```
2. Test Interface
```scala=
test(new JPEGEncodeChisel(p)) { dut =>
  dut.io.in.valid.poke(true.B)
  for (i <- 0 until 8; j <- 0 until 8) {
    dut.io.in.bits.yComponent(i)(j).poke(dataY(i)(j).S)
  }
}
```
The hardware receives image data as 8x8 blocks through its input interface, where each pixel value is represented as a signed integer after preprocessing. The input interface supports three color components (Y, Cb, Cr).
### 2. Zigzag scanning
We discovered issues in the original zigzag implementation and made the following improvements:
1. Original Implementation Problems:
* Counter increment timing was incorrect: count was incremented at the start of processing state
* Register reset values were not properly managed
* State transitions were not optimally controlled
2. Key Improvement
- [ ] `src/main/scala/jpeg/zigzag.scala`
```diff
is(ZigZagState.idle) {
  when (io.in.valid) {
    stateReg := ZigZagState.processing
    inMatrix := io.in.bits.matrixIn
    validOut := false.B
+   count := 0.U  // Reset counter
+   row := 0.U    // Reset position
+   col := 0.U
+   isUp := true.B
  }
}
is(ZigZagState.processing) {
- count := count + 1.U  // Incorrect: increment at start
  when(count < p.totalElements.U) {
    outReg(count) := inMatrix(row)(col)
    // zigzag traversal logic
    ...
+   count := count + 1.U  // Correct: increment after processing
  }
}
```
### 3.1 Run-Length Encode
The module uses a state machine to traverse the input data and generate a compressed output. The RLE algorithm identifies sequences of identical elements (referred to as "runs") in the input and encodes them as a pair of values:
1. The count of consecutive identical elements (the run length).
2. The value being repeated.
After the test completes, we obtain the compressed data as a `1D` sequence in which the even indices (`0-indexed`) hold run lengths and the odd indices hold the repeated values. This format is well-suited for Huffman coding in software.
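A software model of this (count, value) format, assuming the hardware counts runs of any repeated value rather than only zeros, might look like:

```python
def rle_encode(seq):
    """Encode runs as (count, value) pairs: even indices hold counts, odd hold values."""
    out = []
    i = 0
    while i < len(seq):
        run = 1
        # Extend the run while the next element repeats the current one
        while i + run < len(seq) and seq[i + run] == seq[i]:
            run += 1
        out.extend([run, seq[i]])
        i += run
    return out

print(rle_encode([5, 0, 0, 0, 7]))  # [1, 5, 3, 0, 1, 7]
```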
### 3.2 Differential Pulse Code Modulation
The original implementation had limitations in following the JPEG standard. Here's how we improved it:
- [ ] `src/main/scala/jpeg/encoding.scala`
```diff
- is(EncodingState.encode){
-   outputReg(0) := dataReg(0)  // Simply copying first value
-   when (dataIndex < p.totalElements.U) {
-     val diff = dataReg(dataIndex) - dataReg(dataIndex - 1.U)
+ // Store previous block's DC value
+ val prevTopLeft = RegInit(0.S(p.w8))
+ // In encoding state
+ when(dataIndex === 1.U) {
+   // Proper DPCM: current DC - previous DC
+   val topLeftDiff = dataReg(0) - prevTopLeft
+   outputReg(0) := topLeftDiff
+   prevTopLeft := dataReg(0)  // Update for next block
+ }
```
Key improvements:
* Added storage for previous block's DC value
* Implemented true DPCM by computing difference between consecutive blocks' DC values
* First block automatically handled by initializing prevTopLeft to 0
* Maintained proper sequential processing for AC coefficients
This implementation now correctly follows the JPEG standard's DPCM encoding for DC coefficients.
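The corrected behavior can be modeled in software as follows (a sketch mirroring `prevTopLeft` initialized to 0, so the first block's DC passes through unchanged):

```python
def dpcm_dc(blocks):
    """Replace each block's DC term with its difference from the previous block's DC."""
    prev_dc = 0          # first block differences against 0, as in the RTL
    out = []
    for block in blocks:
        encoded = list(block)
        encoded[0] = block[0] - prev_dc
        prev_dc = block[0]
        out.append(encoded)
    return out

print(dpcm_dc([[50, 1], [53, 2], [47, 3]]))  # [[50, 1], [3, 2], [-6, 3]]
```

Nearby blocks usually have similar brightness, so the DC differences cluster around zero and compress better under Huffman coding than the raw DC values would.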
### 4. Huffman coding
Our implementation of Huffman coding is shown below:
1. Frequency calculation
* **Goal**: Determine how often each value appears in the encoded data.
* **Process**:
* Extract all values from the encoded data (`RLE` or `Delta`) across all blocks and channels (`Y`, `Cb`, `Cr`).
* Count the occurrences of each value using a frequency dictionary
* This frequency table will serve as the input for the Huffman tree construction.
2. Huffman Tree Construction
* **Goal**: Build a binary tree where ==the most frequent symbols== are closer to the root, resulting in shorter codes.
* **Process**:
* Create a **priority queue (min-heap)** using the `heapq` module, where each node is an instance of `HuffmanNode`, which stores:
* `char`: The symbol (e.g., a value from RLE or Delta encoding).
* `freq`: The frequency of the symbol.
* `left` and `right`: Pointers to child nodes.
* Combine two nodes with the smallest frequencies into a new internal node until only one node remains, forming the root of the tree.
3. Generate Huffman Codes
* **Goal**: Assign binary codes to each symbol based on the structure of the Huffman tree.
* **Process**:
* Traverse the Huffman tree recursively:
* Append `0` when moving to the left child.
* Append `1` when moving to the right child.
* Assign the resulting binary string to each leaf node's symbol.
4. Encode Data
* **Goal**: Replace each value in the input data with its corresponding Huffman code.
* **Process**:
* For each block in the encoded data, replace each value using the `huffman_codes` dictionary.
5. Save Results
* Huffman codes
Save the symbol-to-code mappings to a text file for later use or debugging
* Encoded Data
Save the encoded data (now in binary) to another text file
- [ ] `test.py`
```python=
import heapq

# HuffmanNode (char, freq, left, right) is defined earlier in test.py

def generate_huffman_codes(frequencies):
    """
    Generate Huffman codes for given frequencies
    Args:
        frequencies: Dictionary of value:frequency pairs
    Returns:
        Dictionary of value:huffman_code pairs
    """
    if not frequencies:
        return {}
    # Create heap from frequencies
    heap = []
    for char, freq in frequencies.items():
        heapq.heappush(heap, HuffmanNode(char, freq))
    # Build Huffman tree
    while len(heap) > 1:
        left = heapq.heappop(heap)
        right = heapq.heappop(heap)
        internal = HuffmanNode(None, left.freq + right.freq)
        internal.left = left
        internal.right = right
        heapq.heappush(heap, internal)
    # Generate codes by traversing tree
    codes = {}
    def generate_codes_recursive(node, code=""):
        if node is None:
            return
        if node.char is not None:
            codes[node.char] = code
            return
        generate_codes_recursive(node.left, code + "0")
        generate_codes_recursive(node.right, code + "1")
    if heap:
        generate_codes_recursive(heap[0])
    return codes

def perform_huffman_coding(encoded_data):
    """
    Perform Huffman coding on RLE and Delta encoded data
    Args:
        encoded_data: Dictionary containing encoded data for each component
    """
    for encoding_type in ['RLE', 'Delta']:
        data = read_encoded_blocks(encoding_type=encoding_type)
        for component in ['Y', 'Cb', 'Cr']:
            # Collect all values for frequency calculation
            all_values = []
            for block in data[component]:
                all_values.extend(block)
            # Count occurrences of each value
            frequencies = {}
            for value in all_values:
                frequencies[value] = frequencies.get(value, 0) + 1
            # Generate Huffman codes
            huffman_codes = generate_huffman_codes(frequencies)
            # Save Huffman codes and encoded data
            save_huffman_output(
                component,
                encoding_type,
                huffman_codes,
                data[component],
                output_dir="hw_output/huffman"
            )
```
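One subtlety: `heapq` compares the pushed items directly, so `HuffmanNode` must define an ordering. A hypothetical minimal definition (the project's actual class lives in `test.py`) and a quick check that the lowest frequency pops first:

```python
import heapq

# Hypothetical minimal HuffmanNode; heapq needs __lt__ to order nodes by frequency
class HuffmanNode:
    def __init__(self, char, freq):
        self.char = char
        self.freq = freq
        self.left = None
        self.right = None

    def __lt__(self, other):
        return self.freq < other.freq

heap = []
for char, freq in {'a': 5, 'b': 2, 'c': 1}.items():
    heapq.heappush(heap, HuffmanNode(char, freq))
print(heapq.heappop(heap).char)  # 'c': the lowest frequency comes out first
```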
### 5. Bit-stream
The bitstream is structured to contain both encoding types (RLE and Delta) for each color component. For each component (Y, Cb, Cr), the bitstream is organized as follows:
```python=
import os

def create_bitstream(rle_data, delta_data, output_dir="hw_output/bitstream"):
    for component in ['Y', 'Cb', 'Cr']:
        bitstream = bytearray()
        # 1. Component marker
        component_marker = ord(component[0])  # ASCII value of Y/C
        bitstream.append(component_marker)
        # 2. DC value (from Delta encoding)
        dc_value = delta_data[component][0][0]
        bitstream.extend(int(dc_value).to_bytes(2, byteorder='big', signed=True))
        # 3. AC values (from RLE encoding)
        for block in rle_data[component]:
            for run_length, value in zip(block[::2], block[1::2]):
                bitstream.append(int(run_length))  # 1 byte for run length
                bitstream.extend(int(value).to_bytes(2, byteorder='big', signed=True))
        # Write the component bitstream to its own binary file (e.g. y_encoded.bin)
        os.makedirs(output_dir, exist_ok=True)
        with open(f"{output_dir}/{component.lower()}_encoded.bin", 'wb') as f:
            f.write(bitstream)
```
The resulting bitstream format:
1. Component Identifier (1 byte)
2. DC Coefficient Difference (2 bytes)
3. Sequence of RLE pairs:
* Run Length (1 byte)
* Value (2 bytes)
Each component's data is stored in a separate binary file under the `hw_output/bitstream` directory.
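A sketch of a parser that walks this layout (marker, 2-byte signed DC, then repeated run/value pairs) shows the format round-trips cleanly:

```python
def parse_bitstream(data: bytes):
    """Decode: 1-byte marker, 2-byte signed DC, then (1-byte run, 2-byte value) pairs."""
    marker = chr(data[0])
    dc = int.from_bytes(data[1:3], "big", signed=True)
    pairs = []
    pos = 3
    while pos + 3 <= len(data):
        run = data[pos]
        value = int.from_bytes(data[pos + 1:pos + 3], "big", signed=True)
        pairs.append((run, value))
        pos += 3
    return marker, dc, pairs

# Build a one-pair example stream by hand and parse it back
stream = (b"Y"
          + (-6).to_bytes(2, "big", signed=True)
          + bytes([3])
          + (12).to_bytes(2, "big", signed=True))
print(parse_bitstream(stream))  # ('Y', -6, [(3, 12)])
```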
After implementing our Huffman encoding process, we can analyze the efficiency of the compression:
For RLE (Run-Length Encoding) results:
* Y component uses 15 unique symbols with code lengths ranging from 1 to 6 bits
* Cb component needs only 5 unique symbols with more compact code lengths (2-3 bits)
* Cr component has 6 unique symbols utilizing 1-4 bits per code
These statistics demonstrate the data characteristics of each color component after DCT and quantization, with Y (luminance) requiring more diverse encoding patterns than the chrominance components (Cb, Cr).
The final compression results show:
* Original size: ==192 bytes==
* Compressed size: ==138 bytes==
* Achieving a compression ratio of ==1.39:1==
This creates a clear flow from our compression statistics to the actual bitstream structure.
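The ratio above is plain arithmetic over the byte counts:

```python
# Reproduce the ratio from the report's measured sizes
original, compressed = 192, 138
print(f"{original / compressed:.2f}:1")  # 1.39:1
```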
```bash
Y RLE Statistics:
Total symbols: 15
Min code length: 1
Max code length: 6
Average code length: 4.93
Cb RLE Statistics:
Total symbols: 5
Min code length: 2
Max code length: 3
Average code length: 2.40
Cr RLE Statistics:
Total symbols: 6
Min code length: 1
Max code length: 4
Average code length: 3.17
Compression Results:
Original size: 192 bytes
Compressed size: 138 bytes
Compression ratio: 1.39:1
```
### 6. Inverse Discrete Cosine Transform (IDCT)
The purpose of the IDCT module is to transform frequency domain data into spatial domain data. This module employs a state machine to manage and transition between the corresponding operations.
1. Waiting State
This is the default initial state, indicating that the module is waiting for a valid input.
```scala=
is(IDCTState.waiting) {
  when(io.in.valid) {
    dctInput := io.in.bits.matrixIn
    state := IDCTState.calculating
    validOut := false.B
  }
}
```
When `io.in.valid` is `true`, the program will store the 8x8 matrix (`io.in.bits.matrixIn`) into the register `dctInput` and switch the state to `calculating`. Then, it will set the output validity flag (`validOut`) to `false`, indicating that no valid output is ready yet.
2. Calculating State
This is the critical state where the Inverse Discrete Cosine Transform (IDCT) computation is performed. The `IDCT` function is invoked to compute each element of the matrix using the mathematical IDCT formula.
To enhance precision, intermediate values (the cosine coefficients and the scaling factors `alphaU` and `alphaV`) are multiplied by 100, which mitigates precision loss during integer computation. The program then divides the accumulated values by `1,000,000` to undo the scaling.
Once the IDCT computation is complete, `128` is added to each element of the matrix to restore the original value range. The final result is stored in the register `matrixOutput`, and the FSM transitions back to the `waiting` state.
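The scaling scheme can be checked in software. In this sketch the cosine product and each alpha factor are scaled by 100 as described above, so a single divide by 1,000,000 restores the scale; the fixed-point result tracks the floating-point value to within the quantization noise of the x100 scaling:

```python
import math

# One (u, v) term of the IDCT sum for output position (x, y) = (0, 0)
u, v, x, y = 1, 2, 0, 0
cos_scaled = int(math.cos((2 * x + 1) * u * math.pi / 16)
                 * math.cos((2 * y + 1) * v * math.pi / 16) * 100)
alpha_u = int((1 / math.sqrt(2)) * 100) if u == 0 else 100
alpha_v = int((1 / math.sqrt(2)) * 100) if v == 0 else 100
coef = 50                                  # example DCT coefficient

# Total scale is 100 (cosines) * 100 * 100 (alphas) = 1,000,000
fixed = coef * cos_scaled * alpha_u * alpha_v // 1_000_000
exact = coef * math.cos((2 * x + 1) * u * math.pi / 16) \
             * math.cos((2 * y + 1) * v * math.pi / 16)
print(fixed, round(exact, 2))  # 45 45.31
```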
The complete program is as follows:
```scala=
package jpeg

import chisel3._
import chisel3.internal.firrtl.Width
import chisel3.util._
import scala.math._
import chisel3.experimental._

/**
  * Object for IDCT
  */
object IDCTChisel {
  def apply(matrixIn: Valid[Vec[Vec[SInt]]], shiftedOut: Vec[Vec[SInt]], idctOut: Valid[Vec[Vec[SInt]]]) = {
    val mod = Module(new IDCTChisel)
    mod.io.in := matrixIn
    mod.io.shiftedOut := shiftedOut
    mod.io.idctOut := idctOut
    mod
  }
}

/**
  * Creates FSM states for IDCT
  */
object IDCTState extends ChiselEnum {
  val calculating, waiting = Value
}

/** Performs IDCT on 8x8 Matrix with scaling
  *
  * IO
  * @param dctIn Input matrix to perform IDCT on
  *
  * @return shiftedOut Re-shifted version of output matrix by +128
  * @return idctOut Resulting restored output of IDCT
  */
class IDCTChisel extends Module {
  val io = IO(new Bundle {
    val in = Flipped(Valid(new Bundle {
      val matrixIn = Input(Vec(8, Vec(8, SInt(32.W))))
    }))
    val shiftedOut = Output(Vec(8, Vec(8, SInt(9.W)))) // Test output to check re-shifted block
    val idctOut = Valid(Vec(8, Vec(8, SInt(9.W))))
  })

  // Initializes registers for matrix and Valid bit
  val dctInput = Reg(Vec(8, Vec(8, SInt(32.W))))
  val shiftedBlock = Reg(Vec(8, Vec(8, SInt(9.W))))
  val matrixOutput = Reg(Vec(8, Vec(8, SInt(9.W))))
  val validOut = RegInit(false.B)

  // Assigns outputs
  io.idctOut.valid := validOut
  io.idctOut.bits := matrixOutput
  io.shiftedOut := DontCare

  // Function to compute IDCT values for each element of input matrix
  def IDCT(matrix: Vec[Vec[SInt]]): Vec[Vec[SInt]] = {
    val idctMatrix = Wire(Vec(8, Vec(8, SInt(9.W))))
    // Compute IDCT
    for (i <- 0 until 8) {
      for (j <- 0 until 8) {
        var sum = 0.S
        for (u <- 0 until 8) {
          for (v <- 0 until 8) {
            val coefValue = matrix(u)(v)
            val alphaU = if (u == 0) (1.0 / math.sqrt(2)) * 100 else 100
            val alphaV = if (v == 0) (1.0 / math.sqrt(2)) * 100 else 100
            val cosVal = (math.cos((2 * i + 1) * u * Pi / 16) * math.cos((2 * j + 1) * v * Pi / 16) * 100).toInt.S
            val divSum = (coefValue * cosVal * alphaU.toInt.S * alphaV.toInt.S) / 1000000.S
            sum = sum +& divSum / 1000000.S
          }
        }
        idctMatrix(i)(j) := (sum / 4.S)
      }
    }
    idctMatrix
  }

  // Initializes state and defines FSM
  val state = RegInit(IDCTState.waiting)
  switch(state) {
    is(IDCTState.waiting) {
      when(io.in.valid) {
        dctInput := io.in.bits.matrixIn
        state := IDCTState.calculating
        validOut := false.B
      }
    }
    is(IDCTState.calculating) {
      // Assigns output matrix to calculated IDCT values
      val idctResult = IDCT(dctInput)
      for (i <- 0 until 8) {
        for (j <- 0 until 8) {
          matrixOutput(i)(j) := idctResult(i)(j) +& 128.S // Re-shifting to restore original range
        }
      }
      io.shiftedOut := matrixOutput
      state := IDCTState.waiting
      validOut := true.B
    }
  }
}
```
### Encoder
Our hardware encoder workflow is listed below (steps 1 through 4):
1. DCT Module
* Performs 2D Discrete Cosine Transform
* Converts spatial domain data to frequency domain
* Processes 8x8 blocks of pixels
2. Quantization Module
* Reduces precision of DCT coefficients
* Uses configurable quantization tables
* Implements lossy compression step
3. ZigZag Module
* Reorders 2D matrix into 1D array
* Groups similar frequencies together
* Prepares data for entropy encoding
4. Encoding Stage
In the encoding stage, we distinguish between two coding types: `RLE` and `DPCM`.
* `RLE` (Run-Length Encoding): Compresses sequences of repeated values
* Delta Encoding (`DPCM`): Encodes differences between consecutive values
```scala=
package jpeg

import chisel3._
import chisel3.internal.firrtl.Width
import chisel3.util._
import chisel3.stage.{ChiselStage, ChiselGeneratorAnnotation}
import firrtl.options.TargetDirAnnotation

/**
  * Performs JPEG Compression
  *
  * @param p JPEG Parameters
  *
  * IO
  * @param pixelDataIn Pixel data to be encoded
  *
  * @return dctOut DCT output used in testing
  * @return quantOut Quantization output used in testing
  * @return zigzagOut Zigzag output used in testing
  * @return encoded Encoded pixel data
  */
class JPEGEncodeChisel(p: JPEGParams) extends Module {
  val io = IO(new Bundle {
    val in = Flipped(Valid(new Bundle {
      val yComponent = Input(Vec(p.givenRows, Vec(p.givenCols, SInt(9.W))))
    }))
    // Y output
    val dctOutY = Output(Vec(8, Vec(8, SInt(32.W))))
    val quantOutY = Output(Vec(p.numRows, Vec(p.numCols, SInt(32.W))))
    val zigzagOutY = Output(Vec(p.totalElements, SInt(32.W)))
    val encodedRLEY = Output(Vec(p.maxOutRLE, SInt(p.w8)))
    val encodedDeltaY = Output(Vec(p.totalElements, SInt(p.w8)))
  })

  // DontCare for outputs not driven yet
  io.encodedRLEY := DontCare
  io.encodedDeltaY := DontCare

  // Y channel
  // Discrete Cosine Transform Module
  val dctModuleY = Module(new DCTChisel)
  val quantModuleY = Module(new QuantizationChisel(p.copy(qtChoice = 1)))
  val zigzagModuleY = Module(new ZigZagChisel(p))
  // val encodeModuleY = Module(new RLEChiselEncode(p))

  dctModuleY.io.in.valid := io.in.valid
  dctModuleY.io.in.bits.matrixIn := io.in.bits.yComponent

  quantModuleY.io.in.valid := dctModuleY.io.dctOut.valid
  quantModuleY.io.in.bits.data := dctModuleY.io.dctOut.bits
  quantModuleY.io.quantTable.zipWithIndex.foreach { case (row, i) =>
    row.zipWithIndex.foreach { case (element, j) =>
      element := p.getQuantTable(i)(j).S
    }
  }

  zigzagModuleY.io.in.valid := quantModuleY.io.out.valid
  zigzagModuleY.io.in.bits.matrixIn := quantModuleY.io.out.bits

  when(zigzagModuleY.io.zigzagOut.valid) {
    when(p.encodingChoice.B) {
      val encodeModuleYr = Module(new RLEChiselEncode(p))
      encodeModuleYr.io.in.valid := zigzagModuleY.io.zigzagOut.valid
      encodeModuleYr.io.in.bits.data := zigzagModuleY.io.zigzagOut.bits
      io.encodedRLEY := encodeModuleYr.io.out.bits

      val encodeModuleYd = Module(new DeltaChiselEncode(p))
      encodeModuleYd.io.in.valid := zigzagModuleY.io.zigzagOut.valid
      encodeModuleYd.io.in.bits.data := zigzagModuleY.io.zigzagOut.bits
      io.encodedDeltaY := encodeModuleYd.io.out.bits
    }
  }

  io.dctOutY := dctModuleY.io.dctOut.bits
  io.quantOutY := quantModuleY.io.out.bits
  io.zigzagOutY := zigzagModuleY.io.zigzagOut.bits
}
```
### Decoder
Our hardware decoder workflow is listed below (steps 1 through 4):
1. Decoding Stage
* `RLE` (Run-Length Encoding): Decompresses sequences of repeated values
* Delta decoding (`DPCM`): Decodes differences between consecutive values
2. Inverse ZigZag Module
* Reorders 1D array into 2D matrix
* Prepares data for inverse quantization module
3. Inverse Quantization Module
* Uses configurable quantization tables
* Implements the reverse of the lossy compression step
4. Inverse DCT Module
* Performs 2D Inverse Discrete Cosine Transform
* Converts frequency domain to spatial domain
* Processes 8x8 blocks of pixels
```scala=
package jpeg

import chisel3._
import chisel3.internal.firrtl.Width
import chisel3.util._

/**
  * Performs JPEG Decompression
  *
  * @param p JPEG Parameters
  *
  * IO
  * @param encodedRLE RLE encoded pixel data or Delta encoded data
  *
  * @return decodedPixels Decoded pixel data
  */
class JPEGDecodeChisel(p: JPEGParams) extends Module {
  val io = IO(new Bundle {
    // Encoded input data (RLE or Delta)
    val in = Flipped(Valid(new Bundle {
      val encodedRLE = Input(Vec(p.maxOutRLE, SInt(9.W)))
      val encodedDelta = Input(Vec(p.totalElements, SInt(8.W)))
    }))
    val length = Input(UInt(9.W))
    // Decoded output
    val decodedPixels = Output(Vec(p.givenRows, Vec(p.givenCols, SInt(9.W))))
    // Intermediate testing outputs (optional for debugging)
    val decodeRLEOut = Output(Vec(p.totalElements, SInt(9.W)))
    val decodeDeltaOut = Output(Vec(p.totalElements, SInt(p.w16)))
    val inverseZigzagOut = Output(Vec(p.numRows, Vec(p.numCols, SInt(9.W))))
    val inverseQuantOut = Output(Vec(p.numRows, Vec(p.numCols, SInt(12.W))))
    val inverseDCTOut = Output(Vec(8, Vec(8, SInt(9.W))))
  })

  // DontCare until the IDCT output is valid
  io.decodedPixels := DontCare

  // Decoding modules
  // RLE decode
  val RLEdecodeModule = Module(new RLEChiselDecode(p))
  RLEdecodeModule.io.in.valid := io.in.valid
  RLEdecodeModule.io.length := io.length
  RLEdecodeModule.io.in.bits.data := io.in.bits.encodedRLE
  io.decodeRLEOut := RLEdecodeModule.io.out.bits

  // Delta decode
  val DeltadecodeModule = Module(new DeltaChiselDecode(p))
  DeltadecodeModule.io.in.valid := io.in.valid
  DeltadecodeModule.io.in.bits.data := io.in.bits.encodedDelta
  io.decodeDeltaOut := DeltadecodeModule.io.out.bits

  // Inverse ZigZag Module
  val inversezigzagModule = Module(new InverseZigZagChisel(p))
  inversezigzagModule.io.in.valid := RLEdecodeModule.io.out.valid || DeltadecodeModule.io.out.valid
  inversezigzagModule.io.in.bits.zigzagIn := RLEdecodeModule.io.out.bits
  io.inverseZigzagOut := inversezigzagModule.io.matrixOut.bits

  // Inverse Quantization Module
  val inversequantModule = Module(new InverseQuantizationChisel(p))
  inversequantModule.io.in.valid := inversezigzagModule.io.matrixOut.valid
  inversequantModule.io.in.bits.data := inversezigzagModule.io.matrixOut.bits
  io.inverseQuantOut := inversequantModule.io.out.bits

  // Converts quantization table back to SInt
  inversequantModule.io.quantTable.zipWithIndex.foreach { case (row, i) =>
    row.zipWithIndex.foreach { case (element, j) =>
      element := p.getQuantTable(i)(j).S
    }
  }

  // Inverse DCT (IDCT) Module
  val idctModule = Module(new IDCTChisel)
  idctModule.io.in.valid := inversezigzagModule.io.matrixOut.valid
  idctModule.io.in.bits.matrixIn := inversezigzagModule.io.matrixOut.bits
  io.inverseDCTOut := idctModule.io.idctOut.bits

  // Output
  when(idctModule.io.idctOut.valid) {
    io.decodedPixels := idctModule.io.idctOut.bits
  }
}
```
### Test framework
When running the command below, the program will first execute the hardware implementation and then verify the results against the software reference model:
```shell
$ python3 unittest.py
```
The output contains two main parts:
1. Hardware execution output: Results from JPEG encoder hardware implementation
2. Software validation results: Comparison between hardware and software model outputs for each stage (DCT, Quantization, ZigZag) of each color component (Y, Cb, Cr)
```shell
Running hardware test...
Take 17.22s
Hardware test completed
=== Hardware Test Output ===
Converting JPG to BMP...
Converted 8.jpg to output.bmp
Reading and processing BMP file...
Running Chisel implementation...
Running Chisel tests...
Reading encoded data...
Performing Huffman coding...
Huffman Coding Results:
Y RLE: 15 unique codes
Y DELTA: 1 unique codes
Cb RLE: 5 unique codes
Cb DELTA: 1 unique codes
Cr RLE: 6 unique codes
Cr DELTA: 1 unique codes
Creating bitstreams...
Created bitstream for Y: hw_output/bitstream/y_encoded.bin
Created bitstream for Cb: hw_output/bitstream/cb_encoded.bin
Created bitstream for Cr: hw_output/bitstream/cr_encoded.bin
Analyzing Huffman table statistics...
Y RLE Statistics:
Total symbols: 15
Min code length: 1
Max code length: 6
Average code length: 4.93
Cb RLE Statistics:
Total symbols: 5
Min code length: 2
Max code length: 3
Average code length: 2.40
Cr RLE Statistics:
Total symbols: 6
Min code length: 1
Max code length: 4
Average code length: 3.17
Compression Results:
Original size: 192 bytes
Compressed size: 138 bytes
Compression ratio: 1.39:1
===========================
Testing Y component:
Test Y_DCT: PASS
Test Y_Quantization: PASS
Test Y_Zigzag: PASS
Testing Cb component:
Test Cb_DCT: PASS
Test Cb_Quantization: PASS
Test Cb_Zigzag: PASS
Testing Cr component:
Test Cr_DCT: PASS
Test Cr_Quantization: PASS
Test Cr_Zigzag: PASS
```
The code below shows our test framework that automates the hardware and software verification process:
```python=
import numpy as np
import subprocess
import time
from test import *

class JPEGDCT:
    def __init__(self, scaling_factor=100):
        self.scaling_factor = scaling_factor

    def process_block(self, block):
        N = 8
        shifted_block = np.array(block, dtype=np.int32) - 128
        dct_block = np.zeros((N, N), dtype=np.int64)
        for u in range(N):
            for v in range(N):
                sum_val = 0
                for i in range(N):
                    for j in range(N):
                        pixel_value = shifted_block[i][j]
                        cos_i = int(np.cos((2 * i + 1) * u * np.pi / 16) * 100)
                        cos_j = int(np.cos((2 * j + 1) * v * np.pi / 16) * 100)
                        cos_val = (pixel_value * cos_i) // 100
                        sum_val += cos_val * cos_j
                alpha_u = int((1.0 / np.sqrt(2)) * 100) if u == 0 else 100
                alpha_v = int((1.0 / np.sqrt(2)) * 100) if v == 0 else 100
                intermediate = (sum_val * alpha_u) // 100
                final = (intermediate * alpha_v) // 100
                dct_block[u][v] = final // 4
        return dct_block

class JPEGQuantization:
    def __init__(self, qt_choice=1):
        self.qt_choice = qt_choice
        # Define quantization tables based on qt_choice
        if qt_choice == 1:  # Luminance (Y)
            self.quant_table = np.array([
                [16, 11, 10, 16, 24, 40, 51, 61],
                [12, 12, 14, 19, 26, 58, 60, 55],
                [14, 13, 16, 24, 40, 57, 69, 56],
                [14, 17, 22, 29, 51, 87, 80, 62],
                [18, 22, 37, 56, 68, 109, 103, 77],
                [24, 35, 55, 64, 81, 104, 113, 92],
                [49, 64, 78, 87, 103, 121, 120, 101],
                [72, 92, 95, 98, 112, 100, 103, 99]
            ])
        else:  # Chrominance (Cb, Cr)
            self.quant_table = np.array([
                [17, 18, 24, 47, 99, 99, 99, 99],
                [18, 21, 26, 66, 99, 99, 99, 99],
                [24, 26, 56, 99, 99, 99, 99, 99],
                [47, 66, 99, 99, 99, 99, 99, 99],
                [99, 99, 99, 99, 99, 99, 99, 99],
                [99, 99, 99, 99, 99, 99, 99, 99],
                [99, 99, 99, 99, 99, 99, 99, 99],
                [99, 99, 99, 99, 99, 99, 99, 99]
            ])

    def quantize(self, dct_block):
        """Quantize the DCT coefficients"""
        return np.round(dct_block / self.quant_table).astype(np.int32)

class JPEGZigzag:
    def __init__(self):
        # Define zigzag scanning order
        self.zigzag_order = [
            (0,0), (0,1), (1,0), (2,0), (1,1), (0,2), (0,3), (1,2),
            (2,1), (3,0), (4,0), (3,1), (2,2), (1,3), (0,4), (0,5),
            (1,4), (2,3), (3,2), (4,1), (5,0), (6,0), (5,1), (4,2),
            (3,3), (2,4), (1,5), (0,6), (0,7), (1,6), (2,5), (3,4),
            (4,3), (5,2), (6,1), (7,0), (7,1), (6,2), (5,3), (4,4),
            (3,5), (2,6), (1,7), (2,7), (3,6), (4,5), (5,4), (6,3),
            (7,2), (7,3), (6,4), (5,5), (4,6), (3,7), (4,7), (5,6),
            (6,5), (7,4), (7,5), (6,6), (5,7), (6,7), (7,6), (7,7)
        ]

    def scan(self, block):
        """Convert block to 1D array in zigzag order"""
        return [block[i][j] for i, j in self.zigzag_order]

class JPEGTest:
    @staticmethod
    def read_block(filename):
        """Read an 8x8 block from file"""
        with open(filename, 'r') as f:
            values = [int(line.strip()) for line in f]
        return np.array(values).reshape(8, 8)

    def test_dct(self, input_file):
        """Test DCT processing"""
        print(f"Testing DCT for {input_file}")
        block = self.read_block(input_file)
        dct = JPEGDCT()
        result = dct.process_block(block)
        print("\n=== DCT Output ===")
        for row in result:
            print(" ".join(f"{val:5d}" for val in row))
        return result

    def test_quantization(self, dct_result, qt_choice):
        """Test quantization processing"""
        print("\n=== Quantization Output ===")
        quant = JPEGQuantization(qt_choice)
        quant_result = quant.quantize(dct_result)
        for row in quant_result:
            print(" ".join(f"{val:5d}" for val in row))
        return quant_result

    def test_zigzag(self, quant_result):
        """Test zigzag scanning"""
        print("\n=== Zigzag Output ===")
        zigzag = JPEGZigzag()
        zigzag_result = zigzag.scan(quant_result)
        for i, value in enumerate(zigzag_result):
            print(f"Index {i}: {value}")
        return zigzag_result

def compare_results(hw_file, sw_result, stage_name, threshold=1000):
    try:
        hw_values = np.loadtxt(hw_file, dtype=np.int64)
        if isinstance(sw_result, list):
            sw_result = np.array(sw_result, dtype=np.int64)
        else:
            sw_result = sw_result.astype(np.int64)
        if len(hw_values.shape) == 2:
            hw_values = hw_values.flatten()
        if len(sw_result.shape) == 2:
            sw_result = sw_result.flatten()
        # For DCT stage
        if 'DCT' in stage_name:
            hw_values = hw_values // 10000
        # For both Quantization and ZigZag stages
        if 'Quantization' in stage_name or 'Zigzag' in stage_name:
            sw_result = sw_result // 10000
        diff = np.abs(hw_values - sw_result)
        max_diff = np.max(diff)
        if max_diff < threshold:
            print(f"Test {stage_name}: PASS")
            return True
        else:
            print(f"Test {stage_name}: FAIL (max diff: {max_diff:.2f})")
            print(f"Hardware range: [{np.min(hw_values)}, {np.max(hw_values)}]")
            print(f"Software range: [{np.min(sw_result)}, {np.max(sw_result)}]")
            return False
    except Exception as e:
        print(f"Test {stage_name}: ERROR - {str(e)}")
        return False

def test_full_pipeline():
    try:
        print("\nRunning hardware test...")
        hw_start_time = time.time()
        result = subprocess.run(["python3", "test.py"], capture_output=True, text=True)
        hw_time = time.time() - hw_start_time
        print(f"Take {hw_time:.2f}s")
        if result.returncode != 0:
            print("\nHardware test Failed")
            print(result.stderr)
            return
        else:
            print("\nHardware test completed")
        print("\n=== Hardware Test Output ===")
        if result.stdout:
            print(result.stdout)
        if result.stderr:
            print("Errors:", result.stderr)
        print("===========================\n")
        components = [
            ("Y", 1),
            ("Cb", 2),
            ("Cr", 2)
        ]
        for comp, qt_choice in components:
            print(f"\nTesting {comp} component:")
            test = JPEGTest()
            input_block = test.read_block(f"hw_output/{comp.lower()}_block_0.txt")
            # DCT
            dct = JPEGDCT()
            sw_dct = dct.process_block(input_block)
            # Quantization
            quant = JPEGQuantization(qt_choice=qt_choice)
            sw_quant = quant.quantize(sw_dct)
            # Zigzag
            zigzag = JPEGZigzag()
            sw_zigzag = zigzag.scan(sw_quant)
            compare_results(f"hw_output/chisel_dct_{comp}.txt", sw_dct, f"{comp}_DCT")
            compare_results(f"hw_output/chisel_quant_{comp}.txt", sw_quant, f"{comp}_Quantization")
            compare_results(f"hw_output/chisel_zigzag_{comp}.txt", sw_zigzag, f"{comp}_Zigzag")
    except Exception as e:
        print(f"Error during testing: {e}")
        raise

def main():
    test_full_pipeline()

def generate_comparison_report(input_name):
    print(f"\n=== Comparison Report for {input_name} ===")
    print("\nSoftware Output:")
    print("\nHardware Output:")
    print("\nDifference Analysis:")

if __name__ == "__main__":
    main()
```
## Verilator
Run the command below:
```shell
$ sbt "runMain jpeg.VerilogGenerator"
```
This generates `JPEGEncodeChisel.v`, `JPEGEncodeChisel.fir`, and `JPEGEncodeChisel.anno.json` under `verilog_output/`.
Then build the Verilator model:
```shell
$ verilator --cc ./verilog_output/JPEGEncodeChisel.v --exe testbench.cpp --trace
$ make -j -C obj_dir -f VJPEGEncodeChisel.mk VJPEGEncodeChisel
```


## Reference
* [itu-t81](https://www.w3.org/Graphics/JPEG/itu-t81.pdf)
* [CA2023-lab3](https://github.com/sysprog21/ca2023-lab3)
* [Introduction to JPEG](https://twins.ee.nctu.edu.tw/courses/soclab_04/lab_hw_pdf/proj1_jpeg_introduction.pdf)