⑴ rocketmq的9876埠可以改嗎
⑵ RocketMQ的事務消息
RocketMQ的事務消息,是指發送消息事件和其他事件需要同時成功或同時失敗。比如銀行轉賬,A銀行的某賬戶要轉一萬元到B銀行的某賬戶。A銀行發送「B銀行賬戶增加一萬元」這個消息,要和「從A銀行賬戶扣除一萬元」這個操作同時成功或者同時失敗。
RocketMQ採用兩階段提交的方式實現事務消息,TransactionMQProcer處理上面情況的流程是,先發一個「准備從B銀行賬戶增加一萬元」的消息,發送成功後做從A銀行賬戶扣除一萬元的操作,根據操作結果是否成功,確定之前的「准備從B銀行賬戶增加一萬元」的消息是做commit還是rollback,具體流程如下:
1)發送方向RocketMQ發送「待確認」消息。
2)RocketMQ將收到的「待確認」消息持久化成功後,向發送方回復消息已經發送成功,此時第一階段消息發送完成。
3)發送方開始執行本地事件邏輯。
4)發送方根據本地事件執行結果向RocketMQ發送二次確認(Commit或是Rollback)消息,RocketMQ收到Commit狀態則將第一階段消息標記為可投遞,訂閱方將能夠收到該消息;收到Rollback狀態則刪除第一階段的消息,訂閱方接收不到該消息。
5)如果出現異常情況,步驟4)提交的二次確認最終未到達RocketMQ,伺服器在經過固定時間段後將對「待確認」消息發起回查請求。
6)發送方收到消息回查請求後(如果發送一階段消息的Procer不能工作,回查請求將被發送到和Procer在同一個Group里的其他Procer),通過檢查對應消息的本地事件執行結果返回Commit或Roolback狀態。
7)RocketMQ收到回查請求後,按照步驟4)的邏輯處理。
上面的邏輯似乎很好地實現了事務消息功能,它也是RocketMQ之前的版本實現事務消息的邏輯。
但是因為RocketMQ依賴將數據順序寫到磁碟這個特徵來提高性能,步驟4)卻需要更改第一階段消息的狀態,這樣會造成磁碟Catch的臟頁過多,降低系統的性能。所以RocketMQ在4.x的版本中將這部分功能去除。系統中的一些上層Class都還在,用戶可以根據實際需求實現自己的事務功能。
客戶端有三個類來支持用戶實現事務消息,
第一個類是LocalTransaction-Executer,用來實例化步驟3)的邏輯,根據情況返回LocalTransactionState.ROLLBACK_MESSAGE或者
LocalTransactionState.COMMIT_MESSAGE狀態。
第二個類是TransactionMQProcer,它的用法和DefaultMQProcer類似,要通過它啟動一個Procer並發消息,但是比DefaultMQProcer多設置本地事務處理函數和回查狀態函數。
第三個類是TransactionCheckListener,實現步驟5)中MQ伺服器的回查請求,返回LocalTransactionState.ROLLBACK_MESSAGE或者LocalTransactionState.COMMIT_MESSAGE
上圖說明了事務消息的大致方案,其中分為兩個流程:正常事務消息的發送及提交、事務消息的補償流程。
1.事務消息發送及提交:
(1) 發送消息(half消息)。
(2) 服務端響應消息寫入結果。
(3) 根據發送結果執行本地事務(如果寫入失敗,此時half消息對業務不可見,本地邏輯不執行)。
(4) 根據本地事務狀態執行Commit或者Rollback(Commit操作生成消息索引,消息對消費者可見)。
2.補償流程:
(1) 對沒有Commit/Rollback的事務消息(pending狀態的消息),從服務端發起一次「回查」。
(2) Procer收到回查消息,檢查回查消息對應的本地事務的狀態。
(3) 根據本地事務狀態,重新Commit或者Rollback。
其中,補償階段用於解決消息Commit或者Rollback發生超時或者失敗的情況。
在RocketMQ事務消息的主要流程中,一階段的消息如何對用戶不可見。其中,事務消息相對普通消息最大的特點就是一階段發送的消息對用戶是不可見的。那麼,如何做到寫入消息但是對用戶不可見呢?RocketMQ事務消息的做法是:如果消息是half消息,將備份原消息的主題與消息消費隊列,然後改變主題為RMQ_SYS_TRANS_HALF_TOPIC。由於消費組未訂閱該主題,故消費端無法消費half類型的消息。然後二階段會顯示執行提交或者回滾half消息(邏輯刪除)。當然,為了防止二階段操作失敗,RocketMQ會開啟一個定時任務,從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進行消費,根據生產者組獲取一個服務提供者發送回查事務狀態請求,根據事務狀態來決定是提交或回滾消息。
在RocketMQ中,消息在服務端的存儲結構如下,每條消息都會有對應的索引信息,Consumer通 過ConsumeQueue這個二級索引來讀取消息實體內容,其流程如下:
RocketMQ的具體實現策略是:寫入的如果事務消息,對消息的Topic和Queue等屬性進行替換,同時將原來的Topic和Queue信息存儲到消息的屬性中,正因為消息主題被替換,故消息並不會轉發到該原主題的消息消費隊列,消費者無法感知消息的存在,不會消費。其實改變消息主題是RocketMQ的常用「套路」,回想一下延時消息的實現機制。RMQ_SYS_TRANS_HALF_TOPIC
在完成一階段寫入一條對用戶不可見的消息後,二階段如果是Commit操作,則需要讓消息對用戶可見;如果是Rollback則需要撤銷一階段的消息。先說Rollback的情況。對於Rollback,本身一階段的消息對用戶是不可見的,其實不需要真正撤銷消息(實際上RocketMQ也無法去真正的刪除一條消息,因為是順序寫文件的)。但是區別於這條消息沒有確定狀態(Pending狀態,事務懸而未決),需要一個操作來標識這條消息的最終狀態。RocketMQ事務消息方案中引入了Op消息的概念,用Op消息標識事務消息已經確定的狀態(Commit或者Rollback)。如果一條事務消息沒有對應的Op消息,說明這個事務的狀態還無法確定(可能是二階段失敗了)。引入Op消息後,事務消息無論是Commit或者Rollback都會記錄一個Op操作。Commit相對於Rollback只是在寫入Op消息前創建Half消息的索引。
RocketMQ將Op消息寫入到全局一個特定的Topic中通過源碼中的方法—
TransactionalMessageUtil.buildOpTopic();這個Topic是一個內部的Topic(像Half消息的Topic一樣),不會被用戶消費。Op消息的內容為對應的Half消息的存儲的Offset,這樣通過Op消息能索引到Half消息進行後續的回查操作。
在執行二階段Commit操作時,需要構建出Half消息的索引。一階段的Half消息由於是寫到一個特殊的Topic,所以二階段構建索引時需要讀取出Half消息,並將Topic和Queue替換成真正的目標的Topic和Queue,之後通過一次普通消息的寫入操作來生成一條對用戶可見的消息。所以RocketMQ事務消息二階段其實是利用了一階段存儲的消息的內容,在二階段時恢復出一條完整的普通消息,然後走一遍消息寫入流程。
如果在RocketMQ事務消息的二階段過程中失敗了,例如在做Commit操作時,出現網路問題導致Commit失敗,那麼需要通過一定的策略使這條消息最終被Commit。RocketMQ採用了一種補償機制,稱為「回查」。Broker端對未確定狀態的消息發起回查,將消息發送到對應的Procer端(同一個Group的Procer),由Procer根據消息來檢查本地事務的狀態,進而執行Commit或者Rollback。
Broker端通過對比Half消息和Op消息進行事務消息的回查並且推進CheckPoint(記錄那些事務消息的狀態是確定的)。
值得注意的是,rocketmq並不會無休止的的信息事務狀態回查,默認回查15次,如果15次回查還是無法得知事務狀態,rocketmq默認回滾該消息。
TxConsumer類實現
⑶ 消息隊列(mq)是什麼
是生產者先將消息投遞一個叫隊列的容器中,然後再從這個容器中取出消息,最後再轉發給消費者。
消息隊列是 Microsoft 的消息處理技術,它在任何安裝 Microsoft Windows 的計算機組合中,為任何應用程序提供消息處理和消息隊列功能,無論這些計算機是否在同一個網路上或者是否同時聯機。
消息隊列網路是能夠相互間來回發送消息的任何一組計算機。網路中的不同計算機在確保消息順利處理的過程中扮演不同的角色。它們中有些提供路由信息以確定如何發送消息,有些保存整個網路的重要信息,而有些只是發送和接收消息。
消息隊列的類型介紹:
消息隊列目前主要有兩種類型:POSIX消息隊列以及系統V消息隊列,系統V消息隊列目前被大量使用。每個消息隊列都有一個隊列頭,用結構struct msg_queue來描述。隊列頭中包含了該消息隊列的大量信息。包括消息隊列鍵值、用戶ID、組ID、消息隊列中消息數目等等。
消息隊列就是一個消息的鏈表,可以把消息看作一個記錄,具有特定的格式以及特定的優先順序。對消息隊列有寫許可權的進程可以向消息隊列中按照一定的規則添加新消息;對消息隊列有讀許可權的進程則可以從消息隊列中讀走消息。消息隊列是隨內核持續的。
⑷ rocketmq 同步雙寫 什麼意思
rocket league 4-pack的中文翻譯rocket league 4-pack火箭聯賽4包-------------------------------如有疑問,可繼續追問,如果滿意,請採納,謝謝。
⑸ 怎麼指定rocketmq的jdk
一:RocketMQ簡介RocketMQ是一款分布式、隊列模型的消息中間件,具有以下特點:1.能夠保證嚴格的消息順序2.提供豐富的消息拉取模式3.高效的訂閱者水平擴展能力4.實時的消息訂閱機制5.億級消息堆積能力二:安裝RocketMQ下載源碼首先我們從githup上獲取RocketMQ的源碼,目前最新的版本為3.5.8,下載地址為:或者wget/alibaba/RocketMQ/archive/v3.5.8.tar.gz。請注意:此時我們下載的是源碼,直接解壓時不能用的,所以我們需要編譯之後才能使用。編譯源碼在進行編譯源碼之前我們需要安裝JDK。如果你已經安裝過了,請跳過這里。如果你還沒有安裝過JDK,請參考這篇文章(Linux環境下安裝JDK)。然後我們還需要安裝一下Maven。Maven的安裝還是比較簡單,只需要去官方上下載的安裝吧,然後直接解壓,再配置一下環境變數就OK。接下來我們把剛才下載來的RockeMQ的源碼解壓到/usr/local/rockemq-source文件夾中。在源碼中有一個Install.sh。如圖所示:。運行shinstall.sh。在編譯完成之後,我們只要target目錄下的alibaba-rocketmq這個文件夾中內容,把alibaba-rocketmq文件夾中的內容移動到/usr/local/rocketmq中。如果你不想編譯的話,可以從這里下載編譯之後的rocketmq。(rocketmq3.5.8)。配置環境變數接下來我們需要配置一下環境變數。在終端中輸入以下命令:vi/etc/profile,在文件的末尾中添加如下兩句話:exportrocketmq=/usr/local/rocketmqexportPATH=$PATH:$rocketmq/bin。接下來我們使配置的換將變數生效:source/etc/profile.三:啟動RocketMQ接下來我們啟動一下剛才編譯的RocketMQ.在啟動之前我們需要修改一下RocketMQ啟動的內存大小(如果你的系統內存比較大的話,請忽略)。我們進入到/usr/local/rocketmq/bin中,在終端中輸入以下命令修改mqnamesrv的內存大小:virunserver.sh.修改為如圖的內容:,接下來修改broker的內存大小:virunbroker.sh:啟動mqnameserver進入到/usr/local/rocketmq/bin中輸入以下命令:nohupshmqnamesrv>~/logs/rocketmqlogs/namesrv.log2>&1&。注意最後的這個&不要少。啟動mqbroker進入到/usr/local/rocketmq/bin中輸入以下命令:nohupshmqbroker-nlocalhost:9876autoCreateTopicEnable=true>~/logs/rocketmqlogs/broker.log2>&1&。注意:localhost可以換成你剛才啟動mqnamesrv的IP。autoCreateTopicEnable=true這句話不要少了。最後的&也不要少了。我們可以通過psaux|grepjava命令來查看啟動的情況。到此,rocketmq的安裝完畢。四:RocketMQ的小例子procer:[java]viewplainpackagecom.zkn.newlearn.rocketmq;importcom.alibaba.rocketmq.client.exception.MQBrokerException;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.procer.DefaultMQProcer;importcom.alibaba.rocketmq.client.procer.SendResult;importcom.alibaba.rocketmq.common.message.Message;importcom.alibaba.rocketmq.remoting.exception.RemotingException;importjava.util.concurrent.TimeUnit;/***Createdbyzknon2016/10/27.*/publicclassProcerTest01{publicstaticvoidmain(String[]args){/***一個應用創建一個Procer,由應用來維護此對象,可以設置為全局對象或者單例*注意:ProcerGroupName需要由應用來保證唯一*ProcerGroup這個概念發送普通的消息時,作用不大,但是發送分布式事務消息時,比較關鍵,*因為伺服器會回查這個Group下的任意一個Procer*/DefaultMQProcerprocer=newDefaultMQProcer("ProcerGroupName");//procer.setNamesrvAddr("192.168.180.1:9876");procer.setNamesrvAddr("192.168.180.133:9876");procer.setInstanceName("Procer");/***Procer對象在使用之前必須要調用start初始化,初始化一次即可*注意:切記不可以在每次發送消息時,都調用start方法*/try{procer.start();}catch(MQClientExceptione){e.printStackTrace();}for(inti=0;i<100;i++){try{/***下面這段代碼表明一個Procer對象可以發送多個topic,多個tag的消息。*注意:send方法是同步調用,只要不拋異常就標識成功。但是發送成功也可會有多種狀態,*例如消息寫入Master成功,但是Slave不成功,這種情況消息屬於成功,但是對於個別應用如果對消息可靠性要求極高,*需要對這種情況做處理。另外,消息可能會存在發送失敗的情況,失敗重試由應用來處理。*/{Messagemsg=newMessage("TopicTest1",//topic"TagA",//tag"OrderID001",//key("HelloMetaQ").getBytes());//bodySendResultsendResult=procer.send(msg);System.out.println(sendResult);}{Messagemsg=newMessage("TopicTest2","TagB","OrderID001",("HelloMetaQTagB".getBytes()));SendResultsendResult=procer.send(msg);System.out.println(sendResult);}{Messagemsg=newMessage("TopicTest3","TagC","OrderID001",("HelloMetaQTagC").getBytes());SendResultsendResult=procer.send(msg);System.out.println(sendResult);}TimeUnit.MILLISECONDS.sleep(1000);}catch(MQClientExceptione){e.printStackTrace();}catch(InterruptedExceptione){e.printStackTrace();}catch(RemotingExceptione){e.printStackTrace();}catch(MQBrokerExceptione){e.printStackTrace();}}/***應用退出時,要調用shutdown來清理資源,關閉網路連接,從MetaQ伺服器上注銷自己*注意:我們建議應用在JBOSS、Tomcat等容器的退出銷毀方法里調用shutdown方法*/procer.shutdown();}}
⑹ rocketmq 發送失敗一般怎麼處理
一:RocketMQ簡介
RocketMQ是一款分布式、隊列模型的消息中間件,具有以下特點:
1.能夠保證嚴格的消息順序
2.提供豐富的消息拉取模式
3.高效的訂閱者水平擴展能力
4.實時的消息訂閱機制
5.億級消息堆積能力
二:安裝RocketMQ
下載源碼
首先我們從githup上獲取RocketMQ的源碼,目前最新的版本為3.5.8,下載地址為: 或者 wget /alibaba/RocketMQ/archive/v3.5.8.tar.gz。請注意:此時我們下載的是源碼,直接解壓時不能用的,所以我們需要編譯之後才能使用。
編譯源碼
在進行編譯源碼之前我們需要安裝JDK。如果你已經安裝過了,請跳過這里。如果你還沒有安裝過JDK,請參考這篇文章(Linux環境下安裝JDK)。然後我們還需要安裝一下Maven。Maven的安裝還是比較簡單,只需要去官方上下載的安裝吧,然後直接解壓,再配置一下環境變數就OK。接下來我們把剛才下載來的RockeMQ的源碼解壓到/usr/local/rockemq-source文件夾中。在源碼中有一個Install.sh。如圖所示:
。運行sh install.sh。在編譯完成之後,我們只要target目錄下的alibaba-rocketmq這個文件夾中內容,把alibaba-rocketmq文件夾中的內容移動到/usr/local/rocketmq中。如果你不想編譯的話,可以從這里下載編譯之後的rocketmq。(rocketmq3.5.8)。
配置環境變數
接下來我們需要配置一下環境變數。在終端中輸入以下命令:vi /etc/profile ,在文件的末尾中添加如下兩句話:export rocketmq=/usr/local/rocketmq export PATH=$PATH:$rocketmq/bin。接下來我們使配置的換將變數生效:source /etc/profile.
三:啟動RocketMQ
接下來我們啟動一下剛才編譯的RocketMQ.在啟動之前我們需要修改一下RocketMQ啟動的內存大小(如果你的系統內存比較大的話,請忽略)。我們進入到/usr/local/rocketmq/bin中,在終端中輸入以下命令修改mqnamesrv的內存大小:vi runserver.sh.修改為如圖的內容:
,接下來修改broker的內存大小:vi runbroker.sh:
啟動mqnameserver
進入到/usr/local/rocketmq/bin中輸入以下命令:nohup sh mqnamesrv > ~/logs/rocketmqlogs/namesrv.log 2>&1 &。注意最後的這個 & 不要少。
啟動mqbroker
進入到/usr/local/rocketmq/bin中輸入以下命令:nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true > ~/logs/rocketmqlogs/broker.log 2>&1 &。注意:localhost可以換成你剛才啟動mqnamesrv的IP。autoCreateTopicEnable=true
這句話不要少了。最後的 & 也不要少了。
我們可以通過 ps aux | grep java命令來查看啟動的情況。
到此,rocketmq的安裝完畢。
四:RocketMQ的小例子
procer:
[java] view plain
package com.zkn.newlearn.rocketmq;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.procer.DefaultMQProcer;
import com.alibaba.rocketmq.client.procer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
/**
* Created by zkn on 2016/10/27.
*/
public class ProcerTest01 {
public static void main(String[] args) {
/**
* 一個應用創建一個Procer,由應用來維護此對象,可以設置為全局對象或者單例<br>
* 注意:ProcerGroupName需要由應用來保證唯一<br>
* ProcerGroup這個概念發送普通的消息時,作用不大,但是發送分布式事務消息時,比較關鍵,
* 因為伺服器會回查這個Group下的任意一個Procer
*/
DefaultMQProcer procer = new DefaultMQProcer("ProcerGroupName");
//procer.setNamesrvAddr("192.168.180.1:9876");
procer.setNamesrvAddr("192.168.180.133:9876");
procer.setInstanceName("Procer");
/**
* Procer對象在使用之前必須要調用start初始化,初始化一次即可<br>
* 注意:切記不可以在每次發送消息時,都調用start方法
*/
try {
procer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
for (int i = 0; i < 100; i++) {
try {
/**
* 下面這段代碼表明一個Procer對象可以發送多個topic,多個tag的消息。
* 注意:send方法是同步調用,只要不拋異常就標識成功。但是發送成功也可會有多種狀態,<br>
* 例如消息寫入Master成功,但是Slave不成功,這種情況消息屬於成功,但是對於個別應用如果對消息可靠性要求極高,<br>
* 需要對這種情況做處理。另外,消息可能會存在發送失敗的情況,失敗重試由應用來處理。
*/
{
Message msg = new Message("TopicTest1",// topic
"TagA",// tag
"OrderID001",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = procer.send(msg);
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest2",
"TagB",
"OrderID001",
("Hello MetaQ TagB".getBytes()));
SendResult sendResult = procer.send(msg);
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest3",
"TagC",
"OrderID001",
("Hello MetaQ TagC").getBytes());
SendResult sendResult = procer.send(msg);
System.out.println(sendResult);
}
TimeUnit.MILLISECONDS.sleep(1000);
} catch (MQClientException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
}
}
/**
* 應用退出時,要調用shutdown來清理資源,關閉網路連接,從MetaQ伺服器上注銷自己
* 注意:我們建議應用在JBOSS、Tomcat等容器的退出銷毀方法里調用shutdown方法
*/
procer.shutdown();
}
}
⑺ 網易傳媒技術團隊:消息中間件實現延遲隊列的應用與實踐
早期需要延遲處理的業務場景,更多的是通過定時任務掃表,然後執行滿足條件的記錄,具有頻率高、命中低、資源消耗大的缺點。隨著消息中間件的普及,延遲消息可以很好的處理這種場景,本文主要介紹延遲消息的使用場景以及基於常見的消息中間件如何實現延遲隊列,最後給出了一個在網易公開課使用延遲隊列的實踐。
1、有效期:限時活動、拼團。。。
2、超時處理:取消超時未支付訂單、超時自動確認收貨。。。
4、重試:網路異常重試、打車派單、依賴條件未滿足重試。。。
5、定時任務:智能設備定時啟動。。。
1、RabbitMQ
1)簡介:基於AMQP協議,使用Erlang編寫,實現了一個Broker框架
a、Broker:接收和分發消息的代理伺服器
b、Virtual Host:虛擬主機之間相互隔離,可理解為一個虛擬主機對應一個消息服務
c、Exchange:交換機,消息發送到指定虛擬機的交換機上
d、Binding:交換機與隊列綁定,並通過路由策略和routingKey將消息投遞到一個或多個隊列中
e、Queue:存放消息的隊列,FIFO,可持久化
f、Channel:信道,消費者通過信道消費消息,一個TCP連接上可同時創建成百上千個信道,作為消息隔離
2)延遲隊列實現:RabbitMQ的延遲隊列基於消息的存活時間TTL(Time To Live)和死信交換機DLE(Dead Letter Exchanges)實現
a、TTL:RabbitMQ支持對隊列和消息各自設置存活時間,取二者中較小的值,即隊列無消費者連接或消息在隊列中一直未被消費的過期時間
b、DLE:過期的消息通過綁定的死信交換機,路由到指定的死信隊列,消費者實際上消費的是死信隊列上的消息
3)缺點:
a、配置麻煩,額外增加一個死信交換機和一個死信隊列的配置
b、脆弱性,配置錯誤或者生產者消費者連接的隊列錯誤都有可能造成延遲失效
2、RocketMQ
1)簡介:來源於阿里,目前為Apache頂級開源項目,使用Java編寫,基於長輪詢的拉取方式,支持事務消息,並解決了順序消息和海量堆積的問題
a、Broker:存放Topic並根據讀取Procer的提交日誌,將邏輯上的一個Topic分多個Queue存儲,每個Queue上存儲消息在提交日誌上的位置
b、Name Server:無狀態的節點,維護Topic與Broker的對應關系以及Broker的主從關系
2)延遲隊列實現:RocketMQ發送延時消息時先把消息按照延遲時間段發送到指定的隊列中(rocketmq把每種延遲時間段的消息都存放到同一個隊列中),然後通過一個定時器進行輪訓這些隊列,查看消息是否到期,如果到期就把這個消息發送到指定topic的隊列中
3)缺點:延遲時間粒度受限制(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)
3、Kafka
1)簡介:來源於Linkedin,目前為Apache頂級開源項目,使用Scala和Java編寫,基於zookeeper協調的分布式、流處理的日誌系統,升級版為Jafka
2)延遲隊列實現:Kafka支持延時生產、延時拉取、延時刪除等,其基於時間輪和JDK的DelayQueue實現
a、時間輪(TimingWheel):是一個存儲定時任務的環形隊列,底層採用數組實現,數組中的每個元素可以存放一個定時任務列表
b、定時任務列表(TimerTaskList):是一個環形的雙向鏈表,鏈表中的每一項表示的都是定時任務項
c、定時任務項(TimerTaskEntry):封裝了真正的定時任務TimerTask
d、層級時間輪:當任務的到期時間超過了當前時間輪所表示的時間范圍時,就會嘗試添加到上層時間輪中,類似於鍾表就是一個三級時間輪
e、JDK DelayQueue:存儲TimerTaskList,並根據其expiration來推進時間輪的時間,每推進一次除執行相應任務列表外,層級時間輪也會進行相應調整
3)缺點:
a、延遲精度取決於時間格設置
b、延遲任務除由超時觸發還可能被外部事件觸發而執行
4、ActiveMQ
1)簡介:基於JMS協議,Java編寫的Apache頂級開源項目,支持點對點和發布訂閱兩種模式。
a、點對點(point-to-point):消息發送到指定的隊列,每條消息只有一個消費者能夠消費,基於拉模型
b、發布訂閱(publish/subscribe):消息發送到主題Topic上,每條消息會被訂閱該Topic的所有消費者各自消費,基於推模型
2)延遲隊列實現:需要延遲的消息會先存儲在JobStore中,通過非同步線程任務JobScheler將到達投遞時間的消息投遞到相應隊列上
a、Broker Filter:Broker中定義了一系列BrokerFilter的子類構成攔截器鏈,按順序對消息進行相應處理
b、ScheleBroker:當消息中指定了延遲相關屬性,並且jobId為空時,會生成調度任務存儲到JobStore中,此時消息不會進入到隊列
c、JobStore:基於BTree存儲,key為任務執行的時間戳,value為該時間戳下需要執行的任務列表
d、JobScheler:取JobStore中最小的key執行(調度時間最早的),執行時間<=當前時間,將該任務列表依次投遞到所屬的隊列,對於需要重復投遞和投遞失敗的會再次存入JobStore中。
註: 此處JobScheler的執行時間間隔可動態變化,默認0.5s,有新任務時會立即執行(Object->notifyAll())並設置時間間隔為0.1s,沒有新任務後,下次執行時間為最近任務的調度執行時間。
3)缺點:投遞到隊列失敗,將消息重新存入JobStore,消息調度執行時間=系統當前時間+延遲時間,會導致消息被真實投遞的時間可能為設置的延遲時間的整數倍
5、Redis
1)簡介:基於Key-Value的NoSQL資料庫,由於其極高的性能常被當作緩存來使用,其數據結構支持:字元串、哈希、列表、集合、有序集合
2)延遲隊列實現:Redis的延遲隊列基於有序集合,score為執行時間戳,value為任務實體或任務實體引用
3)缺點:
a、實現復雜,本身不支持
b、完全基於內存,延遲時間長浪費內存資源
6、消息隊列對比
1、公開課延遲隊列技術選型
1)業務場景:關閉超時未支付訂單、限時優惠活動、拼團
2)性能要求:訂單、活動、拼團 數據量可控,上述MQ均能滿足要求
3)可靠性:使用ActiveMQ、RabbitMQ、RocketMQ作為延遲隊列更普遍
4)可用性:ActiveMQ、RocketMQ自身支持延遲隊列功能,且目前公開課業務中使用的中間件為ActiveMQ和Kafka
5)延遲時間靈活:活動的開始和結束時間比較靈活,而RocketMQ時間粒度較粗,Kafka會依賴時間格有精度缺失
結論: 最終選擇ActiveMQ來作為延遲隊列
2、業務場景:關閉未支付訂單
1)關閉微信未支付訂單
2)關閉IOS未支付訂單
3、ActiveMQ使用方式
1)activemq.xml中支持調度任務
2)發送消息時,設置message的延遲屬性
其中:
a、延遲處理
AMQ_SCHEDULED_DELAY:設置多長時間後,投遞給消費者(毫秒)
b、重復投遞
AMQ_SCHEDULED_PERIOD:重復投遞時間間隔(毫秒)
AMQ_SCHEDULED_REPEAT:重復投遞次數
c、指定調度計劃
AMQ_SCHEDULED_CRON:corn正則表達式
4、公開課使用中進行的優化
1)可靠性:針對實際投遞時間可能翻倍的問題,結合ActiveMQ的重復投遞,在消費者邏輯中做冪等處理來保證延遲時間的准確性
2)可追溯性:延遲消息及消費情況做資料庫冗餘存儲
3)易用性:業務上定義好延遲枚舉類型,直接使用JmsDelayTemplate發送,無需關心數據備份和參數等細節
1、無論是基於死信隊列還是基於數據先存儲後投遞,本質上都是將延遲待發送的消息數據與正常訂閱的隊列分開存儲,從而降低耦合度
2、無論是檢查隊頭消息TTL還是調度存儲的延遲數據,本質上都是通過定時任務來完成的,但是定時任務的觸發策略以及延遲數據的存儲方式決定了不同中間件之間的性能優劣
張浩,2018年加入網易傳媒,高級Java開發工程師,目前在網易公開課主要做支付財務體系、版本迭代相關的工作。
⑻ 各位大牛,從github下載的rocketmq源碼,怎麼導入到myeclipse中運行調試,直接導入maven項目沒有目錄結構
可能是版本的問題。建議重新下載安裝看看,
myeclipse2017安裝破解說明
注意!下載包中有myeclipse 2017 ci8 windows在線安裝包下載以及離線安裝包下載,附破解文件,小編建議使用離線下載包進行安裝。
1、首先點擊「myeclipse-2017-ci-8-online-installer-windows.exe」程序運行安裝,點擊「next」下一步;
2、選擇「我同意」許可協議,點擊「next」下一步;
3、選擇安裝目錄,您可以自行選擇,小編建議默認,點擊「下一步」;
4、選擇您的電腦系統32/64位,小編的是64位,點擊「next」下一步;
5、正在安裝,文件有點大,請等待片刻;
6、安裝完成
MyEclipse2017破解版:http://www.xue51.com/soft/1207.html