博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rabbitmq网络框架剖析
阅读量:6656 次
发布时间:2019-06-25

本文共 8416 字,大约阅读时间需要 28 分钟。

 

Rabbitmq是一个MQ系统,也就是消息中间件,它实现了AMQP 0.8规范,简单来说就是一个TCP的广播服务器。AMQP协议,你可以类比JMS,不过JMS仅仅是java领域内的API规范,而AMQP比JMS更进一步,它有自己的wire-level protocol,有一套可编程的协议,中立于语言。简单介绍了Rabbitmq之后,进入正题。

    Rabbitmq充分利用了Erlang的分布式、高可靠性、并发等特性,首先看它的一个结构图:

这张图展现了Rabbitmq的主要组件和组件之间的关系,具体到监控树的结构,我画了一张图:
      顶层是rabbit_sup supervisor,它至少有两个子进程,一个是rabbit_tcp_client_sup,用来监控每个connection的处理进程 rabbit_reader的supervisor;rabbit_tcp_listener_sup是监控tcp_listener和 tcp_acceptor_sup的supervisor,tcp_listener里启动tcp服务器,监听端口,并且通过tcp_acceptor_sup启动N个tcp_accetpor,tcp_acceptor发起accept请求,等待客户端连接;tcp_acceptor_sup负责监控这些acceptor。这张图已经能给你一个大体的印象。
讲完大概,进入细节,说说几个我觉的值的注意的地方:
1、tcp_accepto.erl,r对于accept采用的是异步方式,利用prim_inet:async_accept/2方法,此模块没有被文档化,是otp库内部使用,通常来说没必要使用这一模块,gen_tcp:accept/1已经足够,不过rabbitmq是广播程序,因此采用了异步方式。使用async_accept,需要打patch,以使得socket好像我们从gen_tcp:accept/1得到的一样:

  1. handle_info({inet_async, LSock, Ref, {ok, Sock}},
  2.             State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) ->
  3.     %%这里做了patch
  4.     %% patch up the socket so it looks like one we got from
  5.     %% gen_tcp:accept/1 
  6.     {ok, Mod} = inet_db:lookup_socket(LSock),
  7.     inet_db:register_socket(Sock, Mod),
  8.     try
  9.         %% report
  10.         {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
  11.         {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
  12.         error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
  13.                               [inet_parse:ntoa(Address), Port,
  14.                                inet_parse:ntoa(PeerAddress), PeerPort]),
  15.         %% 调用回调模块,将Sock作为附加参数
  16.         apply(M, F, A ++ [Sock])
  17.     catch {inet_error, Reason} ->
  18.             gen_tcp:close(Sock),
  19.             error_logger:error_msg("unable to accept TCP connection: ~p~n",
  20.                                    [Reason])
  21.     end,
  22.     %% 继续发起异步调用
  23.     case prim_inet:async_accept(LSock, -1) of
  24.         {ok, NRef} -> {noreply, State#state{ref=NRef}};
  25.         Error -> {stop, {cannot_accept, Error}, none}
  26.     end;
  27. %%处理错误情况
  28. handle_info({inet_async, LSock, Ref, {error, closed}},
  29.             State=#state{sock=LSock, ref=Ref}) ->
  30.     %% It would be wrong to attempt to restart the acceptor when we
  31.     %% know this will fail.
  32.     {stop, normal, State};

2、rabbitmq内部是使用了多个并发acceptor,这在高并发下、大量连接情况下有效率优势,类似java现在的nio框架采用多个reactor类似,查看tcp_listener.erl:

  1. init({IPAddress, Port, SocketOpts,
  2.       ConcurrentAcceptorCount, AcceptorSup,
  3.       {M,F,A} = OnStartup, OnShutdown, Label}) ->
  4.     process_flag(trap_exit, true),
  5.     case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress},
  6.                                              {active, false}]) of
  7.         {ok, LSock} ->
  8.              %%创建ConcurrentAcceptorCount个并发acceptor
  9.             lists:foreach(fun (_) ->
  10.                                   {ok, _APid} = supervisor:start_child(
  11.                                                   AcceptorSup, [LSock])
  12.                           end,
  13.                           lists:duplicate(ConcurrentAcceptorCount, dummy)),
  14.             {ok, {LIPAddress, LPort}} = inet:sockname(LSock),
  15.             error_logger:info_msg("started ~s on ~s:~p~n",
  16.                                   [Label, inet_parse:ntoa(LIPAddress), LPort]),
  17.             %%调用初始化回调函数
  18.             apply(M, F, A ++ [IPAddress, Port]),
  19.             {ok, #state{sock = LSock,
  20.                         on_startup = OnStartup, on_shutdown = OnShutdown, 
  21.                         label = Label}};
  22.         {error, Reason} ->
  23.             error_logger:error_msg(
  24.               "failed to start ~s on ~s:~p - ~p~n",
  25.               [Label, inet_parse:ntoa(IPAddress), Port, Reason]),
  26.             {stop, {cannot_listen, IPAddress, Port, Reason}}
  27.     end.

