方榮彬, 龔祐萱
JPEG is a widely used image compression standard designed to reduce the file size of digital images while preserving visual quality. It employs lossy compression, which removes redundant and less noticeable information to achieve high compression ratios. The process involves several key steps: color space transformation, the Discrete Cosine Transform (DCT) to separate image frequencies, quantization to reduce precision, and Huffman coding to encode the data efficiently. This makes JPEG well suited to applications where storage and bandwidth are critical, such as photography, web graphics, and mobile devices, while maintaining a balance between image quality and file size.
where:
Zig-zag scanning
The quantized 8x8 block is converted into a 1D sequence using a zigzag pattern
Differential Pulse Code Modulation (DPCM)
Calculate the difference between the DC values of adjacent blocks to reduce redundant information
DC difference and AC coefficients.
Steps for Huffman Coding:
a. DC Component Encoding:
b. AC Component Encoding:
c. Huffman Tables:
The Python script utils.py is designed to convert images in formats like *.png or *.jpg into the *.bmp format, which is compatible with further processing in Chisel.
utils.py
# utils.py
import numpy as np
from PIL import Image
import argparse
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Convert image to BMP format')
    parser.add_argument('input_path', type=str, help='Input image path')
    parser.add_argument('--output', '-o', type=str, default='output.bmp',
                        help='Output BMP file path (default: output.bmp)')
    parser.add_argument('--quality', '-q', type=int, default=100,
                        help='Output quality 0-100 (default: 100)')
    args = parser.parse_args()
    # Open the image
    img = Image.open(args.input_path)
    # Ensure RGB
    if img.mode != 'RGB':
        img = img.convert('RGB')
    # Store as BMP (BMP is written uncompressed, so quality is not expected to change the result)
    img.save(args.output, "BMP", quality=args.quality)
Run the command below:
$ python utils.py {Input image name} -o {Output image name} -q {Quality}
This converts {input}.jpg into {output}.bmp at maximum quality. However, we also need to convert the image from RGB space to YCbCr space and store each channel in a separate file, using the following function.
def read_bmp(filepath):
    """
    Read BMP and convert it to YCbCr
    """
    img = Image.open(filepath)
    if img.mode != 'RGB':
        raise ValueError("Image must be in RGB format!")
    ycbcr = img.convert('YCbCr')
    y, cb, cr = ycbcr.split()
    width, height = img.size
    if width % 8 != 0 or height % 8 != 0:
        raise ValueError(f"Image dimensions must be multiples of 8! Current size: {width}x{height}")
    return np.array(y), np.array(cb), np.array(cr)
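The call to img.convert('YCbCr') hides the arithmetic behind the color space change. As a rough sketch, the full-range conversion commonly used with JPEG (JFIF) can be written out per pixel as below. The helper name rgb_to_ycbcr is hypothetical, and whether Pillow's internal rounding matches this sketch exactly has not been checked here.

```python
def rgb_to_ycbcr(r, g, b):
    """Convert one 8-bit RGB pixel to full-range YCbCr (JFIF-style formulas).

    Hypothetical helper for illustration; read_bmp itself relies on Pillow.
    """
    y = 0.299 * r + 0.587 * g + 0.114 * b
    cb = 128 - 0.168736 * r - 0.331264 * g + 0.5 * b
    cr = 128 + 0.5 * r - 0.418688 * g - 0.081312 * b

    def clamp(x):
        # Round to the nearest integer and keep the result in 0..255
        return max(0, min(255, int(round(x))))

    return clamp(y), clamp(cb), clamp(cr)


# Neutral colors carry no chroma, so Cb and Cr sit at the midpoint 128.
white = rgb_to_ycbcr(255, 255, 255)
black = rgb_to_ycbcr(0, 0, 0)
```

Gray pixels land on Cb = Cr = 128, which is why the chrominance channels of many images are much flatter than the luminance channel.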
We then split each channel into 8x8 blocks to stay compatible with the hardware, which processes one 8x8 block at a time.
import os
def extract_blocks(channel_data):
    """Split one channel into 8x8 blocks and level-shift each by -128."""
    h, w = channel_data.shape
    blocks = []
    for i in range(0, h, 8):
        for j in range(0, w, 8):
            block = channel_data[i:i+8, j:j+8]
            # Shift unsigned 0..255 samples to signed -128..127
            block = block.astype(np.int16) - 128
            blocks.append(block)
    return blocks
def save_blocks_for_chisel(blocks, component_name, output_dir="hw_output"):
    """Write each block to its own text file, one value per line (64 lines)."""
    os.makedirs(output_dir, exist_ok=True)
    for idx, block in enumerate(blocks):
        filename = f"{output_dir}/{component_name}_block_{idx}.txt"
        with open(filename, 'w') as f:
            for row in block:
                for val in row:
                    f.write(f"{val}\n")
In the hardware design, we read the image data through a test harness that loads preprocessed 8x8 blocks from text files:
import scala.io.Source
def readDataFromFile(fileName: String): Seq[Seq[Int]] = {
  val lines = Source.fromFile(fileName).getLines().toSeq
  require(lines.size == 64, s"File $fileName must contain 64 lines")
  val data = lines.map(_.toInt)
  data.grouped(8).toSeq
}
test(new JPEGEncodeChisel(p)) { dut =>
  dut.io.in.valid.poke(true.B)
  for (i <- 0 until 8; j <- 0 until 8) {
    dut.io.in.bits.yComponent(i)(j).poke(dataY(i)(j).S)
  }
}
The hardware receives image data as 8x8 blocks through its input interface, where each pixel value is represented as a signed integer after preprocessing. The input interface supports three color components (Y, Cb, Cr).
We discovered issues in the original zigzag implementation and made the following improvements:
src/main/scala/jpeg/zigzag.scala
is(ZigZagState.idle) {
when (io.in.valid) {
stateReg := ZigZagState.processing
inMatrix := io.in.bits.matrixIn
validOut := false.B
+ count := 0.U // Reset counter
+ row := 0.U // Reset position
+ col := 0.U
+ isUp := true.B
}
}
is(ZigZagState.processing) {
- count := count + 1.U // Incorrect: increment at start
when(count < p.totalElements.U) {
outReg(count) := inMatrix(row)(col)
// zigzag traversal logic
...
+ count := count + 1.U // Correct: increment after processing
}
}
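The traversal elided in the listing above can be modelled in software. The following is a minimal Python sketch of a zigzag walker that keeps the same kind of state (a position plus an up/down direction flag). It is an illustrative model under that assumption, not a transcription of the Chisel logic, and the function name zigzag_indices is hypothetical.

```python
def zigzag_indices(n=8):
    """Return the (row, col) visiting order of an n x n zigzag scan."""
    row, col, is_up = 0, 0, True
    order = []
    for _ in range(n * n):
        # Record the current position first, then advance (mirrors
        # "write the output, then increment the counter").
        order.append((row, col))
        if is_up:
            if col == n - 1:      # hit the right edge: step down, reverse
                row += 1
                is_up = False
            elif row == 0:        # hit the top edge: step right, reverse
                col += 1
                is_up = False
            else:                 # keep moving up and to the right
                row -= 1
                col += 1
        else:
            if row == n - 1:      # hit the bottom edge: step right, reverse
                col += 1
                is_up = True
            elif col == 0:        # hit the left edge: step down, reverse
                row += 1
                is_up = True
            else:                 # keep moving down and to the left
                row += 1
                col -= 1
    return order


def zigzag_scan(block):
    """Flatten a square block (list of rows) into zigzag order."""
    return [block[r][c] for r, c in zigzag_indices(len(block))]
```

Recording the position before advancing is the software counterpart of incrementing count only after the element has been written.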
The module uses a state machine to traverse the input data and generate a compressed output. The RLE algorithm identifies sequences of identical elements (referred to as "runs") in the input and encodes each run as a pair of values: the run length followed by the repeated value.
When the test finishes, we obtain the compressed data as a 1D sequence in which even indices (0-indexed) hold the number of identical values in a run and odd indices hold the value itself. This format is well suited to Huffman coding in software.
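The output layout can be illustrated with a short software model. This is a minimal sketch of run-length encoding that produces the same alternating count/value layout. The function names are hypothetical, and the hardware module may differ in details such as maximum run length.

```python
def rle_encode(values):
    """Encode a sequence as [count0, value0, count1, value1, ...]."""
    out = []
    if not values:
        return out
    run_value, run_len = values[0], 1
    for v in values[1:]:
        if v == run_value:
            run_len += 1
        else:
            # The run ended: emit (count, value) and start a new run
            out.extend([run_len, run_value])
            run_value, run_len = v, 1
    out.extend([run_len, run_value])  # flush the final run
    return out


def rle_decode(encoded):
    """Invert rle_encode: even indices are counts, odd indices are values."""
    out = []
    for count, value in zip(encoded[::2], encoded[1::2]):
        out.extend([value] * count)
    return out


example = rle_encode([5, 5, 5, 0, 0, 3])
```

Long runs of zeros after quantization are what make this layout pay off, since each run collapses to a single pair.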
The original implementation had limitations in following the JPEG standard. Here's how we improved it:
src/main/scala/jpeg/encoding.scala
- is(EncodingState.encode){
- outputReg(0) := dataReg(0) // Simply copying first value
- when (dataIndex < p.totalElements.U) {
- val diff = dataReg(dataIndex) - dataReg(dataIndex - 1.U)
+ // Store previous block's DC value
+ val prevTopLeft = RegInit(0.S(p.w8))
+ // In encoding state
+ when(dataIndex === 1.U) {
+ // Proper DPCM: Current DC - Previous DC
+ val topLeftDiff = dataReg(0) - prevTopLeft
+ outputReg(0) := topLeftDiff
+ prevTopLeft := dataReg(0) // Update for next block
+ }
Key improvements:
This implementation now correctly follows the JPEG standard's DPCM encoding for DC coefficients.
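The same DC handling can be sketched in Python. This is a minimal model, assuming the predictor starts at 0 for the first block (as the RegInit(0.S) above suggests). The function names are hypothetical.

```python
def dpcm_encode_dc(dc_values):
    """Replace each block's DC value with its difference from the previous block."""
    prev = 0  # predictor for the first block (assumed to start at 0)
    diffs = []
    for dc in dc_values:
        diffs.append(dc - prev)
        prev = dc  # current DC becomes the predictor for the next block
    return diffs


def dpcm_decode_dc(diffs):
    """Rebuild the DC values by accumulating the differences."""
    prev = 0
    dc_values = []
    for d in diffs:
        prev += d
        dc_values.append(prev)
    return dc_values


diffs = dpcm_encode_dc([52, 55, 50, 50])
```

Neighbouring blocks tend to have similar average brightness, so the differences are usually much smaller than the DC values themselves.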
Our implementation of Huffman coding is shown below:
Symbol frequencies are counted for each encoding type (RLE or Delta) across all blocks and channels (Y, Cb, Cr).
The Huffman tree is built with the heapq module, where each node is an instance of HuffmanNode, which stores:
char: The symbol (e.g., a value from RLE or Delta encoding).
freq: The frequency of the symbol.
left and right: Pointers to child nodes.
Codes are assigned by traversing the tree, appending 0 when moving to the left child and 1 when moving to the right child.
test.py
import heapq
def generate_huffman_codes(frequencies):
    """
    Generate Huffman codes for given frequencies
    Args:
        frequencies: Dictionary of value:frequency pairs
    Returns:
        Dictionary of value:huffman_code pairs
    """
    if not frequencies:
        return {}
    # Create heap from frequencies
    heap = []
    for char, freq in frequencies.items():
        heapq.heappush(heap, HuffmanNode(char, freq))
    # Build Huffman tree by repeatedly merging the two least frequent nodes
    while len(heap) > 1:
        left = heapq.heappop(heap)
        right = heapq.heappop(heap)
        internal = HuffmanNode(None, left.freq + right.freq)
        internal.left = left
        internal.right = right
        heapq.heappush(heap, internal)
    # Generate codes by traversing tree
    codes = {}
    def generate_codes_recursive(node, code=""):
        if node is None:
            return
        if node.char is not None:
            codes[node.char] = code
            return
        generate_codes_recursive(node.left, code + "0")
        generate_codes_recursive(node.right, code + "1")
    if heap:
        generate_codes_recursive(heap[0])
    return codes
def perform_huffman_coding(encoded_data):
    """
    Perform Huffman coding on RLE and Delta encoded data
    Args:
        encoded_data: Dictionary containing encoded data for each component
    """
    for encoding_type in ['RLE', 'Delta']:
        data = read_encoded_blocks(encoding_type=encoding_type)
        for component in ['Y', 'Cb', 'Cr']:
            # Collect all values for frequency calculation
            all_values = []
            for block in data[component]:
                all_values.extend(block)
            # Count symbol frequencies
            frequencies = {}
            for value in all_values:
                frequencies[value] = frequencies.get(value, 0) + 1
            # Generate Huffman codes
            huffman_codes = generate_huffman_codes(frequencies)
            # Save Huffman codes and encoded data
            save_huffman_output(
                component,
                encoding_type,
                huffman_codes,
                data[component],
                output_dir="hw_output/huffman"
            )
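The listing above relies on a HuffmanNode class that is not shown. For heapq to order the nodes, the class needs a comparison on frequency. The following is a minimal sketch of what such a class could look like, based only on the fields described earlier (char, freq, left, right); the project's actual definition may differ.

```python
import heapq


class HuffmanNode:
    """Tree node holding a symbol, its frequency, and two child links.

    Sketch only: inferred from how the node is used, not copied from the project.
    """

    def __init__(self, char, freq):
        self.char = char    # symbol, or None for an internal node
        self.freq = freq    # frequency of the symbol (or subtree total)
        self.left = None
        self.right = None

    def __lt__(self, other):
        # heapq compares items with "<", so order nodes by frequency
        return self.freq < other.freq


# Nodes come off the heap in order of increasing frequency.
heap = []
for symbol, freq in {0: 40, 1: 7, -1: 12}.items():
    heapq.heappush(heap, HuffmanNode(symbol, freq))
popped = [heapq.heappop(heap).freq for _ in range(3)]
```

Without __lt__, heapq.heappush would raise a TypeError as soon as two nodes had to be compared.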
The bitstream is structured to contain both encoding types (RLE and Delta) for each color component. For each component (Y, Cb, Cr), the bitstream is organized as follows:
def create_bitstream(rle_data, delta_data, output_dir="hw_output/bitstream"):
    for component in ['Y', 'Cb', 'Cr']:
        bitstream = bytearray()
        # 1. Component Marker
        component_marker = ord(component[0])  # ASCII value of Y/C
        bitstream.append(component_marker)
        # 2. DC Value (from Delta encoding)
        dc_value = delta_data[component][0][0]
        bitstream.extend(int(dc_value).to_bytes(2, byteorder='big', signed=True))
        # 3. AC Values (from RLE encoding)
        for block in rle_data[component]:
            for run_length, value in zip(block[::2], block[1::2]):
                bitstream.append(int(run_length))  # 1 byte for run length
                bitstream.extend(int(value).to_bytes(2, byteorder='big', signed=True))
The resulting bitstream format:
Each component's data is stored in a separate binary file under the hw_output/bitstream
directory.
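Reading a component file back makes the byte layout explicit. The sketch below assumes each file contains exactly what create_bitstream appends: one marker byte, a 2-byte big-endian signed DC value, then a sequence of 3-byte records (1-byte run length, 2-byte big-endian signed value). The helper parse_component_bitstream is hypothetical and not part of the project.

```python
def parse_component_bitstream(data):
    """Split one component's bytes into (marker, dc_value, [(run, value), ...])."""
    marker = chr(data[0])                                   # 'Y' or 'C'
    dc_value = int.from_bytes(data[1:3], byteorder='big', signed=True)
    pairs = []
    # Each record is 3 bytes: run length, then a signed 16-bit value
    for offset in range(3, len(data) - 2, 3):
        run_length = data[offset]
        value = int.from_bytes(data[offset + 1:offset + 3],
                               byteorder='big', signed=True)
        pairs.append((run_length, value))
    return marker, dc_value, pairs


# Build a small stream by hand using the same field widths.
sample = bytearray()
sample.append(ord('Y'))
sample.extend((-5).to_bytes(2, byteorder='big', signed=True))
for run, val in [(3, 7), (61, 0)]:
    sample.append(run)
    sample.extend(val.to_bytes(2, byteorder='big', signed=True))
parsed = parse_component_bitstream(bytes(sample))
```

Because the marker is only the first letter of the component name, Cb and Cr both start with 'C', so the file name rather than the marker distinguishes them.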
After implementing our Huffman encoding process, we can analyze the efficiency of the compression:
For RLE (Run-Length Encoding) results:
These statistics demonstrate the data characteristics of each color component after DCT and quantization, with Y (luminance) requiring more diverse encoding patterns than the chrominance components (Cb, Cr).
The final compression results show:
This creates a clear flow from our compression statistics to the actual bitstream structure.
Y RLE Statistics:
Total symbols: 15
Min code length: 1
Max code length: 6
Average code length: 4.93
Cb RLE Statistics:
Total symbols: 5
Min code length: 2
Max code length: 3
Average code length: 2.40
Cr RLE Statistics:
Total symbols: 6
Min code length: 1
Max code length: 4
Average code length: 3.17
Compression Results:
Original size: 192 bytes
Compressed size: 138 bytes
Compression ratio: 1.39:1
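The figures above can be reproduced from a code table with a few lines of Python. This is a sketch under two assumptions that the report does not state: the average code length is taken as the unweighted mean over symbols, and the ratio is original size divided by compressed size. Both function names are hypothetical.

```python
def code_length_stats(codes):
    """Return (symbol count, min length, max length, mean length) for a code table.

    Assumes an unweighted mean over symbols (not weighted by frequency).
    """
    lengths = [len(code) for code in codes.values()]
    return len(lengths), min(lengths), max(lengths), sum(lengths) / len(lengths)


def compression_ratio(original_bytes, compressed_bytes):
    """Original size divided by compressed size."""
    return original_bytes / compressed_bytes


# Toy code table, not taken from the project's output
toy_stats = code_length_stats({0: "0", 1: "10", -1: "11"})
ratio = compression_ratio(192, 138)  # sizes reported above
```

With the reported sizes, 192 / 138 rounds to 1.39, matching the ratio in the log.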
The purpose of the IDCT module is to transform frequency domain data into spatial domain data. This module employs a state machine to manage and transition between the corresponding operations.
Waiting State
This is the default initial state, indicating that the module is waiting for a valid input.
is(IDCTState.waiting) {
when(io.in.valid) {
dctInput := io.in.bits.matrixIn
state := IDCTState.calculating
validOut := false.B
}
}
When io.in.valid is true, the module stores the 8x8 matrix (io.in.bits.matrixIn) in the register dctInput and switches the state to calculating. It also sets the output validity flag (validOut) to false, indicating that no valid output is ready yet.
Calculating State
This is the critical state where the Inverse Discrete Cosine Transform (IDCT) computation is performed. The IDCT function is invoked to compute each element of the matrix using the mathematical IDCT formula.
To limit precision loss in integer arithmetic, intermediate values such as the cosine coefficients and the scaling factors (alphaU and alphaV) are multiplied by 100. The program then divides the accumulated values by 1,000,000 to revert the scaling effect.
Once the IDCT computation is complete, 128 is added to each element of the matrix to restore it to its original value range. The final result is stored in the register matrixOutput, and the state machine transitions back to the waiting state.
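The scaling scheme described above can be checked with a small software model. The sketch below follows the prose: cosine product and both alpha factors are scaled by 100, each term is divided once by 1,000,000, the sum is divided by 4, and 128 is added. It assumes integer division truncates toward zero. It is an illustrative model, not a bit-exact copy of the Chisel module, and the function names are hypothetical.

```python
import math

SCALE = 100  # scaling applied to the cosine product and to each alpha factor


def trunc_div(a, b):
    """Integer division truncating toward zero (Python's // floors instead)."""
    q = abs(a) // abs(b)
    return q if (a >= 0) == (b >= 0) else -q


def idct_8x8_scaled(coeffs):
    """Integer IDCT of an 8x8 coefficient block, followed by the +128 shift."""
    out = [[0] * 8 for _ in range(8)]
    for i in range(8):
        for j in range(8):
            total = 0
            for u in range(8):
                for v in range(8):
                    alpha_u = int(SCALE / math.sqrt(2)) if u == 0 else SCALE
                    alpha_v = int(SCALE / math.sqrt(2)) if v == 0 else SCALE
                    cos_val = int(math.cos((2 * i + 1) * u * math.pi / 16)
                                  * math.cos((2 * j + 1) * v * math.pi / 16)
                                  * SCALE)
                    # Three factors of 100 were introduced, so divide by 100**3
                    total += trunc_div(coeffs[u][v] * cos_val * alpha_u * alpha_v,
                                       SCALE ** 3)
            out[i][j] = trunc_div(total, 4) + 128
    return out


# A block with only a DC coefficient decodes to a flat 8x8 block.
dc_only = [[0] * 8 for _ in range(8)]
dc_only[0][0] = 80
flat = idct_8x8_scaled(dc_only)
```

The exact result for a DC coefficient of 80 would be 80 / 8 + 128 = 138; the integer model lands one below that because 1/sqrt(2) is truncated to 70/100, which shows the kind of error the two-digit scaling introduces.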
The complete program is as follows:
package jpeg
import chisel3._
import chisel3.internal.firrtl.Width
import chisel3.util._
import scala.math._
import chisel3.experimental._
/**
* Object for IDCT
*/
object IDCTChisel {
def apply(matrixIn: Valid[Vec[Vec[SInt]]], shiftedOut: Vec[Vec[SInt]], idctOut: Valid[Vec[Vec[SInt]]]) = {
val mod = Module(new IDCTChisel)
mod.io.in := matrixIn
mod.io.shiftedOut := shiftedOut
mod.io.idctOut := idctOut
mod
}
}
/**
* Creates FSM states for IDCT
*/
object IDCTState extends ChiselEnum {
val calculating, waiting = Value
}
/** Performs IDCT on 8x8 Matrix with scaling
*
* IO
* @param dctIn Input matrix to perform IDCT on
*
* @return shiftedOut Re-shifted version of output matrix by +128
* @return idctOut Resulting restored output of IDCT
*/
class IDCTChisel extends Module {
val io = IO(new Bundle {
val in = Flipped(Valid(new Bundle {
val matrixIn = Input(Vec(8, Vec(8, SInt(32.W))))
}))
val shiftedOut = Output(Vec(8, Vec(8, SInt(9.W)))) // Test output to check re-shifted block
val idctOut = Valid(Vec(8, Vec(8, SInt(9.W))))
})
// Initializes registers for matrix and Valid bit
val dctInput = Reg(Vec(8, Vec(8, SInt(32.W))))
val shiftedBlock = Reg(Vec(8, Vec(8, SInt(9.W))))
val matrixOutput = Reg(Vec(8, Vec(8, SInt(9.W))))
val validOut = RegInit(false.B)
// Assigns outputs
io.idctOut.valid := validOut
io.idctOut.bits := matrixOutput
io.shiftedOut := DontCare
// Function to compute IDCT values for each element of input matrix
def IDCT(matrix: Vec[Vec[SInt]]): Vec[Vec[SInt]] = {
val idctMatrix = Wire(Vec(8, Vec(8, SInt(9.W))))
// Compute IDCT
for (i <- 0 until 8) {
for (j <- 0 until 8) {
var sum = 0.S
for (u <- 0 until 8) {
for (v <- 0 until 8) {
val coefValue = matrix(u)(v)
val alphaU = if (u == 0) (1.0 / math.sqrt(2)) * 100 else 100
val alphaV = if (v == 0) (1.0 / math.sqrt(2)) * 100 else 100
val cosVal = (math.cos((2 * i + 1) * u * Pi / 16) * math.cos((2 * j + 1) * v * Pi / 16) * 100).toInt.S
val divSum = (coefValue * cosVal * alphaU.toInt.S * alphaV.toInt.S) / 1000000.S
sum = sum +& divSum / 1000000.S
}
}
idctMatrix(i)(j) := (sum / 4.S)
}
}
idctMatrix
}
// Initializes state and defines FSM
val state = RegInit(IDCTState.waiting)
switch(state) {
is(IDCTState.waiting) {
when(io.in.valid) {
dctInput := io.in.bits.matrixIn
state := IDCTState.calculating
validOut := false.B
}
}
is(IDCTState.calculating) {
// Assigns output matrix to calculated IDCT values
val idctResult = IDCT(dctInput)
for (i <- 0 until 8) {
for (j <- 0 until 8) {
matrixOutput(i)(j) := idctResult(i)(j) +& 128.S // Re-shifting to restore original range
}
}
io.shiftedOut := matrixOutput
state := IDCTState.waiting
validOut := true.B
}
}
}
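Our hardware encoder workflow is listed below (from 1. to 4.):
1. DCT
2. Quantization
3. Zigzag scan
4. Encoding: two encoders are available; one is RLE and the other is DPCM.
RLE (Run-Length Encoding): Compresses sequences of repeated values
DPCM (Differential Pulse Code Modulation): Encodes differences between consecutive values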
package jpeg
import chisel3._
import chisel3.internal.firrtl.Width
import chisel3.util._
import chisel3.stage.{ChiselStage, ChiselGeneratorAnnotation}
import firrtl.options.TargetDirAnnotation
/**
* Performs JPEG Compression
*
* @param p JPEG Parameters
*
* IO
* @param pixelDataIn Pixel data to be encoded
*
* @return dctOut DCT output used in testing
* @return quantOut Quantization output used in testing
* @return zigzagOut Zigzag output used in testing
* @return encoded Encoded pixel data
*
*/
class JPEGEncodeChisel(p: JPEGParams) extends Module {
val io = IO(new Bundle {
val in = Flipped(Valid(new Bundle {
val yComponent = Input(Vec(p.givenRows, Vec(p.givenCols, SInt(9.W))))
}))
// Y output
val dctOutY = Output(Vec(8, Vec(8, SInt(32.W))))
val quantOutY = Output(Vec(p.numRows, Vec(p.numCols, SInt(32.W))))
val zigzagOutY = Output(Vec(p.totalElements, SInt(32.W)))
val encodedRLEY = Output(Vec(p.maxOutRLE, SInt(p.w8)))
val encodedDeltaY = Output(Vec(p.totalElements, SInt(p.w8)))
})
// Dontcare for output yet
io.encodedRLEY := DontCare
io.encodedDeltaY := DontCare
// Y channel
// Discrete Cosine Transform Module
val dctModuleY = Module(new DCTChisel)
val quantModuleY = Module(new QuantizationChisel(p.copy(qtChoice = 1)))
val zigzagModuleY = Module(new ZigZagChisel(p))
// val encodeModuleY = Module(new RLEChiselEncode(p))
dctModuleY.io.in.valid := io.in.valid
dctModuleY.io.in.bits.matrixIn := io.in.bits.yComponent
quantModuleY.io.in.valid := dctModuleY.io.dctOut.valid
quantModuleY.io.in.bits.data := dctModuleY.io.dctOut.bits
quantModuleY.io.quantTable.zipWithIndex.foreach { case (row, i) =>
row.zipWithIndex.foreach { case (element, j) =>
element := p.getQuantTable(i)(j).S
}
}
zigzagModuleY.io.in.valid := quantModuleY.io.out.valid
zigzagModuleY.io.in.bits.matrixIn := quantModuleY.io.out.bits
when(zigzagModuleY.io.zigzagOut.valid) {
when(p.encodingChoice.B) {
val encodeModuleYr = Module(new RLEChiselEncode(p))
encodeModuleYr.io.in.valid := zigzagModuleY.io.zigzagOut.valid
encodeModuleYr.io.in.bits.data := zigzagModuleY.io.zigzagOut.bits
io.encodedRLEY := encodeModuleYr.io.out.bits
val encodeModuleYd = Module(new DeltaChiselEncode(p))
encodeModuleYd.io.in.valid := zigzagModuleY.io.zigzagOut.valid
encodeModuleYd.io.in.bits.data := zigzagModuleY.io.zigzagOut.bits
io.encodedDeltaY := encodeModuleYd.io.out.bits
}
}
io.dctOutY := dctModuleY.io.dctOut.bits
io.quantOutY := quantModuleY.io.out.bits
io.zigzagOutY := zigzagModuleY.io.zigzagOut.bits
}
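Our hardware decoder workflow is listed below (from 1. to 4.):
1. Decoding (RLE or DPCM)
2. Inverse zigzag
3. Inverse quantization
4. IDCT
The two decoding schemes in step 1:
RLE (Run-Length Encoding): Decompresses sequences of repeated values
DPCM (Differential Pulse Code Modulation): Decodes differences between consecutive values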
package jpeg
import chisel3._
import chisel3.internal.firrtl.Width
import chisel3.util._
/**
* Performs JPEG Decompression
*
* @param p JPEG Parameters
*
* IO
* @param encodedRLE RLE encoded pixel data or Delta encoded data
*
* @return decodedPixels Decoded pixel data
*
*/
class JPEGDecodeChisel(p: JPEGParams) extends Module {
val io = IO(new Bundle {
// Encoded input data (RLE or Delta)
val in = Flipped(Valid(new Bundle {
val encodedRLE = Input(Vec(p.maxOutRLE, SInt(9.W)))
val encodedDelta = Input(Vec(p.totalElements, SInt(8.W)))
}))
val length = Input(UInt(9.W))
// Decoded Output
val decodedPixels = Output(Vec(p.givenRows, Vec(p.givenCols, SInt(9.W))))
// Intermediate testing outputs (optional for debugging)
val decodeRLEOut = Output(Vec(p.totalElements, SInt(9.W)))
val decodeDeltaOut = Output(Vec(p.totalElements, SInt(p.w16)))
val inverseZigzagOut = Output(Vec(p.numRows, Vec(p.numCols, SInt(9.W))))
val inverseQuantOut = Output(Vec(p.numRows, Vec(p.numCols, SInt(12.W))))
val inverseDCTOut = Output(Vec(8, Vec(8, SInt(9.W))))
})
// Dontcare for output yet
io.decodedPixels := DontCare
// Decoding Module
// RLE decode
val RLEdecodeModule = Module(new RLEChiselDecode(p))
RLEdecodeModule.io.in.valid := io.in.valid
RLEdecodeModule.io.length := io.length
RLEdecodeModule.io.in.bits.data := io.in.bits.encodedRLE
io.decodeRLEOut := RLEdecodeModule.io.out.bits
// Delta decode
val DeltadecodeModule = Module(new DeltaChiselDecode(p))
DeltadecodeModule.io.in.valid := io.in.valid
DeltadecodeModule.io.in.bits.data := io.in.bits.encodedDelta
io.decodeDeltaOut := DeltadecodeModule.io.out.bits
// Inverse ZigZag Module
val inversezigzagModule = Module(new InverseZigZagChisel(p))
inversezigzagModule.io.in.valid := RLEdecodeModule.io.out.valid || DeltadecodeModule.io.out.valid
inversezigzagModule.io.in.bits.zigzagIn := RLEdecodeModule.io.out.bits
io.inverseZigzagOut := inversezigzagModule.io.matrixOut.bits
// Inverse Quantization Module
val inversequantModule = Module(new InverseQuantizationChisel(p))
inversequantModule.io.in.valid := inversezigzagModule.io.matrixOut.valid
inversequantModule.io.in.bits.data := inversezigzagModule.io.matrixOut.bits
io.inverseQuantOut := inversequantModule.io.out.bits
// Converts Quantization Table back to SInt
inversequantModule.io.quantTable.zipWithIndex.foreach { case (row, i) =>
row.zipWithIndex.foreach { case (element, j) =>
element := p.getQuantTable(i)(j).S
}
}
// Inverse DCT (IDCT) Module
val idctModule = Module(new IDCTChisel)
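// Feed the IDCT from the inverse quantization stage
// (assumes InverseQuantizationChisel exposes io.out.valid like QuantizationChisel)
idctModule.io.in.valid := inversequantModule.io.out.valid
idctModule.io.in.bits.matrixIn := inversequantModule.io.out.bits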
io.inverseDCTOut := idctModule.io.idctOut.bits
// Output
when(idctModule.io.idctOut.valid) {
io.decodedPixels := idctModule.io.idctOut.bits
}
}
When running the command below, the program first executes the hardware implementation and then verifies the results against the software reference model:
$ python3 unittest.py
The output contains two main parts:
Running hardware test...
Take 17.22s
Hardware test completed
=== Hardware Test Output ===
Converting JPG to BMP...
Converted 8.jpg to output.bmp
Reading and processing BMP file...
Running Chisel implementation...
Running Chisel tests...
Reading encoded data...
Performing Huffman coding...
Huffman Coding Results:
Y RLE: 15 unique codes
Y DELTA: 1 unique codes
Cb RLE: 5 unique codes
Cb DELTA: 1 unique codes
Cr RLE: 6 unique codes
Cr DELTA: 1 unique codes
Creating bitstreams...
Created bitstream for Y: hw_output/bitstream/y_encoded.bin
Created bitstream for Cb: hw_output/bitstream/cb_encoded.bin
Created bitstream for Cr: hw_output/bitstream/cr_encoded.bin
Analyzing Huffman table statistics...
Y RLE Statistics:
Total symbols: 15
Min code length: 1
Max code length: 6
Average code length: 4.93
Cb RLE Statistics:
Total symbols: 5
Min code length: 2
Max code length: 3
Average code length: 2.40
Cr RLE Statistics:
Total symbols: 6
Min code length: 1
Max code length: 4
Average code length: 3.17
Compression Results:
Original size: 192 bytes
Compressed size: 138 bytes
Compression ratio: 1.39:1
===========================
Testing Y component:
Test Y_DCT: PASS
Test Y_Quantization: PASS
Test Y_Zigzag: PASS
Testing Cb component:
Test Cb_DCT: PASS
Test Cb_Quantization: PASS
Test Cb_Zigzag: PASS
Testing Cr component:
Test Cr_DCT: PASS
Test Cr_Quantization: PASS
Test Cr_Zigzag: PASS
The code below shows our test framework that automates the hardware and software verification process:
import numpy as np
import subprocess
import time
from test import *
class JPEGDCT:
    def __init__(self, scaling_factor=100):
        self.scaling_factor = scaling_factor
    def process_block(self, block):
        N = 8
        shifted_block = np.array(block, dtype=np.int32) - 128
        dct_block = np.zeros((N, N), dtype=np.int64)
        for u in range(N):
            for v in range(N):
                sum_val = 0
                for i in range(N):
                    for j in range(N):
                        pixel_value = shifted_block[i][j]
                        cos_i = int(np.cos((2 * i + 1) * u * np.pi / 16) * 100)
                        cos_j = int(np.cos((2 * j + 1) * v * np.pi / 16) * 100)
                        cos_val = (pixel_value * cos_i) // 100
                        sum_val += cos_val * cos_j
                alpha_u = int((1.0 / np.sqrt(2)) * 100) if u == 0 else 100
                alpha_v = int((1.0 / np.sqrt(2)) * 100) if v == 0 else 100
                intermediate = (sum_val * alpha_u) // 100
                final = (intermediate * alpha_v) // 100
                dct_block[u][v] = final // 4
        return dct_block
class JPEGQuantization:
    def __init__(self, qt_choice=1):
        self.qt_choice = qt_choice
        # Define quantization tables based on qt_choice
        if qt_choice == 1:  # Luminance (Y)
            self.quant_table = np.array([
                [16, 11, 10, 16, 24, 40, 51, 61],
                [12, 12, 14, 19, 26, 58, 60, 55],
                [14, 13, 16, 24, 40, 57, 69, 56],
                [14, 17, 22, 29, 51, 87, 80, 62],
                [18, 22, 37, 56, 68, 109, 103, 77],
                [24, 35, 55, 64, 81, 104, 113, 92],
                [49, 64, 78, 87, 103, 121, 120, 101],
                [72, 92, 95, 98, 112, 100, 103, 99]
            ])
        else:  # Chrominance (Cb, Cr)
            self.quant_table = np.array([
                [17, 18, 24, 47, 99, 99, 99, 99],
                [18, 21, 26, 66, 99, 99, 99, 99],
                [24, 26, 56, 99, 99, 99, 99, 99],
                [47, 66, 99, 99, 99, 99, 99, 99],
                [99, 99, 99, 99, 99, 99, 99, 99],
                [99, 99, 99, 99, 99, 99, 99, 99],
                [99, 99, 99, 99, 99, 99, 99, 99],
                [99, 99, 99, 99, 99, 99, 99, 99]
            ])
    def quantize(self, dct_block):
        """Quantize the DCT coefficients"""
        return np.round(dct_block / self.quant_table).astype(np.int32)
class JPEGZigzag:
    def __init__(self):
        # Define zigzag scanning order
        self.zigzag_order = [
            (0,0), (0,1), (1,0), (2,0), (1,1), (0,2), (0,3), (1,2),
            (2,1), (3,0), (4,0), (3,1), (2,2), (1,3), (0,4), (0,5),
            (1,4), (2,3), (3,2), (4,1), (5,0), (6,0), (5,1), (4,2),
            (3,3), (2,4), (1,5), (0,6), (0,7), (1,6), (2,5), (3,4),
            (4,3), (5,2), (6,1), (7,0), (7,1), (6,2), (5,3), (4,4),
            (3,5), (2,6), (1,7), (2,7), (3,6), (4,5), (5,4), (6,3),
            (7,2), (7,3), (6,4), (5,5), (4,6), (3,7), (4,7), (5,6),
            (6,5), (7,4), (7,5), (6,6), (5,7), (6,7), (7,6), (7,7)
        ]
    def scan(self, block):
        """Convert block to 1D array in zigzag order"""
        return [block[i][j] for i, j in self.zigzag_order]
class JPEGTest:
    @staticmethod
    def read_block(filename):
        """Read an 8x8 block from file"""
        with open(filename, 'r') as f:
            values = [int(line.strip()) for line in f]
        return np.array(values).reshape(8, 8)
    def test_dct(self, input_file):
        """Test DCT processing"""
        print(f"Testing DCT for {input_file}")
        block = self.read_block(input_file)
        dct = JPEGDCT()
        result = dct.process_block(block)
        print("\n=== DCT Output ===")
        for row in result:
            print(" ".join(f"{val:5d}" for val in row))
        return result
    def test_quantization(self, dct_result, qt_choice):
        """Test quantization processing"""
        print("\n=== Quantization Output ===")
        quant = JPEGQuantization(qt_choice)
        quant_result = quant.quantize(dct_result)
        for row in quant_result:
            print(" ".join(f"{val:5d}" for val in row))
        return quant_result
    def test_zigzag(self, quant_result):
        """Test zigzag scanning"""
        print("\n=== Zigzag Output ===")
        zigzag = JPEGZigzag()
        zigzag_result = zigzag.scan(quant_result)
        for i, value in enumerate(zigzag_result):
            print(f"Index {i}: {value}")
        return zigzag_result
def compare_results(hw_file, sw_result, stage_name, threshold=1000):
    try:
        hw_values = np.loadtxt(hw_file, dtype=np.int64)
        if isinstance(sw_result, list):
            sw_result = np.array(sw_result, dtype=np.int64)
        else:
            sw_result = sw_result.astype(np.int64)
        if len(hw_values.shape) == 2:
            hw_values = hw_values.flatten()
        if len(sw_result.shape) == 2:
            sw_result = sw_result.flatten()
        # For DCT stage
        if 'DCT' in stage_name:
            hw_values = hw_values // 10000
        # For both Quantization and ZigZag stages
        if 'Quantization' in stage_name or 'Zigzag' in stage_name:
            sw_result = sw_result // 10000
        diff = np.abs(hw_values - sw_result)
        max_diff = np.max(diff)
        if max_diff < threshold:
            print(f"Test {stage_name}: PASS")
            return True
        else:
            print(f"Test {stage_name}: FAIL (max diff: {max_diff:.2f})")
            print(f"Hardware range: [{np.min(hw_values)}, {np.max(hw_values)}]")
            print(f"Software range: [{np.min(sw_result)}, {np.max(sw_result)}]")
            return False
    except Exception as e:
        print(f"Test {stage_name}: ERROR - {str(e)}")
        return False
def test_full_pipeline():
    try:
        print("\nRunning hardware test...")
        hw_start_time = time.time()
        result = subprocess.run(["python3", "test.py"], capture_output=True, text=True)
        hw_time = time.time() - hw_start_time
        print(f"Take {hw_time:.2f}s")
        if result.returncode != 0:
            print("\nHardware test Failed")
            print(result.stderr)
            return
        else:
            print("\nHardware test completed")
        print("\n=== Hardware Test Output ===")
        if result.stdout:
            print(result.stdout)
        if result.stderr:
            print("Errors:", result.stderr)
        print("===========================\n")
        components = [
            ("Y", 1),
            ("Cb", 2),
            ("Cr", 2)
        ]
        for comp, qt_choice in components:
            print(f"\nTesting {comp} component:")
            test = JPEGTest()
            input_block = test.read_block(f"hw_output/{comp.lower()}_block_0.txt")
            # DCT
            dct = JPEGDCT()
            sw_dct = dct.process_block(input_block)
            # Quantization
            quant = JPEGQuantization(qt_choice=qt_choice)
            sw_quant = quant.quantize(sw_dct)
            # Zigzag
            zigzag = JPEGZigzag()
            sw_zigzag = zigzag.scan(sw_quant)
            compare_results(f"hw_output/chisel_dct_{comp}.txt", sw_dct, f"{comp}_DCT")
            compare_results(f"hw_output/chisel_quant_{comp}.txt", sw_quant, f"{comp}_Quantization")
            compare_results(f"hw_output/chisel_zigzag_{comp}.txt", sw_zigzag, f"{comp}_Zigzag")
    except Exception as e:
        print(f"Error during testing: {e}")
        raise
def main():
    test_full_pipeline()
def generate_comparison_report(input_name):
    print(f"\n=== Comparison Report for {input_name} ===")
    print("\nSoftware Output:")
    print("\nHardware Output:")
    print("\nDifference Analysis:")
if __name__ == "__main__":
    main()
Run the command below:
$ sbt "runMain jpeg.VerilogGenerator"
This generates JPEGEncodeChisel.v, JPEGEncodeChisel.fir, and JPEGEncodeChisel.anno.json.
Then run Verilator on the generated Verilog and build the simulation:
$ verilator --cc ./verilog_output/JPEGEncodeChisel.v --exe testbench.cpp --trace
$ make -j -C obj_dir -f VJPEGEncodeChisel.mk VJPEGEncodeChisel