Hadoop Source Code Analysis 16: IPC Flow (11), Overall Flow

2023-09-20


1. Send the request: query.getCPUStatus("Intel");

2. The call is forwarded to: RPCInvoker.invoke(Object proxy, Method method, Object[] args)

where method: public abstract org.hadoopinternal.ipc.copy.CPUStatus org.hadoopinternal.ipc.copy.Query.getCPUStatus(java.lang.String)

args: [Intel]
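Steps 1 and 2 rely on a JDK dynamic proxy: a call on the protocol interface is routed to an invocation handler together with the reflected Method and the argument array. The following is a minimal sketch of that routing only. The Query interface here is a hypothetical stand-in that returns a String, and the handler merely reports what it received instead of sending anything over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

class ProxySketch {
    // Hypothetical stand-in for the Query protocol interface (assumption).
    interface Query {
        String getCPUStatus(String cpu);
    }

    // Builds a proxy whose handler only describes the intercepted call.
    static Query newQueryProxy() {
        InvocationHandler handler = (Object proxy, Method method, Object[] args) ->
                method.getName() + Arrays.toString(args);
        return (Query) Proxy.newProxyInstance(
                Query.class.getClassLoader(), new Class<?>[] {Query.class}, handler);
    }

    public static void main(String[] argv) {
        // The interface call lands in the handler with method and args filled in.
        System.out.println(newQueryProxy().getCPUStatus("Intel"));
    }
}
```

In the real flow the handler would wrap method and args into an invocation object and hand it to the client, as the next steps describe.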

 

3. Call:

     ObjectWritable value = (ObjectWritable)
         client.call(new RPCInvocation(method, args), remoteId);

 

4. Construct the RPCInvocation object

   public RPCInvocation(Method method, Object[] parameters) {
     this.methodName = method.getName();
     this.parameterClasses = method.getParameterTypes();
     this.parameters = parameters;
   }

where:

methodName: getCPUStatus

parameterClasses: [class java.lang.String]

parameters: [Intel]
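Because the invocation object is what travels over the wire, it has to be able to write its three fields to a stream and read them back. The sketch below illustrates that idea under simplifying assumptions: only String arguments are supported, writeUTF stands in for whatever encoding the real class uses, and InvocationSketch is a hypothetical name, not the RPCInvocation class from the article.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

class InvocationSketch {
    String methodName;
    String[] parameterClassNames;
    String[] parameters; // assumption: String arguments only

    static InvocationSketch of(String methodName, String... parameters) {
        InvocationSketch inv = new InvocationSketch();
        inv.methodName = methodName;
        inv.parameters = parameters;
        inv.parameterClassNames = new String[parameters.length];
        Arrays.fill(inv.parameterClassNames, String.class.getName());
        return inv;
    }

    // Method name first, then the count, then (class name, value) pairs.
    void write(DataOutput out) throws IOException {
        out.writeUTF(methodName);
        out.writeInt(parameters.length);
        for (int i = 0; i < parameters.length; i++) {
            out.writeUTF(parameterClassNames[i]);
            out.writeUTF(parameters[i]);
        }
    }

    // Reads the fields back in exactly the order write() produced them.
    void readFields(DataInput in) throws IOException {
        methodName = in.readUTF();
        int n = in.readInt();
        parameterClassNames = new String[n];
        parameters = new String[n];
        for (int i = 0; i < n; i++) {
            parameterClassNames[i] = in.readUTF();
            parameters[i] = in.readUTF();
        }
    }

    // Serializes to bytes and deserializes into a fresh instance.
    static InvocationSketch roundTrip(InvocationSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            InvocationSketch copy = new InvocationSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```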

 

5. Construct the ClientCall object

 public Writable call(Writable param, ClientConnectionId remoteId)
                      throws InterruptedException, IOException {
   ClientCall call = new ClientCall(param, this);
   ClientConnection connection = getConnection(remoteId, call);
   connection.sendParam(call);
   ......
 }

where the constructor is:

   protected ClientCall(Writable param, Client client) {
     this.client = client;
     this.param = param;
     // take the next call id under the client's lock so ids stay unique
     synchronized (client) {
       this.id = client.counter++;
     }
   }

In this run:

client: org.hadoopinternal.ipc.copy.Client@5fae6db3

param: the RPCInvocation object

id: 2
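The call id is what later lets the client match a response to the request that produced it, so each call needs a unique id and has to be remembered until its response arrives. The following sketch shows one way to do that with a locked counter and a table of pending calls. The class and method names are hypothetical, and storing the parameter as a plain String is a simplification.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class CallIdSketch {
    private int counter = 0;
    private final Map<Integer, String> pending = new ConcurrentHashMap<>();

    // Assigns the next id under a lock and remembers the call until it completes.
    int register(String param) {
        int id;
        synchronized (this) {
            id = counter++;
        }
        pending.put(id, param);
        return id;
    }

    // Removes and returns the pending call for this id, or null if unknown.
    String complete(int id) {
        return pending.remove(id);
    }
}
```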

 

6. Call connection.sendParam(call)

 public void sendParam(ClientCall call) {
     if (shouldCloseConnection.get()) {
       return;
     }
     DataOutputBuffer d = null;
     try {
       synchronized (this.out) {
         d = new DataOutputBuffer();
         d.writeInt(call.id);
         call.param.write(d);
         byte[] data = d.getData();
         int dataLength = d.getLength();
         out.writeInt(dataLength);        // first put the data length
         out.write(data, 0, dataLength);  // write the data
         out.flush();
       }
     } catch (IOException e) {
       markClosed(e);
     } finally {
       // the buffer is just an in-memory buffer, but it is still polite to
       // close early
       IOUtils.closeStream(d);
     }
   }

The call ID and the RPCInvocation object are first serialized together into one buffer. The length of that buffer is sent first, followed by the ID + RPCInvocation bytes.
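The framing described above (length first, then call id plus payload) can be sketched with plain java.io streams. This is an illustration under assumptions: ByteArrayOutputStream replaces Hadoop's DataOutputBuffer, the payload is an opaque byte array rather than a Writable, and FrameSketch is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class FrameSketch {
    // Produces: [4-byte length][4-byte call id][payload bytes].
    static byte[] frame(int callId, byte[] payload) {
        try {
            // Buffer id + payload first so the total length is known up front.
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            DataOutputStream d = new DataOutputStream(body);
            d.writeInt(callId);
            d.write(payload);
            d.flush();

            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(wire);
            out.writeInt(body.size()); // first put the data length
            body.writeTo(out);         // then the buffered data
            out.flush();
            return wire.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Buffering before writing is what makes the length prefix possible: the receiver can then allocate exactly the right number of bytes before reading the body.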

 

7. Receiving on the server side

Read the length:

count = server.channelRead(channel, dataLengthBuffer);
dataLength = dataLengthBuffer.getInt();   // 18 in this run

Read the data:

data = ByteBuffer.allocate(dataLength);
count = server.channelRead(channel, data);
processData(data.array());

      private void processData(byte[] buf) throws IOException, InterruptedException {
             DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
             int id = dis.readInt();  // try to read an id
             Writable param = ReflectionUtils.newInstance(server.paramClass, server.conf);  // read param
             param.readFields(dis);
             ServerCall call = new ServerCall(id, param, this);
             server.callQueue.put(call);  // queue the call; maybe blocked here
             rpcCount++;  // increment the rpc count
      }
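The receiving side mirrors the sender: read the four-byte length, read exactly that many bytes, then pull the call id off the front. The sketch below shows this with a blocking InputStream, which is a simplification of the channel-based reads in the listing above. FrameReaderSketch and Decoded are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

class FrameReaderSketch {
    // Holds the two parts recovered from one frame.
    static final class Decoded {
        final int id;
        final byte[] payload;

        Decoded(int id, byte[] payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    static Decoded readFrame(InputStream wire) {
        try {
            DataInputStream in = new DataInputStream(wire);
            int dataLength = in.readInt();      // read the length
            byte[] data = new byte[dataLength];
            in.readFully(data);                 // read exactly dataLength bytes

            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data));
            int id = dis.readInt();             // the call id comes first
            byte[] payload = new byte[dataLength - 4];
            dis.readFully(payload);             // the rest is the serialized parameter
            return new Decoded(id, payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```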

8. Construct the ServerCall object:

   public ServerCall(int id, Writable param, ServerConnection connection) {
     this.id = id;
     this.param = param;
     this.connection = connection;
     this.timestamp = System.currentTimeMillis();
     this.response = null;
   }

id: 2

param: the RPCInvocation object

connection: the newly constructed ServerConnection object

timestamp: the current timestamp

response: the response result (null until it is filled in)

 

9. Take the ServerCall off the queue and invoke the corresponding method

final ServerCall call = server.callQueue.take();
value = server.call(call.connection.protocol, call.param, call.timestamp);
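Steps 7 and 9 together form a producer/consumer handoff: the thread that reads from the connection puts calls on a queue, and a handler thread takes them off and processes them. A minimal sketch of that handoff follows, using a bounded LinkedBlockingQueue so that put can block, as the comment in processData suggests. The capacity of 1 and the String payloads are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class CallQueueSketch {
    // Feeds requests through a bounded queue to a single handler thread.
    static List<String> process(List<String> requests) {
        BlockingQueue<String> callQueue = new LinkedBlockingQueue<>(1);
        List<String> handled = new ArrayList<>();

        Thread handler = new Thread(() -> {
            try {
                for (int i = 0; i < requests.size(); i++) {
                    // take() blocks until the reader has queued a call
                    handled.add("handled:" + callQueue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        handler.start();

        try {
            for (String request : requests) {
                callQueue.put(request); // blocks while the queue is full
            }
            handler.join(); // join makes the handler's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }
}
```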

 

10. Invoke the real method via reflection

param is cast back to an RPCInvocation object, and its methodName, parameterClasses and parameters are used to invoke the real method:

public Writable call(Class<?> protocol, Writable param, long receivedTime)
   throws IOException {
     try {
       RPCInvocation call = (RPCInvocation) param;
       Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses());
       method.setAccessible(true);
       Object value = method.invoke(instance, call.getParameters());
       return new ObjectWritable(method.getReturnType(), value);
     } catch (InvocationTargetException e) { }
   }

The return value is built from the method's return type and the result.
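The reflective dispatch can be tried in isolation. The sketch below looks a method up by name and parameter classes on a protocol interface and invokes it on an implementing instance. Query and QueryServer are hypothetical stand-ins, and wrapping failures in IllegalStateException is a choice made for this example, not the error handling of the code under analysis.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class DispatchSketch {
    // Hypothetical protocol interface and server-side implementation.
    interface Query {
        String getCPUStatus(String cpu);
    }

    static class QueryServer implements Query {
        public String getCPUStatus(String cpu) {
            return "CPU: " + cpu;
        }
    }

    // Resolves the method on the protocol and calls it on the instance.
    static Object dispatch(Class<?> protocol, Object instance, String methodName,
                           Class<?>[] parameterClasses, Object[] parameters) {
        try {
            Method method = protocol.getMethod(methodName, parameterClasses);
            method.setAccessible(true);
            return method.invoke(instance, parameters);
        } catch (InvocationTargetException e) {
            // the invoked method itself threw; surface its cause
            throw new IllegalStateException(e.getTargetException());
        } catch (ReflectiveOperationException e) {
            // no such method, or it could not be accessed
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] argv) {
        System.out.println(dispatch(Query.class, new QueryServer(), "getCPUStatus",
                new Class<?>[] {String.class}, new Object[] {"Intel"}));
    }
}
```

Looking the method up on the protocol interface rather than on the implementation class restricts remote callers to the methods the protocol declares.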

 

11. Write the result (rv) into the ServerCall's response

      void setupResponse(ByteArrayOutputStream response, ServerCall call,
                    Status status, Writable rv, String errorClass, String error)
                    throws IOException {
             response.reset();
             DataOutputStream out = new DataOutputStream(response);
             out.writeInt(call.id);       // write call id
             out.writeInt(status.state);  // write status

             if (status == Status.SUCCESS) {
                    rv.write(out);
             } else {
                    WritableUtils.writeString(out, errorClass);
                    WritableUtils.writeString(out, error);
             }
             call.setResponse(ByteBuffer.wrap(response.toByteArray()));
      }

The ServerCall id is written first, then the status, then the result (rv).
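The response layout (call id, status, then either the value or two error strings) can be reproduced with standard streams. In this sketch writeUTF is a stand-in for WritableUtils.writeString and for the Writable value, and the numeric status constants other than 0 for success are assumptions, since the walkthrough only shows state 0 for a successful call.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class ResponseSketch {
    static final int SUCCESS = 0; // matches the state read back in step 13
    static final int ERROR = 1;   // assumed value, for illustration only

    // Layout: [call id][state][value] on success, [call id][state][errorClass][error] otherwise.
    static byte[] setupResponse(int callId, int state, String value,
                                String errorClass, String error) {
        try {
            ByteArrayOutputStream response = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(response);
            out.writeInt(callId); // write call id
            out.writeInt(state);  // write status
            if (state == SUCCESS) {
                out.writeUTF(value);
            } else {
                out.writeUTF(errorClass);
                out.writeUTF(error);
            }
            out.flush();
            return response.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```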

 

12. Send the response to the client

int numBytes = server.channelWrite(channel, call.response);

 

13. Receiving on the client side

private void receiveResponse() {
     if (shouldCloseConnection.get()) {
       return;
     }
     lastActivity.set(System.currentTimeMillis());

     try {
       int id = in.readInt();      // try to read an id
       ClientCall call = calls.get(id);
       int state = in.readInt();   // read call status
       if (state == Status.SUCCESS.state) {
         Writable value = ReflectionUtils.newInstance(client.valueClass, client.conf);
         value.readFields(in);     // read value
         call.setValue(value);
         calls.remove(id);
       } else if (state == Status.ERROR.state) {
         call.setException(new IOException(WritableUtils.readString(in)));
         calls.remove(id);
       } else if (state == Status.FATAL.state) {
         markClosed(new IOException(WritableUtils.readString(in)));
       }
     } catch (IOException e) {
       markClosed(e);
     }
   }

First the ID is read: 2

Then the state: 0

Then the return value: OW[class=class org.hadoopinternal.ipc.copy.CPUStatus,value=CPU: Intel Create at Fri Mar 14 10:45:56 CST 2014]

where:

client.valueClass: class org.apache.hadoop.io.ObjectWritable
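The client-side matching of a response to its pending call can be sketched with a map of futures keyed by call id. This is an illustration under assumptions: CompletableFuture replaces the ClientCall object, readUTF replaces the Writable and WritableUtils decoding, and on a non-success state the sketch reads two strings to mirror the error class and message written in step 11.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class ReceiveSketch {
    static final int SUCCESS = 0;
    final Map<Integer, CompletableFuture<String>> calls = new ConcurrentHashMap<>();

    // Registers a pending call and returns the future its caller waits on.
    CompletableFuture<String> expect(int id) {
        CompletableFuture<String> future = new CompletableFuture<>();
        calls.put(id, future);
        return future;
    }

    // Decodes one response and completes the matching pending call.
    void receiveResponse(byte[] wire) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
            int id = in.readInt();    // first the call id
            CompletableFuture<String> call = calls.remove(id);
            int state = in.readInt(); // then the status
            if (call == null) {
                return;               // no caller is waiting for this id
            }
            if (state == SUCCESS) {
                call.complete(in.readUTF());
            } else {
                String errorClass = in.readUTF();
                String error = in.readUTF();
                call.completeExceptionally(new IOException(errorClass + ": " + error));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```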

 

Reposted from: https://www.cnblogs.com/leeeee/p/7276516.html
