
RabbitMQ Learning (2): Application

In the last post, we skimmed the basic concepts and abstractions of RabbitMQ. Today, we apply message-based communication to a concrete problem.

Timer

Background

We would like to build a separate timer process that manages many timed jobs: it can schedule a job to run at a specific time or to run at regular intervals.
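The two kinds of jobs can be illustrated with the JDK's ScheduledExecutorService. This is only a minimal sketch of the scheduling idea, not the timer process built in this post; the class name TimerSketch and its methods are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: one-shot and periodic jobs on a single scheduler thread. */
final class TimerSketch {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Runs the job once, after the given delay. */
    void runOnceAfter(Runnable job, long delayMillis) {
        scheduler.schedule(job, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Runs the job repeatedly at a fixed period; the first run happens after one period. */
    void runEvery(Runnable job, long periodMillis) {
        scheduler.scheduleAtFixedRate(job, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops the scheduler and cancels pending jobs. */
    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        TimerSketch timer = new TimerSketch();
        CountDownLatch ticks = new CountDownLatch(3);
        timer.runOnceAfter(() -> System.out.println("one-shot job fired"), 50);
        timer.runEvery(() -> {
            System.out.println("periodic job fired");
            ticks.countDown();
        }, 20);
        // Wait until the periodic job has run three times, then stop.
        ticks.await(2, TimeUnit.SECONDS);
        timer.shutdown();
    }
}
```

In the real system, the jobs would be triggered and reported through messages rather than direct method calls.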

To reduce coupling, we have already made the timer a separate process, so we need inter-process communication to interact with it. We chose a message queue rather than RPC because our performance requirements are not very demanding and a queue makes the system even more loosely coupled.

When choosing the message middleware, we compared RabbitMQ and Kafka on reliability and performance. Because we value reliability over performance, we chose RabbitMQ.

Design Choice

Next, we have to make some decisions about RabbitMQ:

  • Exchange type: The last post on RabbitMQ basic concepts introduced three types of exchange, which meet different requirements. In our case, we choose a topic exchange, whose binding keys can use wildcards (* matches exactly one word, # matches zero or more words) to combine messages from different sources.
  • Routing key: Timer.*.init, Timer.*.timeout, etc.
  • Durable: To improve message reliability, we make the exchange and the queue durable. Note that messages must also be published as persistent to survive a broker restart.
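To make the wildcard rules concrete, here is a minimal sketch of how a topic binding pattern such as Timer.*.init is matched against a routing key. It only illustrates the matching semantics (words separated by dots, * for exactly one word, # for zero or more words); it is not RabbitMQ's implementation, and the class name TopicMatcher and the example keys are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of topic-exchange wildcard matching. */
final class TopicMatcher {

    private TopicMatcher() {}

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        // Limit -1 keeps trailing empty words instead of silently dropping them.
        List<String> p = Arrays.asList(pattern.split("\\.", -1));
        List<String> k = Arrays.asList(routingKey.split("\\.", -1));
        return matches(p, 0, k, 0);
    }

    private static boolean matches(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            // Pattern exhausted: match only if the key is exhausted too.
            return j == k.size();
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = j; next <= k.size(); next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            return false;
        }
        // '*' matches exactly one word; anything else must match literally.
        if (word.equals("*") || word.equals(k.get(j))) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("Timer.*.init", "Timer.job1.init"));
        System.out.println(matches("Timer.*.init", "Timer.a.b.init"));
        System.out.println(matches("Timer.#", "Timer.a.b.timeout"));
    }
}
```

With this scheme, a queue bound with Timer.*.timeout receives timeout messages from every source, while a binding such as Timer.# would receive every timer message.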

Message Format

Because RabbitMQ sends and receives only byte arrays, we need to convert our messages to and from bytes. Before doing so, we have to define the message format.

To define a general format that can be reused for other message queue use cases, we define the format as follows:

String REPLY_ROUTING_KEY = "replyRoutingKey";
Long id;
HashMap<String, Object> payload = new HashMap<>();

Here, id is the message identifier and payload is where the data is stored. The payload must contain an entry whose key is replyRoutingKey, which the receiver uses to reply.
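The rule that every payload carries a replyRoutingKey entry can be enforced at construction time. The following is a minimal sketch under that assumption; SketchMessage and its methods are hypothetical names and not the actual class from this post.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the message format: an id plus a payload map. */
final class SketchMessage {

    static final String REPLY_ROUTING_KEY = "replyRoutingKey";

    private final Long id;
    private final Map<String, Object> payload = new HashMap<>();

    /** The reply routing key is mandatory, so it is required by the constructor. */
    SketchMessage(Long id, String replyRoutingKey) {
        if (replyRoutingKey == null || replyRoutingKey.isEmpty()) {
            throw new IllegalArgumentException("replyRoutingKey is required");
        }
        this.id = id;
        payload.put(REPLY_ROUTING_KEY, replyRoutingKey);
    }

    Long getId() {
        return id;
    }

    /** The key the receiver should use when publishing its reply. */
    String getReplyRoutingKey() {
        return (String) payload.get(REPLY_ROUTING_KEY);
    }

    /** Adds application data; the reserved key cannot be overwritten this way. */
    SketchMessage put(String key, Object value) {
        if (REPLY_ROUTING_KEY.equals(key)) {
            throw new IllegalArgumentException(REPLY_ROUTING_KEY + " is reserved");
        }
        payload.put(key, value);
        return this;
    }

    Object get(String key) {
        return payload.get(key);
    }

    public static void main(String[] args) {
        // Example keys and values are made up for illustration.
        SketchMessage msg = new SketchMessage(1L, "Timer.job1.timeout").put("delayMillis", 5000);
        System.out.println(msg.getId() + " replies to " + msg.getReplyRoutingKey());
    }
}
```

Requiring the key in the constructor means a receiver never has to handle a message it cannot reply to.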

Util Wrapper

As noted above, RabbitMQ sends only bytes, so we have to marshal and unmarshal when sending and receiving messages.

To ease the pain of conversion and reduce the code that clients need to write, we add a utility wrapper layer that converts between bytes and the specific message format.

Full source code can be found here.

MessageFormat

private static final transient String REPLY_ROUTING_KEY = "replyRoutingKey";
private static Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();
@Expose
private Long id;
@Expose
private HashMap<String, Object> payload = new HashMap<>();

The other fields are as described above; the additional gson field is used to convert the class to and from a string.

Receiver

To have our receiver invoked and the bytes converted to a message, we define an abstract class that relies on some of Spring's default settings (which might not be ideal), as follows:

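import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public abstract class MessageFormatReceiver implements MessageListener {

    // Assumption: SLF4J is the logging API; the original snippet does not show the logger declaration.
    private static final Logger logger = LoggerFactory.getLogger(MessageFormatReceiver.class);

    /**
     * @see org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter#ORIGINAL_DEFAULT_LISTENER_METHOD
     */
    private void handleMessage(String msg) {
        MessageFormat messageFormat;
        try {
            messageFormat = MessageFormat.getMessageFormat(msg);
        } catch (IOException e) {
            // Skip unparsable messages instead of passing null to subclasses.
            logger.error("Failed to parse message: {}", msg, e);
            return;
        }
        receiveMessage(messageFormat);
    }

    @Override
    public void onMessage(Message message) {
        // StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException.
        handleMessage(new String(message.getBody(), StandardCharsets.UTF_8));
    }

    /** Called with each successfully parsed message. */
    public abstract void receiveMessage(MessageFormat message);
}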

Debug Tool

When messages are the communication method between different parts of a system, debugging is hard because the control flow is difficult to track.

The following tools can help us understand what happens when something goes wrong.

Wireshark

Wireshark is a useful tool for capturing network packets and filtering for specific ones. In our case, we often use the following display filter:

amqp || tcp.port == 5672

AMQP is the protocol that RabbitMQ follows; 5672 is the default port on which the RabbitMQ server listens for AMQP connections.

rabbitmqctl

This command-line tool can be used on the server to inspect its state, for example to list the queues and bindings of a virtual host:

rabbitmqctl list_queues -p vhost
rabbitmqctl list_bindings -p timer_host

Source Code

Reading source code is always a useful way to understand what happens, but in our case we can only read the client code (including some of the Spring wrappers).

Log

A simpler and more common way is to read the log, which is located at:

/var/log/rabbitmq/rabbit@xxx.log

Management UI

This can be seen as the UI version of rabbitmqctl. You can log in, if your user has the administrator tag, at:

http://server_host:15672/

Written with StackEdit.
