导航:首页 > 源码编译 > amqp源码

amqp源码

发布时间:2023-08-06 14:53:46

1. 消息队列(mq)是什么

生产者先将消息投递一个叫队列的容器中,然后再从这个容器中取出消息,最后再转发给消费者。

消息队列是 Microsoft 的消息处理技术,它在任何安装 Microsoft Windows 的计算机组合中,为任何应用程序提供消息处理和消息队列功能,无论这些计算机是否在同一个网络上或者是否同时联机。

消息队列网络是能够相互间来回发送消息的任何一组计算机。网络中的不同计算机在确保消息顺利处理的过程中扮演不同的角色。它们中有些提供路由信息以确定如何发送消息,有些保存整个网络的重要信息,而有些只是发送和接收消息。

消息队列的类型介绍:

消息队列目前主要有两种类型:POSIX消息队列以及系统V消息队列,系统V消息队列目前被大量使用。每个消息队列都有一个队列头,用结构struct msg_queue来描述。队列头中包含了该消息队列的大量信息。包括消息队列键值、用户ID、组ID、消息队列中消息数目等等。

消息队列就是一个消息的链表,可以把消息看作一个记录,具有特定的格式以及特定的优先级。对消息队列有写权限的进程可以向消息队列中按照一定的规则添加新消息;对消息队列有读权限的进程则可以从消息队列中读走消息。消息队列是随内核持续的。





2. 从 0 到 1:全面理解 RPC 远程调用

作者 | python编程时光

责编 | 胡巍巍

