Package org.elasticsearch.indices.recovery
This package contains the logic for the recovery functionality.
Preliminaries
Recoveries are started on data nodes when a data node discovers shard assignments to itself in the cluster state. The master node sets up these shard allocations in the cluster state (see ShardRouting). If a data node finds shard allocations that require recovery on itself, it executes the required recoveries, beginning with the logic in IndicesClusterStateService.createOrUpdateShards(org.elasticsearch.cluster.ClusterState). As the data nodes execute the steps of the recovery state machine, they report success or failure to the master node via the transport actions in ShardStateAction. The master node then updates the shard routing in the cluster state to reflect the status of the recovered shards or to handle failures in the recovery process.
Recoveries can have various kinds of sources, modeled via the RecoverySource that is communicated to the recovery target by ShardRouting.recoverySource() for each shard routing. These sources and their state machines are described below. The actual recovery process for all of them is started by invoking IndexShard.startRecovery(org.elasticsearch.indices.recovery.RecoveryState, org.elasticsearch.indices.recovery.PeerRecoveryTargetService, org.elasticsearch.indices.recovery.PeerRecoveryTargetService.RecoveryListener, org.elasticsearch.repositories.RepositoriesService, java.util.function.BiConsumer<java.lang.String, org.elasticsearch.cluster.metadata.MappingMetadata>, org.elasticsearch.indices.IndicesService).
Checkpoints
Aspects of the recovery logic are based on the concepts of local and global checkpoints. Each operation on a shard is tracked by a sequence number as well as the primary term during which it was applied to the index. The sequence number up to which operations have been fully processed on a shard is that shard's local checkpoint. The sequence number up to which operations on all replicas for a shard have been fully processed is referred to as the global checkpoint. Comparing the local checkpoints of shard copies makes it possible to determine which operations would have to be replayed to a shard copy to bring it in sync with the primary shard. Operations retained in the RecoveryState.Translog or as soft deletes remain available for this kind of replay, which moves a shard copy's lower local checkpoint up to a higher local checkpoint. The global checkpoint makes it possible to determine which operations have been safely processed on all shard copies and thus don't have to be retained on the primary node for replay to replicas.
The primary node tracks the global checkpoint for a shard via the ReplicationTracker. The primary term is tracked by the master node, stored in the cluster state, and incremented each time the primary node for a shard changes.
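The relationship between local and global checkpoints can be illustrated with a minimal sketch. This is not the Elasticsearch implementation: the class CheckpointSketch, its methods, and the convention that -1 means "no operations processed yet" are assumptions made for illustration, and the global checkpoint is simplified to the minimum over all copies passed in.

```java
import java.util.BitSet;
import java.util.Collection;
import java.util.stream.LongStream;

/** Illustrative sketch only; not the Elasticsearch implementation. */
final class CheckpointSketch {

    /** Sketch convention (an assumption): -1 means "no operations processed yet". */
    static final long NO_OPS = -1L;

    /** Tracks which sequence numbers a single shard copy has fully processed. */
    static final class LocalCheckpoint {
        private final BitSet processed = new BitSet();
        private long checkpoint = NO_OPS;

        /** Records seqNo as processed, then advances over the contiguous run above the checkpoint. */
        void markProcessed(long seqNo) {
            processed.set(Math.toIntExact(seqNo));
            while (processed.get(Math.toIntExact(checkpoint + 1))) {
                checkpoint++;
            }
        }

        /** Highest sequence number below which there are no gaps. */
        long get() {
            return checkpoint;
        }
    }

    /** Highest sequence number processed on every copy: the minimum of the local checkpoints. */
    static long globalCheckpoint(Collection<Long> localCheckpoints) {
        long min = Long.MAX_VALUE;
        for (long checkpoint : localCheckpoints) {
            min = Math.min(min, checkpoint);
        }
        return localCheckpoints.isEmpty() ? NO_OPS : min;
    }

    /** Sequence numbers to replay so that a copy at targetLocal catches up to sourceLocal. */
    static long[] operationsToReplay(long targetLocal, long sourceLocal) {
        return LongStream.rangeClosed(targetLocal + 1, sourceLocal).toArray();
    }
}
```

With local checkpoints 5, 3 and 7, the sketch yields a global checkpoint of 3, and a copy at local checkpoint 3 needs operations 4 and 5 replayed to catch up with a primary at 5. An operation that arrives out of order (for example 2 before 1) does not advance the local checkpoint until the gap is filled.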
Retention Leases
The duration for which a shard retains individual operations for replay during recovery is governed by the RetentionLease functionality. More information about this functionality can be found in the org.elasticsearch.index.seqno package and the "History retention" section in the docs.
Recovery Types
1. Peer Recovery
Peer recovery is the process of bringing a shard copy on one node, referred to as the target node below, in sync with the shard copy on another node, referred to as the source node below. It is always the primary node of a shard that serves as the source of the recovery. At a high level, recovery happens by a combination of comparing and subsequently synchronizing files and operations from the source to the target. Synchronizing the on-disk file structure on the target with that on the source node is referred to as file-based recovery. Synchronizing operations based on comparing checkpoints is commonly referred to as ops-based recovery.
As primaries and replicas are independent Lucene indices that execute their Lucene-level merges independently, the concrete on-disk file structure on a pair of primary and replica nodes for a given shard will diverge over time, even if both copies of the shard hold the exact same set of documents and operations. Peer recovery will therefore try to avoid file-based recovery where possible to reduce the amount of data that has to be transferred. It prefers replaying just those operations missing on the target relative to the source, as this avoids copying files from source to target that could contain data that is for the most part already present on the target. Replaying operations is possible as long as the primary node retains the missing operations as soft deletes in its Lucene index.
State Machine
Peer recoveries are modeled via a RecoverySource.PeerRecoverySource. They start by moving the shard's state to IndexShardState.RECOVERING and then triggering the peer recovery through a call to PeerRecoveryTargetService.startRecovery(org.elasticsearch.index.shard.IndexShard, org.elasticsearch.cluster.node.DiscoveryNode, org.elasticsearch.indices.recovery.PeerRecoveryTargetService.RecoveryListener), which results in the following steps being executed.
-
The target shard starts out with a RecoveryState at stage RecoveryState.Stage.INIT. At the start of the peer recovery process, the target node will try to recover from its local translog as far as possible, if there are any operations to recover from it. It will first move to stage RecoveryState.Stage.INDEX and then try to recover as far as possible from existing files and the existing translog. During this process, it will move to RecoveryState.Stage.VERIFY_INDEX, verifying that the files on disk are not corrupted, then to RecoveryState.Stage.TRANSLOG during recovery from the translog. The target node then sends a StartRecoveryRequest to the primary node of the shard being recovered. This triggers PeerRecoverySourceService.recover(org.elasticsearch.indices.recovery.StartRecoveryRequest, org.elasticsearch.action.ActionListener<org.elasticsearch.indices.recovery.RecoveryResponse>) on the primary node that receives the request. The StartRecoveryRequest contains information about the local state of the recovery target, based on which the recovery source will determine the recovery mechanism (file-based or ops-based) to use.
-
When determining whether to use ops-based recovery, the recovery source checks the following conditions, all of which must hold for ops-based recovery to be executed:
- Target shard and source shard must share the same Engine.HISTORY_UUID_KEY in their latest Lucene commit.
- The source must have retained all operations after the latest sequence number present on the target. See IndexShard.hasCompleteHistoryOperations(java.lang.String, org.elasticsearch.index.engine.Engine.HistorySource, long) for details.
- A peer recovery retention lease must exist for the target shard and it must retain a sequence number below or equal to the starting sequence number in StartRecoveryRequest.startingSeqNo().
-
If the preconditions for ops-based recovery aren't met, file-based recovery is executed first. To trigger file-based recovery, the source node executes phase 1 of the recovery by invoking RecoverySourceHandler.phase1(org.apache.lucene.index.IndexCommit, long, java.util.function.IntSupplier, org.elasticsearch.action.ActionListener<org.elasticsearch.indices.recovery.RecoverySourceHandler.SendFileResult>). Using the information about the files on the target node found in the StartRecoveryRequest, phase 1 determines which segment files must be copied to the recovery target. The information about these files is then sent to the recovery target via a RecoveryFilesInfoRequest. Once the recovery target has received the list of files that will be copied to it, RecoverySourceHandler.sendFiles(org.elasticsearch.index.store.Store, org.elasticsearch.index.store.StoreFileMetadata[], java.util.function.IntSupplier, org.elasticsearch.action.ActionListener<java.lang.Void>) is invoked, which sends the segment files to the recovery target via a series of RecoveryFileChunkRequest messages. Receiving a RecoveryFilesInfoRequest on the target indicates to it that the recovery will be file-based, so it invokes IndexShard.resetRecoveryStage() to reset the recovery back to the INIT stage, then prepares for receiving files and moves to stage INDEX again.
-
Once all the file chunks have been received by the recovery target, a retention lease for the latest global checkpoint is created by the source node to ensure all remaining operations from the latest global checkpoint are retained for replay in the next step of the recovery. Also, after creating the retention lease and before moving on to the next step of the peer recovery process, a RecoveryCleanFilesRequest is sent from the source to the target. The target handles this request by doing the following:
- The file chunks from the previous step were saved to temporary file names. They are now renamed to their original names.
- Clean up all files in the shard directory that are not part of the recovering shard copy.
- Trigger creation of a new translog on the target. This moves the recovery stage on the target to RecoveryState.Stage.TRANSLOG.
-
After the synchronization of segment files from the source to the target has finished or was skipped, the translog-based recovery step is executed by invoking RecoverySourceHandler.prepareTargetForTranslog(int, org.elasticsearch.action.ActionListener<org.elasticsearch.core.TimeValue>) on the recovery source. This sends a RecoveryPrepareForTranslogOperationsRequest to the recovery target, which contains the estimated number of translog operations that have to be copied to the target. On the target, handling this request triggers a call to IndexShard.openEngineAndSkipTranslogRecovery(), which opens a new engine and translog and then responds to the recovery source. Once the recovery source receives that response, it invokes RecoverySourceHandler.phase2(long, long, org.elasticsearch.index.translog.Translog.Snapshot, long, long, org.elasticsearch.index.seqno.RetentionLeases, long, org.elasticsearch.action.ActionListener<org.elasticsearch.indices.recovery.RecoverySourceHandler.SendSnapshotResult>) to replay outstanding translog operations on the target. This is done by sending a series of RecoveryTranslogOperationsRequest messages to the target, which responds to each with a RecoveryTranslogOperationsResponse containing the maximum persisted local checkpoint for the target. Tracking the maximum of the received local checkpoint values is necessary for the next step, finalizing the recovery.
-
After replaying the translog operations on the target, the recovery is finalized by a call to RecoverySourceHandler.finalizeRecovery(long, long, org.elasticsearch.action.ActionListener<java.lang.Void>) on the source. With the knowledge that the target has received all operations up to the maximum local checkpoint tracked in the previous step, the source (which is also the primary) can now update its in-sync checkpoint state by a call to ReplicationTracker.markAllocationIdAsInSync(java.lang.String, long). Once the in-sync sequence number information has been persisted successfully, the source sends a RecoveryFinalizeRecoveryRequest to the target. This request contains the global checkpoint as well as a sequence number above which the target can trim all operations from its translog, since all operations above this number have just been replayed in the previous step and were either of the same or a newer version than those in the existing translog on the target. This step also moves the target to the recovery stage RecoveryState.Stage.FINALIZE.
-
After the finalization step, the recovery source sends a RecoveryResponse to the target, which is implemented as a response to the initial StartRecoveryRequest that the target sent to initiate the recovery. This leads to a call to IndexShard.postRecovery(java.lang.String), which moves the recovery state to stage RecoveryState.Stage.DONE, triggers a refresh of the shard, and moves the shard to state IndexShardState.POST_RECOVERY. Finally, the recovery target sends a ShardStateAction.StartedShardEntry transport message to the master to inform it about the successful start of the shard.
-
After receiving the StartedShardEntry, the master updates the cluster state to reflect the state of the now fully recovered recovery target by executing the ShardStateAction.ShardStartedClusterStateTaskExecutor. The resulting cluster state update is then observed by IndexShard.updateShardState(org.elasticsearch.cluster.routing.ShardRouting, long, java.util.function.BiConsumer<org.elasticsearch.index.shard.IndexShard, org.elasticsearch.action.ActionListener<org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask>>, long, java.util.Set<java.lang.String>, org.elasticsearch.cluster.routing.IndexShardRoutingTable), which updates the shard state on the target node to IndexShardState.STARTED, thus completing the peer recovery.
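The choice between ops-based and file-based recovery, and the stages the target passes through as a result, can be summarized in a small sketch. It is illustrative only: RecoveryPlanSketch, choose, targetStages and all parameter names are hypothetical and do not mirror the signatures in RecoverySourceHandler. The stage sequences are simply read off the steps above and assume the target performed a local translog recovery first.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

/** Illustrative sketch only; all names and signatures here are hypothetical. */
final class RecoveryPlanSketch {

    enum Mode { OPS_BASED, FILE_BASED }

    /**
     * Ops-based recovery requires all three conditions at once; otherwise
     * file-based recovery runs first.
     */
    static Mode choose(String sourceHistoryUuid,
                       String targetHistoryUuid,
                       boolean sourceHasCompleteHistory,
                       OptionalLong leaseRetainingSeqNo,
                       long startingSeqNo) {
        // Condition 1: both copies share the same history UUID.
        boolean sameHistory = sourceHistoryUuid.equals(targetHistoryUuid);
        // Condition 3: a lease exists and retains a sequence number <= startingSeqNo.
        boolean leaseCovers = leaseRetainingSeqNo.isPresent()
            && leaseRetainingSeqNo.getAsLong() <= startingSeqNo;
        // Condition 2 (complete history on the source) is passed in as a flag.
        return sameHistory && sourceHasCompleteHistory && leaseCovers
            ? Mode.OPS_BASED
            : Mode.FILE_BASED;
    }

    /** Stages the target moves through, as described in the steps above. */
    static List<String> targetStages(Mode mode) {
        if (mode == Mode.OPS_BASED) {
            return Arrays.asList("INIT", "INDEX", "VERIFY_INDEX", "TRANSLOG", "FINALIZE", "DONE");
        }
        // File-based recovery resets the stage to INIT before receiving files.
        return Arrays.asList("INIT", "INDEX", "VERIFY_INDEX", "TRANSLOG",
            "INIT", "INDEX", "TRANSLOG", "FINALIZE", "DONE");
    }
}
```

In this sketch, a target whose history UUID differs from the source's, or whose retention lease is missing or retains a sequence number above the requested starting sequence number, falls back to file-based recovery.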
Class Description
MultiChunkTransfer<Source,Request extends MultiChunkTransfer.ChunkRequest>: File chunks are sent/requested sequentially by at most one thread at any time.
The source recovery accepts recovery requests from other peer shards and starts the recovery process from this source shard to the target shard.
The recovery target handles recoveries of peer shards of the shard+node to recover to.
This class holds a collection of all ongoing recoveries on the current node (i.e., the node is the target node of those recoveries).
A reference to RecoveryTarget, which implements AutoCloseable.
RecoverySourceHandler handles the three phases of shard recovery, which is everything relating to copying the segment files as well as sending translog operations across the wire once the segments have been copied.
Keeps track of state related to shard recovery.
Represents a recovery where the current node is the target node of the recovery.
Represents a request for starting a peer recovery.
Exception Description
An exception marking that this recovery attempt should be ignored (since probably, we already recovered).