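Two-view ball tracking built on YOLOv7's `detect.py`: two detector threads find the ball in each camera view, a triangulation thread projects the paired 2D detections into court coordinates, and a final thread derives rally, smash, twist, and ground events and saves the 3D trajectory to an `.npy` file.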
```python
import argparse
import time
from pathlib import Path
import cv2
import torch
import torch.backends.cudnn as cudnn
from numpy import random
from models.experimental import attempt_load
from utils.datasets import LoadStreams, LoadImages
from utils.general import check_img_size, check_requirements, check_imshow, non_max_suppression, apply_classifier, \
scale_coords, xyxy2xywh, strip_optimizer, set_logging, increment_path
from utils.plots import plot_one_box
from utils.torch_utils import select_device, load_classifier, time_synchronized, TracedModel
# modified
from threading import Thread
from queue import Queue
import numpy as np
import os
import math
# show_3D
# import matplotlib.pyplot as plt
# import matplotlib
# matplotlib.use( 'tkagg' )
# from mpl_toolkits import mplot3d
rally = 1
show_point_frame = 60
show_point = show_point_frame
getPoint = False
ball_end = False
ball_end_buffer = 600
template_ball_end_buffer = 600
twist = False
# output xyz coordinates
x = 0
y = 0
z = 0
diffy = 0
diffz = 0
#StartBall
startBall = False
startCD = 80
startCDFrame = 80
#Smash
GSmash = False
HSmash = False
HSmashCD = False
GSmashCD = False
smashbufferH = 0
smashbufferG = 0
smashFrame = 300
count = 0
confirmSmashFrame = 1
showGSmash = 120  # smash banner duration in frames (120 fps video)
showHSmash = 120
# show Ground
show_ground_frame = 240  # display for show_ground_frame/fps seconds
show_cnt = show_ground_frame  # timer
# detect Ground
real_ground = False
threshold = 0.3
check_ground_flag = True  # True = ground detection enabled
groundball_CD = 240  # ground-ball cooldown (CD), in frames
groundball_CD_cnt = groundball_CD  # timer
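# Pipeline (wired up in __main__ below):
#   detect() x2 threads -> queues_2d: per-frame 2D ball detections, one per view
#   project_3d_position(): pairs the two views and triangulates -> queue_3d
#   save_npy_file(): rally / smash / twist / ground logic + trajectory export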
def detect(source, shift, queue_2d, circles_all, real_ground_list, save_img=False):
weights, view_img, save_txt, imgsz, trace = opt.weights, opt.view_img, opt.save_txt, opt.img_size, not opt.no_trace
save_img = not opt.nosave and not source.endswith(
'.txt') # save inference images
webcam = source.isnumeric() or source.endswith('.txt') or source.lower().startswith(
('rtsp://', 'rtmp://', 'http://', 'https://'))
# Directories
save_dir = Path(increment_path(Path(opt.project) / opt.name,
exist_ok=opt.exist_ok)) # increment run
(save_dir / 'labels' if save_txt else save_dir).mkdir(parents=True,
exist_ok=True) # make dir
# Initialize
set_logging()
device = select_device(opt.device)
half = device.type != 'cpu' # half precision only supported on CUDA
# Load model
model = attempt_load(weights, map_location=device) # load FP32 model
stride = int(model.stride.max()) # model stride
imgsz = check_img_size(imgsz, s=stride) # check img_size
if trace:
model = TracedModel(model, device, opt.img_size)
if half:
model.half() # to FP16
# Second-stage classifier
classify = False
if classify:
modelc = load_classifier(name='resnet101', n=2) # initialize
        modelc.load_state_dict(torch.load(
            'weights/resnet101.pt', map_location=device)['model'])
        modelc.to(device).eval()  # load_state_dict returns keys, not the module
# Set Dataloader
vid_path, vid_writer = None, None
if webcam:
view_img = check_imshow()
cudnn.benchmark = True # set True to speed up constant image size inference
dataset = LoadStreams(source, img_size=imgsz, stride=stride)
else:
dataset = LoadImages(source, img_size=imgsz, stride=stride)
# Get names and colors
names = model.module.names if hasattr(model, 'module') else model.names
colors = [[random.randint(0, 255) for _ in range(3)] for _ in names]
# Run inference
if device.type != 'cpu':
model(torch.zeros(1, 3, imgsz, imgsz).to(device).type_as(
next(model.parameters()))) # run once
old_img_w = old_img_h = imgsz
old_img_b = 1
t0 = time.time()
    for path, img, im0s, vid_cap in dataset:
        VIDEO_END = dataset.frame == dataset.nframes  # True on the last frame of this view
        # skip the first `shift` frames so the two views stay time-aligned
        if dataset.frame > shift:
img = torch.from_numpy(img).to(device)
img = img.half() if half else img.float() # uint8 to fp16/32
img /= 255.0 # 0 - 255 to 0.0 - 1.0
if img.ndimension() == 3:
img = img.unsqueeze(0)
# Warmup
if device.type != 'cpu' and (old_img_b != img.shape[0] or old_img_h != img.shape[2] or old_img_w != img.shape[3]):
old_img_b = img.shape[0]
old_img_h = img.shape[2]
old_img_w = img.shape[3]
for i in range(3):
model(img, augment=opt.augment)[0]
# Inference
t1 = time_synchronized()
with torch.no_grad(): # Calculating gradients would cause a GPU memory leak
pred = model(img, augment=opt.augment)[0]
t2 = time_synchronized()
# Apply NMS
pred = non_max_suppression(
pred, opt.conf_thres, opt.iou_thres, classes=opt.classes, agnostic=opt.agnostic_nms)
t3 = time_synchronized()
# Apply Classifier
if classify:
pred = apply_classifier(pred, modelc, img, im0s)
# Process detections
for i, det in enumerate(pred): # detections per image
if webcam: # batch_size >= 1
                    p, s, im0, frame = path[i], '%g: ' % i, im0s[i].copy(), dataset.count
else:
p, s, im0, frame = path, '', im0s, getattr(
dataset, 'frame', 0)
p = Path(p) # to Path
save_path = str(save_dir / p.name) # img.jpg
txt_path = str(save_dir / 'labels' / p.stem) + \
('' if dataset.mode == 'image' else f'_{frame}') # img.txt
# normalization gain whwh
gn = torch.tensor(im0.shape)[[1, 0, 1, 0]]
if len(det):
# Rescale boxes from img_size to im0 size
det[:, :4] = scale_coords(
img.shape[2:], det[:, :4], im0.shape).round()
# Print results
for c in det[:, -1].unique():
n = (det[:, -1] == c).sum() # detections per class
# add to string
s += f"{n} {names[int(c)]}{'s' * (n > 1)}, "
                    # hand this frame's detections to the triangulation thread
                    queue_2d.put((vid_cap, VIDEO_END, det))
# Write results
for *xyxy, conf, cls in reversed(det):
if save_txt: # Write to file
xywh = (xyxy2xywh(torch.tensor(xyxy).view(
1, 4)) / gn).view(-1).tolist() # normalized xywh
# label format
line = (
cls, *xywh, conf) if opt.save_conf else (cls, *xywh)
with open(txt_path + '.txt', 'a') as f:
f.write(('%g ' * len(line)).rstrip() %
line + '\n')
if save_img or view_img: # Add bbox to image
label = f'{names[int(cls)]} {conf:.2f}'
plot_one_box(xyxy, im0, label=label,
color=colors[int(cls)], line_thickness=1)
cen1 = int((int(xyxy[0]) + int(xyxy[2]))/2)
cen2 = int((int(xyxy[1]) + int(xyxy[3]))/2)
# print("append ", cen1, cen2)
circles_all.append((cen1, cen2))
                else:  # no detection this frame: still feed the queue so both views stay in step
                    queue_2d.put((vid_cap, VIDEO_END, None))
# draw trajectory
for (cx, cy) in circles_all:
cv2.circle(im0, (cx, cy), 5, color=(255, 133, 233),
thickness=-1) # -1 = filled circle
global getPoint
global show_point_frame
global show_point
global rally
cv2.putText(im0, "Rally:" + str(rally), (100, 100), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), 2, cv2.LINE_AA)
global ball_end
global ball_end_buffer
global template_ball_end_buffer
                if ball_end:
                    ball_end_buffer -= 1
                    if ball_end_buffer == 0:
                        ball_end = False
                        ball_end_buffer = template_ball_end_buffer
                # flash the point banner for show_point_frame frames after a point
                if getPoint:
                    getPoint = False
                    show_point -= 1
                if 0 < show_point < show_point_frame:
                    show_point -= 1
                else:
                    show_point = show_point_frame
global real_ground
global show_cnt
global check_ground_flag
global groundball_CD
global groundball_CD_cnt
                if real_ground and check_ground_flag:
                    print("real Ground")
                    try:
                        real_ground_list.append(circles_all[-1])
                        print("add to list", real_ground_list[-1])
                    except IndexError:
                        print("list index out of range")
                    circles_all.clear()
                    real_ground = False  # reset status
                    check_ground_flag = False  # wait for the cooldown
                    show_cnt -= 1  # start showing "Ground" for show_ground_frame frames
                    groundball_CD_cnt -= 1  # start the cooldown timer
# draw ground point in video
for (gx, gy) in real_ground_list:
cv2.circle(im0, (gx, gy), 5, color=(255, 133, 233),
thickness=-1) # -1 = filled circle
# print Ground
                if 0 < show_cnt < show_ground_frame:
                    # cv2.putText(im0, "Ground!", (100, 200),
                    #             cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), 2, cv2.LINE_AA)
                    show_cnt -= 1
                else:
                    show_cnt = show_ground_frame
                if 0 < groundball_CD_cnt < groundball_CD:
                    groundball_CD_cnt -= 1
                    check_ground_flag = False
                else:
                    groundball_CD_cnt = groundball_CD
                    check_ground_flag = True
#output xyz coor
global x
global y
global z
global diffy
global diffz
# cv2.putText(im0, "X:" + str(x), (100, 100), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), 2, cv2.LINE_AA)
# cv2.putText(im0, "Y:" + str(y) + " " + str(diffy), (100, 200), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), 2, cv2.LINE_AA)
# cv2.putText(im0, "Z:" + str(z) + " " + str(diffz), (100, 300), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), 2, cv2.LINE_AA)
#Smash
global GSmash
global HSmash
global showGSmash
global showHSmash
                if GSmash or showGSmash != 120:
                    cv2.putText(im0, "Guest Smash", (400, 100), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), 2, cv2.LINE_AA)
                    GSmash = False
                    showGSmash -= 1
                    if showGSmash == 0:
                        showGSmash = 120
                if HSmash or showHSmash != 120:
                    cv2.putText(im0, "Home Smash", (400, 100), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), 2, cv2.LINE_AA)
                    HSmash = False
                    showHSmash -= 1
                    if showHSmash == 0:
                        showHSmash = 120
                with open("ball_coordinate.txt", 'a') as f:
                    f.write(str(diffy) + " " + str(diffz) + '\n')
                global twist
                if twist:
                    cv2.putText(im0, "Twist", (100, 100), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), 2, cv2.LINE_AA)
                    twist = False
# Print time (inference + NMS)
# print(f'{s}Done. ({(1E3 * (t2 - t1)):.1f}ms) Inference, ({(1E3 * (t3 - t2)):.1f}ms) NMS')
# Stream results
if view_img:
cv2.imshow(str(p), im0)
cv2.waitKey(1) # 1 millisecond
# Save results (image with detections)
if save_img:
if dataset.mode == 'image':
cv2.imwrite(save_path, im0)
print(
f" The image with the result is saved in: {save_path}")
else: # 'video' or 'stream'
if vid_path != save_path: # new video
vid_path = save_path
if isinstance(vid_writer, cv2.VideoWriter):
vid_writer.release() # release previous video writer
if vid_cap: # video
fps = vid_cap.get(cv2.CAP_PROP_FPS)
w = int(vid_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(vid_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
else: # stream
fps, w, h = 30, im0.shape[1], im0.shape[0]
save_path += '.mp4'
vid_writer = cv2.VideoWriter(
save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
vid_writer.write(im0)
print('{}: ({}/{})'.format(os.path.basename(source),
dataset.frame, dataset.nframes))
# with open("ball_coordinate.txt", 'a') as f:
# f.write('{}: ({}/{})'.format(os.path.basename(source), dataset.frame, dataset.nframes)+ '\n')
if save_txt or save_img:
s = f"\n{len(list(save_dir.glob('labels/*.txt')))} labels saved to {save_dir / 'labels'}" if save_txt else ''
print(f"Results saved to {save_dir}{s}")
print(f'Done. ({time.time() - t0:.3f}s)')
def project_3d_position(queues_2d, queue_3d):
from scipy.spatial import distance
OPEN = True
VIDEO_END = False
prev_view1 = [0, 0, 0, 0]
prev_view2 = [0, 0, 0, 0]
while OPEN and not VIDEO_END:
view1_cap, VIDEO_END1, ball_det_view1 = queues_2d[0].get()
view2_cap, VIDEO_END2, ball_det_view2 = queues_2d[1].get()
        if ball_det_view1 is not None and ball_det_view2 is not None:
            # several balls may be detected per view; NMS output is sorted by
            # confidence, so index 0 is the most confident detection
            ball1_bbx = ball_det_view1.detach().cpu().numpy()[0][:4]
            ball2_bbx = ball_det_view2.detach().cpu().numpy()[0][:4]
# bladebaby0326
# ball_det_view1_numpy = ball_det_view1.detach().cpu().numpy()
# ball_det_view2_numpy = ball_det_view2.detach().cpu().numpy()
# min_distance = 999999
# for det in ball_det_view1_numpy:
# distance_to_prev_ball = distance.euclidean(det[:4], prev_view1)
# if distance_to_prev_ball < min_distance:
# min_distance = distance_to_prev_ball
# ball1_bbx = det[:4]
# min_distance = 999999
# for det in ball_det_view2_numpy:
# distance_to_prev_ball = distance.euclidean(det[:4], prev_view2)
# if distance_to_prev_ball < min_distance:
# min_distance = distance_to_prev_ball
# ball2_bbx = det[:4]
# prev_view1 = ball1_bbx
# prev_view2 = ball2_bbx
            # convert corner boxes (x1, y1, x2, y2) to centre/size (x, y, w, h)
ball1_bbx_xywh = np.array([(ball1_bbx[0]+ball1_bbx[2])/2,
(ball1_bbx[1]+ball1_bbx[3])/2,
ball1_bbx[2]-ball1_bbx[0],
ball1_bbx[3]-ball1_bbx[1]]).astype(int)
ball2_bbx_xywh = np.array([(ball2_bbx[0]+ball2_bbx[2])/2,
(ball2_bbx[1]+ball2_bbx[3])/2,
ball2_bbx[2]-ball2_bbx[0],
ball2_bbx[3]-ball2_bbx[1]]).astype(int)
# print(ball1_bbx, ball2_bbx)
# print(ball1_bbx_xywh, ball2_bbx_xywh)
court, b = draw2court(ball1_bbx_xywh, ball2_bbx_xywh,
reRun=True,
srcPts_1=getUVL_refPoint('srcPts_1'),
srcPts_2=getUVL_refPoint('srcPts_2'),
dstPts=getUVL_refPoint('dstPts'))
print(b) # print ball x y z
# save x y z to file ball_coordinate.txt
# with open("ball_coordinate.txt", 'a') as f:
# f.write(str(b)+ '\n')
VIDEO_END = VIDEO_END1 or VIDEO_END2
queue_3d.put((view1_cap, view2_cap, VIDEO_END, b))
OPEN = view1_cap.isOpened() and view2_cap.isOpened()
else:
VIDEO_END = VIDEO_END1 or VIDEO_END2
queue_3d.put((view1_cap, view2_cap, VIDEO_END, [None, None, None]))
def getUVL_refPoint(get_points):
    # reference points start from the serve position and go clockwise
    # video 1
if get_points == 'srcPts_1':
return [
[1677,717], [1074,1055], [584,775], [443,693], [327,629],
[170,539], [694,460], [905,516], [1042,551], [1204,594]
]
elif get_points == 'srcPts_2':
return [
[747,1055], [122,739], [616,599], [788,553], [928, 512],
[1135,455], [1643,519], [1490,611], [1383,676], [1241,760]
]
elif get_points == 'dstPts':
return [
[9, 0, 0], [0, 0, 0], [0, 6, 0], [0, 9, 0], [0, 12, 0],
[0, 18, 0], [9, 18, 0], [9, 12, 0], [9, 9, 0], [9, 6, 0],
]
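    # dstPts are court reference coordinates, apparently in metres on a
    # 9 x 18 court (x across, y along, z up); they match the court model
    # drawn in show_3D_trajectory below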
# video 5
# if get_points == 'srcPts_1':
# return [
# [1203, 938], [139, 838], [534, 631], [681, 553], [809, 487],
# [1019, 377], [1725, 390], [1606, 514], [1532, 592], [1443, 685]
# ]
# elif get_points == 'srcPts_2':
# return [
# [1888,711], [1242,895], [742,793], [535,750], [351,713],
# [31,648], [655,538], [989,583], [1179,609], [1385,635]
# ]
# elif get_points == 'dstPts':
# return [
# [9, 0, 0], [0, 0, 0], [0, 6, 0], [0, 9, 0], [0, 12, 0],
# [0, 18, 0], [9, 18, 0], [9, 12, 0], [9, 9, 0], [9, 6, 0],
# ]
def draw2court(view1_ball, view2_ball, isShow=False, reRun=True, saveProjPath=None,
srcPts_1=getUVL_refPoint('srcPts_1'), srcPts_2=getUVL_refPoint('srcPts_2'), dstPts=getUVL_refPoint('dstPts'),
dist=np.load("./cfg/calib.npz")["dist_coefs"], mtx=np.load("./cfg/calib.npz")["camera_matrix"]):
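    # NOTE: the np.load(...) defaults in the signature are evaluated once,
    # when the function is defined, so ./cfg/calib.npz must exist at import time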
# def checkPts(img, pts):
# img = copy.deepcopy(img)
# for p in pts:
# if p is not None:
# print(p)
# cv2.circle(img, (p[0],p[1]), 10, (0, 255, 255), 2)
# cv2.namedWindow('img', cv2.WINDOW_NORMAL)
# while(1):
# cv2.imshow('img',img)
# k = cv2.waitKey(0) & 0xFF
# if k == 27:
# break
isProj = False
projBall = None
# view1ball = np.array([[view1_ball[0]+view1_ball[2]*0.5, view1_ball[1]+view1_ball[3]*0.5]], dtype=np.float32)
# view2ball = np.array([[view2_ball[0]+view2_ball[2]*0.5, view2_ball[1]+view2_ball[3]*0.5]], dtype=np.float32) # read img
view1pts = srcPts_1
view2pts = srcPts_2
view1pts = np.array([view1pts]).astype('float32')
view2pts = np.array([view2pts]).astype('float32')
if reRun:
dstPts_mul = np.concatenate((dstPts, dstPts), axis=0)
dstPts_mul = np.array([dstPts_mul]).astype('float32')
# # mtx = np.array([[1526.7906793957582, 0.0, 952.4275689356942],[0.0, 1531.318818224647, 548.4049095626559],[0.0, 0.0, 1.0]])
# calibrationParameter = np.load("calib.npz") # ['rms', 'camera_matrix', 'dist_coefs', 'rvecs', 'tvecs']
# dist = calibrationParameter["dist_coefs"]
# mtx = calibrationParameter["camera_matrix"]
dstPts_pnp = np.array([dstPts]).astype('float32')
# pdb.set_trace()
retval1, rvec1, tvec1 = cv2.solvePnP(dstPts_pnp, view1pts, mtx, dist)
r1, _ = cv2.Rodrigues(rvec1)
retval2, rvec2, tvec2 = cv2.solvePnP(dstPts_pnp, view2pts, mtx, dist)
r2, _ = cv2.Rodrigues(rvec2)
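        # full 3x4 camera projection for each view: P = K @ [R | t]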
projM1 = np.matmul(mtx, np.concatenate((r1, tvec1), axis=1))
projM2 = np.matmul(mtx, np.concatenate((r2, tvec2), axis=1))
projPathM1 = 'projM1'
projPathM2 = 'projM2'
        if saveProjPath is not None:
projPathM1 = saveProjPath+projPathM1
projPathM2 = saveProjPath+projPathM2
np.save(projPathM1, projM1)
np.save(projPathM2, projM2)
else:
projM1 = np.load(
'/media/jennychen/DATA/code/tools/ballTracking/projM1.npy')
projM2 = np.load(
'/media/jennychen/DATA/code/tools/ballTracking/projM2.npy')
    # NOTE: these lines assume (x, y) is the box's top-left corner, but
    # project_3d_position passes centre-based xywh, so the +w/2, +h/2 offsets
    # shift the triangulated point by half a box; kept as in the original
    view1ball = np.array([[view1_ball[0]+view1_ball[2]*0.5,
                           view1_ball[1]+view1_ball[3]*0.5]], dtype=np.float32)
    view2ball = np.array([[view2_ball[0]+view2_ball[2]*0.5,
                           view2_ball[1]+view2_ball[3]*0.5]], dtype=np.float32)
pts1 = np.concatenate((view1pts[0], view1ball), axis=0)
pts2 = np.concatenate((view2pts[0], view2ball), axis=0)
pts1 = np.transpose(pts1)
pts2 = np.transpose(pts2)
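    # cv2.triangulatePoints returns homogeneous 4xN coordinates; dividing by
    # the last row (w) below recovers Euclidean x, y, z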
pts4D = cv2.triangulatePoints(projM1, projM2, pts1, pts2)
pts4D = pts4D[:, :] / pts4D[-1, :]
x, y, z, w = pts4D
# print('pts4D: ',pts4D)
# if isProj:
# np.savez("xyz.npz", x=x, y=y, z=z)
# # c = np.load("pts4D.npy")
# projBall = np.load("ballprojPlanePosition.npz")
# projBall = [projBall["x"],projBall["y"],projBall["z"]]
# if isShow:
# show3D(x,y,z,projBall)
ball = [x[-1], y[-1], z[-1]]
courtX = x[:-1]
courtY = y[:-1]
courtZ = z[:-1]
court = [courtX, courtY, courtZ]
return court, ball
def check_ball_in_court(ball_3d):
    x = ball_3d[0]
    y = ball_3d[1]
    if x is None or y is None:
        return False
    return (0 < x < 9) and (0 < y < 18)  # inside the 9 x 18 court
def direction_judge(CONTINUOUS_NUMBER, diff, ball_count, direction, True_index, xyz):
    is_V = False
    direction_diff = diff[ball_count-(CONTINUOUS_NUMBER-1):ball_count+1, xyz]
    count = 0
    for d in direction_diff:
        if d > 0:
            count += 1
    if direction is None:  # initial state
        if count >= CONTINUOUS_NUMBER:
            direction = "+"
        elif count == 0:
            direction = "-"
        else:
            direction = None
    elif direction == "-":
        if count == CONTINUOUS_NUMBER:  # flips to +
            True_index.append(ball_count-(CONTINUOUS_NUMBER-1))
            is_V = True
            direction = "+"
    elif direction == "+":
        if count == 0:
            if xyz != 2:  # z not needed
                True_index.append(ball_count-(CONTINUOUS_NUMBER-1))
                is_V = True
            direction = "-"
    return direction, True_index, is_V
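# direction_judge scans the last CONTINUOUS_NUMBER frame-to-frame differences
# along one axis: all-positive/all-negative fixes the direction, and a full
# sign flip marks a "V" in the trajectory (recorded in True_index) -- the cue
# used in save_npy_file below to flag a twist.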
def new_direction_judge(result, current_idx, origin_state):
global diffy
global diffz
# is_V = False
y1 = result[current_idx - 1][1]
y2 = result[current_idx][1]
z1 = result[current_idx - 1][2]
z2 = result[current_idx][2]
    if y1 is not None and y2 is not None:
        diffy = int((y1-y2)*100)  # per-frame displacement, scaled x100
        diffz = int((z1-z2)*100)
#StartBall
global startBall
global startCD
global startCDFrame
    if y1 is not None and y2 is not None:
        if y1 < 5 or y1 > 13:  # start area: within ~5 m of either baseline
            startBall = True
            startCD = startCDFrame
            startCD -= 1
            print("start")
    if startCD != startCDFrame:
        startCD -= 1
        if startCD == 0:
            startCD = startCDFrame
            startBall = False
startBall = False
#Smash
global GSmash
global HSmash
global HSmashCD
global GSmashCD
global smashbufferH
global smashbufferG
global smashFrame
global count
global confirmSmashFrame
    if HSmashCD:
        print("HomeSmashCD")
        smashbufferH += 1
        if smashbufferH == smashFrame:
            smashbufferH = 0
            HSmashCD = False
    if GSmashCD:
        print("GuestSmashCD")
        smashbufferG += 1
        if smashbufferG == smashFrame:
            smashbufferG = 0
            GSmashCD = False
    if y1 is not None and y2 is not None:
        if y1 > 6:  # HSmash area
            if not HSmashCD and not startBall:
                if diffy < -20:
                    count += 1
                    if count == confirmSmashFrame:
                        HSmash = True
                        count = 0
                        HSmashCD = True
                        print("HomeSmash")
        if y1 < 12:  # GSmash area
            if not GSmashCD and not startBall:
                if diffy > 20:
                    count += 1
                    if count == confirmSmashFrame:
                        GSmash = True
                        count = 0
                        GSmashCD = True
                        print("GuestSmash")
if z1 is None or z2 is None:
return origin_state
else:
if (z2-z1) >= 0:
state = '+'
else:
state = '-'
return state
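# new_direction_judge compares consecutive 3D positions: it updates the global
# diffy/diffz displacements, marks serves (startBall) near either baseline,
# raises the home/guest smash banners on a large |diffy| burst, and returns
# the current z direction ('+' rising, '-' falling).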
def ground_threshold(ball_3d):
    x = ball_3d[0]
    y = ball_3d[1]
    z = ball_3d[2]
    if x is None or y is None or z is None:
        return False
    return z < threshold  # grounded once the ball drops below the height threshold
def save_npy_file(queue_3d, npy_filename='../ball3d_result.npy', show_trajectory=False, write_trajectory_video=False):
result = []
global getPoint
global ball_end
count = 0
count_ball_out_of_court = 0
buffer_ball_in_court = 0
buffer_ball_in_frame = 60
global rally
    n_balls = 500  # sliding window of recent points for twist analysis / display
    VIDEO_END = False
    OPEN = True
    init_state = '+'
while OPEN and not VIDEO_END:
# print('count',count)
view1_cap, view2_cap, VIDEO_END, ball_3d = queue_3d.get()
        ball_3d.append(count)  # tag with the frame counter
        ball_3d = ball_3d.copy()  # copy so earlier predictions are not mutated
        new_ball_3d = ball_3d
# result.append(ball_3d)
### Rally
        if check_ball_in_court(ball_3d):
            buffer_ball_in_court += 1
            if buffer_ball_in_court >= buffer_ball_in_frame:
                count_ball_out_of_court = 0  # reset the counter
        else:
            count_ball_out_of_court += 1  # ball out of bounds
            circles_all[0].clear()
            circles_all[1].clear()
        print('ball_3d:', ball_3d)
        print('out_of_court: ', count_ball_out_of_court)
        print('rally: ', rally)
        # end the rally after 160 consecutive out-of-court frames
        # (tune per frame rate: ~200 at 120 fps, ~60 at 60 fps)
        if count_ball_out_of_court == 160 and not ball_end:
            rally += 1
            getPoint = True
            ball_end = True
            buffer_ball_in_court = 0
            circles_all[0].clear()
            circles_all[1].clear()
###
        # output xyz coordinates
        global x
        global y
        global z
        if ball_3d[0] is not None:
            x = new_ball_3d[0]
            y = new_ball_3d[1]
            z = new_ball_3d[2]
            print("X:" + str(x))
            print("Y:" + str(y))
            print("Z:" + str(z))
        # detect ground contact via the height threshold
        global real_ground
        global check_ground_flag
        real_ground = ground_threshold(ball_3d)
# save ground status to npy file
if new_ball_3d[0] is None and new_ball_3d[1] is None and new_ball_3d[2] is None:
new_ball_3d.append(False)
elif check_ground_flag and real_ground:
new_ball_3d.append(True)
else:
new_ball_3d.append(False)
print('new_ball_3d:', new_ball_3d)
result.append(new_ball_3d)
current_state = new_direction_judge(result, count, init_state)
print("current_state:", current_state)
        if count % 50 == 0:
            np.save(npy_filename, np.array(result, dtype=object))  # checkpoint while running
        if show_trajectory and len(result) > n_balls:  # avoid closing the window too many times
            global twist
            tmp = np.array(result[-1*n_balls-1:], dtype=float)
            diff = np.diff(tmp, axis=0)  # per-frame x, y, z displacement
            x_direction = None
            y_direction = None
            z_direction = None
            x_True_index = []
            y_True_index = []
            z_True_index = []
            CONTINUOUS_NUMBER = 10
            for ball_count in range(len(diff)):
                if ball_count >= CONTINUOUS_NUMBER-1:
                    x_direction, x_True_index, x_is_V = direction_judge(CONTINUOUS_NUMBER, diff, ball_count, x_direction, x_True_index, xyz=0)
                    y_direction, y_True_index, y_is_V = direction_judge(CONTINUOUS_NUMBER, diff, ball_count, y_direction, y_True_index, xyz=1)
                    z_direction, z_True_index, z_is_V = direction_judge(CONTINUOUS_NUMBER, diff, ball_count, z_direction, z_True_index, xyz=2)
                    if x_is_V or y_is_V or z_is_V:
                        print("Twist!!")
                        print("x", x_True_index)
                        print("y", y_True_index)
                        print("z", z_True_index)
                        twist = True
            # show overall 2D and 3D trajectory
            show_3D_trajectory(result[-1*n_balls:])
        # if write_trajectory_video:
        #     write_video_3D_trajectory(view1_cap, view2_cap, result)
        OPEN = view1_cap.isOpened() and view2_cap.isOpened()
        count += 1
    np.save(npy_filename, np.array(result, dtype=object))  # final save with every frame
view1_cap.release()
view2_cap.release()
def write_video_3D_trajectory(view1_cap, view2_cap, trajectory): # not finished
trajectory = np.array(trajectory)
img1 = view1_cap.read()[1]
img1 = cv2.resize(img1, None, fx=0.3, fy=0.3)
img2 = view2_cap.read()[1]
img2 = cv2.resize(img2, None, fx=0.3, fy=0.3)
img_concate_Hori = np.concatenate((img1, img2), axis=1)
# cv2.imshow('concatenated_Hori',img_concate_Hori)
# cv2.waitKey(20) # crush if too small
def show_3D_trajectory(ball_trajectory):
'''
show_3D function can be used to show the final trajectory,
but cannot be used to show animation frame-by-frame
'''
    # import matplotlib lazily (the module-level "show_3D" imports are commented out)
    import matplotlib
    matplotlib.use('tkagg')
    import matplotlib.pyplot as plt
    # set fig parameters
    fig = plt.figure(figsize=(12, 10))
    gs = matplotlib.gridspec.GridSpec(12, 10)
# 3D Court
ax = plt.subplot(gs[:, :8], projection='3d')
ax.view_init(elev=5, azim=195)
ax.set_xlim(0, 9)
ax.set_ylim(-1, 19)
ax.set_zlim(-0.5, 5)
ax.set_xlabel('X Axis')
ax.set_ylabel('Y Axis')
ax.set_zlabel('Z Axis')
# 2D Court
ax2D = plt.subplot(gs[3:9, 8:])
ax2D.set_xlim(-1, 10)
ax2D.set_ylim(-3, 21)
ax2D.set_xlabel('X Axis')
ax2D.set_ylabel('Y Axis')
time = []
xdata = []
ydata = []
zdata = []
for x, y, z, n in ball_trajectory:
# print(x,y,z,n)
if (x is not None) and (y is not None) and (z is not None):
xdata.append(x)
ydata.append(y)
zdata.append(z)
time.append(n)
ax.scatter3D(xdata, ydata, zdata, c=time, cmap='gist_rainbow_r')
ax2D.scatter(xdata, ydata, c=time, cmap='gist_rainbow_r')
fig.subplots_adjust(right=0.95, left=0.05, top=0.95,
bottom=0.05, hspace=0.05, wspace=0.05)
# x/y ticks
ax.set_xticks(np.arange(0, 9, 2))
ax.set_yticks(np.arange(0, 20, 3))
ax2D.set_xticks(np.arange(0, 10, 3))
ax2D.set_yticks(np.arange(0, 20, 3))
addCourt = True
if addCourt:
points = np.array([
[0, 0, 0], [9, 0, 0], [0, 6, 0], [9, 6, 0], [0, 9, 0],
[9, 9, 0], [0, 12, 0], [9, 12, 0], [0, 18, 0], [9, 18, 0]
])
courtedge = [2, 0, 1, 3, 2, 4, 5, 3, 5, 7, 6, 4, 6, 8, 9, 7]
curves = points[courtedge]
netpoints = np.array([
[0, 9, 0], [0, 9, 1.24], [0, 9, 2.24], [9, 9, 0], [9, 9, 1.24], [9, 9, 2.24]])
netedge = [0, 1, 2, 5, 4, 1, 4, 3]
netcurves = netpoints[netedge]
court = points.T
courtX, courtY, courtZ = court
# plot 3D court reference points
ax.scatter(courtX, courtY, courtZ, c='b', marker='o')
ax.plot(curves[:, 0], curves[:, 1], c='k',
linewidth=3, alpha=0.5) # plot 3D court edges
ax.plot(netcurves[:, 0], netcurves[:, 1], netcurves[:, 2],
c='k', linewidth=3, alpha=0.5) # plot 3D net edges
# plot 2D court reference points
ax2D.scatter(courtX, courtY, c='b', marker='o')
ax2D.plot(curves[:, 0], curves[:, 1], c='k',
linewidth=3, alpha=0.5) # plot 2D court edges
plt.show()
plt.close(fig)
return fig
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--weights', nargs='+', type=str,
default='yolov7.pt', help='model.pt path(s)')
# file/folder, 0 for webcam
parser.add_argument('--source1', type=str,
default='inference/images', help='source')
parser.add_argument('--source2', type=str,
default='inference/images', help='source')
parser.add_argument("--view1_shift", type=int, default=0, help="")
parser.add_argument("--view2_shift", type=int, default=0, help="")
parser.add_argument('--img-size', type=int, default=640,
help='inference size (pixels)')
parser.add_argument('--conf-thres', type=float,
default=0.25, help='object confidence threshold')
parser.add_argument('--iou-thres', type=float,
default=0.45, help='IOU threshold for NMS')
parser.add_argument('--device', default='',
help='cuda device, i.e. 0 or 0,1,2,3 or cpu')
parser.add_argument('--view-img', action='store_true',
help='display results') # 2 video input not to use!
parser.add_argument('--save-txt', action='store_true',
help='save results to *.txt')
parser.add_argument('--save-conf', action='store_true',
help='save confidences in --save-txt labels')
parser.add_argument('--nosave', action='store_true',
help='do not save images/videos')
parser.add_argument('--classes', nargs='+', type=int,
help='filter by class: --class 0, or --class 0 2 3')
parser.add_argument('--agnostic-nms', action='store_true',
help='class-agnostic NMS')
parser.add_argument('--augment', action='store_true',
help='augmented inference')
parser.add_argument('--update', action='store_true',
help='update all models')
parser.add_argument('--project', default='runs/detect',
help='save results to project/name')
parser.add_argument('--name', default='exp',
help='save results to project/name')
parser.add_argument('--exist-ok', action='store_true',
help='existing project/name ok, do not increment')
parser.add_argument('--no-trace', action='store_true',
help='don`t trace model')
opt = parser.parse_args()
print(opt)
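    # Example invocation (hypothetical file names; the two sources must show the
    # same rally, time-aligned via --view1_shift / --view2_shift):
    #   python detect_two_view.py --weights ball.pt --source1 view1.mp4 \
    #       --source2 view2.mp4 --view1_shift 0 --view2_shift 12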
#check_requirements(exclude=('pycocotools', 'thop'))
    circles_all = [[], []]  # per-view 2D ball trail drawn on each output video
    real_ground_list = [[], []]  # per-view ground-contact points
with torch.no_grad():
        if opt.update:  # update all models (to fix SourceChangeWarning)
            # inherited from stock YOLOv7 detect.py; the modified detect() now
            # requires the two-view arguments, so only strip the optimizer state
            for opt.weights in ['yolov7.pt']:
                strip_optimizer(opt.weights)
else:
queues_2d = [Queue(maxsize=1), Queue(maxsize=1)]
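            # maxsize=1: each detector blocks on put() until project_3d_position
            # consumes the previous frame, keeping both views in lockstep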
for i, (src, shift) in enumerate([(opt.source1, opt.view1_shift), (opt.source2, opt.view2_shift)]):
Thread(target=detect, args=(
src, shift, queues_2d[i], circles_all[i], real_ground_list[i])).start()
queue_3d = Queue()
Thread(target=project_3d_position,
args=(queues_2d, queue_3d)).start()
Thread(target=save_npy_file, args=(queue_3d,)).start()
```