In the last post, we skimmed the basic concepts and abstractions of RabbitMQ. Today, we apply message-based communication to a specific problem of our own.
Timer
Background
We would like to build a separate timer process that manages many timing jobs: it can schedule a job at a specific time or run one at regular intervals.
To reduce coupling, we have already made the timer a standalone process, so we need inter-process communication to interact with it. We chose a message queue rather than RPC because our performance requirements are not demanding and a queue makes the system even more loosely coupled.
When choosing the message middleware, we compared RabbitMQ and Kafka in terms of reliability and performance. Because we value reliability over performance, we chose RabbitMQ.
Design Choice
Next, we have to make some decisions about RabbitMQ:
- Exchange type: The last post on RabbitMQ basic concepts introduced several exchange types that meet different requirements. In our case, we chose the topic exchange, whose binding keys can use wildcards (*, #) to combine messages from different sources: * matches exactly one word and # matches zero or more words.
- Routing key: Timer.*.init, Timer.*.timeout, etc.
- Durable: To improve message reliability, we make the exchange and the queues durable.
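To make the wildcard rules concrete, here is a minimal sketch of how a topic pattern such as Timer.*.init could be matched against a routing key. It is only an illustration written for this post, not RabbitMQ's actual implementation; the class name TopicMatcher and the sample keys are made up.

```java
/** Illustrative matcher for topic-exchange patterns (not RabbitMQ's own code). */
final class TopicMatcher {

    private TopicMatcher() {}

    /** Returns true if routingKey matches the dot-separated binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // key exhausted but the pattern still needs a word
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1); // '*' or a literal consumes one word
        }
        return false;
    }
}
```

With this sketch, TopicMatcher.matches("Timer.*.init", "Timer.job1.init") holds, while the same pattern rejects "Timer.job1.timeout" and "Timer.a.b.init", because * stands for exactly one word.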
Message Format
Because RabbitMQ sends and receives only byte arrays, we need to convert our messages to and from bytes. Before doing the conversion, we have to define the message format.
To get a general format that can be reused for other message queue usages, we defined it as follows:
String REPLY_ROUTING_KEY = "replyRoutingKey";
Long id;
HashMap<String, Object> payload = new HashMap<>();
Here, id is the message identifier and payload is where the data is stored. The payload must contain an entry whose key is replyRoutingKey, so that the receiver knows which routing key to use when replying.
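As a rough sketch of how a sender might fill this format and how a receiver might read the reply key back, consider the following. The class name TimerMessage, its constructor, and the replyRoutingKey() helper are hypothetical stand-ins for illustration; only the three fields come from the format above.

```java
import java.util.HashMap;

/** Hypothetical stand-in for the message format described above. */
final class TimerMessage {
    static final String REPLY_ROUTING_KEY = "replyRoutingKey";

    final Long id;
    final HashMap<String, Object> payload = new HashMap<>();

    TimerMessage(Long id, String replyRoutingKey) {
        this.id = id;
        // Every message carries the key the receiver should reply to.
        payload.put(REPLY_ROUTING_KEY, replyRoutingKey);
    }

    /** Returns the reply routing key, or throws if the sender omitted it. */
    String replyRoutingKey() {
        Object key = payload.get(REPLY_ROUTING_KEY);
        if (!(key instanceof String)) {
            throw new IllegalStateException("payload has no " + REPLY_ROUTING_KEY + " entry");
        }
        return (String) key;
    }
}
```

A sender could then write new TimerMessage(1L, "Timer.job1.timeout") and add its own entries to payload. Failing fast when the reply key is missing is one possible design choice; it surfaces a malformed message at the receiver rather than at reply time.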
Util Wrapper
As noted above, what RabbitMQ sends is just bytes, so we have to marshal and unmarshal when sending and receiving messages.
To ease the pain of conversion and reduce the code that other clients need to write, we added a utility wrapper layer that converts between bytes and our message format.
Full source code can be found here.
MessageFormat
private static final transient String REPLY_ROUTING_KEY = "replyRoutingKey";
private static Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();
@Expose
private Long id;
@Expose
private HashMap<String, Object> payload = new HashMap<>();
The fields are as described above. The additional field, gson, is used to convert the class to and from a JSON string.
Receiver
To have our receiver invoked and the bytes converted into a message, we define an abstract class that relies on some default settings of Spring (which might not be ideal), as follows:
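import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public abstract class MessageFormatReceiver implements MessageListener {

    // Assumes SLF4J here; any logger with error(String, Throwable) would do.
    private static final Logger logger = LoggerFactory.getLogger(MessageFormatReceiver.class);

    /**
     * @see org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter#ORIGINAL_DEFAULT_LISTENER_METHOD
     */
    private void handleMessage(String msg) {
        MessageFormat messageFormat;
        try {
            messageFormat = MessageFormat.getMessageFormat(msg);
        } catch (IOException e) {
            logger.error("Failed to parse message: " + msg, e);
            return; // do not hand a null message to the subclass
        }
        receiveMessage(messageFormat);
    }

    // Called by the Spring listener container for each delivered message.
    @Override
    public void onMessage(Message message) {
        // StandardCharsets avoids the checked UnsupportedEncodingException.
        handleMessage(new String(message.getBody(), StandardCharsets.UTF_8));
    }

    public abstract void receiveMessage(MessageFormat message);
}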
Debug Tool
When messages are the communication method between different parts of a system, debugging is hard because the control flow is difficult to track.
The following tools can help us understand what happens when something goes wrong.
Wireshark
Wireshark is a useful tool for capturing network packets and narrowing them down with display filters. In our case, we often use the following filter:
amqp || tcp.port == 5672
AMQP is the protocol that RabbitMQ speaks; 5672 is the default port on which the RabbitMQ server listens for AMQP connections.
rabbitmqctl
This command-line tool can be used on the server to inspect its state, for example the queues and bindings of a virtual host:
rabbitmqctl list_queues -p vhost
rabbitmqctl list_bindings -p timer_host
Source Code
Reading the source code is always a useful way to understand what happens, but in our case we can only read the client code (including some Spring wrappers).
Log
A simpler and more common way is to read the log, which is located at:
/var/log/rabbit@xxx.log
Management UI
This can be regarded as the UI version of rabbitmqctl. You can log in, provided your user has the administrator tag, at:
http://server_host:15672/
Written with StackEdit.