Try   HackMD

Python-LeetCode 581 第五招 優先佇列 Priority Queue/Heap

很噴口水的一句話「After importing heapq, people push data and then pop the minimal with procedure heappushpop in Python」,因為好多「p」。

基本操作

優先佇列(Priority Queue,簡稱PQ),也常以其實作方式Heap來稱呼,是一個教科書很標準、應用很廣的資料結構。相對於先進先出的Queue、先進後出的Stack、雙端開口的Deque,PQ具有優先權概念,優先權高的會先出來。
以Heap來實作PQ其實並不困難,它是以陣列來實現完全二分樹的觀念,所以寫起來好寫,跑起來也快,網路上很容易找到相關原理的教學,我們這裡就不贅述,只著重在它的應用以及Python的使用方式。

Python的PQ在名為heapq.py的模組中,LeetCode的環境已經將它import進來,所以我們只要會應用即可,想要看完整的說明可以查Python的文件。以下我們說明常用且必要知道的幾個用法與知識:
Python的PQ不是個封起來的資料結構,而只是一套演算法的實作,所以你放資料的list就自己準備好,需要使用時候叫它提供的函數就可以了。以下程式示範一些常用的動作。

from heapq import * # not necessary for leetcode pq = [] # default min-heap for i in range(5): heappush(pq,i) # push data print(pq[0]) # top at [0] imin = heappop(pq) # pop min # each element is a 2-tuple. # To make a max-heap, you may use -x for data x q2 = [(-2,-3),(-4,-1),(-1,-5),(-4,-2)] heapify(q2) # heapify a heap while q2: x,y = heappop(q2) print(-x,-y)

使用時只要注意以下幾件事就夠了

  • 預設是個min-heap,最小值top在list[0]的位置。
  • 增加元素就用heappush(pq,item)
  • 取出(刪除)最小值就用x = heappop(pq)
  • 如果元素是tuple,list或string,python的比較都是字典順序(lexicographic order)
  • 如果要用max-heap,一個簡單的方法是把數值正負反轉,像上面的範例那樣。
  • 如果列表中一開始有資料,可以用heapify來初始化heap,雖然用sort也可以,但heapify比較快。
  • 插入與刪除的時間複雜度都是
    O(logn)
    ,查看就是一般的list取值O(1)。

heapq內還有其他一些比較複雜的函數,有興趣的可以自行參考。

運用時機與技巧

從PQ所支援的操作就可以看出來,PQ運用的時機在於要持續的對一群資料找出最小值(或最大值)。如果資料不會變動,那也用不著PQ,因為簡單的排序就足夠。PQ的需求是要應付動態資料,也就是資料會變動的情形。
如果只需要找最小值與刪除最小值,那麼我們還是可以用排序來處理,當資料有可能新增時,排序就無法應付了,如果每次新增資料就進行重新排序,那時間複雜度會太高。舉例來說,如果將資料排序放在list中,每次新增時,用二分搜找到位置再將新資料插入序列中,類似下面的程式

import bisect q = list(range(10)) item = 5 bisect.insort_left(q,item) print(q)

在資料量不大時,這方法跑起來不慢,但一次新增的複雜度還是

O(n+logn)=O(n),複雜度不是吃素的,當資料量很大時,這不是一個好的方法。
PQ最適合的時機就是我們可能新增資料、找最小值、以及刪除最小值的時候。

但如果資料可能修改(變大或變小),這時候還可以用PQ嗎?如果我們自己寫一個heap,在了解heap運作原理後,其實不難處理資料變大與變小的狀況。不過在解題時不適合自己去寫一個heap,其實競程發展出來一些應變的方式。

偷懶的原則
現在要應付的狀況是我們有一群資料放在heap中,在運算的過程中,其中某個資料值改變了。我們處理的方法是一個「偷懶」的原則。以min-heap為例,當heap內資料的值變小的時候,「不要積極的去修改容器內的資料,我們就直接再丟一份變小之後的值到heap裡面,同時,在外部用一些方式去紀錄,讓我們足以分辨將來從heap出來的資料是否是已過期的資料」。
因為min-heap是小的資料先出來,所以將來新的資料一定會先出來,我們會抓到需要的值。但因為heap內會留存許多過期的資料「屍體」,它將來有可能會從heap中出來,我們必須能夠加已分辨。
這麼做會不會浪費時間與空間呢?理論上確實會,但通常實際影響不大。要知道heap的插入刪除動作都是

