# Python-LeetCode 581 第五招 優先佇列 Priority Queue/Heap 很噴口水的一句話「After importing heapq, people push data and then pop the minimal with procedure heappushpop in Python」,因為好多「p」。 ## 基本操作 優先佇列(Priority Queue,簡稱PQ),也常以其實作方式Heap來稱呼,是一個教科書很標準、應用很廣的資料結構。相對於先進先出的Queue、先進後出的Stack、雙端開口的Deque,PQ具有優先權概念,優先權高的會先出來。 以Heap來實作PQ其實並不困難,它是以陣列來實現完全二分樹的觀念,所以寫起來好寫,跑起來也快,網路上很容易找到相關原理的教學,我們這裡就不贅述,只著重在它的應用以及Python的使用方式。 Python的PQ在名為heapq.py的模組中,LeetCode的環境已經將它import進來,所以我們只要會應用即可,想要看完整的說明可以查[Python的文件](https://docs.python.org/zh-tw/3/library/heapq.html)。以下我們說明常用且必要知道的幾個用法與知識: Python的PQ不是個封起來的資料結構,而只是一套演算法的實作,所以你放資料的list就自己準備好,需要使用時候叫它提供的函數就可以了。以下程式示範一些常用的動作。 ```python= from heapq import * # not necessary for leetcode pq = [] # default min-heap for i in range(5): heappush(pq,i) # push data print(pq[0]) # top at [0] imin = heappop(pq) # pop min # each element is a 2-tuple. # To make a max-heap, you may use -x for data x q2 = [(-2,-3),(-4,-1),(-1,-5),(-4,-2)] heapify(q2) # heapify a heap while q2: x,y = heappop(q2) print(-x,-y) ``` 使用時只要注意以下幾件事就夠了 * 預設是個min-heap,最小值top在list[0]的位置。 * 增加元素就用heappush(pq,item) * 取出(刪除)最小值就用x = heappop(pq) * 如果元素是tuple,list或string,python的比較都是字典順序(lexicographic order) * 如果要用max-heap,一個簡單的方法是把數值正負反轉,像上面的範例那樣。 * 如果列表中一開始有資料,可以用heapify來初始化heap,雖然用sort也可以,但heapify比較快。 * 插入與刪除的時間複雜度都是$O(\log n)$,查看就是一般的list取值O(1)。 heapq內還有其他一些比較複雜的函數,有興趣的可以自行參考。 ## 運用時機與技巧 從PQ所支援的操作就可以看出來,PQ運用的時機在於要持續的對一群資料找出最小值(或最大值)。如果資料不會變動,那也用不著PQ,因為簡單的排序就足夠。PQ的需求是要應付動態資料,也就是資料會變動的情形。 如果只需要找最小值與刪除最小值,那麼我們還是可以用排序來處理,當資料有可能新增時,排序就無法應付了,如果每次新增資料就進行重新排序,那時間複雜度會太高。舉例來說,如果將資料排序放在list中,每次新增時,用二分搜找到位置再將新資料插入序列中,類似下面的程式 ```python= import bisect q = list(range(10)) item = 5 bisect.insort_left(q,item) print(q) ``` 在資料量不大時,這方法跑起來不慢,但一次新增的複雜度還是$O(n+\log n) = O(n)$,複雜度不是吃素的,當資料量很大時,這不是一個好的方法。 PQ最適合的時機就是我們可能新增資料、找最小值、以及刪除最小值的時候。 但如果資料可能修改(變大或變小),這時候還可以用PQ嗎?如果我們自己寫一個heap,在了解heap運作原理後,其實不難處理資料變大與變小的狀況。不過在解題時不適合自己去寫一個heap,其實競程發展出來一些應變的方式。 **偷懶的原則** 現在要應付的狀況是我們有一群資料放在heap中,在運算的過程中,其中某個資料值改變了。我們處理的方法是一個「偷懶」的原則。以min-heap為例,當heap內資料的值變小的時候,「不要積極的去修改容器內的資料,我們就直接再丟一份變小之後的值到heap裡面,同時,在外部用一些方式去紀錄,讓我們足以分辨將來從heap出來的資料是否是已過期的資料」。 因為min-heap是小的資料先出來,所以將來新的資料一定會先出來,我們會抓到需要的值。但因為heap內會留存許多過期的資料「屍體」,它將來有可能會從heap中出來,我們必須能夠加已分辨。 這麼做會不會浪費時間與空間呢?理論上確實會,但通常實際影響不大。要知道heap的插入刪除動作都是$O(\log n)$的時間,即使資料屍體數量膨脹到$O(n^2)$,時間其實才變兩倍。萬一萬一萬一,真的空間與時間不足,我們可以在運作過程中,做一些清理的動作,把資料倒出來,活的放回去,屍體丟棄。其實很少看到題目需要做這個動作。 如果資料變大呢?用類似的操作即可,我們在外部紀錄資料目前的值,如果出來的值不是紀錄的值,就是修改之前的過期資料。 以下我們看範例會更容易了解實作細節。 ## 範例 ### Heap + Linked List 以下這題是很明顯PQ的應用。 [(題目連結) 23. Merge k Sorted Lists (Hard)](https://leetcode.com/problems/merge-k-sorted-lists/description/) 題目是說有$k$個已經排好序的linked list,要把它合併成一個排好序的linked list。合併兩個sorted list是基本程式學習過程中常見的一個程序,將其擴展到$k$個list實在也不難,這題列在難題的原因一個是這裡的$k$很大,可能到達$10^4$,此外此題必須操作在題目提供的linked list class而非陣列,所以必須對Linked list的操作有所了解。 sorted list的merge程序其實相當直接易懂,而且大多數學程式的人都曾學過,這個程序也是著名merge sort的核心。 #### Algorithm for merging sorted lists * 初始一個空的串列用來放結果。 * 每一回合在所有list最前方的元素中挑出最小值,將其移到結果串列的尾端。重複這步驟直到所有串列元素皆移完畢。 算法的正確性來自小於的遞移性:各串列最前方的元素是該串列的最小元素,所以所有最前方元素中的最小值就是所有剩下元素中最小的。 在本題中要實現上述演算法,我們要克服兩個問題: 1. 如何快速的找出$k$個串列最前方元素的最小值。 2. 如何操作串列鏈結的尾端插入動作。 第一個問題得答案是使用PQ維護所有串列最前方的元素。第二個問題是資料結構的基本技能,如果忘掉了可以查一下教科書或網路。基本上,我們用一個變數$tail$指著串列的尾端,插入新結點時,我們產生一個新結點,將tail.next指向該新結點即可。 以下看範例程式後我們再說明一些細節。 ```python= # Definition for singly-linked list. # class ListNode: # def __init__(self, val=0, next=None): # self.val = val # self.next = next class Solution: # pq holds the current smallest of every list def mergeKLists(self, lists: List[Optional[ListNode]]) -> Optional[ListNode]: k = len(lists) pq = [] # (value, idx) for i in range(k): if lists[i]: heappush(pq,(lists[i].val,i)) lists[i] = lists[i].next head = tail = ListNode() # result, dummy head while pq: v,i = heappop(pq) tail.next = ListNode(v) # new node of val v tail = tail.next # move to last if lists[i]: # push the next if existing heappush(pq, (lists[i].val, i)) lists[i] = lists[i].next return head.next ``` 前5行是題目給的說明,LeetCode這一類題目會給你它定義好的結點結構,你必須呼叫它來產生你的結點與答案。這裡我們只需要知道該串列結點的結構,以及產生一個物件ListNode時要給予的資料即可, 第9行初始一個空的列表當作PQ用。第11 ~ 14行將每個串列最前方的元素數值放入PQ,因為將來每次取出最小值之後,我們需要知道該最小值來自於哪一個串列,以便將它的下一個元素放入PQ,因此我們放在PQ中的每一個元素是一個$(value,index)$的tuple,如第13行,其中要比較的值必須放在第一欄位。同時,每個串列當前方元素被移入PQ後,我們將指標指向該串列的下一個元素(第14行)。 在串列新增時,空串列與非空串列稍微有所不同,因此,在開始挑最小值之前,我們先建構一個串列只含一個結點,這是一個dummy head。這樣我們不必考慮空串列的情形,這也是一個sentinel簡化程式的方式。變數tail會永遠指著串列的最後一個結點。 第16行的while迴圈會跑到PQ為空為止,也就是所有串列都是空的時候才會結束。每次進入迴圈,從PQ中彈出最小值,第一個欄位是值$v$,第二個欄位是來自哪個串列的index $i$。於是我們建構一個結點 $(val=0, next=None)$,然後將最後一個結點tail.next設為此新結點(第18行)。第19行將tail移到新的結尾。第20 ~ 22行將第$i$個串列的下一個元素放入PQ中(如果還有的話)。最後,答案在dummy head的next。 時間複雜度為$O(n\log k)$,因為PQ的大小為$O(k)$,操作了$O(n)$次,其中$n$為所有串列的長度總和。 ### Heap + Sorting [(題目連結) 1942. The Number of the Smallest Unoccupied Chair (Medium)](https://leetcode.com/problems/the-number-of-the-smallest-unoccupied-chair/description/) 題目說有$n$個人參加一場聚會,每個人有各自的到達與離開時間,每個人到達時,會坐在***當時編號最小的空位***上,請計算出編號為$target$的人會坐在哪一個位子。 我們用直接的想法來做。我們將每個人到達與離開的時間各自排序,模擬到達(取最小空位)與離開(歸還坐位)的動作。既然每次要挑出最小的空位,我們用min-heap維護好所有空位。因為空位需要歸還,我們用list紀錄每一個人所坐的坐位編號。 以下是範例程式。 ```python= class Solution: def smallestChair(self, times: List[List[int]], targetFriend: int) -> int: n = len(times) empty = list(range(n)) seat = [-1]*n arrival = sorted((t[0],i) for i,t in enumerate(times)) leave = sorted((t[1],i) for i,t in enumerate(times)) j = 0 for t,i in arrival: while leave[j][0]<=t: heappush(empty,seat[leave[j][1]]) seat[leave[j][1]] = -1 j += 1 seat[i] = heappop(empty) if i==targetFriend: return seat[i] # ``` 第4行是所有空位編號的PQ,初始是由0 ~ $n-1$的所有編號,因為我們用list(range(n))是排好序的,就不必再做heapify的動作。 第5行的seat用來記錄每個人坐位編號。第6行取出到達時間並加以排序,第7行取出離開時間並加以排序。 第9行的for迴圈模擬每一位依序到達時的狀況。第10 ~ 13行我們根據離開時間,將在這一位到達前離開的坐位一一歸還,然後在第14行取出編號最小的空位。 時間複雜度$O(n\log n)$。因為排序是$O(n\log n)$,有$O(n)$次PQ的操作也是$O(n\log n)$。 ### 偷懶原則的範例 下面這一題在第四招前綴和單元時出現過的題目,當時是用來介紹單調隊列,它其實也可以用PQ來做。 [(題目連結) 239. Sliding Window Maximum (Hard)](https://leetcode.com/problems/sliding-window-maximum/) 題目說有一個長度為k的sliding window在一序列中一步一步移動,請計算出在每一個位置時的區間最大值,也就是$max(nums[i:k+i]))$對於所有可能的$i$。 這題用單調隊列來做時間複雜度只需要$O(n)$,我們也可以用PQ來做,雖然會比較差一點。想法是把window內的數值丟到一個max-heap內去維護,每次拿出最大值時,要檢查該數值是否已經離開了window。以下是範例程式。 ```python= class Solution: # using pq def maxSlidingWindow(self, nums: List[int], k: int) -> List[int]: if k==1: return nums[:] # (-value,index) max-heap pq = [(-p,i) for i,p in enumerate(nums[:k])] heapify(pq) ans = [-pq[0][0]] for i,p in enumerate(nums[k:],start=k): while pq[0][1] <= i-k: #out of date, never empty heappop(pq) heappush(pq,(-p,i)) ans.append(-pq[0][0]) return ans ``` 第4行我們特判一下視窗寬度$k=1$的情形。在一般情形下,我們初始先將前$k$放入pq中,然後做heapify的動作。因為要取最大值,我們放入heap的數值取負號,這樣最大會變最小而先彈出heap。因為要偵測是否出界,所以heap內的元素使用tuple,第二個欄位放index。初始後,pq[0]就是第一個視窗的答案,取出時要再轉一次負號。 接著第9行開始從第$k$個一一掃描序列,每次放進去一個,用while迴圈將已經出界的最大值移除,如果不是最大值的出界就不理它。 時間複雜度$O(n\log n)$。 ### minHeap + maxHeap 我們來看一個用到兩個heap組合成一個資料結構的有趣例子。 [(題目連結) 295. Find Median from Data Stream (hard)](https://leetcode.com/problems/find-median-from-data-stream/) 這個題目是說有一序列的整數會逐步加入,你要實作一個class支援隨時查詢已加入整數的中位數(median)為何。依照題目給的定義,實作三個函數分別執行:初始化、加入一個整數、以及查詢中位數。在偶數個資料時,中位數此處定義為兩個中位數的平均值(實數)。 如果要靜態的在$n$個數中找一次median,最簡單是用排序,時間是$O(n\log n)$。在演算法中,也有更快速$O(n)$時間的方法,但是對於本題來說都不夠好,因為本題的要求是資料逐步加入,而可以多次查詢。 這題目挺有趣,需要動一下腦筋。我們的heap可以找最小,可以找最大,那麼正中間的數呢? 江湖一點訣,說破不值錢。 我們把已進入的數列依照數值大小分成兩半,比較小的數放入一個max-heap;比較大的數放入一個min-heap,那麼median就在這兩個heap的top上了。 每加入一個資料或查詢的時間複雜度都是$O(\log n)$。實作時要注意資料數量為奇數與偶數時的狀況。 ```python= class MedianFinder: # two heap def __init__(self): self.large = [] # min heap self.small = [] # max heap, using negative # len: large = small or small+1 (odd) def addNum(self, num: int) -> None: if not self.large or num >= self.large[0]: heappush(self.large, num) if len(self.large)> len(self.small)+1: x = heappop(self.large) heappush(self.small,-x) else: heappush(self.small, -num) if len(self.small)> len(self.large): x = heappop(self.small) heappush(self.large,-x) def findMedian(self) -> float: if len(self.large) > len(self.small): return self.large[0] return (self.large[0]-self.small[0])/2 #even number # Your MedianFinder object will be instantiated and called as such: # obj = MedianFinder() # obj.addNum(num) # param_2 = obj.findMedian() ``` 初始化時我們就初始兩個空列表large與small。這裡的寫法是計畫維持 $len(small)\leq len(large) \leq len(small)+1$ 也就是說當奇數時在$large$多放一個,否則兩邊數量一樣多。 當加入一個整數$num$時,我們依照$num$的大小判斷丟進$large$或$small$,但如果丟進去的一邊數量太多(違反我們的計畫),我們就從數量太多的heap中pop中一個來丟到另外一邊,以維持計畫的數量。max-heap的部分我們藉由將key值轉負號的方式來實現。 查詢時就非常簡單,依照奇數或偶數回傳題目定義的中位數即可。 ### Heap + Greedy Heap能夠很快地找出最小值,與貪心法則(Greedy)的需求搭配,所以Heap與Greedy的搭配是很自然的組合。以下是一例。 [(題目連結)1353. Maximum Number of Events That Can Be Attended (Medium)](https://leetcode.com/problems/maximum-number-of-events-that-can-be-attended/) 題目是有一些活動,每個活動有開始與結束的日期,每一個活動在開與結束之間的任何一天都可以去參加該活動,一個活動只要參加一天就算有參加,但你每天只能參加一場活動,請問最多可以參加多少場活動。 很顯然的,一場活動只要參加一天就好了,因為同一個活動參加兩天並不會得到好處。這其實是個排程(Scheduling)的問題:每個job都需要一單位的時間,每個job有arrival time與deadline,只能在該區間內完成該工作,否則該工作就只能丟棄。請問最多可以完成多少件工作。 答案是:Greedy, earliest deadline first。 Greedy算法的證明大多是相同的架構,證明我們在每一個步驟的挑選是正確的。假設另外有一個最佳解與Greedy挑選的不一樣,那麼我們可以將最佳解做一個替換,得到與Greedy挑選一樣的最佳解。 本題的證明:假設Greedy的解與最佳解OPT第一個不一樣的是Greedy在第$d$天參加活動$E$。 * 如果最佳解OPT始終沒有參加活動$E$,那麼把OPT的第$d$改成活動$E$,修改之後的解不會變差。 * 如果最佳解OPT第$d$天參加了活動$F$,在第$p$天參加了活動$E$。因為Greedy是採取earliest deadline first,我們知道$F$的deadline不小於$E$,因此在OPT中調換$E$與$F$參加的日期是一個合法的排程。 因為活動有開始時間,不能一股腦兒將所有的活動依照deadline排序後挑選。我們以一個min-heap維護目前已經開始的活動,並且我們用一個變數avail維護好下一個可以參加活動的日期。 我們將所有活動依照開始時間排序後一一檢視。在下一個活動開始前,從heap中挑選出已經開始且deadline最小的活動,如果它的deadline不小於avail,也就是還來得及參加,我們就參加;否則放棄此活動。當下一個活動的開始日期之前能參加的活動都挑選結束後,我們把下一個活動放入heap中以便一起參與比較。 ```python= class Solution: # earlies deadline first, with arrival time def maxEvents(self, events: List[List[int]]) -> int: events.sort() # sorted by starting time events.append([100001,100001]) # dummy sentinel avail = 0 # the available(free) day cnt = 0 # answer pq = [] # endtime for s,t in events: while pq and avail<s: # before next arrival deadline = heappop(pq) # earliest deadline if deadline >= avail: # attend cnt += 1 avail += 1 # else: pass # discard, do nothing avail = s heappush(pq,t) return cnt ``` 第5行我們加入一個假的活動,開始時間晚於所有輸入的活動,這是當哨兵(sentinel)使用,因為前述的演算法在所有活動皆到達之後,heap內還會有一些活動要挑選,為了簡化程式,我們加上一個哨兵。 時間複雜度$O(n\log n)$。因為排序是$O(n\log n)$,有$O(n)$次PQ的總操作時間也是$O(n\log n)$,均攤分析。 ### Heap + Dijkstra 接下來看一個二維陣列走訪的題目。 [778. Swim in Rising Water (Hard)](https://leetcode.com/problems/swim-in-rising-water/) 本題中,輸入一個$n\times n$網格(矩陣),$n^2$個格子內是0 ~ $n^2 -1$這些整數的排列,代表每一個格子的海拔高度。假設雨水在時間$t$的時候水位高度是$t$,你可以從一個格子游到上下左右相鄰格子,只要兩個格子的海拔都不大於$t$。 假設游泳的時間不計,請問何時可以由左上角$(0,0)$游到右下角$(n-1,n-1)$。也就是說要找出最小的時間$t$,滿足在時間$t$時會存在起點到終點一條路徑,此路徑上每個格子海拔都$\leq t$。 這題有不只一種合理的解法。這裡我們要說明的是運用heap的方法。如果把網格當graph來看,這是一個最短路徑問題的相似題,權重在點上而不是在邊上,並且最小化的不是權重的和而是最大的權重(mini-max版本或稱bottleneck版本),做法與最短路徑類似,可以用修改的Dijkstra演算法來做。即使不了解Dijkstra,也不太難明瞭以下說明的本題演算法。 #### 演算法 * 變數curr是目前的水位,初始值給起點與終點的較大值,因為答案不可能低於這個值。運算過程中這個值只可能上升不會下降,當看見終點時,這個值就是答案。 * 用min-heap pq維護有鄰居可以被走得到那些點的座標並以該點高度當作key。 * seen紀錄已經看過的點,包含已經走得到的點與目前在heap中的點。seen的作用是避免一個點重複被檢查。 * 每回合從pq中取出高度最低的點,如果這點的高度高於目前高度curr,則更新curr,因為低於此高度的點已經拜訪完,而此點是尚未被拜訪且走得到的點中高度最低的,所以低於此高度不可能有答案。如果取出的高度低於或等於curr,則curr不做變動。 * 對於每回合pq中取出的點,檢查它的四個鄰居,如尚未被看過,則納入pq內。 以下是實作上述的演算法。我們用二維陣列來做seen,並且右方與下方圍半圈True來避免走出界。 如不熟悉Grid的走訪與哨兵技巧,請參考 [Python-LeetCode 第三招 Grid Traversal ](/KNjxtbQPQ9GoObErkZAoKQ)。 ```python= class Solution: def swimInWater(self, grid: List[List[int]]) -> int: n = len(grid) seen = [[False]*n+[True] for i in range(n)] seen.append([True]*n) curr = max(grid[0][0],grid[n-1][n-1]) pq = [(grid[0][0],0,0)] seen[0][0] = True while pq and not seen[n-1][n-1]: h,r,c = heappop(pq) if h>curr: curr = h for nr,nc in ((r,c-1),(r,c+1),(r-1,c),(r+1,c)): if not seen[nr][nc]: heappush(pq,(grid[nr][nc],nr,nc)) seen[nr][nc] = True #end while return curr ``` 時間複雜度$O(n^2 \log n)$,輸入為$n\times n$矩陣,故資料量是$O(n^2)$。 ## 結語 優先佇列PQ是一個很重要的資料結構,Python中heapq.py是以min-heap實作PQ的一套模組,它支援新增資料與查詢或刪除最小值的操作,刪除與新增的時間複雜度都是$O(\log n)$。 PQ的應用很多,本單元示範了LeetCode上幾個典型的題目,其他類似題目可以透過tag去搜尋。PQ常常用在其他演算法中用來在一個步驟中找到某個最小或最大值,所以與Greedy或DP的結合都是會出現的。在一些其他著名的演算法中,例如Dijkstra與Prim演算法中也需要使用它。 雖然有其他功能更強大的資料結構也可以做到PQ的功能,例如Python的SortedList與C++中的set/map(底層是BST),但通常heap的速度最快速。本單元中一些題目也有其他的做法,將來應該都會有另外的單元來討論。