導航:首頁 > 源碼編譯 > amqp源碼

amqp源碼

發布時間:2023-08-06 14:53:46

1. 消息隊列(mq)是什麼

生產者先將消息投遞一個叫隊列的容器中,然後再從這個容器中取出消息,最後再轉發給消費者。

消息隊列是 Microsoft 的消息處理技術,它在任何安裝 Microsoft Windows 的計算機組合中,為任何應用程序提供消息處理和消息隊列功能,無論這些計算機是否在同一個網路上或者是否同時聯機。

消息隊列網路是能夠相互間來回發送消息的任何一組計算機。網路中的不同計算機在確保消息順利處理的過程中扮演不同的角色。它們中有些提供路由信息以確定如何發送消息,有些保存整個網路的重要信息,而有些只是發送和接收消息。

消息隊列的類型介紹:

消息隊列目前主要有兩種類型:POSIX消息隊列以及系統V消息隊列,系統V消息隊列目前被大量使用。每個消息隊列都有一個隊列頭,用結構struct msg_queue來描述。隊列頭中包含了該消息隊列的大量信息。包括消息隊列鍵值、用戶ID、組ID、消息隊列中消息數目等等。

消息隊列就是一個消息的鏈表,可以把消息看作一個記錄,具有特定的格式以及特定的優先順序。對消息隊列有寫許可權的進程可以向消息隊列中按照一定的規則添加新消息;對消息隊列有讀許可權的進程則可以從消息隊列中讀走消息。消息隊列是隨內核持續的。





2. 從 0 到 1:全面理解 RPC 遠程調用

作者 | python編程時光

責編 | 胡巍巍

什麼是RPC呢?網路給出的解釋是這樣的:「RPC(Remote Procere Call Protocol)——遠程過程調用協議,它是一種通過網路從遠程計算機程序上請求服務,而不需要了解底層網路技術的協議」。

這個概念聽起來還是比較抽象,沒關系,繼續往後看,後面概念性的東西,我會講得足夠清楚,讓你完全掌握 RPC 的基礎內容。

在 OpenStack 里的進程間通信方式主要有兩種,一種是基於HTTP協議的RESTFul API方式,另一種則是RPC調用。

那麼這兩種方式在應用場景上有何區別呢?

有使用經驗的人,就會知道:

首先,給你提兩個問題,帶著這兩個問題再往下看:

1、RPC 和 REST 區別是什麼?2、為什麼要採用RPC呢?

首先,第一個問題:RPC 和 REST 區別是什麼?

你一定會覺得這個問題很奇怪,是的,包括我,但是你在網路上一搜,會發現類似對比的文章比比皆是,我在想可能很多初學者由於基礎不牢固,才會將不相乾的二者拿出來對比吧。既然是這樣,那為了讓你更加了解陌生的RPC,就從你熟悉得不能再熟悉的 REST 入手吧。

01、所屬類別不同

REST,是Representational State Transfer 的簡寫,中文描述表述性狀態傳遞(是指某個瞬間狀態的資源數據的快照,包括資源數據的內容、表述格式(XML、JSON)等信息。)

REST 是一種軟體架構風格。這種風格的典型應用,就是HTTP。其因為簡單、擴展性強的特點而廣受開發者的青睞。

而RPC 呢,是 Remote Procere Call Protocol 的簡寫,中文描述是遠程過程調用,它可以實現客戶端像調用本地服務(方法)一樣調用伺服器的服務(方法)。

而 RPC 可以基於 TCP/UDP,也可以基於 HTTP 協議進行傳輸的,按理說它和REST不是一個層面意義上的東西,不應該放在一起討論,但是誰讓REST這么流行呢,它是目前最流行的一套互聯網應用程序的API設計標准,某種意義下,我們說 REST 可以其實就是指代 HTTP 協議。

02、使用方式不同

03、面向對象不同

從設計上來看,RPC,所謂的遠程過程調用 ,是面向方法的 ,REST:所謂的 Representational state transfer ,是面向資源的,除此之外,還有一種叫做 SOA,所謂的面向服務的架構,它是面向消息的,這個接觸不多,就不多說了。

04、序列化協議不同

介面調用通常包含兩個部分,序列化和通信協議。

通信協議,上面已經提及了,REST 是 基於 HTTP 協議,而 RPC 可以基於 TCP/UDP,也可以基於 HTTP 協議進行傳輸的。

常見的序列化協議,有:json、xml、hession、protobuf、thrift、text、bytes等,REST 通常使用的是 JSON或者XML,而 RPC 使用的是 JSON-RPC,或者 XML-RPC。

通過以上幾點,我們知道了 REST 和 RPC 之間有很明顯的差異。

然後第二個問題:為什麼要採用RPC呢?

那到底為何要使用 RPC,單純的依靠RESTful API不可以嗎?為什麼要搞這么多復雜的協議,渣渣表示真的學不過來了。

關於這一點,以下幾點僅是我的個人猜想,僅供交流哈:

說了這么多,我們該如何選擇這兩者呢?我總結了如下兩點,供你參考:

「遠程調用」意思就是:被調用方法的具體實現不在程序運行本地,而是在別的某個地方(分布到各個伺服器),調用者只想要函數運算的結果,卻不需要實現函數的具體細節。

光說不練嘴把式,接下來,我將分別用三種不同的方式全面地讓你搞明白 rpc 遠程調用是如何實現的。

01、基於 xml-rpc

Python實現 rpc,可以使用標准庫里的 SimpleXMLRPCServer,它是基於XML-RPC 協議的。

有了這個模塊,開啟一個 rpc server,就變得相當簡單了。執行以下代碼:

有了 rpc server,接下來就是 rpc client,由於我們上面使用的是 XML-RPC,所以 rpc clinet 需要使用xmlrpclib 這個庫。

然後,我們通過 server_proxy 對象就可以遠程調用之前的rpc server的函數了。

SimpleXMLRPCServer是一個單線程的伺服器。這意味著,如果幾個客戶端同時發出多個請求,其它的請求就必須等待第一個請求完成以後才能繼續。

若非要使用 SimpleXMLRPCServer 實現多線程並發,其實也不難。只要將代碼改成如下即可。

02、基於json-rpc

SimpleXMLRPCServer 是基於 xml-rpc 實現的遠程調用,上面我們也提到 除了 xml-rpc 之外,還有 json-rpc 協議。

那 python 如何實現基於 json-rpc 協議呢?

答案是很多,很多web框架其自身都自己實現了json-rpc,但我們要獨立這些框架之外,要尋求一種較為干凈的解決方案,我查找到的選擇有兩種

第一種是 jsonrpclib

第二種是 python-jsonrpc

先來看第一種 jsonrpclib

它與 Python 標准庫的 SimpleXMLRPCServer 很類似(因為它的類名就叫做 SimpleJSONRPCServer ,不明真相的人真以為它們是親兄弟)。或許可以說,jsonrpclib 就是仿照 SimpleXMLRPCServer 標准庫來進行編寫的。

它的導入與 SimpleXMLRPCServer 略有不同,因為SimpleJSONRPCServer分布在jsonrpclib庫中。

服務端

客戶端

再來看第二種python-jsonrpc,寫起來貌似有些復雜。

服務端

