事务

RabbitMQ 为了数据安全提供了confirm 机制及事务机制。

事务机制性能消耗相对更大,所以在大部分应用场景里都不推荐使用。但是可以它可以保证多个消息都到达服务器后再一起提交。

解析

RabbitMQ的事务是相对于Channel来说的,我们以php为例说下使用基本使用:

$msg = 'myTest_';
$exchangeName = 'amq.default';
$routingkey = 'myTestQueue'
$this->_channel->tx_select();

for($i=0;$i<100;$i++){
    $this->_channel->basic_publish($msg.$i, $exchangeName, $routingkey,false);
}

$this->_channel->tx_commit();

==在上面例子中,可以确保 100条消息都到达了服务器再提交,但这个是不是保证 这100条消息在宕机情况下,也要么成功要么失败呢?==

答案 当然是否定的

在前面其他章节中,我们有讲到过我们写数据是写入到连接的时候 ,每个连接一个Reader进程,数据经过Reader进程协议解析出来后,再交给Channel进程,Channel进程里通过Routingkey及Exchange找到相对应的队列,再转发过去。

RabbitMQ事务所有操作都是在Channel里完成。

事务过程

1. tx_select() 的时候标记一下当前Channel是事务开始。
handle_method(#'tx.select'{}, _, State = #ch{tx = none}) ->
    {reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}};

handle_method(#'tx.select'{}, _, State) ->
    {reply, #'tx.select_ok'{}, State};
2. basic_publish 的时候,数据到达了Channel进程,Channel进程判断当前为事务状态,则将数据保存到一个List 里
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
    {ok, Message} ->
        Delivery = rabbit_basic:delivery(
                     Mandatory, DoConfirm, Message, MsgSeqNo),
        QNames = rabbit_exchange:route(Exchange, Delivery),
        rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
                            Username, TraceState),
        DQ = {Delivery#delivery{flow = Flow}, QNames},
        {noreply, case Tx of
                      none         -> deliver_to_queues(DQ, State1);
                      {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
                                      State1#ch{tx = {Msgs1, Acks}}
                  end};
    {error, Reason} ->
        precondition_failed("invalid message: ~p", [Reason])
end;
3. tx_rollback() 的时候就把 Channel进程中保存 basic_publish 进来数据的List进行清空
handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
    precondition_failed("channel is not transactional");

handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
                                               tx = {_Msgs, Acks}}) ->
    AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])),
    UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))),
    {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
                                          tx                = new_tx()}};
4. tx_commit() 的时候,遍历 List 逐个遍历,并提交到相应的队列进程。

这个提交到哪些进程是在遍历的时间点来计算的,并不是 publish 的时候 决定的

也就是说假如 publish 的时候,routingkey 和 exchange 相对就有有Q1,Q2个队列

但是commit 之前 Q1 和routingkey及exchange解绑了,只当前只会提交到Q2这个队列 这个提交过程有正常我们采用confirm机制提供机制是一样的

handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
    precondition_failed("channel is not transactional");

handle_method(#'tx.commit'{}, _, State = #ch{tx      = {Msgs, Acks},
                                             limiter = Limiter}) ->
    State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
    Rev = fun (X) -> lists:reverse(lists:sort(X)) end,
    lists:foreach(fun ({ack,     A}) -> ack(Rev(A), State1);
                      ({Requeue, A}) -> reject(Requeue, Rev(A), Limiter)
                  end, lists:reverse(Acks)),
    {noreply, maybe_complete_tx(State1#ch{tx = committing})};

RabbitMQ提交是在channel里一个一个提交的。假如在提交到10条消息的时候,那不好意思。那队列里最多也就只有10条数据存储,甚至下的90条没有了

小结

==RabbitMQ事务只能保证在事务里的数据都到了服务器再提交到队列,并不能保证在宕机情况下所有数据一定100%落盘==