什么是RPC呢?网络给出的解释是这样的:“RPC(Remote Procere Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议”。

这个概念听起来还是比较抽象,没关系,继续往后看,后面概念性的东西,我会讲得足够清楚,让你完全掌握 RPC 的基础内容。

在 OpenStack 里的进程间通信方式主要有两种,一种是基于HTTP协议的RESTFul API方式,另一种则是RPC调用。

那么这两种方式在应用场景上有何区别呢?

有使用经验的人,就会知道:

首先,给你提两个问题,带着这两个问题再往下看:

1、RPC 和 REST 区别是什么?2、为什么要采用RPC呢?

首先,第一个问题:RPC 和 REST 区别是什么?

你一定会觉得这个问题很奇怪,是的,包括我,但是你在网络上一搜,会发现类似对比的文章比比皆是,我在想可能很多初学者由于基础不牢固,才会将不相干的二者拿出来对比吧。既然是这样,那为了让你更加了解陌生的RPC,就从你熟悉得不能再熟悉的 REST 入手吧。

01、所属类别不同

REST,是Representational State Transfer 的简写,中文描述表述性状态传递(是指某个瞬间状态的资源数据的快照,包括资源数据的内容、表述格式(XML、JSON)等信息。)

REST 是一种软件架构风格。这种风格的典型应用,就是HTTP。其因为简单、扩展性强的特点而广受开发者的青睐。

而RPC 呢,是 Remote Procere Call Protocol 的简写,中文描述是远程过程调用,它可以实现客户端像调用本地服务(方法)一样调用服务器的服务(方法)。

而 RPC 可以基于 TCP/UDP,也可以基于 HTTP 协议进行传输的,按理说它和REST不是一个层面意义上的东西,不应该放在一起讨论,但是谁让REST这么流行呢,它是目前最流行的一套互联网应用程序的API设计标准,某种意义下,我们说 REST 可以其实就是指代 HTTP 协议。

02、使用方式不同

03、面向对象不同

从设计上来看,RPC,所谓的远程过程调用 ,是面向方法的 ,REST:所谓的 Representational state transfer ,是面向资源的,除此之外,还有一种叫做 SOA,所谓的面向服务的架构,它是面向消息的,这个接触不多,就不多说了。

04、序列化协议不同

接口调用通常包含两个部分,序列化和通信协议。

通信协议,上面已经提及了,REST 是 基于 HTTP 协议,而 RPC 可以基于 TCP/UDP,也可以基于 HTTP 协议进行传输的。

常见的序列化协议,有:json、xml、hession、protobuf、thrift、text、bytes等,REST 通常使用的是 JSON或者XML,而 RPC 使用的是 JSON-RPC,或者 XML-RPC。

通过以上几点,我们知道了 REST 和 RPC 之间有很明显的差异。

然后第二个问题:为什么要采用RPC呢?

那到底为何要使用 RPC,单纯的依靠RESTful API不可以吗?为什么要搞这么多复杂的协议,渣渣表示真的学不过来了。

关于这一点,以下几点仅是我的个人猜想,仅供交流哈:

说了这么多,我们该如何选择这两者呢?我总结了如下两点,供你参考:

“远程调用”意思就是:被调用方法的具体实现不在程序运行本地,而是在别的某个地方(分布到各个服务器),调用者只想要函数运算的结果,却不需要实现函数的具体细节。

光说不练嘴把式,接下来,我将分别用三种不同的方式全面地让你搞明白 rpc 远程调用是如何实现的。

01、基于 xml-rpc

Python实现 rpc,可以使用标准库里的 SimpleXMLRPCServer,它是基于XML-RPC 协议的。

有了这个模块,开启一个 rpc server,就变得相当简单了。执行以下代码:

有了 rpc server,接下来就是 rpc client,由于我们上面使用的是 XML-RPC,所以 rpc clinet 需要使用xmlrpclib 这个库。

然后,我们通过 server_proxy 对象就可以远程调用之前的rpc server的函数了。

SimpleXMLRPCServer是一个单线程的服务器。这意味着,如果几个客户端同时发出多个请求,其它的请求就必须等待第一个请求完成以后才能继续。

若非要使用 SimpleXMLRPCServer 实现多线程并发,其实也不难。只要将代码改成如下即可。

02、基于json-rpc

SimpleXMLRPCServer 是基于 xml-rpc 实现的远程调用,上面我们也提到 除了 xml-rpc 之外,还有 json-rpc 协议。

那 python 如何实现基于 json-rpc 协议呢?

答案是很多,很多web框架其自身都自己实现了json-rpc,但我们要独立这些框架之外,要寻求一种较为干净的解决方案,我查找到的选择有两种

第一种是 jsonrpclib

第二种是 python-jsonrpc

先来看第一种 jsonrpclib

它与 Python 标准库的 SimpleXMLRPCServer 很类似(因为它的类名就叫做 SimpleJSONRPCServer ,不明真相的人真以为它们是亲兄弟)。或许可以说,jsonrpclib 就是仿照 SimpleXMLRPCServer 标准库来进行编写的。

它的导入与 SimpleXMLRPCServer 略有不同,因为SimpleJSONRPCServer分布在jsonrpclib库中。

服务端

客户端

再来看第二种python-jsonrpc,写起来貌似有些复杂。

服务端

客户端

调用过程如下

还记得上面我提到过的 zabbix API,因为我有接触过,所以也拎出来讲讲。zabbix API 也是基于 json-rpc 2.0协议实现的。

因为内容较多,这里只带大家打个,zabbix 是如何调用的:直接指明要调用 zabbix server 的哪个方法,要传给这个方法的参数有哪些。

03、基于 zerorpc

以上介绍的两种rpc远程调用方式,如果你足够细心,可以发现他们都是http+rpc 两种协议结合实现的。

接下来,我们要介绍的这种(zerorpc),就不再使用走 http 了。

zerorpc 这个第三方库,它是基于TCP协议、 ZeroMQ 和 MessagePack的,速度相对快,响应时间短,并发高。zerorpc 和 pyjsonrpc 一样,需要额外安装,虽然SimpleXMLRPCServer不需要额外安装,但是SimpleXMLRPCServer性能相对差一些。

调用过程如下

客户端除了可以使用zerorpc框架实现代码调用之外,它还支持使用“命令行”的方式调用。

客户端可以使用命令行,那服务端是不是也可以呢?

是的,通过 Github 上的文档几个 demo 可以体验到这个第三方库做真的是优秀。

比如我们可以用下面这个命令,创建一个rpc server,后面这个 time Python 标准库中的 time 模块,zerorpc 会将 time 注册绑定以供client调用。

经过了上面的学习,我们已经学会了如何使用多种方式实现rpc远程调用。

通过对比,zerorpc 可以说是脱颖而出,一支独秀。

为此,我也做了一番思考:

OpenStack 组件繁多,在一个较大的集群内部每个组件内部通过rpc通信频繁,如果都采用rpc直连调用的方式,连接数会非常地多,开销大,若有些 server 是单线程的模式,超时会非常的严重。

OpenStack 是复杂的分布式集群架构,会有多个 rpc server 同时工作,假设有 server01,server02,server03 三个server,当 rpc client 要发出rpc请求时,发给哪个好呢?这是问题一。

你可能会说轮循或者随机,这样对大家都公平。这样的话还会引出另一个问题,倘若请求刚好发到server01,而server01刚好不凑巧,可能由于机器或者其他因为导致服务没在工作,那这个rpc消息可就直接失败了呀。要知道做为一个集群,高可用是基本要求,如果出现刚刚那样的情况其实是很尴尬的。这是问题二。

集群有可能根据实际需要扩充节点数量,如果使用直接调用,耦合度太高,不利于部署和生产。这是问题三。

引入消息中间件,可以很好的解决这些问题。

解决问题一:消息只有一份,接收者由AMQP的负载算法决定,默认为在所有Receiver中均匀发送(round robin)。

解决问题二:有了消息中间件做缓冲站,client 可以任性随意的发,server 都挂掉了?没有关系,等 server 正常工作后,自己来消息中间件取就行了。

解决问题三:无论有多少节点,它们只要认识消息中间件这一个中介就足够了。

既然讲到了消息队列,如果你之前没有接触过这块内容,最好花几分钟的时间跟我好好过下关于消息队列的几个基础概念。

首先,RPC只是定义了一个通信接口,其底层的实现可以各不相同,可以是 socket,也可以是今天要讲的 AMQP。

AMQP(Advanced Message Queuing Protocol)是一种基于队列的可靠消息服务协议,作为一种通信协议,AMQP同样存在多个实现,如Apache Qpid,RabbitMQ等。

以下是 AMQP 中的几个必知的概念:

Publisher:消息发布者

Queue:用来保存消息的存储空间,消息没有被receiver前,保存在队列中。

Exchange:用来接收Publisher发出的消息,根据Routing key 转发消息到对应的Message Queue中,至于转到哪个队列里,这个路由算法又由exchange type决定的。

Exchange type:主要四种描述exchange的类型。

direct:消息路由到满足此条件的队列中(queue,可以有多个):routing key = binding key

topic:消息路由到满足此条件的队列中(queue,可以有多个):routing key 匹配 binding pattern. binding pattern是类似正则表达式的字符串,可以满足复杂的路由条件。

fanout:消息路由到多有绑定到该exchange的队列中。

binding :binding是用来描述exchange和queue之间的关系的概念,一个exchang可以绑定多个队列,这些关系由binding建立。前面说的binding key /binding pattern也是在binding中给出。

为了让你明白这几者的关系,我画了一张模型图。

关于AMQP,有几下几点值得注意:

前面铺垫了那么久,终于到了讲真实应用的场景。在生产中RPC是如何应用的呢?

其他模型我不太清楚,在 OpenStack 中的应用模型是这样的

至于为什么要如此设计,前面我已经给出了自己的观点。

接下来,就是源码解读 OpenStack ,看看其是如何通过rpc进行远程调用的。如若你对此没有兴趣(我知道很多人对此都没有兴趣,所以不浪费大家时间),可以直接跳过这一节,进入下一节。

目前Openstack中有两种RPC实现,一种是在oslo messaging,一种是在openstack.common.rpc。

openstack.common.rpc是旧的实现,oslo messaging是对openstack.common.rpc的重构。openstack.common.rpc在每个项目中都存在一份拷贝,oslo messaging即将这些公共代码抽取出来,形成一个新的项目。oslo messaging也对RPC API 进行了重新设计,对多种 transport 做了进一步封装,底层也是用到了kombu这个AMQP库。(注:Kombu 是Python中的messaging库。Kombu旨在通过为AMQ协议提供惯用的高级接口,使Python中的消息传递尽可能简单,并为常见的消息传递问题提供经过验证和测试的解决方案。)

关于oslo_messaging库,主要提供了两种独立的API:

因为 notify 实现是太简单了,所以这里我就不多说了,如果有人想要看这方面内容,可以收藏我的博客(http://python-online.cn) ,我会更新补充 notify 的内容。

OpenStack RPC 模块提供了 rpc.call,rpc.cast, rpc.fanout_cast 三种 RPC 调用方法,发送和接收 RPC 请求。

rpc.call 和 .rpc.cast 从实现代码上看,他们的区别很小,就是call调用时候会带有wait_for_reply=True参数,而cast不带。

要了解 rpc 的调用机制呢,首先要知道 oslo_messaging 的几个概念主要方法有四个:

transport:RPC功能的底层实现方法,这里是rabbitmq的消息队列的访问路径

transport 就是定义你如何访连接消息中间件,比如你使用的是 Rabbitmq,那在 nova.conf中应该有一行transport_url的配置,可以很清楚地看出指定了 rabbitmq 为消息中间件,并配置了连接rabbitmq的user,passwd,主机,端口。

target用来表述 RPC 服务器监听topic,server名称和server监听的exchange,是否广播fanout。

rpc server 要获取消息,需要定义target,就像一个门牌号一样。

rpc client 要发送消息,也需要有target,说明消息要发到哪去。

endpoints:是可供别人远程调用的对象

RPC服务器暴露出endpoint,每个 endpoint 包涵一系列的可被远程客户端通过 transport 调用的方法。直观理解,可以参考nova-conctor创建rpc server的代码,这边的endpoints就是 nova/manager.py:ConctorManager

dispatcher:分发器,这是 rpc server 才有的概念

只有通过它 server 端才知道接收到的rpc请求,要交给谁处理,怎么处理?

在client端,是这样指定要调用哪个方法的。

而在server端,是如何知道要执行这个方法的呢?这就是dispatcher 要干的事,它从 endpoint 里找到这个方法,然后执行,最后返回。

Serializer:在 python 对象和message(notification) 之间数据做序列化或是反序列化的基类。

主要方法有四个:

每个notification listener都和一个executor绑定,来控制收到的notification如何分配。默认情况下,使用的是blocking executor(具体特性参加executor一节)

模仿是一种很高效的学习方法,我这里根据 OpenStack 的调用方式,抽取出核心内容,写成一个简单的 demo,有对 OpenStack 感兴趣的可以了解一下,大部分人也可以直接跳过这章节。

注意以下代码不能直接运行,你还需要配置 rabbitmq 的连接方式,你可以写在配置文件中,通过 get_transport 从cfg.CONF 中读取,也可以直接将其写成url的格式做成参数,传给 get_transport 。而且还要nova或者其他openstack组件的环境中运行(因为需要有ctxt这个环境变量)

简单的 rpc client

简单的 rpc server

【End】

热 文 推 荐

☞Facebook 发币 Libra;谷歌十亿美金为穷人造房;第四代树莓派 Raspberry Pi 4 发布 | 开发者周刊

☞WebRTC 将一统实时音视频天下?

☞小米崔宝秋:小米 AIoT 深度拥抱开源

☞华为在美研发机构 Futurewei 意欲分家?

☞老司机教你如何写出没人敢维护的代码!

☞Python有哪些技术上的优点?比其他语言好在哪儿?

☞上不了北大“图灵”、清华“姚班”,AI专业还能去哪上?

☞公链史记 | 从鸿蒙初辟到万物生长的十年激荡……

☞边缘计算容器化是否有必要?

☞马云曾经偶像,终于把阿里留下的1400亿败光了!

你点的每个“在看”,我都认真当成了喜欢

3. 全面认识openstack,它到底是什么包含什么

(1)官方的解释相信大家都已经了解了,不了解也没有关系。现在从常识的角度来给大家解释和说明。
OpenStack是一个云平台管理的项目,它不是一个软件。这个项目由几个主要的组件组合起来完成一些具体的工作。
OpenStack是一个旨在为公共及私有云的建设与管理提供软件的开源项目,OpenStack被公认作为基础设施即服务(简称IaaS)资源的通用前端。
如果这些还不明白,那么从另外的角度给大家介绍:
首先让大家看下面两个图就很简单明了了:
此图为openstack的登录界面

下面是openstack的一个管理界面

从这两个图,相信有一定开发经验,就能看出openstack是什么了。可以说他是一个框架,甚至可以从软件的角度来理解它。如果不明白,就从传统开发来讲解。不知道你是否了解oa,erp等系统,如果不了解可以到网上去找,资料一大把。他和oa,erp有什么不同。很简单就是openstack是用做云计算的一个平台,或则一个解决方案。它是云计算一个重要组成部分。
上面对openstack有了一个感性的认识。
(2)openstack能干什么。
大家都知道阿里云平台,网络云平台,而阿里云平台据传说就是对openstack的二次开发。对于二次开发相信只要接触过软件的都会明白这个概念。不明白的自己网上去查一下。也就是说openstack,可以搭建云平台,什么云平台,公有云,私有云。现在网络在招聘的私有云工程师,应该就是这方面的人才。
(3)openstack自身都包含什么
以下是5个OpenStack的重要构成部分:
l Nova – 计算服务
l Swift – 存储服务
l Glance – 镜像服务
l Keystone – 认证服务
l Horizon – UI服务

图1 OpenStack基本构架

下图展示了Keystone、Dashboard二者与其它OpenStack部分的交互。

下面详细介绍每一个服务:
(一)OpenStack计算设施—-Nova Nova是OpenStack计算的弹性控制器。OpenStack云实例生命期所需的各种动作都将由Nova进行处理和支撑,这就意味着Nova以管理平台的身份登场,负责管理整个云的计算资源、网络、授权及测度。虽然Nova本身并不提供任何虚拟能力,但是它将使用libvirt API与虚拟机的宿主机进行交互。Nova通过Web服务API来对外提供处理接口,而且这些接口与Amazon的Web服务接口是兼容的。

功能及特点
l 实例生命周期管理
l 计算资源管理
l 网络与授权管理
l 基于REST的API
l 异步连续通信
l 支持各种宿主:Xen、XenServer/XCP、KVM、UML、VMware vSphere及Hyper-V

OpenStack计算部件
l Nova弹性云包含以下主要部分:
l API Server(nova-api)
l 消息队列(rabbit-mq server)
l 运算工作站(nova-compute)
l 网络控制器(nova-network)
l 卷管理(nova-volume)
l 调度器(nova-scheler)

API服务器(nova-api)
API服务器提供了云设施与外界交互的接口,它是外界用户对云实施管理的唯一通道。通过使用web服务来调用各种EC2的API,接着API服务器便通过消息队列把请求送达至云内目标设施进行处理。作为对EC2-api的替代,用户也可以使用OpenStack的原生API,我们把它叫做“OpenStack API”。

消息队列(Rabbit MQ Server)
OpenStack内部在遵循AMQP(高级消息队列协议)的基础上采用消息队列进行通信。Nova对请求应答进行异步调用,当请求接收后便则立即触发一个回调。由于使用了异步通信,不会有用户的动作被长置于等待状态。例如,启动一个实例或上传一份镜像的过程较为耗时,API调用就将等待返回结果而不影响其它操作,在此异步通信起到了很大作用,使整个系统变得更加高效。

运算工作站(nova-compute)
运算工作站的主要任务是管理实例的整个生命周期。他们通过消息队列接收请求并执行,从而对实例进行各种操作。在典型实际生产环境下,会架设许多运算工作站,根据调度算法,一个实例可以在可用的任意一台运算工作站上部署。

网络控制器(nova-network)
网络控制器处理主机的网络配置,例如IP地址分配,配置项目VLAN,设定安全群组以及为计算节点配置网络。

卷工作站(nova-volume)
卷工作站管理基于LVM的实例卷,它能够为一个实例创建、删除、附加卷,也可以从一个实例中分离卷。卷管理为何如此重要?因为它提供了一种保持实例持续存储的手段,比如当结束一个实例后,根分区如果是非持续化的,那么对其的任何改变都将丢失。可是,如果从一个实例中将卷分离出来,或者为这个实例附加上卷的话,即使实例被关闭,数据仍然保存其中。这些数据可以通过将卷附加到原实例或其他实例的方式而重新访问。
因此,为了日后访问,重要数据务必要写入卷中。这种应用对于数据服务器实例的存储而言,尤为重要。

调度器(nova-scheler)
调度器负责把nova-API调用送达给目标。调度器以名为“nova-schele”的守护进程方式运行,并根据调度算法从可用资源池中恰当地选择运算服务器。有很多因素都可以影响调度结果,比如负载、内存、子节点的远近、CPU架构等等。强大的是nova调度器采用的是可插入式架构。
目前nova调度器使用了几种基本的调度算法:
随机化:主机随机选择可用节点;
可用化:与随机相似,只是随机选择的范围被指定;
简单化:应用这种方式,主机选择负载最小者来运行实例。负载数据可以从别处获得,如负载均衡服务器。

(二)OpenStack镜像服务器—-GlanceOpenStack镜像服务器是一套虚拟机镜像发现、注册、检索系统,我们可以将镜像存储到以下任意一种存储中:
本地文件系统(默认)
l OpenStack对象存储
l S3直接存储
l S3对象存储(作为S3访问的中间渠道)
l HTTP(只读)

功能及特点
提供镜像相关服务

Glance构件
l Glance控制器
l Glance注册器

(三)OpenStack存储设施—-Swift
Swift为OpenStack提供一种分布式、持续虚拟对象存储,它类似于Amazon Web Service的S3简单存储服务。Swift具有跨节点百级对象的存储能力。Swift内建冗余和失效备援管理,也能够处理归档和媒体流,特别是对大数据(千兆字节)和大容量(多对象数量)的测度非常高效。

功能及特点
l 海量对象存储
l 大文件(对象)存储
l 数据冗余管理
l 归档能力—–处理大数据集
l 为虚拟机和云应用提供数据容器
l 处理流媒体
l 对象安全存储
l 备份与归档
l 良好的可伸缩性

Swift组件
l Swift账户
l Swift容器
l Swift对象
l Swift代理
l Swift RING

Swift代理服务器
用户都是通过Swift-API与代理服务器进行交互,代理服务器正是接收外界请求的门卫,它检测合法的实体位置并路由它们的请求。
此外,代理服务器也同时处理实体失效而转移时,故障切换的实体重复路由请求。

Swift对象服务器
对象服务器是一种二进制存储,它负责处理本地存储中的对象数据的存储、检索和删除。对象都是文件系统中存放的典型的二进制文件,具有扩展文件属性的元数据(xattr)。
注意:xattr格式被linux中的ext3/4,XFS,Btrfs,JFS和ReiserFS所支持,但是并没有有效测试证明在XFS,JFS,ReiserFS,Reiser4和ZFS下也同样能运行良好。不过,XFS被认为是当前最好的选择。

Swift容器服务器
容器服务器将列出一个容器中的所有对象,默认对象列表将存储为SQLite文件(译者注:也可以修改为MySQL,安装中就是以MySQL为例)。容器服务器也会统计容器中包含的对象数量及容器的存储空间耗费。

Swift账户服务器
账户服务器与容器服务器类似,将列出容器中的对象。

Ring(索引环)
Ring容器记录着Swift中物理存储对象的位置信息,它是真实物理存储位置的实体名的虚拟映射,类似于查找及定位不同集群的实体真实物理位置的索引服务。这里所谓的实体指账户、容器、对象,它们都拥有属于自己的不同的Rings。

(四)OpenStack认证服务(Keystone)
Keystone为所有的OpenStack组件提供认证和访问策略服务,它依赖自身REST(基于Identity API)系统进行工作,主要对(但不限于)Swift、Glance、Nova等进行认证与授权。事实上,授权通过对动作消息来源者请求的合法性进行鉴定。如下图所示:

Keystone采用两种授权方式,一种基于用户名/密码,另一种基于令牌(Token)。除此之外,Keystone提供以下三种服务:
l 令牌服务:含有授权用户的授权信息
l 目录服务:含有用户合法操作的可用服务列表
l 策略服务:利用Keystone具体指定用户或群组某些访问权限

认证服务组件
服务入口:如Nova、Swift和Glance一样每个OpenStack服务都拥有一个指定的端口和专属的URL,我们称其为入口(endpoints)。

l 区位:在某个数据中心,一个区位具体指定了一处物理位置。在典型的云架构中,如果不是所有的服务都访问分布式数据中心或服务器的话,则也称其为区位。

l 用户:Keystone授权使用者
译者注:代表一个个体,OpenStack以用户的形式来授权服务给它们。用户拥有证书(credentials),且可能分配给一个或多个租户。经过验证后,会为每个单独的租户提供一个特定的令牌。[来源:http://blog.sina.com.cn/s/blog_70064f190100undy.html]

l 服务:总体而言,任何通过Keystone进行连接或管理的组件都被称为服务。举个例子,我们可以称Glance为Keystone的服务。

l 角色:为了维护安全限定,就云内特定用户可执行的操作而言,该用户关联的角色是非常重要的。
译者注:一个角色是应用于某个租户的使用权限集合,以允许某个指定用户访问或使用特定操作。角色是使用权限的逻辑分组,它使得通用的权限可以简单地分组并绑定到与某个指定租户相关的用户。

l 租间:租间指的是具有全部服务入口并配有特定成员角色的一个项目。
译者注:一个租间映射到一个Nova的“project-id”,在对象存储中,一个租间可以有多个容器。根据不同的安装方式,一个租间可以代表一个客户、帐号、组织或项目。

(五)OpenStack管理的Web接口—-Horizon
Horizon是一个用以管理、控制OpenStack服务的Web控制面板,它可以管理实例、镜像、创建密匙对,对实例添加卷、操作Swift容器等。除此之外,用户还可以在控制面板中使用终端(console)或VNC直接访问实例。总之,Horizon具有如下一些特点:
l 实例管理:创建、终止实例,查看终端日志,VNC连接,添加卷等
l 访问与安全管理:创建安全群组,管理密匙对,设置浮动IP等
l 偏好设定:对虚拟硬件模板可以进行不同偏好设定
l 镜像管理:编辑或删除镜像
l 查看服务目录
l 管理用户、配额及项目用途
l 用户管理:创建用户等
l 卷管理:创建卷和快照
l 对象存储处理:创建、删除容器和对象
l 为项目下载环境变量

4. 在linux下安装rabbitmq失败怎么解决

RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这些优点。
AMQP 里主要要说两个组件:Exchange 和 Queue (在 AMQP 1.0 里还会有变动),如下图所示,绿色的 X 就是 Exchange ,红色的是 Queue ,这两者都在 Server 端,又称作 Broker ,这部分是 RabbitMQ 实现的,而蓝色的则是客户端,通常有 Procer 和 Consumer 两种类型:

1:mq的安装需要Erlang,所以首先下载Erlang,下载地址:http://www.erlang.org/download.html直接下载源码,编译安装即可。
将下载好的tar包解压编译安装,如下命令:
tar -zxvf otp_src_R16B03-1.tar.gz

cd otp_src_R16B03-1
./configure && make install

安装过程中可能出现如下错误:
configure:error:
No curses library functions found
configure: error:/bin/sh'/home/niewf/software/erlang_R13B01/erts/configure'
failed for erts

解决方法:
yum list|grep ncurses
yum -y install ncurses-devel
yum install ncurses-devel

或者直接下载ncurses包编译安装。
下载地址:http://download.chinaunix.net/download/0008000/7242.shtml
tar zxvf ncurses.tar.gz #解压缩并且释放 文件包
cd ncurses #进入解压缩的目录(注意版本)
./configure #按照你的系统环境制作安装配置文件
make #编译源代码并且编译NCURSES库
su root #切换到root用户环境
make install #安装编译好的NCURSES库

完成后继续返回上一步操作。

2:安装python,如果系统中python版本低于2.5的话需要升级python到2.6以上,具体可参考:http://gavinshaw.blog.51cto.com/385947/610585

3:安装simplejson,直接下载simplejson源码包编译安装即可,下载地址:https://pypi.python.org/pypi/simplejson/。
下载simplejson源码包后,运行python setup.py install即可完成安装。

4:安装rabbit mq,下载地址:https://www.rabbitmq.com/install-generic-unix.html
下载后放入相应目录解压,进入%RABBITMQ_HOME%/sbin目录下运行:./rabbitmq-server start即可启动mq。
如果遇到如下错误,则参考http://leeon.me/a/rabbitmq-start-fail-note解决方案
ERROR: epmd error for host "xxx": address (cannot connect to host/port)
到此mq已经安装完成。
在%RABBITMQ_HOME%/sbin目录运行./rabbitmqctl status可查看当前mq状态。
同时mq也提供了界面查看当前mq状态,但是需要启用该插件功能,运行如下命令:
rabbitmq-plugins enable rabbitmq_management,然后在浏览器中输入:http://host-name:15672/#/即可访问,页面结果如下:

5. android上的socket通信的开源框架有哪些

请去360手机助手下载android学习手册里面有例子、源码和文档

Apache MINA(Multipurpose Infrastructure for Network Applications) 是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架。当前发行的 MINA 版本支持基于 java NIO 技术的 TCP/UDP 应用程序开发、串口通讯程序(只在最新的预览版中提供),MINA 所支持的功能也在进一步的扩展中。目前正在使用 MINA 的软件包括有:Apache Directory Project、AsyncWeb、AMQP(Advanced Message Queuing Protocol)、RED5 Server(Macromedia Flash Media RTMP)、ObjectRADIUS、Openfire 等等。

以上是从网上找到的mina框架简单介绍。
由于正在开发的项目中要求加入及时通信功能(游戏方面),所以在网上找了好几种框架,像openfire、tigase等都是基于Xmpp协议开发的优秀框架。但这些侧重于消息的推送,不适合游戏上的简单交互。所以后来找到了mina这个框架,顺手搭建起来。接下来就是这几天学习的总结了,文章里面没有涉及到逻辑层的方面,只是简单的实现即时通信功能。资源下载我会放在文章的最后面。

一、相关资源下载

(1)Apache官方网站:http://mina.apache.org/downloads.html

(2) Android用jar包(包括官网的资源,我会一律放在网络网盘下)

二、Mina简单配置

服务器端一共要用到四个jar包,包括一个日志包。将他们放在lib中,并加载进去
分别为mina-core-2.0.7.jar slf4j-log4j12-1.7.6.jar slf4j-api-1.7.6.jar log4j-1.2.14.jar(日志管理包)

如果要使用日志的jar包,则要在项目的src目录下新建一个log4j.properties,添加内容如下:

log4j.rootCategory=INFO, stdout , R

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[QC] %p [%t] %C.%M(%L) | %m%n

log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.File=D:\Tomcat 5.5\logs\qc.log
log4j.appender.R.layout=org.apache.log4j.PatternLayout
1log4j.appender.R.layout.ConversionPattern=%d-[TS] %p %t %c - %m%n

log4j.logger.com.neusoft=DEBUG
log4j.logger.com.opensymphony.oscache=ERROR
log4j.logger.net.sf.navigator=ERROR
log4j.logger.org.apache.commons=ERROR
log4j.logger.org.apache.struts=WARN
log4j.logger.org.displaytag=ERROR
log4j.logger.org.springframework=DEBUG
log4j.logger.com.ibatis.db=WARN
log4j.logger.org.apache.velocity=FATAL

log4j.logger.com.canoo.webtest=WARN

log4j.logger.org.hibernate.ps.PreparedStatementCache=WARN
log4j.logger.org.hibernate=DEBUG
log4j.logger.org.logicalcobwebs=WARN

log4j.rootCategory=INFO, stdout , R

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[QC] %p [%t] %C.%M(%L) | %m%n

log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.File=D:\Tomcat 5.5\logs\qc.log
log4j.appender.R.layout=org.apache.log4j.PatternLayout
1log4j.appender.R.layout.ConversionPattern=%d-[TS] %p %t %c - %m%n

log4j.logger.com.neusoft=DEBUG
log4j.logger.com.opensymphony.oscache=ERROR
log4j.logger.net.sf.navigator=ERROR
log4j.logger.org.apache.commons=ERROR
log4j.logger.org.apache.struts=WARN
log4j.logger.org.displaytag=ERROR
log4j.logger.org.springframework=DEBUG
log4j.logger.com.ibatis.db=WARN
log4j.logger.org.apache.velocity=FATAL

log4j.logger.com.canoo.webtest=WARN

log4j.logger.org.hibernate.ps.PreparedStatementCache=WARN
log4j.logger.org.hibernate=DEBUG
log4j.logger.org.logicalcobwebs=WARN

Android客户端要加入的jar包:mina-core-2.0.7.jar slf4j-android-1.6.1-RC1.jar两个jar包(可能直接使用上面的jar包也会行,我没试过~)

二、Mina服务端

我这边使用的是mina2.0版本,所以可能与mina1.0的版本有所不同。那么首先在服务器端创建开始

新建一个Demo1Server.class文件,里面包含着程序的入口,端口号,Acceptor连接.

1 public class Demo1Server {
2 //日志类的实现
3 private static Logger logger = Logger.getLogger(Demo1Server.class);
4 //端口号,要求客户端与服务器端一致
5 private static int PORT = 4444;
6
7 public static void main(String[] args){
8 IoAcceptor acceptor = null;
9 try{
10 //创建一个非阻塞的server端的Socket
11 acceptor = new NioSocketAcceptor();
12 //设置过滤器(使用mina提供的文本换行符编解码器)
13 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),LineDelimiter.WINDOWS.getValue(),LineDelimiter.WINDOWS.getValue())));
14 //自定义的编解码器
15 //acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new CharsetCodecFactory()));
16 //设置读取数据的换从区大小
17 acceptor.getSessionConfig().setReadBufferSize(2048);
18 //读写通道10秒内无操作进入空闲状态
19 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
20 //为接收器设置管理服务
21 acceptor.setHandler(new Demo1ServerHandler());
22 //绑定端口
23 acceptor.bind(new InetSocketAddress(PORT));
24
25 logger.info("服务器启动成功... 端口号未:"+PORT);
26
27 }catch(Exception e){
28 logger.error("服务器启动异常...",e);
29 e.printStackTrace();
30 }
31 }
32
33 }

