owned this note
owned this note
Published
Linked with GitHub
# 平行運算
大多數現代電腦擁有不只一個核心,多個電腦更可以整合在一起成為一個叢集。駕馭多核心的強大能力讓很多運算可以被快速地完成。有兩個影響效能的主要因素:CPU核心本身的速度,以及他們存取記憶體的速度。在一個叢集當中,顯然一個CPU跟他存取的記憶體在同一台電腦(node)上,會有最快的存取速度。也許更驚訝的是,在典型的多核心筆電上也有類似的議題,是由於主記憶體以及[快取](https://www.akkadia.org/drepper/cpumemory.pdf)速度的差異。最後,一個多行程(multiprocessing)的環境應該允許一個特定的CPU「擁有」一段記憶體空間的控制權。Julia提供了一個多行程環境,允許程式可以,藉由訊息傳遞的方式,同時在多個記憶體空間獨立的行程上執行。
Julia所實作的訊息傳遞的方式不同於其他環境,像是MPI [^1]。Julia當中通訊的一般來說是「單向的」,意思是,在兩個行程之間的操作,程式設計師需要明確地管理其中一個行程。此外,這些操作基本上並不像是"message send"跟"message receive",而更像是高階的操作,就像呼叫函式一般。
在Julia中,平行程式設計(parallel programming)是建構在兩個基本單元上:*remote references*和*remote calls*。一個remote reference是一個物件,他可以被任何行程使用,而且指向一個儲存於特定行程的物件。一個remote call是一個由行程發出的請求(request),在另一個(可能相同)行程上,來呼叫特定函式,並給定參數。
Remote references會以兩種形式出現:[`Future`](@ref)與[`RemoteChannel`](@ref)。
一個remote call會回傳一個[`Future`](@ref)作為結果。Remote call會立即回傳;行程會在其他地方讓函式呼叫繼續運算下去。你可以在回傳的[`Future`](@ref)上呼叫[`wait()`](@ref)來等remote call完成,以及可以使用[`fetch()`](@ref)來獲得完整的運算結果。
另一方面,[`RemoteChannel`](@ref)是可以被重寫的。舉例來說,多個行程可以互相協調,藉由參考同一個遠端的`Channel`。
每個行程會連結到一個辨識子。提供了互動式Julia prompt的行程的`id`一定是等於一。預設被用來執行平行操作的行程被視為"workers"。當只有一個行程存在,行程1也是個worker。否則,worker就會是除了1以外的全部行程。
我們一起來試試看吧。以`julia -p n`開始,會在在地機器上提供`n`個worker行程。
一般來說,讓`n`等同於機器的CPU核心數是合理的。
```julia
$ ./julia -p 2
julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, Nullable{Any}())
julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, Nullable{Any}())
julia> fetch(s)
2×2 Array{Float64,2}:
1.18526 1.50912
1.16296 1.60607
```
[`remotecall()`](@ref)的第一個參數是要呼叫的函式。Julia中大多數的平行程式設計不需要操考特定的行程或是行程的編號,但[`remotecall()`](@ref)被設計為一個低階的介面來提供更細緻的控制。[`remotecall()`](@ref)的第二個參數是指定執行函式的行程`id`,剩下的參數將會傳遞給被呼叫的函式。
如你所見,第1行我們要求行程2建構一個2-by-2的隨機矩陣,以及第2行我們要求再加1到它身上。兩個計算的結果可以在`r`跟`s`得到。[`@spawnat`](@ref) macro會在第1個參數指定的行程上,對第2個參數的表達式求值。
有時候你可能希望馬上對遠端運算求值。這一般會發生在你讀取遠端的物件來獲取資料,而資料馬上被在地的運算所需要。[`remotecall_fetch()`](@ref)就是為了這種情形存在的。他等價於`fetch(remotecall(...))`,但更為高效。
```julia
julia> remotecall_fetch(getindex, 2, r, 1, 1)
0.18526337335308085
```
還記得[`getindex(r,1,1)`](@ref)[等價](@ref man-array-indexing)於`r[1,1]`,這個呼叫會取回future `r`的第一個元素。
[`remotecall()`](@ref)的語法不是特別方便。[`@spawn`](@ref) macro會讓事情更簡單。他操作表達式,而非函數,他會自動幫你挑選執行的行程:
```julia
julia> r = @spawn rand(2,2)
Future(2, 1, 4, Nullable{Any}())
julia> s = @spawn 1 .+ fetch(r)
Future(3, 1, 5, Nullable{Any}())
julia> fetch(s)
2×2 Array{Float64,2}:
1.38854 1.9098
1.20939 1.57158
```
特別提醒,我們用`1 .+ fetch(r)`而不是`1 .+ r`。這是因為我們不知道程式碼會在哪裡執行,所以一般來說[`fetch()`](@ref)可能必須將`r`移到執行加法的行程去。在這個案例中,[`@spawn`](@ref)會知道在擁有`r`的行程上執行運算,所以[`fetch()`](@ref)將會是個no-op (no work is done)。
(值得一提的是[`@spawn`](@ref)並非內建的但確實以[macro](@ref man-macros)的形式定義在Julia中。你也可以定義你自己的macro。)
要記得一個重要的事物是,一旦fetch後,[`Future`](@ref)將會暫存值在本地。又一次[`fetch()`](@ref)呼叫並不用把計算網路再走過一遍。一旦所有被參考的[`Future`](@ref)都被fetch了,遠端儲存的值就會被刪除。
## 程式碼可用性與載入套件
你的程式碼必須在任何執行的行程上可以被存取。舉例來說,打入以下的程式到Julia命令列:
```julia
julia> function rand2(dims...)
return 2*rand(dims...)
end
julia> rand2(2,2)
2×2 Array{Float64,2}:
0.153756 0.368514
1.15119 0.918912
julia> fetch(@spawn rand2(2,2))
ERROR: RemoteException(2, CapturedException(UndefVarError(Symbol("#rand2"))
[...]
```
行程1知道`rand2`函式,但是行程2不知道。
最一般的狀況,你會從檔案或是套件載入程式碼,你會有可觀的彈性來控制哪一個行程需要載入程式碼。考慮一個檔案,`DummyModule.jl`,包含了以下程式:
```julia
module DummyModule
export MyType, f
mutable struct MyType
a::Int
end
f(x) = x^2+1
println("loaded")
end
```
用`julia -p 2`開始Julia,你可以用這個來驗證以下的事例:
* [`include("DummyModule.jl")`](@ref)會只載入檔案到一個行程中(載入到執行這行敘述式的行程中)。
* `using DummyModule`會載入模組到所有行程中;然而,模組只會被帶到執行這行敘述式的作用域中。
* 一旦`DummyModule`被載入到行程2,以下指令
```julia
rr = RemoteChannel(2)
put!(rr, MyType(7))
```
允許你儲存`MyType`的物件到行程2,即使`DummyModule`不在行程2的作用域中。
你可以使用[`@everywhere`](@ref) macro強制程式在每個行程上執行。舉個例子,`@everywhere`也可以直接在所有行程上定義函式:
```julia
julia> @everywhere id = myid()
julia> remotecall_fetch(()->id, 2)
2
```
一份檔案也可以在開始之時預先載入到多個行程上,並且一份驅動腳本可以被用來起動運算:
```
julia -p <n> -L file1.jl -L file2.jl driver.jl
```
上例中,執行驅動腳本的Julia行程擁有的`id`為1,就如同提供互動式命令列的行程一樣。
Julia的基本本安裝中已經內建支援兩種叢集:
* 一個本地叢集,需要指定`-p`選項,如同以上的例子。
* 一個叢集擴充機器,使用`--machinefile`選項。他使用了無密碼的`ssh`登入方式來啟動特定機器上的Julia worker行程(與目前主機有相同的路徑)。
[`addprocs()`](@ref)、[`rmprocs()`](@ref)、[`workers()`](@ref)函式,以及其他可用函式,如同字面上的含意,允許在叢集上增加、刪除、查詢行程。
重要的是,workers不會執行`.juliarc.jl`起始腳本,他們也不會與任何其他的行程同步他們的全域狀態(像是全域變數、新方法的定義,以及載入模組)。
其他種類的叢集可以藉由客製化自己的`ClusterManager`來支援,可以參考[ClusterManagers](@ref)章節。
## 搬移資料
傳遞訊息及搬移資料構成了大多數平行程式中的經常支出。減少訊息的數量以及資料的大小是達成效能與規模化的重要議題。為了這個目的,了解Julia多樣的資料搬移指令是重要的。
[`fetch()`](@ref)可以視為顯式的資料搬移操作,由於他直接要求一個物件被移動到本機。[`@spawn`](@ref)(以及一些相關的指令)也會搬移資料,但他不明顯,因此他可以被稱為隱式的資料搬移操作。利用這兩個方法來建構以及平方隨機矩陣:
方法一:
```julia
julia> A = rand(1000,1000);
julia> Bref = @spawn A^2;
[...]
julia> fetch(Bref);
```
方法二:
```julia
julia> Bref = @spawn rand(1000,1000)^2;
[...]
julia> fetch(Bref);
```
看起來差異微不足道,但是事實上[`@spawn`](@ref)的行為導致差異十分巨大。
在方法一中,隨機矩陣在本地被建構,然後傳遞給另一個行程處理平方。在方法二中,隨機矩陣在另一個行程被建構及平方。因此,第二個方法比第一個傳遞更少資料。
在這個範例中,兩個方法很容易區別,也容易選擇。然而,在實際程式中設計資料搬移可能需要更多思考跟一些量測。例如,如果第一個行程需要`A`矩陣,那麼第一個方法可能更好。或是說,如果計算`A`是昂貴的,而且只有目前行程擁有他,那麼將他移動到另一個行程去可能是無可避免的。又或者是,如果目前行程需要用到[`@spawn`](@ref)與`fetch(Bref)`處理的事情不多,一並移除平行運算可能是更好的。或是設想`rand(1000,1000)`被取代為更昂貴的運算。那麼為了這個步驟增加另一個[`@spawn`](@ref)敘述式可能是更合理的。
# 全域變數
藉由`@spawn`來遠端執行表達式,或是使用`remotecall`來遠端執行指定的閉包,都可能參考到全域變數。在`Main`模組底下,全域變數的綁定與在其他模組底下的綁定有一點點不同。考慮以下程式片斷:
```julia
A = rand(10,10)
remotecall_fetch(()->foo(A), 2)
```
`A`是個全域變數,定義於本地。Worker 2在`Main`之下並沒有稱為`A`的變數。傳送閉包`()->foo(A)`給worker 2的動作會在worker 2上定義`Main.A`。`Main.A`會持續存在在worker 2上,甚至直到`remotecall_fetch`回傳為止。有嵌入全域變數參考(只有在`Main`模組之下)的remote call管理全域變數的方式有:
- 如果他們作為remote call的一部份參考的話,在目的worker上會製造出新的全域變數的綁定。
- 全域常數也會在遠端結點上宣告出常數。
- 全域變數只有在有remote call的情況下值被改變會被重新傳遞給目標worker。叢集並不會在結點之間同步全域變數的綁定。
舉例而言:
```julia
A = rand(10,10)
remotecall_fetch(()->foo(A), 2) # worker 2
A = rand(10,10)
remotecall_fetch(()->foo(A), 3) # worker 3
A = nothing
```
執行以上的程式片斷,可以看到在worker 2的`Main.A`與worker 3的`Main.A`有不同的值,然而在node 1上的`Main.A`卻被設為`nothing`。
就如同你理解的,當在master上配置給全域變數的記憶體空間被重新指定,他們會被收集回來,若是worker上的綁定持續有效的話,這件事不會發生。
[`clear!`](@ref)可以用來手動重新指定遠端結點特定的全域變數成為`nothing`,一旦他們已經不再需要。這將會,作為常規garbage collection的一部份,釋放任何與他們有關聯的記憶體。
因此,程式應該小心參考remote call的全域變數。事實上,如果可能的話,我們更傾向避免他們。如果你一定要參考全域變數,考慮使用`let` blocks來限定全域變數。
例如:
```julia
julia> A = rand(10,10);
julia> remotecall_fetch(()->A, 2);
julia> B = rand(10,10);
julia> let B = B
remotecall_fetch(()->B, 2)
end;
julia> @spawnat 2 whos();
julia> From worker 2: A 800 bytes 10×10 Array{Float64,2}
From worker 2: Base Module
From worker 2: Core Module
From worker 2: Main Module
```
如你所見,全域變數`A`被定義於worker 2,但`B`被視為一個local variable,而因此`B`的綁定不存在於worker 2。
## Parallel Map以及迴圈
幸運的是,很多好用的平行運算不需要資料搬移。一個常見的例子是蒙地卡羅模擬,多行程可以同時處理獨立的模擬試驗。我們可以用[`@spawn`](@ref)在兩個行程上擲硬幣。首先,撰寫以下函式到`count_heads.jl`:
```julia
function count_heads(n)
c::Int = 0
for i = 1:n
c += rand(Bool)
end
c
end
```
`count_heads`函式單純把`n`個亂數bits加在一起。我們來看看如何在兩台機器上模擬試驗,以及把結果加在一起:
```julia
julia> @everywhere include("count_heads.jl")
julia> a = @spawn count_heads(100000000)
Future(2, 1, 6, Nullable{Any}())
julia> b = @spawn count_heads(100000000)
Future(3, 1, 7, Nullable{Any}())
julia> fetch(a)+fetch(b)
100001564
```
這個例子示範了強大的平行程式設計常用的模式。多次迭代在一些行程上獨立執行,而且他們的結果會被某些函式合併。合併的函式被稱為*化約(reduction)*,因為廣義來說他就是tensor-rank-reducing:一個向量的數字被化約成單一數字,或是一個矩陣被化約成單一的列或是行等等。程式碼中,他看起來就像典型的`x = f(x,v[i])`模式,當中`x`是個累加的角色,`f`則是化約用的函式,而`v[i]`是被化約的元素。`f`是符合結合律的就更棒了,如此一來就不用管運算的先後順序了。
我們使用在`count_heads`的這個模式可以被廣義化。我們用兩個顯式的[`@spawn`](@ref)敘述,會限制平行在兩個行程上。為了要在任意數量的行程上執行,我們可以使用*parallel for loop*,可以在Julia中用[`@parallel`](@ref)寫成這樣:
```julia
nheads = @parallel (+) for i = 1:200000000
Int(rand(Bool))
end
```
他實現了指定迭代給多個行程的模式,也藉由特定的化約(在這個例子中是`(+)`)來合併他們。每個迭代的結果是採取迴圈中最後一個表達式的值。對整個平行迴圈本身求值來得到最後的答案。
即使parallel for loop看起來像是循序的for loop,但他們的行為是完全地不同。特別是,迭代並不會依序發生,寫入變數及陣列也不會是全域可見的,由於迭代會在不同的行程上執行。在parallel loop中使用的任何變數會被複製並且broadcast到每個行程。
例如,以下的例子並不會如預期地執行:
```julia
a = zeros(100000)
@parallel for i = 1:100000
a[i] = i
end
```
這份程式碼並不會初始化`a`的所有元素,因為每個行程會分別有一份他的複製品。
像這類的parallel for loop應該要避免。幸運地是,[Shared Arrays](@ref man-shared-arrays)可以處理這方面的限制:
```julia
a = SharedArray{Float64}(10)
@parallel for i = 1:10
a[i] = i
end
```
如果變數是唯讀的,在parallel loop使用"門外的"變數是非常合理的:
```julia
a = randn(1000)
@parallel (+) for i = 1:100000
f(a[rand(1:end)])
end
```
這裡的每次迭代會隨機從向量`a`選擇樣本,跑過`f`,其中向量是所有行程共享的。
如你所見,化約的運算子如果不必要可以省略。在那個案例中,迴圈非同步地執行,像是,他產生(spawn)獨立的任務到所有可用的worker並且立刻回傳裝有[`Future`](@ref)的陣列,不等待執行完畢。呼叫方可以等待[`Future`](@ref)完成,對他們呼叫[`fetch()`](@ref)之後會完成,或是在迴圈前加上[`@sync`](@ref),像是`@sync @parallel for`,來等他完成。
在不需要化約運算子的案例中,我們只不過希望對範圍內的所有整數(或是,更廣義來說,對某集合的所有元素)使用一個函式而已。有另外一個有用的運算稱為*parallel map*,被實作為Julia中的[`pmap()`](@ref)函式。舉例來說,我們可以平行地計算一些大型隨機矩陣的奇異值,如以下所示:
```julia
julia> M = Matrix{Float64}[rand(1000,1000) for i = 1:10];
julia> pmap(svd, M);
```
Julia的[`pmap()`](@ref)是設計給每個函式呼叫會處理大量的工作的案例。相對,`@parallel for`可以處理每次迭代都很微量,也許只是兩個數字相加。[`pmap()`](@ref)及`@parallel for`兩者都只會使用worker行程來處理平行運算。`@parallel for`的案例中,最終的化約動作會在呼叫的行程上完成。
## 與Remote References同步
## 排程
Julia的平行程式設計平台使用[Tasks (aka Coroutines)](@ref man-tasks)在多個運算下互相切換。每當程式遇到一個溝通上的運算像是[`fetch()`](@ref)或[`wait()`](@ref),目前的task會暫停並且一個排程器會選取另一個task執行。當一個事件正等待執行完成,一個task會被重新啟動。
對很多問題來說,不必要直接去思考task。然而,他們可以被用來同時等待多個事件,這就是*動態排程*。在動態排程中,程式決定要計算什麼或是在哪裡計算,基於其他工作何時完成。這需要用來處理不可預測或是不均衡的工作量,就是我們想指定更多的工作給行程只有當他們結束他們目前的task。
一個例子,考慮計算不同大小矩陣的奇異值:
```julia
julia> M = Matrix{Float64}[rand(800,800), rand(600,600), rand(800,800), rand(600,600)];
julia> pmap(svd, M);
```
當一個行程處理都為800×800的矩陣,另一個行程處理都為600×600的矩陣,我們將無法得到我們想要的規模(scalability)。解法是,當每個行程完成目前的task,製造一個本地的task來"餵"工作給他。例如,考慮一個簡單的[`pmap()`](@ref)實作:
```julia
function pmap(f, lst)
np = nprocs() # determine the number of processes available
n = length(lst)
results = Vector{Any}(n)
i = 1
# function to produce the next work item from the queue.
# in this case it's just an index.
nextidx() = (idx=i; i+=1; idx)
@sync begin
for p=1:np
if p != myid() || np == 1
@async begin
while true
idx = nextidx()
if idx > n
break
end
results[idx] = remotecall_fetch(f, p, lst[idx])
end
end
end
end
end
results
end
```
[`@async`](@ref)與[`@spawn`](@ref)很像,但只會在本地的行程上執行task。我們用他來創造一個"feeder" task。每個選取下一個需要運算的index,然後等待他手上的行程結束,接著重複直到我們用完所有的index。Feeder task不會開始去執行直到碰到[`@sync`](@ref) block的結束,在函式回傳前,這時候他會交出控制權並且等待所有的本地task完成。Feeder task可以藉由`nextidx()`來共享狀態,因為他們全部都跑在同一個行程上。不需要鎖定機制,由於thread被合作地(cooperatively)排程,並且不可搶先(preemptively)。這意味著context switches只發生在定義完好的地方:在這個例子中,是在[`remotecall_fetch()`](@ref)被呼叫之時。
## Channels
在[Control Flow](@ref)的[`Task`](@ref)章節討論了以合作的方式執行多個函式。[`Channel`](@ref)會是個十分有用的資料結構,用在執行task之間的資料傳遞,特別是,那些牽涉到I/O操作的部份。
涉及I/O的操作,包含讀寫檔案、存取網路服務、執行外部程式等等。在這些例子中,如果在讀檔的時候其他task可以執行,或是等待外部服務或程式完成,整體的執行時間就可以改善。
一個channel可以被視為管線,例如他有寫入端也有讀取端。
* 在不同task中有多個寫入可以藉由[`put!()`](@ref)函式並行地寫到同一個channel。
* 在不同task中有多個讀取可以藉由[`take!()`](@ref)函式並行地讀資料。
* 如同以下例子:
```julia
# 給Channels c1 and c2,
c1 = Channel(32)
c2 = Channel(32)
# 以及一個`foo()`函式,他會從c1讀東西,處理讀進來的東西
# 並且把結果寫到c2,
function foo()
while true
data = take!(c1)
[...] # process data
put!(c2, result) # write out result
end
end
# 我們可以排程`n`個`foo()`的實體來實現並行
for _ in 1:n
@schedule foo()
end
```
* Channel可以藉由`Channel{T}(sz)`建構子創造出來。Channel只會容納`T`型別的物件。如果沒有指定型別,channel則可以容納任何型別的物件。`sz`會指向可以在任何時間容納最多的元素數量。例如,`Channel(32)`創造了一個channel可以容納最多32個任意型別的物件。一個`Channel{MyType}(64)`則可以容納64個`MyType`型別的物件。
* 如果一個[`Channel`](@ref)是空的,讀取方(呼叫了[`take!()`](@ref))將會(block)直到有資料被放入。
* 如果一個[`Channel`](@ref)滿了,寫入方(呼叫了[`put!()`](@ref))將會(block)直到有空間可以放入。
* [`isready()`](@ref)會測試目前是否有任何物件在channel中,而[`wait()`](@ref)則會等待一個物件準備好。
* 一個[`Channel`](@ref)是一開始處於一個開放的狀態。這表示他可以藉由[`take!()`](@ref)跟[`put!()`](@ref)被自由地讀寫。[`close()`](@ref)會關閉一個[`Channel`](@ref)。在關閉的[`Channel`](@ref)上,[`put!()`](@ref)將會失敗。舉個例:
```julia
julia> c = Channel(2);
julia> put!(c, 1) # `put!` on an open channel succeeds
1
julia> close(c);
julia> put!(c, 2) # `put!` on a closed channel throws an exception.
ERROR: InvalidStateException("Channel is closed.",:closed)
[...]
```
* 在關閉的channel上,[`take!()`](@ref)以及[`fetch()`](@ref)(他會取回但不會移除值)可以成功回傳任何存在的值直到被取完。接續上例:
```julia
julia> fetch(c) # Any number of `fetch` calls succeed.
1
julia> fetch(c)
1
julia> take!(c) # The first `take!` removes the value.
1
julia> take!(c) # No more data available on a closed channel.
ERROR: InvalidStateException("Channel is closed.",:closed)
[...]
```
一個`Channel`可以被做為`for`迴圈中一個可迭代的(iterable)物件,在這個例子中只要`Channel`中含有資料或是是開放的狀態迴圈會持續執行。迴圈的變數會拿取所有被加入到`Channel`的值。一旦`Channel`關閉或是為空,`for`迴圈就會停止。
例如,以下程式碼會導致`for`迴圈持續等待更多的資料:
```julia
julia> c = Channel{Int}(10);
julia> foreach(i->put!(c, i), 1:3) # add a few entries
julia> data = [i for i in c]
```
然而這樣的程式碼在讀取所有資料後就會回傳:
```julia
julia> c = Channel{Int}(10);
julia> foreach(i->put!(c, i), 1:3); # add a few entries
julia> close(c); # `for` loops can exit
julia> data = [i for i in c]
3-element Array{Int64,1}:
1
2
3
```
考慮一個簡單使用channel的例子。我們起始4個task來處理來自單一`jobs` channel的資料。Job,藉由一個id (`job_id`)來辨識,會被寫到channel中。
在這個模擬中每個task會讀取`job_id`,並且等待隨機長度的時間並且寫回一個tuple,含有`job_id`以及時間到結果的channel中。最後,`results`中所有的東西會被印出來。
```julia
julia> const jobs = Channel{Int}(32);
julia> const results = Channel{Tuple}(32);
julia> function do_work()
for job_id in jobs
exec_time = rand()
sleep(exec_time) # simulates elapsed time doing actual work
# typically performed externally.
put!(results, (job_id, exec_time))
end
end;
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> @schedule make_jobs(n); # feed the jobs channel with "n" jobs
julia> for i in 1:4 # start 4 tasks to process requests in parallel
@schedule do_work()
end
julia> @elapsed while n > 0 # print out results
job_id, exec_time = take!(results)
println("$job_id finished in $(round(exec_time,2)) seconds")
n = n - 1
end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311
```
最新版本的Julia multiplex所有的task到單一的系統執行緒上。因此,當task涉及I/O操作時,可以從平行執行中受惠,而運算吃重的task可以有效地在單一系統執行緒上循序執行。未來的版本可能支援task在多執行緒上的排程,如此一來,運算吃重的task也會見到平行執行的好處。
## Remote References及AbstractChannels
Remote reference總是指向一個`AbstractChannel`的實作品。
一個`AbstractChannel`的實作品(像是`Channel`),需要實作[`put!()`](@ref)、[`take!()`](@ref)、[`fetch()`](@ref)、[`isready()`](@ref)以及[`wait()`](@ref)。
遠端物件可以藉由一個[`Future`](@ref)來參考,他被存於一個`Channel{Any}(1)`中,也就是一個可容納`Any`型別物件為一的`Channel`。
[`RemoteChannel`](@ref),他可以被重寫,會指向任何型別、任何容量的channel,或是任何`AbstractChannel`的其他實作品。
`RemoteChannel(f::Function, pid)()`建構子允許我們建構指向容納多個特定型別的channel的參考。`f()`是一個函式,會接受`pid`作為參數,並且必須回傳一個`AbstractChannel`。
例如,`RemoteChannel(()->Channel{Int}(10), pid)`,他會回傳一個指向容納`Int`型別、容量為10的channel的參考。這個channel會存在於worker `pid`上。
在一個[`RemoteChannel`](@ref)上呼叫[`put!()`](@ref)、[`take!()`](@ref)、[`fetch()`](@ref)、[`isready()`](@ref)跟[`wait()`](@ref)方法會被代理到遠端行程裡的儲存上。
[`RemoteChannel`](@ref)可以因此被用來參考到自行實作的`AbstractChannel` 物件上。
`examples/dictchannel.jl`當中提供了一個簡單的例子,他使用了一個dictionary作為他遠端的儲存。
## Channels以及RemoteChannels
* 一個[`Channel`](@ref)是存在於一個行程中的。Worker 2不能直接參考到一個在worker 3的`Channel`,反之亦然。然而,一個[`RemoteChannel`](@ref)可以跨worker放入和拿取值。
* 一個[`RemoteChannel`](@ref)可以想成是一個`Channel`的*handle*。
* The process id,`pid`,標記在[`RemoteChannel`](@ref)上,代表後端儲存空間所在的行程,即後端`Channel`存在的空間。
* 任何擁有[`RemoteChannel`](@ref)參考的行程可以channel中從放入或是拿取東西。
資料會自動地被送去(或是接收自)[`RemoteChannel`](@ref)關聯到的行程。
* 序列化一個`Channel`也是序列化在當中的任何資料。因此,解序列化可以有效地造出原物件的副本。
* 另一方面,序列化一個[`RemoteChannel`](@ref)只包含序列化一個識別子,他會識別`Channel`的位置以及他參考到的實體。一個解序列化的 [`RemoteChannel`](@ref) 物件(在任何 worker中),因此,也會指向與原來相同的結構。
以上的 channels 範例可以被修改為適合行程間通訊,如以下示範。
我們啟動4個 workers 來處理單一 `jobs` 的 remote channel。工作,會藉由 id(`job_id`)來識別,會被寫到 channel。在這次模擬每個在遠端執行的 task 會讀到一個`job_id`,等待隨機長短的時間且寫入一個 `job_id` 的數組、使用的時間跟他自身的 `pid` 到結果 channel 中。最後所有 `results` 會在 master 行程被印出來。
```julia
julia> addprocs(4); # 加入 worker 行程
julia> const jobs = RemoteChannel(()->Channel{Int}(32));
julia> const results = RemoteChannel(()->Channel{Tuple}(32));
julia> @everywhere function do_work(jobs, results) # 在每個行程定義工作函式
while true
job_id = take!(jobs)
exec_time = rand()
sleep(exec_time) # 模擬真實工作的使用時間
put!(results, (job_id, exec_time, myid()))
end
end
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> @schedule make_jobs(n); # 餵給 jobs channel "n" 個工作
julia> for p in workers() # 在 worker上開始 task 來平行地處理需求
@async remote_do(do_work, p, jobs, results)
end
julia> @elapsed while n > 0 # print out results
job_id, exec_time, where = take!(results)
println("$job_id finished in $(round(exec_time,2)) seconds on worker $where")
n = n - 1
end
1 finished in 0.18 seconds on worker 4
2 finished in 0.26 seconds on worker 5
6 finished in 0.12 seconds on worker 4
7 finished in 0.18 seconds on worker 4
5 finished in 0.35 seconds on worker 5
4 finished in 0.68 seconds on worker 2
3 finished in 0.73 seconds on worker 3
11 finished in 0.01 seconds on worker 3
12 finished in 0.02 seconds on worker 3
9 finished in 0.26 seconds on worker 5
8 finished in 0.57 seconds on worker 4
10 finished in 0.58 seconds on worker 2
0.055971741
```
## Remote References以及分散式垃圾回收
Objects referred to by remote references can be freed only when *all* held references in the cluster
are deleted.
The node where the value is stored keeps track of which of the workers have a reference to it.
Every time a [`RemoteChannel`](@ref) or a (unfetched) [`Future`](@ref) is serialized to a worker,
the node pointed to by the reference is notified. And every time a [`RemoteChannel`](@ref) or
a (unfetched) [`Future`](@ref) is garbage collected locally, the node owning the value is again
notified.
The notifications are done via sending of "tracking" messages--an "add reference" message when
a reference is serialized to a different process and a "delete reference" message when a reference
is locally garbage collected.
Since [`Future`](@ref)s are write-once and cached locally, the act of [`fetch()`](@ref)ing a
[`Future`](@ref) also updates reference tracking information on the node owning the value.
The node which owns the value frees it once all references to it are cleared.
With [`Future`](@ref)s, serializing an already fetched [`Future`](@ref) to a different node also
sends the value since the original remote store may have collected the value by this time.
It is important to note that *when* an object is locally garbage collected depends on the size
of the object and the current memory pressure in the system.
In case of remote references, the size of the local reference object is quite small, while the
value stored on the remote node may be quite large. Since the local object may not be collected
immediately, it is a good practice to explicitly call [`finalize()`](@ref) on local instances
of a [`RemoteChannel`](@ref), or on unfetched [`Future`](@ref)s. Since calling [`fetch()`](@ref)
on a [`Future`](@ref) also removes its reference from the remote store, this is not required on
fetched [`Future`](@ref)s. Explicitly calling [`finalize()`](@ref) results in an immediate message
sent to the remote node to go ahead and remove its reference to the value.
Once finalized, a reference becomes invalid and cannot be used in any further calls.
## [Shared Arrays](@id man-shared-arrays)
Shared Arrays use system shared memory to map the same array across many processes. While there
are some similarities to a [`DArray`](https://github.com/JuliaParallel/DistributedArrays.jl), the
behavior of a [`SharedArray`](@ref) is quite different. In a [`DArray`](https://github.com/JuliaParallel/DistributedArrays.jl),
each process has local access to just a chunk of the data, and no two processes share the same
chunk; in contrast, in a [`SharedArray`](@ref) each "participating" process has access to the
entire array. A [`SharedArray`](@ref) is a good choice when you want to have a large amount of
data jointly accessible to two or more processes on the same machine.
[`SharedArray`](@ref) indexing (assignment and accessing values) works just as with regular arrays,
and is efficient because the underlying memory is available to the local process. Therefore,
most algorithms work naturally on [`SharedArray`](@ref)s, albeit in single-process mode. In cases
where an algorithm insists on an [`Array`](@ref) input, the underlying array can be retrieved
from a [`SharedArray`](@ref) by calling [`sdata()`](@ref). For other `AbstractArray` types, [`sdata()`](@ref)
just returns the object itself, so it's safe to use [`sdata()`](@ref) on any `Array`-type object.
The constructor for a shared array is of the form:
```julia
SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])
```
which creates an `N`-dimensional shared array of a bits type `T` and size `dims` across the processes specified
by `pids`. Unlike distributed arrays, a shared array is accessible only from those participating
workers specified by the `pids` named argument (and the creating process too, if it is on the
same host).
If an `init` function, of signature `initfn(S::SharedArray)`, is specified, it is called on all
the participating workers. You can specify that each worker runs the `init` function on a distinct
portion of the array, thereby parallelizing initialization.
Here's a brief example:
```julia
julia> addprocs(3)
3-element Array{Int64,1}:
2
3
4
julia> S = SharedArray{Int,2}((3,4), init = S -> S[Base.localindexes(S)] = myid())
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 3 4 4
julia> S[3,2] = 7
7
julia> S
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 7 4 4
```
[`Base.localindexes()`](@ref) provides disjoint one-dimensional ranges of indexes, and is sometimes
convenient for splitting up tasks among processes. You can, of course, divide the work any way
you wish:
```julia
julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = myid())
3×4 SharedArray{Int64,2}:
2 2 2 2
3 3 3 3
4 4 4 4
```
Since all processes have access to the underlying data, you do have to be careful not to set up
conflicts. For example:
```julia
@sync begin
for p in procs(S)
@async begin
remotecall_wait(fill!, p, S, p)
end
end
end
```
would result in undefined behavior. Because each process fills the *entire* array with its own
`pid`, whichever process is the last to execute (for any particular element of `S`) will have
its `pid` retained.
As a more extended and complex example, consider running the following "kernel" in parallel:
```julia
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
```
In this case, if we try to split up the work using a one-dimensional index, we are likely to run
into trouble: if `q[i,j,t]` is near the end of the block assigned to one worker and `q[i,j,t+1]`
is near the beginning of the block assigned to another, it's very likely that `q[i,j,t]` will
not be ready at the time it's needed for computing `q[i,j,t+1]`. In such cases, one is better
off chunking the array manually. Let's split along the second dimension.
Define a function that returns the `(irange, jrange)` indexes assigned to this worker:
```julia
julia> @everywhere function myrange(q::SharedArray)
idx = indexpids(q)
if idx == 0 # This worker is not assigned a piece
return 1:0, 1:0
end
nchunks = length(procs(q))
splits = [round(Int, s) for s in linspace(0,size(q,2),nchunks+1)]
1:size(q,1), splits[idx]+1:splits[idx+1]
end
```
Next, define the kernel:
```julia
julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
@show (irange, jrange, trange) # display so we can see what's happening
for t in trange, j in jrange, i in irange
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
end
q
end
```
We also define a convenience wrapper for a `SharedArray` implementation
```julia
julia> @everywhere advection_shared_chunk!(q, u) =
advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)
```
Now let's compare three different versions, one that runs in a single process:
```julia
julia> advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);
```
one that uses [`@parallel`](@ref):
```julia
julia> function advection_parallel!(q, u)
for t = 1:size(q,3)-1
@sync @parallel for j = 1:size(q,2)
for i = 1:size(q,1)
q[i,j,t+1]= q[i,j,t] + u[i,j,t]
end
end
end
q
end;
```
and one that delegates in chunks:
```julia
julia> function advection_shared!(q, u)
@sync begin
for p in procs(q)
@async remotecall_wait(advection_shared_chunk!, p, q, u)
end
end
q
end;
```
If we create `SharedArray`s and time these functions, we get the following results (with `julia -p 4`):
```julia
julia> q = SharedArray{Float64,3}((500,500,500));
julia> u = SharedArray{Float64,3}((500,500,500));
```
Run the functions once to JIT-compile and [`@time`](@ref) them on the second run:
```julia
julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
830.220 milliseconds (216 allocations: 13820 bytes)
julia> @time advection_parallel!(q, u);
2.495 seconds (3999 k allocations: 289 MB, 2.09% gc time)
julia> @time advection_shared!(q,u);
From worker 2: (irange,jrange,trange) = (1:500,1:125,1:499)
From worker 4: (irange,jrange,trange) = (1:500,251:375,1:499)
From worker 3: (irange,jrange,trange) = (1:500,126:250,1:499)
From worker 5: (irange,jrange,trange) = (1:500,376:500,1:499)
238.119 milliseconds (2264 allocations: 169 KB)
```
The biggest advantage of `advection_shared!` is that it minimizes traffic among the workers, allowing
each to compute for an extended time on the assigned piece.
## Shared Arrays以及分散式垃圾回收
如同remote reference,shared array在節點上也依賴垃圾回收來釋放來自所有參與的worker的reference。創造很多生命周期短的shared array物件的程式碼會受惠於儘可能快而明確地結束這些物件。
這導致記憶體及檔案中分享的片斷都可以更快地被釋放。
## ClusterManagers
在一個邏輯叢集,可以藉由叢集管理器來發起、管理Julia行程及建立網路。一個`ClusterManager`的職責有
* 在一個叢集環境下發起worker行程
* 對每個行程的整個生命周期進行事件管理
* 選擇性地,提供資料傳輸
一個Julia叢集有以下特徵:
* 初始的Julia行程,又稱為`master`,是特殊的且有`id`為1。
* 只有`master`行程可以增加或是刪除worker行程。
* 所有行程可以直接互相溝通。
Workers之間的連結(使用內建的TCP/IP傳輸)是以以下方式建立:
* [`addprocs()`](@ref) is called on the master process with a `ClusterManager` object.
* [`addprocs()`](@ref) calls the appropriate [`launch()`](@ref) method which spawns required number of worker processes on appropriate machines.
* Each worker starts listening on a free port and writes out its host and port information to [`STDOUT`](@ref).
* The cluster manager captures the [`STDOUT`](@ref) of each worker and makes it available to the
master process.
* The master process parses this information and sets up TCP/IP connections to each worker.
* Every worker is also notified of other workers in the cluster.
* Each worker connects to all workers whose `id` is less than the worker's own `id`.
* In this way a mesh network is established, wherein every worker is directly connected with every other worker.
While the default transport layer uses plain `TCPSocket`, it is possible for a Julia cluster to provide its own transport.
Julia provides two in-built cluster managers:
* `LocalManager`, used when [`addprocs()`](@ref) or [`addprocs(np::Integer)`](@ref) are called
* `SSHManager`, used when [`addprocs(hostnames::Array)`](@ref) is called with a list of hostnames
`LocalManager` is used to launch additional workers on the same host, thereby leveraging multi-core
and multi-processor hardware.
Thus, a minimal cluster manager would need to:
* be a subtype of the abstract `ClusterManager`
* implement [`launch()`](@ref), a method responsible for launching new workers
* implement [`manage()`](@ref), which is called at various events during a worker's lifetime (for
example, sending an interrupt signal)
[`addprocs(manager::FooManager)`](@ref addprocs) requires `FooManager` to implement:
```julia
function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
```
As an example let us see how the `LocalManager`, the manager responsible for starting workers
on the same host, is implemented:
```julia
struct LocalManager <: ClusterManager
np::Integer
end
function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
```
The [`launch()`](@ref) method takes the following arguments:
* `manager::ClusterManager`: the cluster manager that [`addprocs()`](@ref) is called with
* `params::Dict`: all the keyword arguments passed to [`addprocs()`](@ref)
* `launched::Array`: the array to append one or more `WorkerConfig` objects to
* `c::Condition`: the condition variable to be notified as and when workers are launched
The [`launch()`](@ref) method is called asynchronously in a separate task. The termination of
this task signals that all requested workers have been launched. Hence the [`launch()`](@ref)
function MUST exit as soon as all the requested workers have been launched.
Newly launched workers are connected to each other, and the master process, in an all-to-all manner.
Specifying the command argument `--worker <cookie>` results in the launched processes initializing
themselves as workers and connections being set up via TCP/IP sockets. Optionally, `--bind-to bind_addr[:port]`
may also be specified to enable other workers to connect to it at the specified `bind_addr` and
`port`. This is useful for multi-homed hosts.
As an example of a non-TCP/IP transport, an implementation may choose to use MPI, in which case
`--worker` must NOT be specified. Instead, newly launched workers should call `init_worker(cookie)`
before using any of the parallel constructs.
For every worker launched, the [`launch()`](@ref) method must add a `WorkerConfig` object (with
appropriate fields initialized) to `launched`
```julia
mutable struct WorkerConfig
# Common fields relevant to all cluster managers
io::Nullable{IO}
host::Nullable{AbstractString}
port::Nullable{Integer}
# Used when launching additional workers at a host
count::Nullable{Union{Int, Symbol}}
exename::Nullable{AbstractString}
exeflags::Nullable{Cmd}
# External cluster managers can use this to store information at a per-worker level
# Can be a dict if multiple fields need to be stored.
userdata::Nullable{Any}
# SSHManager / SSH tunnel connections to workers
tunnel::Nullable{Bool}
bind_addr::Nullable{AbstractString}
sshflags::Nullable{Cmd}
max_parallel::Nullable{Integer}
connect_at::Nullable{Any}
[...]
end
```
Most of the fields in `WorkerConfig` are used by the inbuilt managers. Custom cluster managers
would typically specify only `io` or `host` / `port`:
* If `io` is specified, it is used to read host/port information. A Julia worker prints out its
bind address and port at startup. This allows Julia workers to listen on any free port available
instead of requiring worker ports to be configured manually.
* If `io` is not specified, `host` and `port` are used to connect.
* `count`, `exename` and `exeflags` are relevant for launching additional workers from a worker.
For example, a cluster manager may launch a single worker per node, and use that to launch additional
workers.
* `count` with an integer value `n` will launch a total of `n` workers.
* `count` with a value of `:auto` will launch as many workers as the number of cores on that machine.
* `exename` is the name of the `julia` executable including the full path.
* `exeflags` should be set to the required command line arguments for new workers.
* `tunnel`, `bind_addr`, `sshflags` and `max_parallel` are used when a ssh tunnel is required to
connect to the workers from the master process.
* `userdata` is provided for custom cluster managers to store their own worker-specific information.
`manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)` is called at different
times during the worker's lifetime with appropriate `op` values:
* with `:register`/`:deregister` when a worker is added / removed from the Julia worker pool.
* with `:interrupt` when `interrupt(workers)` is called. The `ClusterManager` should signal the
appropriate worker with an interrupt signal.
* with `:finalize` for cleanup purposes.
## Cluster Managers with Custom Transports
Replacing the default TCP/IP all-to-all socket connections with a custom transport layer is a
little more involved. Each Julia process has as many communication tasks as the workers it is
connected to. For example, consider a Julia cluster of 32 processes in an all-to-all mesh network:
* Each Julia process thus has 31 communication tasks.
* Each task handles all incoming messages from a single remote worker in a message-processing loop.
* The message-processing loop waits on an `IO` object (for example, a `TCPSocket` in the default
implementation), reads an entire message, processes it and waits for the next one.
* Sending messages to a process is done directly from any Julia task--not just communication tasks--again,
via the appropriate `IO` object.
Replacing the default transport requires the new implementation to set up connections to remote
workers and to provide appropriate `IO` objects that the message-processing loops can wait on.
The manager-specific callbacks to be implemented are:
```julia
connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)
```
The default implementation (which uses TCP/IP sockets) is implemented as `connect(manager::ClusterManager, pid::Integer, config::WorkerConfig)`.
`connect` should return a pair of `IO` objects, one for reading data sent from worker `pid`, and
the other to write data that needs to be sent to worker `pid`. Custom cluster managers can use
an in-memory `BufferStream` as the plumbing to proxy data between the custom, possibly non-`IO`
transport and Julia's in-built parallel infrastructure.
A `BufferStream` is an in-memory `IOBuffer` which behaves like an `IO`--it is a stream which can
be handled asynchronously.
Folder `examples/clustermanager/0mq` contains an example of using ZeroMQ to connect Julia workers
in a star topology with a 0MQ broker in the middle. Note: The Julia processes are still all *logically*
connected to each other--any worker can message any other worker directly without any awareness
of 0MQ being used as the transport layer.
When using custom transports:
* Julia workers must NOT be started with `--worker`. Starting with `--worker` will result in the
newly launched workers defaulting to the TCP/IP socket transport implementation.
* For every incoming logical connection with a worker, `Base.process_messages(rd::IO, wr::IO)()`
must be called. This launches a new task that handles reading and writing of messages from/to
the worker represented by the `IO` objects.
* `init_worker(cookie, manager::FooManager)` MUST be called as part of worker process initialization.
* Field `connect_at::Any` in `WorkerConfig` can be set by the cluster manager when [`launch()`](@ref)
is called. The value of this field is passed in in all [`connect()`](@ref) callbacks. Typically,
it carries information on *how to connect* to a worker. For example, the TCP/IP socket transport
uses this field to specify the `(host, port)` tuple at which to connect to a worker.
`kill(manager, pid, config)` is called to remove a worker from the cluster. On the master process,
the corresponding `IO` objects must be closed by the implementation to ensure proper cleanup.
The default implementation simply executes an `exit()` call on the specified remote worker.
`examples/clustermanager/simple` is an example that shows a simple implementation using UNIX domain
sockets for cluster setup.
## Network Requirements for LocalManager and SSHManager
Julia clusters are designed to be executed on already secured environments on infrastructure such
as local laptops, departmental clusters, or even the cloud. This section covers network security
requirements for the inbuilt `LocalManager` and `SSHManager`:
* The master process does not listen on any port. It only connects out to the workers.
* Each worker binds to only one of the local interfaces and listens on the first free port starting
from `9009`.
* `LocalManager`, used by `addprocs(N)`, by default binds only to the loopback interface. This means
that workers started later on remote hosts (or by anyone with malicious intentions) are unable
to connect to the cluster. An `addprocs(4)` followed by an `addprocs(["remote_host"])` will fail.
Some users may need to create a cluster comprising their local system and a few remote systems.
This can be done by explicitly requesting `LocalManager` to bind to an external network interface
via the `restrict` keyword argument: `addprocs(4; restrict=false)`.
* `SSHManager`, used by `addprocs(list_of_remote_hosts)`, launches workers on remote hosts via SSH.
By default SSH is only used to launch Julia workers. Subsequent master-worker and worker-worker
connections use plain, unencrypted TCP/IP sockets. The remote hosts must have passwordless login
enabled. Additional SSH flags or credentials may be specified via keyword argument `sshflags`.
* `addprocs(list_of_remote_hosts; tunnel=true, sshflags=<ssh keys and other flags>)` is useful when
we wish to use SSH connections for master-worker too. A typical scenario for this is a local laptop
running the Julia REPL (i.e., the master) with the rest of the cluster on the cloud, say on Amazon
EC2. In this case only port 22 needs to be opened at the remote cluster coupled with SSH client
authenticated via public key infrastructure (PKI). Authentication credentials can be supplied
via `sshflags`, for example ```sshflags=`-e <keyfile>` ```.
Note that worker-worker connections are still plain TCP and the local security policy on the remote
cluster must allow for free connections between worker nodes, at least for ports 9009 and above.
Securing and encrypting all worker-worker traffic (via SSH) or encrypting individual messages
can be done via a custom ClusterManager.
## Cluster Cookie
All processes in a cluster share the same cookie which, by default, is a randomly generated string
on the master process:
* [`Base.cluster_cookie()`](@ref) returns the cookie, while `Base.cluster_cookie(cookie)()` sets
it and returns the new cookie.
* All connections are authenticated on both sides to ensure that only workers started by the master
are allowed to connect to each other.
* The cookie must be passed to the workers at startup via argument `--worker <cookie>`. Custom ClusterManagers
can retrieve the cookie on the master by calling [`Base.cluster_cookie()`](@ref). Cluster managers
not using the default TCP/IP transport (and hence not specifying `--worker`) must call `init_worker(cookie, manager)`
with the same cookie as on the master.
Note that environments requiring higher levels of security can implement this via a custom `ClusterManager`.
For example, cookies can be pre-shared and hence not specified as a startup argument.
## Specifying Network Topology (Experimental)
The keyword argument `topology` passed to `addprocs` is used to specify how the workers must be
connected to each other:
* `:all_to_all`, the default: all workers are connected to each other.
* `:master_slave`: only the driver process, i.e. `pid` 1, has connections to the workers.
* `:custom`: the `launch` method of the cluster manager specifies the connection topology via the
fields `ident` and `connect_idents` in `WorkerConfig`. A worker with a cluster-manager-provided
identity `ident` will connect to all workers specified in `connect_idents`.
Currently, sending a message between unconnected workers results in an error. This behaviour,
as with the functionality and interface, should be considered experimental in nature and may change
in future releases.
## Multi-Threading (Experimental)
In addition to tasks, remote calls, and remote references, Julia from `v0.5` forwards will natively
support multi-threading. Note that this section is experimental and the interfaces may change
in the future.
### Setup
By default, Julia starts up with a single thread of execution. This can be verified by using the
command [`Threads.nthreads()`](@ref):
```julia
julia> Threads.nthreads()
1
```
The number of threads Julia starts up with is controlled by an environment variable called `JULIA_NUM_THREADS`.
Now, let's start up Julia with 4 threads:
```
export JULIA_NUM_THREADS=4
```
(The above command works on bourne shells on Linux and OSX. Note that if you're using a C shell
on these platforms, you should use the keyword `set` instead of `export`. If you're on Windows,
start up the command line in the location of `julia.exe` and use `set` instead of `export`.)
Let's verify there are 4 threads at our disposal.
```julia
julia> Threads.nthreads()
4
```
But we are currently on the master thread. To check, we use the command [`Threads.threadid()`](@ref)
```julia
julia> Threads.threadid()
1
```
### The `@threads` Macro
Let's work a simple example using our native threads. Let us create an array of zeros:
```jldoctest
julia> a = zeros(10)
10-element Array{Float64,1}:
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
```
Let us operate on this array simultaneously using 4 threads. We'll have each thread write its
thread ID into each location.
Julia supports parallel loops using the [`Threads.@threads`](@ref) macro. This macro is affixed
in front of a `for` loop to indicate to Julia that the loop is a multi-threaded region:
```julia
julia> Threads.@threads for i = 1:10
a[i] = Threads.threadid()
end
```
The iteration space is split amongst the threads, after which each thread writes its thread ID
to its assigned locations:
```julia
julia> a
10-element Array{Float64,1}:
1.0
1.0
1.0
2.0
2.0
2.0
3.0
3.0
4.0
4.0
```
Note that [`Threads.@threads`](@ref) does not have an optional reduction parameter like [`@parallel`](@ref).
## @threadcall (Experimental)
All I/O tasks, timers, REPL commands, etc are multiplexed onto a single OS thread via an event
loop. A patched version of libuv ([http://docs.libuv.org/en/v1.x/](http://docs.libuv.org/en/v1.x/))
provides this functionality. Yield points provide for co-operatively scheduling multiple tasks
onto the same OS thread. I/O tasks and timers yield implicitly while waiting for the event to
occur. Calling [`yield()`](@ref) explicitly allows for other tasks to be scheduled.
Thus, a task executing a [`ccall`](@ref) effectively prevents the Julia scheduler from executing any other
tasks till the call returns. This is true for all calls into external libraries. Exceptions are
calls into custom C code that call back into Julia (which may then yield) or C code that calls
`jl_yield()` (C equivalent of [`yield()`](@ref)).
Note that while Julia code runs on a single thread (by default), libraries used by Julia may launch
their own internal threads. For example, the BLAS library may start as many threads as there are
cores on a machine.
The `@threadcall` macro addresses scenarios where we do not want a `ccall` to block the main Julia
event loop. It schedules a C function for execution in a separate thread. A threadpool with a
default size of 4 is used for this. The size of the threadpool is controlled via environment variable
`UV_THREADPOOL_SIZE`. While waiting for a free thread, and during function execution once a thread
is available, the requesting task (on the main Julia event loop) yields to other tasks. Note that
`@threadcall` does not return till the execution is complete. From a user point of view, it is
therefore a blocking call like other Julia APIs.
It is very important that the called function does not call back into Julia.
`@threadcall` may be removed/changed in future versions of Julia.
[^1]:
In this context, MPI refers to the MPI-1 standard. Beginning with MPI-2, the MPI standards committee
introduced a new set of communication mechanisms, collectively referred to as Remote Memory Access
(RMA). The motivation for adding RMA to the MPI standard was to facilitate one-sided communication
patterns. For additional information on the latest MPI standard, see [http://mpi-forum.org/docs](http://mpi-forum.org/docs/).