Object Detection with OFA-YOLO

This section focuses on the Python implementation of the OFA-YOLO network from the Vitis AI 3.0 Model Zoo for real-time object detection, specifically optimized to run on the DPUCZDX8G Deep Learning Processing Unit (DPU). I present a robust Python class that integrates the essential functionalities: image preprocessing, DPU inference, and parsing of the network's output with non-maximum suppression. Additionally, performance metrics are provided, comparing the results from the full and pruned versions of the network.

The accuracy of the object detection model is evaluated using the COCO dataset, with performance metrics derived through the pycocotools library. In addition to accuracy, I provide throughput benchmarks by evaluating multithreaded video stream processing in the Python implementation. For comparison, throughput results are also presented for the C++ implementation from the Xilinx Vitis AI 3.0 GitHub repository.

All tests are carried out on a Trenz TE0820-03-2AI21FA (ZU+ 2CG) module mounted on a TE0703-06 carrier board, with a Logitech C270 USB camera used for capturing video streams. The prerequisites for these tests include a Linux environment with Vitis AI 3.0 libraries already integrated and properly configured. For step-by-step guidance, see the previous pages of this website.

All resources, including the optimized Python class for the OFA-YOLO network, the multithreaded Python implementation for real-time object detection, the script for accuracy estimation, and the README markdown file, are available as paid content. Recompiled xmodels for the B1152 DPU architecture are also provided.

To accelerate your Deep Learning journey on Zynq UltraScale+ MPSoCs, feel free to contact me to obtain the sources with a small donation. Alternatively, you can perform the optimizations yourself, starting from the free, non-optimized Python implementation of OFA-YOLO available on this webpage.

To better understand how to leverage the OFA-YOLO network deployed on the DPU, let's consider the design of YOLOv3, focusing on its input image preprocessing and the parsing and decoding of its output layers. The input image for the OFA-YOLO network is a normalized and scaled image with dimensions 640x640x3. The network produces three output layers with different grid sizes (80x80, 40x40, 20x20) for detecting small, medium, and large objects, respectively. Each grid cell of an output layer contains three prediction boxes, corresponding to the three anchors, giving 255 values per cell: each box carries four bounding box coordinates, one objectness score, and 80 class scores (85 values per box).
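As a quick numeric sanity check (a sketch with placeholder data, not part of the deployed pipeline), the 255 channels of an output layer can be reshaped into 3 anchors of 85 values each:

import numpy as np

# Placeholder tensor standing in for the 80x80 output layer: (grid_h, grid_w, 255)
layer = np.zeros((80, 80, 255), dtype=np.float32)

# View it as (grid_h, grid_w, 3 anchors, 4 box coords + 1 objectness + 80 classes)
layer = layer.reshape(80, 80, 3, 85)

box_coords   = layer[..., 0:4]   # tx, ty, tw, th
objectness   = layer[..., 4]     # objectness score per anchor
class_scores = layer[..., 5:]    # 80 class scores per anchor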

Therefore, to utilize the deployed OFA-YOLO network on the DPU, the input image should be preprocessed (scaling and normalization) before quantization to the int8 format. For the output layers, the process should be reversed: the output should be dequantized and scaled from int8 to float. Afterward, the data should be parsed into meaningful bounding box coordinates, objectness scores, and class scores. The quantization and dequantization factors (fix_point values) can be retrieved from the input/output tensors after DPU initialization, while the anchor dimensions and the normalization (scale) factor are found in the prototxt file.

input/output tensors
    
    dpu_runner = vart.Runner.create_runner(subgraphs[0], "run")
    input_tensors = dpu_runner.get_input_tensors()
    output_tensors = dpu_runner.get_output_tensors()
    print(input_tensors[0])
    print(output_tensors[0])
    print(output_tensors[1])
    print(output_tensors[2])

    {name: 'ofa_yolo_50__ofa_yolo_50_QuantStub_quant__input_1_fix', shape: [1, 640, 640, 3], type: 'xint8', attrs: {'location': 1, 'ddr_addr': 3872, 'reg_id': 2, 'fix_point': 6, 'if_signed': True, 'round_mode': 'DPU_ROUND', 'bit_width': 8}}
    
    {name: 'ofa_yolo_50__ofa_yolo_50_Model_model__Conv2d_module_240__ip_1_fix', shape: [1, 80, 80, 255], type: 'xint8', attrs: {'location': 1, 'ddr_addr': 0, 'reg_id': 3, 'fix_point': 3, 'if_signed': True, 'bit_width': 8, 'round_mode': 'DPU_ROUND'}}
    {name: 'ofa_yolo_50__ofa_yolo_50_Model_model__Conv2d_module_241__ip_3_fix', shape: [1, 40, 40, 255], type: 'xint8', attrs: {'location': 1, 'ddr_addr': 1632000, 'bit_width': 8, 'round_mode': 'DPU_ROUND', 'reg_id': 3, 'fix_point': 4, 'if_signed': True}}
    {name: 'ofa_yolo_50__ofa_yolo_50_Model_model__Conv2d_module_242__ip_fix', shape: [1, 20, 20, 255], type: 'xint8', attrs: {'location': 1, 'ddr_addr': 2040000, 'reg_id': 3, 'fix_point': 4, 'round_mode': 'DPU_ROUND', 'bit_width': 8, 'if_signed': True}}
    
  
ofa_yolo_pt.prototxt
    
model {
  kernel {
     mean: 0.0
     mean: 0.0
     mean: 0.0
     scale: 0.00392156
     scale: 0.00392156
     scale: 0.00392156
  }
  model_type : YOLOv5
  yolo_v5_param {
    stride:8
    stride:16
    stride:32
    max_boxes_num:30000
    max_nms_num:300
    yolo_param {
      num_classes: 80
    anchorCnt: 3
    layer_name: "_242__"
    layer_name: "_241__"
    layer_name: "_240__"
    conf_threshold: 0.5
    nms_threshold: 0.65
    biases: 10
    biases: 13
    biases: 16
    biases: 30
    biases: 33
    biases: 23
    biases: 30
    biases: 61
    biases: 62
    biases: 45
    biases: 59
    biases: 119
    biases: 116
    biases: 90
    biases: 156
    biases: 198
    biases: 373
    biases: 326
    test_mAP: false
  }
}
}
    
  

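Putting this together, the quantization and dequantization factors follow directly from the fix_point attributes printed above and the scale value from the prototxt; a minimal sketch (img_float32 and raw_outputs are placeholder names, and the runner with its input_tensors/output_tensors is assumed to be created as shown above):

import numpy as np

# Sketch: derive quantization factors from the tensor attributes and the prototxt scale
scale = 0.00392156  # 1/255, the 'scale' entries from ofa_yolo_pt.prototxt

# Input: float pixels -> int8, combining normalization with the input fix_point (6 above)
fix_in = 2 ** input_tensors[0].get_attr("fix_point") * scale
img_int8 = np.round(img_float32 * fix_in).astype(np.int8)

# Outputs: int8 -> float, one factor per layer from its own fix_point (3, 4, 4 above)
fix_out = [1 / (2 ** t.get_attr("fix_point")) for t in output_tensors]
layer0_float = raw_outputs[0].astype(np.float32) * fix_out[0]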
After parsing and thresholding the data in the output layers based on the confidence threshold, Non-Maximum Suppression (NMS) should be applied to remove redundant boxes according to the predefined NMS threshold.

Python code: decoding the output layers and applying NMS
    

# Excerpt from the full script below: assumes numpy is imported as np and that
# reshaped_outputs, anchors, conf_threshold and nms_threshold are already defined.
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

# Intersection-over-Union of two axis-aligned boxes given as [x1, y1, x2, y2]
def calculate_iou(box1, box2):
    x1, y1, x2, y2 = box1
    x1_b, y1_b, x2_b, y2_b = box2
    inter_x1 = max(x1, x1_b)
    inter_y1 = max(y1, y1_b)
    inter_x2 = min(x2, x2_b)
    inter_y2 = min(y2, y2_b)
    inter_area = max(0, inter_x2 - inter_x1) * max(0, inter_y2 - inter_y1)
    box1_area = (x2 - x1) * (y2 - y1)
    box2_area = (x2_b - x1_b) * (y2_b - y1_b)
    union_area = box1_area + box2_area - inter_area
    return inter_area / union_area if union_area > 0 else 0

# Decode each output layer: split the raw predictions, apply sigmoid to the scores,
# and keep only boxes above the confidence threshold.
decoded_boxes = []
for layer_idx, output in enumerate(reshaped_outputs):
    grid_height, grid_width, num_anchors, _ = output.shape
    anchors_layer = np.array(anchors[layer_idx]).reshape(-1, 2)
    tx, ty, tw, th, confidence = np.split(output[..., :5], 5, axis=-1)
    class_scores = output[..., 5:]
    confidence = sigmoid(confidence)
    class_scores = sigmoid(class_scores)
    for h in range(grid_height):
        for w in range(grid_width):
            for anchor_idx in range(num_anchors):

                # Raw predictions for this anchor at grid cell (h, w)
                tx_val = tx[h, w, anchor_idx]
                ty_val = ty[h, w, anchor_idx]
                tw_val = tw[h, w, anchor_idx]
                th_val = th[h, w, anchor_idx]
                conf_val = confidence[h, w, anchor_idx]

                if conf_val <= conf_threshold:
                    continue

                # Decode box center and size, normalized to the 640x640 input
                bx = (w + sigmoid(tx_val)) / grid_width
                by = (h + sigmoid(ty_val)) / grid_height
                bw = np.exp(tw_val) * anchors_layer[anchor_idx, 0] / 640
                bh = np.exp(th_val) * anchors_layer[anchor_idx, 1] / 640

                # Convert to corner coordinates in input-image (640x640) pixels
                x_min = (bx - bw / 2) * 640
                y_min = (by - bh / 2) * 640
                x_max = (bx + bw / 2) * 640
                y_max = (by + bh / 2) * 640

                # Per-class probabilities for this anchor
                class_probs = class_scores[h, w, anchor_idx, :]

                decoded_boxes.append({
                    "box": [x_min, y_min, x_max, y_max],
                    "confidence": conf_val,
                    "class_probs": class_probs,
                })

# Greedy NMS: keep the highest-confidence box, drop boxes that overlap it too much
decoded_boxes.sort(key=lambda x: x["confidence"], reverse=True)
nms_boxes = []
while decoded_boxes:
    best_box = decoded_boxes.pop(0)
    nms_boxes.append(best_box)
    decoded_boxes = [
        box for box in decoded_boxes if calculate_iou(best_box["box"], box["box"]) < nms_threshold
    ]
    
  

Last but not least, the box coordinates should be scaled back to the original image size during rendering.

Python code: mapping boxes back to the original image
    
# Map detections from the 640x640 network space back to the original frame.
# pad_x / pad_y are zero here because the frame is resized without letterboxing.
image_height, image_width = frame_raw.shape[:2]
scale_x = 640 / image_width
scale_y = 640 / image_height
pad_x = (640 - image_width * scale_x) / 2
pad_y = (640 - image_height * scale_y) / 2
for detection in nms_boxes:
    x_min, y_min, x_max, y_max = detection["box"]
    x_min = max(0, int((x_min - pad_x) / scale_x))
    y_min = max(0, int((y_min - pad_y) / scale_y))
    x_max = min(image_width, int((x_max - pad_x) / scale_x))
    y_max = min(image_height, int((y_max - pad_y) / scale_y))
    confidence = float(detection["confidence"])
    class_probs = detection["class_probs"]
    class_id = int(np.argmax(class_probs))
    label = f"Class {class_id} ({confidence:.2f})"
    cv2.rectangle(frame_raw, (int(x_min), int(y_min)), (int(x_max), int(y_max)), (0, 255, 0), 2)
    cv2.putText(frame_raw, label, (int(x_min), int(y_min - 10)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
    
  

Thus, the structure of a Python class utilizing the algorithms mentioned above can be outlined as follows (a minimal class skeleton sketch is shown after the list):

Initialization (__init__):

  • Configures anchors and model-specific parameters such as strides, scaling factors, and fixed-point adjustments.

  • Sets thresholds for confidence and Non-Maximum Suppression (NMS).

  • Prepares internal structures for input and output tensor management.

Preprocessing (preprocess):

  • Resizes and normalizes input images.

  • Quantizes the image into INT8 format based on the fixed-point scaling factor of the input tensor.

Inference (run_dpu):

  • Feeds the preprocessed input into the DPU runner.

  • Executes the inference asynchronously and retrieves the raw outputs from the DPU.

Postprocessing (postprocess):

  • Dequantizes and decodes the raw DPU outputs into bounding box coordinates, confidence scores, and class probabilities.

  • Filters detections based on the confidence threshold and applies NMS to eliminate overlapping boxes.

Visualization (draw_detections):

  • Maps the decoded coordinates back to the original image dimensions.

  • Draws bounding boxes, class labels, and confidence scores for visualization.
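A minimal skeleton of such a class could look like the sketch below; the method names follow the outline above, bodies are omitted, and the details differ from the full implementation:

class OFAYoloDetector:
    """Illustrative skeleton only; see the outline above."""

    def __init__(self, xmodel_path, conf_threshold=0.5, nms_threshold=0.65):
        # Create the DPU runner, read tensor shapes and fix_point values,
        # store anchors, strides and thresholds, pre-allocate I/O buffers.
        ...

    def preprocess(self, frame):
        # Resize to 640x640, normalize and quantize to int8.
        ...

    def run_dpu(self, input_int8):
        # execute_async() + wait(), then return the raw int8 outputs.
        ...

    def postprocess(self, raw_outputs):
        # Dequantize, decode boxes, apply the confidence threshold and NMS.
        ...

    def draw_detections(self, frame, detections):
        # Map boxes back to the original frame and render them.
        ...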

Performance evaluation was conducted across three OFA-YOLO models: the full model and two pruned models with 30% and 50% sparsity.

The accuracy metrics, such as Average Precision (AP) and Average Recall (AR), were calculated using standard COCO metrics with the pycocotools library. The results confirmed that the full model (ofa_yolo_pt) clearly outperforms the pruned models in both AP and AR across all object sizes. This aligns with expectations, as pruning reduces model size but negatively impacts detection performance, particularly for small and medium objects.
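For reference, these metrics follow the standard pycocotools workflow; a minimal evaluation sketch, assuming the detections have already been exported to a COCO-format results JSON (file names are placeholders):

from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval

# Placeholder paths: COCO ground-truth annotations and exported OFA-YOLO detections
coco_gt = COCO("instances_val2017.json")
coco_dt = coco_gt.loadRes("ofa_yolo_detections.json")

coco_eval = COCOeval(coco_gt, coco_dt, iouType="bbox")
coco_eval.evaluate()
coco_eval.accumulate()
coco_eval.summarize()   # prints AP/AR over IoU thresholds and object sizes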

For throughput estimation, I evaluated both my multithreaded Python implementation of OFA-YOLO and the multithreaded C++ example from the Xilinx Vitis AI GitHub repository, using the same three models. The C++ implementation demonstrates a lower inference time (approximately 20 milliseconds less per model) than the Python implementation. Here, inference time refers to the time required to upload data to the DPU runner and retrieve the results.

As a result, the C++ implementation achieves higher throughput than the Python implementation. Consistent with expectations, pruning improves throughput due to the smaller model size, but this comes at the cost of reduced accuracy.

Below, I provide the Python code for the non-optimized version of the OFA-YOLO detector, using a USB camera as the input video source. This implementation has low throughput (about 10 times slower than the multithreaded implementation) and lower accuracy. As a challenge, you may try to optimize it yourself, or alternatively, you can request the ready-to-use multithreaded implementation, including accuracy estimation, from me. This optimized version will definitely save you time and effort.

Python code for the non-optimized OFA-YOLO detector
    
import cv2
import numpy as np
import vart
import xir
import math
import time

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def calculate_iou(box1, box2):
    x1, y1, x2, y2 = box1
    x1_b, y1_b, x2_b, y2_b = box2
    inter_x1 = max(x1, x1_b)
    inter_y1 = max(y1, y1_b)
    inter_x2 = min(x2, x2_b)
    inter_y2 = min(y2, y2_b)
    inter_area = max(0, inter_x2 - inter_x1) * max(0, inter_y2 - inter_y1)
    box1_area = (x2 - x1) * (y2 - y1)
    box2_area = (x2_b - x1_b) * (y2_b - y1_b)
    union_area = box1_area + box2_area - inter_area
    return inter_area / union_area if union_area > 0 else 0

def get_child_subgraph_dpu(graph):
    # Return all subgraphs of the compiled xmodel that are mapped to the DPU
    root_subgraph = graph.get_root_subgraph()
    child_subgraphs = root_subgraph.toposort_child_subgraph()
    return [cs for cs in child_subgraphs if cs.has_attr("device") and cs.get_attr("device").upper() == "DPU"]

def main():
    model_path = "/home/root/work/B1152/ofa_yolo_pruned_0_50_pt.xmodel"
    anchors = [
        [(10, 13), (16, 30), (33, 23)],
        [(30, 61), (62, 45), (59, 119)],
        [(116, 90), (156, 198), (373, 326)],
    ]
    num_classes = 80
    output_shapes = [(80, 80), (40, 40), (20, 20)]
    scale_factor = 0.00392156
    conf_threshold = 0.2
    nms_threshold = 0.2
    inputId = 0

    cam = cv2.VideoCapture(inputId)
    cam.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 360)
    if not cam.isOpened():
        print("[ERROR] Failed to open camera ", inputId)
        exit()

    graph = xir.Graph.deserialize(model_path)
    subgraphs = get_child_subgraph_dpu(graph)
    assert len(subgraphs) == 1, "Expected exactly one DPU subgraph."
    dpu_runner = vart.Runner.create_runner(subgraphs[0], "run")

    input_tensors = dpu_runner.get_input_tensors()
    output_tensors = dpu_runner.get_output_tensors()

    # Pre-allocate int8 buffers matching the DPU input/output tensor shapes
    fpga_input = [np.empty(input_tensors[0].dims, dtype=np.int8)]
    fpga_output = [np.empty(tuple(tensor.dims[1:]), dtype=np.int8, order="C") for tensor in output_tensors]

    # Input quantization factor (normalization * 2^fix_point) and per-layer dequantization factors
    fix_i = 2 ** input_tensors[0].get_attr("fix_point") * scale_factor
    fix_o = [1 / (2 ** tensor.get_attr("fix_point")) for tensor in output_tensors]

    frame_count = 0
    capture_time_total = 0
    preprocess_time_total = 0
    inference_time_total = 0
    postprocess_time_total = 0
    nms_time_total = 0
    draw_time_total = 0
    start_time = time.time()
    try:
        while True:
            capture_start = time.time()            
            ret, frame_raw = cam.read()
            capture_end = time.time()
            if not ret:
                break
            capture_time_total += (capture_end - capture_start)

            preprocess_start = time.time()
            # Resize to the 640x640 network input and quantize float pixels to int8
            frame = cv2.resize(frame_raw, (640, 640), interpolation=cv2.INTER_LINEAR)
            img_quantized = np.round(frame.astype(np.float32) * fix_i).astype(np.int8)
            preprocess_end = time.time()
            preprocess_time_total += (preprocess_end - preprocess_start)

            inference_start = time.time()
            fpga_input[0][0, ...] = img_quantized
            job_id = dpu_runner.execute_async(fpga_input, fpga_output)
            dpu_runner.wait(job_id)
            inference_end = time.time()
            inference_time_total += (inference_end - inference_start)

            postprocess_start = time.time()
            # Dequantize the raw int8 DPU outputs to float and restore the full tensor shapes
            scaled_outputs = [
                (fpga_output[idx].astype(np.float32) * fix_o[idx]).reshape(tuple(tensor.dims))
                for idx, tensor in enumerate(output_tensors)
            ]

            reshaped_outputs = []
            for idx, scaled_output in enumerate(scaled_outputs):
                grid_height, grid_width = output_shapes[idx]
                expected_shape = (grid_height, grid_width, len(anchors[idx]), 5 + num_classes)
                reshaped_outputs.append(scaled_output.reshape(expected_shape))

            decoded_boxes = []
            for layer_idx, output in enumerate(reshaped_outputs):
                grid_height, grid_width, num_anchors, _ = output.shape
                anchors_layer = np.array(anchors[layer_idx]).reshape(-1, 2)
                tx, ty, tw, th, confidence = np.split(output[..., :5], 5, axis=-1)
                class_scores = output[..., 5:]
                confidence = sigmoid(confidence)
                class_scores = sigmoid(class_scores)
                for h in range(grid_height):
                    for w in range(grid_width):
                        for anchor_idx in range(num_anchors):
                            
                            tx_val = tx[h, w, anchor_idx]
                            ty_val = ty[h, w, anchor_idx]
                            tw_val = tw[h, w, anchor_idx]
                            th_val = th[h, w, anchor_idx]
                            conf_val = confidence[h, w, anchor_idx]

                            if conf_val <= conf_threshold:
                                continue

                            
                            bx = (w + sigmoid(tx_val)) / grid_width
                            by = (h + sigmoid(ty_val)) / grid_height
                            bw = np.exp(tw_val) * anchors_layer[anchor_idx, 0] / 640
                            bh = np.exp(th_val) * anchors_layer[anchor_idx, 1] / 640

                            
                            x_min = (bx - bw / 2) * 640
                            y_min = (by - bh / 2) * 640
                            x_max = (bx + bw / 2) * 640
                            y_max = (by + bh / 2) * 640

                            
                            class_probs = class_scores[h, w, anchor_idx, :]

                            
                            decoded_boxes.append({
                                "box": [x_min, y_min, x_max, y_max],
                                "confidence": conf_val,
                                "class_probs": class_probs,
                            })
            postprocess_end = time.time()
            postprocess_time_total += (postprocess_end - postprocess_start)
            nms_start = time.time()
            decoded_boxes.sort(key=lambda x: x["confidence"], reverse=True)
            nms_boxes = []
            while decoded_boxes:
                best_box = decoded_boxes.pop(0)
                nms_boxes.append(best_box)
                decoded_boxes = [
                    box for box in decoded_boxes if calculate_iou(best_box["box"], box["box"]) < nms_threshold
                ]
            nms_end = time.time()
            nms_time_total += (nms_end - nms_start)
            draw_start = time.time()
            image_height, image_width = frame_raw.shape[:2]
            scale_x = 640 / image_width
            scale_y = 640 / image_height
            pad_x = (640 - image_width * scale_x) / 2
            pad_y = (640 - image_height * scale_y) / 2
            for detection in nms_boxes:
                x_min, y_min, x_max, y_max = detection["box"]
                x_min = max(0, int((x_min - pad_x) / scale_x))
                y_min = max(0, int((y_min - pad_y) / scale_y))
                x_max = min(image_width, int((x_max - pad_x) / scale_x))
                y_max = min(image_height, int((y_max - pad_y) / scale_y))
                confidence = float(detection["confidence"])
                class_probs = detection["class_probs"]
                class_id = int(np.argmax(class_probs))
                label = f"Class {class_id} ({confidence:.2f})"
                cv2.rectangle(frame_raw, (int(x_min), int(y_min)), (int(x_max), int(y_max)), (0, 255, 0), 2)
                cv2.putText(frame_raw, label, (int(x_min), int(y_min - 10)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
            draw_end = time.time()
            draw_time_total += (draw_end - draw_start)
            frame_count += 1
            current_time = time.time()
            elapsed_time = current_time - start_time
            fps = frame_count / elapsed_time
            cv2.putText(frame_raw, f"FPS: {fps:.2f}", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (20, 20, 180), 2)
            cv2.imshow("Non-optimized ofa-yolo detector", frame_raw)
            key = cv2.waitKey(1) & 0xFF
            if key == 27:  # Press 'ESC' to exit
                break
    finally:
        cam.release()
        cv2.destroyAllWindows()
        print("\n[INFO] Average Timings per Frame (in ms):")
        print(f"[INFO] {'Frame Capture:':<20} {capture_time_total / frame_count * 1000:.2f} ms")
        print(f"[INFO] {'Preprocessing:':<20} {preprocess_time_total / frame_count * 1000:.2f} ms")
        print(f"[INFO] {'Run DPU:':<20} {inference_time_total / frame_count * 1000:.2f} ms")
        print(f"[INFO] {'Postprocessing:':<20} {postprocess_time_total / frame_count * 1000:.2f} ms")
        print(f"[INFO] {'NMS filtering:':<20} {nms_time_total / frame_count * 1000:.2f} ms")
        print(f"[INFO] {'Rendering/Draw:':<20} {draw_time_total / frame_count * 1000:.2f} ms")
        print(f"[INFO] {'Overall FPS:':<20} {fps:.2f}")

if __name__ == "__main__":
    main()
    
  

Conclusion. In this section, I explored the Python implementation of the OFA-YOLO network from the Vitis AI 3.0 Model Zoo, optimized for real-time object detection on the DPUCZDX8G Deep Learning Processing Unit. By integrating essential functionalities such as image preprocessing, data inference, and output parsing with non-maximum suppression, this implementation provides a robust solution for embedded systems. I also evaluated the accuracy of the model using the COCO dataset and presented performance benchmarks, comparing both the full and pruned network versions, as well as throughput results for the multithreaded Python implementation and the C++ implementation from Xilinx Vitis AI.

The tests were conducted on the Trenz TE0820-03-2AI21FA module with a Logitech C270 USB camera, demonstrating the potential of the OFA-YOLO network for embedded real-time object detection. While the optimized multithreaded implementation provides a significant performance boost, you can either use the free, non-optimized version available on this webpage or request the ready-to-use, optimized implementation with accuracy estimation to save time and effort.

By following the guidance and resources provided, you can accelerate your deep learning projects on Zynq UltraScale+ MPSoCs and efficiently deploy the OFA-YOLO network in real-world applications. For further assistance and to obtain the optimized resources, feel free to contact me.