/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.raft;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HexFormat;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.AddRaftVoterRequestData;
import org.apache.kafka.common.message.AddRaftVoterResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.DescribeQuorumRequestData;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchSnapshotRequestData;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
import org.apache.kafka.common.message.UpdateRaftVoterRequestData;
import org.apache.kafka.common.message.UpdateRaftVoterResponseData;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.UnalignedMemoryRecords;
import org.apache.kafka.common.record.UnalignedRecords;
import org.apache.kafka.common.requests.DescribeQuorumRequest;
import org.apache.kafka.common.requests.DescribeQuorumResponse;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchSnapshotRequest;
import org.apache.kafka.common.requests.FetchSnapshotResponse;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.CandidateState;
import org.apache.kafka.raft.Endpoints;
import org.apache.kafka.raft.ExpirationService;
import org.apache.kafka.raft.ExternalKRaftMetrics;
import org.apache.kafka.raft.FollowerState;
import org.apache.kafka.raft.Isolation;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.LeaderState;
import org.apache.kafka.raft.LogAppendInfo;
import org.apache.kafka.raft.LogFetchInfo;
import org.apache.kafka.raft.LogOffsetMetadata;
import org.apache.kafka.raft.NetworkChannel;
import org.apache.kafka.raft.NomineeState;
import org.apache.kafka.raft.ProspectiveState;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.raft.QuorumState;
import org.apache.kafka.raft.QuorumStateStore;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.RaftMessage;
import org.apache.kafka.raft.RaftMessageQueue;
import org.apache.kafka.raft.RaftRequest;
import org.apache.kafka.raft.RaftResponse;
import org.apache.kafka.raft.RaftUtil;
import org.apache.kafka.raft.ReplicaKey;
import org.apache.kafka.raft.ReplicatedLog;
import org.apache.kafka.raft.RequestManager;
import org.apache.kafka.raft.ResignedState;
import org.apache.kafka.raft.UnattachedState;
import org.apache.kafka.raft.ValidOffsetAndEpoch;
import org.apache.kafka.raft.VoterSet;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.raft.internals.AddVoterHandler;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.BatchMemoryPool;
import org.apache.kafka.raft.internals.BlockingMessageQueue;
import org.apache.kafka.raft.internals.CloseListener;
import org.apache.kafka.raft.internals.DefaultRequestSender;
import org.apache.kafka.raft.internals.FuturePurgatory;
import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine;
import org.apache.kafka.raft.internals.KRaftVersionUpgrade;
import org.apache.kafka.raft.internals.KafkaRaftMetrics;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.raft.internals.RecordsBatchReader;
import org.apache.kafka.raft.internals.RemoveVoterHandler;
import org.apache.kafka.raft.internals.RequestSendResult;
import org.apache.kafka.raft.internals.ThresholdPurgatory;
import org.apache.kafka.raft.internals.UpdateVoterHandler;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.NotifyingRawSnapshotWriter;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.apache.kafka.snapshot.Snapshots;
import org.slf4j.Logger;

