Hadoop之BlockPoolManager源码分析

 2023-09-24 阅读 8 评论 0

摘要:在HDFS Federation架构中,一个HDFS集群可以创建多个命名空间,每一个DataNode都可以存储多个BlockPool的数据块,所以在DataNode中定义了一个BlockPoolManager,用于管理DataNode上所有的块池。DataNode的其他模块要对BlockPool进行操作,都必须通过BlockPoolManager来执行。

在HDFS Federation架构中,一个HDFS集群可以创建多个命名空间,每一个DataNode都可以存储多个BlockPool的数据块,所以在DataNode中定义了一个BlockPoolManager,用于管理DataNode上所有的块池。

DataNode的其他模块要对BlockPool进行操作,都必须通过BlockPoolManager来执行,每一个DataNode都有一个BlockPoolManager的实例。

一 BPServiceActor分析

BPServiceActor负责与一个NameNode进行通信,每一个BPServiceActor都是一个独立的线程,主要功能:

>>与NameNode进行第一次握手,获取命名空间的信息

>>向NameNode注册当前DataNode

>>定期向NameNode发送心跳,增量块汇报,全量块汇报,缓存块汇报等

>>执行NameNode传回的指令

 

static final Log LOG = DataNode.LOG;

//NameNode 地址

finalInetSocketAddress nnAddr;

//NameNode 状态

HAServiceStatestate;

//所持有的BPOfferService对象

hadoop源码,final BPOfferService bpos;

//当前的工作线程

ThreadbpThread;

//向Name Node发送RPC请求的代理

DatanodeProtocolClientSideTranslatorPBbpNamenode;

//当前BPServiceActor的运行状态,初始状态时CONNECTING

static enum RunningState {

    CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED;

}

private volatile RunningState runningState = RunningState.CONNECTING

//用于保存2次块汇报之间Data Node存储数据块的变化

private final Map<DatanodeStorage, PerStoragePendingIncrementalBR>

      pendingIncrementalBRperStorage = Maps.newHashMap();

//DataNode对象的引用

private final DataNode dn;

//用于记录Data Node的注册信息

private DatanodeRegistration bpRegistration;

BPServiceActor线程的主循环(节选):

//初始化

// Excerpt of the BPServiceActor thread's main logic.
// NOTE(review): this excerpt omits the enclosing method header and the retry
// loop that the `break` below exits (that loop is closed by the lone `}` after
// the catch block), so it is not compilable exactly as shown.
try {
    // Connect to the NameNode, perform the handshake and register this DataNode.
    connectToNNAndHandshake();
    // Initialization succeeded: leave the (omitted) retry loop.
    break;
} catch (IOException ioe) {
    // Handshake/registration failed: mark the actor INIT_FAILED.
    runningState = RunningState.INIT_FAILED;
    if (shouldRetryInit()) {
        // Retry until all NameNodes of the BPOS have failed initialization
        // (sleep 5000, presumably ms, then try again).
        sleepAndLogInterrupts(5000, "initializing");
    } else {
        // No more retries allowed: mark the actor FAILED and stop.
        runningState = RunningState.FAILED;
        return;
    }
}
}
// Initialization succeeded: the actor is now RUNNING.
runningState = RunningState.RUNNING;
// Repeatedly call offerService(), which sends heartbeats as well as full,
// incremental and cache block reports to the NameNode.
while (shouldRun()) {
    try {
      offerService();
    } catch (Exception ex) {
  // Exceptions are not propagated: sleep (5000, presumably ms) and call
  // offerService() again, until shouldRun() returns false.
  sleepAndLogInterrupts(5000, "offeringservice");
    }
  }
  // shouldRun() is false (the actor or the DataNode is stopping): mark EXITED.
  runningState = RunningState.EXITED;

 

  private void connectToNNAndHandshake() throws IOException {

    //获取Name Node 的PRC 代理

    bpNamenode = dn.connectToNN(nnAddr);

    //第一次握手去获取namespace 信息

    NamespaceInfo nsInfo = retrieveNamespaceInfo();

    bpos.verifyAndSetNamespaceInfo(nsInfo);

    //第二次握手则是向Name Node注册这个Data Node

    register(nsInfo);

  }

offerService()方法(向NameNode发送心跳及各类块汇报的主循环):

private void offerService() throwsException {

    long fullBlockReportLeaseId = 0;

    while (shouldRun()) {

      try {

        final long startTime = scheduler.monotonicNow();

        //判断是否发送心跳信息

        final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);

        HeartbeatResponse resp = null;

        //如果要发送心跳

        if (sendHeartbeat) {

          boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&

                  scheduler.isBlockReportDue(startTime);

          scheduler.scheduleNextHeartbeat();

          if (!dn.areHeartbeatsDisabledForTests()) {

            //发送心跳信息

            resp = sendHeartBeat(requestBlockReportLease);

            assert resp != null;

            if (resp.getFullBlockReportLeaseId() != 0) {

              fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();

            }

            dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime);

            //对心跳响应中携带的NameNode的HA状态进行处理

            bpos.updateActorStatesFromHeartbeat(

                this, resp.getNameNodeHaState());

            state = resp.getNameNodeHaState().getState();

 

            if (state == HAServiceState.ACTIVE) {

              handleRollingUpgradeStatus(resp);

            }

 

            long startProcessCommands = monotonicNow();

            //处理响应中带回的Name Node指令

            if (!processCommand(resp.getCommands()))

              continue;

            long endProcessCommands = monotonicNow();

          }

        }

        if (sendImmediateIBR || sendHeartbeat) {

          reportReceivedDeletedBlocks();

        }

 

        List<DatanodeCommand> cmds = null;

        boolean forceFullBr =

            scheduler.forceFullBlockReport.getAndSet(false);

        if (forceFullBr) {

          LOG.info("Forcinga full block report to " + nnAddr);

        }

        if ((fullBlockReportLeaseId != 0) || forceFullBr) {

          cmds = blockReport(fullBlockReportLeaseId);

          fullBlockReportLeaseId = 0;

        }

        processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));

 

        if (!dn.areCacheReportsDisabledForTests()) {

          DatanodeCommand cmd = cacheReport();

          processCommand(new DatanodeCommand[]{ cmd });

        }

 

        //

        // There is no work to do;  sleep until hearbeat timer elapses,

        // or work arrives, and then iterate again.

        //

        long waitTime = scheduler.getHeartbeatWaitTime();

        synchronized(pendingIncrementalBRperStorage) {

          if (waitTime > 0 && !sendImmediateIBR) {

            try {

              pendingIncrementalBRperStorage.wait(waitTime);

            } catch (InterruptedException ie) {

              LOG.warn("BPOfferServicefor " + this + " interrupted");

            }

          }

        } // synchronized

      } catch(RemoteException re) {

        String reClass = re.getClassName();

        if (UnregisteredNodeException.class.getName().equals(reClass) ||

            DisallowedDatanodeException.class.getName().equals(reClass) ||

            IncorrectVersionException.class.getName().equals(reClass)) {

          LOG.warn(this + " is shutting down", re);

          shouldServiceRun = false;

          return;

        }

        LOG.warn("RemoteExceptionin offerService", re);

        try {

          long sleepTime = Math.min(1000, dnConf.heartBeatInterval);

          Thread.sleep(sleepTime);

        } catch (InterruptedException ie) {

          Thread.currentThread().interrupt();

        }

      } catch (IOException e) {

        LOG.warn("IOExceptionin offerService", e);

      }

      processQueueMessages();

    } // while (shouldRun())

  } // offerService

 

void updateActorStatesFromHeartbeat(BPServiceActor actor,

      NNHAStatusHeartbeat nnHaState) {

    writeLock();

    try {

      //取到Name Nodetxid

      final long txid = nnHaState.getTxId();

      //当前这个NameNode是否声明自己为Active NameNode

      final boolean nnClaimsActive =

          nnHaState.getState() == HAServiceState.ACTIVE;

      //BPOfferService是否认为当前Name Node为Active NameNode

      final boolean bposThinksActive = bpServiceToActive == actor;

      //当前Name Node携带的txid是否大于原Active NameNode 的txid

      final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;

      //当前这个是Acitve,但是BPOfferService所记录的不是Active,说明Standby已经切换成Active

      if (nnClaimsActive && !bposThinksActive) {

        //如果有两个namenode声明为active,当前的请求过时

        if (!isMoreRecentClaim) {

           //直接忽略

          return;

        } else {//当前请求是最新的请求

          if (bpServiceToActive == null) {

            //BPOfferService上还没有保存active name node

          } else {

          }

          //将bpServiceToActive指向当前的NameNode对应的 BPOfferService

          bpServiceToActive = actor;

        }

      } else if (!nnClaimsActive && bposThinksActive) {

        //原来Active Name Node现在声明为StandbyName Node

        bpServiceToActive = null;

      }

      //更新lastActiveClaimTxId

      if (bpServiceToActive == actor) {

        assert txid >= lastActiveClaimTxId;

        lastActiveClaimTxId = txid;

      }

    } finally {

      writeUnlock();

    }

  }

 

二 BPOfferService

BPOfferService就是对DataNode每一个BlockPool进行管理的类。

重要的字段:

//握手之后获取到的Namespace信息

NamespaceInfobpNSInfo;

//当前Block Pool在Name Node上的注册信息,这个信息是在Data Node注册阶段获取的

volatileDatanodeRegistration bpRegistration;

//当前DataNode的引用

private final DataNode dn;

//BPServiceActor的引用,这个代表着是Active的Name Node对应的对象

privateBPServiceActor bpServiceToActive = null;

//当前命名空间中所有NameNode对应的BPServiceActor对象

private final List<BPServiceActor> bpServices = new CopyOnWriteArrayList<BPServiceActor>();

//每当收到一个NameNode的时候,就记录最近的txid

private long lastActiveClaimTxId = -1;

主要的方法分类:

1)触发汇报

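trySendErrorReport()、reportRemoteBadBlock()、reportBadBlock()分别实现了向NameNode发送错误汇报、汇报远程坏块以及汇报本地坏块的操作,具体工作交给BPServiceActor完成。例如trySendErrorReport()会为每一个BPServiceActor创建一个ErrorReportAction,并通过bpThreadEnqueue()放入该BPServiceActor的队列中,由BPServiceActor线程在主循环中处理。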

void trySendErrorReport(int errCode, String errMsg) {

    for (BPServiceActor actor : bpServices) {

      ErrorReportAction errorReportAction = new ErrorReportAction

          (errCode, errMsg);

      actor.bpThreadEnqueue(errorReportAction);

    }

}

2)添加与删除数据块操作

当DataNode接收到一个新的数据块时,比如客户端通过数据流管道写入一个数据块,或者通过DataTransferProtocol流式接口复制一个数据块时,都会调用BPOfferService.notifyNamenodeReceivedBlock()。

当DataNode删除一个已有的数据块时,会调用BPOfferService.notifyNamenodeDeletedBlock()方法通知NameNode。

 

3)响应NameNode的指令

boolean processCommandFromActor(DatanodeCommand cmd,BPServiceActor actor) throws IOException {

    if (cmd == null) {

      return true;

    }

    //如果Name Node返回的指令要求Data Node重新注册的,则调用BPServiceActor.register方法

    if (DatanodeProtocol.DNA_REGISTER == cmd.getAction()) {

      actor.reRegister();

      return false;

    }

    writeLock();

    try {

      //对于Active Name Node返回的指令,调用processCommandFromActive

      if (actor == bpServiceToActive) {

        return processCommandFromActive(cmd, actor);

      } else {

      //对于Standbu Name Node返回的指令,调用processCommandFromStandby

        return processCommandFromStandby(cmd, actor);

      }

    } finally {

      writeUnlock();

    }

}

processCommandFromStandby()负责处理来自Standby NameNode的指令,这类指令基本上直接忽略即可。这样可以防止在HA部署下出现脑裂,也就是Active NameNode和Standby NameNode同时向DataNode下发指令的情况,所以BPOfferService对象并不执行Standby NameNode返回的这些指令。

 

三 BlockPoolManager

BlockPoolManager类负责管理所有的BPOfferService实例,对外提供添加、删除、启动、关闭BPOfferService的接口。所有对BPOfferService的操作,都必须通过BlockPoolManager类提供的方法来执行。

 

DataNode启动的时候,会初始化BlockPoolManager对象,然后调用BlockPoolManager.refreshNamenodes()完成对BlockPoolManager的构造。其主要字段如下:

// nameserviceId -> BPOfferService.
private final Map<String, BPOfferService> bpByNameserviceId = Maps.newHashMap();

// blockPoolId -> BPOfferService.
private final Map<String, BPOfferService> bpByBlockPoolId = Maps.newHashMap();

// All BPOfferService instances managed by this BlockPoolManager.
private final List<BPOfferService> offerServices = Lists.newArrayList();

// Reference to the owning DataNode.
private final DataNode dn;

 

 

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/3/93050.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息