一个很简单的程序入口吧,简单的说就是在服务器上设置一个消息接收器,让它监听从端口传过来的消息并进行处理。那么接下来我们看看怎么进行消息处理。

新建一个消息处理类,或者说是是业务逻辑处理器——Demo1ServerHandler,它继承了IoHandlerAdapter类,它默认覆盖了七个方法,而我们主要使用messageReceived()。

public class Demo1ServerHandler extends IoHandlerAdapter {
public static Logger logger = Logger.getLogger(Demo1ServerHandler.class);

//从端口接受消息,会响应此方法来对消息进行处理
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
String msg = message.toString();
if("exit".equals(msg)){
//如果客户端发来exit,则关闭该连接
session.close(true);
}
//向客户端发送消息
Date date = new Date();
session.write(date);
logger.info("服务器接受消息成功...");
super.messageReceived(session, message);
}

//向客服端发送消息后会调用此方法
@Override
public void messageSent(IoSession session, Object message) throws Exception {
logger.info("服务器发送消息成功...");
super.messageSent(session, message);
}

//关闭与客户端的连接时会调用此方法
@Override
public void sessionClosed(IoSession session) throws Exception {
logger.info("服务器与客户端断开连接...");
super.sessionClosed(session);
}

//服务器与客户端创建连接
@Override
public void sessionCreated(IoSession session) throws Exception {
logger.info("服务器与客户端创建连接...");
super.sessionCreated(session);
}