O(logn)的時間,即使資料屍體數量膨脹到
O(n2)
,時間其實才變兩倍。萬一萬一萬一,真的空間與時間不足,我們可以在運作過程中,做一些清理的動作,把資料倒出來,活的放回去,屍體丟棄。其實很少看到題目需要做這個動作。
如果資料變大呢?用類似的操作即可,我們在外部紀錄資料目前的值,如果出來的值不是紀錄的值,就是修改之前的過期資料。

以下我們看範例會更容易了解實作細節。

範例

Heap + Linked List

以下這題是很明顯PQ的應用。
(題目連結) 23. Merge k Sorted Lists (Hard)
題目是說有

k個已經排好序的linked list,要把它合併成一個排好序的linked list。合併兩個sorted list是基本程式學習過程中常見的一個程序,將其擴展到
k
個list實在也不難,這題列在難題的原因一個是這裡的
k
很大,可能到達
104
,此外此題必須操作在題目提供的linked list class而非陣列,所以必須對Linked list的操作有所了解。

sorted list的merge程序其實相當直接易懂,而且大多數學程式的人都曾學過,這個程序也是著名merge sort的核心。

Algorithm for merging sorted lists

  • 初始一個空的串列用來放結果。
  • 每一回合在所有list最前方的元素中挑出最小值,將其移到結果串列的尾端。重複這步驟直到所有串列元素皆移完畢。

算法的正確性來自小於的遞移性:各串列最前方的元素是該串列的最小元素,所以所有最前方元素中的最小值就是所有剩下元素中最小的。

在本題中要實現上述演算法,我們要克服兩個問題:

  1. 如何快速的找出
    k
    個串列最前方元素的最小值。
  2. 如何操作串列鏈結的尾端插入動作。

第一個問題得答案是使用PQ維護所有串列最前方的元素。第二個問題是資料結構的基本技能,如果忘掉了可以查一下教科書或網路。基本上,我們用一個變數

tail指著串列的尾端,插入新結點時,我們產生一個新結點,將tail.next指向該新結點即可。

以下看範例程式後我們再說明一些細節。

# Definition for singly-linked list. # class ListNode: # def __init__(self, val=0, next=None): # self.val = val # self.next = next class Solution: # pq holds the current smallest of every list def mergeKLists(self, lists: List[Optional[ListNode]]) -> Optional[ListNode]: k = len(lists) pq = [] # (value, idx) for i in range(k): if lists[i]: heappush(pq,(lists[i].val,i)) lists[i] = lists[i].next head = tail = ListNode() # result, dummy head while pq: v,i = heappop(pq) tail.next = ListNode(v) # new node of val v tail = tail.next # move to last if lists[i]: # push the next if existing heappush(pq, (lists[i].val, i)) lists[i] = lists[i].next return head.next

前5行是題目給的說明,LeetCode這一類題目會給你它定義好的結點結構,你必須呼叫它來產生你的結點與答案。這裡我們只需要知道該串列結點的結構,以及產生一個物件ListNode時要給予的資料即可,
第9行初始一個空的列表當作PQ用。第11 ~ 14行將每個串列最前方的元素數值放入PQ,因為將來每次取出最小值之後,我們需要知道該最小值來自於哪一個串列,以便將它的下一個元素放入PQ,因此我們放在PQ中的每一個元素是一個

(value,index)的tuple,如第13行,其中要比較的值必須放在第一欄位。同時,每個串列當前方元素被移入PQ後,我們將指標指向該串列的下一個元素(第14行)。

在串列新增時,空串列與非空串列稍微有所不同,因此,在開始挑最小值之前,我們先建構一個串列只含一個結點,這是一個dummy head。這樣我們不必考慮空串列的情形,這也是一個sentinel簡化程式的方式。變數tail會永遠指著串列的最後一個結點。

第16行的while迴圈會跑到PQ為空為止,也就是所有串列都是空的時候才會結束。每次進入迴圈,從PQ中彈出最小值,第一個欄位是值

v,第二個欄位是來自哪個串列的index
i
。於是我們建構一個結點
(val=0,next=None)
,然後將最後一個結點tail.next設為此新結點(第18行)。第19行將tail移到新的結尾。第20 ~ 22行將第
i
個串列的下一個元素放入PQ中(如果還有的話)。最後,答案在dummy head的next。
時間複雜度為
O(nlogk)
,因為PQ的大小為
O(k)
,操作了
O(n)
次,其中
n
為所有串列的長度總和。

