Redis如何實(shí)現(xiàn)延遲隊(duì)列?方法介紹

發(fā)布時(shí)間:2024-02-16
延遲隊(duì)列,顧名思義它是一種帶有延遲功能的消息隊(duì)列。那么,是在什么場景下我才需要這樣的隊(duì)列呢?
1. 背景
我們先看看以下業(yè)務(wù)場景:
當(dāng)訂單一直處于未支付狀態(tài)時(shí),如何及時(shí)的關(guān)閉訂單如何定期檢查處于退款狀態(tài)的訂單是否已經(jīng)退款成功在訂單長時(shí)間沒有收到下游系統(tǒng)的狀態(tài)通知的時(shí)候,如何實(shí)現(xiàn)階梯式的同步訂單狀態(tài)的策略在系統(tǒng)通知上游系統(tǒng)支付成功終態(tài)時(shí),上游系統(tǒng)返回通知失敗,如何進(jìn)行異步通知實(shí)行分頻率發(fā)送:15s 3m 10m 30m 30m 1h 2h 6h 15h
1.1 解決方案
最簡單的方式,定時(shí)掃表。例如對于訂單支付失效要求比較高的,每2s掃表一次檢查過期的訂單進(jìn)行主動(dòng)關(guān)單操作。優(yōu)點(diǎn)是簡單,缺點(diǎn)是每分鐘全局掃表,浪費(fèi)資源,如果遇到表數(shù)據(jù)訂單量即將過期的訂單量很大,會造成關(guān)單延遲。
使用rabbitmq或者其他mq改造實(shí)現(xiàn)延遲隊(duì)列,優(yōu)點(diǎn)是,開源,現(xiàn)成的穩(wěn)定的實(shí)現(xiàn)方案,缺點(diǎn)是:mq是一個(gè)消息中間件,如果團(tuán)隊(duì)技術(shù)棧本來就有mq,那還好,如果不是,那為了延遲隊(duì)列而去部署一套mq成本有點(diǎn)大
使用redis的zset、list的特性,我們可以利用redis來實(shí)現(xiàn)一個(gè)延遲隊(duì)列redisdelayqueue
2. 設(shè)計(jì)目標(biāo)
實(shí)時(shí)性:允許存在一定時(shí)間的秒級誤差高可用性:支持單機(jī)、支持集群支持消息刪除:業(yè)務(wù)會隨時(shí)刪除指定消息消息可靠性:保證至少被消費(fèi)一次消息持久化:基于redis自身的持久化特性,如果redis數(shù)據(jù)丟失,意味著延遲消息的丟失,不過可以做主備和集群保證。這個(gè)可以考慮后續(xù)優(yōu)化將消息持久化到mangodb中
3. 設(shè)計(jì)方案
設(shè)計(jì)主要包含以下幾點(diǎn):
將整個(gè)redis當(dāng)做消息池,以kv形式存儲消息使用zset做優(yōu)先隊(duì)列,按照score維持優(yōu)先級使用list結(jié)構(gòu),以先進(jìn)先出的方式消費(fèi)zset和list存儲消息地址(對應(yīng)消息池的每個(gè)key)自定義路由對象,存儲zset和list名稱,以點(diǎn)對點(diǎn)的方式將消息從zset路由到正確的list使用定時(shí)器維護(hù)路由根據(jù)ttl規(guī)則實(shí)現(xiàn)消息延遲
3.1 設(shè)計(jì)圖
還是基于有贊的延遲隊(duì)列設(shè)計(jì),進(jìn)行優(yōu)化改造及代碼實(shí)現(xiàn)。有贊設(shè)計(jì)
3.2 數(shù)據(jù)結(jié)構(gòu)
zing:delay_queue:job_pool 是一個(gè)hash_table結(jié)構(gòu),里面存儲了所有延遲隊(duì)列的信息。kv結(jié)構(gòu):k=prefix projectname field = topic jobid v=conent;v由客戶端傳入的數(shù)據(jù),消費(fèi)的時(shí)候回傳zing:delay_queue:bucket 延遲隊(duì)列的有序集合zset,存放k=id和需要的執(zhí)行時(shí)間戳,根據(jù)時(shí)間戳排序zing:delay_queue:queue list結(jié)構(gòu),每個(gè)topic一個(gè)list,list存放的都是當(dāng)前需要被消費(fèi)的job
圖片僅供參考,基本可以描述整個(gè)流程的執(zhí)行過程,圖片源于文末的參考博客中
3.3 任務(wù)的生命周期
新增一個(gè)job,會在zing:delay_queue:job_pool中插入一條數(shù)據(jù),記錄了業(yè)務(wù)方消費(fèi)方。zing:delay_queue:bucket也會插入一條記錄,記錄執(zhí)行的時(shí)間戳搬運(yùn)線程會去zing:delay_queue:bucket中查找哪些執(zhí)行時(shí)間戳的runtimemillis比現(xiàn)在的時(shí)間小,將這些記錄全部刪除;同時(shí)會解析出每個(gè)任務(wù)的topic是什么,然后將這些任務(wù)push到topic對應(yīng)的列表zing:delay_queue:queue中每個(gè)topic的list都會有一個(gè)監(jiān)聽線程去批量獲取list中的待消費(fèi)數(shù)據(jù),獲取到的數(shù)據(jù)全部扔給這個(gè)topic的消費(fèi)線程池消費(fèi)線程池執(zhí)行會去zing:delay_queue:job_pool查找數(shù)據(jù)結(jié)構(gòu),返回給回調(diào)結(jié)構(gòu),執(zhí)行回調(diào)方法。
3.4 設(shè)計(jì)要點(diǎn)
3.4.1 基本概念
job:需要異步處理的任務(wù),是延遲隊(duì)列里的基本單元topic:一組相同類型job的集合(隊(duì)列)。供消費(fèi)者來訂閱
3.4.2 消息結(jié)構(gòu)
每個(gè)job必須包含以下幾個(gè)屬性
jobid:job的唯一標(biāo)識。用來檢索和刪除指定的job信息topic:job類型??梢岳斫獬删唧w的業(yè)務(wù)名稱delay:job需要延遲的時(shí)間。單位:秒。(服務(wù)端會將其轉(zhuǎn)換為絕對時(shí)間)body:job的內(nèi)容,供消費(fèi)者做具體的業(yè)務(wù)處理,以json格式存儲retry:失敗重試次數(shù)url:通知url
3.5 設(shè)計(jì)細(xì)節(jié)
3.5.1 如何快速消費(fèi)zing:delay_queue:queue
最簡單的實(shí)現(xiàn)方式就是使用定時(shí)器進(jìn)行秒級掃描,為了保證消息執(zhí)行的時(shí)效性,可以設(shè)置每1s請求redis一次,判斷隊(duì)列中是否有待消費(fèi)的job。但是這樣會存在一個(gè)問題,如果queue中一直沒有可消費(fèi)的job,那頻繁的掃描就失去了意義,也浪費(fèi)了資源,幸好list中有一個(gè)blpop阻塞原語,如果list中有數(shù)據(jù)就會立馬返回,如果沒有數(shù)據(jù)就會一直阻塞在那里,直到有數(shù)據(jù)返回,可以設(shè)置阻塞的超時(shí)時(shí)間,超時(shí)會返回null;具體的實(shí)現(xiàn)方式及策略會在代碼中進(jìn)行具體的實(shí)現(xiàn)介紹
3.5.2 避免定時(shí)導(dǎo)致的消息重復(fù)搬運(yùn)及消費(fèi)
使用redis的分布式鎖來控制消息的搬運(yùn),從而避免消息被重復(fù)搬運(yùn)導(dǎo)致的問題使用分布式鎖來保證定時(shí)器的執(zhí)行頻率
4. 核心代碼實(shí)現(xiàn)
4.1 技術(shù)說明
技術(shù)棧:springboot,redisson,redis,分布式鎖,定時(shí)器
注意:本項(xiàng)目沒有實(shí)現(xiàn)設(shè)計(jì)方案中的多queue消費(fèi),只開啟了一個(gè)queue,這個(gè)待以后優(yōu)化
4.2 核心實(shí)體
4.2.1 job新增對象
/ * 消息結(jié)構(gòu) * * @author 睜眼看世界 * @date 2020年1月15日 */@datapublic class job implements serializable { private static final long serialversionuid = 1l; / * job的唯一標(biāo)識。用來檢索和刪除指定的job信息 */ @notblank private string jobid; / * job類型??梢岳斫獬删唧w的業(yè)務(wù)名稱 */ @notblank private string topic; / * job需要延遲的時(shí)間。單位:秒。(服務(wù)端會將其轉(zhuǎn)換為絕對時(shí)間) */ private long delay; / * job的內(nèi)容,供消費(fèi)者做具體的業(yè)務(wù)處理,以json格式存儲 */ @notblank private string body; / * 失敗重試次數(shù) */ private int retry = 0; / * 通知url */ @notblank private string url;}4.2.2 job刪除對象
/ * 消息結(jié)構(gòu) * * @author 睜眼看世界 * @date 2020年1月15日 */@datapublic class jobdie implements serializable { private static final long serialversionuid = 1l; / * job的唯一標(biāo)識。用來檢索和刪除指定的job信息 */ @notblank private string jobid; / * job類型??梢岳斫獬删唧w的業(yè)務(wù)名稱 */ @notblank private string topic;}4.3 搬運(yùn)線程
/ * 搬運(yùn)線程 * * @author 睜眼看世界 * @date 2020年1月17日 */@slf4j@componentpublic class carryjobscheduled { @autowired private redissonclient redissonclient; / * 啟動(dòng)定時(shí)開啟搬運(yùn)job信息 */ @scheduled(cron = "*/1 * * * * *") public void carryjobtoqueue() { system.out.println("carryjobtoqueue --->"); rlock lock = redissonclient.getlock(redisqueuekey.carry_thread_lock); try { boolean lockflag = lock.trylock(lock_wait_time, lock_release_time, timeunit.seconds); if (!lockflag) { throw new businessexception(errormessageenum.acquire_lock_fail); } rscoredsortedset<object> bucketset = redissonclient.gets
上一個(gè):櫸樹扦插育苗技術(shù)
下一個(gè):2000塊錢以下有沒有好手機(jī),2000元以下有什么好手機(jī) 全觸的

編碼器是自動(dòng)化控制的重要元件
臺式機(jī)可以裝幾個(gè)硬盤啊,臺式電腦最多能裝幾個(gè)多大的硬盤
阿里云服務(wù)器怎么設(shè)置免流卡
房屋租賃稅費(fèi)包括哪些呢
交通事故責(zé)任無法認(rèn)定法院會怎么判
智能隔離器
羊獅慕首場高山露營篝火美食節(jié)7月6日開啟
黑莓怎么看版本,如何查看黑莓系統(tǒng)版本
熱防護(hù)性能試驗(yàn)儀使用指南
國內(nèi)價(jià)格便宜帶寬大的云服務(wù)器有哪些
十八禁 网站在线观看免费视频_2020av天堂网_一 级 黄 色 片免费网站_绝顶高潮合集Videos