事务
RabbitMQ 为了数据安全提供了confirm 机制及事务机制。
事务机制性能消耗相对更大,所以在大部分应用场景里都不推荐使用。但是可以它可以保证多个消息都到达服务器后再一起提交。
解析
RabbitMQ的事务是相对于Channel来说的,我们以php为例说下使用基本使用:
$msg = 'myTest_';
$exchangeName = 'amq.default';
$routingkey = 'myTestQueue'
$this->_channel->tx_select();
for($i=0;$i<100;$i++){
$this->_channel->basic_publish($msg.$i, $exchangeName, $routingkey,false);
}
$this->_channel->tx_commit();
==在上面例子中,可以确保 100条消息都到达了服务器再提交,但这个是不是保证 这100条消息在宕机情况下,也要么成功要么失败呢?==
答案 当然是否定的
在前面其他章节中,我们有讲到过我们写数据是写入到连接的时候 ,每个连接一个Reader进程,数据经过Reader进程协议解析出来后,再交给Channel进程,Channel进程里通过Routingkey及Exchange找到相对应的队列,再转发过去。
RabbitMQ事务所有操作都是在Channel里完成。
事务过程
1. tx_select() 的时候标记一下当前Channel是事务开始。
handle_method(#'tx.select'{}, _, State = #ch{tx = none}) ->
{reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}};
handle_method(#'tx.select'{}, _, State) ->
{reply, #'tx.select_ok'{}, State};
2. basic_publish 的时候,数据到达了Channel进程,Channel进程判断当前为事务状态,则将数据保存到一个List 里
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
Delivery = rabbit_basic:delivery(
Mandatory, DoConfirm, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
Username, TraceState),
DQ = {Delivery#delivery{flow = Flow}, QNames},
{noreply, case Tx of
none -> deliver_to_queues(DQ, State1);
{Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
State1#ch{tx = {Msgs1, Acks}}
end};
{error, Reason} ->
precondition_failed("invalid message: ~p", [Reason])
end;
3. tx_rollback() 的时候就把 Channel进程中保存 basic_publish 进来数据的List进行清空
handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
tx = {_Msgs, Acks}}) ->
AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])),
UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))),
{reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
tx = new_tx()}};
4. tx_commit() 的时候,遍历 List 逐个遍历,并提交到相应的队列进程。
这个提交到哪些进程是在遍历的时间点来计算的,并不是 publish 的时候 决定的
也就是说假如 publish 的时候,routingkey 和 exchange 相对就有有Q1,Q2个队列
但是commit 之前 Q1 和routingkey及exchange解绑了,只当前只会提交到Q2这个队列 这个提交过程有正常我们采用confirm机制提供机制是一样的
handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks},
limiter = Limiter}) ->
State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
Rev = fun (X) -> lists:reverse(lists:sort(X)) end,
lists:foreach(fun ({ack, A}) -> ack(Rev(A), State1);
({Requeue, A}) -> reject(Requeue, Rev(A), Limiter)
end, lists:reverse(Acks)),
{noreply, maybe_complete_tx(State1#ch{tx = committing})};
RabbitMQ提交是在channel里一个一个提交的。假如在提交到10条消息的时候,那不好意思。那队列里最多也就只有10条数据存储,甚至下的90条没有了
小结
==RabbitMQ事务只能保证在事务里的数据都到了服务器再提交到队列,并不能保证在宕机情况下所有数据一定100%落盘==