irpas技术客

es中的IndicesClusterStateService_kgduu_es indices

irpas 2356

1、概述

IndicesClusterStateService作为ClusterStateApplier接口的实现类,在处理集群状态提交时,会执行ClusterStateApplier#applyClusterState。其作为索引的集群状态服务,管理索引及分片。

2、applyClusterState

其主要是管理索引以及分片。

public synchronized void applyClusterState(final ClusterChangedEvent event) { if (lifecycle.started() == false) { return; } final ClusterState state = event.state(); // we need to clean the shards and indices we have on this node, since we // are going to recover them again once state persistence is disabled (no master / not recovered) // TODO: feels hacky, a block disables state persistence, and then we clean the allocated shards, maybe another flag in blocks? if (state.blocks().disableStatePersistence()) { for (AllocatedIndex<? extends Shard> indexService : indicesService) { // also cleans shards indicesService.removeIndex(indexService.index(), NO_LONGER_ASSIGNED, "cleaning index (disabled block persistence)"); } return; } updateFailedShardsCache(state); deleteIndices(event); // also deletes shards of deleted indices removeIndices(event); // also removes shards of removed indices failMissingShards(state); removeShards(state); // removes any local shards that doesn't match what the master expects updateIndices(event); // can also fail shards, but these are then guaranteed to be in failedShardsCache createIndices(state); createOrUpdateShards(state); }

其通信图为

在applyClusterState时,会调用方法createOrUpdateShards及createShard,创建分片是交给IndicesService来处理,最终是通过IndexShard#startRecovery来执行对一个特定分片的恢复流程,会根据恢复类型执行相应的恢复。

恢复类型有

类型说明EMPTY_STORE从本地恢复(主分片)EXISTING_STORE从本地恢复(主分片)PEER从远端主分片恢复(副分片)SNAPSHOT从快照恢复LOCAL_SHARDS从本节点的其它分片恢复
switch (recoveryState.getRecoverySource().getType()) { case EMPTY_STORE: case EXISTING_STORE: executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore); break; case PEER: try { markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState); recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener); } catch (Exception e) { failShard("corrupted preexisting index", e); recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true); } break; case SNAPSHOT: final String repo = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot().getRepository(); executeRecovery("from snapshot", recoveryState, recoveryListener, l -> restoreFromRepository(repositoriesService.repository(repo), l)); break; case LOCAL_SHARDS: final IndexMetadata indexMetadata = indexSettings().getIndexMetadata(); final Index resizeSourceIndex = indexMetadata.getResizeSourceIndex(); final List<IndexShard> startedShards = new ArrayList<>(); final IndexService sourceIndexService = indicesService.indexService(resizeSourceIndex); final Set<ShardId> requiredShards; final int numShards; if (sourceIndexService != null) { requiredShards = IndexMetadata.selectRecoverFromShards(shardId().id(), sourceIndexService.getMetadata(), indexMetadata.getNumberOfShards()); for (IndexShard shard : sourceIndexService) { if (shard.state() == IndexShardState.STARTED && requiredShards.contains(shard.shardId())) { startedShards.add(shard); } } numShards = requiredShards.size(); } else { numShards = -1; requiredShards = Collections.emptySet(); } if (numShards == startedShards.size()) { assert requiredShards.isEmpty() == false; executeRecovery("from local shards", recoveryState, recoveryListener, l -> recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream().filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()), l)); } else { final RuntimeException e; if (numShards == -1) { e = new IndexNotFoundException(resizeSourceIndex); } else { e = new IllegalStateException("not all required shards of index " + resizeSourceIndex + " are started yet, expected " + numShards + " found " + startedShards.size() + " can't recover shard " + shardId()); } throw e; } break; default: throw new IllegalArgumentException("Unknown recovery source " + recoveryState.getRecoverySource()); }

?执行具体的恢复是在generic线程中处理。

