RabbitMQ 启动过程

进程执行命令:

/usr/local/erl18.3/erts-7.3/bin/beam.smp -W w -A 64 -P 1048576 -K true -B i -- -root /usr/local/erl18.3 -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.1/ebin -noshell -noinput -s rabbit boot -sname rabbitmqTest199@rabbitmqTest199 -boot start_sasl -config /etc/rabbitmq/rabbitmq -kernel inet_default_connect_options [{nodelay,true}] -rabbit tcp_listeners [{"0.0.0.0",5672}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit error_logger {file,"/data/rabbitmq/log/rabbitmqTest199@rabbitmqTest199.log"} -rabbit sasl_error_logger {file,"/data/rabbitmq/log/rabbitmqTest199@rabbitmqTest199-sasl.log"} -rabbit enabled_plugins_file "/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.1/plugins" -rabbit plugins_expand_dir "/data/rabbitmq/mnesia/rabbitmqTest199@rabbitmqTest199-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/data/rabbitmq/mnesia/rabbitmqTest199@rabbitmqTest199" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672

参数解析

-s rabbit boot 等于erl shell脚本窗口的rabbit::boot() 执行,-s 为首个执行窗口,可以拥有多个-s 参数

-os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false

-os_mon 为启动erlang自带的一些监控进程,请参考:https://blog.csdn.net/pyf725324/article/details/77746321

boot() 方法

boot 方法里执行了 rabbit_mnesia:check_cluster_consistency()

首先 rabbitmq 会先把先确认 mnesia 存活节点是,确保和当前整个集群中其他节点的数据保持一致,在集群中,会把当前这个节点的mnesia 的信息给删除掉,再从其他节点中同步过来

启动加载

1 : 所有插件(每次启动的时候,都会事先把插件目录下的beam 删除掉,重新解压) 2 : os_mon 3 : mnesia 4 : rabbit

备注:虽然加载顺序是这样,但每个模块之间,可能会相互依懒,他会生成一个向量图,每个模块谁先启动,根据生成的向量图来决定

请参考 rabbit.erl 中的 -rabbit_boot_step

rabbit_mnesia::init()

会执行从其他节点上重新同步数据过来。以集群其他节点上的数据为准

如果在启动的过程中mnesia还没步数据或者还没同步完,集群中所有其他节点都宕机了,不好意思,game over( 当前只是猜测,没实际测试 )

rabbit::recovery()

on_node_up(Node)

遍历mnesia里的rabbit_queue 表里的信息,查找 队列没有从节点,并且队列主节点是在当前节点的 并且同时当前这个队列进程的PID 是没有运行的状态下,删除当前这条数据、同时删除路由,交换机的绑定信息,这些操作合并一个事务中操作

find_durable_queues() 从rabbit_durable_queue表中搜索出当前节点持久化的队列,并且在非持久的 rabbit_queue 表里没有找到数据的情况下

遍历启动 find_durable_queues() 找到的队列,rabbit_variable_queue::start(QName)

rabbit_variable_queue 这个模板是可以配置文件中进行配置的。{backing_queue_module, rabbit_variable_queue}

如果你有能力重写这一个模块是可以在配置中进行修改。

每个队列创建一个进程,并且进行索引恢复 。同时每个队列再创建 2个子进程。

rabbit_msg_store: TRANSIENT_MSG_STORE 这个这个是用于有大数据存进程进程之间的一个通信进程,可以理解为从大数据模块读取数据的一个客户端(数据非持久化)

rabbit_msg_store:PERSISTENT_MSG_STORE 这个这个是用于有大数据存进程进程之间的一个通信进程,可以理解为从大数据模块读取数据的一个客户端(持久化数据)

然后接着进行由主进程统一往所有队列进程中 发送 init 信号,进行队列数据结构初始化

rabbit_mirror_queue_misc::on_node_up() 恢复队列从节点进程

恢复过程

a.首先遍历 rabbit_queue 表,找到 需要在本节点进行做镜像的队列。

先查询出当前需要做镜像队列的队列的所有从节点,再把通过suggested_queue_nodes 函数找出队列建议在哪些节点上需要做镜像队列,再和当前节点对比,如果需要在当前节点需要做镜像队列,则进行镜像队列进程的初始化操作

suggested_queue_nodes 函数 是通过配置策略来进行运算的,参考:rabbit_mirror_queue_mode_exactly.erl

rabbit_mirror_queue_mode_nodes.erl rabbit_mirror_queue_mode_all.erl 这三个文件的 suggested_queue_nodes 函数

用python转换下,大概以下逻辑:

need_mirror_queues = []
for q in rabbit_queue:
    qSlaveNodeArr = []
    for sqid in q.Spids:
        if node(spid) != node():
            continue
        else:
            qSlaveNodeArr.append(node(spid))
            break
        if node() in qSlaveNodeArr:
            pass
        else:
            qSlaveNodeArr.append(node())

        suggestedNodes = suggested_queue_nodes(qSlaveNodeArr)
        if node() in suggestedNodes:
            need_mirror_queues.append(q)

for q in need_mirror_queues:
    start_mirror(q)
b. 开启镜像队列进程
rabbit_mirror_queue_misc::add_mirror() -- >

rabbit_amqqueue_sup_sup:start_queue_process(MirrorNode, Q, slave) --> 

rabbit_amqqueue_sub::start_link()

队列进程中加载了 rabbit_amqqueue_process 和 rabbit_mirror_queue_slave

c. 往镜像队列进程发送初始化消息
rabbit_mirror_queue_slave:go(SPid, SyncMode)
d. 删除消息队列名字为QName对应的目录下面所有的消息索引磁盘文件
e. 实际调用 rabbit_variable_queue 进行队列 索引等存储初始化操作( 因为索引等文件都删除了,这一步实际内存中镜像队列进程中队列数据是空的,这一步可以参考队列主节点数据恢复过程 )
f. 往队列主节点的进程PID发送 同步消息

这个是队列镜像队列进程起来后,由队列从节点进程往队列主节点进程发送这个消息,请参考rabbit_mirror_queue_slave::handle_go() 里的 rabbit_mirror_queue_misc:maybe_auto_sync(Q1)

g.队列主节点进程接收到 sync_mirrors 消息 (rabbit_amqqueue_process 下)

开启同步进程并且查找到当前队列所有从节点信息,并且往所有从队列从节点进程广播发送 sync_start 同步开始的消息

h.队列从节点进程接收到 sync_start 消息(在rabbit_mirror_queue_slave 里)

进行判断当前这个节点是否已经是最新的数据,因为有可能本来 队列需要 1主多人的情况下,又新加一个从节点进来,原来的从节点上的数据已经是和主节点上的数据保持一致了。那本来运行的从节点上是不需要进行数据同步的。

已经同步过返回:sync_deny

需要同步返回 :sync_ready

i. 同步进程给需要同步的队列从节点进程发送 同步数据

队列主节点进程遍历当前所有消息同步发送给同步进程,再由同步进程发送给从节点进程

每次发送消息之前会发送一条 这是第几条数据. 这条消息存在队列从节点进程 #state{depth_delta} 字段中,当初始化同步近个队列的时候,这个值>0大于代表不需要同步

参考:

rabbit_mirror_queue_slave::master_go() 遍历所有消息发送消息给同步进程。 当遍历完后 会往同步进程发送一条complete消息。同步进程结束

rabbit_mirror_queue_slave::syncer_loop() 同步进程在死循环接收队列主节点进程发送过来的消息,然后往从进程发送消息

j. 队列从节点处理同步进程同步过来的消息

当队列从节点接收到 sync_start 消息的时候 ,会进入一个死循环接收消息的状态。这个状态只会和 同步进程之间同步,和其他进程通信延迟

从进程初始化的时候 depth_delta = undefined,当然 depth_delta = 0 的时候,这个时候就不会进入接收数据的状态,会返回给同步进程不需要同步消息。在数据同步完的时候 ,从进程会把 depth_delta 设置成 0

rabbit_mirror_queue_slave::handle_cast({sync_start, Ref, Syncer},

数据处理逻辑大概是拿到消息后调用 rabbit_variable_queue::publish() 往队列进程中写消息

请参考方法:rabbit_mirror_queue_slave::slave_sync_loop