package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: classes2.dex */
public class GridDhtPartitionSupplyPool<K, V> {
    /** Decompiled assertion switch; assigned in the class static initializer from {@code desiredAssertionStatus()}. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /** Lock whose read side is try-locked in {@code enterBusy()} and released in {@code leaveBusy()}. */
    private final ReadWriteLock busyLock;

    /** Cache context supplied to the constructor. */
    private final GridCacheContext<K, V> cctx;

    /** Value of {@code gridDeploy().enabled()} captured in the constructor; not read in the visible code. */
    private final boolean depEnabled;

    /** Logger obtained from the cache context for this class. */
    private final IgniteLogger log;

    /** Predicate installed through {@code preloadPredicate(...)}; not read in the visible code. */
    private IgnitePredicate<GridCacheEntryInfo> preloadPred;

    /** Partition topology taken from the cache context in the constructor; set to {@code null} by {@code stop()}. */
    private GridDhtPartitionTopology top;

    /** Supply workers created in the constructor and launched by {@code start()}. */
    private final Collection<GridDhtPartitionSupplyPool<K, V>.SupplyWorker> workers =
        new LinkedList<GridDhtPartitionSupplyPool<K, V>.SupplyWorker>();

    /** Demand messages waiting to be picked up by a supply worker. */
    private final BlockingQueue<DemandMessage> queue = new LinkedBlockingDeque<DemandMessage>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes2.dex */
    /**
     * Queue element that pairs a partition demand message with the ID of the node it came from.
     * Backed by an {@link IgniteBiTuple}: slot 1 holds the sender ID, slot 2 holds the message.
     */
    public static class DemandMessage extends IgniteBiTuple<UUID, GridDhtPartitionDemandMessage> {
        private static final long serialVersionUID = 0;

        /**
         * Creates an empty tuple.
         * NOTE(review): presumably required for externalization of the tuple base class - confirm.
         */
        public DemandMessage() {
            super();
        }

        /**
         * @param sndId ID of the node that sent the demand.
         * @param msg Demand message.
         */
        DemandMessage(UUID sndId, GridDhtPartitionDemandMessage msg) {
            super(sndId, msg);
        }

        /**
         * @return ID of the sending node (first tuple slot).
         */
        UUID senderId() {
            return get1();
        }

        /**
         * @return Demand message (second tuple slot).
         */
        public GridDhtPartitionDemandMessage message() {
            return get2();
        }