private void executeRecovery(String reason, RecoveryState recoveryState, PeerRecoveryTargetService.RecoveryListener recoveryListener, CheckedConsumer<ActionListener<Boolean>, Exception> action) { markAsRecovering(reason, recoveryState); // mark the shard as recovering on the cluster state thread threadPool.generic().execute(ActionRunnable.wrap(ActionListener.wrap(r -> { if (r) { recoveryListener.onRecoveryDone(recoveryState, getTimestampRange()); } }, e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action)); } 3、主分片恢复

主分片从translog中恢复,尚未执行flush到磁盘的Lucene分段可以从translog中重建。

包含以下几个阶段?

名称说明INIT恢复尚未启动INDEX恢复Lucene文件,以及在节点音复制索引数据VERIFY_INDEX验证索引TRANSLOG启动engine,?重放translog,?建立Lucene索引FINALIZE清理工作DONE完毕
3.1 INIT

从开始执行恢复的那一刻起,被标记为INIT阶段,在IndexShard#startRecovery函数的参数中传入。StoreRecovery#internalRecoverFromStore执行恢复。

IndexShard#prepareForIndexRecovery设置状态为INDEX

public void prepareForIndexRecovery() { if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } recoveryState.setStage(RecoveryState.Stage.INDEX); assert currentEngineReference.get() == null; } 3.2 INDEX

从Lucene读取最后一次提交的分段信息,index中添加文件信息,标识index为完成。

final Store store = indexShard.store(); si = store.readLastCommittedSegmentsInfo(); final RecoveryState.Index index = recoveryState.getIndex(); addRecoveredFileDetails(si, store, index); index.setFileDetailsComplete(); 3.3 VERIFY_INDEX

VERIFY_INDEX中的INDEX指的是Lucene?index。验证当前分片是否损坏,是否进行本项检查依赖于配置项index.shard.check_on_startup,其取值如下

值说明false默认值,打开分片时不检查分片是否损坏checksum检查物理损坏true检查物理和逻辑损坏,将消耗大量的内存和CPU资源
3.4 TRANSLOG

一个Lucene索引由许多分段组成,每次搜索时遍历所有分段。内部维护了一个称为“提交点”的信息,其描述了当前Lucene索引都包括哪些分段,这些分段已经被fsyc系统调用,从操作系统的cache刷入磁盘。每次提交操作都会将分段刷入磁盘实现持久化。

此阶段需要重放事务日志中尚未刷入到磁盘的信息,根据最后一次提交的信息做快照,来确定事务日志中哪些需要重放。重放完毕后将新生成的Lucene数据刷入磁盘。

有好几个Engine和EngineFactory,以InternalEngineFactory和InternalEngine为例,其recoverFromTranslog通过从translog中恢复,内部调用recoverFromTranslogInternal

private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException { final int opsRecovered; final long localCheckpoint = getProcessedLocalCheckpoint(); if (localCheckpoint < recoverUpToSeqNo) { try (Translog.Snapshot snapshot = translog.newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) { opsRecovered = translogRecoveryRunner.run(this, snapshot); } catch (Exception e) { throw new EngineException(shardId, "failed to recover from translog", e); } } else { opsRecovered = 0; } // flush if we recovered something or if we have references to older translogs // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length. assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be"; pendingTranslogRecovery.set(false); // we are good - now we can commit logger.trace(() -> new ParameterizedMessage( "flushing post recovery from translog: ops recovered [{}], current translog generation [{}]", opsRecovered, translog.currentFileGeneration())); flush(false, true); translog.trimUnreferencedReaders(); }

translogRecoveryRunner的实现为

final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> { translogRecoveryStats.totalOperations(snapshot.totalOperations()); translogRecoveryStats.totalOperationsOnStart(snapshot.totalOperations()); return runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, translogRecoveryStats::incrementRecoveredOperations); };

内部调用runTranslogRecovery,遍历所有需要重放的事务日志,执行具体的写操作。