这里有一个技巧,如果要循环N次执行某个函数F,可以通过lists:foreach结合lists:duplicate(N,dummy)来处理。
lists:foreach(fun(_)-> F() end,lists:duplicate(N,dummy)).
3、simple_one_for_one策略的使用,可以看到对于tcp_client_sup和tcp_acceptor_sup都采用了simple_one_for_one策略,而非普通的one_fo_one,这是为什么呢?
这牵扯到simple_one_for_one的几个特点:
1)simple_one_for_one内部保存child是使用dict,而其他策略是使用list,因此simple_one_for_one更适合child频繁创建销毁、需要大量child进程的情况,具体来说例如网络连接的频繁接入断开。
2)使用了simple_one_for_one后,无法调用terminate_child/2 delete_child/2 restart_child/2 
3)start_child/2 对于simple_one_for_one来说,不必传入完整的child spect,传入参数list,会自动进行参数合并在一个地方定义好child spec之后,其他地方只要start_child传入参数即可启动child进程,简化child都是同一类型进程情况下的编程
在 rabbitmq中,tcp_acceptor_sup的子进程都是tcp_acceptor进程,在tcp_listener中是启动了 ConcurrentAcceptorCount个tcp_acceptor子进程,通过supervisor:start_child/2方法:

  1. %%创建ConcurrentAcceptorCount个并发acceptor
  2.             lists:foreach(fun (_) ->
  3.                                   {ok, _APid} = supervisor:start_child(
  4.                                                   AcceptorSup, [LSock])
  5.                           end,
  6.                           lists:duplicate(ConcurrentAcceptorCount, dummy)),

注意到,这里调用的start_child只传入了LSock一个参数,另一个参数CallBack是在定义child spec的时候传入的,参见tcp_acceptor_sup.erl:

  1. init(Callback) ->
  2.     {ok, {
    {simple_one_for_one, 10, 10},
  3.           [{tcp_acceptor, {tcp_acceptor, start_link, [Callback]},
  4.             transient, brutal_kill, worker, [tcp_acceptor]}]}}.

Erlang内部自动为simple_one_for_one做了参数合并,最后调用的是tcp_acceptor的init/2:

  1. init({Callback, LSock}) ->
  2.     case prim_inet:async_accept(LSock, -1) of
  3.         {ok, Ref} -> {ok, #state{callback=Callback, sock=LSock, ref=Ref}};
  4.         Error -> {stop, {cannot_accept, Error}}
  5.     end.

对于tcp_client_sup的情况类似,tcp_client_sup监控的子进程都是rabbit_reader类型,在 rabbit_networking.erl中启动tcp_listenner传入的处理connect事件的回调方法是是 rabbit_networking:start_client/1:

  1. start_tcp_listener(Host, Port) ->
  2.     start_listener(Host, Port, "TCP Listener",
  3.                    %回调的MFA
  4.                    {?MODULE, start_client, []}).
  5. start_client(Sock) ->
  6.     {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
  7.     ok = rabbit_net:controlling_process(Sock, Child),
  8.     Child ! {go, Sock},
  9.     Child.

start_client调用了supervisor:start_child/2来动态启动rabbit_reader进程。
4、协议的解析,消息的读取这部分也非常巧妙,这一部分主要在rabbit_reader.erl中,对于协议的解析没有采用gen_fsm,而是实现了一个巧妙的状态机机制,核心代码在mainloop/4中:

  1. %启动一个连接
  2. start_connection(Parent, Deb, ClientSock) ->
  3.     process_flag(trap_exit, true),
  4.     {PeerAddressS, PeerPort} = peername(ClientSock),
  5.     ProfilingValue = setup_profiling(),
  6.     try 
  7.         rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
  8.                         [self(), PeerAddressS, PeerPort]),
  9.          %延时发送握手协议
  10.         Erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
  11.                           handshake_timeout),
  12.         %进入主循环,更换callback模块,魔法就在这个switch_callback
  13.         mainloop(Parent, Deb, switch_callback(
  14.                                 #v1{sock = ClientSock,
  15.                                     connection = #connection{
  16.                                       user = none,
  17.                                       timeout_sec = ?HANDSHAKE_TIMEOUT,
  18.                                       frame_max = ?FRAME_MIN_SIZE,
  19.                                       vhost = none},
  20.                                     callback = uninitialized_callback,
  21.                                     recv_ref = none,
  22.                                     connection_state = pre_init},
  23.                                 %%注意到这里,handshake就是我们的回调模块,8就是希望接收的数据长度,AMQP协议头的八个字节。
  24.                                 handshake, 8))

