rabbitmq-c 非阻塞订阅
rabbitmq-c是一个C语言的AMQP(高级消息队列协议)客户端库,一般用于配合RabbitMQ使用。本文记录以非阻塞订阅方式使用该库的方法及注意事项。
本文参考rabbitmq-c版本:master branch commit 75a21e5
基础知识
参考以下链接了解AMQP相关知识:
- RabbitMQ Tutorials
- AMQP概念模型
- RabbitMQ网站文档,其中协议部分的Quick Reference和Complete Reference Guide可以增强对AMQP的理解。
- 关于AMQP数据包格式,可以参考该文章并对比抓包文件在wireshark中查看,可以对AMQP数据包交互过程有更直观的理解。
编译
参考项目README文件既可编译安装。
如果需要在另外的项目中生成动态链接库并静态链接librabbitmq.a,可以参考如下命令
1 | mkdir -p build && cd build |
但是,在动态链接库中静态链接其他库并不是一个好主意,除非你确定知道自己在做什么以及可能会有什么问题。一个原因是可能发生冲突的符号表,可以参考为什么不能在动态库里静态链接?
API
代码库examples目录下的代码展示了如何以阻塞方式使用该库,不做更多说明。对于非阻塞订阅方式使用,包括以下两种API:
API对外仅支持阻塞方式调用,可以设置超时,但超时时间不能是0,否则没有意义。
API对外支持设置超时,无资源可以处理时返回超时状态,超时时间可以是0。这是非阻塞订阅的基础。
阻塞API
在调用阻塞API前,需要先申请资源,这里仅是本地操作,因此不需要超时设置。
amqp_new_connection
创建一个AMQP客户端连接上下文,返回值为一个指针。amqp_tcp_socket_new
不使用SSL时,使用该API,SSL对应另外的一组API,参考头文件amqp_ssl_socket.h
做完以上两个操作,接下来登场的就是阻塞API,在rabbitmq-c中有三种超时场景,按发生的时间顺序分别是:
socket连接超时
对应API为amqp_socket_open_noblock
登录超时
对应API为amqp_set_handshake_timeout
和amqp_login
RPC调用超时
远程过程调用,顾名思义,操作以request - response形式存在,在AMQP协议中以请求响应对形式存在的操作所对应的API都使用RPC超时设置。
超时设置API为amqp_set_rpc_timeout
,订阅中使用的RPC类API均适用于该超时设置,比如:amqp_channel_open
amqp_queue_declare
amqp_queue_bind
amqp_basic_consume
非阻塞API
amqp_basic_consume
成功返回后,意味着订阅已经开始,服务器会将相应的消息持续的通过Basic.Deliver方法推送给客户端,这时可以监听可读事件并读取消息了。
可读事件
rabbitmq-c库提供了一个APIamqp_socket_get_sockfd
用于取得底层socket的fd。由于这个fd是库底层使用的,因此最好不要直接操作该fd做读写,可以使用select/poll/epoll等IO事件通知机进入event loop并监听该fd的可读,在可读时通过库API做非阻塞操作,这样可以满足我们的要求:
- 及时读取消息
- 避免该线程占满CPU
- 可以响应其他事件。比如平滑退出之类。
读取消息API
检测到fd可读时,需要做读取操作,rabbitmq-c中暴露了两个非阻塞的读取API:
amqp_simple_wait_frame_noblock
基础知识中可以看到frame是AMQP协议的基本单元,该函数会在设定的超时时间内尝试接收并解码一个frame,超时设置为0既为非阻塞模式,函数更多介绍可以在amqp.h文件中看到。该函数是库中所有读取类函数的基础,当然也可以完成非阻塞订阅消息读取的目标,但是由于AMQP协议中推送一个消息共包含三个frame,分别是Method、Content header、Content body,做状态机读取有些复杂,因此可以考虑用一个更高层次封装的API。amqp_consume_message
这个函数可以设定一个超时时间,在超时时间内等待接收一个完整的消息,共三个frame,超时设置为0既为非阻塞模式。这个函数接收所有channel上的推送消息,如果需要过滤channel,需要接收后做判断并处理。这里简单说一下该函数逻辑:- 首先调用
amqp_simple_wait_frame_noblock
函数在传入的超时时间内尝试接收一个frame。 - 如果接收到一个frame,检查是否为Method.Deliver,如果符合继续下一步。
- 阻塞接收步骤2接收到的frame中标明的channel上的两个frame,并做相应校验和解码。
- 返回相应的解码后的消息。
注意到这里超时的设定仅仅生效在等待第一个frame时,一旦接收到符合的第一个frame,后两个frame将会阻塞接收。但是符合AMQP标准的服务端不会发生三个frame不同时发送的情况。假如服务端真的发生此种错误,那么客户端阻塞就阻塞了吧。
该函数设定超时时间内没有接收到消息,返回值为
ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION
且ret.library_error == AMQP_STATUS_TIMEOUT
。- 首先调用
心跳检测
订阅消息检测和接收的正常流程就是上文的样子了,但是还需要做一些异常检测,比如网络原因造成的连接失效,AMQP中有相应的心跳检测来支持该情况,心跳超时在login时设置。但是amqp.h文件中可以看到rabbitmq-c库对心跳仅有部分程度的支持,表现为心跳frame的发送和接收只能在可能发生socket读写的API内完成,也就是说至少要调用可能读socket或写socket的API才能按时发送心跳并检查服务端的心跳是否按时到来,文档中描述的支持完成心跳的API为amqp_basic_publish
和amqp_simple_wait_frame
/amqp_simple_wait_frame_noblock
。已知amqp_consume_message
内部调用了amqp_simple_wait_frame_noblock
,因此调用amqp_consume_message
也可以完成心跳的发送和接收。
前文说过,我们可以启动一个event loop监听amqp_socket_get_sockfd
返回的fd的可读事件,并在可读时调用amqp_consume_message
以完成消息接收。但是涉及到心跳时就会出现一个问题,假如失去了和服务端的网络连接,socket无法接收到服务端发来的心跳frame,也就不会触发fd可读,也就不会调用amqp_consume_message
,也就无法检测到心跳超时,这不是一个好消息。
一个解决方式是,在event loop中启用一个定时器,定时器设置为每次间隔一个心跳时间激活,激活时同样调用amqp_consume_message
。这样无论fd上有没有数据到来,定时器都可以完成心跳包的发送并检测服务端心跳包是否超时。
bug fix
函数amqp_consume_message
对比文档和源码中的实现逻辑,可以确定设计逻辑为,如果服务端心跳超时过久,可以得到返回值如下:ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && ret.library_error == AMQP_STATUS_HEARTBEAT_TIMEOUT
但是在实际调试运行时发现如果超时时间设置为0,在与服务器网络无法连通的情况下,返回的ret.library_error一直是AMQP_STATUS_TIMEOUT,直到内核协议栈返回错误。AMQP_STATUS_HEARTBEAT_TIMEOUT永远都不会出现在返回值内。
bug的复现,可以在连接建立后使用iptables丢弃服务端发出的所有数据包来模拟:sudo iptables -A OUTPUT -p tcp --sport 5672 -j DROP
bug的发生位于函数wait_frame_inner
,调用栈为amqp_consume_message -> amqp_simple_wait_frame_noblock -> wait_frame_inner
原因是,在处理超时时间,以及与预期的心跳接收时间比较时,对0超时时间的处理有问题。
这里给出一个修复patch
1 | diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c |
PS: 如果之前使用iptables屏蔽了5672端口发出的tcp包,记得及时解除屏蔽sudo iptables -D OUTPUT -p tcp --sport 5672 -j DROP