CEF-IPC通讯模块封装
1.简述
chromium的IPC通讯和socket通讯有点类似,server端先监听,各client端连接进来时,server端为各client端提供相应的service对象与之通讯
1.1.底层通讯
chromium内部是使用命名管道来建立通讯,参看\IPC\chrome\src\ipc\ipc_channel_win.cc
bool Channel::ChannelImpl::CreatePipe(const IPC::ChannelHandle &channel_handle, Mode mode) { if (channel_handle.pipe.handle) { .... } else if (mode & MODE_SERVER_FLAG) { // 1.如果是Server端,就创建Pipe pipe_ = CreateNamedPipeW(pipe_name.c_str(), open_mode, PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, 1, Channel::kReadBufferSize, Channel::kReadBufferSize, 5000, NULL); } else if (mode & MODE_CLIENT_FLAG) { // 2.如果是Client端就打开Pipe pipe_name = PipeName(channel_handle.name, &client_secret_); pipe_ = CreateFileW(pipe_name.c_str(), GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, SECURITY_SQOS_PRESENT | SECURITY_IDENTIFICATION | FILE_FLAG_OVERLAPPED, NULL); } else { NOTREACHED(); } ..... return true; } //------------------------------ bool Channel::ChannelImpl::ProcessConnection() { // 3.Server端等待连接 BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped); }
1.2.底层绑定+监听
注意,我们的chromium的IPC是基于windows平台的,所以它的底层监听也来自于windows特有的的IOCP(I/O Completion Port),就是大名鼎鼎的IO完成端口,这里借用一点网上的资料:
IO完成端口是用于C/S通信模式中性能最好的网络通信模型,没有之一,它充分利用内核对象的调度,只使用少量的几个线程来处理和客户端的所有通信,消除了无谓的线程上下文切换,最大限度的提高了网络通信的性能
MessagePumpForIO::MessagePumpForIO() { //1. 初始化完成端口 port_.Set(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1)); DCHECK(port_.IsValid()); } //-------------------------------------------------------------------- bool Channel::ChannelImpl::Connect() { ........ //2. 把我们的pipe绑定到完成端口中 MessageLoopForIO::current()->RegisterIOHandler(pipe_, this); // 3.Check to see if there is a client connected to our pipe... if (waiting_connect_) ProcessConnection(); ...... } //-------------------------------------------------------------------- void MessagePumpForIO::RegisterIOHandler(HANDLE file_handle, IOHandler* handler) { //2.绑定pipe到完成端口的真实实现 ULONG_PTR key = reinterpret_cast<ULONG_PTR>(handler); HANDLE port = CreateIoCompletionPort(file_handle, port_, key, 1); DPCHECK(port); } //-------------------------------------------------------------------- MessageLoopForIO::current()->PostTask( FROM_HERE, base::Bind(&Channel::ChannelImpl::OnIOCompleted, weak_factory_.GetWeakPtr(), &input_state_.context, 0, 0));
参看\IPC\chrome\src\ipc\ipc_channel_win.cc
通过完成端口,最终在Channel::ChannelImpl::OnIOCompleted中我们可以高效得到pipe发送过来的数据,pipe是否断开等信息
void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context, DWORD bytes_transfered, DWORD error) { // 1.把对端的数据广播出去 if (ok) ok = ProcessIncomingMessages(); } if (!ok && INVALID_HANDLE_VALUE != pipe_) { // 2. pipe断开了, 通知一下 Close(); listener()->OnChannelError(); } } //--------------------------------------------------------------------------------------- bool ChannelReader::ProcessIncomingMessages() { while (true) { //1. 读取数据 int bytes_read = 0; ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize, &bytes_read); //2.分发到读取的数据 if (!DispatchInputData(input_buf_, bytes_read)) return false; } } //--------------------------------------------------------------------------------------- bool ChannelReader::DispatchInputData(const char* input_data, int input_data_len) { //1. 最终走到此处 listener_->OnMessageReceived(m); }
上述代码有几个关键点,如:
在pipe广播消息时会调用listener_->OnMessageReceived(m);
在pipe断开时会调用listener()->OnChannelError();
即pipe的广播消息和自身的状态都可以通过
Channel::Listener* listener_;
来完成监听
继续参看listener的ChannelProxy::Context::OnMessageReceived实现
// Called on the IPC::Channel thread bool ChannelProxy::Context::OnMessageReceived(const Message& message) { // First give a chance to the filters to process this message. if (!TryFilters(message)) OnMessageReceivedNoFilter(message); return true; }
我们发现有个
if (!TryFilters(message))
这意味着我们可以在监听时,加入特定的MessageFilter,来专门处理某些特定的消息
以上所有代码都封装于channel类,channel实现了以上所有底层功能
1.3.跨线程发送
操作channel的总是IO线程,这是chromium的内部规定,但基本上我们都是需要从非IO线程与其他进程通讯,chromium最擅长是什么? task,对的,把对channel的所有操作全放在task里,再post到IO线程队列中, 为此,chromium封装了一个代理层ipc\ipc_channel_proy.h
class IPC_EXPORT ChannelProxy : public Message::Sender {
它通过以下方式Post Task到IO线程
bool ChannelProxy::Send(Message* message) { ... context_->ipc_message_loop()->PostTask( FROM_HERE, base::Bind(&ChannelProxy::Context::OnSendMessage, context_, base::Passed(scoped_ptr<Message>(message)))); return true; }---->// Called on the IPC::Channel threadvoid ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) { if (!channel_.get()) { OnChannelClosed(); return; } if (!channel_->Send(message.release())) OnChannelError(); }
它的基类是Message::Sender,结合上面提到的IOCP完成端口通知, listener监听,形成一个完整的链条
如果要发送同步消息,使用SyncChannel,它其实是在ChannelProxy的基础上增加一个事件等待(为什么不叫SyncChannelProxy呢?)
class IPC_EXPORT SyncChannel : public ChannelProxy, public base::WaitableEventWatcher::Delegate {
另外, SyncChannel在发送异步消息时和ChannelProxy是完全一至的,这从以下代码可以体现出来:
bool SyncChannel::Send(Message* message) { return SendWithTimeout(message, base::kNoTimeout); } bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) { if (!message->is_sync()) { ChannelProxy::Send(message); return true; }
2.自身代码封装
外部封装的代码和4366代码采用同一类处理方式,全局仅一个全局变量,从而控制所有对象的生命周期,参看IPC\inc\Common\IPC_Global.h
class IPC_Global : public base::AtExitManager// Thread如果最后退出需依赖于AtExitManager { ........ scoped_refptr<IPC_ThreadEnviroment> m_spThreadEnviroment; ///< 线程环境 scoped_refptr<IPC_HostMgr> m_spHostMgr; ///< Host管理 };
分别为线程环境和host管理,以下分别介绍:
2.1.线程环境
前面提到IPC的底层通讯是在IO线程模型上,对于chromium来说,它有三种线程模型, 以下列出我们要用到的两种线程模型:
MessagePumpDefault | MessagePumpForIO | |
是否需要处理系统消息 | 否 | 是 |
是否需要处理Task | 是 | 是 |
是否需要处理Watcher | 否 | 是 |
是否阻塞在信号量上 | 否 | 是 |
参看:IPC\chrome\include\base\message_loop.h上的简介
// TYPE_DEFAULT // This type of ML only supports tasks and timers. // // TYPE_UI // This type of ML also supports native UI events (e.g., Windows messages). // See also MessageLoopForUI. // // TYPE_IO // This type of ML also supports asynchronous IO. See also // MessageLoopForIO.
我们建立了三个chromium的三个线程,使用了上述两种线程模型,参看IPC\inc\Thread\IPC_ThreadEnviroment.h
enum DMTID { DM_TID_MAIN, ///< MessagePumpDefault, 用于常规host创建,发送消息 DM_TID_IO, ///< MessagePumpForIO,用于IPC底层通讯,操作channel的总是IO线程 DM_TID_EVENT, ///< MessagePumpDefault,用于lister的相关回调 DM_TID_COUNT };
1.IPC_Thread类为线程的实现体,代码位于IPC\inc\Thread\IPC_Thread.h,它提供最基础的线程相关函数
class IPC_Thread : public base::Thread, public base::RefCountedThreadSafe<IPC_Thread>
2.IPC_ThreadRunner类创建线程,启动线程,查找线程,关闭线程
class IPC_ThreadRunner { public: int StartThread(DMTID id); int ShutDownAllThreads(); bool FindThread(DMTID id,scoped_refptr<IPC_Thread>& spThread); public: typedef std::map<DMTID, scoped_refptr<IPC_Thread>> ActivedThreadMap; ///< 通过引用计数来维持生命周期 ActivedThreadMap m_ActiveThreadMap; base::Lock m_Lock; };
3.IPC_ThreadEnviroment类提供统一的入口函数
4.IPC_ThreadMsgLoopProxy类用于channelproxy的消息转发
我们创建了一个IO线程,那怎么让IPC在我们的IO线程上通讯呢?
在channelproxy的构造函数里,它允许用户传入自定义的MessageLoopProxy类
ChannelProxy::ChannelProxy(const IPC::ChannelHandle& channel_handle, Channel::Mode mode, Channel::Listener* listener, base::MessageLoopProxy* ipc_thread)// 此处传入我们的IPC_ThreadMsgLoopProxy对象 : context_(new Context(listener, ipc_thread)), outgoing_message_filter_(NULL), did_init_(false) { Init(channel_handle, mode, true); }
所以,我们自定义了一个IPC_ThreadMsgLoopProxy类,它负责把所有的task通过我们的IO线程转发
class IPC_ThreadMsgLoopProxy : public base::MessageLoopProxy { .... bool IPC_ThreadMsgLoopProxy::PostDelayedTask(const tracked_objects::Location& from_here,const base::Closure& task,int64 delay_ms) { return g_pIPCThreadEnviroment->PostDelayedTask(m_id,from_here,task,delay_ms); } } //---------------------------------------------------------------------------------------- m_pChannelProxy.reset(new IPC::ChannelProxy(m_strChannelName, IPC::Channel::MODE_SERVER, this, new IPC_ThreadMsgLoopProxy(DM_TID_IO)));
2.2.Host管理
2.2.1.消息宏
chrome的消息分两种,路由(routed)和控制(control),路由消息是私密的,系统会依照路由信息将消息安全的传递到目的地,控制消息就是一个广播消息,谁想听等能够听得到。
这里我们的外部封装使用的是控制消息
chrome的消息宏展开是反人类的,它允许定义一次宏,展开多段代码,所以没有#pragma once,以下是我们使用的IPC消息定义:
IPC_STRUCT_BEGIN(IPC_MsgContent) IPC_STRUCT_MEMBER(std::vector<unsigned char>, body) IPC_STRUCT_MEMBER(bool, reply_flag) // 回应标志,是否为应答消息 IPC_STRUCT_MEMBER(int32, reply_id) // 同步序号 IPC_STRUCT_END() /// 客户端请求服务端代理为其分配一个服务者,并返回服务者的名称 IPC_SYNC_MESSAGE_CONTROL0_1(IPCMsg_ClientRequestService, std::string) /// 客户端向服务端查询管道ID IPC_SYNC_MESSAGE_CONTROL0_1(IPCMsg_ClientQueryId,int32) /// CONTROL消息 IPC_MESSAGE_CONTROL2(IPCMsg_RouteMsg, int32, IPC_MsgContent)
SYNC表示同步调用,Send在等到回复时才返回,0_1表示0 in,1 out
比如:ChannelMsg_ClientQueryId,表示out一个int32(查询到的channel id)
更多细节可以参看IPC\chrome\include\ipc\ipc_message_macros.h的头简介
2.2.2.消息Filter
最开始提到了,我们可以在监听时,加入特定的MessageFilter,来专门处理某些特定的消息,因为我们定义了三个消息ID,所以在我们的代码中,也定义了针对这三个消息ID的四个特定MessageFilter,它们的对应关系如下
2.2.3.ChannelProxy发送接收MSG封装
前面也提到,channelProxy发送,IPC::Channel::Listener监听处理,基于此,我们针对server和client端分别做了封装
class IPC_ChannelListener : public IPC::Channel::Listener, public base::RefCountedThreadSafe<IPC_ChannelListener> {
针对server端,它自身会分配一个IPC_ChannelService类对象,它组合了IPC_ChannelListener和IPC::ChannelProxy来实现发送消息和监听channel的状态(IOCP通知)
class IPC_ChannelService : public IPC_ChannelListener { ...... virtual void AddFilter(ChannelProxy::MessageFilter* filter); public: scoped_ptr<IPC::ChannelProxy> m_pChannelProxy; ///< 发送消息
另外,它会添加MessageFilter,用于处理client的请求消息,针对每个client连接过来时都会由server端分配一个IPC_ChannelService类对象
针对Client端,它会创建一个IPC_ChannelClient类对象,类似于IPC_ChannelService,它会向server端发送IPC_MsgFilterRouteMsgServer请求server端分配对应的service,以及发送IPC_MsgFilterQueryId查询分配的id
如下图:
2.2.4.Host封装
Host即针对于上面的IPC_ChannelClient和IPC_ChannelService的更上层封装
其详细流程如下:
IPC_HostServer:
-
创建一个IPC_ChannelService类对象(m_spChannelServer),初始化,并加入IPC_MsgFilterRequestService做为Filter
-
预先创建一个空闲的IPC_ChannelService类对象(名字由内部自动分配),并放入空闲列表中(m_IdleChannelWorkerList)
-
当有client连接到m_spChannelServer,并收到了RequestService请求时
-
从空闲列表中(m_IdleChannelWorkerList)分配一个空闲的IPC_ChannelService类对象给对应的client,并加入到工作列表中(m_ChannelWorkerMap),并把它的连接名称同步返回给client,同时重复步骤2的预创建
-
当m_spChannelServer发生channel错误时,试着重新初始化m_spChannelServer
-
当某个client连接发生错误时,把m_ChannelWorkerMap中对应此client的对象清除
7.转发消息和状态到外部
文章作者:hgy413
本文地址:https://hgy413.com/4587.html
版权所有 © 转载时必须以链接形式注明作者和原始出处!