int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin, Runnable onOperationRecovered) throws IOException { int opsRecovered = 0; Translog.Operation operation; while ((operation = snapshot.next()) != null) { try { logger.trace("[translog] recover op {}", operation); Engine.Result result = applyTranslogOperation(engine, operation, origin); switch (result.getResultType()) { case FAILURE: throw result.getFailure(); case MAPPING_UPDATE_REQUIRED: throw new IllegalArgumentException("unexpected mapping update: " + result.getRequiredMappingUpdate()); case SUCCESS: break; default: throw new AssertionError("Unknown result type [" + result.getResultType() + "]"); } opsRecovered++; onOperationRecovered.run(); } catch (Exception e) { // TODO: Don't enable this leniency unless users explicitly opt-in if (origin == Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY && ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) { // mainly for MapperParsingException and Failure to detect xcontent logger.info("ignoring recovery of a corrupt translog entry", e); } else { throw ExceptionsHelper.convertToRuntime(e); } } } return opsRecovered; }

?applyTranslogOperation执行具体的写操作

private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation operation, Engine.Operation.Origin origin) throws IOException { // If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type. final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null; final Engine.Result result; switch (operation.opType()) { case INDEX: final Translog.Index index = (Translog.Index) operation; // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all // autoGeneratedID docs that are coming from the primary are updated correctly. result = applyIndexOperation(engine, index.seqNo(), index.primaryTerm(), index.version(), versionType, UNASSIGNED_SEQ_NO, 0, index.getAutoGeneratedIdTimestamp(), true, origin, new SourceToParse(shardId.getIndexName(), index.id(), index.source(), XContentHelper.xContentType(index.source()), index.routing())); break; case DELETE: final Translog.Delete delete = (Translog.Delete) operation; result = applyDeleteOperation(engine, delete.seqNo(), delete.primaryTerm(), delete.version(), delete.id(), versionType, UNASSIGNED_SEQ_NO, 0, origin); break; case NO_OP: final Translog.NoOp noOp = (Translog.NoOp) operation; result = markSeqNoAsNoop(engine, noOp.seqNo(), noOp.primaryTerm(), noOp.reason(), origin); break; default: throw new IllegalStateException("No operation defined for [" + operation + "]"); } return result; }

StoreRecovery#internalRecoverFromStore中调用indexShard.finalizeRecovery()进入FINALIZE阶段。

3.5 FINALIZE

执行refresh操作,将缓冲的数据写入文件 ,但不刷盘,数据在系统的cache中。

public void finalizeRecovery() { recoveryState().setStage(RecoveryState.Stage.FINALIZE); Engine engine = getEngine(); engine.refresh("recovery_finalization"); engine.config().setEnableGcDeletes(true); }

StoreRecovery#internalRecoverFromStore中调用indexShard.postRecovery进入DONE阶段。

3.6 DONE

进入DONE阶段之前再次执行refresh,然后更新分片状态

public void postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { synchronized (postRecoveryMutex) { // we need to refresh again to expose all operations that were index until now. Otherwise // we may not expose operations that were indexed with a refresh listener that was immediately // responded to in addRefreshListener. The refresh must happen under the same mutex used in addRefreshListener // and before moving this shard to POST_RECOVERY state (i.e., allow to read from this shard). getEngine().refresh("post_recovery"); synchronized (mutex) { recoveryState.setStage(RecoveryState.Stage.DONE); changeState(IndexShardState.POST_RECOVERY, reason); } } } 3.7?恢复结果处理

主分片恢复完毕后,对恢复结果进行处理。

如果恢复成功,执行IndicesClusterStateService.RecoveryListener#onRecoveryDone

public void onRecoveryDone(final RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) { shardStateAction.shardStarted( shardRouting, primaryTerm, "after " + state.getRecoverySource(), timestampMillisFieldRange, SHARD_STATE_ACTION_LISTENER); }

主要是向Master发送action为internal:cluster/shard/started的RPC请求。

ShardStateAction#sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener)

private void sendShardAction(final String actionName, final ClusterState currentState, final TransportRequest request, final ActionListener<Void> listener) { ClusterStateObserver observer = new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext()); DiscoveryNode masterNode = currentState.nodes().getMasterNode(); Predicate<ClusterState> changePredicate = MasterNodeChangePredicate.build(currentState); if (masterNode == null) { logger.warn("no master known for action [{}] for shard entry [{}]", actionName, request); waitForNewMasterAndRetry(actionName, observer, request, listener, changePredicate); } else { logger.debug("sending [{}] to [{}] for shard entry [{}]", actionName, masterNode.getId(), request); transportService.sendRequest(masterNode, actionName, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleResponse(TransportResponse.Empty response) { listener.onResponse(null); } @Override public void handleException(TransportException exp) { if (isMasterChannelException(exp)) { waitForNewMasterAndRetry(actionName, observer, request, listener, changePredicate); } else { logger.warn(new ParameterizedMessage("unexpected failure while sending request [{}]" + " to [{}] for shard entry [{}]", actionName, masterNode, request), exp); listener.onFailure(exp instanceof RemoteTransportException ? (Exception) (exp.getCause() instanceof Exception ? exp.getCause() : new ElasticsearchException(exp.getCause())) : exp); } } }); } }

如果恢复失败,则执行IndicesClusterStateService.RecoveryListener#onRecoveryFailure

public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { handleRecoveryFailure(shardRouting, sendShardFailure, e); }

主要是调用IndicesClusterStateService#handleRecoveryFailure,主要实现是关闭IndexShard,?向Master发送internal:cluster/shard/failure的RPC请求。

private void failAndRemoveShard(ShardRouting shardRouting, boolean sendShardFailure, String message, @Nullable Exception failure, ClusterState state) { AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardRouting.shardId().getIndex()); if (indexService != null) { Shard shard = indexService.getShardOrNull(shardRouting.shardId().id()); if (shard != null && shard.routingEntry().isSameAllocation(shardRouting)) { indexService.removeShard(shardRouting.shardId().id(), message); } } if (sendShardFailure) { sendFailShard(shardRouting, message, failure, state); } } private void sendFailShard(ShardRouting shardRouting, String message, @Nullable Exception failure, ClusterState state) { failedShardsCache.put(shardRouting.shardId(), shardRouting); shardStateAction.localShardFailed(shardRouting, message, failure, SHARD_STATE_ACTION_LISTENER, state); } //ShardStateAction public void localShardFailed(final ShardRouting shardRouting, final String message, @Nullable final Exception failure, ActionListener<Void> listener, final ClusterState currentState) { FailedShardEntry shardEntry = new FailedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), 0L, message, failure, true); sendShardAction(SHARD_FAILED_ACTION_NAME, currentState, shardEntry, listener); } private void sendShardAction(final String actionName, final ClusterState currentState, final TransportRequest request, final ActionListener<Void> listener) { ClusterStateObserver observer = new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext()); DiscoveryNode masterNode = currentState.nodes().getMasterNode(); Predicate<ClusterState> changePredicate = MasterNodeChangePredicate.build(currentState); if (masterNode == null) { logger.warn("no master known for action [{}] for shard entry [{}]", actionName, request); waitForNewMasterAndRetry(actionName, observer, request, listener, changePredicate); } else { logger.debug("sending [{}] to [{}] for shard entry [{}]", actionName, masterNode.getId(), request); transportService.sendRequest(masterNode, actionName, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleResponse(TransportResponse.Empty response) { listener.onResponse(null); } @Override public void handleException(TransportException exp) { if (isMasterChannelException(exp)) { waitForNewMasterAndRetry(actionName, observer, request, listener, changePredicate); } else { logger.warn(new ParameterizedMessage("unexpected failure while sending request [{}]" + " to [{}] for shard entry [{}]", actionName, masterNode, request), exp); listener.onFailure(exp instanceof RemoteTransportException ? (Exception) (exp.getCause() instanceof Exception ? exp.getCause() : new ElasticsearchException(exp.getCause())) : exp); } } }); } } 4、副分片恢复 4.1 INIT

从开始执行恢复的那一刻起,被标记为INIT阶段,与主分片恢复一样。

副分片通过PeerRecoveryTargetService#startRecovery来开始恢复过程,通过generic线程来执行RecoveryRunner。

//PeerRecoveryTargetService public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) { final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout()); threadPool.generic().execute(new RecoveryRunner(recoveryId)); } 4.2 INDEX

IndexShard#prepareForIndexRecovery设置状态为INDEX。

副分片向主分片发送internal:index/shard/recovery/start_recovery的RPC请求,主分片节点对此请求的处理注册在PeerRecoverySourceService类中,具体处理是PeerRecoverySourceService.StartRecoveryTransportRequestHandler,最终处理是交给RecoverySourceHandler#recoverToTarget。

副分片等待主分片处理结果后,处理响应,标识状态为DONE。

public void handleResponse(RecoveryResponse recoveryResponse) { final TimeValue recoveryTime = new TimeValue(timer.time()); onGoingRecoveries.markRecoveryAsDone(recoveryId); } //RecoveriesCollection public void markRecoveryAsDone(long id) { RecoveryTarget removed = onGoingRecoveries.remove(id); if (removed != null) { removed.markAsDone(); } } //RecoveryTarget public void markAsDone() { if (finished.compareAndSet(false, true)) { try { indexShard.postRecovery("peer recovery done"); } finally { decRef(); } listener.onRecoveryDone(state(), indexShard.getTimestampRange()); } } 4.3 VERIFY_INDEX

副分片接收到主分片的internal:index/shard/recovery/clean_files RPC请求,调用CleanFilesRequestHandler处理。RecoveryTarget#cleanFiles调用indexShard.maybeCheckIndex()进入VERIFY_INDEX阶段。

//PeerRecoveryTargetService class CleanFilesRequestHandler implements TransportRequestHandler<RecoveryCleanFilesRequest> { @Override public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception { try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.CLEAN_FILES, request); if (listener == null) { return; } recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot(), listener.delegateFailure((l, r) -> { Releasable reenableMonitor = recoveryRef.target().disableRecoveryMonitor(); recoveryRef.target().indexShard().afterCleanFiles(() -> { reenableMonitor.close(); l.onResponse(null); }); })); } } } //RecoveryTarget public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetadata, ActionListener<Void> listener) { ActionListener.completeWith(listener, () -> { state().getTranslog().totalOperations(totalTranslogOps); multiFileWriter.renameAllTempFiles(); final Store store = store(); store.incRef(); try { store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetadata); final String translogUUID = Translog.createEmptyTranslog( indexShard.shardPath().resolveTranslog(), globalCheckpoint, shardId, indexShard.getPendingPrimaryTerm()); store.associateIndexWithNewTranslog(translogUUID); if (indexShard.getRetentionLeases().leases().isEmpty()) { indexShard.persistRetentionLeases(); } indexShard.maybeCheckIndex(); state().setStage(RecoveryState.Stage.TRANSLOG); } return null; }); } 4.4?TRANSLOG RecoveryTarget#cleanFiles此时进入TRANSLOG阶段。主分片向副分片发送internal:index/shard/recovery/prepare_translog RPC请求。 //RemoteRecoveryTargetHandler public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) { final String action = PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG; final long requestSeqNo = requestSeqNoGenerator.getAndIncrement(); final RecoveryPrepareForTranslogOperationsRequest request = new RecoveryPrepareForTranslogOperationsRequest(recoveryId, requestSeqNo, shardId, totalTranslogOps); final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE; executeRetryableAction(action, request, standardTimeoutRequestOptions, listener.map(r -> null), reader); }

副分片收到请求后,调用IndexShard#openEngineAndSkipTranslogRecovery,创建新的Engine,?跳过Engine自身的translog恢复。此时主分片的phase2还没有开始,接下来的TRANSLOG阶段就是等待主分片将translog发到副分片重放。

//RecoveryTarget public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) { ActionListener.completeWith(listener, () -> { state().getIndex().setFileDetailsComplete(); // ops-based recoveries don't send the file details state().getTranslog().totalOperations(totalTranslogOps); indexShard().openEngineAndSkipTranslogRecovery(); return null; }); }

主分片接着发送internal:index/shard/recovery/translog_ops RPC请求,此时主分片处理phase2阶段。副分片通过TranslogOperationsRequestHandler来处理,回放主分片发送过来?的操作。

class TranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryTranslogOperationsRequest> { @Override public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel, Task task) throws IOException { try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { final RecoveryTarget recoveryTarget = recoveryRef.target(); final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.TRANSLOG_OPS, request, nullVal -> new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint())); if (listener == null) { return; } performTranslogOps(request, listener, recoveryRef); } } private void performTranslogOps(final RecoveryTranslogOperationsRequest request, final ActionListener<Void> listener, final RecoveryRef recoveryRef) { final RecoveryTarget recoveryTarget = recoveryRef.target(); final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); final Consumer<Exception> retryOnMappingException = exception -> { // in very rare cases a translog replay from primary is processed before a mapping update on this node // which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node. logger.debug("delaying recovery due to missing mapping changes", exception); // we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be // canceled) observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { threadPool.generic().execute(ActionRunnable.wrap(listener, l -> { try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { performTranslogOps(request, listener, recoveryRef); } })); } @Override public void onClusterServiceClose() { listener.onFailure(new ElasticsearchException( "cluster service was closed while waiting for mapping updates")); } @Override public void onTimeout(TimeValue timeout) { // note that we do not use a timeout (see comment above) listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates " + "(timeout [" + timeout + "])")); } }); }; final IndexMetadata indexMetadata = clusterService.state().metadata().index(request.shardId().getIndex()); final long mappingVersionOnTarget = indexMetadata != null ? indexMetadata.getMappingVersion() : 0L; recoveryTarget.indexTranslogOperations( request.operations(), request.totalTranslogOps(), request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary(), request.retentionLeases(), request.mappingVersionOnPrimary(), ActionListener.wrap( checkpoint -> listener.onResponse(null), e -> { // do not retry if the mapping on replica is at least as recent as the mapping // that the primary used to index the operations in the request. if (mappingVersionOnTarget < request.mappingVersionOnPrimary() && e instanceof MapperException) { retryOnMappingException.accept(e); } else { listener.onFailure(e); } }) ); } } 4.5 FINALIZE

主分片执行完phase2,?调用RemoteRecoveryTargetHandler#finalizeRecovery向副分片发送action为internal:index/shard/recovery/finalize的RPC请求。

//RemoteRecoveryTargetHandler public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, final ActionListener<Void> listener) { final String action = PeerRecoveryTargetService.Actions.FINALIZE; final long requestSeqNo = requestSeqNoGenerator.getAndIncrement(); final RecoveryFinalizeRecoveryRequest request = new RecoveryFinalizeRecoveryRequest(recoveryId, requestSeqNo, shardId, globalCheckpoint, trimAboveSeqNo); final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE; executeRetryableAction(action, request, TransportRequestOptions.timeout(recoverySettings.internalActionLongTimeout()), listener.map(r -> null), reader); }

副分片对应的处理为FinalizeRecoveryRequestHandler,先更新全局检查点,然后执行与主分片相同的清理操作。

class FinalizeRecoveryRequestHandler implements TransportRequestHandler<RecoveryFinalizeRecoveryRequest> { @Override public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception { try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FINALIZE, request); if (listener == null) { return; } recoveryRef.target().finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(), listener); } } } //RecoveryTarget public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, ActionListener<Void> listener) { ActionListener.completeWith(listener, () -> { indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery"); // Persist the global checkpoint. indexShard.sync(); indexShard.persistRetentionLeases(); if (trimAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { // We should erase all translog operations above trimAboveSeqNo as we have received either the same or a newer copy // from the recovery source in phase2. Rolling a new translog generation is not strictly required here for we won't // trim the current generation. It's merely to satisfy the assumption that the current generation does not have any // operation that would be trimmed (see TranslogWriter#assertNoSeqAbove). This assumption does not hold for peer // recovery because we could have received operations above startingSeqNo from the previous primary terms. indexShard.rollTranslogGeneration(); // the flush or translog generation threshold can be reached after we roll a new translog indexShard.afterWriteOperation(); indexShard.trimOperationOfPreviousPrimaryTerms(trimAboveSeqNo); } if (hasUncommittedOperations()) { indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true)); } indexShard.finalizeRecovery(); return null; }); } 4.6?副分片恢复时主分片处理

主分片分为phase1,phase2两个阶段。

?


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #es #indices #PUBLIC #synchronized #void