//服务器与客户端连接打开
@Override
public void sessionOpened(IoSession session) throws Exception {
logger.info("服务器与客户端连接打开...");
super.sessionOpened(session);
}

@Override
public void sessionIdle(IoSession session, IdleStatus status)
throws Exception {
logger.info("服务器进入空闲状态...");
super.sessionIdle(session, status);
}

@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
logger.info("服务器发送异常...");
super.exceptionCaught(session, cause);
}
}

很直白的一段程序,相当于将服务器分成了七个状态,而每个状态都有自己的一套逻辑处理方案。

至此,一个最简单的Mina服务器框架就搭好了,我们可以使用电脑上的telnet命令来测试一下服务器能否使用
cmd控制台—>telnet <ip地址> <端口号> 如我的服务器ip地为192.168.1.10 那我就写telnet 192.168.1.10 4444 .此时我们可以看到输出日志为

此时连接已经创建,我们在输入信息服务器就会对信息进行处理,并给出相应的应答。
(telnet的用法不知道的可以自行网络)

三、Mina客户端(Android端)

服务器简单搭建完毕,那么开始在Android端是配置服务器吧。同样的不要忘记加载jar包, 由于Android自带了Logout,所以就不使用Mina的日志包了。
由于接受消息会阻塞Android的进程,所以我把它开在子线程中(同时将其放在Service中,让其在后台运行)

