2020久久超碰欧美精品最新亚洲欧美日韩久久精品,国产福利电影一区二区三区,亚洲欧美日韩一区在线观看,亚洲国产欧美日韩欧美特级,亚洲欧美日韩成人一区久久,欧美日韩精品一区二区三区不卡,国产欧美日韩va另类影音先锋,亚洲欧美日韩久久精品,亚洲欧美日韩国产成人精品影院,亚洲国产欧美日韩精品一区二区三区,欧美日韩国产成人高清视频,日韩久久精品国产免费观看频道,久久人人爽人人爽从片av高清,国产精品综合一区二区

首頁技術文章正文

Redis實現消息隊列的三種方式

更新時間:2022-08-17 來源:黑馬程序員 瀏覽量:

消息隊列(Message Queue),字面意思就是存放消息的隊列。最簡單的消息隊列模型包括3個角色:

消息隊列:存儲消息。

生產者:發送消息到消息隊列,在秒殺任務中負責判斷秒殺時間和庫存,校驗消費者權限是否是一人一單,發送優惠券id和用戶id到消息隊列中。

消費者:從消息隊列獲取消息并處理消息,接受到訂單消息之后,完成下單。

Redis提供了三種不同的方式來實現消息隊列:

list結構:基于List結構模擬消息隊列。

PubSub:基本的點對點消息模型。

Stream:比較完善的消息隊列模型。

消息隊列

基于List結構模擬消息隊列

消息隊列(Message Queue),字面意思就是存放消息的隊列。而Redis的list數據結構是一個雙向鏈表,很容易模擬出隊列效果。

隊列是入口和出口不在一邊,因此我們可以利用:LPUSH 結合 RPOP、或者 RPUSH 結合 LPOP來實現。

不過要注意的是,當隊列中沒有消息時RPOP或LPOP操作會返回null,并不像JVM的阻塞隊列那樣會阻塞并等待消息。因此這里應該使用BRPOP或者BLOPO來實現阻塞效果。

List結構

List的消息隊列可利用Redis存儲,不受限于JVM內存上限,是基于基于Redis的持久化機制,數據安全性有保證,同時也可以滿足消息隊列的有序性。但只支持但消費者,無法避免消息丟失是它最大的問題。

基于PubSub的消息隊列

PubSub(發布訂閱)是Redis2.0版本引入的消息傳遞模型。顧名思義,消費者可以訂閱一個或多個channel,生產者向對應channel發送消息后,所有訂閱者都能收到相關消息。對應channel發送消息后,所有訂閱者都能收到相關消息。

SUBSCRIBE channel [channel] :訂閱一個或多個頻道。

PUBLISH channel msg :向一個頻道發送消息。

PSUBSCRIBE pattern[pattern] :訂閱與pattern格式匹配的所有頻道。

基于PubSub的消息隊列

基于PubSub的消息隊列采用發布訂閱模型,支持多生產、多消費,但也有不支持數據持久化、無法避免消息丟失,消息堆積有上限的缺點,超出時數據會丟失。

例如:

基于Stream的消息隊列-XREAD

Stream讀取消息的方式之一:XREAD

Stream讀取消息的方式

例如:

XREAD阻塞方式,讀取最新的消息:

在業務開發中,我們可以循環的調用XREAD阻塞方式來查詢最新消息,從而實現持續監聽隊列的效果,偽代碼如下:

注意:當我們指定起始ID為$時,代表讀取最新的消息,如果我們處理一條消息的過程中,又有超過1條以上的消息到達隊列,則下次獲取時也只能獲取到最新的一條,會出現漏讀消息的問題。
STREAM類型消息隊列的XREAD命令,消息可回溯,一個消息可以被多個消費者讀取,同時可能阻塞讀取,有消息漏讀的風險。

基于Stream的消息隊列-消費者組

消費者組(Consumer Group):將多個消費者劃分到一個組中,監聽同一個隊列。具備下列特點:

1.消息分流:隊列中的消息會分流給組內的不同消費者,而不是重復消費,從而加快消息處理的速度。

2.消息標示:消費者組會維護一個標示,記錄最后一個被處理的消息,哪怕消費者宕機重啟,還會從標示之后讀取消息。確保每一個消息都會被消費。

3.消息確認:消費者獲取消息后,消息處于pending狀態,并存入一個pending-list。當處理完成后需要通過XACK來確認消息,標記消息為已處理,才會從pending-list移除。

創建消費者組:

XGROUP CREATE  key groupName ID [MKSTREAM]

key:隊列名稱

groupName:消費者組名稱

ID:起始ID標示,$代表隊列中最后一個消息,0則代表隊列中第一個消息

MKSTREAM:隊列不存在時自動創建隊列。

其它常見命令:

# 刪除指定的消費者組
XGROUP DESTORY key groupName

# 給指定的消費者組添加消費者
XGROUP CREATECONSUMER key groupname consumername

# 刪除消費者組中的指定消費者
XGROUP DELCONSUMER key groupname consumername

從消費者組讀取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

group:消費組名稱

consumer:消費者名稱,如果消費者不存在,會自動創建一個消費者

count:本次查詢的最大數量

BLOCK milliseconds:當沒有消息時最長等待時間

NOACK:無需手動ACK,獲取到消息后自動確認

STREAMS key:指定隊列名稱

ID:獲取消息的起始ID:">":從下一個未消費的消息開始

其它:根據指定id從pending-list中獲取已消費但未確認的消息,例如0,是從pending-list中的第一個消息開始。

消費者監聽消息的基本思路:

STREAM類型消息隊列的XREADGROUP命令特點:消息可回溯,一個消息可以只能被一個消費者讀取。可以阻塞讀取.沒有消息漏讀的風險,有消息確認機制,保證消息至少被消費一次。

分享到:
在線咨詢 我要報名
和我們在線交談!