package com.Slack.mgr;

import android.content.Context;
import android.os.PowerManager;
import com.Slack.api.ApiResponseError;
import com.Slack.api.SlackApi;
import com.Slack.api.request.ChatPostMessage;
import com.Slack.api.response.ChatPostMessageResponse;
import com.Slack.api.wrappers.MsgChannelApiActions;
import com.Slack.featureflag.Feature;
import com.Slack.featureflag.FeatureFlagStore;
import com.Slack.mgr.msgformatting.MessageEncoder;
import com.Slack.model.EventType;
import com.Slack.model.Message;
import com.Slack.model.MsgState;
import com.Slack.model.PersistedMessageObj;
import com.Slack.ms.MSClient;
import com.Slack.ms.MSClientException;
import com.Slack.ms.MsState;
import com.Slack.ms.bus.MessageDeliveryFailedBusEvent;
import com.Slack.ms.handlers.ReplyEventHandler;
import com.Slack.ms.msevents.ChatMessage;
import com.Slack.ms.msevents.SocketEvent;
import com.Slack.persistence.PersistentStore;
import com.Slack.persistence.bus.ConversationReplyUpdatedBusEvent;
import com.Slack.persistence.bus.MsgChannelMessageUpdated;
import com.Slack.utils.MessageHelper;
import com.Slack.utils.beacon.Beacon;
import com.Slack.utils.beacon.EventTracker;
import com.Slack.utils.rx.MappingFuncs;
import com.Slack.utils.rx.Observers;
import com.google.common.base.Preconditions;
import com.slack.commons.rx.RxRetries;
import com.slack.commons.rx.RxTransformers;
import com.slack.commons.rx.Vacant;
import com.squareup.otto.Bus;
import dagger.Lazy;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.inject.Singleton;
import kotlin.jvm.functions.Function1;
import rx.Observable;
import rx.Single;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import timber.log.Timber;