客戶端

調用過程如下

還記得上面我提到過的 zabbix API,因為我有接觸過,所以也拎出來講講。zabbix API 也是基於 json-rpc 2.0協議實現的。

因為內容較多,這里只帶大家打個,zabbix 是如何調用的:直接指明要調用 zabbix server 的哪個方法,要傳給這個方法的參數有哪些。

03、基於 zerorpc

以上介紹的兩種rpc遠程調用方式,如果你足夠細心,可以發現他們都是http+rpc 兩種協議結合實現的。

接下來,我們要介紹的這種(zerorpc),就不再使用走 http 了。

zerorpc 這個第三方庫,它是基於TCP協議、 ZeroMQ 和 MessagePack的,速度相對快,響應時間短,並發高。zerorpc 和 pyjsonrpc 一樣,需要額外安裝,雖然SimpleXMLRPCServer不需要額外安裝,但是SimpleXMLRPCServer性能相對差一些。

調用過程如下

客戶端除了可以使用zerorpc框架實現代碼調用之外,它還支持使用「命令行」的方式調用。

客戶端可以使用命令行,那服務端是不是也可以呢?

是的,通過 Github 上的文檔幾個 demo 可以體驗到這個第三方庫做真的是優秀。

比如我們可以用下面這個命令,創建一個rpc server,後面這個 time Python 標准庫中的 time 模塊,zerorpc 會將 time 注冊綁定以供client調用。

經過了上面的學習,我們已經學會了如何使用多種方式實現rpc遠程調用。

通過對比,zerorpc 可以說是脫穎而出,一支獨秀。

為此,我也做了一番思考:

OpenStack 組件繁多,在一個較大的集群內部每個組件內部通過rpc通信頻繁,如果都採用rpc直連調用的方式,連接數會非常地多,開銷大,若有些 server 是單線程的模式,超時會非常的嚴重。

OpenStack 是復雜的分布式集群架構,會有多個 rpc server 同時工作,假設有 server01,server02,server03 三個server,當 rpc client 要發出rpc請求時,發給哪個好呢?這是問題一。

你可能會說輪循或者隨機,這樣對大家都公平。這樣的話還會引出另一個問題,倘若請求剛好發到server01,而server01剛好不湊巧,可能由於機器或者其他因為導致服務沒在工作,那這個rpc消息可就直接失敗了呀。要知道做為一個集群,高可用是基本要求,如果出現剛剛那樣的情況其實是很尷尬的。這是問題二。

集群有可能根據實際需要擴充節點數量,如果使用直接調用,耦合度太高,不利於部署和生產。這是問題三。

引入消息中間件,可以很好的解決這些問題。

解決問題一:消息只有一份,接收者由AMQP的負載演算法決定,默認為在所有Receiver中均勻發送(round robin)。

解決問題二:有了消息中間件做緩沖站,client 可以任性隨意的發,server 都掛掉了?沒有關系,等 server 正常工作後,自己來消息中間件取就行了。

解決問題三:無論有多少節點,它們只要認識消息中間件這一個中介就足夠了。

既然講到了消息隊列,如果你之前沒有接觸過這塊內容,最好花幾分鍾的時間跟我好好過下關於消息隊列的幾個基礎概念。

首先,RPC只是定義了一個通信介面,其底層的實現可以各不相同,可以是 socket,也可以是今天要講的 AMQP。

AMQP(Advanced Message Queuing Protocol)是一種基於隊列的可靠消息服務協議,作為一種通信協議,AMQP同樣存在多個實現,如Apache Qpid,RabbitMQ等。

以下是 AMQP 中的幾個必知的概念:

Publisher:消息發布者

Queue:用來保存消息的存儲空間,消息沒有被receiver前,保存在隊列中。

Exchange:用來接收Publisher發出的消息,根據Routing key 轉發消息到對應的Message Queue中,至於轉到哪個隊列里,這個路由演算法又由exchange type決定的。

Exchange type:主要四種描述exchange的類型。

direct:消息路由到滿足此條件的隊列中(queue,可以有多個):routing key = binding key

topic:消息路由到滿足此條件的隊列中(queue,可以有多個):routing key 匹配 binding pattern. binding pattern是類似正則表達式的字元串,可以滿足復雜的路由條件。

fanout:消息路由到多有綁定到該exchange的隊列中。

binding :binding是用來描述exchange和queue之間的關系的概念,一個exchang可以綁定多個隊列,這些關系由binding建立。前面說的binding key /binding pattern也是在binding中給出。

為了讓你明白這幾者的關系,我畫了一張模型圖。

關於AMQP,有幾下幾點值得注意:

前面鋪墊了那麼久,終於到了講真實應用的場景。在生產中RPC是如何應用的呢?

其他模型我不太清楚,在 OpenStack 中的應用模型是這樣的

至於為什麼要如此設計,前面我已經給出了自己的觀點。

接下來,就是源碼解讀 OpenStack ,看看其是如何通過rpc進行遠程調用的。如若你對此沒有興趣(我知道很多人對此都沒有興趣,所以不浪費大家時間),可以直接跳過這一節,進入下一節。

目前Openstack中有兩種RPC實現,一種是在oslo messaging,一種是在openstack.common.rpc。

openstack.common.rpc是舊的實現,oslo messaging是對openstack.common.rpc的重構。openstack.common.rpc在每個項目中都存在一份拷貝,oslo messaging即將這些公共代碼抽取出來,形成一個新的項目。oslo messaging也對RPC API 進行了重新設計,對多種 transport 做了進一步封裝,底層也是用到了kombu這個AMQP庫。(註:Kombu 是Python中的messaging庫。Kombu旨在通過為AMQ協議提供慣用的高級介面,使Python中的消息傳遞盡可能簡單,並為常見的消息傳遞問題提供經過驗證和測試的解決方案。)

關於oslo_messaging庫,主要提供了兩種獨立的API:

