中文字幕在线视频第一页,黄色毛片在线看,日本爱爱网站,亚洲系列中文字幕一区二区

您當(dāng)前的位置是:  首頁 > 資訊 > 文章精選 >
 首頁 > 資訊 > 文章精選 >

基于Redis實現(xiàn)特殊的消息隊列

2021-11-25 09:26:54   作者:默達      來源:阿里巴巴淘系技術(shù)團隊   評論:0  點擊:


  說到消息隊列,首先映入腦海的就是Kafka等,消息隊列在各個領(lǐng)域都發(fā)揮了很大的作用。但是,在一些場景下,傳統(tǒng)的消息隊列Kafka無法滿足需求,比如以下場景:
  • 消息重復(fù)概率比較高時,需要對重復(fù)消息進行合并處理避免浪費有限的資源,減少消費延遲;
  • 需要根據(jù)業(yè)務(wù)自定義優(yōu)先級進行消息處理,高優(yōu)先級的消息比低優(yōu)先級的消息先處理;
  • 消息需要定時消費的場景,消息只有在設(shè)定的消費時間到了之后立馬被消費。
  本文將介紹一種基于Redis實現(xiàn)的消息隊列(Redis message queue, RMQ),RMQ可以作為傳統(tǒng)消息隊列的互補選擇,在傳統(tǒng)消息隊列沒有涉及的場景中使用RMQ。
  功能介紹
  RMQ設(shè)計為一個二方庫,可以幫助用戶基于Redis快速實現(xiàn)消息隊列的功能,RMQ消息隊列具有消息合并、區(qū)分優(yōu)先級、支持定時消息等特性。RMQ消息隊列可以用于異步解耦、削峰填谷,支持億級數(shù)據(jù)堆積。RMQ消息隊列目前支持三種類型的消息,分別是RangeMergeMessage(區(qū)間重復(fù)合并消息)、PriorityMessage(優(yōu)先級消息)、FixedTimeMessage(任意定時消息)。
  區(qū)間重復(fù)合并消息
  RangeMergeMessage支持區(qū)間重復(fù)消息合并,發(fā)送消息時需要設(shè)置時間區(qū)間,消息延遲該時間區(qū)間長度后被消費,在該時間區(qū)間內(nèi)如果發(fā)送重復(fù)的消息,重復(fù)消息將會被合并。如果消息在Redis服務(wù)端發(fā)生堆積,重復(fù)到來的消息依然會被合并處理。該類型消息適用于消息重復(fù)率較高且希望重復(fù)消息合并處理的場景,對重復(fù)消息進行合并可以減少下游消費系統(tǒng)的壓力,減少不必要的資源消耗,將有限的資源最大化的利用,提升消費效率。
  優(yōu)先級消息
  PriorityMessage支持給消息設(shè)置任意等級的優(yōu)先級,優(yōu)先級高的消息會被優(yōu)先消費,相同優(yōu)先級的消息被隨機消費。如果消息在Redis服務(wù)端發(fā)生堆積,重復(fù)的消息將被合并處理,合并后消息的優(yōu)先級等于最后存儲的消息的優(yōu)先級。該類型消息適用于希望重復(fù)消息合并處理且需要設(shè)置優(yōu)先級的場景,下游消費者資源有限時,合并重復(fù)消息且優(yōu)先處理優(yōu)先級高的消息將可以合理利用有限的資源。
  任意定時消息
  FixedTimeMessage支持給消息設(shè)置任意消費時間,只有消費時間到了之后消息才被消費,消費時間可精確到秒。消息到期后沒有及時被消費時,消費者將按照時間由遠及近進行消費。如果消息在Redis服務(wù)端發(fā)生堆積,重復(fù)的消息將被合并處理,合并后消息的消費時間等于最后存儲的消息的消費時間。該類型消息適用于希望重復(fù)消息合并處理且需要定時消費的場景,定時消息應(yīng)用場景非常豐富,比如定時打標(biāo)去標(biāo)、活動結(jié)束后清理動作、訂單超時關(guān)閉等。
  并發(fā)消費控制
  使用傳統(tǒng)消息中間件進行集群消費的時候,為了避免并發(fā)處理同一元數(shù)據(jù)導(dǎo)致不一致問題,通常需要對元數(shù)據(jù)加分布式鎖,頻繁的鎖沖突會導(dǎo)致消費效率低下。加分布式鎖的最終目的其實就是保障屬于同一元數(shù)據(jù)的消息被串行消費。加分布式鎖并不是最好的方案,最好的方案應(yīng)該是從根上解決并發(fā)問題,讓屬于同一元數(shù)據(jù)的消息串行消費。RMQ消息隊列具有并發(fā)消費控制能力,屬于同一元數(shù)據(jù)的消息只會被分配給全局唯一一個線程進行消費,因此屬于同一元數(shù)據(jù)的消息將被串行消費。使用方如果需要該能力,除了需要提供Redis,還需要提供ZooKeeper。
  重試次數(shù)控制
  RMQ消息隊列支持失敗重試消費16次,業(yè)務(wù)返回消費失敗后,消息會被回滾并等待重試消費,重試16次后消息進入死信隊列,消息不再被消費,除非人工干預(yù)。
  技術(shù)原理
  總體框架
  RMQ消息隊列由三部分組成,分別為ZooKeeper、RMQ二方庫、Redis。ZooKeeper負責(zé)維護集群worker的信息,將topic的所有slot分配給全局的woker。Redis負責(zé)存儲消息,采用Sorted Set結(jié)構(gòu)存儲,Store Queue是消息存放的隊列,Prepare Queue是采用二階段消費方式正在消費的消息存放隊列,Dead Queue是死信隊列。RMQ二方庫由RmqClient、Consumer、Producer三部分組成。RmqClient負責(zé)RMQ的啟動工作,包括上報TopicDef、Worker給ZooKeeper,分配Slot給Worker,掃描業(yè)務(wù)定義的MessageListener Bean。Producer負責(zé)根據(jù)不用消息類型將消息按照指定的方式存儲到Redis。Consumer負責(zé)根據(jù)不用消息類型按照指定方式從Redis彈出消息并調(diào)用業(yè)務(wù)的MessageListener。
  消息存儲
 
  Topic的設(shè)計
  Topic的定義有三部分組成,topic表示主題名稱,slotAmount表示消息存儲劃分的槽數(shù)量,topicType表示消息的類型。主題名稱是一個Topic的唯一標(biāo)示,相同主題名稱Topic的slotAmount和topicType一定是一樣的。消息存儲采用Redis的Sorted Set結(jié)構(gòu),為了支持大量消息的堆積,需要把消息分散存儲到很多個槽中,slotAmount表示該Topic消息存儲共使用的槽數(shù)量,槽數(shù)量一定需要是2的n次冪。在消息存儲的時候,采用對指定數(shù)據(jù)或者消息體哈希求余得到槽位置。
  StoreQueue 的設(shè)計
  上圖中topic劃分了8個槽位,編號0-7。如果發(fā)送方指定了消息的slotBasis,則計算slotBasis的CRC32值,CRC32值對槽數(shù)量進行取模得到槽序號,SlotKey設(shè)計為#{topic}_#{index}(也即Redis的鍵),其中#{}表示占位符。發(fā)送方需要保證相同內(nèi)容的消息的slotBasis相同,如果沒有指定slotBasis則采用消息內(nèi)容計算SlotKey,這樣內(nèi)容相同的消息體就會落在同一個Sorted Set里面,所以內(nèi)容相同的消息會進行合并。Redis的Sorted Set中的數(shù)據(jù)按照分數(shù)排序,實現(xiàn)不同類型的消息的關(guān)鍵就在于如何利用分數(shù)、如何添加消息到Sorted Set、如何從Sorted Set中彈出消息。優(yōu)先級消息將優(yōu)先級作為分數(shù),消費時每次彈出分數(shù)最大的消息。任意定時消息將時間戳作為分數(shù),消費時每次彈出分數(shù)大于當(dāng)前時間戳的一個消息。區(qū)間重復(fù)合并消息將時間戳作為分數(shù),添加消息時將(當(dāng)前時間戳+時間區(qū)間)作為分數(shù),消費時每次彈出分數(shù)大于當(dāng)前時間戳的一個消息。
  PrepareQueue 的設(shè)計
  為了保障RMQ消息隊列的可用性,做到每條消息至少消費一次,消費者不是直接pop有序集合中的元素,而是將元素從StoreQueue移動到PrepareQueue并返回消息給消費者,等消費成功后再從PrepareQueue從刪除,或者消費失敗后從PreapreQueue重新移動到StoreQueue,這便是根據(jù)二階段提交的思想實現(xiàn)的二階段消費。在后面將會詳細介紹二階段消費的實現(xiàn)思路,這里重點介紹下PrepareQueue的存儲設(shè)計。StoreQueue中每一個Slot對應(yīng)PrepareQueue中的Slot,PrepareQueue的SlotKey設(shè)計為prepare{#{topic}#{index}}。PrepareQueue采用Sorted Set作為存儲,消息移動到PrepareQueue時刻對應(yīng)的(秒級時間戳*1000+重試次數(shù))作為分數(shù),字符串存儲的是消息體內(nèi)容。這里分數(shù)的設(shè)計與重試次數(shù)的設(shè)計密切相關(guān),所以在重試次數(shù)設(shè)計章節(jié)詳細介紹。PrepareQueue的SlotKey設(shè)計中需要注意的一點,由于消息從StoreQueue移動到PrepareQueue是通過Lua腳本操作的,因此需要保證Lua腳本操作的Slot在同一個Redis節(jié)點上,如何保證PrepareQueue的SlotKey和對應(yīng)的StoreQueue的SlotKey被hash到同一個Redis槽中呢。Redis的hash tag功能可以指定SlotKey中只有某一部分參與計算hash,這一部分采用{}包括,因此PrepareQueue的SlotKey中采用{}包括了StoreQueue的SlotKey。
  DeadQueue 的設(shè)計
  消息重試消費16次后,消息將進入DeadQueue。DeadQueue的SlotKey設(shè)計為prepare{#{topic}#{index}},這里同樣采用hash tag功能保證DeadQueue的SlotKey與對應(yīng)StoreQueue的SlotKey存儲在同一Redis節(jié)點。
  生產(chǎn)者
  生產(chǎn)者的任務(wù)就是將消息添加到Redis的Sorted Set中。首先,需要計算出消息添加到Redis的SlotKey,如果發(fā)送方指定了消息的slotBasis(否則采用content代替),則計算slotBasis的CRC32值,CRC32值對槽數(shù)量進行取模得到槽序號,SlotKey設(shè)計為#{topic}_#{index},其中#{}表示占位符。然后,不同類型的消息有不同的添加方式,因此分布講述三種類型消息的添加過程。
  區(qū)間重復(fù)合并消息
  發(fā)送該消息時需要設(shè)置timeRange,timeRange必須大于0,單位為毫秒,表示消息將延遲timeRange毫秒后被消費,期間到來的重復(fù)消息將被合并,合并后的消息依然維持原來的消費時間。因此在存儲該類型消息的時候,采用(當(dāng)前時間戳+timeRange)作為分數(shù),添加消息采用Lua腳本執(zhí)行,保證操作的原子性,Lua腳本首先采用zscore命令檢查消息是否已經(jīng)存在,如果已經(jīng)存在則直接返回,如果不存在則執(zhí)行zadd命令添加。
  優(yōu)先級消息
  發(fā)送該消息時需要設(shè)置priority,priority必須大于16,表示消息的優(yōu)先級,數(shù)值越大表示優(yōu)先級越高。因此在存儲該類型消息的時候,采用priority作為分數(shù),采用zadd命令直接添加。
  任意定時消息
  發(fā)送該類型消息時需要設(shè)置fixedTime,fixedTime必須大于當(dāng)前時間,表示消費時間戳,當(dāng)前時間大于該消費時間戳的時候,消息才會被消費。因此在存儲該類型消息的時候,采用fixedTime作為分數(shù),采用命令zadd直接添加。
  消費者
  二階段消費方式
  三種消費模式
  一般消息隊列存在三種消費模式,分別是:最多消費一次、至少消費一次、只消費一次。最多消費一次模式消息可能丟失,一般不怎么使用。至少消費一次模式消息不會丟失,但是可能存在重復(fù)消費,比較常用。只消費一次模式消息被精確只消費一次,實現(xiàn)較困難,一般需要業(yè)務(wù)記錄冪等ID來實現(xiàn)。RMQ實現(xiàn)了至少消費一次的模式,那么如何保證消息至少被消費一次呢?
  至少消費一次模式實現(xiàn)的難點
  從最簡單的消費模式——最多消費一次說起,消費者端只需要從消息隊列服務(wù)中取出消息就行,即執(zhí)行Redis的zpopmax命令,不倫消費者是否接收到該消息并成功消費,消息隊列服務(wù)都認為消息消費成功。最多一次消費模式導(dǎo)致消息丟失的因素可能有:網(wǎng)絡(luò)丟包導(dǎo)致消費者沒有接收到消息,消費者接收到消息但在消費的時候宕機了,消費者接收到消息但消費失敗。針對消費失敗導(dǎo)致消息丟失的情況比較好解決,只需要把消費失敗的消息重新放入消息隊列服務(wù)就行,但是網(wǎng)絡(luò)丟包和消費系統(tǒng)異常導(dǎo)致的消息丟失問題不好解決。可能有人會想到,我們不把元素從有序集合中pop出來,我們先查詢優(yōu)先級最高的元素,然后消費,再刪除消費成功的元素,但是這樣消息服務(wù)隊列就變成了同步阻塞隊列,性能會很差。
  至少消費一次模式的實現(xiàn)
  至少消費一次的問題比較類似銀行轉(zhuǎn)賬問題,A向B賬戶轉(zhuǎn)賬100元,如何保障A賬戶扣減100同時B賬戶增加100,因此我們可以想到二階段提交的思想。第一個準(zhǔn)備階段,A、B分別進行資源凍結(jié)并持久化undo和redo日志,A、B分別告訴協(xié)調(diào)者已經(jīng)準(zhǔn)備好;第二個提交階段,協(xié)調(diào)者告訴A、B進行提交,A、B分別提交事務(wù)。RMQ基于二階段提交的思想來實現(xiàn)至少消費一次的模式。RMQ存儲設(shè)計中PrepareQueue的作用就是用來凍結(jié)資源并記錄事務(wù)日志,消費者端即是參與者也是協(xié)調(diào)者。第一個準(zhǔn)備階段,消費者端通過執(zhí)行Lua腳本從StoreQueue中Pop消息并存儲到PrepareQueue,同時消息傳輸?shù)较M者端,消費者端消費該消息;第二個提交階段,消費者端根據(jù)消費結(jié)果是否成功協(xié)調(diào)消息隊列服務(wù)是提交還是回滾,如果消費成功則提交事務(wù),該消息從PrepareQueue中刪除,如果消費失敗則回滾事務(wù),消費者端將該消息從PrepareQueue移動到StoreQueue,如果因為各種異常導(dǎo)致PrepareQueue中消息滯留超時,超時后將自動執(zhí)行回滾操作。二階段消費的流程圖如下所示。
  實現(xiàn)方案的異常情況分析
  我們來分析下采用二階段消費方案可能存在的異常情況,從以下分析來看二階段消費方案可以保障消息至少被消費一次。
  1. 網(wǎng)絡(luò)丟包導(dǎo)致消費者沒有接收到消息,這時消息已經(jīng)記錄到PrepareQueue,如果到了超時時間,消息被回滾放回StoreQueue,等待下次被消費,消息不丟失。
  2. 消費者接收到了消息,但是消費者還沒來得及消費完成系統(tǒng)就宕機了,消息消費超時到了后,消息會被重新放入StoreQueue,等待下次被消費,消息不丟失。
  3. 消費者接收到了消息并消費成功,消費者端在協(xié)調(diào)事務(wù)提交的時候宕機了,消息消費超時到了后,消息會被重新放入StoreQueue,等待下次被消費,消息被重復(fù)消費。
  4. 消費者接收到了消息但消費失敗,消費者端在協(xié)調(diào)事務(wù)提交的時候宕機了,消息消費超時到了后,消息會被重新放入StoreQueue,等待下次被消費,消息不丟失。
  5. 消費者接收到了消息并消費成功,但是由于fullgc等原因使消費時間太長,PrepareQueue中的消息由于超時已經(jīng)回滾到StoreQueue,等待下次被消費,消息被重復(fù)消費。
  重試次數(shù)控制的實現(xiàn)
  采用二階段消費方式,需要將消息在StoreQueue和PrepareQueue之間移動,如何實現(xiàn)重試次數(shù)控制呢,其關(guān)鍵在StoreQueue和PrepareQueue的分數(shù)設(shè)計。PrepareQueue的分數(shù)需要與時間相關(guān),正常情況下,消費者不管消費失敗還是消費成功,都會從PrepareQueue刪除消息,當(dāng)消費者系統(tǒng)發(fā)生異常或者宕機的時候,消息就無法從PrepareQueue中刪除,我們也不知道消費者是否消費成功,為保障消息至少被消費一次,我們需要做到超時回滾,因此分數(shù)需要與消費時間相關(guān)。當(dāng)PrepareQueue中的消息發(fā)生超時的時候,將消息從PrepareQueue移動到StoreQueue。因此PrepareQueue的分數(shù)設(shè)計為:秒級時間戳*1000+重試次數(shù)。不同類型的消息首次存儲到StoreQueue中的分數(shù)表示的含義不盡相同,區(qū)間重復(fù)合并消息和任意定時消息存儲時的分數(shù)表示消費時間戳,優(yōu)先級消息存儲時的分數(shù)表示優(yōu)先級。如果消息消費失敗,消息從PrepareQueue回滾到StoreQueue,所有類型的消息存儲時的分數(shù)都表示剩余重試次數(shù),剩余重試次數(shù)從16次不斷降低最后為0,消息進入死信隊列。消息在StoreQueue和PrepareQueue之間移動流程如下:
  
  Pop 消息
  不同類型的消息在消費的時候Pop消息的方式不一樣,因此接下來分別講述三種類型消息的Pop方式。
  區(qū)間重復(fù)合并消息
  該消息存儲的分數(shù)設(shè)計為消費時間戳,當(dāng)前時間大于消息的消費時間戳?xí)r,該消息應(yīng)該被消費。因此采用Redis命令ZRANGEBYSCORE彈出分數(shù)小于當(dāng)前時間戳的一條消息。
  優(yōu)先級消息
  該消息存儲的分數(shù)設(shè)計為優(yōu)先級,優(yōu)先級越高分數(shù)越大,因此采用Redis命令ZPOPMAX彈出分數(shù)最大的一條消息。
  任意定時消息該消息存儲的分數(shù)設(shè)計為消費時間戳,當(dāng)前時間大于消息的消費時間戳?xí)r,該消息應(yīng)該被消費。因此采用Redis命令ZRANGEBYSCORE彈出分數(shù)小于當(dāng)前時間戳的一條消息。
  相關(guān)應(yīng)用
  主圖價格表達項目
  在主圖價格表達中需要實現(xiàn)一個功能,商品價格發(fā)生變化時將商品價格打印在商品主圖上面,那么需要在價格發(fā)生變動的時候觸發(fā)合成一張帶價格的圖片,每一次觸發(fā)合圖時計算價格都是獲取當(dāng)前最新的價格。上游價格變化的因素很多,變化很頻繁,下游合圖消耗GPU資源較大,處理容量較低。因此需要盡可能合并觸發(fā)合圖消息,減輕下游處理壓力,于是使用了RMQ作為消息隊列來進行削峰填谷、消息合并。不僅如此,還可以根據(jù)商家等級劃分觸發(fā)合圖消息的等級,使KA商家能夠優(yōu)先得到處理,縮短價格變化的延遲。
  在線上實際環(huán)境中,集群共130臺機器,RMQ消息隊列的發(fā)送消息能力和消費消息能力均可以達到5w tps,而且這并不是峰值,理論上可以達到10w tps。
  在線數(shù)據(jù)圈選引擎
  在線數(shù)據(jù)圈選引擎需要處理各種來源的大量動態(tài)數(shù)據(jù),需要將一段時間區(qū)間內(nèi)的消息合并處理,減少處理壓力,并且在對同一元數(shù)據(jù)進行并發(fā)處理需要加分布式鎖,鎖沖突導(dǎo)致消費效率下降。RMQ的區(qū)間重復(fù)合并消息和并發(fā)消費控制能力可以幫助解決這些問題。目前,在線數(shù)據(jù)圈選引擎已經(jīng)采用了RMQ消息隊列作為核心組件,RMQ消息隊列發(fā)揮了很大的作用。
  總結(jié)
  本文提出了一種可實現(xiàn)的基于Redis的消息隊列,充分利用Sorted Set結(jié)構(gòu)設(shè)計了消息合并、優(yōu)先級、定時等特性,與傳統(tǒng)消息隊列形成互補,彌補傳統(tǒng)消息隊列這方面特性的缺失。為了實現(xiàn)高可用,本文在二階段提交的思想上進行改進設(shè)計了二階段消費方式,保障消息至少被消費一次。未來將基于Redis的特性打造更多獨特的功能,與傳統(tǒng)消息中間件形成互補。在消費控制方面會增加流量自動調(diào)控能力,根據(jù)消息類型調(diào)控消費速度,減少因為某種類型消息消費瓶頸導(dǎo)致整體消費性能下降。
 
【免責(zé)聲明】本文僅代表作者本人觀點,與CTI論壇無關(guān)。CTI論壇對文中陳述、觀點判斷保持中立,不對所包含內(nèi)容的準(zhǔn)確性、可靠性或完整性提供任何明示或暗示的保證。請讀者僅作參考,并請自行承擔(dān)全部責(zé)任。

相關(guān)熱詞搜索: Redis

上一篇:低代碼時代的呼叫中心(十四)

下一篇:最后一頁

相關(guān)閱讀:

專題

CTI論壇會員企業(yè)

克山县| 乌鲁木齐县| 尤溪县| 富川| 德令哈市| 铅山县| 乌鲁木齐市| 榆树市| 江城| 崇信县| 如皋市| 房产| 扎赉特旗| 潜山县| 隆子县| 襄垣县| 犍为县| 武邑县| 江北区| 玉林市| 莱阳市| 大新县| 荣成市| 高密市| 宁阳县| 定南县| 丹凤县| 珲春市| 宜兰县| 黑龙江省| 沂源县| 新竹县| 灵璧县| 霍山县| 巨野县| 金川县| 新蔡县| 四子王旗| 绥宁县| 如皋市| 布拖县|