# 快速整理直播螢幕錄影影片(Kofan uploader) ###### tags: `self maintains` `youtube upload` `api` [toc] ## 背景 在手機上每天錄下的直播影片,每天晚上都會透過onedrive進行同步,但目前累積的量已經超過雲端的空間,因此打算將所有影片移至youtube中,並且釋放空間 ## 影片的形式 由於手機錄影一次的時間不長,因此直播影片被分成許多個部分檔案,並且有個固定的格式: ``` screen-20220101-233211.mp4 screen-20220101-234055.mp4 screen-20220102-205853.mp4 .......................... ``` 分為三個部分: 1. 類型(screen) 2. 日期(2022xxxx) 3. 時間(233211) ## 影片處理方式 因為影片是由時間所作為檔名,不需要重新進行排列,只需要將多部影片合併為同一支影片在進行youtube的影片上傳。只需將同一個時間軸的影片放入以類型及日期命名的資料夾中,並且使用FFmpeg軟體將影片合併成為一部單一影片即可。 ### 以python將同天或同次的檔案放入對應資料夾 #### 同批影片的判斷: 由於影片錄影時間每部不超過20分鐘,因此如果產生跨日的直播則可以使用前一天最後一部影片的錄影時間來進行判斷,如果最後的錄影時間為在23:40以後,並且隔日有影片的時數為02:00以前,則將該部影片放入前一日的資料夾中。 **Bugs fixed:** 1. use datetime module(because the month or year end -1 with int will find no file) 2. add credential file and insert command in upload function(the credential files will not found) 3. everytime checks the uploaded file with deleting them(storge full) 4. remerge the file which can't open(escape by error of moviepy) 5. get files by ref path(easy to change the computer) 6. **Busgs not fixed** 1. error caused by ffmpeg(the last file uploaded in dir) 2. first upload will not show the https of license ```python= #file.pyimport os import shutil import moviepy.editor import pickle import datetime Cloud_path = '..\\圖片\\相機膠卷' output_path = 'C:\\Users\\andy4\\Videos' class file: def __init__(self , filename): self.type = filename[-3:] if self.type == 'mp4': if filename.find('screen') != -1: self.title = filename[:6] self.filename = filename self.dir_name = filename[0:15] self.year = int(filename[7:11]) self.month = int(filename[11:13]) self.day = int(filename[13:15]) self.hour = int(filename[-10:-8]) self.minutes = int(filename[-8:-6]) self.date = datetime.datetime(self.year,self.month,self.day,self.hour,self.minutes) self.lastday = self.date - datetime.timedelta(1) self.lastday_dir_name = filename[:11] + "{:02d}{:02d}".format(self.lastday.month,self.lastday.day) self.mp4 = True else: self.mp4 = True else: self.mp4 = False """_summary_ 確認此日是否為前一日的接續檔案 return 0:yes 1:no 2:probally(please check yesterday videos) """ def check_cont(self): if not self.hour < 2: return 1 try: os.chdir(self.lastday_dir_name) filenames = next(os.walk("."), (None, None, []))[2] flag = 1 for i in [file(j) for j in filenames if j.find("screen") != -1]: if i.hour >= 23 and i.minutes >= 40: flag = 0 os.chdir("..") except: print("Please check the videos for previous day\n") flag = 2 return flag def check_Size(self): if os.path.getsize(self.filename) / (1<<20) < 300: return True else: return False def send_back(self): shutil.move('.\\check\\' + self.filename , f"{Cloud_path}\\{self.year}\\{self.month:02d}") def get_video_duration(self): video = moviepy.editor.VideoFileClip(self.filename) video_duration = int(video.duration) return video_duration def get_day(self): return self.day def get_output_files(path): return [filename for filename in next(os.walk(path), (None, None, []))[2] if file(filename).mp4] def get_day(filename): return int(filename[13:15]) def get_nextday_dir_name(dir_name): date = datetime.date(int(dir_name.split('-')[1][:4]) , int(dir_name[-4:-2]) , int(dir_name[-2:])) + datetime.timedelta(days=1) return f"screen-{date.strftime('%Y%m%d')}" def download_file_NextTwoDir(dir_name): os.chdir(dir_name) os.system("attrib +p *") os.chdir("..") count = 0 #下載下兩個資料夾的檔案 while True: try: count += 1 dir_name = get_nextday_dir_name(dir_name) os.chdir(dir_name) os.system("attrib +p *") os.chdir("..") if count >= 2: break except: if count >= 30: break continue def save_unuploadedlist(): k = show_uploadedlist() with open('not_uploaded.txt' , 'w', encoding='UTF-8') as f: f.writelines([dir_name+"\n" for dir_name in next(os.walk('.'))[1] if dir_name not in k]) """_summary_ if the stroge > 70 Gb return Ture """ def Check_Storge(): total, used, free = shutil.disk_usage("/") if free//2**30 > 70: return True else: return False def save_checkedlist(data): try: with open('checked_list.pickle', 'rb') as f: checkedlist = pickle.load(f) if data not in checkedlist: checkedlist.append(data) with open('checked_list.pickle', 'wb') as f: pickle.dump(checkedlist, f) except: with open('checked_list.pickle', 'wb') as f: pickle.dump([data] , f) def save_uploadedlist(data): try: with open('uploaded_list.pickle', 'rb') as f: uploaded_list = pickle.load(f) if data not in uploaded_list: uploaded_list.append(data) with open('uploaded_list.pickle', 'wb') as f: pickle.dump(uploaded_list, f) except: with open('uploaded_list.pickle', 'wb') as f: pickle.dump([data] , f) def show_uploadedlist(): k = list() with open('uploaded_list.pickle', 'rb') as f: k = pickle.load(f) print(k) return k def show_checkededlist(): with open('checked_list.pickle', 'rb') as f: print(pickle.load(f)) """_summary_ option 1 for uploaded_list option 2 for checked_list """ def Check_done(dir_name , options): try: if options == 1: p = 'uploaded_list.pickle' if options == 2: p = 'checked_list.pickle' with open( p , 'rb') as f: uploaded_list = pickle.load(f) if dir_name in uploaded_list: return True else: return False except: return False def get_files_from_photos(start_date): files = [] path = f"{Cloud_path}\\" for year in range(start_date.year , datetime.date.today().year + 1): if year == datetime.date.today().year: end_month = datetime.date.today().month else: end_month = 12 for month in range(start_date.month , end_month + 1): fs = [f for f in get_output_files(path + f"{year}\\{month:02d}") if f[-3:] == 'mp4' and f.find('screen') != -1 and not Check_done(f,2)] fs = [f for f in fs if datetime.date(file(f).year, file(f).month, file(f).day) >= start_date] for f in fs: shutil.move( path + f"{year}\\{month:02d}\\" + f , '.') files.extend(fs) return files def __upload_file(dir_name): status = os.system(f"youtube-upload --title={dir_name} --client-secrets=.\\client_secret.json --privacy private {output_path}\\{dir_name}.mp4") if status == 0: save_uploadedlist(dir_name) __delete_file(dir_name) return status """_summary_ 上傳在輸出資料夾中到目前的所有檔案,並且回傳目前api狀態 0:可上傳 3:無配額 1:其他 """ def upload_until(dir_name): filenames = get_output_files(f"{output_path}\\") filenames = [f for f in filenames if file(f).title == 'screen' and file(f).day <= int(dir_name[-2:])] for filename in filenames: filename = file(filename) if not Check_done(filename.dir_name , 1): if Cmp_video_duration(filename.dir_name): check = __upload_file(filename.dir_name) if check != 0: return check else: print(f"Remerge the videos {filename.dir_name}") os.chdir(filename.dir_name) os.system("del mergelist.txt") os.system("for %i in (*.mp4) do echo file '%i' >> mergelist.txt") os.system(f"ffmpeg -y -safe 0 -f concat -i mergelist.txt -c copy {output_path}\\{filename.dir_name}.mp4") os.chdir("..") else: __delete_file(filename.dir_name) return 0 def __delete_file(dir_name): try: if os.system(f"del {output_path}\\{dir_name}.mp4") == 0: print(f"{dir_name}.mp4 has been deleted") else: print(f"{dir_name}.mp4 has not been deleted") except: print(f"{dir_name}.mp4 does not exist") def remove_uploaded_file(dirname): with open('uploaded_list.pickle', 'rb') as f: uploaded_list = pickle.load(f) uploaded_list.remove(dirname) with open('uploaded_list.pickle', 'wb') as f: pickle.dump(uploaded_list , f) def get_video_duration(path): if path[-3:] == 'mp4': try: video = moviepy.editor.VideoFileClip(path) video_duration = int(video.duration) except: video_duration = 0 else: filenames = next(os.walk(path), (None, None, []))[2] video_duration = sum([int(moviepy.editor.VideoFileClip(path + '\\' + filename).duration) for filename in filenames if filename[-3:] == 'mp4']) return video_duration def Cmp_video_duration(path1 , path2): if (get_video_duration(path1) - get_video_duration(path2)) <= 10: return True else: return False def Cmp_video_duration(dir_name): if (get_video_duration(dir_name) - get_video_duration(f"{output_path}\\{dir_name}.mp4")) <= 10: return True else: print(f"The merge of the video {dir_name} is not complete") return False ``` ```python= #sort.py import os from os import walk import shutil from file import file #取得所有檔案的檔名 filenames = next(walk("."), (None, None, []))[2] for filename in filenames: filename = file(filename) if filename.type == 'mp4': #如果沒有資料夾則創立資料夾 if not os.path.isdir(filename.dir_name): os.mkdir(filename.dir_name) #如果檔案為前日的接續檔案則移動到前日資料夾 if filename.check_cont(): shutil.move(filename.filename, filename.lastday_dir_name) else: shutil.move(filename.filename, filename.dir_name) ``` https://stackoverflow.com/questions/141291/how-to-list-only-top-level-directories-in-python ### 使用python批次處理影片合併 #### FFmpeg合併 1. 產生需要合併的清單 ``` #at cmd for %i in (*.mp4) do echo file '%i' >> mergelist.txt ``` 2. 合併檔案 ``` ffmpeg -safe 0 -f concat -i mergelist.txt -c copy D:\user\videoes\screen-20211230.mp4 ``` https://ffmpeg.org/ https://stackoverflow.com/questions/7333232/how-to-concatenate-two-mp4-files-using-ffmpeg #### 進入個別資料夾並傳送指令 由sort.py執行後可得到許多包含同次直播之資料夾,透過merge.py可瀏覽每個資料夾,產生mergelist並且進行合併檔案,並且輸出檔案到D:\user\videoes\資料夾內。 - 注意事項 1. 進行合併檔案時避免重複輸出檔案 2. 輸出檔案前確認硬碟空間 3. 從one drive雲端上下載檔案到本地 (https://docs.microsoft.com/en-us/onedrive/files-on-demand-windows) ```python= #merge.py import os from os import walk import shutil import file # 取得資料夾內所有資料夾名稱 dir_list = next(os.walk('.'))[1] #取得已合併輸出檔案之名稱(避免重複進行輸出檔案) filenames = next(walk("D:\\user\\videoes\\"), (None, None, []))[2] filenames = [filename[:-4] for filename in filenames] api_status = 0 for dir_name in dir_list: if dir_name[:6] == 'screen': #當已有輸出檔案或已上傳過時進行跳過 if (dir_name in filenames) or file.Check_done(dir_name): continue #如果api的狀態為可上傳,則上傳已合併檔案 if api_status == 0: api_status = file.upload_until(dir_name) else: print(f"Shut down with error code {api_status}") break file.download_file_NextTwoDir(dir_name) os.chdir(dir_name) #避免重複輸出檔案資料 if not os.path.isfile("mergelist.txt"): os.system("for %i in (*.mp4) do echo file '%i' >> mergelist.txt") check = os.system(f"ffmpeg -safe 0 -f concat -i mergelist.txt -c copy D:\\user\\videoes\\{dir_name}.mp4") os.chdir("..") #當硬碟空間不足時跳出迴圈 if not file.Check_Storge(): break ``` https://docs.microsoft.com/en-us/onedrive/files-on-demand-windows ### 使用api上傳影片至youtube #### 透過google cloud platform連接api * Google Cloud Platform設定說明 https://learndataanalysis.org/how-to-upload-a-video-to-youtube-using-youtube-data-api-in-python/ * youtube-uploader套件 https://github.com/tokland/youtube-upload * Google Cloud Platform https://console.cloud.google.com/ * youtube example https://github.com/youtube/api-samples/tree/master/python * youtube upload parameters https://developers.google.com/youtube/v3/docs/videos ## 未完成 ### 加入影片人物辨識(取消) 由於直播時間為10:30後,並且可能會超過當日來到隔日,因此在做分類的時候不能只透過日期將檔案進行歸類,還需要考慮到是否直播到隔日,但由於該時間也可能有進行非直播的螢幕錄影,因此可能需要透過人物辨識的方式來將隔日00:00到01:00的時間和影片人物是否為可凡取交集來進行分類,如使用此方法則之後將不需人工進行校對,可直接將分類、合併,與youtube上傳影片之api結合。 https://docs.microsoft.com/zh-tw/azure/cognitive-services/face/quickstarts/client-libraries?pivots=programming-language-python&tabs=visual-studio ### merge完成後與原檔案進行校對 - [x] 1. 影片長度與未修剪前長度相加相同則為成功合併-> - [x] 2. 如果成功合併則上船影片至youtube - [x] 3. 上傳成功則刪除檔案 預計使用工具:https://www.geeksforgeeks.org/get-video-duration-using-python-opencv/ ### 產生的錯誤訊息以line傳送 預計使用工具:https://bustlec.github.io/note/2018/07/10/line-notify-using-python/ ### 平行處理 https://stackoverflow.com/questions/25120363/multiprocessing-execute-external-command-and-wait-before-proceeding