Heap + Sorting

(題目連結) 1942. The Number of the Smallest Unoccupied Chair (Medium)
題目說有

n個人參加一場聚會,每個人有各自的到達與離開時間,每個人到達時,會坐在當時編號最小的空位上,請計算出編號為
target
的人會坐在哪一個位子。

我們用直接的想法來做。我們將每個人到達與離開的時間各自排序,模擬到達(取最小空位)與離開(歸還坐位)的動作。既然每次要挑出最小的空位,我們用min-heap維護好所有空位。因為空位需要歸還,我們用list紀錄每一個人所坐的坐位編號。
以下是範例程式。

class Solution: def smallestChair(self, times: List[List[int]], targetFriend: int) -> int: n = len(times) empty = list(range(n)) seat = [-1]*n arrival = sorted((t[0],i) for i,t in enumerate(times)) leave = sorted((t[1],i) for i,t in enumerate(times)) j = 0 for t,i in arrival: while leave[j][0]<=t: heappush(empty,seat[leave[j][1]]) seat[leave[j][1]] = -1 j += 1 seat[i] = heappop(empty) if i==targetFriend: return seat[i] #

第4行是所有空位編號的PQ,初始是由0 ~

n1的所有編號,因為我們用list(range(n))是排好序的,就不必再做heapify的動作。
第5行的seat用來記錄每個人坐位編號。第6行取出到達時間並加以排序,第7行取出離開時間並加以排序。
第9行的for迴圈模擬每一位依序到達時的狀況。第10 ~ 13行我們根據離開時間,將在這一位到達前離開的坐位一一歸還,然後在第14行取出編號最小的空位。
時間複雜度
O(nlogn)
。因為排序是
O(nlogn)
,有
O(n)
次PQ的操作也是
O(nlogn)

偷懶原則的範例

下面這一題在第四招前綴和單元時出現過的題目,當時是用來介紹單調隊列,它其實也可以用PQ來做。
(題目連結) 239. Sliding Window Maximum (Hard)
題目說有一個長度為k的sliding window在一序列中一步一步移動,請計算出在每一個位置時的區間最大值,也就是

max(nums[i:k+i]))對於所有可能的
i

這題用單調隊列來做時間複雜度只需要

O(n),我們也可以用PQ來做,雖然會比較差一點。想法是把window內的數值丟到一個max-heap內去維護,每次拿出最大值時,要檢查該數值是否已經離開了window。以下是範例程式。

class Solution: # using pq def maxSlidingWindow(self, nums: List[int], k: int) -> List[int]: if k==1: return nums[:] # (-value,index) max-heap pq = [(-p,i) for i,p in enumerate(nums[:k])] heapify(pq) ans = [-pq[0][0]] for i,p in enumerate(nums[k:],start=k): while pq[0][1] <= i-k: #out of date, never empty heappop(pq) heappush(pq,(-p,i)) ans.append(-pq[0][0]) return ans

第4行我們特判一下視窗寬度

k=1的情形。在一般情形下,我們初始先將前
k
放入pq中,然後做heapify的動作。因為要取最大值,我們放入heap的數值取負號,這樣最大會變最小而先彈出heap。因為要偵測是否出界,所以heap內的元素使用tuple,第二個欄位放index。初始後,pq[0]就是第一個視窗的答案,取出時要再轉一次負號。
接著第9行開始從第
k
個一一掃描序列,每次放進去一個,用while迴圈將已經出界的最大值移除,如果不是最大值的出界就不理它。
時間複雜度
O(nlogn)

minHeap + maxHeap

我們來看一個用到兩個heap組合成一個資料結構的有趣例子。
(題目連結) 295. Find Median from Data Stream (hard)
這個題目是說有一序列的整數會逐步加入,你要實作一個class支援隨時查詢已加入整數的中位數(median)為何。依照題目給的定義,實作三個函數分別執行:初始化、加入一個整數、以及查詢中位數。在偶數個資料時,中位數此處定義為兩個中位數的平均值(實數)。

如果要靜態的在

n個數中找一次median,最簡單是用排序,時間是
O(nlogn)
。在演算法中,也有更快速
O(n)
時間的方法,但是對於本題來說都不夠好,因為本題的要求是資料逐步加入,而可以多次查詢。

