/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.naming.consistency.persistent.raft;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.common.http.Callback;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.EventPublisher;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.InternetAddressUtil;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.NumberUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.ValueChangeEvent;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftListener;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeer;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeerSet;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftProxy;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftStore;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.HttpClient;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.GZIPOutputStream;
import javax.annotation.PostConstruct;
import org.springframework.context.annotation.DependsOn;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Component;

@Deprecated
@DependsOn(value={"ProtocolManager"})
@Component
public class RaftCore
implements Closeable {
    public static final String API_VOTE = "/v1/ns/raft/vote";
    public static final String API_BEAT = "/v1/ns/raft/beat";
    public static final String API_PUB = "/v1/ns/raft/datum";
    public static final String API_DEL = "/v1/ns/raft/datum";
    public static final String API_GET = "/v1/ns/raft/datum";
    public static final String API_ON_PUB = "/v1/ns/raft/datum/commit";
    public static final String API_ON_DEL = "/v1/ns/raft/datum/commit";
    public static final String API_GET_PEER = "/v1/ns/raft/peer";
    public static final Lock OPERATE_LOCK = new ReentrantLock();
    public static final int PUBLISH_TERM_INCREASE_COUNT = 100;
    private volatile ConcurrentMap<String, List<RecordListener>> listeners = new ConcurrentHashMap<String, List<RecordListener>>();
    private volatile ConcurrentMap<String, Datum> datums = new ConcurrentHashMap<String, Datum>();
    private RaftPeerSet peers;
    private final SwitchDomain switchDomain;
    private final GlobalConfig globalConfig;
    private final RaftProxy raftProxy;
    private final RaftStore raftStore;
    private final ClusterVersionJudgement versionJudgement;
    public final PersistentNotifier notifier;
    private final EventPublisher publisher;
    private final RaftListener raftListener;
    private boolean initialized = false;
    private volatile boolean stopWork = false;
    private ScheduledFuture masterTask = null;
    private ScheduledFuture heartbeatTask = null;

    public RaftCore(RaftPeerSet peers, SwitchDomain switchDomain, GlobalConfig globalConfig, RaftProxy raftProxy, RaftStore raftStore, ClusterVersionJudgement versionJudgement, RaftListener raftListener) {
        this.peers = peers;
        this.switchDomain = switchDomain;
        this.globalConfig = globalConfig;
        this.raftProxy = raftProxy;
        this.raftStore = raftStore;
        this.versionJudgement = versionJudgement;
        this.notifier = new PersistentNotifier(key -> null == this.getDatum((String)key) ? null : (Record)this.getDatum((String)key).value);
        this.publisher = NotifyCenter.registerToPublisher(ValueChangeEvent.class, (int)16384);
        this.raftListener = raftListener;
    }

    @PostConstruct
    public void init() throws Exception {
        Loggers.RAFT.info("initializing Raft sub-system");
        long start = System.currentTimeMillis();
        this.raftStore.loadDatums(this.notifier, this.datums);
        this.setTerm(NumberUtils.toLong((String)this.raftStore.loadMeta().getProperty("term"), (long)0L));
        Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", (Object)this.datums.size(), (Object)this.peers.getTerm());
        this.initialized = true;
        Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (Object)(System.currentTimeMillis() - start));
        this.masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
        this.heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
        this.versionJudgement.registerObserver(isAllNewVersion -> {
            this.stopWork = isAllNewVersion;
            if (this.stopWork) {
                try {
                    this.shutdown();
                    this.raftListener.removeOldRaftMetadata();
                }
                catch (NacosException e) {
                    throw new NacosRuntimeException(500, (Throwable)e);
                }
            }
        }, 100);
        NotifyCenter.registerSubscriber((Subscriber)this.notifier);
        Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}", (Object)GlobalExecutor.LEADER_TIMEOUT_MS, (Object)GlobalExecutor.HEARTBEAT_INTERVAL_MS);
    }

    public Map<String, ConcurrentHashSet<RecordListener>> getListeners() {
        return this.notifier.getListeners();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void signalPublish(String key, Record value) throws Exception {
        if (this.stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        if (!this.isLeader()) {
            ObjectNode params = JacksonUtils.createEmptyJsonNode();
            params.put("key", key);
            params.replace("value", JacksonUtils.transferToJsonNode((Object)value));
            HashMap<String, String> parameters = new HashMap<String, String>(1);
            parameters.put("key", key);
            RaftPeer leader = this.getLeader();
            this.raftProxy.proxyPostLarge(leader.ip, "/v1/ns/raft/datum", params.toString(), parameters);
            return;
        }
        OPERATE_LOCK.lock();
        try {
            long start = System.currentTimeMillis();
            final Datum datum = new Datum();
            datum.key = key;
            datum.value = value;
            if (this.getDatum(key) == null) {
                datum.timestamp.set(1L);
            } else {
                datum.timestamp.set(this.getDatum((String)key).timestamp.incrementAndGet());
            }
            ObjectNode json = JacksonUtils.createEmptyJsonNode();
            json.replace("datum", JacksonUtils.transferToJsonNode(datum));
            json.replace("source", JacksonUtils.transferToJsonNode((Object)this.peers.local()));
            this.onPublish(datum, this.peers.local());
            String content = json.toString();
            final CountDownLatch latch = new CountDownLatch(this.peers.majorityCount());
            for (final String server : this.peers.allServersIncludeMyself()) {
                if (this.isLeader(server)) {
                    latch.countDown();
                    continue;
                }
                String url = RaftCore.buildUrl(server, "/v1/ns/raft/datum/commit");
                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>(){

                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}", new Object[]{datum.key, server, result.getCode()});
                            return;
                        }
                        latch.countDown();
                    }

                    public void onError(Throwable throwable) {
                        Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                    }

                    public void onCancel() {
                    }
                });
            }
            if (!latch.await(5000L, TimeUnit.MILLISECONDS)) {
                Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", (Object)key);
                throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
            }
            long end = System.currentTimeMillis();
            Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (Object)(end - start), (Object)key);
        }
        finally {
            OPERATE_LOCK.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void signalDelete(final String key) throws Exception {
        if (this.stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        OPERATE_LOCK.lock();
        try {
            if (!this.isLeader()) {
                HashMap<String, String> params = new HashMap<String, String>(1);
                params.put("key", URLEncoder.encode(key, "UTF-8"));
                this.raftProxy.proxy(this.getLeader().ip, "/v1/ns/raft/datum", params, HttpMethod.DELETE);
                return;
            }
            Datum datum = new Datum();
            datum.key = key;
            ObjectNode json = JacksonUtils.createEmptyJsonNode();
            json.replace("datum", JacksonUtils.transferToJsonNode(datum));
            json.replace("source", JacksonUtils.transferToJsonNode((Object)this.peers.local()));
            this.onDelete(datum.key, this.peers.local());
            for (final String server : this.peers.allServersWithoutMySelf()) {
                String url = RaftCore.buildUrl(server, "/v1/ns/raft/datum/commit");
                HttpClient.asyncHttpDeleteLarge(url, null, json.toString(), new Callback<String>(){

                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            Loggers.RAFT.warn("[RAFT] failed to delete data from peer, datumId={}, peer={}, http code={}", new Object[]{key, server, result.getCode()});
                            return;
                        }
                        RaftPeer local = RaftCore.this.peers.local();
                        local.resetLeaderDue();
                    }

                    public void onError(Throwable throwable) {
                        Loggers.RAFT.error("[RAFT] failed to delete data from peer", throwable);
                    }

                    public void onCancel() {
                    }
                });
            }
        }
        finally {
            OPERATE_LOCK.unlock();
        }
    }

    public void onPublish(Datum datum, RaftPeer source) throws Exception {
        if (this.stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        RaftPeer local = this.peers.local();
        if (datum.value == null) {
            Loggers.RAFT.warn("received empty datum");
            throw new IllegalStateException("received empty datum");
        }
        if (!this.peers.isLeader(source.ip)) {
            Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}", (Object)JacksonUtils.toJson((Object)source), (Object)JacksonUtils.toJson((Object)this.getLeader()));
            throw new IllegalStateException("peer(" + source.ip + ") tried to publish data but wasn't leader");
        }
        if (source.term.get() < local.term.get()) {
            Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", (Object)JacksonUtils.toJson((Object)source), (Object)JacksonUtils.toJson((Object)local));
            throw new IllegalStateException("out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
        }
        local.resetLeaderDue();
        if (KeyBuilder.matchPersistentKey(datum.key)) {
            this.raftStore.write(datum);
        }
        this.datums.put(datum.key, datum);
        if (this.isLeader()) {
            local.term.addAndGet(100L);
        } else if (local.term.get() + 100L > source.term.get()) {
            this.getLeader().term.set(source.term.get());
            local.term.set(this.getLeader().term.get());
        } else {
            local.term.addAndGet(100L);
        }
        this.raftStore.updateTerm(local.term.get());
        NotifyCenter.publishEvent((Event)ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
        Loggers.RAFT.info("data added/updated, key={}, term={}", (Object)datum.key, (Object)local.term);
    }

    public void onDelete(String datumKey, RaftPeer source) throws Exception {
        if (this.stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        RaftPeer local = this.peers.local();
        if (!this.peers.isLeader(source.ip)) {
            Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}", (Object)JacksonUtils.toJson((Object)source), (Object)JacksonUtils.toJson((Object)this.getLeader()));
            throw new IllegalStateException("peer(" + source.ip + ") tried to publish data but wasn't leader");
        }
        if (source.term.get() < local.term.get()) {
            Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", (Object)JacksonUtils.toJson((Object)source), (Object)JacksonUtils.toJson((Object)local));
            throw new IllegalStateException("out of date publish, pub-term:" + source.term + ", cur-term: " + local.term);
        }
        local.resetLeaderDue();
        String key = datumKey;
        this.deleteDatum(key);
        if (KeyBuilder.matchServiceMetaKey(key)) {
            if (local.term.get() + 100L > source.term.get()) {
                this.getLeader().term.set(source.term.get());
                local.term.set(this.getLeader().term.get());
            } else {
                local.term.addAndGet(100L);
            }
            this.raftStore.updateTerm(local.term.get());
        }
        Loggers.RAFT.info("data removed, key={}, term={}", (Object)datumKey, (Object)local.term);
    }

    public void shutdown() throws NacosException {
        this.stopWork = true;
        this.raftStore.shutdown();
        this.peers.shutdown();
        Loggers.RAFT.warn("start to close old raft protocol!!!");
        Loggers.RAFT.warn("stop old raft protocol task for notifier");
        NotifyCenter.deregisterSubscriber((Subscriber)this.notifier);
        Loggers.RAFT.warn("stop old raft protocol task for master task");
        this.masterTask.cancel(true);
        Loggers.RAFT.warn("stop old raft protocol task for heartbeat task");
        this.heartbeatTask.cancel(true);
        Loggers.RAFT.warn("clean old cache datum for old raft");
        this.datums.clear();
    }

    public synchronized RaftPeer receivedVote(RaftPeer remote) {
        if (this.stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        if (!this.peers.contains(remote)) {
            throw new IllegalStateException("can not find peer: " + remote.ip);
        }
        RaftPeer local = this.peers.get(NetUtils.localServer());
        if (remote.term.get() <= local.term.get()) {
            String msg = "received illegitimate vote, voter-term:" + remote.term + ", votee-term:" + local.term;
            Loggers.RAFT.info(msg);
            if (StringUtils.isEmpty((String)local.voteFor)) {
                local.voteFor = local.ip;
            }
            return local;
        }
        local.resetLeaderDue();
        local.state = RaftPeer.State.FOLLOWER;
        local.voteFor = remote.ip;
        local.term.set(remote.term.get());
        Loggers.RAFT.info("vote {} as leader, term: {}", (Object)remote.ip, (Object)remote.term);
        return local;
    }

    public RaftPeer receivedBeat(JsonNode beat) throws Exception {
        if (this.stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        final RaftPeer local = this.peers.local();
        final RaftPeer remote = new RaftPeer();
        JsonNode peer = beat.get("peer");
        remote.ip = peer.get("ip").asText();
        remote.state = RaftPeer.State.valueOf(peer.get("state").asText());
        remote.term.set(peer.get("term").asLong());
        remote.heartbeatDueMs = peer.get("heartbeatDueMs").asLong();
        remote.leaderDueMs = peer.get("leaderDueMs").asLong();
        remote.voteFor = peer.get("voteFor").asText();
        if (remote.state != RaftPeer.State.LEADER) {
            Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", (Object)remote.state, (Object)JacksonUtils.toJson((Object)remote));
            throw new IllegalArgumentException("invalid state from master, state: " + (Object)((Object)remote.state));
        }
        if (local.term.get() > remote.term.get()) {
            Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}", new Object[]{remote.term.get(), local.term.get(), JacksonUtils.toJson((Object)remote), local.leaderDueMs});
            throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get());
        }
        if (local.state != RaftPeer.State.FOLLOWER) {
            Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", (Object)JacksonUtils.toJson((Object)remote));
            local.state = RaftPeer.State.FOLLOWER;
            local.voteFor = remote.ip;
        }
        JsonNode beatDatums = beat.get("datums");
        local.resetLeaderDue();
        local.resetHeartbeatDue();
        this.peers.makeLeader(remote);
        if (!this.switchDomain.isSendBeatOnly()) {
            HashMap receivedKeysMap = new HashMap(this.datums.size());
            for (Map.Entry entry : this.datums.entrySet()) {
                receivedKeysMap.put(entry.getKey(), 0);
            }
            ArrayList<String> batch = new ArrayList<String>();
            int processedCount = 0;
            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}", new Object[]{beatDatums.size(), this.datums.size(), remote.ip, remote.term, local.term});
            }
            for (Object object : beatDatums) {
                String datumKey;
                ++processedCount;
                JsonNode jsonNode = (JsonNode)object;
                String key = jsonNode.get("key").asText();
                if (KeyBuilder.matchServiceMetaKey(key)) {
                    datumKey = KeyBuilder.detailServiceMetaKey(key);
                } else {
                    if (!KeyBuilder.matchInstanceListKey(key)) continue;
                    datumKey = KeyBuilder.detailInstanceListkey(key);
                }
                long timestamp = jsonNode.get("timestamp").asLong();
                receivedKeysMap.put(datumKey, 1);
                try {
                    if (this.datums.containsKey(datumKey) && ((Datum)this.datums.get((Object)datumKey)).timestamp.get() >= timestamp && processedCount < beatDatums.size()) continue;
                    if (!this.datums.containsKey(datumKey) || ((Datum)this.datums.get((Object)datumKey)).timestamp.get() < timestamp) {
                        batch.add(datumKey);
                    }
                    if (batch.size() < 50 && processedCount < beatDatums.size()) continue;
                    String keys = StringUtils.join(batch, (String)",");
                    if (batch.size() <= 0) continue;
                    Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}, datums' size is {}, RaftCore.datums' size is {}", new Object[]{this.getLeader().ip, batch.size(), processedCount, beatDatums.size(), this.datums.size()});
                    String url = RaftCore.buildUrl(remote.ip, "/v1/ns/raft/datum");
                    HashMap<String, String> queryParam = new HashMap<String, String>(1);
                    queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
                    HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>(){

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        public void onReceive(RestResult<String> result) {
                            if (!result.ok()) {
                                return;
                            }
                            List datumList = (List)JacksonUtils.toObj((String)((String)result.getData()), (TypeReference)new TypeReference<List<JsonNode>>(){});
                            for (JsonNode datumJson : datumList) {
                                Datum newDatum = null;
                                OPERATE_LOCK.lock();
                                try {
                                    Datum<?> oldDatum = RaftCore.this.getDatum(datumJson.get("key").asText());
                                    if (oldDatum != null && datumJson.get("timestamp").asLong() <= oldDatum.timestamp.get()) {
                                        Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}", new Object[]{datumJson.get("key").asText(), datumJson.get("timestamp").asLong(), oldDatum.timestamp});
                                        continue;
                                    }
                                    if (KeyBuilder.matchServiceMetaKey(datumJson.get("key").asText())) {
                                        Datum serviceDatum = new Datum();
                                        serviceDatum.key = datumJson.get("key").asText();
                                        serviceDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                        serviceDatum.value = (Record)JacksonUtils.toObj((String)datumJson.get("value").toString(), Service.class);
                                        newDatum = serviceDatum;
                                    }
                                    if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) {
                                        Datum instancesDatum = new Datum();
                                        instancesDatum.key = datumJson.get("key").asText();
                                        instancesDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                        instancesDatum.value = (Record)JacksonUtils.toObj((String)datumJson.get("value").toString(), Instances.class);
                                        newDatum = instancesDatum;
                                    }
                                    if (newDatum == null || newDatum.value == null) {
                                        Loggers.RAFT.error("receive null datum: {}", (Object)datumJson);
                                        continue;
                                    }
                                    RaftCore.this.raftStore.write(newDatum);
                                    RaftCore.this.datums.put(newDatum.key, newDatum);
                                    RaftCore.this.notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value);
                                    local.resetLeaderDue();
                                    if (local.term.get() + 100L > remote.term.get()) {
                                        RaftCore.this.getLeader().term.set(remote.term.get());
                                        local.term.set(RaftCore.this.getLeader().term.get());
                                    } else {
                                        local.term.addAndGet(100L);
                                    }
                                    RaftCore.this.raftStore.updateTerm(local.term.get());
                                    Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}", new Object[]{newDatum.key, newDatum.timestamp, JacksonUtils.toJson((Object)remote), local.term});
                                }
                                catch (Throwable e) {
                                    Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, (Object)e);
                                }
                                finally {
                                    OPERATE_LOCK.unlock();
                                }
                            }
                            try {
                                TimeUnit.MILLISECONDS.sleep(200L);
                            }
                            catch (InterruptedException e) {
                                Loggers.RAFT.error("[RAFT-BEAT] Interrupted error ", (Throwable)e);
                            }
                        }

                        public void onError(Throwable throwable) {
                            Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader", throwable);
                        }

                        public void onCancel() {
                        }
                    });
                    batch.clear();
                }
                catch (Exception e) {
                    Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", (Object)datumKey);
                }
            }
            ArrayList deadKeys = new ArrayList();
            for (Map.Entry entry : receivedKeysMap.entrySet()) {
                if ((Integer)entry.getValue() != 0) continue;
                deadKeys.add(entry.getKey());
            }
            for (String string : deadKeys) {
                try {
                    this.deleteDatum(string);
                }
                catch (Exception e) {
                    Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", (Object)string, (Object)e);
                }
            }
        }
        return local;
    }

    public void listen(String key, RecordListener listener) {
        this.notifier.registerListener(key, listener);
        Loggers.RAFT.info("add listener: {}", (Object)key);
        for (Datum datum : this.datums.values()) {
            if (!listener.interests(datum.key)) continue;
            try {
                listener.onChange(datum.key, datum.value);
            }
            catch (Exception e) {
                Loggers.RAFT.error("NACOS-RAFT failed to notify listener", (Throwable)e);
            }
        }
    }

    public void unListen(String key, RecordListener listener) {
        this.notifier.deregisterListener(key, listener);
    }

    public void unListenAll(String key) {
        this.notifier.deregisterAllListener(key);
    }

    public void setTerm(long term) {
        this.peers.setTerm(term);
    }

    public boolean isLeader(String ip) {
        return this.peers.isLeader(ip);
    }

    public boolean isLeader() {
        return this.peers.isLeader(NetUtils.localServer());
    }

    public static String buildUrl(String ip, String api) {
        if (!InternetAddressUtil.containsPort((String)ip)) {
            ip = ip + ":" + EnvUtil.getPort();
        }
        return "http://" + ip + EnvUtil.getContextPath() + api;
    }

    public Datum<?> getDatum(String key) {
        return (Datum)this.datums.get(key);
    }

    public RaftPeer getLeader() {
        return this.peers.getLeader();
    }

    public List<RaftPeer> getPeers() {
        return new ArrayList<RaftPeer>(this.peers.allPeers());
    }

    public RaftPeerSet getPeerSet() {
        return this.peers;
    }

    public void setPeerSet(RaftPeerSet peerSet) {
        this.peers = peerSet;
    }

    public int datumSize() {
        return this.datums.size();
    }

    public void addDatum(Datum datum) {
        this.datums.put(datum.key, datum);
        NotifyCenter.publishEvent((Event)ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
    }

    public void loadDatum(String key) {
        try {
            Datum datum = this.raftStore.load(key);
            if (datum == null) {
                return;
            }
            this.datums.put(key, datum);
        }
        catch (Exception e) {
            Loggers.RAFT.error("load datum failed: " + key, (Throwable)e);
        }
    }

    private void deleteDatum(String key) {
        try {
            Datum deleted = (Datum)this.datums.remove(URLDecoder.decode(key, "UTF-8"));
            if (deleted != null) {
                this.raftStore.delete(deleted);
                Loggers.RAFT.info("datum deleted, key: {}", (Object)key);
            }
            NotifyCenter.publishEvent((Event)ValueChangeEvent.builder().key(URLDecoder.decode(key, "UTF-8")).action(DataOperation.DELETE).build());
        }
        catch (UnsupportedEncodingException e) {
            Loggers.RAFT.warn("datum key decode failed: {}", (Object)key);
        }
    }

    public boolean isInitialized() {
        return this.initialized || !this.globalConfig.isDataWarmup();
    }

    @Deprecated
    public int getNotifyTaskCount() {
        return (int)this.publisher.currentEventSize();
    }

    public class HeartBeat
    implements Runnable {
        @Override
        public void run() {
            try {
                if (RaftCore.this.stopWork) {
                    return;
                }
                if (!RaftCore.this.peers.isReady()) {
                    return;
                }
                RaftPeer local = RaftCore.this.peers.local();
                local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                if (local.heartbeatDueMs > 0L) {
                    return;
                }
                local.resetHeartbeatDue();
                this.sendBeat();
            }
            catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while sending beat {}", (Throwable)e);
            }
        }

        private void sendBeat() throws IOException, InterruptedException {
            RaftPeer local = RaftCore.this.peers.local();
            if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
                return;
            }
            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("[RAFT] send beat with {} keys.", (Object)RaftCore.this.datums.size());
            }
            local.resetLeaderDue();
            ObjectNode packet = JacksonUtils.createEmptyJsonNode();
            packet.replace("peer", JacksonUtils.transferToJsonNode((Object)local));
            ArrayNode array = JacksonUtils.createEmptyArrayNode();
            if (RaftCore.this.switchDomain.isSendBeatOnly()) {
                Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", (Object)RaftCore.this.switchDomain.isSendBeatOnly());
            }
            if (!RaftCore.this.switchDomain.isSendBeatOnly()) {
                for (Datum datum : RaftCore.this.datums.values()) {
                    ObjectNode element = JacksonUtils.createEmptyJsonNode();
                    if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                        element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
                    } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                        element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
                    }
                    element.put("timestamp", datum.timestamp.get());
                    array.add((JsonNode)element);
                }
            }
            packet.replace("datums", (JsonNode)array);
            HashMap<String, String> params = new HashMap<String, String>(1);
            params.put("beat", JacksonUtils.toJson((Object)packet));
            String content = JacksonUtils.toJson(params);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            GZIPOutputStream gzip = new GZIPOutputStream(out);
            gzip.write(content.getBytes(StandardCharsets.UTF_8));
            gzip.close();
            byte[] compressedBytes = out.toByteArray();
            String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", (Object)content.length(), (Object)compressedContent.length());
            }
            for (final String server : RaftCore.this.peers.allServersWithoutMySelf()) {
                try {
                    final String url = RaftCore.buildUrl(server, RaftCore.API_BEAT);
                    if (Loggers.RAFT.isDebugEnabled()) {
                        Loggers.RAFT.debug("send beat to server " + server);
                    }
                    HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>(){

                        public void onReceive(RestResult<String> result) {
                            if (!result.ok()) {
                                Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", (Object)result.getCode(), (Object)server);
                                MetricsMonitor.getLeaderSendBeatFailedException().increment();
                                return;
                            }
                            RaftCore.this.peers.update((RaftPeer)JacksonUtils.toObj((String)((String)result.getData()), RaftPeer.class));
                            if (Loggers.RAFT.isDebugEnabled()) {
                                Loggers.RAFT.debug("receive beat response from: {}", (Object)url);
                            }
                        }

                        public void onError(Throwable throwable) {
                            Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", (Object)server, (Object)throwable);
                            MetricsMonitor.getLeaderSendBeatFailedException().increment();
                        }

                        public void onCancel() {
                        }
                    });
                }
                catch (Exception e) {
                    Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", (Object)server, (Object)e);
                    MetricsMonitor.getLeaderSendBeatFailedException().increment();
                }
            }
        }
    }

    public class MasterElection
    implements Runnable {
        @Override
        public void run() {
            try {
                if (RaftCore.this.stopWork) {
                    return;
                }
                if (!RaftCore.this.peers.isReady()) {
                    return;
                }
                RaftPeer local = RaftCore.this.peers.local();
                local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                if (local.leaderDueMs > 0L) {
                    return;
                }
                local.resetLeaderDue();
                local.resetHeartbeatDue();
                this.sendVote();
            }
            catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while master election {}", (Throwable)e);
            }
        }

        private void sendVote() {
            RaftPeer local = RaftCore.this.peers.get(NetUtils.localServer());
            Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", (Object)JacksonUtils.toJson((Object)RaftCore.this.getLeader()), (Object)local.term);
            RaftCore.this.peers.reset();
            local.term.incrementAndGet();
            local.voteFor = local.ip;
            local.state = RaftPeer.State.CANDIDATE;
            HashMap<String, String> params = new HashMap<String, String>(1);
            params.put("vote", JacksonUtils.toJson((Object)local));
            for (final String server : RaftCore.this.peers.allServersWithoutMySelf()) {
                final String url = RaftCore.buildUrl(server, RaftCore.API_VOTE);
                try {
                    HttpClient.asyncHttpPost(url, null, params, new Callback<String>(){

                        public void onReceive(RestResult<String> result) {
                            if (!result.ok()) {
                                Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", (Object)result.getCode(), (Object)url);
                                return;
                            }
                            RaftPeer peer = (RaftPeer)JacksonUtils.toObj((String)((String)result.getData()), RaftPeer.class);
                            Loggers.RAFT.info("received approve from peer: {}", (Object)JacksonUtils.toJson((Object)peer));
                            RaftCore.this.peers.decideLeader(peer);
                        }

                        public void onError(Throwable throwable) {
                            Loggers.RAFT.error("error while sending vote to server: {}", (Object)server, (Object)throwable);
                        }

                        public void onCancel() {
                        }
                    });
                }
                catch (Exception e) {
                    Loggers.RAFT.warn("error while sending vote to server: {}", (Object)server);
                }
            }
        }
    }
}