因為 notify 實現是太簡單了,所以這里我就不多說了,如果有人想要看這方面內容,可以收藏我的博客(http://python-online.cn) ,我會更新補充 notify 的內容。

OpenStack RPC 模塊提供了 rpc.call,rpc.cast, rpc.fanout_cast 三種 RPC 調用方法,發送和接收 RPC 請求。

rpc.call 和 .rpc.cast 從實現代碼上看,他們的區別很小,就是call調用時候會帶有wait_for_reply=True參數,而cast不帶。

要了解 rpc 的調用機制呢,首先要知道 oslo_messaging 的幾個概念主要方法有四個:

transport:RPC功能的底層實現方法,這里是rabbitmq的消息隊列的訪問路徑

transport 就是定義你如何訪連接消息中間件,比如你使用的是 Rabbitmq,那在 nova.conf中應該有一行transport_url的配置,可以很清楚地看出指定了 rabbitmq 為消息中間件,並配置了連接rabbitmq的user,passwd,主機,埠。

target用來表述 RPC 伺服器監聽topic,server名稱和server監聽的exchange,是否廣播fanout。

rpc server 要獲取消息,需要定義target,就像一個門牌號一樣。

rpc client 要發送消息,也需要有target,說明消息要發到哪去。

endpoints:是可供別人遠程調用的對象

RPC伺服器暴露出endpoint,每個 endpoint 包涵一系列的可被遠程客戶端通過 transport 調用的方法。直觀理解,可以參考nova-conctor創建rpc server的代碼,這邊的endpoints就是 nova/manager.py:ConctorManager

dispatcher:分發器,這是 rpc server 才有的概念

只有通過它 server 端才知道接收到的rpc請求,要交給誰處理,怎麼處理?

在client端,是這樣指定要調用哪個方法的。

而在server端,是如何知道要執行這個方法的呢?這就是dispatcher 要乾的事,它從 endpoint 里找到這個方法,然後執行,最後返回。

Serializer:在 python 對象和message(notification) 之間數據做序列化或是反序列化的基類。

主要方法有四個:

每個notification listener都和一個executor綁定,來控制收到的notification如何分配。默認情況下,使用的是blocking executor(具體特性參加executor一節)

模仿是一種很高效的學習方法,我這里根據 OpenStack 的調用方式,抽取出核心內容,寫成一個簡單的 demo,有對 OpenStack 感興趣的可以了解一下,大部分人也可以直接跳過這章節。

注意以下代碼不能直接運行,你還需要配置 rabbitmq 的連接方式,你可以寫在配置文件中,通過 get_transport 從cfg.CONF 中讀取,也可以直接將其寫成url的格式做成參數,傳給 get_transport 。而且還要nova或者其他openstack組件的環境中運行(因為需要有ctxt這個環境變數)

簡單的 rpc client

簡單的 rpc server

【End】

熱 文 推 薦

☞Facebook 發幣 Libra;谷歌十億美金為窮人造房;第四代樹莓派 Raspberry Pi 4 發布 | 開發者周刊

☞WebRTC 將一統實時音視頻天下?

☞小米崔寶秋:小米 AIoT 深度擁抱開源

☞華為在美研發機構 Futurewei 意欲分家?

☞老司機教你如何寫出沒人敢維護的代碼!

☞Python有哪些技術上的優點?比其他語言好在哪兒?

☞上不了北大「圖靈」、清華「姚班」,AI專業還能去哪上?

☞公鏈史記 | 從鴻蒙初辟到萬物生長的十年激盪……

☞邊緣計算容器化是否有必要?

☞馬雲曾經偶像,終於把阿里留下的1400億敗光了!

你點的每個「在看」,我都認真當成了喜歡

3. 全面認識openstack,它到底是什麼包含什麼

(1)官方的解釋相信大家都已經了解了,不了解也沒有關系。現在從常識的角度來給大家解釋和說明。
OpenStack是一個雲平台管理的項目,它不是一個軟體。這個項目由幾個主要的組件組合起來完成一些具體的工作。
OpenStack是一個旨在為公共及私有雲的建設與管理提供軟體的開源項目,OpenStack被公認作為基礎設施即服務(簡稱IaaS)資源的通用前端。
如果這些還不明白,那麼從另外的角度給大家介紹:
首先讓大家看下面兩個圖就很簡單明了了:
此圖為openstack的登錄界面

下面是openstack的一個管理界面

從這兩個圖,相信有一定開發經驗,就能看出openstack是什麼了。可以說他是一個框架,甚至可以從軟體的角度來理解它。如果不明白,就從傳統開發來講解。不知道你是否了解oa,erp等系統,如果不了解可以到網上去找,資料一大把。他和oa,erp有什麼不同。很簡單就是openstack是用做雲計算的一個平台,或則一個解決方案。它是雲計算一個重要組成部分。
上面對openstack有了一個感性的認識。
(2)openstack能幹什麼。
大家都知道阿里雲平台,網路雲平台,而阿里雲平台據傳說就是對openstack的二次開發。對於二次開發相信只要接觸過軟體的都會明白這個概念。不明白的自己網上去查一下。也就是說openstack,可以搭建雲平台,什麼雲平台,公有雲,私有雲。現在網路在招聘的私有雲工程師,應該就是這方面的人才。
(3)openstack自身都包含什麼
以下是5個OpenStack的重要構成部分:
l Nova – 計算服務
l Swift – 存儲服務
l Glance – 鏡像服務
l Keystone – 認證服務
l Horizon – UI服務

圖1 OpenStack基本構架

下圖展示了Keystone、Dashboard二者與其它OpenStack部分的交互。

下面詳細介紹每一個服務:
(一)OpenStack計算設施—-Nova Nova是OpenStack計算的彈性控制器。OpenStack雲實例生命期所需的各種動作都將由Nova進行處理和支撐,這就意味著Nova以管理平台的身份登場,負責管理整個雲的計算資源、網路、授權及測度。雖然Nova本身並不提供任何虛擬能力,但是它將使用libvirt API與虛擬機的宿主機進行交互。Nova通過Web服務API來對外提供處理介面,而且這些介面與Amazon的Web服務介面是兼容的。

功能及特點
l 實例生命周期管理
l 計算資源管理
l 網路與授權管理
l 基於REST的API
l 非同步連續通信
l 支持各種宿主:Xen、XenServer/XCP、KVM、UML、VMware vSphere及Hyper-V

OpenStack計算部件
l Nova彈性雲包含以下主要部分:
l API Server(nova-api)
l 消息隊列(rabbit-mq server)
l 運算工作站(nova-compute)
l 網路控制器(nova-network)
l 卷管理(nova-volume)
l 調度器(nova-scheler)

API伺服器(nova-api)
API伺服器提供了雲設施與外界交互的介面,它是外界用戶對雲實施管理的唯一通道。通過使用web服務來調用各種EC2的API,接著API伺服器便通過消息隊列把請求送達至雲內目標設施進行處理。作為對EC2-api的替代,用戶也可以使用OpenStack的原生API,我們把它叫做「OpenStack API」。

消息隊列(Rabbit MQ Server)
OpenStack內部在遵循AMQP(高級消息隊列協議)的基礎上採用消息隊列進行通信。Nova對請求應答進行非同步調用,當請求接收後便則立即觸發一個回調。由於使用了非同步通信,不會有用戶的動作被長置於等待狀態。例如,啟動一個實例或上傳一份鏡像的過程較為耗時,API調用就將等待返回結果而不影響其它操作,在此非同步通信起到了很大作用,使整個系統變得更加高效。

運算工作站(nova-compute)
運算工作站的主要任務是管理實例的整個生命周期。他們通過消息隊列接收請求並執行,從而對實例進行各種操作。在典型實際生產環境下,會架設許多運算工作站,根據調度演算法,一個實例可以在可用的任意一台運算工作站上部署。

網路控制器(nova-network)
網路控制器處理主機的網路配置,例如IP地址分配,配置項目VLAN,設定安全群組以及為計算節點配置網路。

卷工作站(nova-volume)
卷工作站管理基於LVM的實例卷,它能夠為一個實例創建、刪除、附加卷,也可以從一個實例中分離卷。卷管理為何如此重要?因為它提供了一種保持實例持續存儲的手段,比如當結束一個實例後,根分區如果是非持續化的,那麼對其的任何改變都將丟失。可是,如果從一個實例中將卷分離出來,或者為這個實例附加上卷的話,即使實例被關閉,數據仍然保存其中。這些數據可以通過將卷附加到原實例或其他實例的方式而重新訪問。
因此,為了日後訪問,重要數據務必要寫入卷中。這種應用對於數據伺服器實例的存儲而言,尤為重要。

調度器(nova-scheler)
調度器負責把nova-API調用送達給目標。調度器以名為「nova-schele」的守護進程方式運行,並根據調度演算法從可用資源池中恰當地選擇運算伺服器。有很多因素都可以影響調度結果,比如負載、內存、子節點的遠近、CPU架構等等。強大的是nova調度器採用的是可插入式架構。
目前nova調度器使用了幾種基本的調度演算法:
隨機化:主機隨機選擇可用節點;
可用化:與隨機相似,只是隨機選擇的范圍被指定;
簡單化:應用這種方式,主機選擇負載最小者來運行實例。負載數據可以從別處獲得,如負載均衡伺服器。

(二)OpenStack鏡像伺服器—-GlanceOpenStack鏡像伺服器是一套虛擬機鏡像發現、注冊、檢索系統,我們可以將鏡像存儲到以下任意一種存儲中:
本地文件系統(默認)
l OpenStack對象存儲
l S3直接存儲
l S3對象存儲(作為S3訪問的中間渠道)
l HTTP(只讀)

功能及特點
提供鏡像相關服務

Glance構件
l Glance控制器
l Glance注冊器

(三)OpenStack存儲設施—-Swift
Swift為OpenStack提供一種分布式、持續虛擬對象存儲,它類似於Amazon Web Service的S3簡單存儲服務。Swift具有跨節點百級對象的存儲能力。Swift內建冗餘和失效備援管理,也能夠處理歸檔和媒體流,特別是對大數據(千兆位元組)和大容量(多對象數量)的測度非常高效。

功能及特點
l 海量對象存儲
l 大文件(對象)存儲
l 數據冗餘管理
l 歸檔能力—–處理大數據集
l 為虛擬機和雲應用提供數據容器
l 處理流媒體
l 對象安全存儲
l 備份與歸檔
l 良好的可伸縮性

Swift組件
l Swift賬戶
l Swift容器
l Swift對象
l Swift代理
l Swift RING

Swift代理伺服器
用戶都是通過Swift-API與代理伺服器進行交互,代理伺服器正是接收外界請求的門衛,它檢測合法的實體位置並路由它們的請求。
此外,代理伺服器也同時處理實體失效而轉移時,故障切換的實體重復路由請求。

Swift對象伺服器
對象伺服器是一種二進制存儲,它負責處理本地存儲中的對象數據的存儲、檢索和刪除。對象都是文件系統中存放的典型的二進制文件,具有擴展文件屬性的元數據(xattr)。
注意:xattr格式被linux中的ext3/4,XFS,Btrfs,JFS和ReiserFS所支持,但是並沒有有效測試證明在XFS,JFS,ReiserFS,Reiser4和ZFS下也同樣能運行良好。不過,XFS被認為是當前最好的選擇。

Swift容器伺服器
容器伺服器將列出一個容器中的所有對象,默認對象列表將存儲為SQLite文件(譯者註:也可以修改為MySQL,安裝中就是以MySQL為例)。容器伺服器也會統計容器中包含的對象數量及容器的存儲空間耗費。

Swift賬戶伺服器
賬戶伺服器與容器伺服器類似,將列出容器中的對象。

Ring(索引環)
Ring容器記錄著Swift中物理存儲對象的位置信息,它是真實物理存儲位置的實體名的虛擬映射,類似於查找及定位不同集群的實體真實物理位置的索引服務。這里所謂的實體指賬戶、容器、對象,它們都擁有屬於自己的不同的Rings。

(四)OpenStack認證服務(Keystone)
Keystone為所有的OpenStack組件提供認證和訪問策略服務,它依賴自身REST(基於Identity API)系統進行工作,主要對(但不限於)Swift、Glance、Nova等進行認證與授權。事實上,授權通過對動作消息來源者請求的合法性進行鑒定。如下圖所示:

Keystone採用兩種授權方式,一種基於用戶名/密碼,另一種基於令牌(Token)。除此之外,Keystone提供以下三種服務:
l 令牌服務:含有授權用戶的授權信息
l 目錄服務:含有用戶合法操作的可用服務列表
l 策略服務:利用Keystone具體指定用戶或群組某些訪問許可權

認證服務組件
服務入口:如Nova、Swift和Glance一樣每個OpenStack服務都擁有一個指定的埠和專屬的URL,我們稱其為入口(endpoints)。

l 區位:在某個數據中心,一個區位具體指定了一處物理位置。在典型的雲架構中,如果不是所有的服務都訪問分布式數據中心或伺服器的話,則也稱其為區位。

l 用戶:Keystone授權使用者
譯者註:代表一個個體,OpenStack以用戶的形式來授權服務給它們。用戶擁有證書(credentials),且可能分配給一個或多個租戶。經過驗證後,會為每個單獨的租戶提供一個特定的令牌。[來源:http://blog.sina.com.cn/s/blog_70064f190100undy.html]

l 服務:總體而言,任何通過Keystone進行連接或管理的組件都被稱為服務。舉個例子,我們可以稱Glance為Keystone的服務。

l 角色:為了維護安全限定,就雲內特定用戶可執行的操作而言,該用戶關聯的角色是非常重要的。
譯者註:一個角色是應用於某個租戶的使用許可權集合,以允許某個指定用戶訪問或使用特定操作。角色是使用許可權的邏輯分組,它使得通用的許可權可以簡單地分組並綁定到與某個指定租戶相關的用戶。

l 租間:租間指的是具有全部服務入口並配有特定成員角色的一個項目。
譯者註:一個租間映射到一個Nova的「project-id」,在對象存儲中,一個租間可以有多個容器。根據不同的安裝方式,一個租間可以代表一個客戶、帳號、組織或項目。

(五)OpenStack管理的Web介面—-Horizon
Horizon是一個用以管理、控制OpenStack服務的Web控制面板,它可以管理實例、鏡像、創建密匙對,對實例添加卷、操作Swift容器等。除此之外,用戶還可以在控制面板中使用終端(console)或VNC直接訪問實例。總之,Horizon具有如下一些特點:
l 實例管理:創建、終止實例,查看終端日誌,VNC連接,添加卷等
l 訪問與安全管理:創建安全群組,管理密匙對,設置浮動IP等
l 偏好設定:對虛擬硬體模板可以進行不同偏好設定
l 鏡像管理:編輯或刪除鏡像
l 查看服務目錄
l 管理用戶、配額及項目用途
l 用戶管理:創建用戶等
l 卷管理:創建卷和快照
l 對象存儲處理:創建、刪除容器和對象
l 為項目下載環境變數

4. 在linux下安裝rabbitmq失敗怎麼解決

RabbitMQ 是由 LShift 提供的一個 Advanced Message Queuing Protocol (AMQP) 的開源實現,由以高性能、健壯以及可伸縮性出名的 Erlang 寫成,因此也是繼承了這些優點。
AMQP 里主要要說兩個組件:Exchange 和 Queue (在 AMQP 1.0 里還會有變動),如下圖所示,綠色的 X 就是 Exchange ,紅色的是 Queue ,這兩者都在 Server 端,又稱作 Broker ,這部分是 RabbitMQ 實現的,而藍色的則是客戶端,通常有 Procer 和 Consumer 兩種類型:

1:mq的安裝需要Erlang,所以首先下載Erlang,下載地址:http://www.erlang.org/download.html直接下載源碼,編譯安裝即可。
將下載好的tar包解壓編譯安裝,如下命令:
tar -zxvf otp_src_R16B03-1.tar.gz

cd otp_src_R16B03-1
./configure && make install

安裝過程中可能出現如下錯誤:
configure:error:
No curses library functions found
configure: error:/bin/sh'/home/niewf/software/erlang_R13B01/erts/configure'
failed for erts

解決方法:
yum list|grep ncurses
yum -y install ncurses-devel
yum install ncurses-devel

或者直接下載ncurses包編譯安裝。
下載地址:http://download.chinaunix.net/download/0008000/7242.shtml
tar zxvf ncurses.tar.gz #解壓縮並且釋放 文件包
cd ncurses #進入解壓縮的目錄(注意版本)
./configure #按照你的系統環境製作安裝配置文件
make #編譯源代碼並且編譯NCURSES庫
su root #切換到root用戶環境
make install #安裝編譯好的NCURSES庫

完成後繼續返回上一步操作。

2:安裝python,如果系統中python版本低於2.5的話需要升級python到2.6以上,具體可參考:http://gavinshaw.blog.51cto.com/385947/610585

3:安裝simplejson,直接下載simplejson源碼包編譯安裝即可,下載地址:https://pypi.python.org/pypi/simplejson/。
下載simplejson源碼包後,運行python setup.py install即可完成安裝。

4:安裝rabbit mq,下載地址:https://www.rabbitmq.com/install-generic-unix.html
下載後放入相應目錄解壓,進入%RABBITMQ_HOME%/sbin目錄下運行:./rabbitmq-server start即可啟動mq。
如果遇到如下錯誤,則參考http://leeon.me/a/rabbitmq-start-fail-note解決方案
ERROR: epmd error for host "xxx": address (cannot connect to host/port)
到此mq已經安裝完成。
在%RABBITMQ_HOME%/sbin目錄運行./rabbitmqctl status可查看當前mq狀態。
同時mq也提供了界面查看當前mq狀態,但是需要啟用該插件功能,運行如下命令:
rabbitmq-plugins enable rabbitmq_management,然後在瀏覽器中輸入:http://host-name:15672/#/即可訪問,頁面結果如下:

5. android上的socket通信的開源框架有哪些

請去360手機助手下載android學習手冊裡面有例子、源碼和文檔

Apache MINA(Multipurpose Infrastructure for Network Applications) 是 Apache 組織一個較新的項目,它為開發高性能和高可用性的網路應用程序提供了非常便利的框架。當前發行的 MINA 版本支持基於 java NIO 技術的 TCP/UDP 應用程序開發、串口通訊程序(只在最新的預覽版中提供),MINA 所支持的功能也在進一步的擴展中。目前正在使用 MINA 的軟體包括有:Apache Directory Project、AsyncWeb、AMQP(Advanced Message Queuing Protocol)、RED5 Server(Macromedia Flash Media RTMP)、ObjectRADIUS、Openfire 等等。

以上是從網上找到的mina框架簡單介紹。
由於正在開發的項目中要求加入及時通信功能(游戲方面),所以在網上找了好幾種框架,像openfire、tigase等都是基於Xmpp協議開發的優秀框架。但這些側重於消息的推送,不適合游戲上的簡單交互。所以後來找到了mina這個框架,順手搭建起來。接下來就是這幾天學習的總結了,文章裡面沒有涉及到邏輯層的方面,只是簡單的實現即時通信功能。資源下載我會放在文章的最後面。

一、相關資源下載

(1)Apache官方網站:http://mina.apache.org/downloads.html

(2) Android用jar包(包括官網的資源,我會一律放在網路網盤下)

二、Mina簡單配置

伺服器端一共要用到四個jar包,包括一個日誌包。將他們放在lib中,並載入進去
分別為mina-core-2.0.7.jar slf4j-log4j12-1.7.6.jar slf4j-api-1.7.6.jar log4j-1.2.14.jar(日誌管理包)

如果要使用日誌的jar包,則要在項目的src目錄下新建一個log4j.properties,添加內容如下:

log4j.rootCategory=INFO, stdout , R

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[QC] %p [%t] %C.%M(%L) | %m%n

log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.File=D:\Tomcat 5.5\logs\qc.log
log4j.appender.R.layout=org.apache.log4j.PatternLayout
1log4j.appender.R.layout.ConversionPattern=%d-[TS] %p %t %c - %m%n

log4j.logger.com.neusoft=DEBUG
log4j.logger.com.opensymphony.oscache=ERROR
log4j.logger.net.sf.navigator=ERROR
log4j.logger.org.apache.commons=ERROR
log4j.logger.org.apache.struts=WARN
log4j.logger.org.displaytag=ERROR
log4j.logger.org.springframework=DEBUG
log4j.logger.com.ibatis.db=WARN
log4j.logger.org.apache.velocity=FATAL

log4j.logger.com.canoo.webtest=WARN

log4j.logger.org.hibernate.ps.PreparedStatementCache=WARN
log4j.logger.org.hibernate=DEBUG
log4j.logger.org.logicalcobwebs=WARN

log4j.rootCategory=INFO, stdout , R

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[QC] %p [%t] %C.%M(%L) | %m%n

log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.File=D:\Tomcat 5.5\logs\qc.log
log4j.appender.R.layout=org.apache.log4j.PatternLayout
1log4j.appender.R.layout.ConversionPattern=%d-[TS] %p %t %c - %m%n

log4j.logger.com.neusoft=DEBUG
log4j.logger.com.opensymphony.oscache=ERROR
log4j.logger.net.sf.navigator=ERROR
log4j.logger.org.apache.commons=ERROR
log4j.logger.org.apache.struts=WARN
log4j.logger.org.displaytag=ERROR
log4j.logger.org.springframework=DEBUG
log4j.logger.com.ibatis.db=WARN
log4j.logger.org.apache.velocity=FATAL

log4j.logger.com.canoo.webtest=WARN

log4j.logger.org.hibernate.ps.PreparedStatementCache=WARN
log4j.logger.org.hibernate=DEBUG
log4j.logger.org.logicalcobwebs=WARN

Android客戶端要加入的jar包:mina-core-2.0.7.jar slf4j-android-1.6.1-RC1.jar兩個jar包(可能直接使用上面的jar包也會行,我沒試過~)

二、Mina服務端

我這邊使用的是mina2.0版本,所以可能與mina1.0的版本有所不同。那麼首先在伺服器端創建開始

新建一個Demo1Server.class文件,裡麵包含著程序的入口,埠號,Acceptor連接.

1 public class Demo1Server {
2 //日誌類的實現
3 private static Logger logger = Logger.getLogger(Demo1Server.class);
4 //埠號,要求客戶端與伺服器端一致
5 private static int PORT = 4444;
6
7 public static void main(String[] args){
8 IoAcceptor acceptor = null;
9 try{
10 //創建一個非阻塞的server端的Socket
11 acceptor = new NioSocketAcceptor();
12 //設置過濾器(使用mina提供的文本換行符編解碼器)
13 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),LineDelimiter.WINDOWS.getValue(),LineDelimiter.WINDOWS.getValue())));
14 //自定義的編解碼器
15 //acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new CharsetCodecFactory()));
16 //設置讀取數據的換從區大小
17 acceptor.getSessionConfig().setReadBufferSize(2048);
18 //讀寫通道10秒內無操作進入空閑狀態
19 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
20 //為接收器設置管理服務
21 acceptor.setHandler(new Demo1ServerHandler());
22 //綁定埠
23 acceptor.bind(new InetSocketAddress(PORT));
24
25 logger.info("伺服器啟動成功... 埠號未:"+PORT);
26
27 }catch(Exception e){
28 logger.error("伺服器啟動異常...",e);
29 e.printStackTrace();
30 }
31 }
32
33 }