        /** {@inheritDoc} */
        @Override // org.apache.ignite.lang.IgniteBiTuple
        public String toString() {
            UUID sndId = senderId();
            GridDhtPartitionDemandMessage msg = message();

            return "DemandMessage [senderId=" + sndId + ", msg=" + msg + ']';
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes2.dex */
    /**
     * Worker that repeatedly polls the pool's demand queue and passes every received message,
     * together with the sender's {@link ClusterNode}, to {@code processMessage(...)}.
     * <p>
     * The decompiler had emitted a static {@code $assertionsDisabled} field plus a static
     * initializer here. Static initializers and non-constant static fields are not permitted in
     * an inner (non-static) class before Java 16, and the field was never read by the surviving
     * code, so both were dropped.
     */
    public class SupplyWorker extends GridWorker {
        /** Logger copied from the enclosing pool when the worker is constructed. */
        private final IgniteLogger log;

        /**
         * Creates a worker named {@code "preloader-supply-worker"} using the enclosing pool's
         * grid name and logger.
         */
        private SupplyWorker() {
            super(GridDhtPartitionSupplyPool.this.cctx.gridName(), "preloader-supply-worker", GridDhtPartitionSupplyPool.this.log);
            this.log = GridDhtPartitionSupplyPool.this.log;
        }

        /* JADX WARN: Code restructure failed: missing block: B:173:0x048e, code lost:
        
            r14.close();
         */
        /* JADX WARN: Code restructure failed: missing block: B:174:0x0491, code lost:
        
            r16.release();
         */
        /* JADX WARN: Code restructure failed: missing block: B:175:0x0494, code lost:
        
            if (r25 == null) goto L110;
         */
        /* JADX WARN: Code restructure failed: missing block: B:176:0x0496, code lost:
        
            r31.this$0.cctx.swap().removeOffHeapListener(r17.intValue(), r25);
            r31.this$0.cctx.swap().removeSwapListener(r17.intValue(), r25);
         */
        /* JADX WARN: Code restructure failed: missing block: B:178:?, code lost:
        
            return;
         */
        /* JADX WARN: Code restructure failed: missing block: B:228:0x085e, code lost:
        
            r22 = r23;
         */
        /* JADX WARN: Code restructure failed: missing block: B:243:0x070a, code lost:
        
            reply(r33, r7, r22);
         */
        /* JADX WARN: Code restructure failed: missing block: B:244:?, code lost:
        
            return;
         */
        /* JADX WARN: Removed duplicated region for block: B:45:0x0343 A[Catch: IgniteCheckedException -> 0x00f8, TryCatch #3 {IgniteCheckedException -> 0x00f8, blocks: (B:14:0x0052, B:15:0x0073, B:17:0x0079, B:19:0x0099, B:21:0x00a5, B:43:0x033e, B:45:0x0343, B:46:0x0379, B:240:0x06cf, B:242:0x06d4, B:232:0x080b, B:235:0x0810, B:24:0x00ab, B:27:0x00c2, B:243:0x070a), top: B:13:0x0052 }] */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        /**
         * Placeholder for the real message handler, whose body the decompiler could not recover.
         * As written it always throws {@link UnsupportedOperationException}.
         *
         * @param r32 Demand message taken from the queue.
         * @param r33 Node that sent the message.
         */
        private void processMessage(org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyPool.DemandMessage r32, org.apache.ignite.cluster.ClusterNode r33) {
            /*
                Method dump skipped, instructions count: 2166
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyPool.SupplyWorker.processMessage(org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyPool$DemandMessage, org.apache.ignite.cluster.ClusterNode):void");
        }

        /**
         * Sends a supply message to a node as an ordered message on the demand's topic, using the
         * cache context's IO policy and the demand's timeout.
         *
         * @param clusterNode Destination node.
         * @param gridDhtPartitionDemandMessage Demand being answered; provides topic and timeout.
         * @param gridDhtPartitionSupplyMessage Supply message to send.
         * @return {@code true} if the send call returned normally, {@code false} if it failed with
         *      {@link ClusterTopologyCheckedException}.
         * @throws IgniteCheckedException Any other checked failure raised by the send call.
         */
        private boolean reply(ClusterNode clusterNode, GridDhtPartitionDemandMessage gridDhtPartitionDemandMessage, GridDhtPartitionSupplyMessage gridDhtPartitionSupplyMessage) throws IgniteCheckedException {
            try {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Replying to partition demand [node=" + clusterNode.id() + ", demand=" + gridDhtPartitionDemandMessage + ", supply=" + gridDhtPartitionSupplyMessage + ']');
                }
                GridDhtPartitionSupplyPool.this.cctx.io().sendOrderedMessage(clusterNode, gridDhtPartitionDemandMessage.topic(), gridDhtPartitionSupplyMessage, GridDhtPartitionSupplyPool.this.cctx.ioPolicy(), gridDhtPartitionDemandMessage.timeout());
                return true;
            } catch (ClusterTopologyCheckedException e) {
                // Topology failure is reported to the caller as 'false' rather than rethrown.
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Failed to send partition supply message because node left grid: " + clusterNode.id());
                }
                return false;
            }
        }

        /**
         * Main loop: until the worker is cancelled, polls the demand queue, resolves the sender
         * through discovery and processes the message. Messages whose sender cannot be resolved
         * are skipped (logged at debug level).
         *
         * @throws InterruptedException If the queue poll is interrupted.
         * @throws IgniteInterruptedCheckedException Declared by the original signature.
         */
        @Override // org.apache.ignite.internal.util.worker.GridWorker
        protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
            while (!isCancelled()) {
                DemandMessage demandMessage = GridDhtPartitionSupplyPool.this.poll(GridDhtPartitionSupplyPool.this.queue, this);
                if (demandMessage == null) {
                    // Nothing arrived within the poll timeout; re-check the cancel flag.
                    continue;
                }
                ClusterNode node = GridDhtPartitionSupplyPool.this.cctx.discovery().node(demandMessage.senderId());
                if (node != null) {
                    processMessage(demandMessage, node);
                } else if (this.log.isDebugEnabled()) {
                    this.log.debug("Received message from non-existing node (will ignore): " + demandMessage);
                }
            }
        }
    }