@Singleton
/* loaded from: classes.dex */
public class MessageSendingManager {
    private static final long WAKE_LOCK_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(2);
    private final Bus bus;
    private final FeatureFlagStore ffs;
    private final MessageEncoder messageEncoder;
    private final MSClient msClient;
    private final Lazy<MsgChannelApiActions> msgChannelApiActionsLazy;
    private final PersistentStore persistentStore;
    private final ReplyEventHandler replyEventHandler;
    private final SlackApi slackApi;
    private PowerManager.WakeLock wakeLock;
    private Thread workerThread;
    private volatile long messageIdBeingProcessed = -1;
    private final LinkedBlockingQueue<ChatMessage> messageQueue = new LinkedBlockingQueue<>();
    private CountDownLatch messageProcessedLatch = new CountDownLatch(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes.dex */
    public class MessageSenderWorker implements Runnable {
        private BlockingQueue<ChatMessage> messageQueue;

        public MessageSenderWorker(BlockingQueue<ChatMessage> blockingQueue) {
            this.messageQueue = blockingQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            MessageSendingManager.this.markAllPendingMessagesAsFailed();
            while (true) {
                try {
                    ChatMessage take = this.messageQueue.take();
                    MessageSendingManager.this.wakeLock.acquire(MessageSendingManager.WAKE_LOCK_TIMEOUT_MILLIS);
                    if (MessageSendingManager.this.ffs.isEnabled(Feature.API_MESSAGE_SEND)) {
                        MessageSendingManager.this.sendMessageUsingApi(take);
                    } else {
                        MessageSendingManager.this.sendMessage(take);
                    }
                    MessageSendingManager.this.awaitMessageProcessing(100L, TimeUnit.SECONDS);
                    MessageSendingManager.this.wakeLock.release();
                } catch (InterruptedException e) {
                    MessageSendingManager.this.wakeLock.release();
                    Timber.e(e, "The message sending thread was interrupted.", new Object[0]);
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes.dex */
    public static class RestrictedActionException extends Exception {
        public RestrictedActionException() {
        }

        public RestrictedActionException(Throwable th) {
            super(th);
        }
    }

    public MessageSendingManager(Context context, ReplyEventHandler replyEventHandler, PersistentStore persistentStore, Bus bus, MSClient mSClient, FeatureFlagStore featureFlagStore, MessageEncoder messageEncoder, SlackApi slackApi, Lazy<MsgChannelApiActions> lazy) {
        this.replyEventHandler = replyEventHandler;
        this.persistentStore = persistentStore;
        this.bus = bus;
        this.msClient = mSClient;
        this.ffs = featureFlagStore;
        this.messageEncoder = messageEncoder;
        this.slackApi = slackApi;
        this.msgChannelApiActionsLazy = lazy;
        PowerManager powerManager = (PowerManager) context.getSystemService("power");
        Preconditions.checkNotNull(powerManager);
        this.wakeLock = powerManager.newWakeLock(1, "MessageSendingWakeLock");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean awaitMessageProcessing(long j, TimeUnit timeUnit) throws InterruptedException {
        this.messageProcessedLatch = new CountDownLatch(1);
        return this.messageProcessedLatch.await(j, timeUnit);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void compareAndSetPendingMessageState(String str, Message message, MsgState msgState) {
        if (!(message != null ? this.persistentStore.compareAndSetMessage(str, MsgState.PENDING, message, msgState) : this.persistentStore.compareAndSetMessageState(str, MsgState.PENDING, msgState))) {
            Timber.i("Message: %s is already updated from PENDING state.", str);
            return;
        }
        Timber.d("Changed message: %s state to: %s", str, msgState);
        PersistedMessageObj messageByClientMsgId = this.persistentStore.getMessageByClientMsgId(str);
        if (messageByClientMsgId == null) {
            Timber.e(new RuntimeException("Bad things happened"), "Unable to retrieve message by clientMsgId: %s", str);
        } else {
            notifyAboutMessageUpdate(messageByClientMsgId.getLocalId(), messageByClientMsgId.getModelObj(), messageByClientMsgId.getMsgChannelId());
        }
    }

    private Observable<Vacant> connectedStateObservable() {
        return this.msClient.getMsClientState().doOnNext(new Action1<MsState>() { // from class: com.Slack.mgr.MessageSendingManager.20
            @Override // rx.functions.Action1
            public void call(MsState msState) {
                Timber.d("Received MsState: %s", msState);
            }
        }).filter(new Func1<MsState, Boolean>() { // from class: com.Slack.mgr.MessageSendingManager.19
            @Override // rx.functions.Func1
            public Boolean call(MsState msState) {
                return Boolean.valueOf(msState == MsState.CONNECTED);
            }
        }).map(new Func1<MsState, Vacant>() { // from class: com.Slack.mgr.MessageSendingManager.18
            @Override // rx.functions.Func1
            public Vacant call(MsState msState) {
                return Vacant.INSTANCE;
            }
        });
    }

    private Observable<Vacant> doSendMessage(final ChatMessage chatMessage) {
        return Observable.fromCallable(new Callable<Vacant>() { // from class: com.Slack.mgr.MessageSendingManager.17
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Vacant call() throws Exception {
                Timber.d("Sending message: %d", Long.valueOf(chatMessage.id()));
                MessageSendingManager.this.msClient.sendMessage(chatMessage);
                return Vacant.INSTANCE;
            }
        }).doOnError(new Action1<Throwable>() { // from class: com.Slack.mgr.MessageSendingManager.16
            @Override // rx.functions.Action1
            public void call(Throwable th) {
                Timber.d(th, "doSendMessage onError()", new Object[0]);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void drainTheQueueAndMarkPendingMessagesAsFailed() {
        ArrayList arrayList = new ArrayList();
        this.messageQueue.drainTo(arrayList);
        Timber.d("Draining the pending messages queue of size %d and marking messages as failed.", Integer.valueOf(arrayList.size()));
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            markPendingMessageAsFailed(((ChatMessage) it.next()).id());
        }
    }

    private Single<ChatMessage> encodeMessage(final ChatMessage chatMessage) {
        return Single.fromCallable(new Callable<ChatMessage>() { // from class: com.Slack.mgr.MessageSendingManager.11
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public ChatMessage call() throws Exception {
                if (MessageSendingManager.this.ffs.isEnabled(Feature.NAME_TAGGING) || MessageSendingManager.this.ffs.isEnabled(Feature.NAME_TAGGING_AUTOCOMPLETE)) {
                    return chatMessage;
                }
                return chatMessage.withText(MessageSendingManager.this.messageEncoder.encodeMessageText(chatMessage.text(), true, chatMessage.channel(), null));
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void markAllPendingMessagesAsFailed() {
        Iterator<PersistedMessageObj> it = this.persistentStore.getPendingMessages().iterator();
        while (it.hasNext()) {
            markPendingMessage(it.next(), MsgState.FAILED);
        }
    }

    private void markPendingMessage(PersistedMessageObj persistedMessageObj, MsgState msgState) {
        Preconditions.checkNotNull(persistedMessageObj);
        long localId = persistedMessageObj.getLocalId();
        Timber.d("Marking message: %d as %s.", Long.valueOf(localId), msgState.toString());
        String str = (String) Preconditions.checkNotNull(persistedMessageObj.getModelObj().getChannelId());
        String threadTs = persistedMessageObj.getModelObj().getThreadTs();
        EventTracker.track(Beacon.MESSAGE_SEND_FAIL);
        long markMessageState = this.persistentStore.markMessageState(localId, msgState);
        if (MessageHelper.isExcludedFromChannel(persistedMessageObj.getModelObj())) {
            Preconditions.checkState(threadTs != null);
            this.bus.post(new ConversationReplyUpdatedBusEvent(str, localId, markMessageState, threadTs));
        } else {
            this.bus.post(new MsgChannelMessageUpdated(str, localId, markMessageState, threadTs, persistedMessageObj.getModelObj().getTs()));
        }
        this.bus.post(new MessageDeliveryFailedBusEvent(str, threadTs));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void markPendingMessageAsFailed(long j) {
        PersistedMessageObj messageByLocalId = this.persistentStore.getMessageByLocalId(j);
        if (messageByLocalId != null) {
            markPendingMessage(messageByLocalId, MsgState.FAILED);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void markPendingMessageAsPermanentlyFailed(long j) {
        PersistedMessageObj messageByLocalId = this.persistentStore.getMessageByLocalId(j);
        if (messageByLocalId != null) {
            markPendingMessage(messageByLocalId, MsgState.PERMANENTLY_FAILED);
        }
    }

    private void notifyAboutMessageUpdate(long j, Message message, String str) {
        if (!MessageHelper.isExcludedFromChannel(message)) {
            this.bus.post(new MsgChannelMessageUpdated(str, j, j, message.getThreadTs(), message.getTs(), message.getClientMsgId()));
        } else {
            Preconditions.checkState(message.getThreadTs() != null);
            this.bus.post(new ConversationReplyUpdatedBusEvent(str, j, j, message.getThreadTs()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Single<ChatPostMessageResponse> postChatMessage(final ChatMessage chatMessage) {
        return this.slackApi.chatPostMessage(ChatPostMessage.fromChatMessage(chatMessage)).doOnSubscribe(new Action0() { // from class: com.Slack.mgr.MessageSendingManager.3
            @Override // rx.functions.Action0
            public void call() {
                Timber.d("Sending message with client_msg_id: %s", chatMessage.clientMsgId());
            }
        }).onErrorResumeNext(new Func1<Throwable, Single<? extends ChatPostMessageResponse>>() { // from class: com.Slack.mgr.MessageSendingManager.2
            @Override // rx.functions.Func1
            public Single<? extends ChatPostMessageResponse> call(Throwable th) {
                if (th instanceof ApiResponseError) {
                    ApiResponseError apiResponseError = (ApiResponseError) th;
                    if (apiResponseError.getErrorCode() != null && "restricted_action".equals(apiResponseError.getErrorCode())) {
                        return Single.error(new RestrictedActionException(th));
                    }
                }
                return Single.error(th);
            }
        }).retryWhen(RxRetries.retryConstantBackOffFunc(5L, TimeUnit.SECONDS, 20, new Function1<Throwable, Boolean>() { // from class: com.Slack.mgr.MessageSendingManager.1
            @Override // kotlin.jvm.functions.Function1
            public Boolean invoke(Throwable th) {
                return Boolean.valueOf(!(th instanceof RestrictedActionException));
            }
        }));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendMessage(final ChatMessage chatMessage) {
        Timber.d("Attempt a message send for id: %d", Long.valueOf(chatMessage.id()));
        this.messageIdBeingProcessed = chatMessage.id();
        connectedStateObservable().first().toCompletable().andThen(encodeMessage(chatMessage).subscribeOn(Schedulers.io()).toObservable()).flatMap(new Func1<ChatMessage, Observable<Vacant>>() { // from class: com.Slack.mgr.MessageSendingManager.10
            @Override // rx.functions.Func1
            public Observable<Vacant> call(ChatMessage chatMessage2) {
                return MessageSendingManager.this.sendMessageWithAck(chatMessage2);
            }
        }).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() { // from class: com.Slack.mgr.MessageSendingManager.9
            @Override // rx.functions.Func1
            public Observable<?> call(Observable<? extends Throwable> observable) {
                return observable.flatMap(new Func1<Throwable, Observable<Throwable>>() { // from class: com.Slack.mgr.MessageSendingManager.9.1
                    @Override // rx.functions.Func1
                    public Observable<Throwable> call(Throwable th) {
                        ReplyEventHandler.MsError error;
                        if (!(th instanceof MSClientException) || (error = ((MSClientException) th).getError()) == null) {
                            return Observable.just(th);
                        }
                        switch (error) {
                            case NON_THREADABLE_CHANNEL:
                            case READ_ONLY_CHANNEL:
                            case THREAD_ONLY_CHANNEL:
                                return Observable.error(th);
                            default:
                                return Observable.just(th);
                        }
                    }
                }).compose(RxTransformers.backOffConstantTransformer(5L, TimeUnit.SECONDS, 20));
            }
        }).timeout(90L, TimeUnit.SECONDS).subscribe(new Action1<Vacant>() { // from class: com.Slack.mgr.MessageSendingManager.7
            @Override // rx.functions.Action1
            public void call(Vacant vacant) {
                Timber.d("All good message with reply_id: %d was delivered", Long.valueOf(chatMessage.id()));
                MessageSendingManager.this.messageIdBeingProcessed = -1L;
                MessageSendingManager.this.messageProcessedLatch.countDown();
            }
        }, new Action1<Throwable>() { // from class: com.Slack.mgr.MessageSendingManager.8
            @Override // rx.functions.Action1
            public void call(Throwable th) {
                MessageSendingManager.this.messageIdBeingProcessed = -1L;
                long id = chatMessage.id();
                Timber.e(th, "Unable to deliver message reply_Id: %d", Long.valueOf(id));
                if ((th instanceof MSClientException) && ((MSClientException) th).getError() != null) {
                    MessageSendingManager.this.markPendingMessageAsPermanentlyFailed(id);
                    MessageSendingManager.this.messageProcessedLatch.countDown();
                } else {
                    MessageSendingManager.this.markPendingMessageAsFailed(id);
                    MessageSendingManager.this.drainTheQueueAndMarkPendingMessagesAsFailed();
                    MessageSendingManager.this.messageProcessedLatch.countDown();
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendMessageUsingApi(final ChatMessage chatMessage) {
        Timber.d("Attempt a message send using API for clientMsgId: %s", chatMessage.clientMsgId());
        encodeMessage(chatMessage).flatMap(new Func1<ChatMessage, Single<ChatPostMessageResponse>>() { // from class: com.Slack.mgr.MessageSendingManager.6
            @Override // rx.functions.Func1
            public Single<ChatPostMessageResponse> call(ChatMessage chatMessage2) {
                return MessageSendingManager.this.postChatMessage(chatMessage2);
            }
        }).subscribeOn(Schedulers.io()).subscribe(new Action1<ChatPostMessageResponse>() { // from class: com.Slack.mgr.MessageSendingManager.4
            @Override // rx.functions.Action1
            public void call(ChatPostMessageResponse chatPostMessageResponse) {
                Message message = chatPostMessageResponse.getMessage();
                String str = (String) Preconditions.checkNotNull(message.getClientMsgId());
                Timber.d("All good message with clientMsgId: %s was delivered", str);
                MessageSendingManager.this.compareAndSetPendingMessageState(str, message, MsgState.OK_UNSYNCED);
                MessageSendingManager.this.messageProcessedLatch.countDown();
            }
        }, new Action1<Throwable>() { // from class: com.Slack.mgr.MessageSendingManager.5
            @Override // rx.functions.Action1
            public void call(Throwable th) {
                Timber.e(th, "Exception sending a message clientMsgId: %s", chatMessage.clientMsgId());
                if (th instanceof RestrictedActionException) {
                    MessageSendingManager.this.markPendingMessageAsPermanentlyFailed(chatMessage.id());
                    ((MsgChannelApiActions) MessageSendingManager.this.msgChannelApiActionsLazy.get()).fetchChannelFromServerAndUpdatePermissions(chatMessage.channel()).subscribe(Observers.errorLogger());
                    MessageSendingManager.this.messageProcessedLatch.countDown();
                } else {
                    MessageSendingManager.this.compareAndSetPendingMessageState(chatMessage.clientMsgId(), null, MsgState.FAILED);
                    MessageSendingManager.this.drainTheQueueAndMarkPendingMessagesAsFailed();
                    MessageSendingManager.this.messageProcessedLatch.countDown();
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Observable<Vacant> sendMessageWithAck(ChatMessage chatMessage) {
        return Observable.zip(waitForReplyToMessage(chatMessage.id()), doSendMessage(chatMessage), MappingFuncs.toSecondArg());
    }

    private Observable<Long> waitForReplyToMessage(final long j) {
        return this.replyEventHandler.replyToEventObservable().first(new Func1<SocketEvent, Boolean>() { // from class: com.Slack.mgr.MessageSendingManager.15
            @Override // rx.functions.Func1
            public Boolean call(SocketEvent socketEvent) {
                Long replyToId = socketEvent.getReplyToId();
                return Boolean.valueOf((socketEvent.getType() == EventType.pong || replyToId == null || j != replyToId.longValue()) ? false : true);
            }
        }).flatMap(new Func1<SocketEvent, Observable<Long>>() { // from class: com.Slack.mgr.MessageSendingManager.14
            @Override // rx.functions.Func1
            public Observable<Long> call(SocketEvent socketEvent) {
                if (socketEvent.isOk()) {
                    return Observable.just(socketEvent.getReplyToId());
                }
                SocketEvent.ReplyError error = socketEvent.getError();
                return Observable.error(new MSClientException(error != null ? error.getMsg() : "Can't send message atm.", error != null ? ReplyEventHandler.MsError.fromCode(error.getCode()) : ReplyEventHandler.MsError.UNKNOWN));
            }
        }).doOnSubscribe(new Action0() { // from class: com.Slack.mgr.MessageSendingManager.13
            @Override // rx.functions.Action0
            public void call() {
                Timber.d("Subscribing to replyToObservable", new Object[0]);
            }
        }).timeout(10L, TimeUnit.SECONDS).doOnError(new Action1<Throwable>() { // from class: com.Slack.mgr.MessageSendingManager.12
            @Override // rx.functions.Action1
            public void call(Throwable th) {
                Timber.d(th, "waitForReplyToMessage onError()", new Object[0]);
            }
        });
    }

    public boolean enqueue(ChatMessage chatMessage) {
        if (initWorkerThread()) {
            Timber.wtf(new RuntimeException("The message worker thread died and was re-instantiated"), "The worker thread was re-instantiated.", new Object[0]);
        }
        if (this.messageQueue.contains(chatMessage) || this.messageIdBeingProcessed == chatMessage.id()) {
            Timber.d("Message id: %d is already in the queue.", Long.valueOf(chatMessage.id()));
            return false;
        }
        Timber.d("Enqueuing message localId: %d clientId: %s", Long.valueOf(chatMessage.id()), chatMessage.clientMsgId());
        return this.messageQueue.offer(chatMessage);
    }

    public boolean initWorkerThread() {
        if (this.workerThread != null && this.workerThread.isAlive()) {
            return false;
        }
        this.workerThread = new Thread(new MessageSenderWorker(this.messageQueue));
        this.workerThread.start();
        Timber.d("Initialized a worker thread.", new Object[0]);
        return true;
    }

    public boolean isQueueEmpty() {
        return this.messageQueue.isEmpty() && this.messageIdBeingProcessed == -1;
    }
}