一個很簡單的程序入口吧,簡單的說就是在伺服器上設置一個消息接收器,讓它監聽從埠傳過來的消息並進行處理。那麼接下來我們看看怎麼進行消息處理。

新建一個消息處理類,或者說是是業務邏輯處理器——Demo1ServerHandler,它繼承了IoHandlerAdapter類,它默認覆蓋了七個方法,而我們主要使用messageReceived()。

public class Demo1ServerHandler extends IoHandlerAdapter {
public static Logger logger = Logger.getLogger(Demo1ServerHandler.class);

//從埠接受消息,會響應此方法來對消息進行處理
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
String msg = message.toString();
if("exit".equals(msg)){
//如果客戶端發來exit,則關閉該連接
session.close(true);
}
//向客戶端發送消息
Date date = new Date();
session.write(date);
logger.info("伺服器接受消息成功...");
super.messageReceived(session, message);
}

//向客服端發送消息後會調用此方法
@Override
public void messageSent(IoSession session, Object message) throws Exception {
logger.info("伺服器發送消息成功...");
super.messageSent(session, message);
}

//關閉與客戶端的連接時會調用此方法
@Override
public void sessionClosed(IoSession session) throws Exception {
logger.info("伺服器與客戶端斷開連接...");
super.sessionClosed(session);
}

