欧阳亮的博客

编程不止是一份工作,还是一种乐趣!!!

聊聊ActiveMQ

消息队列在互联网系统架构中一直是最关键的组件之一,起着举足轻重的作用,它:

  • 提搞应用的响应性
  • 解偶系统间、模块间的偶合度
  • 提高系统的可扩展性,开闭原则,事件驱动
  • 提高系统的可靠性


对于大部分人来讲,当我们提到消息队列这个概念时,脑海里面呈现的应该是这样:

消息队列


这么理解是对的,最简单的队列就是这样的。但是实际情况往往会更复杂一些,至少在分布式环境下,应用的部署一般会采用集群的方式,所以队列也变成这样:

集群环境下的消息队列


这可能是现实中最简单的队列了,多个消费者同时订阅一个队列,消息会以轮循的方式转发给各个Consumer。这样部署有什么问题吗?同一个队列有多个生产者和消费者,消息的顺序是否能够得到保证?从生产的角度看,消息在队列中的顺序取决于Producer的提交顺序,这里没有问题;但是从消费的角度看呢?队列中的消息虽然按照顺序轮循的递交给各个Consumer,看似没有问题,但实际上消息被处理的顺序是得不到保证的:

  • Consumer虽然是顺序的接收消息的,但不同的Consumer处理消息的快慢却不同。
  • 消息处理失败,确认机制使得消息重新递交到另一个消费者。


如果在业务上对消息的处理顺序有要求的话,就需要额外的处理了。ActiveMQ提供了两种机制来保证消息的顺序:Exclusive Consumer和Message Group。

Exclusive Consumer


Exclusive Consumer很容易理解,在这个模式下,消息只会转发给固定的某一个Consumer,其它的Consumer只是充当一个backup的角色。当活跃的消费者发生故障,其它消费者会取代它成为新的活跃消费者。

Exclusive Consumer


启用Exclusive Consumer非常简单,在声明队列的时候增加一个参数即可:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);


Exclusive Consumer解决了顺序和单点的问题,但是它也有个致命的缺陷:伸缩性,无论集群中有多少机器,多少个Consumer,永远只有一个在接收和处理消息,单个Consumer会成为瓶颈。保证消息顺序更好的方式是Message Group。

Message Group


Message Group的原理很也简单,它把消息进行分组,并保证每个组的消息转发给固定的Consumer,这样可以保证每个组的消息顺序。

Message Group


比起Exclusive Consumer,采用Message Group的方式无疑更好,它保证消息顺序问题的同时,也实现了Consumer的负载,把消息的消费均衡的负载到不同的Consumer。

启用Message Group也很简单,只需要在Producer端构造消息的时候增加一个属性JMSXGroupID即可:

Message message = session.createTextMessage("message " + n);
message.setStringProperty("JMSXGroupID", Integer.toString(n & 1));


Message Group同样实现了Consumer的高可用,如果某个Consumer发生故障,对应组的消息会被均衡的转发给其它的Consumer。但是它对动态增加Consumer的支持不够智能。如果所有的消息组都已经路由到固定的几个Consumer了,动态地增加Consumer并不会像你想像那样,将现有的分组均衡的路由到新增的Consumer,新增的Consumer只是充当了一个backup的角色。

Message Group


例如,现有的消息共分4个组,分组1-2的消息被路由到Consumer 1,分组3-4的消息被路由到Consumer 2。在增加Consumer 3后,不会有任何消费会路由到它上面,它实际上只起到一个backup的作用。只有当Consumer 1或者2故障下线了,对应的消息才会重新均衡的被路由到其它的Consumer。


Message Group的实际原理,使得Producer与Consumer的上线发布顺序有严格的要求。适想一下,如果某个Consumer上线的速度比其它的都快,就有可能出现所有的消息都被路由到第一个上线的Consumer,而其它的Consumer都变成了backup。所以,最简单的办法是保证所有的Consumer都上线以后,再上线Producer。


ActiveMQ本身也提供了这种情况的处理办法,在activemq.xml文件中:

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">" consumersBeforeDispatchStarts="2"
        timeBeforeDispatchStarts="2000"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>


consumersBeforeDispatchStartstimeBeforeDispatchStarts的作用是在开始分发消息前,等待多少个Consumer上线,或者等待多长时间(单位:ms)再开始分发消息。


不过,这仅仅是在发布应用的时候可以有一点点作用。如果系统运行过程中某个Consumer意外下线了并快速恢复,也会导致原来路由到该Consumer的消息已经被重新路由到别的Consumer了,使得Consumer端压力不均衡。所以,ActiveMQ提供了一个close group的机制,如下:

message.setStringProperty("JMSXGroupID", "group-1");
message.setIntProperty("JMSXGroupSeq", -1);
...
producer.send(message);


通过在消息中设置一个属性JMSXGroupSeq=-1,用来关闭对应的组(group-1)。在这之后同组(group-1)的消息ActiveMQ会重新选择新的Consumer来消费。所以在实际应用中如果某个Consumer意外下线了,我们可以在故障恢复之后借助JMSXGroupSeq来重新负载消息的消费。


到目前为止,我们介绍了消息顺序的问题,也实现了Consumer的负载均衡,但是Broker还存在单点的问题。ActiveMQ提供的最简单的解决Broker单点问题的方式是failover。

failover


failover是ActiveMQ中最简单的集群方式了,系统中有若干个不相干的Broker,任一时刻,客户端只会连接到它们之中的一个。当某一个Broker下线之后,对应的客户端会连接到其它的Broker上。

failover


如图所示,在客户端(Producer与Consumer)都配置了Broker 1与Broker 2的url,正常情况下客户端只会连接到某一个Broker(如图列中的Broker 1)。这时Broker 2充当了backup的角色,不会有任何的负载。当Broker 1故障下线了,客户端会自动连接到Broker 2,待Broker 1故障恢复后,Broker 1会变成Broker 2的backup。

