首页 > cef > CEF-IPC通讯模块封装
2018一月5

CEF-IPC通讯模块封装

[隐藏]

1.简述

chromium的IPC通讯和socket通讯有点类似,server端先监听,各client端连接进来时,server端为各client端提供相应的service对象与之通讯

  

1.1.底层通讯

chromium内部是使用命名管道来建立通讯,参看\IPC\chrome\src\ipc\ipc_channel_win.cc

bool Channel::ChannelImpl::CreatePipe(const IPC::ChannelHandle &channel_handle,
                                      Mode mode) {
  if (channel_handle.pipe.handle) {
          ....
  } else if (mode & MODE_SERVER_FLAG) {
  
      // 1.如果是Server端,就创建Pipe
    pipe_ = CreateNamedPipeW(pipe_name.c_str(),
                             open_mode,
                             PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
                             1,
                             Channel::kReadBufferSize,
                             Channel::kReadBufferSize,
                             5000,
                             NULL);
  } else if (mode & MODE_CLIENT_FLAG) {
    //  2.如果是Client端就打开Pipe
    pipe_name = PipeName(channel_handle.name, &client_secret_);
    pipe_ = CreateFileW(pipe_name.c_str(),
                        GENERIC_READ | GENERIC_WRITE,
                        0,
                        NULL,
                        OPEN_EXISTING,
                        SECURITY_SQOS_PRESENT | SECURITY_IDENTIFICATION |
                            FILE_FLAG_OVERLAPPED,
                        NULL);
  } else {
    NOTREACHED();
  }
  .....
  return true;
}
//------------------------------
bool Channel::ChannelImpl::ProcessConnection() {
  // 3.Server端等待连接
  BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped);
}

   

1.2.底层绑定+监听

注意,我们的chromium的IPC是基于windows平台的,所以它的底层监听也来自于windows特有的的IOCPI/O Completion Port),就是大名鼎鼎的IO完成端口,这里借用一点网上的资料:

IO完成端口是用于C/S通信模式中性能最好的网络通信模型,没有之一,它充分利用内核对象的调度,只使用少量的几个线程来处理和客户端的所有通信,消除了无谓的线程上下文切换,最大限度的提高了网络通信的性能

MessagePumpForIO::MessagePumpForIO() {
   //1. 初始化完成端口
  port_.Set(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1));
  DCHECK(port_.IsValid());
}

//--------------------------------------------------------------------
bool Channel::ChannelImpl::Connect() {

   ........
   //2. 把我们的pipe绑定到完成端口中
  MessageLoopForIO::current()->RegisterIOHandler(pipe_, this);

  // 3.Check to see if there is a client connected to our pipe...
  if (waiting_connect_)
    ProcessConnection();
    ......
}

//--------------------------------------------------------------------
void MessagePumpForIO::RegisterIOHandler(HANDLE file_handle,
                                         IOHandler* handler) {
                                         
  //2.绑定pipe到完成端口的真实实现
  ULONG_PTR key = reinterpret_cast<ULONG_PTR>(handler);
  HANDLE port = CreateIoCompletionPort(file_handle, port_, key, 1);
  DPCHECK(port);
}

//--------------------------------------------------------------------
 MessageLoopForIO::current()->PostTask(
        FROM_HERE, base::Bind(&Channel::ChannelImpl::OnIOCompleted,
                              weak_factory_.GetWeakPtr(), &input_state_.context,
                              0, 0));      

参看\IPC\chrome\src\ipc\ipc_channel_win.cc

    

通过完成端口,最终在Channel::ChannelImpl::OnIOCompleted中我们可以高效得到pipe发送过来的数据,pipe是否断开等信息

void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context,
                                         DWORD bytes_transfered,
                                         DWORD error) {
   // 1.把对端的数据广播出去
    if (ok)
      ok = ProcessIncomingMessages();
  } 
  if (!ok && INVALID_HANDLE_VALUE != pipe_) {
    // 2. pipe断开了, 通知一下
    Close();
    listener()->OnChannelError();
  }
}