1 public class MinaThread extends Thread {
2
3 private IoSession session = null;
4
5 @Override
6 public void run() {
7 // TODO Auto-generated method stub
8 Log.d("TEST","客户端链接开始...");
9 IoConnector connector = new NioSocketConnector();
10 //设置链接超时时间
11 connector.setConnectTimeoutMillis(30000);
12 //添加过滤器
13 //connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new CharsetCodecFactory()));
14 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),LineDelimiter.WINDOWS.getValue(),LineDelimiter.WINDOWS.getValue())));
15 connector.setHandler(new MinaClientHandler(minaService));
16
17 try{
18 ConnectFuture future = connector.connect(new InetSocketAddress(ConstantUtil.WEB_MATCH_PATH,ConstantUtil.WEB_MATCH_PORT));//创建链接
19 future.awaitUninterruptibly();// 等待连接创建完成
20 session = future.getSession();//获得session
21 session.write("start");
22 }catch (Exception e){
23 Log.d("TEST","客户端链接异常...");
24 }
25 session.getCloseFuture().awaitUninterruptibly();//等待连接断开
26 Log.d("TEST","客户端断开...");
27 connector.dispose();
28 super.run();
29 }
30
31 }

不知道你们注意到了没,客户端的代码与服务器端的极其相似,不同的是服务器是创建NioSocketAcceptor对象,而客户端是创建NioSocketConnect对象。当然同样需要添加编码解码过滤器和业务逻辑过滤器。