public final class KafkaRaftClient<T>
implements RaftClient<T> {
    static final int RETRY_BACKOFF_BASE_MS = 50;
    private static final int MAX_NUMBER_OF_BATCHES = 10;
    public static final int MAX_FETCH_WAIT_MS = 500;
    public static final int MAX_BATCH_SIZE_BYTES = 0x800000;
    public static final int MAX_FETCH_SIZE_BYTES = 0x800000;
    private final OptionalInt nodeId;
    private final Uuid nodeDirectoryId;
    private final AtomicReference<GracefulShutdown> shutdown = new AtomicReference();
    private final LogContext logContext;
    private final Logger logger;
    private final Time time;
    private final int fetchMaxWaitMs;
    private final boolean followersAlwaysFlush;
    private final String clusterId;
    private final Endpoints localListeners;
    private final SupportedVersionRange localSupportedKRaftVersion;
    private final NetworkChannel channel;
    private final ReplicatedLog log;
    private final Random random;
    private final FuturePurgatory<Long> appendPurgatory;
    private final FuturePurgatory<Long> fetchPurgatory;
    private final RecordSerde<T> serde;
    private final MemoryPool memoryPool;
    private final RaftMessageQueue messageQueue;
    private final QuorumConfig quorumConfig;
    private final RaftMetadataLogCleanerManager snapshotCleaner;
    private final Map<RaftClient.Listener<T>, ListenerContext> listenerContexts = new IdentityHashMap<RaftClient.Listener<T>, ListenerContext>();
    private final ConcurrentLinkedQueue<Registration<T>> pendingRegistrations = new ConcurrentLinkedQueue();
    private volatile KRaftControlRecordStateMachine partitionState;
    private volatile KafkaRaftMetrics kafkaRaftMetrics;
    private volatile QuorumState quorum;
    private volatile RequestManager requestManager;
    private volatile AddVoterHandler addVoterHandler;
    private volatile RemoveVoterHandler removeVoterHandler;
    private volatile UpdateVoterHandler updateVoterHandler;

    public KafkaRaftClient(OptionalInt nodeId, Uuid nodeDirectoryId, RecordSerde<T> serde, NetworkChannel channel, ReplicatedLog log, Time time, ExpirationService expirationService, LogContext logContext, boolean followersAlwaysFlush, String clusterId, Collection<InetSocketAddress> bootstrapServers, Endpoints localListeners, SupportedVersionRange localSupportedKRaftVersion, QuorumConfig quorumConfig) {
        this(nodeId, nodeDirectoryId, serde, channel, new BlockingMessageQueue(), log, new BatchMemoryPool(5, 0x800000), time, expirationService, 500, followersAlwaysFlush, clusterId, bootstrapServers, localListeners, localSupportedKRaftVersion, logContext, new Random(), quorumConfig);
    }

    KafkaRaftClient(OptionalInt nodeId, Uuid nodeDirectoryId, RecordSerde<T> serde, NetworkChannel channel, RaftMessageQueue messageQueue, ReplicatedLog log, MemoryPool memoryPool, Time time, ExpirationService expirationService, int fetchMaxWaitMs, boolean followersAlwaysFlush, String clusterId, Collection<InetSocketAddress> bootstrapServers, Endpoints localListeners, SupportedVersionRange localSupportedKRaftVersion, LogContext logContext, Random random, QuorumConfig quorumConfig) {
        if (nodeDirectoryId.equals((Object)Uuid.ZERO_UUID)) {
            throw new IllegalArgumentException("The node directory id must be set and not be the zero uuid");
        }
        this.nodeId = nodeId;
        this.nodeDirectoryId = nodeDirectoryId;
        this.logContext = logContext;
        this.serde = serde;
        this.channel = channel;
        this.messageQueue = messageQueue;
        this.log = log;
        this.memoryPool = memoryPool;
        this.fetchPurgatory = new ThresholdPurgatory<Long>(expirationService);
        this.appendPurgatory = new ThresholdPurgatory<Long>(expirationService);
        this.time = time;
        this.clusterId = clusterId;
        this.localListeners = localListeners;
        this.localSupportedKRaftVersion = localSupportedKRaftVersion;
        this.fetchMaxWaitMs = fetchMaxWaitMs;
        this.followersAlwaysFlush = followersAlwaysFlush;
        this.logger = logContext.logger(KafkaRaftClient.class);
        this.random = random;
        this.quorumConfig = quorumConfig;
        this.snapshotCleaner = new RaftMetadataLogCleanerManager(this.logger, time, 60000L, log::maybeClean);
        if (!bootstrapServers.isEmpty()) {
            AtomicInteger id = new AtomicInteger(-2);
            List<Node> bootstrapNodes = bootstrapServers.stream().map(address -> new Node(id.getAndDecrement(), address.getHostString(), address.getPort())).collect(Collectors.toList());
            this.logger.info("Starting request manager with bootstrap servers: {}", bootstrapNodes);
            this.requestManager = new RequestManager(bootstrapNodes, quorumConfig.retryBackoffMs(), quorumConfig.requestTimeoutMs(), random);
        }
    }

    private void updateFollowerHighWatermark(FollowerState state, OptionalLong highWatermarkOpt) {
        highWatermarkOpt.ifPresent(highWatermark -> {
            long newHighWatermark = Math.min(this.endOffset().offset(), highWatermark);
            if (state.updateHighWatermark(OptionalLong.of(newHighWatermark))) {
                this.logger.debug("Follower high watermark updated to {}", (Object)newHighWatermark);
                this.log.updateHighWatermark(new LogOffsetMetadata(newHighWatermark));
                this.updateListenersProgress(newHighWatermark);
            }
        });
    }

    private void updateLeaderEndOffsetAndTimestamp(LeaderState<T> state, long currentTimeMs) {
        LogOffsetMetadata endOffsetMetadata = this.log.endOffset();
        if (state.updateLocalState(endOffsetMetadata, this.partitionState.lastVoterSet())) {
            this.onUpdateLeaderHighWatermark(state, currentTimeMs);
        }
        this.fetchPurgatory.maybeComplete(endOffsetMetadata.offset(), currentTimeMs);
    }

    private void onUpdateLeaderHighWatermark(LeaderState<T> state, long currentTimeMs) {
        state.highWatermark().ifPresent(highWatermark -> {
            this.logger.debug("Leader high watermark updated to {}", highWatermark);
            this.log.updateHighWatermark((LogOffsetMetadata)highWatermark);
            this.addVoterHandler.highWatermarkUpdated(state);
            this.removeVoterHandler.highWatermarkUpdated(state);
            this.appendPurgatory.maybeComplete(highWatermark.offset(), currentTimeMs);
            this.fetchPurgatory.completeAll(currentTimeMs);
            this.updateListenersProgress(highWatermark.offset());
        });
    }

    private void updateListenersProgress(long highWatermark) {
        for (ListenerContext listenerContext : this.listenerContexts.values()) {
            listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
                if (nextExpectedOffset < highWatermark && (nextExpectedOffset == -1L || nextExpectedOffset < this.log.startOffset()) && this.latestSnapshot().isPresent()) {
                    listenerContext.fireHandleSnapshot(this.latestSnapshot().get());
                } else if (nextExpectedOffset == -1L) {
                    this.logger.info("Setting the next offset of {} to {} since there are no snapshots", (Object)listenerContext.listenerName(), (Object)0L);
                    listenerContext.resetOffsetToSmallestLogOffset();
                } else if (nextExpectedOffset < this.log.startOffset()) {
                    throw new IllegalStateException(String.format("Snapshot expected since next offset of %s is %d, log start offset is %d and high-watermark is %d", listenerContext.listenerName(), nextExpectedOffset, this.log.startOffset(), highWatermark));
                }
            });
            listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
                if (nextExpectedOffset < highWatermark) {
                    LogFetchInfo readInfo = this.log.read(nextExpectedOffset, Isolation.COMMITTED);
                    listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records);
                }
            });
        }
    }

    private Optional<SnapshotReader<T>> latestSnapshot() {
        return this.log.latestSnapshot().map(reader -> RecordsSnapshotReader.of(reader, this.serde, BufferSupplier.create(), 0x800000, true, this.logContext));
    }

    private void maybeFireHandleCommit(long baseOffset, int epoch, long appendTimestamp, int sizeInBytes, List<T> records) {
        for (ListenerContext listenerContext : this.listenerContexts.values()) {
            listenerContext.nextExpectedOffset().ifPresent(nextOffset -> {
                if (nextOffset == baseOffset) {
                    listenerContext.fireHandleCommit(baseOffset, epoch, appendTimestamp, sizeInBytes, records);
                }
            });
        }
    }

    private void maybeFireLeaderChange(LeaderState<T> state) {
        for (ListenerContext listenerContext : this.listenerContexts.values()) {
            listenerContext.maybeFireLeaderChange(this.quorum.leaderAndEpoch(), state.epochStartOffset());
        }
    }

    private void maybeFireLeaderChange() {
        for (ListenerContext listenerContext : this.listenerContexts.values()) {
            listenerContext.maybeFireLeaderChange(this.quorum.leaderAndEpoch());
        }
    }

    public void initialize(Map<Integer, InetSocketAddress> voterAddresses, QuorumStateStore quorumStateStore, Metrics metrics, ExternalKRaftMetrics externalKRaftMetrics) {
        VoterSet staticVoters = voterAddresses.isEmpty() ? VoterSet.empty() : VoterSet.fromInetSocketAddresses(this.channel.listenerName(), voterAddresses);
        this.kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft");
        this.partitionState = new KRaftControlRecordStateMachine(staticVoters, this.log, this.serde, BufferSupplier.create(), 0x800000, this.logContext, this.kafkaRaftMetrics, externalKRaftMetrics);
        this.logger.info("Reading KRaft snapshot and log as part of the initialization");
        this.partitionState.updateState();
        this.logger.info("Starting voters are {}", (Object)this.partitionState.lastVoterSet());
        if (this.requestManager == null) {
            if (voterAddresses.isEmpty()) {
                throw new ConfigException(String.format("Missing kraft bootstrap servers. Must specify a value for %s.", "controller.quorum.bootstrap.servers"));
            }
            List<Node> bootstrapNodes = voterAddresses.entrySet().stream().map(entry -> new Node(((Integer)entry.getKey()).intValue(), ((InetSocketAddress)entry.getValue()).getHostString(), ((InetSocketAddress)entry.getValue()).getPort())).collect(Collectors.toList());
            this.logger.info("Starting request manager with static voters: {}", bootstrapNodes);
            this.requestManager = new RequestManager(bootstrapNodes, this.quorumConfig.retryBackoffMs(), this.quorumConfig.requestTimeoutMs(), this.random);
        }
        this.quorum = new QuorumState(this.nodeId, this.nodeDirectoryId, this.partitionState, this.localListeners, this.localSupportedKRaftVersion, this.quorumConfig.electionTimeoutMs(), this.quorumConfig.fetchTimeoutMs(), quorumStateStore, this.time, this.logContext, this.random, this.kafkaRaftMetrics);
        this.kafkaRaftMetrics.initialize(this.quorum);
        this.kafkaRaftMetrics.updateNumUnknownVoterConnections(0);
        this.quorum.initialize(new OffsetAndEpoch(this.log.endOffset().offset(), this.log.lastFetchedEpoch()));
        long currentTimeMs = this.time.milliseconds();
        if (this.quorum.isLeader()) {
            throw new IllegalStateException("Voter cannot initialize as a Leader");
        }
        if (this.quorum.isOnlyVoter() && (this.quorum.isUnattached() || this.quorum.isFollower() || this.quorum.isResigned())) {
            this.transitionToProspective(currentTimeMs);
        } else if (this.quorum.isCandidate()) {
            this.onBecomeCandidate(currentTimeMs);
        } else if (this.quorum.isFollower()) {
            this.onBecomeFollower(currentTimeMs);
        }
        this.addVoterHandler = new AddVoterHandler(this.partitionState, new DefaultRequestSender(this.requestManager, this.channel, this.messageQueue, this.logContext), this.time, this.logContext);
        this.removeVoterHandler = new RemoveVoterHandler(this.nodeId, this.nodeDirectoryId, this.partitionState, this.time, this.quorumConfig.requestTimeoutMs(), this.logContext);
        this.updateVoterHandler = new UpdateVoterHandler(this.nodeId, this.partitionState, this.channel.listenerName(), this.logContext);
    }

    @Override
    public void register(RaftClient.Listener<T> listener) {
        this.pendingRegistrations.add(Registration.register(listener));
        this.wakeup();
    }

    @Override
    public void unregister(RaftClient.Listener<T> listener) {
        this.pendingRegistrations.add(Registration.unregister(listener));
    }

    @Override
    public LeaderAndEpoch leaderAndEpoch() {
        if (this.isInitialized()) {
            return this.quorum.leaderAndEpoch();
        }
        return LeaderAndEpoch.UNKNOWN;
    }

    @Override
    public OptionalInt nodeId() {
        return this.nodeId;
    }

    private OffsetAndEpoch endOffset() {
        return new OffsetAndEpoch(this.log.endOffset().offset(), this.log.lastFetchedEpoch());
    }

    private void resetConnections() {
        this.requestManager.resetAll();
    }

    private void onBecomeLeader(long currentTimeMs) {
        long endOffset = this.log.endOffset().offset();
        BatchAccumulator<T> accumulator = new BatchAccumulator<T>(this.quorum.epoch(), endOffset, this.quorumConfig.appendLingerMs(), 0x800000, 10, this.memoryPool, this.time, (Compression)Compression.NONE, this.serde);
        LeaderState<T> state = this.quorum.transitionToLeader(endOffset, accumulator);
        this.log.initializeLeaderEpoch(this.quorum.epoch());
        state.appendStartOfEpochControlRecords(currentTimeMs);
        this.resetConnections();
        this.kafkaRaftMetrics.maybeUpdateElectionLatency(currentTimeMs);
    }

    private void flushLeaderLog(LeaderState<T> state, long currentTimeMs) {
        this.updateLeaderEndOffsetAndTimestamp(state, currentTimeMs);
        this.log.flush(false);
    }

    private boolean maybeTransitionToLeader(CandidateState state, long currentTimeMs) {
        if (state.epochElection().isVoteGranted()) {
            this.onBecomeLeader(currentTimeMs);
            return true;
        }
        return false;
    }

    private boolean maybeTransitionToCandidate(ProspectiveState state, long currentTimeMs) {
        if (state.epochElection().isVoteGranted()) {
            this.transitionToCandidate(currentTimeMs);
            return true;
        }
        return false;
    }

    private void maybeTransitionForward(NomineeState state, long currentTimeMs) {
        if (state instanceof ProspectiveState) {
            ProspectiveState prospective = (ProspectiveState)state;
            this.maybeTransitionToCandidate(prospective, currentTimeMs);
        } else if (state instanceof CandidateState) {
            CandidateState candidate = (CandidateState)state;
            this.maybeTransitionToLeader(candidate, currentTimeMs);
        } else {
            throw new IllegalStateException("Expected to be a NomineeState (Prospective or Candidate), but current state is " + String.valueOf(state));
        }
    }

    private void onBecomeCandidate(long currentTimeMs) {
        CandidateState state = this.quorum.candidateStateOrThrow();
        if (!this.maybeTransitionToLeader(state, currentTimeMs)) {
            this.resetConnections();
            this.kafkaRaftMetrics.updateElectionStartMs(currentTimeMs);
        }
    }

    private void transitionToCandidate(long currentTimeMs) {
        this.quorum.transitionToCandidate();
        this.maybeFireLeaderChange();
        this.onBecomeCandidate(currentTimeMs);
    }

    private void onBecomeProspective(long currentTimeMs) {
        ProspectiveState state = this.quorum.prospectiveStateOrThrow();
        if (!this.maybeTransitionToCandidate(state, currentTimeMs)) {
            this.resetConnections();
            this.kafkaRaftMetrics.updateElectionStartMs(currentTimeMs);
        }
    }

    private void transitionToProspective(long currentTimeMs) {
        this.quorum.transitionToProspective();
        this.onBecomeProspective(currentTimeMs);
    }

    private void transitionToUnattached(int epoch, OptionalInt leaderId) {
        this.quorum.transitionToUnattached(epoch, leaderId);
        this.maybeFireLeaderChange();
        this.resetConnections();
    }

    private void transitionToResigned(List<ReplicaKey> preferredSuccessors) {
        this.fetchPurgatory.completeAllExceptionally((Throwable)Errors.NOT_LEADER_OR_FOLLOWER.exception("Not handling request since this node is resigning"));
        this.quorum.transitionToResigned(preferredSuccessors);
        this.resetConnections();
    }

    private void onBecomeFollower(long currentTimeMs) {
        this.kafkaRaftMetrics.maybeUpdateElectionLatency(currentTimeMs);
        this.resetConnections();
        this.fetchPurgatory.completeAllExceptionally((Throwable)Errors.NOT_LEADER_OR_FOLLOWER.exception("Cannot process the fetch request because the node is no longer the leader"));
        this.appendPurgatory.completeAllExceptionally((Throwable)Errors.NOT_LEADER_OR_FOLLOWER.exception("Failed to receive sufficient acknowledgments for this append before leader change"));
    }

    private void transitionToFollower(int epoch, int leaderId, Endpoints endpoints, long currentTimeMs) {
        if (endpoints.isEmpty()) {
            throw new IllegalArgumentException(String.format("Unknown leader endpoints (%s) after request or response with leader (%s) and the voters %s", endpoints, leaderId, this.partitionState.lastVoterSet()));
        }
        this.quorum.transitionToFollower(epoch, leaderId, endpoints);
        this.maybeFireLeaderChange();
        this.onBecomeFollower(currentTimeMs);
    }

    private VoteResponseData buildVoteResponse(ListenerName listenerName, short apiVersion, Errors partitionLevelError, boolean voteGranted) {
        return RaftUtil.singletonVoteResponse(listenerName, apiVersion, Errors.NONE, this.log.topicPartition(), partitionLevelError, this.quorum.epoch(), this.quorum.leaderIdOrSentinel(), voteGranted, this.quorum.leaderEndpoints());
    }

    private VoteResponseData handleVoteRequest(RaftRequest.Inbound requestMetadata) {
        Optional<ReplicaKey> voterKey;
        boolean isIllegalEpoch;
        VoteRequestData request = (VoteRequestData)requestMetadata.data();
        if (!this.hasValidClusterId(request.clusterId())) {
            return new VoteResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
        }
        if (!RaftUtil.hasValidTopicPartition(request, this.log.topicPartition())) {
            return new VoteResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
        }
        VoteRequestData.PartitionData partitionRequest = (VoteRequestData.PartitionData)((VoteRequestData.TopicData)request.topics().get(0)).partitions().get(0);
        int replicaId = partitionRequest.replicaId();
        int replicaEpoch = partitionRequest.replicaEpoch();
        boolean preVote = partitionRequest.preVote();
        int lastEpoch = partitionRequest.lastOffsetEpoch();
        long lastEpochEndOffset = partitionRequest.lastOffset();
        boolean bl = preVote ? lastEpoch > replicaEpoch : (isIllegalEpoch = lastEpoch >= replicaEpoch);
        if (isIllegalEpoch) {
            this.logger.info("Received a vote request from replica {} with illegal epoch {}, last epoch {}, preVote={}", new Object[]{replicaId, replicaEpoch, lastEpoch, preVote});
        }
        if (lastEpochEndOffset < 0L || lastEpoch < 0 || isIllegalEpoch) {
            return this.buildVoteResponse(requestMetadata.listenerName(), requestMetadata.apiVersion(), Errors.INVALID_REQUEST, false);
        }
        Optional<Errors> errorOpt = this.validateVoterOnlyRequest(replicaId, replicaEpoch);
        if (errorOpt.isPresent()) {
            return this.buildVoteResponse(requestMetadata.listenerName(), requestMetadata.apiVersion(), errorOpt.get(), false);
        }
        if (replicaEpoch > this.quorum.epoch()) {
            this.transitionToUnattached(replicaEpoch, OptionalInt.empty());
        }
        if (!this.isValidVoterKey(voterKey = RaftUtil.voteRequestVoterKey(request, partitionRequest))) {
            this.logger.info("A replica {} sent a voter key ({}) in the VOTE request that doesn't match the local key ({}, {}); rejecting the vote", new Object[]{replicaId, voterKey, this.nodeId, this.nodeDirectoryId});
            return this.buildVoteResponse(requestMetadata.listenerName(), requestMetadata.apiVersion(), Errors.INVALID_VOTER_KEY, false);
        }
        OffsetAndEpoch lastEpochEndOffsetAndEpoch = new OffsetAndEpoch(lastEpochEndOffset, lastEpoch);
        ReplicaKey replicaKey = ReplicaKey.of(replicaId, partitionRequest.replicaDirectoryId());
        boolean voteGranted = this.quorum.canGrantVote(replicaKey, lastEpochEndOffsetAndEpoch.compareTo(this.endOffset()) >= 0, preVote);
        if (!preVote && voteGranted) {
            if (this.quorum.isUnattachedNotVoted()) {
                this.quorum.unattachedAddVotedState(replicaEpoch, replicaKey);
            } else if (this.quorum.isProspectiveNotVoted()) {
                this.quorum.prospectiveAddVotedState(replicaEpoch, replicaKey);
            }
        }
        this.logger.info("Vote request {} with epoch {} is {}", new Object[]{request, replicaEpoch, voteGranted ? "granted" : "rejected"});
        return this.buildVoteResponse(requestMetadata.listenerName(), requestMetadata.apiVersion(), Errors.NONE, voteGranted);
    }

    private boolean handleVoteResponse(RaftResponse.Inbound responseMetadata, long currentTimeMs) {
        int remoteNodeId = responseMetadata.source().id();
        VoteResponseData response = (VoteResponseData)responseMetadata.data();
        Errors topLevelError = Errors.forCode((short)response.errorCode());
        if (topLevelError == Errors.UNSUPPORTED_VERSION && this.quorum.isProspective()) {
            this.logger.info("Prospective received unsupported version error in vote response in epoch {}, transitioning to Candidate state immediately since at least one voter doesn't support PreVote.", (Object)this.quorum.epoch());
            this.transitionToCandidate(currentTimeMs);
            return true;
        }
        if (topLevelError != Errors.NONE) {
            return this.handleTopLevelError(topLevelError, responseMetadata);
        }
        if (!RaftUtil.hasValidTopicPartition(response, this.log.topicPartition())) {
            return false;
        }
        VoteResponseData.PartitionData partitionResponse = (VoteResponseData.PartitionData)((VoteResponseData.TopicData)response.topics().get(0)).partitions().get(0);
        Errors error = Errors.forCode((short)partitionResponse.errorCode());
        OptionalInt responseLeaderId = KafkaRaftClient.optionalLeaderId(partitionResponse.leaderId());
        int responseEpoch = partitionResponse.leaderEpoch();
        Endpoints leaderEndpoints = responseLeaderId.isPresent() ? (response.nodeEndpoints().isEmpty() ? this.partitionState.lastVoterSet().listeners(responseLeaderId.getAsInt()) : Endpoints.fromVoteResponse(this.channel.listenerName(), responseLeaderId.getAsInt(), response.nodeEndpoints())) : Endpoints.empty();
        Optional<Boolean> handled = this.maybeHandleCommonResponse(error, responseLeaderId, responseEpoch, leaderEndpoints, responseMetadata.source(), currentTimeMs);
        if (handled.isPresent()) {
            return handled.get();
        }
        if (error == Errors.NONE) {
            if (this.quorum.isLeader()) {
                this.logger.debug("Ignoring vote response {} since we already became leader for epoch {}", (Object)partitionResponse, (Object)this.quorum.epoch());
            } else if (this.quorum.isNomineeState()) {
                NomineeState state = this.quorum.nomineeStateOrThrow();
                if (partitionResponse.voteGranted()) {
                    state.recordGrantedVote(remoteNodeId);
                    this.maybeTransitionForward(state, currentTimeMs);
                } else {
                    state.recordRejectedVote(remoteNodeId);
                    this.maybeHandleElectionLoss(state, currentTimeMs);
                }
            } else {
                this.logger.debug("Ignoring vote response {} since we are no longer a NomineeState (Prospective or Candidate) in epoch {}", (Object)partitionResponse, (Object)this.quorum.epoch());
            }
            return true;
        }
        return this.handleUnexpectedError(error, responseMetadata);
    }

    private void maybeHandleElectionLoss(NomineeState state, long currentTimeMs) {
        if (state instanceof CandidateState) {
            CandidateState candidate = (CandidateState)state;
            if (candidate.epochElection().isVoteRejected()) {
                this.logger.info("Insufficient remaining votes to become leader. Candidate will wait the remaining election timeout ({}) before transitioning back to Prospective. Current epoch election state is {}.", (Object)candidate.remainingElectionTimeMs(currentTimeMs), (Object)candidate.epochElection());
            }
        } else if (state instanceof ProspectiveState) {
            ProspectiveState prospective = (ProspectiveState)state;
            if (prospective.epochElection().isVoteRejected()) {
                this.logger.info("Insufficient remaining votes to become candidate. Current epoch election state is {}. ", (Object)prospective.epochElection());
                this.prospectiveTransitionAfterElectionLoss(prospective, currentTimeMs);
            }
        } else {
            throw new IllegalStateException("Expected to be a NomineeState (Prospective or Candidate), but current state is " + String.valueOf(state));
        }
    }

    private int strictExponentialElectionBackoffMs(int positionInSuccessors, int totalNumSuccessors) {
        if (positionInSuccessors == 0) {
            return 0;
        }
        if (positionInSuccessors < 0 || positionInSuccessors >= totalNumSuccessors) {
            return this.quorumConfig.electionBackoffMaxMs();
        }
        int retryBackOffBaseMs = this.quorumConfig.electionBackoffMaxMs() >> totalNumSuccessors - 1;
        return Math.min(this.quorumConfig.electionBackoffMaxMs(), retryBackOffBaseMs << positionInSuccessors - 1);
    }

    private BeginQuorumEpochResponseData buildBeginQuorumEpochResponse(ListenerName listenerName, short apiVersion, Errors partitionLevelError) {
        return RaftUtil.singletonBeginQuorumEpochResponse(listenerName, apiVersion, Errors.NONE, this.log.topicPartition(), partitionLevelError, this.quorum.epoch(), this.quorum.leaderIdOrSentinel(), this.quorum.leaderEndpoints());
    }

    private BeginQuorumEpochResponseData handleBeginQuorumEpochRequest(RaftRequest.Inbound requestMetadata, long currentTimeMs) {
        int requestEpoch;
        BeginQuorumEpochRequestData request = (BeginQuorumEpochRequestData)requestMetadata.data();
        if (!this.hasValidClusterId(request.clusterId())) {
            return new BeginQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
        }
        if (!RaftUtil.hasValidTopicPartition(request, this.log.topicPartition())) {
            return new BeginQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
        }
        BeginQuorumEpochRequestData.PartitionData partitionRequest = (BeginQuorumEpochRequestData.PartitionData)((BeginQuorumEpochRequestData.TopicData)request.topics().get(0)).partitions().get(0);
        int requestLeaderId = partitionRequest.leaderId();
        Optional<Errors> errorOpt = this.validateVoterOnlyRequest(requestLeaderId, requestEpoch = partitionRequest.leaderEpoch());
        if (errorOpt.isPresent()) {
            return this.buildBeginQuorumEpochResponse(requestMetadata.listenerName(), requestMetadata.apiVersion(), errorOpt.get());
        }
        Endpoints leaderEndpoints = request.leaderEndpoints().isEmpty() ? this.partitionState.lastVoterSet().listeners(requestLeaderId) : Endpoints.fromBeginQuorumEpochRequest(request.leaderEndpoints());
        this.maybeTransition(OptionalInt.of(requestLeaderId), requestEpoch, leaderEndpoints, currentTimeMs);
        Optional<ReplicaKey> voterKey = RaftUtil.beginQuorumEpochRequestVoterKey(request, partitionRequest);
        if (!this.isValidVoterKey(voterKey)) {
            this.logger.info("Leader sent a voter key ({}) in the BEGIN_QUORUM_EPOCH request that doesn't match the local key ({}, {}); returning INVALID_VOTER_KEY", new Object[]{voterKey, this.nodeId, this.nodeDirectoryId});
            return this.buildBeginQuorumEpochResponse(requestMetadata.listenerName(), requestMetadata.apiVersion(), Errors.INVALID_VOTER_KEY);
        }
        return this.buildBeginQuorumEpochResponse(requestMetadata.listenerName(), requestMetadata.apiVersion(), Errors.NONE);
    }

    private boolean handleBeginQuorumEpochResponse(RaftResponse.Inbound responseMetadata, long currentTimeMs) {
        int remoteNodeId = responseMetadata.source().id();
        BeginQuorumEpochResponseData response = (BeginQuorumEpochResponseData)responseMetadata.data();
        Errors topLevelError = Errors.forCode((short)response.errorCode());
        if (topLevelError != Errors.NONE) {
            return this.handleTopLevelError(topLevelError, responseMetadata);
        }
        if (!RaftUtil.hasValidTopicPartition(response, this.log.topicPartition())) {
            return false;
        }
        BeginQuorumEpochResponseData.PartitionData partitionResponse = (BeginQuorumEpochResponseData.PartitionData)((BeginQuorumEpochResponseData.TopicData)response.topics().get(0)).partitions().get(0);
        Errors partitionError = Errors.forCode((short)partitionResponse.errorCode());
        OptionalInt responseLeaderId = KafkaRaftClient.optionalLeaderId(partitionResponse.leaderId());
        int responseEpoch = partitionResponse.leaderEpoch();
        Endpoints leaderEndpoints = responseLeaderId.isPresent() ? (response.nodeEndpoints().isEmpty() ? this.partitionState.lastVoterSet().listeners(responseLeaderId.getAsInt()) : Endpoints.fromBeginQuorumEpochResponse(this.channel.listenerName(), responseLeaderId.getAsInt(), response.nodeEndpoints())) : Endpoints.empty();
        Optional<Boolean> handled = this.maybeHandleCommonResponse(partitionError, responseLeaderId, responseEpoch, leaderEndpoints, responseMetadata.source(), currentTimeMs);
        if (handled.isPresent()) {
            return handled.get();
        }
        if (partitionError == Errors.NONE) {
            if (this.quorum.isLeader()) {
                LeaderState state = this.quorum.leaderStateOrThrow();
                state.addAcknowledgementFrom(remoteNodeId);
            } else {
                this.logger.debug("Ignoring BeginQuorumEpoch response {} since this node is not the leader anymore", (Object)response);
            }
            return true;
        }
        return this.handleUnexpectedError(partitionError, responseMetadata);
    }

    private EndQuorumEpochResponseData buildEndQuorumEpochResponse(ListenerName listenerName, short apiVersion, Errors partitionLevelError) {
        return RaftUtil.singletonEndQuorumEpochResponse(listenerName, apiVersion, Errors.NONE, this.log.topicPartition(), partitionLevelError, this.quorum.epoch(), this.quorum.leaderIdOrSentinel(), this.quorum.leaderEndpoints());
    }

    private EndQuorumEpochResponseData handleEndQuorumEpochRequest(RaftRequest.Inbound requestMetadata, long currentTimeMs) {
        FollowerState state;
        EndQuorumEpochRequestData request = (EndQuorumEpochRequestData)requestMetadata.data();
        if (!this.hasValidClusterId(request.clusterId())) {
            return new EndQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
        }
        if (!RaftUtil.hasValidTopicPartition(request, this.log.topicPartition())) {
            return new EndQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
        }
        EndQuorumEpochRequestData.PartitionData partitionRequest = (EndQuorumEpochRequestData.PartitionData)((EndQuorumEpochRequestData.TopicData)request.topics().get(0)).partitions().get(0);
        int requestEpoch = partitionRequest.leaderEpoch();
        int requestLeaderId = partitionRequest.leaderId();
        Optional<Errors> errorOpt = this.validateVoterOnlyRequest(requestLeaderId, requestEpoch);
        if (errorOpt.isPresent()) {
            return this.buildEndQuorumEpochResponse(requestMetadata.listenerName(), requestMetadata.apiVersion(), errorOpt.get());
        }
        Endpoints leaderEndpoints = request.leaderEndpoints().isEmpty() ? this.partitionState.lastVoterSet().listeners(requestLeaderId) : Endpoints.fromEndQuorumEpochRequest(request.leaderEndpoints());
        this.maybeTransition(OptionalInt.of(requestLeaderId), requestEpoch, leaderEndpoints, currentTimeMs);
        if (this.quorum.isFollower() && (state = this.quorum.followerStateOrThrow()).leaderId() == requestLeaderId) {
            List<ReplicaKey> preferredCandidates = EndQuorumEpochRequest.preferredCandidates((EndQuorumEpochRequestData.PartitionData)partitionRequest).stream().map(replica -> ReplicaKey.of(replica.candidateId(), replica.candidateDirectoryId())).collect(Collectors.toList());
            long electionBackoffMs = this.endEpochElectionBackoff(preferredCandidates);
            this.logger.debug("Overriding follower fetch timeout to {} after receiving EndQuorumEpoch request from leader {} in epoch {}", new Object[]{electionBackoffMs, requestLeaderId, requestEpoch});
            state.overrideFetchTimeout(currentTimeMs, electionBackoffMs);
        }
        return this.buildEndQuorumEpochResponse(requestMetadata.listenerName(), requestMetadata.apiVersion(), Errors.NONE);
    }

    private long endEpochElectionBackoff(Collection<ReplicaKey> preferredCandidates) {
        int position = 0;
        for (ReplicaKey candidate : preferredCandidates) {
            if (candidate.id() == this.quorum.localIdOrThrow() && (candidate.directoryId().isEmpty() || candidate.directoryId().get().equals((Object)this.quorum.localDirectoryId()))) break;
            ++position;
        }
        return this.strictExponentialElectionBackoffMs(position, preferredCandidates.size());
    }

    private boolean handleEndQuorumEpochResponse(RaftResponse.Inbound responseMetadata, long currentTimeMs) {
        EndQuorumEpochResponseData response = (EndQuorumEpochResponseData)responseMetadata.data();
        Errors topLevelError = Errors.forCode((short)response.errorCode());
        if (topLevelError != Errors.NONE) {
            return this.handleTopLevelError(topLevelError, responseMetadata);
        }
        if (!RaftUtil.hasValidTopicPartition(response, this.log.topicPartition())) {
            return false;
        }
        EndQuorumEpochResponseData.PartitionData partitionResponse = (EndQuorumEpochResponseData.PartitionData)((EndQuorumEpochResponseData.TopicData)response.topics().get(0)).partitions().get(0);
        Errors partitionError = Errors.forCode((short)partitionResponse.errorCode());
        OptionalInt responseLeaderId = KafkaRaftClient.optionalLeaderId(partitionResponse.leaderId());
        int responseEpoch = partitionResponse.leaderEpoch();
        Endpoints leaderEndpoints = responseLeaderId.isPresent() ? (response.nodeEndpoints().isEmpty() ? this.partitionState.lastVoterSet().listeners(responseLeaderId.getAsInt()) : Endpoints.fromEndQuorumEpochResponse(this.channel.listenerName(), responseLeaderId.getAsInt(), response.nodeEndpoints())) : Endpoints.empty();
        Optional<Boolean> handled = this.maybeHandleCommonResponse(partitionError, responseLeaderId, responseEpoch, leaderEndpoints, responseMetadata.source(), currentTimeMs);
        if (handled.isPresent()) {
            return handled.get();
        }
        if (partitionError == Errors.NONE) {
            ResignedState resignedState = this.quorum.resignedStateOrThrow();
            resignedState.acknowledgeResignation(responseMetadata.source().id());
            return true;
        }
        return this.handleUnexpectedError(partitionError, responseMetadata);
    }

    private FetchResponseData buildFetchResponse(ListenerName listenerName, short apiVersion, Errors error, Records records, ValidOffsetAndEpoch validOffsetAndEpoch, Optional<LogOffsetMetadata> highWatermark) {
        return RaftUtil.singletonFetchResponse(listenerName, apiVersion, this.log.topicPartition(), this.log.topicId(), Errors.NONE, this.quorum.leaderIdOrSentinel(), this.quorum.leaderEndpoints(), partitionData -> {
            partitionData.setRecords((BaseRecords)records).setErrorCode(error.code()).setLogStartOffset(this.log.startOffset()).setHighWatermark(highWatermark.map(LogOffsetMetadata::offset).orElse(-1L).longValue());
            partitionData.currentLeader().setLeaderEpoch(this.quorum.epoch()).setLeaderId(this.quorum.leaderIdOrSentinel());
            switch (validOffsetAndEpoch.kind()) {
                case DIVERGING: {
                    partitionData.divergingEpoch().setEpoch(validOffsetAndEpoch.offsetAndEpoch().epoch()).setEndOffset(validOffsetAndEpoch.offsetAndEpoch().offset());
                    break;
                }
                case SNAPSHOT: {
                    partitionData.snapshotId().setEpoch(validOffsetAndEpoch.offsetAndEpoch().epoch()).setEndOffset(validOffsetAndEpoch.offsetAndEpoch().offset());
                    break;
                }
            }
        });
    }

    private FetchResponseData buildEmptyFetchResponse(ListenerName listenerName, short apiVersion, Errors error, Optional<LogOffsetMetadata> highWatermark) {
        return this.buildFetchResponse(listenerName, apiVersion, error, (Records)MemoryRecords.EMPTY, ValidOffsetAndEpoch.valid(), highWatermark);
    }

    private boolean hasValidClusterId(String requestClusterId) {
        if (requestClusterId == null) {
            return true;
        }
        return this.clusterId.equals(requestClusterId);
    }

    private CompletableFuture<FetchResponseData> handleFetchRequest(RaftRequest.Inbound requestMetadata, long currentTimeMs) {
        FetchRequestData request = (FetchRequestData)requestMetadata.data();
        if (!this.hasValidClusterId(request.clusterId())) {
            return CompletableFuture.completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
        }
        if (!RaftUtil.hasValidTopicPartition(request, this.log.topicPartition(), this.log.topicId())) {
            return CompletableFuture.completedFuture(new FetchResponseData().setErrorCode(Errors.INVALID_REQUEST.code()));
        }
        ((FetchRequestData.FetchTopic)request.topics().get(0)).setTopic(this.log.topicPartition().topic());
        FetchRequestData.FetchPartition fetchPartition = (FetchRequestData.FetchPartition)((FetchRequestData.FetchTopic)request.topics().get(0)).partitions().get(0);
        if (request.maxWaitMs() < 0 || fetchPartition.fetchOffset() < 0L || fetchPartition.lastFetchedEpoch() < 0 || fetchPartition.lastFetchedEpoch() > fetchPartition.currentLeaderEpoch()) {
            return CompletableFuture.completedFuture(this.buildEmptyFetchResponse(requestMetadata.listenerName(), requestMetadata.apiVersion(), Errors.INVALID_REQUEST, Optional.empty()));
        }
        ReplicaKey replicaKey = ReplicaKey.of(FetchRequest.replicaId((FetchRequestData)request), fetchPartition.replicaDirectoryId());
        FetchResponseData response = this.tryCompleteFetchRequest(requestMetadata.listenerName(), requestMetadata.apiVersion(), replicaKey, fetchPartition, currentTimeMs);
        FetchResponseData.PartitionData partitionResponse = (FetchResponseData.PartitionData)((FetchResponseData.FetchableTopicResponse)response.responses().get(0)).partitions().get(0);
        if (partitionResponse.errorCode() != Errors.NONE.code() || FetchResponse.recordsSize((FetchResponseData.PartitionData)partitionResponse) > 0 || request.maxWaitMs() == 0 || KafkaRaftClient.isPartitionDiverged(partitionResponse) || KafkaRaftClient.isPartitionSnapshotted(partitionResponse) || KafkaRaftClient.isHighWatermarkUpdated(partitionResponse, fetchPartition)) {
            return CompletableFuture.completedFuture(response);
        }
        CompletableFuture<Long> future = this.fetchPurgatory.await(fetchPartition.fetchOffset(), request.maxWaitMs());
        return future.handle((T completionTimeMs, U exception) -> {
            if (exception != null) {
                Throwable cause = exception instanceof ExecutionException ? exception.getCause() : exception;
                Errors error = Errors.forException((Throwable)cause);
                if (error == Errors.REQUEST_TIMED_OUT) {
                    return response;
                }
                this.logger.info("Failed to handle fetch from {} at {} due to {}", new Object[]{replicaKey, fetchPartition.fetchOffset(), error});
                return this.buildEmptyFetchResponse(requestMetadata.listenerName(), requestMetadata.apiVersion(), error, Optional.empty());
            }
            this.logger.trace("Completing delayed fetch from {} starting at offset {} at {}", new Object[]{replicaKey, fetchPartition.fetchOffset(), completionTimeMs});
            return this.tryCompleteFetchRequest(requestMetadata.listenerName(), requestMetadata.apiVersion(), replicaKey, fetchPartition, (long)completionTimeMs);
        });
    }

    private FetchResponseData tryCompleteFetchRequest(ListenerName listenerName, short apiVersion, ReplicaKey replicaKey, FetchRequestData.FetchPartition request, long currentTimeMs) {
        try {
            MemoryRecords records;
            Optional<Errors> errorOpt = this.validateLeaderOnlyRequest(request.currentLeaderEpoch());
            if (errorOpt.isPresent()) {
                return this.buildEmptyFetchResponse(listenerName, apiVersion, errorOpt.get(), Optional.empty());
            }
            long fetchOffset = request.fetchOffset();
            int lastFetchedEpoch = request.lastFetchedEpoch();
            LeaderState state = this.quorum.leaderStateOrThrow();
            Optional<OffsetAndEpoch> latestSnapshotId = this.log.latestSnapshotId();
            ValidOffsetAndEpoch validOffsetAndEpoch = fetchOffset == 0L && latestSnapshotId.isPresent() && !latestSnapshotId.get().equals((Object)Snapshots.BOOTSTRAP_SNAPSHOT_ID) ? ValidOffsetAndEpoch.snapshot(latestSnapshotId.get()) : this.log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
            if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
                LogFetchInfo info = this.log.read(fetchOffset, Isolation.UNCOMMITTED);
                if (state.updateReplicaState(replicaKey, currentTimeMs, info.startOffsetMetadata)) {
                    this.onUpdateLeaderHighWatermark(state, currentTimeMs);
                }
                records = info.records;
            } else {
                records = MemoryRecords.EMPTY;
            }
            return this.buildFetchResponse(listenerName, apiVersion, Errors.NONE, (Records)records, validOffsetAndEpoch, state.highWatermark());
        }
        catch (Exception e) {
            this.logger.error("Caught unexpected error in fetch completion of request {}", (Object)request, (Object)e);
            return this.buildEmptyFetchResponse(listenerName, apiVersion, Errors.UNKNOWN_SERVER_ERROR, Optional.empty());
        }
    }

    private static boolean isPartitionDiverged(FetchResponseData.PartitionData partitionResponseData) {
        FetchResponseData.EpochEndOffset divergingEpoch = partitionResponseData.divergingEpoch();
        return divergingEpoch.epoch() != -1 || divergingEpoch.endOffset() != -1L;
    }

    private static boolean isPartitionSnapshotted(FetchResponseData.PartitionData partitionResponseData) {
        FetchResponseData.SnapshotId snapshotId = partitionResponseData.snapshotId();
        return snapshotId.epoch() != -1 || snapshotId.endOffset() != -1L;
    }

    private static boolean isHighWatermarkUpdated(FetchResponseData.PartitionData partitionResponseData, FetchRequestData.FetchPartition partitionRequestData) {
        return partitionRequestData.highWatermark() < partitionResponseData.highWatermark();
    }

    private static OptionalInt optionalLeaderId(int leaderIdOrNil) {
        if (leaderIdOrNil < 0) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(leaderIdOrNil);
    }

    private static String listenerName(RaftClient.Listener<?> listener) {
        return String.format("%s@%d", listener.getClass().getTypeName(), System.identityHashCode(listener));
    }

    private boolean handleFetchResponse(RaftResponse.Inbound responseMetadata, long currentTimeMs) {
        FetchResponseData response = (FetchResponseData)responseMetadata.data();
        Errors topLevelError = Errors.forCode((short)response.errorCode());
        if (topLevelError != Errors.NONE) {
            return this.handleTopLevelError(topLevelError, responseMetadata);
        }
        if (!RaftUtil.hasValidTopicPartition(response, this.log.topicPartition(), this.log.topicId())) {
            return false;
        }
        ((FetchResponseData.FetchableTopicResponse)response.responses().get(0)).setTopic(this.log.topicPartition().topic());
        FetchResponseData.PartitionData partitionResponse = (FetchResponseData.PartitionData)((FetchResponseData.FetchableTopicResponse)response.responses().get(0)).partitions().get(0);
        FetchResponseData.LeaderIdAndEpoch currentLeaderIdAndEpoch = partitionResponse.currentLeader();
        OptionalInt responseLeaderId = KafkaRaftClient.optionalLeaderId(currentLeaderIdAndEpoch.leaderId());
        int responseEpoch = currentLeaderIdAndEpoch.leaderEpoch();
        Errors error = Errors.forCode((short)partitionResponse.errorCode());
        Endpoints leaderEndpoints = responseLeaderId.isPresent() ? (response.nodeEndpoints().isEmpty() ? this.partitionState.lastVoterSet().listeners(responseLeaderId.getAsInt()) : Endpoints.fromFetchResponse(this.channel.listenerName(), responseLeaderId.getAsInt(), response.nodeEndpoints())) : Endpoints.empty();
        Optional<Boolean> handled = this.maybeHandleCommonResponse(error, responseLeaderId, responseEpoch, leaderEndpoints, responseMetadata.source(), currentTimeMs);
        if (handled.isPresent()) {
            return handled.get();
        }
        FollowerState state = this.quorum.followerStateOrThrow();
        if (error == Errors.NONE) {
            FetchResponseData.EpochEndOffset divergingEpoch = partitionResponse.divergingEpoch();
            if (divergingEpoch.epoch() >= 0) {
                OffsetAndEpoch divergingOffsetAndEpoch = new OffsetAndEpoch(divergingEpoch.endOffset(), divergingEpoch.epoch());
                state.highWatermark().ifPresent(highWatermark -> {
                    if (divergingOffsetAndEpoch.offset() < highWatermark.offset()) {
                        throw new KafkaException("The leader requested truncation to offset " + divergingOffsetAndEpoch.offset() + ", which is below the current high watermark " + String.valueOf(highWatermark));
                    }
                });
                long truncationOffset = this.log.truncateToEndOffset(divergingOffsetAndEpoch);
                this.logger.info("Truncated to offset {} from Fetch response from leader {}", (Object)truncationOffset, (Object)this.quorum.leaderIdOrSentinel());
                this.partitionState.truncateNewEntries(truncationOffset);
            } else if (partitionResponse.snapshotId().epoch() >= 0 || partitionResponse.snapshotId().endOffset() >= 0L) {
                if (partitionResponse.snapshotId().epoch() < 0) {
                    this.logger.error("The leader sent a snapshot id with a valid end offset {} but with an invalid epoch {}", (Object)partitionResponse.snapshotId().endOffset(), (Object)partitionResponse.snapshotId().epoch());
                    return false;
                }
                if (partitionResponse.snapshotId().endOffset() < 0L) {
                    this.logger.error("The leader sent a snapshot id with a valid epoch {} but with an invalid end offset {}", (Object)partitionResponse.snapshotId().epoch(), (Object)partitionResponse.snapshotId().endOffset());
                    return false;
                }
                OffsetAndEpoch snapshotId = new OffsetAndEpoch(partitionResponse.snapshotId().endOffset(), partitionResponse.snapshotId().epoch());
                state.setFetchingSnapshot(this.log.createNewSnapshotUnchecked(snapshotId));
                if (state.fetchingSnapshot().isPresent()) {
                    this.logger.info("Fetching snapshot {} from Fetch response from leader {}", (Object)snapshotId, (Object)this.quorum.leaderIdOrSentinel());
                } else {
                    this.logger.info("Leader {} returned a snapshot {} in the FETCH response which is already stored", (Object)this.quorum.leaderIdOrSentinel(), (Object)snapshotId);
                }
            } else {
                this.appendAsFollower(FetchResponse.recordsOrFail((FetchResponseData.PartitionData)partitionResponse));
                OptionalLong highWatermark2 = partitionResponse.highWatermark() < 0L ? OptionalLong.empty() : OptionalLong.of(partitionResponse.highWatermark());
                this.updateFollowerHighWatermark(state, highWatermark2);
            }
            state.resetFetchTimeoutForSuccessfulFetch(currentTimeMs);
            return true;
        }
        return this.handleUnexpectedError(error, responseMetadata);
    }

    private static String convertToHexadecimal(Records records) {
        ByteBuffer buffer = ((MemoryRecords)records).buffer();
        byte[] bytes = new byte[Math.min(buffer.remaining(), 61)];
        buffer.get(bytes);
        return HexFormat.of().formatHex(bytes);
    }

    private void appendAsFollower(Records records) {
        if (records.sizeInBytes() == 0) {
            return;
        }
        try {
            LogAppendInfo info = this.log.appendAsFollower(records, this.quorum.epoch());
            this.kafkaRaftMetrics.updateFetchedRecords(info.lastOffset() - info.firstOffset() + 1L);
        }
        catch (InvalidRecordException | CorruptRecordException e) {
            this.logger.info("Failed to append the records with the batch header '{}' to the log", (Object)KafkaRaftClient.convertToHexadecimal(records), (Object)e);
        }
        if (this.quorum.isVoter() || this.followersAlwaysFlush) {
            this.log.flush(false);
        }
        this.partitionState.updateState();
        OffsetAndEpoch endOffset = this.endOffset();
        this.kafkaRaftMetrics.updateLogEnd(endOffset);
        this.logger.trace("Follower end offset updated to {} after append", (Object)endOffset);
    }

    private LogAppendInfo appendAsLeader(Records records) {
        LogAppendInfo info = this.log.appendAsLeader(records, this.quorum.epoch());
        this.partitionState.updateState();
        OffsetAndEpoch endOffset = this.endOffset();
        this.kafkaRaftMetrics.updateAppendRecords(info.lastOffset() - info.firstOffset() + 1L);
        this.kafkaRaftMetrics.updateLogEnd(endOffset);
        this.logger.trace("Leader appended records at base offset {}, new end offset is {}", (Object)info.firstOffset(), (Object)endOffset);
        return info;
    }

    private DescribeQuorumResponseData handleDescribeQuorumRequest(RaftRequest.Inbound requestMetadata, long currentTimeMs) {
        DescribeQuorumRequestData describeQuorumRequestData = (DescribeQuorumRequestData)requestMetadata.data();
        if (!RaftUtil.hasValidTopicPartition(describeQuorumRequestData, this.log.topicPartition())) {
            return DescribeQuorumRequest.getPartitionLevelErrorResponse((DescribeQuorumRequestData)describeQuorumRequestData, (Errors)Errors.UNKNOWN_TOPIC_OR_PARTITION);
        }
        if (!this.quorum.isLeader()) {
            return DescribeQuorumResponse.singletonErrorResponse((TopicPartition)this.log.topicPartition(), (Errors)Errors.NOT_LEADER_OR_FOLLOWER);
        }
        LeaderState leaderState = this.quorum.leaderStateOrThrow();
        return RaftUtil.singletonDescribeQuorumResponse(requestMetadata.apiVersion(), this.log.topicPartition(), this.quorum.localIdOrThrow(), leaderState.epoch(), leaderState.highWatermark().map(LogOffsetMetadata::offset).orElse(-1L), leaderState.voterStates().values(), leaderState.observerStates(currentTimeMs).values(), currentTimeMs);
    }

    private FetchSnapshotResponseData handleFetchSnapshotRequest(RaftRequest.Inbound requestMetadata, long currentTimeMs) {
        int maxSnapshotSize;
        FetchSnapshotRequestData data = (FetchSnapshotRequestData)requestMetadata.data();
        if (!this.hasValidClusterId(data.clusterId())) {
            return new FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
        }
        if (data.topics().size() != 1 && ((FetchSnapshotRequestData.TopicSnapshot)data.topics().get(0)).partitions().size() != 1) {
            return FetchSnapshotResponse.withTopLevelError((Errors)Errors.INVALID_REQUEST);
        }
        Optional partitionSnapshotOpt = FetchSnapshotRequest.forTopicPartition((FetchSnapshotRequestData)data, (TopicPartition)this.log.topicPartition());
        if (partitionSnapshotOpt.isEmpty()) {
            TopicPartition unknownTopicPartition = new TopicPartition(((FetchSnapshotRequestData.TopicSnapshot)data.topics().get(0)).name(), ((FetchSnapshotRequestData.PartitionSnapshot)((FetchSnapshotRequestData.TopicSnapshot)data.topics().get(0)).partitions().get(0)).partition());
            return RaftUtil.singletonFetchSnapshotResponse(requestMetadata.listenerName(), requestMetadata.apiVersion(), unknownTopicPartition, this.quorum.leaderIdOrSentinel(), this.quorum.leaderEndpoints(), responsePartitionSnapshot -> responsePartitionSnapshot.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
        }
        FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot = (FetchSnapshotRequestData.PartitionSnapshot)partitionSnapshotOpt.get();
        Optional<Errors> leaderValidation = this.validateLeaderOnlyRequest(partitionSnapshot.currentLeaderEpoch());
        if (leaderValidation.isPresent()) {
            return RaftUtil.singletonFetchSnapshotResponse(requestMetadata.listenerName(), requestMetadata.apiVersion(), this.log.topicPartition(), this.quorum.leaderIdOrSentinel(), this.quorum.leaderEndpoints(), responsePartitionSnapshot -> this.addQuorumLeader((FetchSnapshotResponseData.PartitionSnapshot)responsePartitionSnapshot).setErrorCode(((Errors)leaderValidation.get()).code()));
        }
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(partitionSnapshot.snapshotId().endOffset(), partitionSnapshot.snapshotId().epoch());
        Optional<RawSnapshotReader> snapshotOpt = this.log.readSnapshot(snapshotId);
        if (snapshotOpt.isEmpty() || snapshotId.equals((Object)Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
            return RaftUtil.singletonFetchSnapshotResponse(requestMetadata.listenerName(), requestMetadata.apiVersion(), this.log.topicPartition(), this.quorum.leaderIdOrSentinel(), this.quorum.leaderEndpoints(), responsePartitionSnapshot -> this.addQuorumLeader((FetchSnapshotResponseData.PartitionSnapshot)responsePartitionSnapshot).setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code()));
        }
        RawSnapshotReader snapshot = snapshotOpt.get();
        long snapshotSize = snapshot.sizeInBytes();
        if (partitionSnapshot.position() < 0L || partitionSnapshot.position() >= snapshotSize) {
            return RaftUtil.singletonFetchSnapshotResponse(requestMetadata.listenerName(), requestMetadata.apiVersion(), this.log.topicPartition(), this.quorum.leaderIdOrSentinel(), this.quorum.leaderEndpoints(), responsePartitionSnapshot -> this.addQuorumLeader((FetchSnapshotResponseData.PartitionSnapshot)responsePartitionSnapshot).setErrorCode(Errors.POSITION_OUT_OF_RANGE.code()));
        }
        if (partitionSnapshot.position() > Integer.MAX_VALUE) {
            throw new IllegalStateException(String.format("Trying to fetch a snapshot with size (%d) and a position (%d) larger than %d", snapshotSize, partitionSnapshot.position(), Integer.MAX_VALUE));
        }
        try {
            maxSnapshotSize = Math.toIntExact(snapshotSize);
        }
        catch (ArithmeticException e) {
            maxSnapshotSize = Integer.MAX_VALUE;
        }
        UnalignedRecords records = snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize));
        LeaderState state = this.quorum.leaderStateOrThrow();
        state.updateCheckQuorumForFollowingVoter(ReplicaKey.of(data.replicaId(), partitionSnapshot.replicaDirectoryId()), currentTimeMs);
        return RaftUtil.singletonFetchSnapshotResponse(requestMetadata.listenerName(), requestMetadata.apiVersion(), this.log.topicPartition(), this.quorum.leaderIdOrSentinel(), this.quorum.leaderEndpoints(), responsePartitionSnapshot -> {
            this.addQuorumLeader((FetchSnapshotResponseData.PartitionSnapshot)responsePartitionSnapshot).snapshotId().setEndOffset(snapshotId.offset()).setEpoch(snapshotId.epoch());
            return responsePartitionSnapshot.setSize(snapshotSize).setPosition(partitionSnapshot.position()).setUnalignedRecords((BaseRecords)records);
        });
    }

    private Endpoints computeFetchSnapshotLeaderEndpoints(OptionalInt leaderId, FetchSnapshotResponseData.NodeEndpointCollection nodeEndpoints) {
        Endpoints leaderEndpoints = Endpoints.empty();
        if (leaderId.isPresent() && (leaderEndpoints = Endpoints.fromFetchSnapshotResponse(this.channel.listenerName(), leaderId.getAsInt(), nodeEndpoints)).isEmpty()) {
            leaderEndpoints = this.partitionState.lastVoterSet().listeners(leaderId.getAsInt());
        }
        return leaderEndpoints;
    }

    private boolean handleFetchSnapshotResponse(RaftResponse.Inbound responseMetadata, long currentTimeMs) {
        UnalignedMemoryRecords records;
        Endpoints leaderEndpoints;
        FetchSnapshotResponseData data = (FetchSnapshotResponseData)responseMetadata.data();
        Errors topLevelError = Errors.forCode((short)data.errorCode());
        if (topLevelError != Errors.NONE) {
            return this.handleTopLevelError(topLevelError, responseMetadata);
        }
        if (data.topics().size() != 1 && ((FetchSnapshotResponseData.TopicSnapshot)data.topics().get(0)).partitions().size() != 1) {
            return false;
        }
        Optional partitionSnapshotOpt = FetchSnapshotResponse.forTopicPartition((FetchSnapshotResponseData)data, (TopicPartition)this.log.topicPartition());
        if (partitionSnapshotOpt.isEmpty()) {
            return false;
        }
        FetchSnapshotResponseData.PartitionSnapshot partitionSnapshot = (FetchSnapshotResponseData.PartitionSnapshot)partitionSnapshotOpt.get();
        FetchSnapshotResponseData.LeaderIdAndEpoch currentLeaderIdAndEpoch = partitionSnapshot.currentLeader();
        OptionalInt responseLeaderId = KafkaRaftClient.optionalLeaderId(currentLeaderIdAndEpoch.leaderId());
        int responseEpoch = currentLeaderIdAndEpoch.leaderEpoch();
        Errors error = Errors.forCode((short)partitionSnapshot.errorCode());
        Optional<Boolean> handled = this.maybeHandleCommonResponse(error, responseLeaderId, responseEpoch, leaderEndpoints = this.computeFetchSnapshotLeaderEndpoints(responseLeaderId, data.nodeEndpoints()), responseMetadata.source(), currentTimeMs);
        if (handled.isPresent()) {
            return handled.get();
        }
        FollowerState state = this.quorum.followerStateOrThrow();
        if (error == Errors.SNAPSHOT_NOT_FOUND || partitionSnapshot.snapshotId().endOffset() < 0L || partitionSnapshot.snapshotId().epoch() < 0) {
            this.logger.info("Leader doesn't know about snapshot id {}, returned error {} and snapshot id {}", new Object[]{state.fetchingSnapshot(), partitionSnapshot.errorCode(), partitionSnapshot.snapshotId()});
            state.setFetchingSnapshot(Optional.empty());
            state.resetFetchTimeoutForSuccessfulFetch(currentTimeMs);
            return true;
        }
        if (error != Errors.NONE) {
            return this.handleUnexpectedError(error, responseMetadata);
        }
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(partitionSnapshot.snapshotId().endOffset(), partitionSnapshot.snapshotId().epoch());
        RawSnapshotWriter snapshot = state.fetchingSnapshot().orElseThrow(() -> new IllegalStateException(String.format("Received unexpected fetch snapshot response: %s", partitionSnapshot)));
        if (!snapshot.snapshotId().equals((Object)snapshotId)) {
            throw new IllegalStateException(String.format("Received fetch snapshot response with an invalid id. Expected %s; Received %s", snapshot.snapshotId(), snapshotId));
        }
        if (snapshot.sizeInBytes() != partitionSnapshot.position()) {
            throw new IllegalStateException(String.format("Received fetch snapshot response with an invalid position. Expected %d; Received %d", snapshot.sizeInBytes(), partitionSnapshot.position()));
        }
        if (partitionSnapshot.unalignedRecords() instanceof MemoryRecords) {
            records = new UnalignedMemoryRecords(((MemoryRecords)partitionSnapshot.unalignedRecords()).buffer());
        } else if (partitionSnapshot.unalignedRecords() instanceof UnalignedMemoryRecords) {
            records = (UnalignedMemoryRecords)partitionSnapshot.unalignedRecords();
        } else {
            throw new IllegalStateException(String.format("Received unexpected fetch snapshot response: %s", partitionSnapshot));
        }
        snapshot.append(records);
        if (snapshot.sizeInBytes() == partitionSnapshot.size()) {
            snapshot.freeze();
            state.setFetchingSnapshot(Optional.empty());
            if (this.log.truncateToLatestSnapshot()) {
                this.logger.info("Fully truncated the log at ({}, {}) after downloading snapshot {} from leader {}", new Object[]{this.log.endOffset(), this.log.lastFetchedEpoch(), snapshot.snapshotId(), this.quorum.leaderIdOrSentinel()});
                this.partitionState.updateState();
                this.updateFollowerHighWatermark(state, OptionalLong.of(this.log.highWatermark().offset()));
            } else {
                throw new IllegalStateException(String.format("Full log truncation expected but didn't happen. Snapshot of %s, log end offset %s, last fetched %d", snapshot.snapshotId(), this.log.endOffset(), this.log.lastFetchedEpoch()));
            }
        }
        state.resetFetchTimeoutForSuccessfulFetch(currentTimeMs);
        return true;
    }

    private CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(RaftRequest.Inbound requestMetadata, long currentTimeMs) {
        AddRaftVoterRequestData data = (AddRaftVoterRequestData)requestMetadata.data();
        if (!this.hasValidClusterId(data.clusterId())) {
            return CompletableFuture.completedFuture(new AddRaftVoterResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()).setErrorMessage(String.format("The given id \"%s\" doesn't match the cluster id \"%s\"", data.clusterId(), this.clusterId)));
        }
        Optional<Errors> leaderValidationError = this.validateLeaderOnlyRequest(this.quorum.epoch());
        if (leaderValidationError.isPresent()) {
            return CompletableFuture.completedFuture(new AddRaftVoterResponseData().setErrorCode(leaderValidationError.get().code()));
        }
        Optional<ReplicaKey> newVoter = RaftUtil.addVoterRequestVoterKey(data);
        if (newVoter.isEmpty() || newVoter.get().directoryId().isEmpty()) {
            return CompletableFuture.completedFuture(new AddRaftVoterResponseData().setErrorCode(Errors.INVALID_REQUEST.code()).setErrorMessage("Add voter request didn't include a valid voter"));
        }
        Endpoints newVoterEndpoints = Endpoints.fromAddVoterRequest(data.listeners());
        if (newVoterEndpoints.address(this.channel.listenerName()).isEmpty()) {
            return CompletableFuture.completedFuture(new AddRaftVoterResponseData().setErrorCode(Errors.INVALID_REQUEST.code()).setErrorMessage(String.format("Add voter request didn't include the endpoint (%s) for the default listener %s", newVoterEndpoints, this.channel.listenerName())));
        }
        return this.addVoterHandler.handleAddVoterRequest(this.quorum.leaderStateOrThrow(), newVoter.get(), newVoterEndpoints, currentTimeMs);
    }

    private boolean handleApiVersionsResponse(RaftResponse.Inbound responseMetadata, long currentTimeMs) {
        if (!this.quorum.isLeader()) {
            return true;
        }
        ApiVersionsResponseData response = (ApiVersionsResponseData)responseMetadata.data();
        Errors error = Errors.forCode((short)response.errorCode());
        Optional<ApiVersionsResponseData.SupportedFeatureKey> supportedKraftVersions = Optional.ofNullable(response.supportedFeatures().find("kraft.version"));
        return this.addVoterHandler.handleApiVersionsResponse(this.quorum.leaderStateOrThrow(), responseMetadata.source(), error, supportedKraftVersions, currentTimeMs);
    }

    private CompletableFuture<RemoveRaftVoterResponseData> handleRemoveVoterRequest(RaftRequest.Inbound requestMetadata, long currentTimeMs) {
        RemoveRaftVoterRequestData data = (RemoveRaftVoterRequestData)requestMetadata.data();
        if (!this.hasValidClusterId(data.clusterId())) {
            return CompletableFuture.completedFuture(new RemoveRaftVoterResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()).setErrorMessage(String.format("The given id \"%s\" doesn't match the cluster id \"%s\"", data.clusterId(), this.clusterId)));
        }
        Optional<Errors> leaderValidationError = this.validateLeaderOnlyRequest(this.quorum.epoch());
        if (leaderValidationError.isPresent()) {
            return CompletableFuture.completedFuture(new RemoveRaftVoterResponseData().setErrorCode(leaderValidationError.get().code()));
        }
        Optional<ReplicaKey> oldVoter = RaftUtil.removeVoterRequestVoterKey(data);
        if (oldVoter.isEmpty() || oldVoter.get().directoryId().isEmpty()) {
            return CompletableFuture.completedFuture(new RemoveRaftVoterResponseData().setErrorCode(Errors.INVALID_REQUEST.code()).setErrorMessage("Remove voter request didn't include a valid voter"));
        }
        return this.removeVoterHandler.handleRemoveVoterRequest(this.quorum.leaderStateOrThrow(), oldVoter.get(), currentTimeMs);
    }

    private CompletableFuture<UpdateRaftVoterResponseData> handleUpdateVoterRequest(RaftRequest.Inbound requestMetadata, long currentTimeMs) {
        UpdateRaftVoterRequestData data = (UpdateRaftVoterRequestData)requestMetadata.data();
        if (!this.hasValidClusterId(data.clusterId())) {
            return CompletableFuture.completedFuture(RaftUtil.updateVoterResponse(Errors.INCONSISTENT_CLUSTER_ID, requestMetadata.listenerName(), this.quorum.leaderAndEpoch(), this.quorum.leaderEndpoints()));
        }
        Optional<Errors> leaderValidationError = this.validateLeaderOnlyRequest(data.currentLeaderEpoch());
        if (leaderValidationError.isPresent()) {
            return CompletableFuture.completedFuture(RaftUtil.updateVoterResponse(leaderValidationError.get(), requestMetadata.listenerName(), this.quorum.leaderAndEpoch(), this.quorum.leaderEndpoints()));
        }
        Optional<ReplicaKey> voter = RaftUtil.updateVoterRequestVoterKey(data);
        if (voter.isEmpty() || voter.get().directoryId().isEmpty()) {
            return CompletableFuture.completedFuture(RaftUtil.updateVoterResponse(Errors.INVALID_REQUEST, requestMetadata.listenerName(), this.quorum.leaderAndEpoch(), this.quorum.leaderEndpoints()));
        }
        Endpoints voterEndpoints = Endpoints.fromUpdateVoterRequest(data.listeners());
        if (voterEndpoints.address(this.channel.listenerName()).isEmpty()) {
            return CompletableFuture.completedFuture(RaftUtil.updateVoterResponse(Errors.INVALID_REQUEST, requestMetadata.listenerName(), this.quorum.leaderAndEpoch(), this.quorum.leaderEndpoints()));
        }
        UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions = data.kRaftVersionFeature();
        if (supportedKraftVersions.minSupportedVersion() < 0 || supportedKraftVersions.maxSupportedVersion() < 0 || supportedKraftVersions.maxSupportedVersion() < supportedKraftVersions.minSupportedVersion()) {
            return CompletableFuture.completedFuture(RaftUtil.updateVoterResponse(Errors.INVALID_REQUEST, requestMetadata.listenerName(), this.quorum.leaderAndEpoch(), this.quorum.leaderEndpoints()));
        }
        return this.updateVoterHandler.handleUpdateVoterRequest(this.quorum.leaderStateOrThrow(), requestMetadata.listenerName(), voter.get(), voterEndpoints, supportedKraftVersions, currentTimeMs);
    }

    private boolean handleUpdateVoterResponse(RaftResponse.Inbound responseMetadata, long currentTimeMs) {
        Endpoints leaderEndpoints;
        int responseEpoch;
        OptionalInt responseLeaderId;
        UpdateRaftVoterResponseData data = (UpdateRaftVoterResponseData)responseMetadata.data();
        Errors error = Errors.forCode((short)data.errorCode());
        Optional<Boolean> handled = this.maybeHandleCommonResponse(error, responseLeaderId = KafkaRaftClient.optionalLeaderId(data.currentLeader().leaderId()), responseEpoch = data.currentLeader().leaderEpoch(), leaderEndpoints = responseLeaderId.isPresent() && !data.currentLeader().host().isEmpty() ? Endpoints.fromInetSocketAddresses(Map.of(this.channel.listenerName(), InetSocketAddress.createUnresolved(data.currentLeader().host(), data.currentLeader().port()))) : Endpoints.empty(), responseMetadata.source(), currentTimeMs);
        if (handled.isPresent()) {
            return handled.get();
        }
        if (error == Errors.NONE || error == Errors.UNSUPPORTED_VERSION) {
            FollowerState follower = this.quorum.followerStateOrThrow();
            follower.setHasUpdatedLeader();
            follower.resetFetchTimeoutForSuccessfulFetch(currentTimeMs);
            return true;
        }
        return this.handleUnexpectedError(error, responseMetadata);
    }

    private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) {
        if (leaderId.isPresent() && leaderId.getAsInt() == this.quorum.localIdOrSentinel()) {
            return this.quorum.isLeader();
        }
        return epoch != this.quorum.epoch() || leaderId.isEmpty() || this.quorum.leaderId().isEmpty() || leaderId.equals(this.quorum.leaderId());
    }

    private Optional<Boolean> maybeHandleCommonResponse(Errors error, OptionalInt leaderId, int epoch, Endpoints leaderEndpoints, Node source, long currentTimeMs) {
        if (leaderEndpoints.isEmpty() && leaderId.isPresent()) {
            leaderEndpoints = this.partitionState.lastVoterSet().listeners(leaderId.getAsInt());
        }
        if (epoch < this.quorum.epoch() || error == Errors.UNKNOWN_LEADER_EPOCH) {
            return Optional.of(true);
        }
        if (epoch > this.quorum.epoch() || error == Errors.FENCED_LEADER_EPOCH || error == Errors.NOT_LEADER_OR_FOLLOWER) {
            this.maybeTransition(leaderId, epoch, leaderEndpoints, currentTimeMs);
            return Optional.of(true);
        }
        if (epoch == this.quorum.epoch() && leaderId.isPresent() && !this.quorum.hasLeader()) {
            this.transitionToFollower(epoch, leaderId.getAsInt(), leaderEndpoints, currentTimeMs);
            if (error == Errors.NONE) {
                return Optional.empty();
            }
            return Optional.of(true);
        }
        if (error == Errors.BROKER_NOT_AVAILABLE) {
            return Optional.of(false);
        }
        if (error == Errors.INVALID_VOTER_KEY) {
            this.logger.info("Voter key for VOTE or BEGIN_QUORUM_EPOCH request didn't match the receiver's replica key: {}", (Object)source);
            return Optional.of(true);
        }
        if (error == Errors.INVALID_REQUEST) {
            throw new IllegalStateException("Received unexpected invalid request error");
        }
        return Optional.empty();
    }

    private void maybeTransition(OptionalInt leaderId, int epoch, Endpoints leaderEndpoints, long currentTimeMs) {
        if (!this.hasConsistentLeader(epoch, leaderId)) {
            throw new IllegalStateException("Received request or response with leader " + String.valueOf(leaderId) + " and epoch " + epoch + " which is inconsistent with current leader " + String.valueOf(this.quorum.leaderId()) + " and epoch " + this.quorum.epoch());
        }
        if (epoch > this.quorum.epoch()) {
            if (leaderId.isPresent()) {
                this.transitionToFollower(epoch, leaderId.getAsInt(), leaderEndpoints, currentTimeMs);
            } else {
                this.transitionToUnattached(epoch, OptionalInt.empty());
            }
        } else if (leaderId.isPresent() && (!this.quorum.hasLeader() || leaderEndpoints.size() > this.quorum.leaderEndpoints().size())) {
            this.transitionToFollower(epoch, leaderId.getAsInt(), leaderEndpoints, currentTimeMs);
        }
    }

    private boolean handleTopLevelError(Errors error, RaftResponse.Inbound response) {
        if (error == Errors.BROKER_NOT_AVAILABLE) {
            return false;
        }
        if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
            throw new ClusterAuthorizationException("Received cluster authorization error in response " + String.valueOf(response));
        }
        return this.handleUnexpectedError(error, response);
    }

    private boolean handleUnexpectedError(Errors error, RaftResponse.Inbound response) {
        this.logger.error("Unexpected error {} in {} response: {}", new Object[]{error, ApiKeys.forId((int)response.data().apiKey()), response});
        return false;
    }

    private void handleResponse(RaftResponse.Inbound response, long currentTimeMs) {
        ApiKeys apiKey = ApiKeys.forId((int)response.data().apiKey());
        this.requestManager.onResponseResult(response.source(), response.correlationId(), switch (apiKey) {
            case ApiKeys.FETCH -> this.handleFetchResponse(response, currentTimeMs);
            case ApiKeys.VOTE -> this.handleVoteResponse(response, currentTimeMs);
            case ApiKeys.BEGIN_QUORUM_EPOCH -> this.handleBeginQuorumEpochResponse(response, currentTimeMs);
            case ApiKeys.END_QUORUM_EPOCH -> this.handleEndQuorumEpochResponse(response, currentTimeMs);
            case ApiKeys.FETCH_SNAPSHOT -> this.handleFetchSnapshotResponse(response, currentTimeMs);
            case ApiKeys.API_VERSIONS -> this.handleApiVersionsResponse(response, currentTimeMs);
            case ApiKeys.UPDATE_RAFT_VOTER -> this.handleUpdateVoterResponse(response, currentTimeMs);
            default -> throw new IllegalArgumentException("Received unexpected response type: " + String.valueOf(apiKey));
        }, currentTimeMs);
    }

    private Optional<Errors> validateVoterOnlyRequest(int remoteNodeId, int requestEpoch) {
        if (requestEpoch < this.quorum.epoch()) {
            return Optional.of(Errors.FENCED_LEADER_EPOCH);
        }
        if (remoteNodeId < 0) {
            return Optional.of(Errors.INVALID_REQUEST);
        }
        return Optional.empty();
    }

    private boolean isValidVoterKey(Optional<ReplicaKey> voterKey) {
        return voterKey.map(key -> {
            if (!OptionalInt.of(key.id()).equals(this.nodeId)) {
                return false;
            }
            if (key.directoryId().isEmpty()) {
                return true;
            }
            return key.directoryId().get().equals((Object)this.nodeDirectoryId);
        }).orElse(true);
    }

    private Optional<Errors> validateLeaderOnlyRequest(int requestEpoch) {
        if (requestEpoch < this.quorum.epoch()) {
            return Optional.of(Errors.FENCED_LEADER_EPOCH);
        }
        if (requestEpoch > this.quorum.epoch()) {
            return Optional.of(Errors.UNKNOWN_LEADER_EPOCH);
        }
        if (!this.quorum.isLeader()) {
            return Optional.of(Errors.NOT_LEADER_OR_FOLLOWER);
        }
        if (this.shutdown.get() != null) {
            return Optional.of(Errors.BROKER_NOT_AVAILABLE);
        }
        return Optional.empty();
    }

    private void handleRequest(RaftRequest.Inbound request, long currentTimeMs) {
        ApiKeys apiKey = ApiKeys.forId((int)request.data().apiKey());
        (switch (apiKey) {
            case ApiKeys.FETCH -> this.handleFetchRequest(request, currentTimeMs);
            case ApiKeys.VOTE -> CompletableFuture.completedFuture(this.handleVoteRequest(request));
            case ApiKeys.BEGIN_QUORUM_EPOCH -> CompletableFuture.completedFuture(this.handleBeginQuorumEpochRequest(request, currentTimeMs));
            case ApiKeys.END_QUORUM_EPOCH -> CompletableFuture.completedFuture(this.handleEndQuorumEpochRequest(request, currentTimeMs));
            case ApiKeys.DESCRIBE_QUORUM -> CompletableFuture.completedFuture(this.handleDescribeQuorumRequest(request, currentTimeMs));
            case ApiKeys.FETCH_SNAPSHOT -> CompletableFuture.completedFuture(this.handleFetchSnapshotRequest(request, currentTimeMs));
            case ApiKeys.ADD_RAFT_VOTER -> this.handleAddVoterRequest(request, currentTimeMs);
            case ApiKeys.REMOVE_RAFT_VOTER -> this.handleRemoveVoterRequest(request, currentTimeMs);
            case ApiKeys.UPDATE_RAFT_VOTER -> this.handleUpdateVoterRequest(request, currentTimeMs);
            default -> throw new IllegalArgumentException("Unexpected request type " + String.valueOf(apiKey));
        }).whenComplete((response, exception) -> {
            ApiMessage message = response;
            if (message == null) {
                message = RaftUtil.errorResponse(apiKey, Errors.forException((Throwable)exception));
            }
            RaftResponse.Outbound responseMessage = new RaftResponse.Outbound(request.correlationId(), message);
            request.completion.complete(responseMessage);
            this.logger.trace("Sent response {} to inbound request {}", (Object)responseMessage, (Object)request);
        });
    }

    private void handleInboundMessage(RaftMessage message, long currentTimeMs) {
        this.logger.trace("Received inbound message {}", (Object)message);
        if (message instanceof RaftRequest.Inbound) {
            RaftRequest.Inbound request = (RaftRequest.Inbound)message;
            this.handleRequest(request, currentTimeMs);
        } else if (message instanceof RaftResponse.Inbound) {
            RaftResponse.Inbound response = (RaftResponse.Inbound)message;
            if (this.requestManager.isResponseExpected(response.source(), response.correlationId())) {
                this.handleResponse(response, currentTimeMs);
            } else {
                this.logger.debug("Ignoring response {} since it is no longer needed", (Object)response);
            }
        } else {
            throw new IllegalArgumentException("Unexpected message " + String.valueOf(message));
        }
    }

    private RequestSendResult maybeSendRequest(long currentTimeMs, Node destination, Supplier<ApiMessage> requestSupplier) {
        boolean requestSent = false;
        if (this.requestManager.isBackingOff(destination, currentTimeMs)) {
            long remainingBackoffMs = this.requestManager.remainingBackoffMs(destination, currentTimeMs);
            this.logger.debug("Connection for {} is backing off for {} ms", (Object)destination, (Object)remainingBackoffMs);
            return RequestSendResult.of(requestSent, remainingBackoffMs);
        }
        if (this.requestManager.isReady(destination, currentTimeMs)) {
            int correlationId = this.channel.newCorrelationId();
            ApiMessage request = requestSupplier.get();
            RaftRequest.Outbound requestMessage = new RaftRequest.Outbound(correlationId, request, destination, currentTimeMs);
            requestMessage.completion.whenComplete((response, exception) -> {
                if (exception != null) {
                    ApiKeys api = ApiKeys.forId((int)request.apiKey());
                    Errors error = Errors.forException((Throwable)exception);
                    ApiMessage errorResponse = RaftUtil.errorResponse(api, error);
                    response = new RaftResponse.Inbound(correlationId, errorResponse, destination);
                }
                this.messageQueue.add((RaftMessage)response);
            });
            this.requestManager.onRequestSent(destination, correlationId, currentTimeMs);
            this.channel.send(requestMessage);
            requestSent = true;
            this.logger.trace("Sent outbound request: {}", (Object)requestMessage);
        }
        return RequestSendResult.of(requestSent, this.requestManager.remainingRequestTimeMs(destination, currentTimeMs));
    }

    private EndQuorumEpochRequestData buildEndQuorumEpochRequest(ResignedState state) {
        return RaftUtil.singletonEndQuorumEpochRequest(this.log.topicPartition(), this.clusterId, this.quorum.epoch(), this.quorum.localIdOrThrow(), state.preferredSuccessors());
    }

    private long maybeSendRequests(long currentTimeMs, Set<Node> destinations, Supplier<ApiMessage> requestSupplier) {
        long minBackoffMs = Long.MAX_VALUE;
        for (Node destination : destinations) {
            long backoffMs = this.maybeSendRequest(currentTimeMs, destination, requestSupplier).timeToWaitMs();
            if (backoffMs >= minBackoffMs) continue;
            minBackoffMs = backoffMs;
        }
        return minBackoffMs;
    }

    private long maybeSendRequest(long currentTimeMs, Set<ReplicaKey> remoteVoters, Function<Integer, Node> destinationSupplier, Function<ReplicaKey, ApiMessage> requestSupplier) {
        long minBackoffMs = Long.MAX_VALUE;
        for (ReplicaKey voter : remoteVoters) {
            long backoffMs = this.maybeSendRequest(currentTimeMs, destinationSupplier.apply(voter.id()), () -> (ApiMessage)requestSupplier.apply(voter)).timeToWaitMs();
            minBackoffMs = Math.min(minBackoffMs, backoffMs);
        }
        return minBackoffMs;
    }

    private BeginQuorumEpochRequestData buildBeginQuorumEpochRequest(ReplicaKey remoteVoter) {
        return RaftUtil.singletonBeginQuorumEpochRequest(this.log.topicPartition(), this.clusterId, this.quorum.epoch(), this.quorum.localIdOrThrow(), this.quorum.leaderEndpoints(), remoteVoter);
    }

    private VoteRequestData buildVoteRequest(ReplicaKey remoteVoter, boolean preVote) {
        OffsetAndEpoch endOffset = this.endOffset();
        return RaftUtil.singletonVoteRequest(this.log.topicPartition(), this.clusterId, this.quorum.epoch(), this.quorum.localReplicaKeyOrThrow(), remoteVoter, endOffset.epoch(), endOffset.offset(), preVote);
    }

    private FetchRequestData buildFetchRequest() {
        FetchRequestData request = RaftUtil.singletonFetchRequest(this.log.topicPartition(), this.log.topicId(), fetchPartition -> fetchPartition.setCurrentLeaderEpoch(this.quorum.epoch()).setLastFetchedEpoch(this.log.lastFetchedEpoch()).setFetchOffset(this.log.endOffset().offset()).setReplicaDirectoryId(this.quorum.localDirectoryId()).setHighWatermark(this.quorum.highWatermark().map(LogOffsetMetadata::offset).orElse(-1L).longValue()));
        return request.setMaxBytes(0x800000).setMaxWaitMs(this.fetchMaxWaitMs).setClusterId(this.clusterId).setReplicaState(new FetchRequestData.ReplicaState().setReplicaId(this.quorum.localIdOrSentinel()));
    }

    private long maybeSendFetchToAnyBootstrap(long currentTimeMs) {
        Optional<Node> readyNode = this.requestManager.findReadyBootstrapServer(currentTimeMs);
        return readyNode.map(node -> this.maybeSendRequest(currentTimeMs, (Node)node, this::buildFetchRequest).timeToWaitMs()).orElseGet(() -> this.requestManager.backoffBeforeAvailableBootstrapServer(currentTimeMs));
    }

    private FetchSnapshotRequestData buildFetchSnapshotRequest(OffsetAndEpoch snapshotId, long snapshotSize) {
        return RaftUtil.singletonFetchSnapshotRequest(this.clusterId, ReplicaKey.of(this.quorum().localIdOrSentinel(), this.quorum.localDirectoryId()), this.log.topicPartition(), this.quorum.epoch(), snapshotId, 0x800000, snapshotSize);
    }

    private FetchSnapshotResponseData.PartitionSnapshot addQuorumLeader(FetchSnapshotResponseData.PartitionSnapshot partitionSnapshot) {
        partitionSnapshot.currentLeader().setLeaderEpoch(this.quorum.epoch()).setLeaderId(this.quorum.leaderIdOrSentinel());
        return partitionSnapshot;
    }

    public boolean isRunning() {
        GracefulShutdown gracefulShutdown = this.shutdown.get();
        return gracefulShutdown == null || !gracefulShutdown.isFinished();
    }

    public boolean isShuttingDown() {
        GracefulShutdown gracefulShutdown = this.shutdown.get();
        return gracefulShutdown != null && !gracefulShutdown.isFinished();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void appendBatch(LeaderState<T> state, BatchAccumulator.CompletedBatch<T> batch, long appendTimeMs) {
        try {
            int epoch = state.epoch();
            LogAppendInfo info = this.appendAsLeader((Records)batch.data);
            OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(info.lastOffset(), epoch);
            CompletableFuture<Long> future = this.appendPurgatory.await(offsetAndEpoch.offset() + 1L, Integer.MAX_VALUE);
            future.whenComplete((commitTimeMs, exception) -> {
                if (exception != null) {
                    this.logger.debug("Failed to commit {} records up to last offset {}", new Object[]{batch.numRecords, offsetAndEpoch, exception});
                } else {
                    long elapsedTime = Math.max(0L, commitTimeMs - appendTimeMs);
                    double elapsedTimePerRecord = (double)elapsedTime / (double)batch.numRecords;
                    this.kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, appendTimeMs);
                    this.logger.debug("Completed commit of {} records up to last offset {}", (Object)batch.numRecords, (Object)offsetAndEpoch);
                    batch.records.ifPresent(records -> this.maybeFireHandleCommit(batch.baseOffset, epoch, batch.appendTimestamp(), batch.sizeInBytes(), (List<T>)records));
                }
            });
        }
        finally {
            batch.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private long maybeAppendBatches(LeaderState<T> state, long currentTimeMs) {
        long timeUntilDrain = state.accumulator().timeUntilDrain(currentTimeMs);
        if (timeUntilDrain <= 0L) {
            List<BatchAccumulator.CompletedBatch<T>> batches = state.accumulator().drain();
            Iterator<BatchAccumulator.CompletedBatch<T>> iterator = batches.iterator();
            try {
                while (iterator.hasNext()) {
                    BatchAccumulator.CompletedBatch<T> batch = iterator.next();
                    this.appendBatch(state, batch, currentTimeMs);
                }
                this.flushLeaderLog(state, currentTimeMs);
            }
            finally {
                while (iterator.hasNext()) {
                    iterator.next().release();
                }
            }
        }
        return state.accumulator().timeUntilDrain(currentTimeMs);
    }

    private long maybeSendBeginQuorumEpochRequests(LeaderState<T> state, long currentTimeMs) {
        long timeUntilNextBeginQuorumSend = state.timeUntilBeginQuorumEpochTimerExpires(currentTimeMs);
        if (timeUntilNextBeginQuorumSend == 0L) {
            VoterSet voters = this.partitionState.lastVoterSet();
            Function<Integer, Node> nodeSupplier = voterId -> voters.voterNode((int)voterId, this.channel.listenerName()).orElseThrow(() -> new IllegalStateException(String.format("Unknown endpoint for voter id %d for listener name %s", voterId, this.channel.listenerName())));
            timeUntilNextBeginQuorumSend = this.maybeSendRequest(currentTimeMs, voters.voterKeys().stream().filter(key -> key.id() != this.quorum.localIdOrThrow()).collect(Collectors.toSet()), nodeSupplier, this::buildBeginQuorumEpochRequest);
            state.resetBeginQuorumEpochTimer(currentTimeMs);
        }
        return timeUntilNextBeginQuorumSend;
    }

    private long pollResigned(long currentTimeMs) {
        long stateTimeoutMs;
        ResignedState state = this.quorum.resignedStateOrThrow();
        long endQuorumBackoffMs = this.maybeSendRequests(currentTimeMs, this.partitionState.lastVoterSet().voterNodes(state.unackedVoters().stream(), this.channel.listenerName()), () -> this.buildEndQuorumEpochRequest(state));
        GracefulShutdown shutdown = this.shutdown.get();
        if (shutdown != null) {
            stateTimeoutMs = shutdown.remainingTimeMs();
        } else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
            this.transitionToUnattached(this.quorum.epoch() + 1, OptionalInt.empty());
            stateTimeoutMs = 0L;
        } else {
            stateTimeoutMs = state.remainingElectionTimeMs(currentTimeMs);
        }
        return Math.min(stateTimeoutMs, endQuorumBackoffMs);
    }

    private long pollLeader(long currentTimeMs) {
        LeaderState state = this.quorum.leaderStateOrThrow();
        this.maybeFireLeaderChange(state);
        long timeUntilCheckQuorumExpires = state.timeUntilCheckQuorumExpires(currentTimeMs);
        if (this.shutdown.get() != null || state.isResignRequested() || timeUntilCheckQuorumExpires == 0L) {
            this.transitionToResigned(state.nonLeaderVotersByDescendingFetchOffset());
            return 0L;
        }
        long timeUtilVoterChangeExpires = state.maybeExpirePendingOperation(currentTimeMs);
        long timeUntilFlush = this.maybeAppendBatches(state, currentTimeMs);
        long timeUntilNextBeginQuorumSend = this.maybeSendBeginQuorumEpochRequests(state, currentTimeMs);
        return Math.min(timeUntilFlush, Math.min(timeUntilNextBeginQuorumSend, Math.min(timeUntilCheckQuorumExpires, timeUtilVoterChangeExpires)));
    }

    private long maybeSendVoteRequests(NomineeState state, long currentTimeMs) {
        if (!state.epochElection().isVoteRejected()) {
            VoterSet voters = this.partitionState.lastVoterSet();
            boolean preVote = this.quorum.isProspective();
            return this.maybeSendRequest(currentTimeMs, state.epochElection().unrecordedVoters(), voterId -> voters.voterNode((int)voterId, this.channel.listenerName()).orElseThrow(() -> new IllegalStateException(String.format("Unknown endpoint for voter id %d for listener name %s", voterId, this.channel.listenerName()))), voterId -> this.buildVoteRequest((ReplicaKey)voterId, preVote));
        }
        return Long.MAX_VALUE;
    }

    private long pollCandidate(long currentTimeMs) {
        CandidateState state = this.quorum.candidateStateOrThrow();
        GracefulShutdown shutdown = this.shutdown.get();
        if (shutdown != null) {
            long minRequestBackoffMs = this.maybeSendVoteRequests(state, currentTimeMs);
            return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
        }
        if (state.hasElectionTimeoutExpired(currentTimeMs)) {
            this.logger.info("Election was not granted, transitioning to prospective");
            this.transitionToProspective(currentTimeMs);
            return 0L;
        }
        long minVoteRequestBackoffMs = this.maybeSendVoteRequests(state, currentTimeMs);
        return Math.min(minVoteRequestBackoffMs, state.remainingElectionTimeMs(currentTimeMs));
    }

    private long pollProspective(long currentTimeMs) {
        ProspectiveState state = this.quorum.prospectiveStateOrThrow();
        GracefulShutdown shutdown = this.shutdown.get();
        if (shutdown != null) {
            long minRequestBackoffMs = this.maybeSendVoteRequests(state, currentTimeMs);
            return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
        }
        if (state.hasElectionTimeoutExpired(currentTimeMs)) {
            this.logger.info("Election timed out before receiving sufficient vote responses to become candidate. Current epoch election state: {}", (Object)state.epochElection());
            this.prospectiveTransitionAfterElectionLoss(state, currentTimeMs);
            return 0L;
        }
        long minVoteRequestBackoffMs = this.maybeSendVoteRequests(state, currentTimeMs);
        return Math.min(minVoteRequestBackoffMs, state.remainingElectionTimeMs(currentTimeMs));
    }

    private void prospectiveTransitionAfterElectionLoss(ProspectiveState prospective, long currentTimeMs) {
        if (prospective.election().hasLeader() && !prospective.leaderEndpoints().isEmpty()) {
            this.transitionToFollower(this.quorum().epoch(), prospective.election().leaderId(), prospective.leaderEndpoints(), currentTimeMs);
        } else {
            this.transitionToUnattached(this.quorum().epoch(), prospective.election().optionalLeaderId());
        }
    }

    private long pollFollower(long currentTimeMs) {
        FollowerState state = this.quorum.followerStateOrThrow();
        if (this.quorum.isVoter()) {
            return this.pollFollowerAsVoter(state, currentTimeMs);
        }
        return this.pollFollowerAsObserver(state, currentTimeMs);
    }

    private boolean shouldSendUpdateVoteRequest(FollowerState state) {
        KRaftVersion version = this.partitionState.lastKraftVersion();
        boolean sendWhenReconfigSupported = version.isReconfigSupported() && this.partitionState.lastVoterSet().voterNodeNeedsUpdate(this.quorum.localVoterNodeOrThrow());
        boolean sendWhenReconfigNotSupported = !version.isReconfigSupported() && !state.hasUpdatedLeader();
        return sendWhenReconfigSupported || sendWhenReconfigNotSupported;
    }

    private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) {
        long backoffMs;
        GracefulShutdown shutdown = this.shutdown.get();
        if (shutdown != null) {
            backoffMs = 0L;
        } else if (state.hasFetchTimeoutExpired(currentTimeMs)) {
            this.logger.info("Transitioning to Prospective state due to fetch timeout");
            this.transitionToProspective(currentTimeMs);
            backoffMs = 0L;
        } else if (state.hasUpdateVoterPeriodExpired(currentTimeMs)) {
            boolean resetUpdateVoterTimer;
            if (this.shouldSendUpdateVoteRequest(state)) {
                RequestSendResult sendResult = this.maybeSendUpdateVoterRequest(state, currentTimeMs);
                resetUpdateVoterTimer = sendResult.requestSent();
                backoffMs = sendResult.timeToWaitMs();
            } else {
                resetUpdateVoterTimer = true;
                backoffMs = this.maybeSendFetchToBestNode(state, currentTimeMs);
            }
            if (resetUpdateVoterTimer) {
                state.resetUpdateVoterPeriod(currentTimeMs);
            }
        } else {
            backoffMs = this.maybeSendFetchToBestNode(state, currentTimeMs);
        }
        return Math.min(backoffMs, Math.min(state.remainingFetchTimeMs(currentTimeMs), state.remainingUpdateVoterPeriodMs(currentTimeMs)));
    }

    private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
        if (state.hasFetchTimeoutExpired(currentTimeMs)) {
            return this.maybeSendFetchToAnyBootstrap(currentTimeMs);
        }
        return this.maybeSendFetchToBestNode(state, currentTimeMs);
    }

    private long maybeSendFetchToBestNode(FollowerState state, long currentTimeMs) {
        long backoffMs;
        Node leaderNode = state.leaderNode(this.channel.listenerName());
        if (this.requestManager.hasRequestTimedOut(leaderNode, currentTimeMs)) {
            this.requestManager.reset(leaderNode);
            backoffMs = this.maybeSendFetchToAnyBootstrap(currentTimeMs);
        } else {
            backoffMs = this.requestManager.isBackingOff(leaderNode, currentTimeMs) ? this.maybeSendFetchToAnyBootstrap(currentTimeMs) : (!this.requestManager.hasAnyInflightRequest(currentTimeMs) ? this.maybeSendFetchOrFetchSnapshot(state, currentTimeMs) : this.requestManager.backoffBeforeAvailableBootstrapServer(currentTimeMs));
        }
        return Math.min(backoffMs, state.remainingFetchTimeMs(currentTimeMs));
    }

    private long maybeSendFetchOrFetchSnapshot(FollowerState state, long currentTimeMs) {
        Supplier<ApiMessage> requestSupplier;
        if (state.fetchingSnapshot().isPresent()) {
            RawSnapshotWriter snapshot = state.fetchingSnapshot().get();
            long snapshotSize = snapshot.sizeInBytes();
            requestSupplier = () -> this.buildFetchSnapshotRequest(snapshot.snapshotId(), snapshotSize);
        } else {
            requestSupplier = this::buildFetchRequest;
        }
        return this.maybeSendRequest(currentTimeMs, state.leaderNode(this.channel.listenerName()), requestSupplier).timeToWaitMs();
    }

    private UpdateRaftVoterRequestData buildUpdateVoterRequest() {
        return RaftUtil.updateVoterRequest(this.clusterId, this.quorum.localReplicaKeyOrThrow(), this.quorum.epoch(), this.localSupportedKRaftVersion, this.localListeners);
    }

    private RequestSendResult maybeSendUpdateVoterRequest(FollowerState state, long currentTimeMs) {
        return this.maybeSendRequest(currentTimeMs, state.leaderNode(this.channel.listenerName()), this::buildUpdateVoterRequest);
    }

    private long pollUnattached(long currentTimeMs) {
        UnattachedState state = this.quorum.unattachedStateOrThrow();
        if (this.quorum.isVoter()) {
            return this.pollUnattachedAsVoter(state, currentTimeMs);
        }
        return this.pollUnattachedCommon(state, currentTimeMs);
    }

    private long pollUnattachedAsVoter(UnattachedState state, long currentTimeMs) {
        GracefulShutdown shutdown = this.shutdown.get();
        if (shutdown != null) {
            return shutdown.remainingTimeMs();
        }
        if (state.hasElectionTimeoutExpired(currentTimeMs)) {
            this.transitionToProspective(currentTimeMs);
            return 0L;
        }
        return this.pollUnattachedCommon(state, currentTimeMs);
    }

    private long pollUnattachedCommon(UnattachedState state, long currentTimeMs) {
        long fetchBackoffMs = this.maybeSendFetchToAnyBootstrap(currentTimeMs);
        return Math.min(fetchBackoffMs, state.remainingElectionTimeMs(currentTimeMs));
    }

    private long pollCurrentState(long currentTimeMs) {
        if (this.quorum.isLeader()) {
            return this.pollLeader(currentTimeMs);
        }
        if (this.quorum.isCandidate()) {
            return this.pollCandidate(currentTimeMs);
        }
        if (this.quorum.isProspective()) {
            return this.pollProspective(currentTimeMs);
        }
        if (this.quorum.isFollower()) {
            return this.pollFollower(currentTimeMs);
        }
        if (this.quorum.isUnattached()) {
            return this.pollUnattached(currentTimeMs);
        }
        if (this.quorum.isResigned()) {
            return this.pollResigned(currentTimeMs);
        }
        throw new IllegalStateException("Unexpected quorum state " + String.valueOf(this.quorum));
    }

    private void pollListeners() {
        Registration<T> registration;
        while ((registration = this.pendingRegistrations.poll()) != null) {
            this.processRegistration(registration);
        }
        this.quorum.highWatermark().ifPresent(highWatermarkMetadata -> this.updateListenersProgress(highWatermarkMetadata.offset()));
        Optional leaderState = this.quorum.maybeLeaderState();
        if (leaderState.isPresent()) {
            this.maybeFireLeaderChange(leaderState.get());
        } else if (!this.quorum.isResigned()) {
            this.maybeFireLeaderChange();
        }
    }

    private void processRegistration(Registration<T> registration) {
        RaftClient.Listener<T> listener = registration.listener();
        Registration.Ops ops = registration.ops();
        if (ops == Registration.Ops.REGISTER) {
            if (this.listenerContexts.putIfAbsent(listener, new ListenerContext(listener)) != null) {
                this.logger.error("Attempting to add a listener that already exists: {}", (Object)KafkaRaftClient.listenerName(listener));
            } else {
                this.logger.info("Registered the listener {}", (Object)KafkaRaftClient.listenerName(listener));
            }
        } else if (this.listenerContexts.remove(listener) == null) {
            this.logger.error("Attempting to remove a listener that doesn't exists: {}", (Object)KafkaRaftClient.listenerName(listener));
        } else {
            this.logger.info("Unregistered the listener {}", (Object)KafkaRaftClient.listenerName(listener));
        }
    }

    private boolean maybeCompleteShutdown(long currentTimeMs) {
        GracefulShutdown shutdown = this.shutdown.get();
        if (shutdown == null) {
            return false;
        }
        shutdown.update(currentTimeMs);
        if (shutdown.hasTimedOut()) {
            shutdown.failWithTimeout();
            return true;
        }
        if (this.quorum.isObserver() || this.quorum.isOnlyVoter() || this.quorum.hasRemoteLeader()) {
            shutdown.complete();
            return true;
        }
        return false;
    }

    private void wakeup() {
        this.messageQueue.wakeup();
    }

    public void handle(RaftRequest.Inbound request) {
        this.messageQueue.add(Objects.requireNonNull(request));
    }

    public void poll() {
        if (!this.isInitialized()) {
            throw new IllegalStateException("Replica needs to be initialized before polling");
        }
        long startPollTimeMs = this.time.milliseconds();
        if (this.maybeCompleteShutdown(startPollTimeMs)) {
            return;
        }
        long pollStateTimeoutMs = this.pollCurrentState(startPollTimeMs);
        long cleaningTimeoutMs = this.snapshotCleaner.maybeClean(startPollTimeMs);
        long pollTimeoutMs = Math.min(pollStateTimeoutMs, cleaningTimeoutMs);
        long startWaitTimeMs = this.time.milliseconds();
        this.kafkaRaftMetrics.updatePollStart(startWaitTimeMs);
        RaftMessage message = this.messageQueue.poll(pollTimeoutMs);
        long endWaitTimeMs = this.time.milliseconds();
        this.kafkaRaftMetrics.updatePollEnd(endWaitTimeMs);
        if (message != null) {
            this.handleInboundMessage(message, endWaitTimeMs);
        }
        this.pollListeners();
    }

    @Override
    public long prepareAppend(int epoch, List<T> records) {
        return this.append(epoch, records);
    }

    private long append(int epoch, List<T> records) {
        if (!this.isInitialized()) {
            throw new NotLeaderException("Append failed because the replica is not the current leader");
        }
        LeaderState leaderState = this.quorum.maybeLeaderState().orElseThrow(() -> new NotLeaderException("Append failed because the replica is not the current leader"));
        if (records.isEmpty()) {
            throw new IllegalArgumentException("Append failed because there are no records");
        }
        BatchAccumulator<T> accumulator = leaderState.accumulator();
        boolean isFirstAppend = accumulator.isEmpty();
        long offset = accumulator.append(epoch, records, true);
        if (isFirstAppend || accumulator.needsDrain(this.time.milliseconds())) {
            this.wakeup();
        }
        return offset;
    }

    @Override
    public void schedulePreparedAppend() {
        if (!this.isInitialized()) {
            throw new NotLeaderException("Flush failed because the replica is not the current leader");
        }
        LeaderState leaderState = this.quorum.maybeLeaderState().orElseThrow(() -> new NotLeaderException("Flush failed because the replica is not the current leader"));
        leaderState.accumulator().allowDrain();
        if (leaderState.accumulator().needsDrain(this.time.milliseconds())) {
            this.wakeup();
        }
    }

    @Override
    public CompletableFuture<Void> shutdown(int timeoutMs) {
        this.logger.info("Beginning graceful shutdown");
        CompletableFuture<Void> shutdownComplete = new CompletableFuture<Void>();
        this.shutdown.set(new GracefulShutdown(timeoutMs, shutdownComplete));
        this.wakeup();
        return shutdownComplete;
    }

    @Override
    public void resign(int epoch) {
        if (epoch < 0) {
            throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch);
        }
        if (!this.isInitialized()) {
            throw new IllegalStateException("Replica needs to be initialized before resigning");
        }
        LeaderAndEpoch leaderAndEpoch = this.leaderAndEpoch();
        int currentEpoch = leaderAndEpoch.epoch();
        if (epoch > currentEpoch) {
            throw new IllegalArgumentException("Attempt to resign from epoch " + epoch + " which is larger than the current epoch " + currentEpoch);
        }
        if (epoch < currentEpoch) {
            this.logger.debug("Ignoring call to resign from epoch {} since it is smaller than the current epoch {}", (Object)epoch, (Object)currentEpoch);
        } else {
            if (!leaderAndEpoch.isLeader(this.quorum.localIdOrThrow())) {
                throw new IllegalArgumentException("Cannot resign from epoch " + epoch + " since we are not the leader");
            }
            Optional leaderStateOpt = this.quorum.maybeLeaderState();
            if (leaderStateOpt.isEmpty()) {
                this.logger.debug("Ignoring call to resign from epoch {} since this node is no longer the leader", (Object)epoch);
                return;
            }
            LeaderState leaderState = leaderStateOpt.get();
            if (leaderState.epoch() != epoch) {
                this.logger.debug("Ignoring call to resign from epoch {} since it is smaller than the current epoch {}", (Object)epoch, (Object)leaderState.epoch());
            } else {
                this.logger.info("Received user request to resign from the current epoch {}", (Object)currentEpoch);
                leaderState.requestResign();
                this.wakeup();
            }
        }
    }

    @Override
    public Optional<SnapshotWriter<T>> createSnapshot(OffsetAndEpoch snapshotId, long lastContainedLogTimestamp) {
        if (!this.isInitialized()) {
            throw new IllegalStateException("Cannot create snapshot before the replica has been initialized");
        }
        return this.log.createNewSnapshot(snapshotId).map(writer -> {
            long lastContainedLogOffset = snapshotId.offset() - 1L;
            NotifyingRawSnapshotWriter wrappedWriter = new NotifyingRawSnapshotWriter((RawSnapshotWriter)writer, offsetAndEpoch -> this.partitionState.truncateOldEntries(offsetAndEpoch.offset()));
            return new RecordsSnapshotWriter.Builder().setLastContainedLogTimestamp(lastContainedLogTimestamp).setTime(this.time).setMaxBatchSize(0x800000).setMemoryPool(this.memoryPool).setRawSnapshotWriter(wrappedWriter).setKraftVersion(this.partitionState.kraftVersionAtOffset(lastContainedLogOffset)).setVoterSet(this.partitionState.voterSetAtOffset(lastContainedLogOffset)).build(this.serde);
        });
    }

    @Override
    public Optional<OffsetAndEpoch> latestSnapshotId() {
        return this.log.latestSnapshotId();
    }

    @Override
    public long logEndOffset() {
        return this.log.endOffset().offset();
    }

    @Override
    public KRaftVersion kraftVersion() {
        if (!this.isInitialized()) {
            throw new IllegalStateException("Cannot read the kraft version before the replica has been initialized");
        }
        return this.quorum.maybeLeaderState().flatMap(LeaderState::requestedKRaftVersion).map(KRaftVersionUpgrade.Version::kraftVersion).orElseGet(this.partitionState::lastKraftVersion);
    }

    @Override
    public void upgradeKRaftVersion(int epoch, KRaftVersion version, boolean validateOnly) {
        if (!this.isInitialized()) {
            throw new IllegalStateException("Cannot update the kraft version before the replica has been initialized");
        }
        LeaderState leaderState = this.quorum.maybeLeaderState().orElseThrow(() -> new NotLeaderException("Upgrade kraft version failed because the replica is not the current leader"));
        leaderState.maybeAppendUpgradedKRaftVersion(epoch, version, this.partitionState.lastKraftVersion(), this.partitionState.lastVoterSet(), validateOnly, this.time.milliseconds());
    }

    @Override
    public void close() {
        MemoryPool memoryPool;
        this.log.flush(true);
        if (this.kafkaRaftMetrics != null) {
            this.kafkaRaftMetrics.close();
        }
        if ((memoryPool = this.memoryPool) instanceof BatchMemoryPool) {
            BatchMemoryPool batchMemoryPool = (BatchMemoryPool)memoryPool;
            batchMemoryPool.releaseRetained();
        }
    }

    @Override
    public OptionalLong highWatermark() {
        if (this.isInitialized() && this.quorum.highWatermark().isPresent()) {
            return OptionalLong.of(this.quorum.highWatermark().get().offset());
        }
        return OptionalLong.empty();
    }

    public Optional<Node> voterNode(int id, ListenerName listenerName) {
        return this.partitionState.lastVoterSet().voterNode(id, listenerName);
    }

    QuorumState quorum() {
        return this.quorum;
    }

    private boolean isInitialized() {
        return this.partitionState != null && this.quorum != null && this.requestManager != null && this.kafkaRaftMetrics != null;
    }

    private static class RaftMetadataLogCleanerManager {
        private final Logger logger;
        private final Timer timer;
        private final long delayMs;
        private final Runnable cleaner;

        RaftMetadataLogCleanerManager(Logger logger, Time time, long delayMs, Runnable cleaner) {
            this.logger = logger;
            this.timer = time.timer(delayMs);
            this.delayMs = delayMs;
            this.cleaner = cleaner;
        }

        public long maybeClean(long currentTimeMs) {
            this.timer.update(currentTimeMs);
            if (this.timer.isExpired()) {
                try {
                    this.cleaner.run();
                }
                catch (Throwable t) {
                    this.logger.error("Had an error during log cleaning", t);
                }
                this.timer.reset(this.delayMs);
            }
            return this.timer.remainingMs();
        }
    }

    private final class ListenerContext
    implements CloseListener<BatchReader<T>> {
        private static final long STARTING_NEXT_OFFSET = -1L;
        private static final long SMALLEST_LOG_OFFSET = 0L;
        private final RaftClient.Listener<T> listener;
        private LeaderAndEpoch lastFiredLeaderChange = LeaderAndEpoch.UNKNOWN;
        private BatchReader<T> lastSent = null;
        private long nextOffset = -1L;

        private ListenerContext(RaftClient.Listener<T> listener) {
            this.listener = listener;
        }

        private synchronized long nextOffset() {
            return this.nextOffset;
        }

        private synchronized void resetOffsetToSmallestLogOffset() {
            this.nextOffset = 0L;
        }

        private synchronized OptionalLong nextExpectedOffset() {
            if (this.lastSent != null) {
                OptionalLong lastSentOffset = this.lastSent.lastOffset();
                if (lastSentOffset.isPresent()) {
                    return OptionalLong.of(lastSentOffset.getAsLong() + 1L);
                }
                return OptionalLong.empty();
            }
            return OptionalLong.of(this.nextOffset);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void fireHandleSnapshot(SnapshotReader<T> reader) {
            ListenerContext listenerContext = this;
            synchronized (listenerContext) {
                this.nextOffset = reader.snapshotId().offset();
                this.lastSent = null;
            }
            KafkaRaftClient.this.logger.debug("Notifying listener {} of snapshot {}", (Object)this.listenerName(), (Object)reader.snapshotId());
            this.listener.handleLoadSnapshot(reader);
        }

        private void fireHandleCommit(long baseOffset, Records records) {
            this.fireHandleCommit(RecordsBatchReader.of(baseOffset, records, KafkaRaftClient.this.serde, BufferSupplier.create(), 0x800000, this, true, KafkaRaftClient.this.logContext));
        }

        private void fireHandleCommit(long baseOffset, int epoch, long appendTimestamp, int sizeInBytes, List<T> records) {
            Batch batch = Batch.data(baseOffset, epoch, appendTimestamp, sizeInBytes, records);
            MemoryBatchReader reader = MemoryBatchReader.of(List.of(batch), this);
            this.fireHandleCommit(reader);
        }

        private String listenerName() {
            return KafkaRaftClient.listenerName(this.listener);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void fireHandleCommit(BatchReader<T> reader) {
            ListenerContext listenerContext = this;
            synchronized (listenerContext) {
                this.lastSent = reader;
            }
            KafkaRaftClient.this.logger.debug("Notifying listener {} of batch for baseOffset {} and lastOffset {}", new Object[]{this.listenerName(), reader.baseOffset(), reader.lastOffset()});
            this.listener.handleCommit(reader);
        }

        private void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
            if (this.shouldFireLeaderChange(leaderAndEpoch)) {
                this.lastFiredLeaderChange = leaderAndEpoch;
                KafkaRaftClient.this.logger.debug("Notifying listener {} of leader change {}", (Object)this.listenerName(), (Object)leaderAndEpoch);
                this.listener.handleLeaderChange(leaderAndEpoch);
            }
        }

        private boolean shouldFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
            if (leaderAndEpoch.equals(this.lastFiredLeaderChange)) {
                return false;
            }
            if (leaderAndEpoch.epoch() > this.lastFiredLeaderChange.epoch()) {
                return true;
            }
            return leaderAndEpoch.leaderId().isPresent() && this.lastFiredLeaderChange.leaderId().isEmpty();
        }

        private void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch, long epochStartOffset) {
            if (this.shouldFireLeaderChange(leaderAndEpoch) && this.nextOffset() > epochStartOffset) {
                this.lastFiredLeaderChange = leaderAndEpoch;
                KafkaRaftClient.this.logger.debug("Notifying listener {} of new leadership {}", (Object)this.listenerName(), (Object)leaderAndEpoch);
                this.listener.handleLeaderChange(leaderAndEpoch);
            }
        }

        @Override
        public synchronized void onClose(BatchReader<T> reader) {
            OptionalLong lastOffset = reader.lastOffset();
            if (lastOffset.isPresent()) {
                this.nextOffset = lastOffset.getAsLong() + 1L;
            }
            if (this.lastSent == reader) {
                this.lastSent = null;
                KafkaRaftClient.this.wakeup();
            }
        }
    }

    private static final class Registration<T> {
        private final Ops ops;
        private final RaftClient.Listener<T> listener;

        private Registration(Ops ops, RaftClient.Listener<T> listener) {
            this.ops = ops;
            this.listener = listener;
        }

        private Ops ops() {
            return this.ops;
        }

        private RaftClient.Listener<T> listener() {
            return this.listener;
        }

        private static <T> Registration<T> register(RaftClient.Listener<T> listener) {
            return new Registration<T>(Ops.REGISTER, listener);
        }

        private static <T> Registration<T> unregister(RaftClient.Listener<T> listener) {
            return new Registration<T>(Ops.UNREGISTER, listener);
        }

        private static enum Ops {
            REGISTER,
            UNREGISTER;

        }
    }

    private class GracefulShutdown {
        final Timer finishTimer;
        final CompletableFuture<Void> completeFuture;

        public GracefulShutdown(long shutdownTimeoutMs, CompletableFuture<Void> completeFuture) {
            this.finishTimer = KafkaRaftClient.this.time.timer(shutdownTimeoutMs);
            this.completeFuture = completeFuture;
        }

        public void update(long currentTimeMs) {
            this.finishTimer.update(currentTimeMs);
        }

        public boolean hasTimedOut() {
            return this.finishTimer.isExpired();
        }

        public boolean isFinished() {
            return this.completeFuture.isDone();
        }

        public long remainingTimeMs() {
            return this.finishTimer.remainingMs();
        }

        public void failWithTimeout() {
            KafkaRaftClient.this.logger.warn("Graceful shutdown timed out after {}ms", (Object)this.finishTimer.timeoutMs());
            this.completeFuture.completeExceptionally(new TimeoutException("Timeout expired before graceful shutdown completed"));
        }

        public void complete() {
            KafkaRaftClient.this.logger.info("Graceful shutdown completed");
            this.completeFuture.complete(null);
        }
    }
}