    static {
        $assertionsDisabled = !GridDhtPartitionSupplyPool.class.desiredAssertionStatus();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the pool: stores the context and lock, creates the supply workers (none when
     * rebalancing is disabled) and registers a handler that forwards incoming
     * {@link GridDhtPartitionDemandMessage}s to {@code processDemandMessage(...)}.
     * Workers are only created here; they are launched by {@code start()}.
     *
     * @param gridCacheContext Cache context; must not be {@code null} (checked when assertions are on).
     * @param readWriteLock Busy lock; must not be {@code null} (checked when assertions are on).
     */
    public GridDhtPartitionSupplyPool(GridCacheContext<K, V> gridCacheContext, ReadWriteLock readWriteLock) {
        // Decompiled 'assert' statements, evaluated in the original order.
        if (!$assertionsDisabled) {
            if (gridCacheContext == null) {
                throw new AssertionError();
            }
            if (readWriteLock == null) {
                throw new AssertionError();
            }
        }

        cctx = gridCacheContext;
        busyLock = readWriteLock;
        log = gridCacheContext.logger(getClass());
        top = gridCacheContext.dht().topology();

        // One worker per configured rebalance thread, or none if rebalancing is off.
        int workerCnt = 0;
        if (gridCacheContext.rebalanceEnabled()) {
            workerCnt = gridCacheContext.config().getRebalanceThreadPoolSize();
        }
        for (int created = 0; created < workerCnt; created++) {
            workers.add(new SupplyWorker());
        }

        gridCacheContext.io().addHandler(gridCacheContext.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() { // from class: org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyPool.1
            @Override // org.apache.ignite.lang.IgniteBiInClosure
            public void apply(UUID sndId, GridDhtPartitionDemandMessage msg) {
                GridDhtPartitionSupplyPool.this.processDemandMessage(sndId, msg);
            }
        });

        depEnabled = gridCacheContext.gridDeploy().enabled();
    }

    /**
     * Tries to take the read side of the busy lock without blocking.
     *
     * @return {@code true} if the read lock was acquired (the caller must then call
     *      {@code leaveBusy()}), {@code false} otherwise.
     */
    private boolean enterBusy() {
        final boolean entered = busyLock.readLock().tryLock();

        if (!entered && log.isDebugEnabled()) {
            log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId());
        }

        return entered;
    }

    /**
     * Releases the read side of the busy lock taken by a successful {@code enterBusy()}.
     */
    private void leaveBusy() {
        busyLock.readLock().unlock();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Takes the next element from a queue, waiting up to 2000 milliseconds for one to arrive.
     * If the worker is already cancelled, the current thread's interrupt flag is set before
     * polling, so an interruptible poll is expected to fail with {@link InterruptedException}.
     *
     * @param blockingQueue Queue to poll.
     * @param gridWorker Worker on whose behalf the poll runs; must not be {@code null}
     *      (checked when assertions are on).
     * @param <T> Element type.
     * @return Polled element, or {@code null} if none became available within the timeout.
     * @throws InterruptedException If the poll is interrupted.
     */
    @Nullable
    public <T> T poll(BlockingQueue<T> blockingQueue, GridWorker gridWorker) throws InterruptedException {
        // Decompiled 'assert gridWorker != null'.
        if (gridWorker == null && !$assertionsDisabled) {
            throw new AssertionError();
        }

        final boolean cancelled = gridWorker.isCancelled();
        if (cancelled) {
            Thread.currentThread().interrupt();
        }

        final long timeoutMs = 2000L;
        return blockingQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Entry point for an incoming partition demand. If the busy lock can be entered and
     * rebalancing is enabled, the demand is queued for the supply workers; if rebalancing is
     * disabled, a warning is logged and the demand is dropped. When the busy lock cannot be
     * entered, the demand is dropped as well.
     *
     * @param uuid ID of the node that sent the demand.
     * @param gridDhtPartitionDemandMessage Demand message.
     */
    public void processDemandMessage(UUID uuid, GridDhtPartitionDemandMessage gridDhtPartitionDemandMessage) {
        if (!enterBusy()) {
            return;
        }

        try {
            if (!cctx.rebalanceEnabled()) {
                U.warn(log, "Received partition demand message when rebalancing is disabled (will ignore): " + gridDhtPartitionDemandMessage);
                return;
            }

            if (log.isDebugEnabled()) {
                log.debug("Received partition demand [node=" + uuid + ", demand=" + gridDhtPartitionDemandMessage + ']');
            }

            queue.offer(new DemandMessage(uuid, gridDhtPartitionDemandMessage));
        } finally {
            // Runs on every path above, including the early return.
            leaveBusy();
        }
    }

    /**
     * @return Rebalance thread pool size read from the cache configuration. This is the
     *      configured value; it is returned regardless of whether rebalancing is enabled.
     */
    int poolSize() {
        return cctx.config().getRebalanceThreadPoolSize();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the preload predicate, replacing any previously set one.
     *
     * @param ignitePredicate Predicate over {@link GridCacheEntryInfo} to keep.
     */
    public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> ignitePredicate) {
        preloadPred = ignitePredicate;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Launches every supply worker on its own {@link IgniteThread} named
     * {@code "preloader-supply-worker"}.
     */
    public void start() {
        for (SupplyWorker worker : workers) {
            new IgniteThread(cctx.gridName(), "preloader-supply-worker", worker).start();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Shuts the pool down: passes the workers to {@code U.cancel} and then {@code U.join},
     * and finally drops the topology reference.
     */
    public void stop() {
        U.cancel(workers);
        U.join(workers, log);

        top = null;
    }
}