业务逻辑过滤器代码:

1 public class MinaClientHandler extends IoHandlerAdapter{
2
3
4 @Override
5 public void exceptionCaught(IoSession session, Throwable cause)
6 throws Exception {
7 Log.d("TEST","客户端发生异常");
8 super.exceptionCaught(session, cause);
9 }
10
11 @Override
12 public void messageReceived(IoSession session, Object message)
13 throws Exception {
14 String msg = message.toString();
15 Log.d("TEST","客户端接收到的信息为:" + msg);
16 super.messageReceived(session, message);
17 }
18
19 @Override
20 public void messageSent(IoSession session, Object message) throws Exception {
21 // TODO Auto-generated method stub
22 super.messageSent(session, message);
23 }
24 }

方法功能与服务器端一样。测试这里就不做了。可以的话自己写个Demo效果更好

四、Mina的更多功能

拿到所有客户端Session

Collection<IoSession> sessions = session.getService().getManagedSessions().values();

自定义编码解码器,可以对消息进行预处理。要继承ProtocolEncoder和ProtocolDecode类。

数据对象的传递

这些功能不便放在这里讲了,可能我会以后再找机会另开一篇来讲述这些功能~,大家可以浏览结尾处的参考文章来加深对mina的理解。

在我认为,熟悉和快速使用一个新的的框架可以看出一个程序员的水平,同样及时总结和归纳自己学到的新知识也是一个好的程序员该具有的习惯。那么Mina的简单搭建就到这里为止了,希望对大家有所帮助

6. Spring整合rabbitmq实践(一):基础

Spring整合rabbitmq实践(二):扩展
Spring整合rabbitmq实践(三):源码

procer:消息生产者;

consumer:消息消费者;

queue:消息队列;

exchange:接收procer发送的消息按照binding规则转发给相应的queue;

binding:exchange与queue之间的关系;

virtualHost:每个virtualHost持有自己的exchange、queue、binding,用户只能在virtualHost粒度控制权限。

fanout:

群发到所有绑定的queue;

direct:

根据routing key routing到相应的queue,routing不到任何queue的消息扔掉;可以不同的key绑到同一个queue,也可以同一个key绑到不同的queue;

