/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.raft;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchSnapshotRequestData;
import org.apache.kafka.common.message.UpdateRaftVoterRequestData;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchSnapshotRequest;
import org.apache.kafka.common.requests.UpdateRaftVoterRequest;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.NetworkChannel;
import org.apache.kafka.raft.RaftRequest;
import org.apache.kafka.raft.RaftResponse;
import org.apache.kafka.raft.RaftUtil;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link NetworkChannel} implementation that hands outbound Raft requests to a
 * {@link KafkaClient} through a dedicated {@link SendThread}.
 *
 * <p>Requests passed to {@link #send(RaftRequest.Outbound)} are queued and picked up by the
 * send thread; the outcome of each request is delivered by completing the request's
 * {@code completion} future with a {@link RaftResponse.Inbound}.
 */
public class KafkaNetworkChannel implements NetworkChannel {
    private static final Logger log = LoggerFactory.getLogger(KafkaNetworkChannel.class);

    /** Thread that drains queued requests and drives the underlying client. */
    private final SendThread requestThread;
    /** Source of correlation ids; the first id handed out is 0. */
    private final AtomicInteger correlationIdCounter = new AtomicInteger(0);
    private final ListenerName listenerName;

    /**
     * Creates the channel and its send thread. The thread is not started here; call
     * {@link #start()}.
     *
     * @param time             clock passed through to the send thread
     * @param listenerName     listener name reported by {@link #listenerName()}
     * @param client           network client passed through to the send thread
     * @param requestTimeoutMs request timeout passed through to the send thread
     * @param threadNamePrefix prefix of the send thread's name; the suffix
     *                         {@code "-outbound-request-thread"} is appended
     */
    public KafkaNetworkChannel(Time time, ListenerName listenerName, KafkaClient client, int requestTimeoutMs, String threadNamePrefix) {
        this.listenerName = listenerName;
        // The trailing "false" is the isInterruptible flag of the SendThread constructor.
        this.requestThread = new SendThread(threadNamePrefix + "-outbound-request-thread", client, requestTimeoutMs, time, false);
    }

    /**
     * Returns the next correlation id.
     *
     * @return the counter's current value, which is then atomically incremented
     */
    @Override
    public int newCorrelationId() {
        return correlationIdCounter.getAndIncrement();
    }

    /**
     * Sends an outbound request, or fails it immediately when it has no destination.
     *
     * <p>With a non-null destination the request is wrapped in a
     * {@link RequestAndCompletionHandler} and queued on the send thread; its completion
     * callback forwards the client response to {@link #sendOnComplete}. With a null
     * destination the request's future is completed right away with a
     * {@link Errors#BROKER_NOT_AVAILABLE} error response.
     *
     * @param request the request to send
     * @throws IllegalArgumentException if the request data type is not supported by
     *                                  {@link #buildRequest(ApiMessage)}
     */
    @Override
    public void send(RaftRequest.Outbound request) {
        Node node = request.destination();
        if (node == null) {
            sendCompleteFuture(request, errorResponse(request.data(), Errors.BROKER_NOT_AVAILABLE));
            return;
        }
        AbstractRequest.Builder<? extends AbstractRequest> builder = buildRequest(request.data());
        requestThread.sendRequest(new RequestAndCompletionHandler(
            request.createdTimeMs(),
            node,
            builder,
            response -> sendOnComplete(request, response)
        ));
    }

    /**
     * Completes the request's future with an inbound response carrying {@code message}.
     *
     * @param request the request being answered; supplies the correlation id and source node
     * @param message the response payload (a real response or a synthesized error)
     */
    private void sendCompleteFuture(RaftRequest.Outbound request, ApiMessage message) {
        RaftResponse.Inbound response = new RaftResponse.Inbound(request.correlationId(), message, request.destination());
        request.completion.complete(response);
    }

    /**
     * Translates a {@link ClientResponse} into the response payload for {@code request} and
     * completes the request's future with it.
     *
     * <p>The checks are applied in this order:
     * <ol>
     *   <li>version mismatch: {@link Errors#UNSUPPORTED_VERSION} error response</li>
     *   <li>authentication exception: {@link Errors#NETWORK_EXCEPTION} error response</li>
     *   <li>disconnected: {@link Errors#BROKER_NOT_AVAILABLE} error response</li>
     *   <li>otherwise: the data of the response body</li>
     * </ol>
     *
     * @param request        the request the response belongs to
     * @param clientResponse the response reported by the network client
     */
    private void sendOnComplete(RaftRequest.Outbound request, ClientResponse clientResponse) {
        ApiMessage response;
        if (clientResponse.versionMismatch() != null) {
            // The exception is the trailing argument with no matching "{}" placeholder.
            log.error("Request {} failed due to unsupported version error", request, clientResponse.versionMismatch());
            response = errorResponse(request.data(), Errors.UNSUPPORTED_VERSION);
        } else if (clientResponse.authenticationException() != null) {
            // Authentication failures are surfaced as NETWORK_EXCEPTION.
            // NOTE(review): presumably so callers treat them as retriable - confirm.
            log.error("Request {} failed due to authentication error", request, clientResponse.authenticationException());
            response = errorResponse(request.data(), Errors.NETWORK_EXCEPTION);
        } else if (clientResponse.wasDisconnected()) {
            response = errorResponse(request.data(), Errors.BROKER_NOT_AVAILABLE);
        } else {
            response = clientResponse.responseBody().data();
        }
        sendCompleteFuture(request, response);
    }

    /**
     * Builds an error response of the type matching {@code request}'s API key.
     *
     * @param request the request data whose API key selects the response type
     * @param error   the error to report
     * @return the error response produced by {@link RaftUtil#errorResponse}
     */
    private ApiMessage errorResponse(ApiMessage request, Errors error) {
        ApiKeys apiKey = ApiKeys.forId(request.apiKey());
        return RaftUtil.errorResponse(apiKey, error);
    }

    /**
     * @return the listener name supplied at construction
     */
    @Override
    public ListenerName listenerName() {
        return listenerName;
    }

    /** Starts the send thread. */
    public void start() {
        requestThread.start();
    }

    /**
     * Shuts down the send thread.
     *
     * @throws InterruptedException as declared by the send thread's {@code shutdown()}
     */
    @Override
    public void close() throws InterruptedException {
        requestThread.shutdown();
    }

    /**
     * Runs one {@code doWork()} iteration of the send thread directly on the calling thread.
     * NOTE(review): presumably intended for tests that do not call {@link #start()} - confirm.
     */
    public void pollOnce() {
        requestThread.doWork();
    }

    /**
     * Wraps request data in the request builder that corresponds to its concrete type.
     *
     * @param requestData the request payload
     * @return a builder for the matching request type
     * @throws IllegalArgumentException if {@code requestData} is not one of the supported
     *                                  types (this includes {@code null})
     */
    static AbstractRequest.Builder<? extends AbstractRequest> buildRequest(ApiMessage requestData) {
        if (requestData instanceof VoteRequestData) {
            return new VoteRequest.Builder((VoteRequestData) requestData);
        }
        if (requestData instanceof BeginQuorumEpochRequestData) {
            return new BeginQuorumEpochRequest.Builder((BeginQuorumEpochRequestData) requestData);
        }
        if (requestData instanceof EndQuorumEpochRequestData) {
            return new EndQuorumEpochRequest.Builder((EndQuorumEpochRequestData) requestData);
        }
        if (requestData instanceof FetchRequestData) {
            return new FetchRequest.SimpleBuilder((FetchRequestData) requestData);
        }
        if (requestData instanceof FetchSnapshotRequestData) {
            return new FetchSnapshotRequest.Builder((FetchSnapshotRequestData) requestData);
        }
        if (requestData instanceof UpdateRaftVoterRequestData) {
            return new UpdateRaftVoterRequest.Builder((UpdateRaftVoterRequestData) requestData);
        }
        if (requestData instanceof ApiVersionsRequestData) {
            return new ApiVersionsRequest.Builder(
                (ApiVersionsRequestData) requestData,
                ApiKeys.API_VERSIONS.oldestVersion(),
                ApiKeys.API_VERSIONS.latestVersion()
            );
        }
        throw new IllegalArgumentException("Unexpected type for requestData: " + requestData);
    }

    /**
     * {@link InterBrokerSendThread} that sends whatever has been queued through
     * {@link #sendRequest(RequestAndCompletionHandler)}.
     */
    static class SendThread extends InterBrokerSendThread {
        /** Pending requests; a concurrent queue, so {@code sendRequest} can add while the thread drains. */
        private final Queue<RequestAndCompletionHandler> queue = new ConcurrentLinkedQueue<>();

        public SendThread(String name, KafkaClient networkClient, int requestTimeoutMs, Time time, boolean isInterruptible) {
            super(name, networkClient, requestTimeoutMs, time, isInterruptible);
        }

        /**
         * Removes and returns every request currently in the queue, in queue order.
         *
         * @return the drained requests; empty when nothing is queued
         */
        @Override
        public Collection<RequestAndCompletionHandler> generateRequests() {
            Collection<RequestAndCompletionHandler> drained = new ArrayList<>();
            for (RequestAndCompletionHandler next = queue.poll(); next != null; next = queue.poll()) {
                drained.add(next);
            }
            return drained;
        }

        /**
         * Queues a request and wakes the thread up so that it is picked up.
         *
         * @param request the request and its completion handler
         */
        public void sendRequest(RequestAndCompletionHandler request) {
            queue.add(request);
            wakeup();
        }
    }
}

