首页 > cef > CEF-IPC通讯模块封装
2018一月5

CEF-IPC通讯模块封装

[隐藏]

1.简述

chromium的IPC通讯和socket通讯有点类似,server端先监听,各client端连接进来时,server端为各client端提供相应的service对象与之通讯

  

1.1.底层通讯

chromium内部是使用命名管道来建立通讯,参看\IPC\chrome\src\ipc\ipc_channel_win.cc

bool Channel::ChannelImpl::CreatePipe(const IPC::ChannelHandle &channel_handle,
                                      Mode mode) {
  if (channel_handle.pipe.handle) {
          ....
  } else if (mode & MODE_SERVER_FLAG) {
  
      // 1.如果是Server端,就创建Pipe
    pipe_ = CreateNamedPipeW(pipe_name.c_str(),
                             open_mode,
                             PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
                             1,
                             Channel::kReadBufferSize,
                             Channel::kReadBufferSize,
                             5000,
                             NULL);
  } else if (mode & MODE_CLIENT_FLAG) {
    //  2.如果是Client端就打开Pipe
    pipe_name = PipeName(channel_handle.name, &client_secret_);
    pipe_ = CreateFileW(pipe_name.c_str(),
                        GENERIC_READ | GENERIC_WRITE,
                        0,
                        NULL,
                        OPEN_EXISTING,
                        SECURITY_SQOS_PRESENT | SECURITY_IDENTIFICATION |
                            FILE_FLAG_OVERLAPPED,
                        NULL);
  } else {
    NOTREACHED();
  }
  .....
  return true;
}
//------------------------------
bool Channel::ChannelImpl::ProcessConnection() {
  // 3.Server端等待连接
  BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped);
}

   

1.2.底层绑定+监听

注意,我们的chromium的IPC是基于windows平台的,所以它的底层监听也来自于windows特有的的IOCPI/O Completion Port),就是大名鼎鼎的IO完成端口,这里借用一点网上的资料:

IO完成端口是用于C/S通信模式中性能最好的网络通信模型,没有之一,它充分利用内核对象的调度,只使用少量的几个线程来处理和客户端的所有通信,消除了无谓的线程上下文切换,最大限度的提高了网络通信的性能

MessagePumpForIO::MessagePumpForIO() {
   //1. 初始化完成端口
  port_.Set(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1));
  DCHECK(port_.IsValid());
}

//--------------------------------------------------------------------
bool Channel::ChannelImpl::Connect() {

   ........
   //2. 把我们的pipe绑定到完成端口中
  MessageLoopForIO::current()->RegisterIOHandler(pipe_, this);

  // 3.Check to see if there is a client connected to our pipe...
  if (waiting_connect_)
    ProcessConnection();
    ......
}

//--------------------------------------------------------------------
void MessagePumpForIO::RegisterIOHandler(HANDLE file_handle,
                                         IOHandler* handler) {
                                         
  //2.绑定pipe到完成端口的真实实现
  ULONG_PTR key = reinterpret_cast<ULONG_PTR>(handler);
  HANDLE port = CreateIoCompletionPort(file_handle, port_, key, 1);
  DPCHECK(port);
}

//--------------------------------------------------------------------
 MessageLoopForIO::current()->PostTask(
        FROM_HERE, base::Bind(&Channel::ChannelImpl::OnIOCompleted,
                              weak_factory_.GetWeakPtr(), &input_state_.context,
                              0, 0));      

参看\IPC\chrome\src\ipc\ipc_channel_win.cc

    

通过完成端口,最终在Channel::ChannelImpl::OnIOCompleted中我们可以高效得到pipe发送过来的数据,pipe是否断开等信息

void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context,
                                         DWORD bytes_transfered,
                                         DWORD error) {
   // 1.把对端的数据广播出去
    if (ok)
      ok = ProcessIncomingMessages();
  } 
  if (!ok && INVALID_HANDLE_VALUE != pipe_) {
    // 2. pipe断开了, 通知一下
    Close();
    listener()->OnChannelError();
  }
}