topic:

类似direct,区别是routing key是由一组以“.”分隔的单词组成,可以有通配符,“*”匹配一个单词,“#”匹配0个或多个单词;

headers:

根据arguments来routing。

arguments为一组key-value对,任意设置。

“x-match”是一个特殊的key,值为“all”时必须匹配所有argument,值为“any”时只需匹配任意一个argument,不设置默认为“all”。

通过以下配置,可以获得最基础的发送消息到queue,以及从queue接收消息的功能。

这个包同时包含了一些其它的包:spring-context、spring-tx、spring-web、spring-messaging、spring-retry、spring-amqp、amqp-client,如果想单纯一点,可以单独引入。

最主要的是以下几个包,

spring-amqp:

spring-rabbit:

amqp-client:

个人理解就是,spring-amqp是spring整合的amqp,spring-rabbit是spring整合的rabbitmq(rabbitmq是amqp的一个实现,所以可能spring-rabbit也是类似关系),amqp-client提供操作rabbitmq的java api。

目前最新的是2.0.5.RELEASE版本。如果编译报错,以下信息或许能有所帮助:

(1)

解决方案:spring-amqp版本改为2.0.5.RELEASE。

(2)

解决方案:spring-context版本改为5.0.7.RELEASE。

(3)

解决方案:spring-core版本改为5.0.7.RELEASE。

(4)

解决方案:spring-beans版本改为5.0.7.RELEASE。

(5)

解决方案:spring-aop版本改为5.0.7.RELEASE。

总之,需要5.0.7.RELEASE版本的spring,及相匹配版本的amqp-client。

后面所讲的这些bean配置,spring-amqp中都有默认配置,如果不需要修改默认配置,则不用人为配置这些bean。后面这些配置也没有涉及到所有的属性。

这里的ConnectionFactory指的是spring-rabbit包下面的ConnectionFactory接口,不是amqp-client包下面的ConnectionFactory类。

上面这个bean是spring-amqp的核心,不论是发送消息还是接收消息都需要这个bean,下面描述一下里面这些配置的含义。

setAddresses :设置了rabbitmq的地址、端口,集群部署的情况下可填写多个,“,”分隔。

setUsername :设置rabbitmq的用户名。

setPassword :设置rabbitmq的用户密码。

setVirtualHost :设置virtualHost。

setCacheMode :设置缓存模式,共有两种, CHANNEL 和 CONNECTION 模式。

CHANNEL 模式,程序运行期间ConnectionFactory会维护着一个Connection,所有的操作都会使用这个Connection,但一个Connection中可以有多个Channel,操作rabbitmq之前都必须先获取到一个Channel,否则就会阻塞(可以通过setChannelCheckoutTimeout()设置等待时间),这些Channel会被缓存(缓存的数量可以通过setChannelCacheSize()设置);

CONNECTION 模式,这个模式下允许创建多个Connection,会缓存一定数量的Connection,每个Connection中同样会缓存一些Channel,除了可以有多个Connection,其它都跟CHANNEL模式一样。

这里的Connection和Channel是spring-amqp中的概念,并非rabbitmq中的概念,官方文档对Connection和Channel有这样的描述:

关于 CONNECTION 模式中,可以存在多个Connection的使用场景,官方文档的描述:

setChannelCacheSize :设置每个Connection中(注意是每个Connection)可以缓存的Channel数量,注意只是缓存的Channel数量,不是Channel的数量上限,操作rabbitmq之前(send/receive message等)要先获取到一个Channel,获取Channel时会先从缓存中找闲置的Channel,如果没有则创建新的Channel,当Channel数量大于缓存数量时,多出来没法放进缓存的会被关闭。

注意,改变这个值不会影响已经存在的Connection,只影响之后创建的Connection。

setChannelCheckoutTimeout :当这个值大于0时, channelCacheSize 不仅是缓存数量,同时也会变成数量上限,从缓存获取不到可用的Channel时,不会创建新的Channel,会等待这个值设置的毫秒数,到时间仍然获取不到可用的Channel会抛出AmqpTimeoutException异常。

同时,在 CONNECTION 模式,这个值也会影响获取Connection的等待时间,超时获取不到Connection也会抛出AmqpTimeoutException异常。

setPublisherReturns、setPublisherConfirms :procer端的消息确认机制(confirm和return),设为true后开启相应的机制,后文详述。

官方文档描述publisherReturns设为true打开return机制,publisherComfirms设为true打开confirm机制,但测试结果(2.0.5.RELEASE版本)是,任意一个设为true,两个都会打开。

addConnectionListener、addChannelListener、setRecoveryListener :添加或设置相应的Listener,后文详述。

setConnectionCacheSize :仅在 CONNECTION 模式使用,设置Connection的缓存数量。

setConnectionLimit :仅在 CONNECTION 模式使用,设置Connection的数量上限。

上面的bean配置,除了需要注入的几个listener bean以外,其它设置的都是其默认值(2.0.5.RELEASE版本),后面的bean示例配置也是一样,部分属性不同版本的默认值可能有所不同。

一般不用配置这个bean,这里简单提一下。

这个ConnectionFactory是rabbit api中的ConnectionFactory类,这里面是连接rabbitmq节点的Connection配置。

如果想修改这些配置,可以按如下方式配置:

consumer端如果通过@RabbitListener注解的方式接收消息,不需要这个bean。

不建议直接通过ConnectionFactory获取Channel操作rabbitmq,建议通过amqpTemplate操作。

setConnectionFactory :设置spring-amqp的ConnectionFactory。

setRetryTemplate :设置重试机制,详情见后文。

setMessageConverter :设置MessageConverter,用于java对象与Message对象(实际发送和接收的消息对象)之间的相互转换,详情见后文。

setChannelTransacted :打开或关闭Channel的事务,关于amqp的事务后文描述。

setReturnCallback、setConfirmCallback :return和confirm机制的回调接口,后文详述。

setMandatory :设为true使ReturnCallback生效。

这个bean仅在consumer端通过@RabbitListener注解的方式接收消息时使用,每一个@RabbitListener注解的方法都会由这个创建一个MessageListenerContainer,负责接收消息。

setConnectionFactory :设置spring-amqp的ConnectionFactory。

setMessageConverter :对于consumer端,MessageConverter也可以在这里配置。

setAcknowledgeMode :设置consumer端的应答模式,共有三种:NONE、AUTO、MANUAL。

NONE,无应答,这种模式下rabbitmq默认consumer能正确处理所有发出的消息,所以不管消息有没有被consumer收到,有没有正确处理都不会恢复;

AUTO,由Container自动应答,正确处理发出ack信息,处理失败发出nack信息,rabbitmq发出消息后将会等待consumer端的应答,只有收到ack确认信息才会把消息清除掉,收到nack信息的处理办法由setDefaultRequeueRejected()方法设置,所以在这种模式下,发生错误的消息是可以恢复的。

