package org.bytezero.network;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.bson.BasicBSONObject;
import org.bytezero.common.Result;
import org.bytezero.common.ThreadPoolManager;
import org.bytezero.common._F;
import org.bytezero.logger.LoggerFactoryBZ;
import org.slf4j.Logger;

/* loaded from: classes6.dex */
/**
 * Correlates outgoing requests with their incoming responses.
 *
 * <p>Every outgoing message is stamped with a fresh request ID and its
 * {@link ResponseProcessor} is stored under that ID. When a message carrying the matching
 * response ID is received, the processor is removed from the pool and invoked on
 * {@link ThreadPoolManager#blockingThreadPool}. Senders wait on the processor's monitor for at
 * most the given timeout and always remove their registration afterwards.
 *
 * <p>NOTE(review): the senders use a single unguarded {@code wait(timeout)}. If the processor is
 * notified before the sender reaches the wait, the sender sleeps for the whole timeout (or
 * forever when the timeout is {@code 0}). Whether that can happen depends on how
 * {@code ResponseProcessor.process} publishes its result and notifies, which is not visible from
 * this class; confirm before relying on small or zero timeouts.
 */
public class ResponseProcessorPool {
    /** Source of request IDs; static, so it is shared by every pool instance. */
    public static final AtomicLong idGenerator = new AtomicLong(0);
    protected final Logger logger = LoggerFactoryBZ.getLogger(getClass());
    /** Processors still waiting for a response, keyed by request ID. */
    private final Map<Long, ResponseProcessor> responseMap = new ConcurrentHashMap<>();

    /**
     * Removes and returns the processor registered for the given request ID.
     *
     * @param j the request ID
     * @return the processor, or {@code null} if none is registered (any more)
     */
    private ResponseProcessor getResponseProcessor(long j) {
        return this.responseMap.remove(Long.valueOf(j));
    }

    /**
     * Allocates a new request ID, writes it into the message under {@code _F.RequestID} and
     * registers the processor under that ID.
     *
     * @param basicBSONObject the outgoing message; it is modified in place
     * @param responseProcessor the processor to invoke when the response arrives
     * @return the newly allocated request ID
     * @throws NullPointerException if {@code responseProcessor} is {@code null}
     */
    /* JADX INFO: Access modifiers changed from: private */
    public long registerResponse(BasicBSONObject basicBSONObject, ResponseProcessor responseProcessor) {
        if (responseProcessor == null) {
            this.logger.error("未指定回调处理器！");
            // ConcurrentHashMap rejects null values, so this call could never succeed. Fail
            // before consuming an ID or touching the message, with the same exception type the
            // map would have raised.
            throw new NullPointerException("responseProcessor must not be null");
        }
        long requestId = idGenerator.incrementAndGet();
        basicBSONObject.put(_F.RequestID, Long.valueOf(requestId));
        this.responseMap.put(Long.valueOf(requestId), responseProcessor);
        return requestId;
    }