//---------------------------------------------------------------------------------------
bool ChannelReader::ProcessIncomingMessages() {
  while (true) {
     //1. 读取数据
    int bytes_read = 0;
    ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize,
                                    &bytes_read);
    
    //2.分发到读取的数据
    if (!DispatchInputData(input_buf_, bytes_read))
      return false;
  }
}

//---------------------------------------------------------------------------------------
bool ChannelReader::DispatchInputData(const char* input_data,
                                      int input_data_len) {

        //1. 最终走到此处
        listener_->OnMessageReceived(m);
    
  }

上述代码有几个关键点,如:

在pipe广播消息时会调用listener_->OnMessageReceived(m);

在pipe断开时会调用listener()->OnChannelError();

 即pipe的广播消息和自身的状态都可以通过

 Channel::Listener* listener_;

来完成监听

  

继续参看listener的ChannelProxy::Context::OnMessageReceived实现

// Called on the IPC::Channel thread
bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
  // First give a chance to the filters to process this message.
  if (!TryFilters(message))
    OnMessageReceivedNoFilter(message);
  return true;
}

我们发现有个

 if (!TryFilters(message))

这意味着我们可以在监听时,加入特定的MessageFilter,来专门处理某些特定的消息

   

以上所有代码都封装于channel类,channel实现了以上所有底层功能

   

1.3.跨线程发送

操作channel的总是IO线程,这是chromium的内部规定,但基本上我们都是需要从非IO线程与其他进程通讯,chromium最擅长是什么? task,对的,把对channel的所有操作全放在task里,再post到IO线程队列中, 为此,chromium封装了一个代理层ipc\ipc_channel_proy.h

class IPC_EXPORT ChannelProxy : public Message::Sender {

它通过以下方式Post Task到IO线程

bool ChannelProxy::Send(Message* message) { ...
  context_->ipc_message_loop()->PostTask(
      FROM_HERE,
      base::Bind(&ChannelProxy::Context::OnSendMessage,
                 context_, base::Passed(scoped_ptr<Message>(message))));  return true;
}---->// Called on the IPC::Channel threadvoid ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {  if (!channel_.get()) {
    OnChannelClosed();    return;
  }  if (!channel_->Send(message.release()))
    OnChannelError();
}

它的基类是Message::Sender,结合上面提到的IOCP完成端口通知, listener监听,形成一个完整的链条

   

如果要发送同步消息,使用SyncChannel,它其实是在ChannelProxy的基础上增加一个事件等待(为什么不叫SyncChannelProxy呢?)