MANUAL,基本同AUTO模式,区别是需要人为调用方法给应答。

setConcurrentConsumers :设置每个MessageListenerContainer将会创建的Consumer的最小数量,默认是1个。

setMaxConcurrentConsumers :设置每个MessageListenerContainer将会创建的Consumer的最大数量,默认等于最小数量。

setPrefetchCount :设置每次请求发送给每个Consumer的消息数量。

setChannelTransacted :设置Channel的事务。

setTxSize :设置事务当中可以处理的消息数量。

setDefaultRequeueRejected :设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃。

setErrorHandler :实现ErrorHandler接口设置进去,所有未catch的异常都会由ErrorHandler处理。

AmqpTamplate里面有下面几个方法可以向queue发送消息:

这里,exchange必须存在,否则消息发不出去,会看到错误日志,但不影响程序运行:

Message是org.springframework.amqp.core.Message类,spring-amqp发送和接收的都是这个Message。

从Message类源码可以看到消息内容放在byte[]里面,MessageProperties对象包含了非常多的一些其它信息,如Header、exchange、routing key等。

这种方式,需要将消息内容(String,或其它Object)转换为byte[],示例:

也可以直接调用下面几个方法,Object将会自动转为Message对象发送:

有两种方法接收消息:

1.polling consumer,轮询调用方法一次获取一条;

2.asynchronous consumer,listener异步接收消息。

polling consumer

直接通过AmqpTemplate的方法从queue获取消息,有如下方法:

如果queue里面没有消息,会立刻返回null;传入timeoutMillis参数后可阻塞等待一段时间。

如果想直接从queue获取想要的java对象,可调用下面这一组方法:

后面4个方法是带泛型的,示例如下:

使用这四个方法需要配置org.springframework.amqp.support.converter.SmartMessageConverter,这是一个接口,Jackson2JsonMessageConverter已经实现了这个接口,所以只要将Jackson2JsonMessageConverter设置到RabbitTemplate中即可。

asynchronous consumer

有多种方式可以实现,详情参考官方文档。

最简单的实现方式是@RabbitListener注解,示例:

这里接收消息的对象用的是Message,也可以是自定义的java对象,但调用Converter转换失败会报错。

注解上指定的queue必须是已经存在并且绑定到某个exchange的,否则会报错:

如果在@RabbitListener注解中指明binding信息,就能自动创建queue、exchange并建立binding关系。

direct和topic类型的exchange需要routingKey,示例:

fanout类型的exchange,示例:

2.0版本之后,可以指定多个routingKey,示例:

并且支持arguments属性,可用于headers类型的exchange,示例:

@Queue有两个参数exclusive和autoDelete顺便解释一下:

exclusive,排他队列,只对创建这个queue的Connection可见,Connection关闭queue删除;

autoDelete,没有consumer对这个queue消费时删除。

对于这两种队列,rable=true是不起作用的。

另外,如果注解申明的queue和exchange及binding关系都已经存在,但与已存在的设置不同,比如,已存在的exchange的是direct类型,这里尝试改为fanout类型,结果是不会有任何影响,不论是修改或者新增参数都不会生效。

如果queue存在,exchange存在,但没有binding,那么程序启动后会自动建立起binding关系。

7. RabbitMQ消息过滤的一个思路

生产者 Procer 向 一个 队列发送消息,并且为消息打上不同的 Tag。假设这个队列有 3 个消费者:Consumer #[1:3],Consumer #1 只想消费 tag1 标记的消息,Consumer #2 只想消费 tag2 标记的消息,Consumer #3 只想消费 tag3 标记的消息。

生产者 publish 消息时,将 Tag 保存在 Map<String, Object> 类型的 header 字段,作为构建 AMQP.BasicProperties 参数

消费者如何告知 Broker 只消费特定 Tag?

假设 Consumer #1 只希望消费带 tag1 标记的消息,那么 Consumer #1 可以在向 Broker 请求 Basic.Consume 指令时,捎带自己期望的 Tag 字符串。Client 在具体生成 consumerTag 时可以用 Tag 关键字加上随机字符串(避免 consumerTag 重复):

消费者通过 Basic.Consume 指令来监听队列的消息,这些消费者信息服务端是如何存储的?

保存在队列主进程(Pid)的 state 中(具体调试可以通过 sys:get_state(Pid) )

并且队列进程在初始化时,会进行 consumers 初始化:

consumers 字段实际由 priority_queue:new() 初始化。当有新的 consumer 注册到队列进程,那么会调用 rabbit_queue_consumers 模块的 add_consumer 方法来向 priority_queue 添加一个元素;同理当有 consumer下线时,最终也会调用该模块的 remove_consumer 方法。 priority_queue 完整实现见 附二

Broker 向 Consumer 投递消息时,底层是通过 rabbit_amqqueue_process 调用 rabbit_queue_consumers 模块的 deliver 方法。默认采用

从 priority_queue 中获取一个 QEntry( {ChPid, Consumer} ),然后通过 FetchFun 从队列中获取消息,发送到 ChPid(Channel 进程)

在 consumers 不为空的情况下,通过 FetchFun 获取消息,此时可以获取该消息的 header,取出 Tag 值(如果消息打了 Tag 标记),然后通过 priority_queue 的 filter/2 方法

在 Pred 实现中,我们可以判断当前消息 Tag 值是否被包含在 consumerTag 中,从而可以过滤出消费特定 tag 的consumers,最后向这些 consumers 中的一个发送 Message 消息。

附一 (队列进程 state 中的 consumers 信息例子)

附二 (priority_queue 模块实现
rabbit_common )

注 :上述思路建议在测试环境测试,考虑到有可能出现的性能问题,作为一个调研也会有很多工作要做,整个过程会涉及 RabbitMQ 服务端源码改造、编译、打包( rabbitmq-public-umbrella )以及客户端的相关改造,如果能实际尝试下,也会有不小的收获。

阅读全文

与amqp源码相关的资料

热点内容
如何恢复被炸的游戏服务器 浏览:731
windows下perl语言编译环境 浏览:856
部队执行命令方面 浏览:760
学美甲视频教程下载什么APP 浏览:501
c语言使用什么软件来编译 浏览:317
linux关闭防火墙的命令 浏览:532
腾讯云总服务器建造花费了多少钱 浏览:688
表格制作程序员 浏览:639
淘宝上的云服务器是什么 浏览:376
ccs编程语言 浏览:183
美国高防弹性云服务器 浏览:663
程序员找不到对象 浏览:18
图片社区PHP源码云存储 浏览:650
国外编程少儿 浏览:986
昆特牌安卓国际版怎么下 浏览:877
什么软件适合拍照片安卓 浏览:440
国际版的安卓手机怎么样 浏览:81
什么app可以口红试色 浏览:572
国家二级程序员 浏览:192
有什么app看漫画 浏览:309