這題目挺有趣,需要動一下腦筋。我們的heap可以找最小,可以找最大,那麼正中間的數呢?
江湖一點訣,說破不值錢。
我們把已進入的數列依照數值大小分成兩半,比較小的數放入一個max-heap;比較大的數放入一個min-heap,那麼median就在這兩個heap的top上了。
每加入一個資料或查詢的時間複雜度都是

O(logn)。實作時要注意資料數量為奇數與偶數時的狀況。

class MedianFinder: # two heap def __init__(self): self.large = [] # min heap self.small = [] # max heap, using negative # len: large = small or small+1 (odd) def addNum(self, num: int) -> None: if not self.large or num >= self.large[0]: heappush(self.large, num) if len(self.large)> len(self.small)+1: x = heappop(self.large) heappush(self.small,-x) else: heappush(self.small, -num) if len(self.small)> len(self.large): x = heappop(self.small) heappush(self.large,-x) def findMedian(self) -> float: if len(self.large) > len(self.small): return self.large[0] return (self.large[0]-self.small[0])/2 #even number # Your MedianFinder object will be instantiated and called as such: # obj = MedianFinder() # obj.addNum(num) # param_2 = obj.findMedian()

初始化時我們就初始兩個空列表large與small。這裡的寫法是計畫維持

len(small)len(large)len(small)+1
也就是說當奇數時在
large
多放一個,否則兩邊數量一樣多。
當加入一個整數
num
時,我們依照
num
的大小判斷丟進
large
small
,但如果丟進去的一邊數量太多(違反我們的計畫),我們就從數量太多的heap中pop中一個來丟到另外一邊,以維持計畫的數量。max-heap的部分我們藉由將key值轉負號的方式來實現。
查詢時就非常簡單,依照奇數或偶數回傳題目定義的中位數即可。

Heap + Greedy

Heap能夠很快地找出最小值,與貪心法則(Greedy)的需求搭配,所以Heap與Greedy的搭配是很自然的組合。以下是一例。
(題目連結)1353. Maximum Number of Events That Can Be Attended (Medium)
題目是有一些活動,每個活動有開始與結束的日期,每一個活動在開與結束之間的任何一天都可以去參加該活動,一個活動只要參加一天就算有參加,但你每天只能參加一場活動,請問最多可以參加多少場活動。

很顯然的,一場活動只要參加一天就好了,因為同一個活動參加兩天並不會得到好處。這其實是個排程(Scheduling)的問題:每個job都需要一單位的時間,每個job有arrival time與deadline,只能在該區間內完成該工作,否則該工作就只能丟棄。請問最多可以完成多少件工作。
答案是:Greedy, earliest deadline first。
Greedy算法的證明大多是相同的架構,證明我們在每一個步驟的挑選是正確的。假設另外有一個最佳解與Greedy挑選的不一樣,那麼我們可以將最佳解做一個替換,得到與Greedy挑選一樣的最佳解。

本題的證明:假設Greedy的解與最佳解OPT第一個不一樣的是Greedy在第

d天參加活動
E

  • 如果最佳解OPT始終沒有參加活動
    E
    ,那麼把OPT的第
    d
    改成活動
    E
    ,修改之後的解不會變差。
  • 如果最佳解OPT第
    d
    天參加了活動
    F
    ,在第
    p
    天參加了活動
    E
    。因為Greedy是採取earliest deadline first,我們知道
    F
    的deadline不小於
    E
    ,因此在OPT中調換
    E
    F
    參加的日期是一個合法的排程。

因為活動有開始時間,不能一股腦兒將所有的活動依照deadline排序後挑選。我們以一個min-heap維護目前已經開始的活動,並且我們用一個變數avail維護好下一個可以參加活動的日期。
我們將所有活動依照開始時間排序後一一檢視。在下一個活動開始前,從heap中挑選出已經開始且deadline最小的活動,如果它的deadline不小於avail,也就是還來得及參加,我們就參加;否則放棄此活動。當下一個活動的開始日期之前能參加的活動都挑選結束後,我們把下一個活動放入heap中以便一起參與比較。

class Solution: # earlies deadline first, with arrival time def maxEvents(self, events: List[List[int]]) -> int: events.sort() # sorted by starting time events.append([100001,100001]) # dummy sentinel avail = 0 # the available(free) day cnt = 0 # answer pq = [] # endtime for s,t in events: while pq and avail<s: # before next arrival deadline = heappop(pq) # earliest deadline if deadline >= avail: # attend cnt += 1 avail += 1 # else: pass # discard, do nothing avail = s heappush(pq,t) return cnt