//伺服器與客戶端創建連接
@Override
public void sessionCreated(IoSession session) throws Exception {
logger.info("伺服器與客戶端創建連接...");
super.sessionCreated(session);
}

//伺服器與客戶端連接打開
@Override
public void sessionOpened(IoSession session) throws Exception {
logger.info("伺服器與客戶端連接打開...");
super.sessionOpened(session);
}

@Override
public void sessionIdle(IoSession session, IdleStatus status)
throws Exception {
logger.info("伺服器進入空閑狀態...");
super.sessionIdle(session, status);
}

@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
logger.info("伺服器發送異常...");
super.exceptionCaught(session, cause);
}
}

很直白的一段程序,相當於將伺服器分成了七個狀態,而每個狀態都有自己的一套邏輯處理方案。

至此,一個最簡單的Mina伺服器框架就搭好了,我們可以使用電腦上的telnet命令來測試一下伺服器能否使用
cmd控制台—>telnet <ip地址> <埠號> 如我的伺服器ip地為192.168.1.10 那我就寫telnet 192.168.1.10 4444 .此時我們可以看到輸出日誌為

此時連接已經創建,我們在輸入信息伺服器就會對信息進行處理,並給出相應的應答。
(telnet的用法不知道的可以自行網路)

三、Mina客戶端(Android端)

