rabbitmq-c是一个C语言的AMQP(高级消息队列协议)客户端库,一般用于配合RabbitMQ使用。本文记录以非阻塞订阅方式使用该库的方法及注意事项。

本文参考rabbitmq-c版本:master branch commit 75a21e5

基础知识

参考以下链接了解AMQP相关知识:

编译

参考项目README文件既可编译安装。

如果需要在另外的项目中生成动态链接库并静态链接librabbitmq.a,可以参考如下命令

1
2
3
mkdir -p build && cd build
CFLAGS=-fPIC cmake -DCMAKE_BUILD_TYPE=Debug -DCMAKE_INSTALL_PREFIX="$(INSTALL_DIR)" ..
cmake --build . --target install

但是,在动态链接库中静态链接其他库并不是一个好主意,除非你确定知道自己在做什么以及可能会有什么问题。一个原因是可能发生冲突的符号表,可以参考为什么不能在动态库里静态链接?

API

代码库examples目录下的代码展示了如何以阻塞方式使用该库,不做更多说明。对于非阻塞订阅方式使用,包括以下两种API:

  • API对外仅支持阻塞方式调用,可以设置超时,但超时时间不能是0,否则没有意义。

  • API对外支持设置超时,无资源可以处理时返回超时状态,超时时间可以是0。这是非阻塞订阅的基础。

阻塞API

在调用阻塞API前,需要先申请资源,这里仅是本地操作,因此不需要超时设置。

  • amqp_new_connection
    创建一个AMQP客户端连接上下文,返回值为一个指针。

  • amqp_tcp_socket_new
    不使用SSL时,使用该API,SSL对应另外的一组API,参考头文件amqp_ssl_socket.h

做完以上两个操作,接下来登场的就是阻塞API,在rabbitmq-c中有三种超时场景,按发生的时间顺序分别是:

  • socket连接超时
    对应API为amqp_socket_open_noblock

  • 登录超时
    对应API为amqp_set_handshake_timeoutamqp_login

  • RPC调用超时
    远程过程调用,顾名思义,操作以request - response形式存在,在AMQP协议中以请求响应对形式存在的操作所对应的API都使用RPC超时设置。
    超时设置API为amqp_set_rpc_timeout,订阅中使用的RPC类API均适用于该超时设置,比如:

    • amqp_channel_open
    • amqp_queue_declare
    • amqp_queue_bind
    • amqp_basic_consume

非阻塞API

amqp_basic_consume成功返回后,意味着订阅已经开始,服务器会将相应的消息持续的通过Basic.Deliver方法推送给客户端,这时可以监听可读事件并读取消息了。

可读事件

rabbitmq-c库提供了一个APIamqp_socket_get_sockfd用于取得底层socket的fd。由于这个fd是库底层使用的,因此最好不要直接操作该fd做读写,可以使用select/poll/epoll等IO事件通知机进入event loop并监听该fd的可读,在可读时通过库API做非阻塞操作,这样可以满足我们的要求:

  1. 及时读取消息
  2. 避免该线程占满CPU
  3. 可以响应其他事件。比如平滑退出之类。

读取消息API

检测到fd可读时,需要做读取操作,rabbitmq-c中暴露了两个非阻塞的读取API:

  • amqp_simple_wait_frame_noblock
    基础知识中可以看到frame是AMQP协议的基本单元,该函数会在设定的超时时间内尝试接收并解码一个frame,超时设置为0既为非阻塞模式,函数更多介绍可以在amqp.h文件中看到。该函数是库中所有读取类函数的基础,当然也可以完成非阻塞订阅消息读取的目标,但是由于AMQP协议中推送一个消息共包含三个frame,分别是Method、Content header、Content body,做状态机读取有些复杂,因此可以考虑用一个更高层次封装的API。

  • amqp_consume_message
    这个函数可以设定一个超时时间,在超时时间内等待接收一个完整的消息,共三个frame,超时设置为0既为非阻塞模式。这个函数接收所有channel上的推送消息,如果需要过滤channel,需要接收后做判断并处理。这里简单说一下该函数逻辑:

    1. 首先调用amqp_simple_wait_frame_noblock函数在传入的超时时间内尝试接收一个frame。
    2. 如果接收到一个frame,检查是否为Method.Deliver,如果符合继续下一步。
    3. 阻塞接收步骤2接收到的frame中标明的channel上的两个frame,并做相应校验和解码。
    4. 返回相应的解码后的消息。

    注意到这里超时的设定仅仅生效在等待第一个frame时,一旦接收到符合的第一个frame,后两个frame将会阻塞接收。但是符合AMQP标准的服务端不会发生三个frame不同时发送的情况。假如服务端真的发生此种错误,那么客户端阻塞就阻塞了吧。

    该函数设定超时时间内没有接收到消息,返回值为ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTIONret.library_error == AMQP_STATUS_TIMEOUT