第5行我們加入一個假的活動,開始時間晚於所有輸入的活動,這是當哨兵(sentinel)使用,因為前述的演算法在所有活動皆到達之後,heap內還會有一些活動要挑選,為了簡化程式,我們加上一個哨兵。
時間複雜度

O(nlogn)。因為排序是
O(nlogn)
,有
O(n)
次PQ的總操作時間也是
O(nlogn)
,均攤分析。

Heap + Dijkstra

接下來看一個二維陣列走訪的題目。
778. Swim in Rising Water (Hard)
本題中,輸入一個

n×n網格(矩陣),
n2
個格子內是0 ~
n21
這些整數的排列,代表每一個格子的海拔高度。假設雨水在時間
t
的時候水位高度是
t
,你可以從一個格子游到上下左右相鄰格子,只要兩個格子的海拔都不大於
t

假設游泳的時間不計,請問何時可以由左上角
(0,0)
游到右下角
(n1,n1)
。也就是說要找出最小的時間
t
,滿足在時間
t
時會存在起點到終點一條路徑,此路徑上每個格子海拔都
t

這題有不只一種合理的解法。這裡我們要說明的是運用heap的方法。如果把網格當graph來看,這是一個最短路徑問題的相似題,權重在點上而不是在邊上,並且最小化的不是權重的和而是最大的權重(mini-max版本或稱bottleneck版本),做法與最短路徑類似,可以用修改的Dijkstra演算法來做。即使不了解Dijkstra,也不太難明瞭以下說明的本題演算法。

演算法

  • 變數curr是目前的水位,初始值給起點與終點的較大值,因為答案不可能低於這個值。運算過程中這個值只可能上升不會下降,當看見終點時,這個值就是答案。
  • 用min-heap pq維護有鄰居可以被走得到那些點的座標並以該點高度當作key。
  • seen紀錄已經看過的點,包含已經走得到的點與目前在heap中的點。seen的作用是避免一個點重複被檢查。
  • 每回合從pq中取出高度最低的點,如果這點的高度高於目前高度curr,則更新curr,因為低於此高度的點已經拜訪完,而此點是尚未被拜訪且走得到的點中高度最低的,所以低於此高度不可能有答案。如果取出的高度低於或等於curr,則curr不做變動。
  • 對於每回合pq中取出的點,檢查它的四個鄰居,如尚未被看過,則納入pq內。

以下是實作上述的演算法。我們用二維陣列來做seen,並且右方與下方圍半圈True來避免走出界。
如不熟悉Grid的走訪與哨兵技巧,請參考 Python-LeetCode 第三招 Grid Traversal

class Solution: def swimInWater(self, grid: List[List[int]]) -> int: n = len(grid) seen = [[False]*n+[True] for i in range(n)] seen.append([True]*n) curr = max(grid[0][0],grid[n-1][n-1]) pq = [(grid[0][0],0,0)] seen[0][0] = True while pq and not seen[n-1][n-1]: h,r,c = heappop(pq) if h>curr: curr = h for nr,nc in ((r,c-1),(r,c+1),(r-1,c),(r+1,c)): if not seen[nr][nc]: heappush(pq,(grid[nr][nc],nr,nc)) seen[nr][nc] = True #end while return curr

時間複雜度

O(n2logn),輸入為
n×n
矩陣,故資料量是
O(n2)

結語

優先佇列PQ是一個很重要的資料結構,Python中heapq.py是以min-heap實作PQ的一套模組,它支援新增資料與查詢或刪除最小值的操作,刪除與新增的時間複雜度都是

O(logn)

PQ的應用很多,本單元示範了LeetCode上幾個典型的題目,其他類似題目可以透過tag去搜尋。PQ常常用在其他演算法中用來在一個步驟中找到某個最小或最大值,所以與Greedy或DP的結合都是會出現的。在一些其他著名的演算法中,例如Dijkstra與Prim演算法中也需要使用它。

雖然有其他功能更強大的資料結構也可以做到PQ的功能,例如Python的SortedList與C++中的set/map(底層是BST),但通常heap的速度最快速。本單元中一些題目也有其他的做法,將來應該都會有另外的單元來討論。