伺服器簡單搭建完畢,那麼開始在Android端是配置伺服器吧。同樣的不要忘記載入jar包, 由於Android自帶了Logout,所以就不使用Mina的日誌包了。
由於接受消息會阻塞Android的進程,所以我把它開在子線程中(同時將其放在Service中,讓其在後台運行)

1 public class MinaThread extends Thread {
2
3 private IoSession session = null;
4
5 @Override
6 public void run() {
7 // TODO Auto-generated method stub
8 Log.d("TEST","客戶端鏈接開始...");
9 IoConnector connector = new NioSocketConnector();
10 //設置鏈接超時時間
11 connector.setConnectTimeoutMillis(30000);
12 //添加過濾器
13 //connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new CharsetCodecFactory()));
14 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),LineDelimiter.WINDOWS.getValue(),LineDelimiter.WINDOWS.getValue())));
15 connector.setHandler(new MinaClientHandler(minaService));
16
17 try{
18 ConnectFuture future = connector.connect(new InetSocketAddress(ConstantUtil.WEB_MATCH_PATH,ConstantUtil.WEB_MATCH_PORT));//創建鏈接
19 future.awaitUninterruptibly();// 等待連接創建完成
20 session = future.getSession();//獲得session
21 session.write("start");
22 }catch (Exception e){
23 Log.d("TEST","客戶端鏈接異常...");
24 }
25 session.getCloseFuture().awaitUninterruptibly();//等待連接斷開
26 Log.d("TEST","客戶端斷開...");
27 connector.dispose();
28 super.run();
29 }
30
31 }

不知道你們注意到了沒,客戶端的代碼與伺服器端的極其相似,不同的是伺服器是創建NioSocketAcceptor對象,而客戶端是創建NioSocketConnect對象。當然同樣需要添加編碼解碼過濾器和業務邏輯過濾器。

業務邏輯過濾器代碼:

1 public class MinaClientHandler extends IoHandlerAdapter{
2
3
4 @Override
5 public void exceptionCaught(IoSession session, Throwable cause)
6 throws Exception {
7 Log.d("TEST","客戶端發生異常");
8 super.exceptionCaught(session, cause);
9 }
10
11 @Override
12 public void messageReceived(IoSession session, Object message)
13 throws Exception {
14 String msg = message.toString();
15 Log.d("TEST","客戶端接收到的信息為:" + msg);
16 super.messageReceived(session, message);
17 }
18
19 @Override
20 public void messageSent(IoSession session, Object message) throws Exception {
21 // TODO Auto-generated method stub
22 super.messageSent(session, message);
23 }
24 }

方法功能與伺服器端一樣。測試這里就不做了。可以的話自己寫個Demo效果更好

四、Mina的更多功能

拿到所有客戶端Session

Collection<IoSession> sessions = session.getService().getManagedSessions().values();

自定義編碼解碼器,可以對消息進行預處理。要繼承ProtocolEncoder和ProtocolDecode類。

數據對象的傳遞

這些功能不便放在這里講了,可能我會以後再找機會另開一篇來講述這些功能~,大家可以瀏覽結尾處的參考文章來加深對mina的理解。

在我認為,熟悉和快速使用一個新的的框架可以看出一個程序員的水平,同樣及時總結和歸納自己學到的新知識也是一個好的程序員該具有的習慣。那麼Mina的簡單搭建就到這里為止了,希望對大家有所幫助

6. Spring整合rabbitmq實踐(一):基礎

Spring整合rabbitmq實踐(二):擴展
Spring整合rabbitmq實踐(三):源碼

procer:消息生產者;

consumer:消息消費者;

queue:消息隊列;

exchange:接收procer發送的消息按照binding規則轉發給相應的queue;

binding:exchange與queue之間的關系;

virtualHost:每個virtualHost持有自己的exchange、queue、binding,用戶只能在virtualHost粒度控制許可權。

