阿里云EMR Remote Shuffle Service在小米的實(shí)踐
阿里云EMR自2020年推出Remote Shuffle Service(RSS)以來,幫助了諸多客戶解決Spark作業(yè)的性能、穩(wěn)定性問題,并使得存算分離架構(gòu)得以實(shí)施,與此同時(shí)RSS也在跟合作方小米的共建下不斷演進(jìn)。本文將介紹RSS的最新架構(gòu),在小米的實(shí)踐,以及開源。
一 問題回顧
傳統(tǒng)Shuffle如下圖所示,Mapper把Shuffle數(shù)據(jù)按PartitionId排序?qū)懕P后交給External Shuffle Service(ESS)管理,Reducer從每個(gè)Mapper Output中讀取屬于自己的Block。
- 本地盤依賴限制了存算分離。存算分離是近年來興起的新型架構(gòu),它解耦了計(jì)算和存儲,可以更靈活地做機(jī)型設(shè)計(jì):計(jì)算節(jié)點(diǎn)強(qiáng)CPU弱磁盤,存儲節(jié)點(diǎn)強(qiáng)磁盤強(qiáng)網(wǎng)絡(luò)弱CPU。計(jì)算節(jié)點(diǎn)無狀態(tài),可根據(jù)負(fù)載彈性伸縮。存儲端,隨著對象存儲(OSS, S3)+數(shù)據(jù)湖格式(Delta, Iceberg, Hudi)+本地/近地緩存等方案的成熟,可當(dāng)作容量無限的存儲服務(wù)。用戶通過計(jì)算彈性+存儲按量付費(fèi)獲得成本節(jié)約。然而,Shuffle對本地盤的依賴限制了存算分離。
- 寫放大。當(dāng)Mapper Output數(shù)據(jù)量超過內(nèi)存時(shí)觸發(fā)外排,從而引入額外磁盤IO。
- 大量隨機(jī)讀。Mapper Output屬于某個(gè)Reducer的數(shù)據(jù)量很小,如Output 128M,Reducer并發(fā)2000,則每個(gè)Reducer只讀64K,從而導(dǎo)致大量小粒度隨機(jī)讀。對于HDD,隨機(jī)讀性能極差;對于SSD,會快速消耗SSD壽命。
- 高網(wǎng)絡(luò)連接數(shù),導(dǎo)致線程池消耗過多CPU,帶來性能和穩(wěn)定性問題。
- Shuffle數(shù)據(jù)單副本,大規(guī)模集群場景壞盤/壞節(jié)點(diǎn)很普遍,Shuffle數(shù)據(jù)丟失引發(fā)的Stage重算帶來性能和穩(wěn)定性問題。
二 RSS發(fā)展歷程
1 Sailfish
2 Dataflow
3 Riffle
4 Cosco
5 Zeus
6 RPMP
7 Magnet
8 FireStorm
從上述描述可知,當(dāng)前的方案基本收斂到Push Shuffle,但在一些關(guān)鍵設(shè)計(jì)上的選擇各家不盡相同,主要體現(xiàn)在:
-
集成到Spark內(nèi)部還是獨(dú)立服務(wù)。
-
RSS服務(wù)側(cè)架構(gòu),選項(xiàng)包括:Master-Worker,含輕量級狀態(tài)管理的去中心化,完全去中心化。
-
Shuffle數(shù)據(jù)的存儲,選項(xiàng)包括:內(nèi)存,本地盤,DFS,對象存儲。
- 多副本的實(shí)現(xiàn),選項(xiàng)包括:Client多推,服務(wù)端做Replication。
阿里云RSS[12][13]由2020年推出,核心設(shè)計(jì)參考了Sailfish和Cosco,并且在架構(gòu)和實(shí)現(xiàn)層面做了改良,下文將詳細(xì)介紹。
三 阿里云RSS核心架構(gòu)
-
獨(dú)立服務(wù)??紤]到將RSS集成到Spark內(nèi)部無法滿足存算分離架構(gòu),阿里云RSS將作為獨(dú)立服務(wù)提供Shuffle服務(wù)。
-
Master-Worker架構(gòu)。通過Master節(jié)點(diǎn)做服務(wù)狀態(tài)管理非常必要,基于etcd的狀態(tài)狀態(tài)管理能力受限。
-
多種存儲方式。目前支持本地盤/DFS等存儲方式,主打本地盤,將來會往分層存儲方向發(fā)展。
- 服務(wù)端做Replication。Client多推會額外消耗計(jì)算節(jié)點(diǎn)的網(wǎng)絡(luò)和計(jì)算資源,在獨(dú)立部署或者服務(wù)化的場景下對計(jì)算集群不友好。
下圖展示了阿里云RSS的關(guān)鍵架構(gòu),包含Client(RSS Client, Meta Service),Master(Resource Manager)和Worker三個(gè)角色。Shuffle的過程如下:
-
Mapper在首次PushData時(shí)請求Master分配Worker資源,Worker記錄自己所需要服務(wù)的Partition列表。
-
Mapper把Shuffle數(shù)據(jù)緩存到內(nèi)存,超過閾值時(shí)觸發(fā)Push。
-
隸屬同個(gè)Partition的數(shù)據(jù)被Push到同一個(gè)Worker做合并,主Worker內(nèi)存接收到數(shù)據(jù)后立即向從Worker發(fā)起Replication,數(shù)據(jù)達(dá)成內(nèi)存兩副本后即向Client發(fā)送ACK,F(xiàn)lusher后臺線程負(fù)責(zé)刷盤。
-
Mapper Stage運(yùn)行結(jié)束,MetaService向Worker發(fā)起CommitFiles命令,把殘留在內(nèi)存的數(shù)據(jù)全部刷盤并返回文件列表。
- Reducer從對應(yīng)的文件列表中讀取Shuffle數(shù)據(jù)。
1 狀態(tài)下沉
為了緩解Master壓力,我們把生命周期狀態(tài)管理下沉到Driver,由Application管理自己的Shuffle,Master只需維護(hù)RSS集群本身的狀態(tài)。這個(gè)優(yōu)化大大降低Master的負(fù)載,并使得Master HA得以順利實(shí)現(xiàn)。
2 Adaptive Pusher
Sort-Based Pusher會額外引入一次排序,性能上比Hash-Based Pusher略差。我們在ShuffleWriter初始化階段根據(jù)Reducer的并發(fā)度自動選擇合適的Pusher。
3 磁盤容錯(cuò)
4 滾動升級
5 混亂測試框架
仿真測試框架架構(gòu)如下圖所示,首先定義測試Plan來描述事件類型、事件觸發(fā)的順序及持續(xù)時(shí)間,事件類型包括節(jié)點(diǎn)異常,磁盤異常,IO異常,CPU過載等??蛻舳藢lan提交給Scheduler,Scheduler根據(jù)Plan的描述給每個(gè)節(jié)點(diǎn)的Runner發(fā)送具體的Operation,Runner負(fù)責(zé)具體執(zhí)行并匯報(bào)當(dāng)前節(jié)點(diǎn)的狀態(tài)。在觸發(fā)Operation之前,Scheduler會推演該事件發(fā)生產(chǎn)生的后果,若導(dǎo)致無法滿足RSS的最小可運(yùn)行環(huán)境,將拒絕此事件。
我們認(rèn)為仿真測試框架的思路是通用設(shè)計(jì),可以推廣到更多的服務(wù)測試中。
6 多引擎支持
當(dāng)前大多數(shù)引擎都沒有Shuffle插件化的抽象,需要一定程度的引擎修改。此外,流計(jì)算和MPP都是上游即時(shí)Push給下游的模式,而RSS是上游Push,下游Pull的模式,這兩者如何結(jié)合也是需要探索的。
7 測試
測試環(huán)境
Header * 1: ecs.g6e.4xlarge, 16 * 2.5GHz/3.2GHz, 64GiB, 10GbpsWorker * 3: ecs.g6e.8xlarge, 32 * 2.5GHz/3.2GHz, 128GiB, 10Gbps
阿里云RSS vs. Magnet
5T Terasort的性能測試如下圖所示,如上文描述,Magent的Shuffle Write有額外開銷,差于RSS和傳統(tǒng)做法。Magent的Shuffle Read有提升,但差于RSS。在這個(gè)Benchmark下,RSS明顯優(yōu)于另外兩個(gè),Magent的e2e時(shí)間略好于傳統(tǒng)Shuffle。
RSS跟開源系統(tǒng)X在TPCDS-3T的性能對比如下,總時(shí)間RSS快了20%。
在穩(wěn)定性方面,我們測試了Reducer大規(guī)模并發(fā)的場景,Magnet可以跑通但時(shí)間比RSS慢了數(shù)倍,System X在Shuffle Write階段報(bào)錯(cuò)。
四 阿里云RSS在小米的實(shí)踐
1 現(xiàn)狀及痛點(diǎn)
2 RSS在小米的落地
在落地的過程,小米主導(dǎo)了磁盤容錯(cuò)的開發(fā),大大提高了RSS的服務(wù)穩(wěn)定性,技術(shù)細(xì)節(jié)如上文所述。此外,在前期RSS還未完全穩(wěn)定階段,小米在多個(gè)環(huán)節(jié)對RSS的作業(yè)進(jìn)行了容錯(cuò)。在調(diào)度端,若開啟RSS的Spark作業(yè)因Shuffle報(bào)錯(cuò),則Yarn的下次重試會回退到ESS。在ShuffleWriter初始化階段,小米主導(dǎo)了自適應(yīng)Fallback機(jī)制,根據(jù)當(dāng)前RSS集群的負(fù)載和作業(yè)的特征(如Reducer并發(fā)是否過大)自動選擇RSS或ESS,從而提升穩(wěn)定性。
3 效果
ESS:
RSS:
ESS:
在阿里云EMR團(tuán)隊(duì)及小米Spark團(tuán)隊(duì)的共同努力下,RSS帶來的穩(wěn)定性和性能提升得到了充分的驗(yàn)證。后續(xù)小米將會持續(xù)擴(kuò)大RSS集群規(guī)模以及作業(yè)規(guī)模,并且在彈性資源伸縮場景下發(fā)揮更大的作用。
五 開源
git地址: https://github.com/alibaba/RemoteShuffleService
開源代碼包含核心功能及容錯(cuò),滿足生產(chǎn)要求。
計(jì)劃中的重要Feature:
- AE
- Spark多版本支持
- Better 流控
- Better 監(jiān)控
- Better HA
- 多引擎支持
歡迎各路開發(fā)者共建!
六 Reference
Redis數(shù)據(jù)庫入門
