跳至主要内容

RabbitMQ Learning (3): Example

We have talked about the basic concepts of rabbitmq and a sample of application. Now, in this blog, we will show some working code about how to config and send/receive message. (Notice this blog is mainly composed of code, you may would like to open you IDE and type/clone code and run it along the blog).

Pure Java

If we use pure java code, we can do like following:

// set up connection
CachingConnectionFactory cf = new CachingConnectionFactory("host", 5672);
cf.setUsername("timer");
cf.setPassword("xxx");
cf.setVirtualHost("timer_host");

// set up the queue, exchange, binding on the broker
RabbitAdmin admin = new RabbitAdmin(cf);
Queue queue = new Queue("timerServer");
admin.declareQueue(queue);
TopicExchange exchange = new TopicExchange("timerExchange");
admin.declareExchange(exchange);
admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("timer.*.add"));

// set up the listener and container, which can be used to consume message
SimpleMessageListenerContainer container =
        new SimpleMessageListenerContainer(cf);

MessageListenerAdapter adapter = new MessageListenerAdapter(new Object() {
    public void handleMessage(String msg) {
        // ...
    }
});
container.setMessageListener(adapter);
container.setQueueNames("timerServer");
// start listening
container.start();

RabbitTemplate template = new RabbitTemplate(cf);
template.setExchange("timerExchange");
// use template to send message

It works, but a little bit tedious, we can do better through the help of spring.

A little bit complicated example which works with utility wrapper (which is described in last serial blog) can be found here.

Spring

Spring has the support of rabbit, we can config them using XML file:

<rabbit:connection-factory id="cf" host="host" virtual-host="timer_host" username="timer" password="xxx"/>

<rabbit:template id="rabbitTemplate" connection-factory="cf" exchange="timerExchange" routing-key="timer.announcement.add"/>

<rabbit:admin connection-factory="cf" />

<rabbit:queue name="timerClient" />

<rabbit:topic-exchange name="timerExchange">
    <rabbit:bindings>
        <rabbit:binding queue="timerClient" pattern="timer.announcement.timeout" />
    </rabbit:bindings>
</rabbit:topic-exchange>

<rabbit:listener-container connection-factory="cf">
    <rabbit:listener ref="receiver" queue-names="timerClient" method="xxx"/>
</rabbit:listener-container>


<bean id="receiver" class="x.x.EchoReceiver" />

The upper XML config is the same with Java code in functionality, but just expressed
Receiver class

public class EchoReceiver {
  public void xxx(String msg) {}
}

Now, can use Autowire rabbit template to send message, use receiver to consume message consistently.

Spring Boot

Spring Boot move back to prefer to use Java code config with annotations. Using Spring Boot can save a lot of config effort because of the help of annotations which provides many default settings:

  • ConditionalOnBean
  • ConditionalOnMissingBean
  • ConditionalOnClass

You can view those matching process by setting logging levels of Spring Boot to be DEBUG or lower.

The following is the sample code to config a RabbitMQ consumer:

@Bean
Queue queue() {
    return new Queue(queueName, true);
}

@Bean
TopicExchange exchange() {
    return new TopicExchange("spring-boot-exchange", true, false);
}

@Bean
Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(queueName);
}

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                         MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(queueName);
    container.setMessageListener(listenerAdapter);
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    return container;
}

@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
    return new MessageListenerAdapter(receiver, "receiveMessage");
}

And some properties used by Spring RabbitMQ integration to create connection:

spring.rabbitmq.host=192.168.1.100
spring.rabbitmq.virtual-host=timer_host
spring.rabbitmq.username=timer
spring.rabbitmq.password=timer

A producer is much simpler, which only needs set up the connection and the queue it want to send to. The working example of this sample can be found here.

Multiple Listener in a Single Project

In some cases, we want to consume multiple queues in the same project, which needs multiple listeners. This can be done as following configuration XML shows:

<rabbit:topic-exchange name="timerExchange">
    <rabbit:bindings>
        <rabbit:binding queue="timerClient" pattern="timer.announcement.timeout" />
        <rabbit:binding queue="testClient" pattern="timer.test.timeout" />
    </rabbit:bindings>
</rabbit:topic-exchange>

<rabbit:listener-container connection-factory="cf">
    <rabbit:listener ref="receiver" queue-names="timerClient" />
    <rabbit:listener ref="receiverTest" queue-names="testClient" />
</rabbit:listener-container>


<bean id="receiver" class="cn.superid.webapp.message.EchoReceiver" />

<!--following is for test-->

<rabbit:queue name="testClient" />

<bean id="receiverTest" class="cn.superid.webapp.message.TestReceiver" />

Written with StackEdit.

评论

此博客中的热门博文

Spring Boot: Customize Environment

Spring Boot: Customize Environment Environment variable is a very commonly used feature in daily programming: used in init script used in startup configuration used by logging etc In Spring Boot, all environment variables are a part of properties in Spring context and managed by Environment abstraction. Because Spring Boot can handle the parse of configuration files, when we want to implement a project which uses yml file as a separate config file, we choose the Spring Boot. The following is the problems we met when we implementing the parse of yml file and it is recorded for future reader. Bind to Class Property values can be injected directly into your beans using the @Value annotation, accessed via Spring’s Environment abstraction or bound to structured objects via @ConfigurationProperties. As the document says, there exists three ways to access properties in *.properties or *.yml : @Value : access single value Environment : can access multi

Elasticsearch: Join and SubQuery

Elasticsearch: Join and SubQuery Tony was bothered by the recent change of search engine requirement: they want the functionality of SQL-like join in Elasticsearch! “They are crazy! How can they think like that. Didn’t they understand that Elasticsearch is kind-of NoSQL 1 in which every index should be independent and self-contained? In this way, every index can work independently and scale as they like without considering other indexes, so the performance can boost. Following this design principle, Elasticsearch has little related supports.” Tony thought, after listening their requirements. Leader notice tony’s unwillingness and said, “Maybe it is hard to do, but the requirement is reasonable. We need to search person by his friends, didn’t we? What’s more, the harder to implement, the more you can learn from it, right?” Tony thought leader’s word does make sense so he set out to do the related implementations Application-Side Join “The first implementation

Implement isdigit

It is seems very easy to implement c library function isdigit , but for a library code, performance is very important. So we will try to implement it and make it faster. Function So, first we make it right. int isdigit ( char c) { return c >= '0' && c <= '9' ; } Improvements One – Macro When it comes to performance for c code, macro can always be tried. #define isdigit (c) c >= '0' && c <= '9' Two – Table Upper version use two comparison and one logical operation, but we can do better with more space: # define isdigit(c) table[c] This works and faster, but somewhat wasteful. We need only one bit to represent true or false, but we use a int. So what to do? There are many similar functions like isalpha(), isupper ... in c header file, so we can combine them into one int and get result by table[c]&SOME_BIT , which is what source do. Source code of ctype.h : # define _ISbit(bit) (1 << (