class IPC_EXPORT SyncChannel : public ChannelProxy,
                               public base::WaitableEventWatcher::Delegate {

 另外,  SyncChannel在发送异步消息时和ChannelProxy是完全一至的,这从以下代码可以体现出来:

bool SyncChannel::Send(Message* message) {
  return SendWithTimeout(message, base::kNoTimeout);
}

bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) {
  if (!message->is_sync()) {
    ChannelProxy::Send(message);
    return true;
  }

   

   

2.自身代码封装

外部封装的代码和4366代码采用同一类处理方式,全局仅一个全局变量,从而控制所有对象的生命周期,参看IPC\inc\Common\IPC_Global.h

class IPC_Global : public base::AtExitManager// Thread如果最后退出需依赖于AtExitManager
{
        ........
	scoped_refptr<IPC_ThreadEnviroment>                     m_spThreadEnviroment;             ///< 线程环境
	scoped_refptr<IPC_HostMgr>                              m_spHostMgr;                      ///< Host管理
};

分别为线程环境和host管理,以下分别介绍:

   

2.1.线程环境

前面提到IPC的底层通讯是在IO线程模型上,对于chromium来说,它有三种线程模型, 以下列出我们要用到的两种线程模型:

MessagePumpDefault MessagePumpForIO
是否需要处理系统消息
是否需要处理Task
是否需要处理Watcher
是否阻塞在信号量上

参看:IPC\chrome\include\base\message_loop.h上的简介

 // TYPE_DEFAULT
  //   This type of ML only supports tasks and timers.
  //
  // TYPE_UI
  //   This type of ML also supports native UI events (e.g., Windows messages).
  //   See also MessageLoopForUI.
  //
  // TYPE_IO
  //   This type of ML also supports asynchronous IO.  See also
  //   MessageLoopForIO.

   

我们建立了三个chromium的三个线程,使用了上述两种线程模型,参看IPC\inc\Thread\IPC_ThreadEnviroment.h

enum DMTID 
{
	DM_TID_MAIN,			///< MessagePumpDefault, 用于常规host创建,发送消息
	DM_TID_IO,			///< MessagePumpForIO,用于IPC底层通讯,操作channel的总是IO线程
	DM_TID_EVENT,			///< MessagePumpDefault,用于lister的相关回调
	DM_TID_COUNT
};

  

1.IPC_Thread类为线程的实现体,代码位于IPC\inc\Thread\IPC_Thread.h,它提供最基础的线程相关函数

class IPC_Thread : public base::Thread, public base::RefCountedThreadSafe<IPC_Thread>

    

2.IPC_ThreadRunner类创建线程,启动线程,查找线程,关闭线程

class IPC_ThreadRunner
{
public:
	int StartThread(DMTID id);
	int ShutDownAllThreads();
	bool FindThread(DMTID id,scoped_refptr<IPC_Thread>& spThread);

public:
	typedef std::map<DMTID, scoped_refptr<IPC_Thread>>				ActivedThreadMap;										///< 通过引用计数来维持生命周期
	ActivedThreadMap								m_ActiveThreadMap;
	base::Lock									m_Lock;
};

 

3.IPC_ThreadEnviroment类提供统一的入口函数

1.png

  

4.IPC_ThreadMsgLoopProxy类用于channelproxy的消息转发

我们创建了一个IO线程,那怎么让IPC在我们的IO线程上通讯呢?

在channelproxy的构造函数里,它允许用户传入自定义的MessageLoopProxy类

ChannelProxy::ChannelProxy(const IPC::ChannelHandle& channel_handle,
                           Channel::Mode mode,
                           Channel::Listener* listener,
                           base::MessageLoopProxy* ipc_thread)// 此处传入我们的IPC_ThreadMsgLoopProxy对象
    : context_(new Context(listener, ipc_thread)),
      outgoing_message_filter_(NULL),
      did_init_(false) {
  Init(channel_handle, mode, true);
}

所以,我们自定义了一个IPC_ThreadMsgLoopProxy类,它负责把所有的task通过我们的IO线程转发

class IPC_ThreadMsgLoopProxy : public base::MessageLoopProxy
{
    ....
    bool IPC_ThreadMsgLoopProxy::PostDelayedTask(const tracked_objects::Location& from_here,const base::Closure& task,int64 delay_ms)
    {
	return g_pIPCThreadEnviroment->PostDelayedTask(m_id,from_here,task,delay_ms);
    }
}

//----------------------------------------------------------------------------------------
m_pChannelProxy.reset(new IPC::ChannelProxy(m_strChannelName, IPC::Channel::MODE_SERVER, this, new IPC_ThreadMsgLoopProxy(DM_TID_IO)));

          

     

2.2.Host管理 

   

2.2.1.消息宏

chrome的消息分两种,路由(routed)和控制(control),路由消息是私密的,系统会依照路由信息将消息安全的传递到目的地,控制消息就是一个广播消息,谁想听等能够听得到。

这里我们的外部封装使用的是控制消息

 chrome的消息宏展开是反人类的,它允许定义一次宏,展开多段代码,所以没有#pragma once,以下是我们使用的IPC消息定义:

IPC_STRUCT_BEGIN(IPC_MsgContent)
	IPC_STRUCT_MEMBER(std::vector<unsigned char>, body)
	IPC_STRUCT_MEMBER(bool, reply_flag)	// 回应标志,是否为应答消息
	IPC_STRUCT_MEMBER(int32, reply_id)	// 同步序号
IPC_STRUCT_END()

	///	客户端请求服务端代理为其分配一个服务者,并返回服务者的名称 
	IPC_SYNC_MESSAGE_CONTROL0_1(IPCMsg_ClientRequestService, std::string)

	///	客户端向服务端查询管道ID 
	IPC_SYNC_MESSAGE_CONTROL0_1(IPCMsg_ClientQueryId,int32)

	///	CONTROL消息
	IPC_MESSAGE_CONTROL2(IPCMsg_RouteMsg, int32, IPC_MsgContent)

SYNC表示同步调用,Send在等到回复时才返回,0_1表示0 in,1 out 

比如:ChannelMsg_ClientQueryId,表示out一个int32(查询到的channel id)

更多细节可以参看IPC\chrome\include\ipc\ipc_message_macros.h的头简介

   

2.2.2.消息Filter

最开始提到了,我们可以在监听时,加入特定的MessageFilter,来专门处理某些特定的消息,因为我们定义了三个消息ID,所以在我们的代码中,也定义了针对这三个消息ID的四个特定MessageFilter,它们的对应关系如下

2.png

    

2.2.3.ChannelProxy发送接收MSG封装

前面也提到,channelProxy发送,IPC::Channel::Listener监听处理,基于此,我们针对server和client端分别做了封装

class IPC_ChannelListener : public IPC::Channel::Listener, public base::RefCountedThreadSafe<IPC_ChannelListener> 
{

  

针对server端,它自身会分配一个IPC_ChannelService类对象,它组合了IPC_ChannelListenerIPC::ChannelProxy来实现发送消息和监听channel的状态(IOCP通知)

class IPC_ChannelService : public IPC_ChannelListener
{
......
      virtual void AddFilter(ChannelProxy::MessageFilter* filter);
public:
      scoped_ptr<IPC::ChannelProxy>							m_pChannelProxy;					///< 发送消息

另外,它会添加MessageFilter,用于处理client的请求消息,针对每个client连接过来时都会由server端分配一个IPC_ChannelService类对象

针对Client端,它会创建一个IPC_ChannelClient类对象,类似于IPC_ChannelService,它会向server端发送IPC_MsgFilterRouteMsgServer请求server端分配对应的service,以及发送IPC_MsgFilterQueryId查询分配的id

如下图:

3.png

   

2.2.4.Host封装

Host即针对于上面的IPC_ChannelClient和IPC_ChannelService的更上层封装

4.png

其详细流程如下:

IPC_HostServer:

  1. 创建一个IPC_ChannelService类对象(m_spChannelServer),初始化,并加入IPC_MsgFilterRequestService做为Filter

  2. 预先创建一个空闲的IPC_ChannelService类对象(名字由内部自动分配),并放入空闲列表中(m_IdleChannelWorkerList)

  3. 当有client连接到m_spChannelServer,并收到了RequestService请求时

  4. 从空闲列表中(m_IdleChannelWorkerList)分配一个空闲的IPC_ChannelService类对象给对应的client,并加入到工作列表中(m_ChannelWorkerMap),并把它的连接名称同步返回给client,同时重复步骤2的预创建

  5. 当m_spChannelServer发生channel错误时,试着重新初始化m_spChannelServer

  6. 当某个client连接发生错误时,把m_ChannelWorkerMap中对应此client的对象清除

 7.转发消息和状态到外部

IPC通讯模块.zip

  

   











文章作者:hgy413
本文地址:http://hgy413.com/4587.html
版权所有 © 转载时必须以链接形式注明作者和原始出处!

本文目前尚无任何评论.

发表评论