使用failover也很简单,在客户端配置Broker地址的时候使用下面的语法:

failover:(uri1,...,uriN)?transportOptions
failover:uri1,...,uriN

//如
failover:(tcp://localhost:61616,tcp://remotehost:61626)?randomize=false


初始化时客户端会连接到failover的第一个Broker,当Broker意外下线,客户端会随机的连接到其它的Broker,这样可能会出现Producer与Consumer连接到不同的Broker,导致有的Broker只有Produer在生产消息却没有Consumer消费消息,有的Broker只有Consumer在消费消息却没有Produer在生产消息。为了避免这样的情况发生,我们应用使用参数randomize=false,强制客户端重连的时候根据failover的配置顺序连接新的Broker。


failover完全是在客户端进行失败转移控制的,换句话说,Broker之间是完全隔离的,彼此之间互不通信。如果客户端之间的配置没有按照严格的规范来,很容易出现Producer与Consumer连接到不同Broker的问题,导致消息堆积。为了解决这个问题,ActiveMQ提供了另一个机制:Networks Of Brokers。

Networks Of Brokers


Networks of Brokers其实说白了就是Brokers之间会相互转发消息,直到消息最终被真正的Consumer消费。

Network of Brokers

对于这个例子而言,Producer将消息发送给Broker 1,Broker 1会将消息转发给Broker 2,从而Consumer可以从Broker 2接收消息。对于Broker 1而言,它会把Broker 2看作一个Consumer,并且仅当真正的Consumer从Broker 2订阅消息时,Broker 1才会把消息转发给Broker 2。

值得注意的一点是,Broker之间的关系是消息转发,而不是消息同步。比如Producer和Consumer都连接到Broker 1时,消息是不会从Broker 1转发到Broker 2的。例如下面这种情况,没有Consumer订阅Broker 2,所以Broker 1不会将消息转发给Broker 2。

Network of Brokers

Brokers之间的连接是通过networkConnector配置的,要使Broker 1可以转发消息给Broker 2,需要在Broker 1增加以下配置:

<broker brokerName="broker1" dataDirectory="${activemq.data}">
  <networkConnectors>
    <!-- Broker 2的URL: localhost:61626 -->
    <networkConnector uri="static:(tcp://localhost:61626)"/>
  </networkConnectors>
  ...
</broker>


如果需要Broker 2也可以转发消息给Broker 1的话,同样的需要在Broker 2作类似的配置

<broker brokerName="broker2" dataDirectory="${activemq.data}">
  <networkConnectors>
    <!-- Broker 1的URL: localhost:61616 -->
    <networkConnector uri="static:(tcp://localhost:61616)"/>
  </networkConnectors>
  ...
</broker>


这样,Broker 1与Broker 2就互为主备,实现了HA,并且解决了failover可能导至某个Broker只有Producer没有Consumer导致消息堆积的问题,如下所示:

Network of Brokers

进一步探索Networks Of Brokers


Networks of Brokers的设计保证了它在伸缩性方面的优势,动态地往集群中增加机器对客户端完全透明。当应用规模日渐增长时,2个Brokers可能仍然抗不住访问压力,这时候可以增加一台机器来提升一个更大规模的Broker集群:

Network of Brokers

在这个例子中,只有一个Producer与一个Consumer。其中Producer向Broker 1发送消息,Consumer从Broker 2订阅消息,那么从Producer到Consumer之间的通路存在两种可能:

  • Producer -> Broker 1 -> Broker 2 -> Consumer
  • Producer -> Broker 1 -> Broker 3 -> Broker 2 -> Consumer


前面我们说过,多个消费者同时订阅一个队列,消息会以轮循的方式路由到每个Consumer。这个例子中,Broker 2和Broker 3对于Broker 1来说,都是消息订阅者,但是消息会以轮循的方式从Broker 1转发给Broker 2和Broker 3吗?答案是不会,因为Brokers集群之间的消息转发会采用路径最短的通路。这点与这篇博文所讲的不同,个人猜想可能是ActiveMQ从某个版本开始对Brokers集群的路由算法进行了改进。如果在Broker 2上有两个订阅者呢?

Network of Brokers

两个Consumer同时订单Broker 2,会走Broker 1 -> Broker 3 -> Broker 2这条通路吗?答案还是一样的,从Producer到Consumer 1和2的通路都是Broker 1 -> Broker 2,集群之间的消息转发采用路径最短的通路。我们再看看另一种情况:

Network of Brokers

Consumer 1和2分别订阅Brokers 2和3,你应该能猜出答案:从Producer到Consumer 1的通路走的是Broker 1 -> Broker 2;从Producer到Consumer 2的通路走的是Broker 1 -> Broker 3。


至此,我们已经了解了Brokers集群的原理,通过在Broker之间选择最短的路径进行消息的转发,极大的提升了系统的高可用。但是对于压力的负载均衡来说,却是一些讲究的。

Network of Brokers

这个例子中,我们使用了三台Broker组建了一个集群,所有的客户端都连接到了Broker 1上,实际是只是达到了系统的高可用,但是所有的压力都在Broker 1上,负载并没有均衡到另外两个Broker上面,所以说这是一个不好的架构。可以稍微做一下调整:

Network of Brokers

这样的架构无疑更好,既实现了系统的HA,又把压力负载均衡到三个Broker上,只是在系统构建和发布的时候要增加一些额外的工作,但是带来的好处也是明显的。当然,如果系统的流量大到需要这么复杂的架构的话,我会建议使用像Kafka那样的队列,毕竟两者在吞吐量方面不是一个量级的。