//---------------------------------------------------------------------------------------
bool ChannelReader::ProcessIncomingMessages() {
  while (true) {
     //1. 读取数据
    int bytes_read = 0;
    ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize,
                                    &bytes_read);
    
    //2.分发到读取的数据
    if (!DispatchInputData(input_buf_, bytes_read))
      return false;
  }
}

//---------------------------------------------------------------------------------------
bool ChannelReader::DispatchInputData(const char* input_data,
                                      int input_data_len) {

        //1. 最终走到此处
        listener_->OnMessageReceived(m);
    
  }

上述代码有几个关键点,如:

在pipe广播消息时会调用listener_->OnMessageReceived(m);

在pipe断开时会调用listener()->OnChannelError();

 即pipe的广播消息和自身的状态都可以通过

 Channel::Listener* listener_;

来完成监听

  

继续参看listener的ChannelProxy::Context::OnMessageReceived实现

// Called on the IPC::Channel thread
bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
  // First give a chance to the filters to process this message.
  if (!TryFilters(message))
    OnMessageReceivedNoFilter(message);
  return true;
}

我们发现有个

 if (!TryFilters(message))

这意味着我们可以在监听时,加入特定的MessageFilter,来专门处理某些特定的消息

   

以上所有代码都封装于channel类,channel实现了以上所有底层功能

   

1.3.跨线程发送

操作channel的总是IO线程,这是chromium的内部规定,但基本上我们都是需要从非IO线程与其他进程通讯,chromium最擅长是什么? task,对的,把对channel的所有操作全放在task里,再post到IO线程队列中, 为此,chromium封装了一个代理层ipc\ipc_channel_proy.h

class IPC_EXPORT ChannelProxy : public Message::Sender {

它通过以下方式Post Task到IO线程

bool ChannelProxy::Send(Message* message) { ...
  context_->ipc_message_loop()->PostTask(
      FROM_HERE,
      base::Bind(&ChannelProxy::Context::OnSendMessage,
                 context_, base::Passed(scoped_ptr<Message>(message))));  return true;
}---->// Called on the IPC::Channel threadvoid ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {  if (!channel_.get()) {
    OnChannelClosed();    return;
  }  if (!channel_->Send(message.release()))
    OnChannelError();
}

它的基类是Message::Sender,结合上面提到的IOCP完成端口通知, listener监听,形成一个完整的链条

   

如果要发送同步消息,使用SyncChannel,它其实是在ChannelProxy的基础上增加一个事件等待(为什么不叫SyncChannelProxy呢?)

