1.发送请求:query.getCPUStatus("Intel");
2.传送到:RPCInvoker.invoke(Object proxy, Method method, Object[]args)
其中method:publicabstract org.hadoopinternal.ipc.copy.CPUStatusorg.hadoopinternal.ipc.copy.Query.getCPUStatus(java.lang.String)
args:[Intel]
3.调用: ObjectWritable value = (ObjectWritable)
client.call(newRPCInvocation(method,args), remoteId);
4.构建 RPCInvocation对象
publicRPCInvocation(Methodmethod, Object[] parameters) {
this.methodName=method.getName();
this.parameterClasses=method.getParameterTypes();
this.parameters=parameters;
}
其中
methodName:getCPUStatus
parameterClasses:[classjava.lang.String]
parameters:[Intel]
5.构建ClientCall对象
publicWritablecall(Writable param, ClientConnectionIdremoteId)
throwsInterruptedException,IOException {
ClientCall call = newClientCall(param,this);
ClientConnection connection = getConnection(remoteId,call);
connection.sendParam(call);
......
}
其中:
protectedClientCall(Writableparam, Client client) {
this.client=client;
this.param=param;
synchronized(client){
this.id=client.counter++;
}
}
其中:
client:org.hadoopinternal.ipc.copy.Client@5fae6db3
param: RPCInvocation对象
id:2
6.调用connection.sendParam(call):
publicvoidsendParam(ClientCallcall) {
if(shouldCloseConnection.get()){
return;
}
DataOutputBuffer d=null;
try{
synchronized(this.out){
d = newDataOutputBuffer();
d.writeInt(call.id);
call.param.write(d);
byte[]data = d.getData();
intdataLength =d.getLength();
out.writeInt(dataLength); //first putthe data length
out.write(data,0, dataLength);//write thedata
out.flush();
}
} catch(IOExceptione) {
markClosed(e);
} finally{
//the bufferis just an in-memory buffer, but it is still polite to
// closeearly
IOUtils.closeStream(d);
}
}
先和Call的ID和RPCInvocation对象加在一起,先发送长度,最后发送ID+RPCInvocation对象
7.服务器端接收
读取长度
count= server.channelRead(channel,dataLengthBuffer);
dataLength=dataLengthBuffer.getInt(); (18)
读取数据
data=ByteBuffer.allocate(dataLength);
count= server.channelRead(channel,data);
processData(data.array());
privatevoidprocessData(byte[]buf) throwsIOException, InterruptedException{
DataInputStream dis = newDataInputStream(newByteArrayInputStream(buf));
intid =dis.readInt(); // try toread an id
Writable param =ReflectionUtils.newInstance(server.paramClass,server.conf);//read param
param.readFields(dis);
ServerCall call = newServerCall(id,param, this);
server.callQueue.put(call);// queue thecall; maybe blocked here
rpcCount++;// Incrementthe rpc count
}
8.构建ServerCall对象:
publicServerCall(intid, Writableparam, ServerConnection connection) {
this.id=id;
this.param=param;
this.connection=connection;
this.timestamp=System.currentTimeMillis();
this.response=null;
}
id:2
param:RPCInvocation对象
connection:新构建的ServerConnection对象
timestamp:当前时间戳
response:回应结果
9. 拿出ServerCall,调用相应方法
finalServerCallcall = server.callQueue.take();
value= server.call(call.connection.protocol,call.param,call.timestamp);
10.反射调用真正方法
将param还原成RPCInvocation对象,拿出MethodName、ParameterClasses、Parameters调用
真正方法:
publicWritablecall(Class<?> protocol, Writableparam, longreceivedTime)
throwsIOException{
try{
RPCInvocation call = (RPCInvocation)param;
Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses());
method.setAccessible(true);
Object value = method.invoke(instance,call.getParameters());
returnnewObjectWritable(method.getReturnType(),value);
} catch(InvocationTargetExceptione) { }
}
根据method的ReturnType和结果构建返回值。
11.将结果(rv)写入ServerCall的Response中
voidsetupResponse(ByteArrayOutputStreamresponse, ServerCall call,
Status status, Writable rv, String errorClass, Stringerror)
throwsIOException{
response.reset();
DataOutputStream out = newDataOutputStream(response);
out.writeInt(call.id);// write callid
out.writeInt(status.state);// writestatus
if(status ==Status.SUCCESS){
rv.write(out);
} else{
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
}
call.setResponse(ByteBuffer.wrap(response.toByteArray()));
}
先写ServerCall的Id,再写状态Status,再写结果(rv)
12.response发送到客户端
intnumBytes= server.channelWrite(channel,call.response);
13.客户端接收
privatevoidreceiveResponse(){
if(shouldCloseConnection.get()){
return;
}
lastActivity.set(System.currentTimeMillis());
try{
intid =in.readInt(); // try toread an id
ClientCall call = calls.get(id);
intstate= in.readInt(); // read callstatus
if(state ==Status.SUCCESS.state){
Writable value =ReflectionUtils.newInstance(client.valueClass,client.conf);
value.readFields(in); // readvalue
call.setValue(value);
calls.remove(id);
} elseif(state ==Status.ERROR.state){
call.setException(new IOException(WritableUtils.readString(in)));
calls.remove(id);
} elseif(state ==Status.FATAL.state){
markClosed(new IOException(WritableUtils.readString(in)));
}
} catch(IOExceptione) {
markClosed(e);
}
}
先读ID:2
再读state:0
再读返回结果value:OW[class=classorg.hadoopinternal.ipc.copy.CPUStatus,value=CPU: Intel Create atFri Mar 14 10:45:56 CST 2014]
其中:
client.valueClass:classorg.apache.hadoop.io.ObjectWritable