心跳检测

订阅消息检测和接收的正常流程就是上文的样子了,但是还需要做一些异常检测,比如网络原因造成的连接失效,AMQP中有相应的心跳检测来支持该情况,心跳超时在login时设置。但是amqp.h文件中可以看到rabbitmq-c库对心跳仅有部分程度的支持,表现为心跳frame的发送和接收只能在可能发生socket读写的API内完成,也就是说至少要调用可能读socket或写socket的API才能按时发送心跳并检查服务端的心跳是否按时到来,文档中描述的支持完成心跳的API为amqp_basic_publishamqp_simple_wait_frame/amqp_simple_wait_frame_noblock。已知amqp_consume_message内部调用了amqp_simple_wait_frame_noblock,因此调用amqp_consume_message也可以完成心跳的发送和接收。

前文说过,我们可以启动一个event loop监听amqp_socket_get_sockfd返回的fd的可读事件,并在可读时调用amqp_consume_message以完成消息接收。但是涉及到心跳时就会出现一个问题,假如失去了和服务端的网络连接,socket无法接收到服务端发来的心跳frame,也就不会触发fd可读,也就不会调用amqp_consume_message,也就无法检测到心跳超时,这不是一个好消息。

一个解决方式是,在event loop中启用一个定时器,定时器设置为每次间隔一个心跳时间激活,激活时同样调用amqp_consume_message。这样无论fd上有没有数据到来,定时器都可以完成心跳包的发送并检测服务端心跳包是否超时。

bug fix

函数amqp_consume_message对比文档和源码中的实现逻辑,可以确定设计逻辑为,如果服务端心跳超时过久,可以得到返回值如下:
ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && ret.library_error == AMQP_STATUS_HEARTBEAT_TIMEOUT

但是在实际调试运行时发现如果超时时间设置为0,在与服务器网络无法连通的情况下,返回的ret.library_error一直是AMQP_STATUS_TIMEOUT,直到内核协议栈返回错误。AMQP_STATUS_HEARTBEAT_TIMEOUT永远都不会出现在返回值内。

bug的复现,可以在连接建立后使用iptables丢弃服务端发出的所有数据包来模拟:
sudo iptables -A OUTPUT -p tcp --sport 5672 -j DROP

bug的发生位于函数wait_frame_inner,调用栈为amqp_consume_message -> amqp_simple_wait_frame_noblock -> wait_frame_inner

原因是,在处理超时时间,以及与预期的心跳接收时间比较时,对0超时时间的处理有问题。

这里给出一个修复patch

rabbitmq-c.diff
1
2
3
4
5
6
7
8
9
10
11
12
13
14
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 061192e..0240efb 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -804,6 +804,9 @@ static int wait_frame_inner(amqp_connection_state_t state,
return res;
}
}
+ if (amqp_time_equal(timeout_deadline, amqp_time_immediate())) {
+ timeout_deadline.time_point_ns = amqp_get_monotonic_timestamp();
+ }
deadline = amqp_time_first(timeout_deadline,
amqp_time_first(state->next_recv_heartbeat,
state->next_send_heartbeat));

PS: 如果之前使用iptables屏蔽了5672端口发出的tcp包,记得及时解除屏蔽
sudo iptables -D OUTPUT -p tcp --sport 5672 -j DROP