    /**
     * Dispatches an incoming message to the processor registered for its response ID.
     *
     * <p>A message is treated as a response only when it contains {@code _F.ResponseID} and does
     * not contain {@code _F.RequestID}. The processor is removed from the pool and run
     * asynchronously on {@link ThreadPoolManager#blockingThreadPool}.
     *
     * @param socketHandle the connection the message arrived on; passed through to the processor
     * @param basicBSONObject the incoming message
     * @return {@code false} if the message is not a response; {@code true} otherwise, including
     *     when no processor is registered or dispatching failed (both cases are only logged)
     */
    public boolean receive(final SocketHandle socketHandle, final BasicBSONObject basicBSONObject) {
        if (!basicBSONObject.containsField(_F.ResponseID) || basicBSONObject.containsField(_F.RequestID)) {
            return false;
        }
        Object rawId = basicBSONObject.get(_F.ResponseID);
        if (!(rawId instanceof Number)) {
            // A null or non-numeric ID cannot match any registration; report it the same way
            // as any other dispatch failure instead of letting a cast exception escape.
            this.logger.error("处理" + rawId + "的回调失败：ResponseID不是数值类型");
            return true;
        }
        // Accept any numeric encoding: the ID may arrive as a 32-bit value even though the
        // pool keys its registrations by Long.
        long responseId = ((Number) rawId).longValue();
        try {
            final ResponseProcessor responseProcessor = getResponseProcessor(responseId);
            if (responseProcessor == null) {
                this.logger.info("超时机制已移除" + responseId + "的回调处理器");
            } else {
                ThreadPoolManager.blockingThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        responseProcessor.process(Result.from(basicBSONObject), socketHandle);
                    }
                });
            }
            return true;
        } catch (Exception e) {
            this.logger.error("处理" + responseId + "的回调失败", e);
            return true;
        }
    }

    /**
     * Sends a request asynchronously and reports a timeout to the processor if its
     * {@code result} is still {@code null} after the wait.
     *
     * <p>Registration, sending and waiting all happen in a task submitted to
     * {@link ThreadPoolManager#blockingThreadPool}; this method returns immediately.
     *
     * @param socketHandle the connection to send on
     * @param basicBSONObject the outgoing message; a request ID is written into it
     * @param aProcessor the processor for the response; also receives the timeout failure
     * @param j the timeout handed to {@link Object#wait(long)}, in milliseconds
     */
    public void send(final SocketHandle socketHandle, final BasicBSONObject basicBSONObject, final AProcessor aProcessor, final long j) {
        ThreadPoolManager.blockingThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                long requestId = ResponseProcessorPool.this.registerResponse(basicBSONObject, aProcessor);
                boolean interrupted = false;
                try {
                    socketHandle.send(basicBSONObject);
                    synchronized (aProcessor) {
                        aProcessor.wait(j);
                    }
                } catch (IllegalArgumentException | InterruptedException e) {
                    interrupted = e instanceof InterruptedException;
                    ResponseProcessorPool.this.logger.error("发送异常", e);
                } finally {
                    // Whatever happened, the registration must not outlive this task.
                    ResponseProcessorPool.this.responseMap.remove(Long.valueOf(requestId));
                }
                try {
                    if (aProcessor.result == null) {
                        aProcessor.aProcess(Result.fail("请求超时"), socketHandle);
                        ResponseProcessorPool.this.logger.warn("send超时未接收到请求ID为" + requestId + "的回调");
                    }
                } finally {
                    if (interrupted) {
                        // Restore the flag only after the timeout callback has run, so the
                        // callback still executes with a clear interrupt status.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });
    }

    /**
     * Sends a request and blocks the calling thread until the processor is notified or the
     * timeout elapses.
     *
     * @param messageSender the sender used to transmit the message
     * @param basicBSONObject the outgoing message; a request ID is written into it
     * @param j the timeout handed to {@link Object#wait(long)}, in milliseconds
     * @return the response converted with {@code Result.from}, or a failure result with code
     *     {@code -1} when the sender refuses the message, no result is present after the wait,
     *     or the wait fails with an exception
     */
    public Result sendBlocking(MessageSender messageSender, BasicBSONObject basicBSONObject, long j) {
        ResponseProcessor responseProcessor = new ResponseProcessor();
        long requestId = registerResponse(basicBSONObject, responseProcessor);
        try {
            if (!messageSender.send(basicBSONObject)) {
                return Result.fail(-1, "本地网络繁忙，请稍后再试");
            }
            synchronized (responseProcessor) {
                responseProcessor.wait(j);
            }
            if (responseProcessor.result == null) {
                this.logger.warn("sendBlocking超时未接收到请求ID为" + requestId + "的回调");
                return Result.fail(-1, "请求超时");
            }
            this.logger.debug("处理ID为" + requestId + "的回调");
            return Result.from(responseProcessor.result);
        } catch (IllegalArgumentException | InterruptedException e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to the caller's thread.
                Thread.currentThread().interrupt();
            }
            this.logger.error("发送异常", e);
            return Result.fail(-1, "请求失败，发生异常：" + e.getMessage());
        } finally {
            // Single cleanup point for every exit path above.
            this.responseMap.remove(Long.valueOf(requestId));
        }
    }
}