fanout:

群發到所有綁定的queue;

direct:

根據routing key routing到相應的queue,routing不到任何queue的消息扔掉;可以不同的key綁到同一個queue,也可以同一個key綁到不同的queue;

topic:

類似direct,區別是routing key是由一組以「.」分隔的單片語成,可以有通配符,「*」匹配一個單詞,「#」匹配0個或多個單詞;

headers:

根據arguments來routing。

arguments為一組key-value對,任意設置。

「x-match」是一個特殊的key,值為「all」時必須匹配所有argument,值為「any」時只需匹配任意一個argument,不設置默認為「all」。

通過以下配置,可以獲得最基礎的發送消息到queue,以及從queue接收消息的功能。

這個包同時包含了一些其它的包:spring-context、spring-tx、spring-web、spring-messaging、spring-retry、spring-amqp、amqp-client,如果想單純一點,可以單獨引入。

最主要的是以下幾個包,

spring-amqp:

spring-rabbit:

amqp-client:

個人理解就是,spring-amqp是spring整合的amqp,spring-rabbit是spring整合的rabbitmq(rabbitmq是amqp的一個實現,所以可能spring-rabbit也是類似關系),amqp-client提供操作rabbitmq的java api。

目前最新的是2.0.5.RELEASE版本。如果編譯報錯,以下信息或許能有所幫助:

(1)

解決方案:spring-amqp版本改為2.0.5.RELEASE。

(2)

解決方案:spring-context版本改為5.0.7.RELEASE。

(3)

解決方案:spring-core版本改為5.0.7.RELEASE。

(4)

解決方案:spring-beans版本改為5.0.7.RELEASE。

(5)

解決方案:spring-aop版本改為5.0.7.RELEASE。

總之,需要5.0.7.RELEASE版本的spring,及相匹配版本的amqp-client。

後面所講的這些bean配置,spring-amqp中都有默認配置,如果不需要修改默認配置,則不用人為配置這些bean。後面這些配置也沒有涉及到所有的屬性。

這里的ConnectionFactory指的是spring-rabbit包下面的ConnectionFactory介面,不是amqp-client包下面的ConnectionFactory類。

上面這個bean是spring-amqp的核心,不論是發送消息還是接收消息都需要這個bean,下面描述一下裡面這些配置的含義。

setAddresses :設置了rabbitmq的地址、埠,集群部署的情況下可填寫多個,「,」分隔。

setUsername :設置rabbitmq的用戶名。

setPassword :設置rabbitmq的用戶密碼。

setVirtualHost :設置virtualHost。

setCacheMode :設置緩存模式,共有兩種, CHANNEL 和 CONNECTION 模式。

CHANNEL 模式,程序運行期間ConnectionFactory會維護著一個Connection,所有的操作都會使用這個Connection,但一個Connection中可以有多個Channel,操作rabbitmq之前都必須先獲取到一個Channel,否則就會阻塞(可以通過setChannelCheckoutTimeout()設置等待時間),這些Channel會被緩存(緩存的數量可以通過setChannelCacheSize()設置);

CONNECTION 模式,這個模式下允許創建多個Connection,會緩存一定數量的Connection,每個Connection中同樣會緩存一些Channel,除了可以有多個Connection,其它都跟CHANNEL模式一樣。

這里的Connection和Channel是spring-amqp中的概念,並非rabbitmq中的概念,官方文檔對Connection和Channel有這樣的描述:

關於 CONNECTION 模式中,可以存在多個Connection的使用場景,官方文檔的描述:

setChannelCacheSize :設置每個Connection中(注意是每個Connection)可以緩存的Channel數量,注意只是緩存的Channel數量,不是Channel的數量上限,操作rabbitmq之前(send/receive message等)要先獲取到一個Channel,獲取Channel時會先從緩存中找閑置的Channel,如果沒有則創建新的Channel,當Channel數量大於緩存數量時,多出來沒法放進緩存的會被關閉。

注意,改變這個值不會影響已經存在的Connection,隻影響之後創建的Connection。

setChannelCheckoutTimeout :當這個值大於0時, channelCacheSize 不僅是緩存數量,同時也會變成數量上限,從緩存獲取不到可用的Channel時,不會創建新的Channel,會等待這個值設置的毫秒數,到時間仍然獲取不到可用的Channel會拋出AmqpTimeoutException異常。

同時,在 CONNECTION 模式,這個值也會影響獲取Connection的等待時間,超時獲取不到Connection也會拋出AmqpTimeoutException異常。

setPublisherReturns、setPublisherConfirms :procer端的消息確認機制(confirm和return),設為true後開啟相應的機制,後文詳述。

官方文檔描述publisherReturns設為true打開return機制,publisherComfirms設為true打開confirm機制,但測試結果(2.0.5.RELEASE版本)是,任意一個設為true,兩個都會打開。

addConnectionListener、addChannelListener、setRecoveryListener :添加或設置相應的Listener,後文詳述。

setConnectionCacheSize :僅在 CONNECTION 模式使用,設置Connection的緩存數量。

setConnectionLimit :僅在 CONNECTION 模式使用,設置Connection的數量上限。

上面的bean配置,除了需要注入的幾個listener bean以外,其它設置的都是其默認值(2.0.5.RELEASE版本),後面的bean示例配置也是一樣,部分屬性不同版本的默認值可能有所不同。

一般不用配置這個bean,這里簡單提一下。

這個ConnectionFactory是rabbit api中的ConnectionFactory類,這裡面是連接rabbitmq節點的Connection配置。

如果想修改這些配置,可以按如下方式配置:

consumer端如果通過@RabbitListener註解的方式接收消息,不需要這個bean。

不建議直接通過ConnectionFactory獲取Channel操作rabbitmq,建議通過amqpTemplate操作。

setConnectionFactory :設置spring-amqp的ConnectionFactory。

setRetryTemplate :設置重試機制,詳情見後文。

setMessageConverter :設置MessageConverter,用於java對象與Message對象(實際發送和接收的消息對象)之間的相互轉換,詳情見後文。

setChannelTransacted :打開或關閉Channel的事務,關於amqp的事務後文描述。

setReturnCallback、setConfirmCallback :return和confirm機制的回調介面,後文詳述。

setMandatory :設為true使ReturnCallback生效。

這個bean僅在consumer端通過@RabbitListener註解的方式接收消息時使用,每一個@RabbitListener註解的方法都會由這個創建一個MessageListenerContainer,負責接收消息。

setConnectionFactory :設置spring-amqp的ConnectionFactory。

setMessageConverter :對於consumer端,MessageConverter也可以在這里配置。

setAcknowledgeMode :設置consumer端的應答模式,共有三種:NONE、AUTO、MANUAL。

NONE,無應答,這種模式下rabbitmq默認consumer能正確處理所有發出的消息,所以不管消息有沒有被consumer收到,有沒有正確處理都不會恢復;

AUTO,由Container自動應答,正確處理發出ack信息,處理失敗發出nack信息,rabbitmq發出消息後將會等待consumer端的應答,只有收到ack確認信息才會把消息清除掉,收到nack信息的處理辦法由setDefaultRequeueRejected()方法設置,所以在這種模式下,發生錯誤的消息是可以恢復的。