魔法就在switch_callback这个方法上:

  1. switch_callback(OldState, NewCallback, Length) ->
  2.     %发起一个异步recv请求,请求Length字节的数据
  3.     Ref = inet_op(fun () -> rabbit_net:async_recv(
  4.                               OldState#v1.sock, Length, infinity) end),
  5.     %更新状态,替换ref和处理模块
  6.     OldState#v1{callback = NewCallback,
  7.                 recv_ref = Ref}.

异步接收Length个数据,如果有,erlang会通知你处理。处理模块是什么概念呢?其实就是一个状态的概念,表示当前协议解析进行到哪一步,起一个label的作用,看看mainloop/4中的应用:

  1. mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
  2.     %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
  3.     receive
  4.         %%接收到数据,交给handle_input处理,注意handle_input的第一个参数就是callback
  5.         {inet_async, Sock, Ref, {ok, Data}} ->
  6.             %handle_input处理
  7.             {State1, Callback1, Length1} =
  8.                 handle_input(State#v1.callback, Data,
  9.                              State#v1{recv_ref = none}),
  10.             %更新回调模块,再次发起异步请求,并进入主循环
  11.             mainloop(Parent, Deb,
  12.                      switch_callback(State1, Callback1, Length1));

handle_input有多个分支,每个分支都对应一个处理模块,例如我们刚才提到的握手协议:

  1. %handshake模块,注意到第一个参数,第二个参数就是我们得到的数据
  2. handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
  3.              State = #v1{sock = Sock, connection = Connection}) ->
  4.      %检测协议是否兼容
  5.     case check_version({ProtocolMajor, ProtocolMinor},
  6.                        {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of
  7.         true ->
  8.             {ok, Product} = application:get_key(id),
  9.             {ok, Version} = application:get_key(vsn),
  10.             %兼容的话,进入connections start,协商参数
  11.             ok = send_on_channel0(
  12.                    Sock,
  13.                    #'connection.start'{
  14.                      version_major = ?PROTOCOL_VERSION_MAJOR,
  15.                      version_minor = ?PROTOCOL_VERSION_MINOR,
  16.                      server_properties =
  17.                      [{list_to_binary(K), longstr, list_to_binary(V)} ||
  18.                          {K, V} <-
  19.                              [{"product", Product},
  20.                               {"version", Version},
  21.                               {"platform", "Erlang/OTP"},
  22.                               {"copyright", ?COPYRIGHT_MESSAGE},
  23.                               {"information", ?INFORMATION_MESSAGE}]],
  24.                      mechanisms = <<"PLAIN AMQPLAIN">>,
  25.                      locales = <<"en_US">> }),
  26.             {State#v1{connection = Connection#connection{
  27.                                      timeout_sec = ?NORMAL_TIMEOUT},
  28.                       connection_state = starting},
  29.              frame_header, 7};
  30.          %否则,断开连接,返回可以接受的协议
  31.         false ->
  32.             throw({bad_version, ProtocolMajor, ProtocolMinor})
  33.     end;

其他协议的处理也是类似,通过动态替换callback的方式来模拟状态机做协议的解析和数据的接收,真的很巧妙!让我们体会到Erlang的魅力,FP的魅力。
5、序列图:
1)tcp server的启动过程:
2)一个client连接上来的处理过程:
小结:从上面的分析可以看出,rabbitmq的网络层是非常健壮和高效的,通过层层监控,对每个可能出现的风险点都做了考虑,并且利用了prim_net模块做异步IO处理。分层也是很清晰,将业务处理模块隔离到client_sup监控下的子进程,将网络处理细节和业务逻辑分离。在协议的解析和业务处理上虽然没有采用gen_fsm,但是也实现了一套类似的状态机机制,通过动态替换Callback来模拟状态的变迁,非常巧妙。如果你要实现一个tcp server,强烈推荐从rabbitmq中扣出这个网络层,你只需要实现自己的业务处理模块即可拥有一个高效、健壮、分层清晰的TCP服务器。
网友讨论:
prim_inet,按照余锋老大的说法是可以用的,基本上接口不会有大的变更,gen_tcp其实是基于prim_net实现的。
使用simple_one_for_one,可以有多个child的,只不过这些child的是同一种类型的,看supervisor.erl的源码就知道,内部是动态保存在一个dict结构里dynamics = ?DICT:new(),因此是可保存多个:

  1. {ok, Pid} ->
  2. NState = State#state{dynamics = 
  3. ?DICT:store(Pid, Args, State#state.dynamics)},
  4. {reply, {ok, Pid}, NState};

文章来自:http://blog.chinaunix.net/uid-429659-id-3536524.html

 

转载于:https://www.cnblogs.com/huangliang-hb/p/6625307.html

你可能感兴趣的文章
Android AChartEngine 饼图渐变效果
查看>>
python基本语法(持续更新)
查看>>
Java单例模式
查看>>
记录一个浏览器主页被篡改的解决方法
查看>>
Docker 常用命令 (持续更新)
查看>>
JAVA一个关于传递引用的测试
查看>>
洛谷P2219 [HAOI2007]修筑绿化带(单调队列)
查看>>
Atcoder Tenka1 Programmer Contest 2019题解
查看>>
GlusterFS 安装
查看>>
HDU 1907 John 与 poj 3480
查看>>
短信发送器
查看>>
循环次数( M - 暴力求解、打表)
查看>>
MyBatis错题解析
查看>>
linux===linux在线模拟器汇总
查看>>
poj 2985
查看>>
bzoj2039
查看>>
poj1637
查看>>
azkaban group分组,权限
查看>>
[TJOI2015]旅游
查看>>
Jquery事件委托之Safari
查看>>