class IPC_EXPORT SyncChannel : public ChannelProxy,
                               public base::WaitableEventWatcher::Delegate {

 另外,  SyncChannel在发送异步消息时和ChannelProxy是完全一至的,这从以下代码可以体现出来:

bool SyncChannel::Send(Message* message) {
  return SendWithTimeout(message, base::kNoTimeout);
}

bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) {
  if (!message->is_sync()) {
    ChannelProxy::Send(message);
    return true;
  }

   

   

2.自身代码封装

外部封装的代码和4366代码采用同一类处理方式,全局仅一个全局变量,从而控制所有对象的生命周期,参看IPC\inc\Common\IPC_Global.h

class IPC_Global : public base::AtExitManager// Thread如果最后退出需依赖于AtExitManager
{
        ........
	scoped_refptr<IPC_ThreadEnviroment>                     m_spThreadEnviroment;             ///< 线程环境
	scoped_refptr<IPC_HostMgr>                              m_spHostMgr;                      ///< Host管理
};

分别为线程环境和host管理,以下分别介绍:

   

2.1.线程环境

前面提到IPC的底层通讯是在IO线程模型上,对于chromium来说,它有三种线程模型, 以下列出我们要用到的两种线程模型:

MessagePumpDefault MessagePumpForIO
是否需要处理系统消息
是否需要处理Task
是否需要处理Watcher
是否阻塞在信号量上

参看:IPC\chrome\include\base\message_loop.h上的简介

 // TYPE_DEFAULT
  //   This type of ML only supports tasks and timers.
  //
  // TYPE_UI
  //   This type of ML also supports native UI events (e.g., Windows messages).
  //   See also MessageLoopForUI.
  //
  // TYPE_IO
  //   This type of ML also supports asynchronous IO.  See also
  //   MessageLoopForIO.

   

我们建立了三个chromium的三个线程,使用了上述两种线程模型,参看IPC\inc\Thread\IPC_ThreadEnviroment.h

enum DMTID 
{
	DM_TID_MAIN,			///< MessagePumpDefault, 用于常规host创建,发送消息
	DM_TID_IO,			///< MessagePumpForIO,用于IPC底层通讯,操作channel的总是IO线程
	DM_TID_EVENT,			///< MessagePumpDefault,用于lister的相关回调
	DM_TID_COUNT
};

  

1.IPC_Thread类为线程的实现体,代码位于IPC\inc\Thread\IPC_Thread.h,它提供最基础的线程相关函数

class IPC_Thread : public base::Thread, public base::RefCountedThreadSafe<IPC_Thread>

    

2.IPC_ThreadRunner类创建线程,启动线程,查找线程,关闭线程

class IPC_ThreadRunner
{
public:
	int StartThread(DMTID id);
	int ShutDownAllThreads();
	bool FindThread(DMTID id,scoped_refptr<IPC_Thread>& spThread);

public:
	typedef std::map<DMTID, scoped_refptr<IPC_Thread>>				ActivedThreadMap;										///< 通过引用计数来维持生命周期
	ActivedThreadMap								m_ActiveThreadMap;
	base::Lock									m_Lock;
};

 

3.IPC_ThreadEnviroment类提供统一的入口函数

1.png

  

4.IPC_ThreadMsgLoopProxy类用于channelproxy的消息转发

我们创建了一个IO线程,那怎么让IPC在我们的IO线程上通讯呢?

在channelproxy的构造函数里,它允许用户传入自定义的MessageLoopProxy类

ChannelProxy::ChannelProxy(const IPC::ChannelHandle& channel_handle,
                           Channel::Mode mode,
                           Channel::Listener* listener,
                           base::MessageLoopProxy* ipc_thread)// 此处传入我们的IPC_ThreadMsgLoopProxy对象
    : context_(new Context(listener, ipc_thread)),
      outgoing_message_filter_(NULL),
      did_init_(false) {
  Init(channel_handle, mode, true);
}

所以,我们自定义了一个IPC_ThreadMsgLoopProxy类,它负责把所有的task通过我们的IO线程转发

class IPC_ThreadMsgLoopProxy : public base::MessageLoopProxy
{
    ....
    bool IPC_ThreadMsgLoopProxy::PostDelayedTask(const tracked_objects::Location& from_here,const base::Closure& task,int64 delay_ms)
    {
	return g_pIPCThreadEnviroment->PostDelayedTask(m_id,from_here,task,delay_ms);
    }
}

//----------------------------------------------------------------------------------------
m_pChannelProxy.reset(new IPC::ChannelProxy(m_strChannelName, IPC::Channel::MODE_SERVER, this, new IPC_ThreadMsgLoopProxy(DM_TID_IO)));

          

     

2.2.Host管理 

   

2.2.1.消息宏

chrome的消息分两种,路由(routed)和控制(control),路由消息是私密的,系统会依照路由信息将消息安全的传递到目的地,控制消息就是一个广播消息,谁想听等能够听得到。

这里我们的外部封装使用的是控制消息

 chrome的消息宏展开是反人类的,它允许定义一次宏,展开多段代码,所以没有#pragma once,以下是我们使用的IPC消息定义:

IPC_STRUCT_BEGIN(IPC_MsgContent)
	IPC_STRUCT_MEMBER(std::vector<unsigned char>, body)
	IPC_STRUCT_MEMBER(bool, reply_flag)	// 回应标志,是否为应答消息
	IPC_STRUCT_MEMBER(int32, reply_id)	// 同步序号
IPC_STRUCT_END()

	///	客户端请求服务端代理为其分配一个服务者,并返回服务者的名称 
	IPC_SYNC_MESSAGE_CONTROL0_1(IPCMsg_ClientRequestService, std::string)

	///	客户端向服务端查询管道ID 
	IPC_SYNC_MESSAGE_CONTROL0_1(IPCMsg_ClientQueryId,int32)

	///	CONTROL消息
	IPC_MESSAGE_CONTROL2(IPCMsg_RouteMsg, int32, IPC_MsgContent)

SYNC表示同步调用,Send在等到回复时才返回,0_1表示0 in,1 out 

比如:ChannelMsg_ClientQueryId,表示out一个int32(查询到的channel id)

更多细节可以参看IPC\chrome\include\ipc\ipc_message_macros.h的头简介

   

2.2.2.消息Filter

最开始提到了,我们可以在监听时,加入特定的MessageFilter,来专门处理某些特定的消息,因为我们定义了三个消息ID,所以在我们的代码中,也定义了针对这三个消息ID的四个特定MessageFilter,它们的对应关系如下

2.png

    

2.2.3.ChannelProxy发送接收MSG封装

前面也提到,channelProxy发送,IPC::Channel::Listener监听处理,基于此,我们针对server和client端分别做了封装

class IPC_ChannelListener : public IPC::Channel::Listener, public base::RefCountedThreadSafe<IPC_ChannelListener> 
{

  

针对server端,它自身会分配一个IPC_ChannelService类对象,它组合了IPC_ChannelListenerIPC::ChannelProxy来实现发送消息和监听channel的状态(IOCP通知)

class IPC_ChannelService : public IPC_ChannelListener
{
......
      virtual void AddFilter(ChannelProxy::MessageFilter* filter);
public:
      scoped_ptr<IPC::ChannelProxy>							m_pChannelProxy;					///< 发送消息

另外,它会添加MessageFilter,用于处理client的请求消息,针对每个client连接过来时都会由server端分配一个IPC_ChannelService类对象

针对Client端,它会创建一个IPC_ChannelClient类对象,类似于IPC_ChannelService,它会向server端发送IPC_MsgFilterRouteMsgServer请求server端分配对应的service,以及发送IPC_MsgFilterQueryId查询分配的id

如下图:

3.png

   

2.2.4.Host封装

Host即针对于上面的IPC_ChannelClient和IPC_ChannelService的更上层封装

4.png

其详细流程如下:

IPC_HostServer:

  1. 创建一个IPC_ChannelService类对象(m_spChannelServer),初始化,并加入IPC_MsgFilterRequestService做为Filter

  2. 预先创建一个空闲的IPC_ChannelService类对象(名字由内部自动分配),并放入空闲列表中(m_IdleChannelWorkerList)

  3. 当有client连接到m_spChannelServer,并收到了RequestService请求时

  4. 从空闲列表中(m_IdleChannelWorkerList)分配一个空闲的IPC_ChannelService类对象给对应的client,并加入到工作列表中(m_ChannelWorkerMap),并把它的连接名称同步返回给client,同时重复步骤2的预创建

  5. 当m_spChannelServer发生channel错误时,试着重新初始化m_spChannelServer

  6. 当某个client连接发生错误时,把m_ChannelWorkerMap中对应此client的对象清除

 7.转发消息和状态到外部

IPC通讯模块.zip

  

   











文章作者:hgy413
本文地址:http://hgy413.com/4587.html
版权所有 © 转载时必须以链接形式注明作者和原始出处!

3 Responses to “CEF-IPC通讯模块封装”

  1. I loved as much as you will receive carried out right
    here. The sketch is tasteful, your authored subject matter
    stylish. nonetheless, you command get got an shakiness over that you wish be delivering the following.
    unwell unquestionably come further formerly again as exactly
    the same nearly very often inside case you shield this increase.

  2. Hello! Quick question that’s entirely off topic.
    Do you know how to make your site mobile friendly?
    My site looks weird when browsing from my iphone4.
    I’m trying to find a template or plugin that might be able to fix
    this issue. If you have any suggestions, please share.

    Thank you!

  3. Great article.

发表评论