MANUAL,基本同AUTO模式,區別是需要人為調用方法給應答。

setConcurrentConsumers :設置每個MessageListenerContainer將會創建的Consumer的最小數量,默認是1個。

setMaxConcurrentConsumers :設置每個MessageListenerContainer將會創建的Consumer的最大數量,默認等於最小數量。

setPrefetchCount :設置每次請求發送給每個Consumer的消息數量。

setChannelTransacted :設置Channel的事務。

setTxSize :設置事務當中可以處理的消息數量。

setDefaultRequeueRejected :設置當rabbitmq收到nack/reject確認信息時的處理方式,設為true,扔回queue頭部,設為false,丟棄。

setErrorHandler :實現ErrorHandler介面設置進去,所有未catch的異常都會由ErrorHandler處理。

AmqpTamplate裡面有下面幾個方法可以向queue發送消息:

這里,exchange必須存在,否則消息發不出去,會看到錯誤日誌,但不影響程序運行:

Message是org.springframework.amqp.core.Message類,spring-amqp發送和接收的都是這個Message。

從Message類源碼可以看到消息內容放在byte[]裡面,MessageProperties對象包含了非常多的一些其它信息,如Header、exchange、routing key等。

這種方式,需要將消息內容(String,或其它Object)轉換為byte[],示例:

也可以直接調用下面幾個方法,Object將會自動轉為Message對象發送:

有兩種方法接收消息:

1.polling consumer,輪詢調用方法一次獲取一條;

2.asynchronous consumer,listener非同步接收消息。

polling consumer

直接通過AmqpTemplate的方法從queue獲取消息,有如下方法:

如果queue裡面沒有消息,會立刻返回null;傳入timeoutMillis參數後可阻塞等待一段時間。

如果想直接從queue獲取想要的java對象,可調用下面這一組方法:

後面4個方法是帶泛型的,示例如下:

使用這四個方法需要配置org.springframework.amqp.support.converter.SmartMessageConverter,這是一個介面,Jackson2JsonMessageConverter已經實現了這個介面,所以只要將Jackson2JsonMessageConverter設置到RabbitTemplate中即可。

asynchronous consumer

有多種方式可以實現,詳情參考官方文檔。

最簡單的實現方式是@RabbitListener註解,示例:

這里接收消息的對象用的是Message,也可以是自定義的java對象,但調用Converter轉換失敗會報錯。

註解上指定的queue必須是已經存在並且綁定到某個exchange的,否則會報錯:

如果在@RabbitListener註解中指明binding信息,就能自動創建queue、exchange並建立binding關系。

direct和topic類型的exchange需要routingKey,示例:

fanout類型的exchange,示例:

2.0版本之後,可以指定多個routingKey,示例:

並且支持arguments屬性,可用於headers類型的exchange,示例:

@Queue有兩個參數exclusive和autoDelete順便解釋一下:

exclusive,排他隊列,只對創建這個queue的Connection可見,Connection關閉queue刪除;

autoDelete,沒有consumer對這個queue消費時刪除。

對於這兩種隊列,rable=true是不起作用的。

另外,如果註解申明的queue和exchange及binding關系都已經存在,但與已存在的設置不同,比如,已存在的exchange的是direct類型,這里嘗試改為fanout類型,結果是不會有任何影響,不論是修改或者新增參數都不會生效。

如果queue存在,exchange存在,但沒有binding,那麼程序啟動後會自動建立起binding關系。

7. RabbitMQ消息過濾的一個思路

生產者 Procer 向 一個 隊列發送消息,並且為消息打上不同的 Tag。假設這個隊列有 3 個消費者:Consumer #[1:3],Consumer #1 只想消費 tag1 標記的消息,Consumer #2 只想消費 tag2 標記的消息,Consumer #3 只想消費 tag3 標記的消息。

生產者 publish 消息時,將 Tag 保存在 Map<String, Object> 類型的 header 欄位,作為構建 AMQP.BasicProperties 參數

消費者如何告知 Broker 只消費特定 Tag?

假設 Consumer #1 只希望消費帶 tag1 標記的消息,那麼 Consumer #1 可以在向 Broker 請求 Basic.Consume 指令時,捎帶自己期望的 Tag 字元串。Client 在具體生成 consumerTag 時可以用 Tag 關鍵字加上隨機字元串(避免 consumerTag 重復):

消費者通過 Basic.Consume 指令來監聽隊列的消息,這些消費者信息服務端是如何存儲的?

保存在隊列主進程(Pid)的 state 中(具體調試可以通過 sys:get_state(Pid) )

並且隊列進程在初始化時,會進行 consumers 初始化:

consumers 欄位實際由 priority_queue:new() 初始化。當有新的 consumer 注冊到隊列進程,那麼會調用 rabbit_queue_consumers 模塊的 add_consumer 方法來向 priority_queue 添加一個元素;同理當有 consumer下線時,最終也會調用該模塊的 remove_consumer 方法。 priority_queue 完整實現見 附二

Broker 向 Consumer 投遞消息時,底層是通過 rabbit_amqqueue_process 調用 rabbit_queue_consumers 模塊的 deliver 方法。默認採用

從 priority_queue 中獲取一個 QEntry( {ChPid, Consumer} ),然後通過 FetchFun 從隊列中獲取消息,發送到 ChPid(Channel 進程)

在 consumers 不為空的情況下,通過 FetchFun 獲取消息,此時可以獲取該消息的 header,取出 Tag 值(如果消息打了 Tag 標記),然後通過 priority_queue 的 filter/2 方法

在 Pred 實現中,我們可以判斷當前消息 Tag 值是否被包含在 consumerTag 中,從而可以過濾出消費特定 tag 的consumers,最後向這些 consumers 中的一個發送 Message 消息。

附一 (隊列進程 state 中的 consumers 信息例子)

附二 (priority_queue 模塊實現
rabbit_common )

注 :上述思路建議在測試環境測試,考慮到有可能出現的性能問題,作為一個調研也會有很多工作要做,整個過程會涉及 RabbitMQ 服務端源碼改造、編譯、打包( rabbitmq-public-umbrella )以及客戶端的相關改造,如果能實際嘗試下,也會有不小的收獲。

閱讀全文

與amqp源碼相關的資料

熱點內容
新手學電腦編程語言 瀏覽:891
雲空間在哪個文件夾 瀏覽:926
編程游戲小貓抓小魚 瀏覽:790
安卓dosbox怎麼打開 瀏覽:774
伺服器無影響是怎麼回事 瀏覽:952
比德電子采購平台加密 瀏覽:202
加密貨幣400億 瀏覽:524
植發2次加密 瀏覽:44
vc6查看編譯的錯誤 瀏覽:595
心理大全pdf 瀏覽:1002
區域鏈加密幣怎麼樣 瀏覽:343
查找命令符 瀏覽:95
壓縮工具zar 瀏覽:735
白盤怎麼解壓 瀏覽:475
辰語程序員學習筆記 瀏覽:47
程序員被公司勸退 瀏覽:523
java三子棋 瀏覽:693
加密空間怎麼強制進入 瀏覽:345
ug分割曲線命令 瀏覽:209
學碼思程序員 瀏覽:610