package de.rcenvironment.core.communication.transport.jms.common;

import de.rcenvironment.core.communication.channel.MessageChannelState;
import de.rcenvironment.core.communication.common.InstanceNodeSessionId;
import de.rcenvironment.core.communication.model.NetworkRequest;
import de.rcenvironment.core.communication.model.NetworkResponse;
import de.rcenvironment.core.communication.protocol.NetworkResponseFactory;
import de.rcenvironment.core.communication.transport.jms.common.NonBlockingResponseInboxConsumer;
import de.rcenvironment.core.communication.transport.spi.AbstractMessageChannel;
import de.rcenvironment.core.communication.transport.spi.MessageChannelResponseHandler;
import de.rcenvironment.core.toolkitbridge.transitional.ConcurrencyUtils;
import de.rcenvironment.core.toolkitbridge.transitional.StatsCounter;
import de.rcenvironment.core.utils.common.LogUtils;
import de.rcenvironment.core.utils.common.StringUtils;
import de.rcenvironment.core.utils.incubator.DebugSettings;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncTaskService;
import de.rcenvironment.toolkit.modules.concurrency.api.TaskDescription;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:de/rcenvironment/core/communication/transport/jms/common/AbstractJmsMessageChannel.class */
public abstract class AbstractJmsMessageChannel extends AbstractMessageChannel implements JmsMessageChannel {
    protected Connection connection;
    protected InstanceNodeSessionId localNodeId;
    private String outgoingRequestQueueName;
    private String shutdownSecurityToken;
    private String sharedResponseQueueName;
    private RequestSender requestSender;
    private NonBlockingResponseInboxConsumer responseInboxConsumer;
    protected final AsyncTaskService threadPool = ConcurrencyUtils.getAsyncTaskService();
    protected final Log log = LogFactory.getLog(getClass());
    private final boolean verboseRequestLoggingEnabled = DebugSettings.getVerboseLoggingEnabled("NetworkRequests");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:de/rcenvironment/core/communication/transport/jms/common/AbstractJmsMessageChannel$RequestSender.class */
    public final class RequestSender implements Runnable {
        private Session jmsSession;
        private Queue jmsDestinationQueue;
        private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        private volatile boolean cancelled = false;

        RequestSender(String str, Connection connection) {
        }

        public String getCategoryName() {
            return "JMS Network Transport: Message channel request sender";
        }

        @Override // java.lang.Runnable
        @TaskDescription("JMS Network Transport: Message channel request sender")
        public void run() {
            try {
                try {
                    this.jmsSession = AbstractJmsMessageChannel.this.connection.createSession(false, 1);
                    this.jmsDestinationQueue = this.jmsSession.createQueue(AbstractJmsMessageChannel.this.outgoingRequestQueueName);
                    runDispatchLoop();
                    try {
                        if (this.jmsSession != null) {
                            this.jmsSession.close();
                        }
                    } catch (JMSException e) {
                        AbstractJmsMessageChannel.this.log.error("Error closing JMS session after running request sender loop", e);
                    }
                } catch (Throwable th) {
                    try {
                        if (this.jmsSession != null) {
                            this.jmsSession.close();
                        }
                        throw th;
                    } catch (JMSException e2) {
                        AbstractJmsMessageChannel.this.log.error("Error closing JMS session after running request sender loop", e2);
                    }
                }
            } catch (JMSException e3) {
                AbstractJmsMessageChannel.this.log.error("Error creating JMS session or destination for request sender loop", e3);
                try {
                    if (this.jmsSession != null) {
                        this.jmsSession.close();
                    }
                } catch (JMSException e4) {
                    AbstractJmsMessageChannel.this.log.error("Error closing JMS session after running request sender loop", e4);
                }
            }
        }

        private void runDispatchLoop() {
            while (true) {
                try {
                    Runnable take = this.queue.take();
                    if (this.cancelled) {
                        AbstractJmsMessageChannel.this.log.debug("Clean request sender shutdown");
                        return;
                    }
                    take.run();
                } catch (InterruptedException unused) {
                    AbstractJmsMessageChannel.this.log.warn("Request sender interrupted; shutting down");
                    return;
                }
            }
        }

        void enqueue(final NetworkRequest networkRequest, final MessageChannelResponseHandler messageChannelResponseHandler, final int i) {
            final long currentTimeMillis = System.currentTimeMillis();
            this.queue.add(new Runnable() { // from class: de.rcenvironment.core.communication.transport.jms.common.AbstractJmsMessageChannel.RequestSender.1
                private static final int SIZE_CATEGORY_DIVISOR = 102400;

                @Override // java.lang.Runnable
                public void run() {
                    AbstractJmsMessageChannel.this.sendNonBlockingRequest(RequestSender.this.jmsSession, RequestSender.this.jmsDestinationQueue, networkRequest, messageChannelResponseHandler, i);
                    StatsCounter.registerValue("Messaging: Outgoing request queue transit time", StringUtils.format("Size range: %1$s00..%1$s99 kiB", new Object[]{Integer.valueOf(networkRequest.getContentBytes().length / SIZE_CATEGORY_DIVISOR)}), System.currentTimeMillis() - currentTimeMillis);
                }
            });
        }

        public void shutdown() {
            int size = this.queue.size();
            this.queue.clear();
            this.cancelled = true;
            enqueue(null, null, 0);
            if (size != 0) {
                AbstractJmsMessageChannel.this.log.debug(StringUtils.format("Discarded %d pending requests for %s as channel %s is shutting down", new Object[]{Integer.valueOf(size), AbstractJmsMessageChannel.this.getRemoteNodeInformation().getInstanceNodeSessionId(), AbstractJmsMessageChannel.this.getChannelId()}));
            }
        }
    }

    public AbstractJmsMessageChannel(InstanceNodeSessionId instanceNodeSessionId) {
        this.localNodeId = instanceNodeSessionId;
    }

    public void sendRequest(NetworkRequest networkRequest, MessageChannelResponseHandler messageChannelResponseHandler, int i) {
        this.requestSender.enqueue(networkRequest, messageChannelResponseHandler, i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendShutdownMessageToRemoteRequestInbox() {
        try {
            Session createSession = this.connection.createSession(false, 1);
            try {
                createSession.createProducer(createSession.createQueue(this.outgoingRequestQueueName)).send(JmsProtocolUtils.createChannelShutdownMessage(createSession, getChannelId(), this.shutdownSecurityToken));
                createSession.close();
            } catch (Throwable th) {
                createSession.close();
                throw th;
            }
        } catch (JMSException e) {
            this.log.debug("Failed to send shutdown message while closing channel " + getChannelId(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void asyncSendShutdownMessageToB2CJmsQueue() throws JMSException {
        this.threadPool.execute("JMS Network Transport: Send shutdown signal to Client-to-Broker queue", () -> {
            try {
                Session createSession = this.connection.createSession(false, 1);
                try {
                    createSession.createProducer(createSession.createQueue(this.outgoingRequestQueueName)).send(JmsProtocolUtils.createQueueShutdownMessage(createSession, this.shutdownSecurityToken));
                    createSession.close();
                } catch (Throwable th) {
                    createSession.close();
                    throw th;
                }
            } catch (JMSException e) {
                String jMSException = e.toString();
                if (jMSException.contains("")) {
                    return;
                }
                this.log.warn(StringUtils.format("Exception on sending shutdown signal to Client-to-Broker JMS queue %s: %s", new Object[]{this.outgoingRequestQueueName, jMSException}));
            }
        });
    }

    @Override // de.rcenvironment.core.communication.transport.jms.common.JmsMessageChannel
    public String getOutgoingRequestQueueName() {
        return this.outgoingRequestQueueName;
    }

    @Override // de.rcenvironment.core.communication.transport.jms.common.JmsMessageChannel
    public void setupNonBlockingRequestSending(String str, String str2) throws JMSException {
        this.log.debug(StringUtils.format("Setting outgoing request queue for channel %s to %s", new Object[]{getChannelId(), str}));
        this.outgoingRequestQueueName = str;
        startRequestSender(StringUtils.format("Request Sender for channel %s @ %s", new Object[]{getChannelId(), str}));
        this.log.debug(StringUtils.format("Setting incoming response queue for channel %s to %s", new Object[]{getChannelId(), str2}));
        this.sharedResponseQueueName = str2;
        startResponseConsumer(StringUtils.format("Response Inbox Consumer for channel %s @ %s", new Object[]{getChannelId(), str2}));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onClosedOrBroken() {
        if (this.requestSender != null) {
            this.requestSender.shutdown();
        }
        try {
            if (this.responseInboxConsumer != null) {
                this.responseInboxConsumer.triggerShutDown();
            }
        } catch (JMSException e) {
            this.log.warn("Error while shutting down response consumer for channel " + getChannelId(), e);
        }
    }

    private void startRequestSender(String str) throws JMSException {
        this.requestSender = new RequestSender(this.outgoingRequestQueueName, this.connection);
        this.threadPool.execute(this.requestSender.getCategoryName(), str, this.requestSender);
    }

    private void startResponseConsumer(String str) throws JMSException {
        this.responseInboxConsumer = new NonBlockingResponseInboxConsumer(this.sharedResponseQueueName, this.connection);
        this.threadPool.execute("JMS Network Transport: Non-blocking response listener", str, this.responseInboxConsumer);
    }

    @Override // de.rcenvironment.core.communication.transport.jms.common.JmsMessageChannel
    public void setShutdownSecurityToken(String str) {
        this.shutdownSecurityToken = str;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getShutdownSecurityToken() {
        return this.shutdownSecurityToken;
    }

    private void spawnBlockingRequestResponseTask(NetworkRequest networkRequest, MessageChannelResponseHandler messageChannelResponseHandler, int i) {
        this.threadPool.execute("JMS Network Transport: blocking request/response", () -> {
            if (isReadyToUse()) {
                performBlockingRequestResponse(networkRequest, messageChannelResponseHandler, i);
            } else {
                messageChannelResponseHandler.onResponseAvailable(NetworkResponseFactory.generateResponseForCloseOrBrokenChannelDuringRequestDelivery(networkRequest, this.localNodeId, (String) null));
            }
        });
    }

    private void performBlockingRequestResponse(NetworkRequest networkRequest, MessageChannelResponseHandler messageChannelResponseHandler, int i) {
        try {
            Session createSession = this.connection.createSession(false, 1);
            try {
                messageChannelResponseHandler.onResponseAvailable(JmsProtocolUtils.createNetworkResponseFromMessage(performBlockingJmsRequestResponse(createSession, JmsProtocolUtils.createMessageFromNetworkRequest(networkRequest, createSession), createSession.createQueue(this.outgoingRequestQueueName), i), networkRequest));
                createSession.close();
            } catch (Throwable th) {
                createSession.close();
                throw th;
            }
        } catch (TimeoutException e) {
            this.log.debug(StringUtils.format("Timeout while waiting for response to request '%s' of type '%s': %s", new Object[]{networkRequest.getRequestId(), networkRequest.getMessageType(), e.getMessage()}));
            messageChannelResponseHandler.onResponseAvailable(NetworkResponseFactory.generateResponseForTimeoutWaitingForResponse(networkRequest, this.localNodeId));
        } catch (JMSException e2) {
            messageChannelResponseHandler.onChannelBroken(networkRequest, this);
            messageChannelResponseHandler.onResponseAvailable(NetworkResponseFactory.generateResponseForErrorDuringDelivery(networkRequest, this.localNodeId, LogUtils.logErrorAndAssignUniqueMarker(this.log, StringUtils.format("Error sending JMS message via channel %s; channel will be marked as broken (exception: %s) ", new Object[]{getChannelId(), e2.toString()}))));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Message performBlockingJmsRequestResponse(Session session, Message message, Queue queue, int i) throws JMSException, TimeoutException {
        TemporaryQueue createTemporaryQueue = session.createTemporaryQueue();
        try {
            message.setJMSReplyTo(createTemporaryQueue);
            sendRequest(session, message, queue);
            Message receiveResponse = receiveResponse(session, i, createTemporaryQueue);
            try {
                createTemporaryQueue.delete();
            } catch (JMSException e) {
                this.log.debug(StringUtils.format("Exception on deleting a temporary response queue for channel %s (%s - %s): %s", new Object[]{getChannelId(), createTemporaryQueue.getQueueName(), getState(), e.toString()}));
            }
            return receiveResponse;
        } catch (Throwable th) {
            try {
                createTemporaryQueue.delete();
            } catch (JMSException e2) {
                this.log.debug(StringUtils.format("Exception on deleting a temporary response queue for channel %s (%s - %s): %s", new Object[]{getChannelId(), createTemporaryQueue.getQueueName(), getState(), e2.toString()}));
            }
            throw th;
        }
    }

    private void sendNonBlockingRequestInTempSession(NetworkRequest networkRequest, MessageChannelResponseHandler messageChannelResponseHandler, int i) {
        Session session = null;
        try {
            try {
                session = this.connection.createSession(false, 1);
                sendNonBlockingRequest(session, session.createQueue(this.outgoingRequestQueueName), networkRequest, messageChannelResponseHandler, i);
                if (session != null) {
                    try {
                        session.close();
                    } catch (JMSException e) {
                        this.log.error("Error closing JMS session after message sending", e);
                    }
                }
            } catch (Throwable th) {
                if (session != null) {
                    try {
                        session.close();
                    } catch (JMSException e2) {
                        this.log.error("Error closing JMS session after message sending", e2);
                        return;
                    }
                }
                throw th;
            }
        } catch (JMSException e3) {
            this.log.error("Error creating JMS session or destination for message sending", e3);
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e4) {
                    this.log.error("Error closing JMS session after message sending", e4);
                }
            }
        }
    }

    private void sendNonBlockingRequest(Session session, Queue queue, final NetworkRequest networkRequest, final MessageChannelResponseHandler messageChannelResponseHandler, int i) {
        try {
            final int length = networkRequest.getContentBytes().length;
            if (this.verboseRequestLoggingEnabled) {
                this.log.debug(StringUtils.format("Sending request   %s: type %s, payload length %d", new Object[]{networkRequest.getRequestId(), networkRequest.getMessageType(), Integer.valueOf(length)}));
            }
            if (length >= 1048576) {
                this.log.debug(StringUtils.format("Sending large network request %s towards recipient %s: type %s, payload length %d", new Object[]{networkRequest.getRequestId(), networkRequest.accessMetaData().getFinalRecipient(), networkRequest.getMessageType(), Integer.valueOf(length)}));
            }
            Message createMessageFromNetworkRequest = JmsProtocolUtils.createMessageFromNetworkRequest(networkRequest, session);
            createMessageFromNetworkRequest.setJMSReplyTo(session.createQueue(this.sharedResponseQueueName));
            sendRequest(session, createMessageFromNetworkRequest, queue);
            this.responseInboxConsumer.registerResponseListener(createMessageFromNetworkRequest.getJMSMessageID(), new NonBlockingResponseInboxConsumer.JmsResponseCallback() { // from class: de.rcenvironment.core.communication.transport.jms.common.AbstractJmsMessageChannel.1
                @Override // de.rcenvironment.core.communication.transport.jms.common.NonBlockingResponseInboxConsumer.JmsResponseCallback
                public void onResponseReceived(Message message) {
                    try {
                        NetworkResponse createNetworkResponseFromMessage = JmsProtocolUtils.createNetworkResponseFromMessage(message, networkRequest);
                        int length2 = createNetworkResponseFromMessage.getContentBytes().length;
                        if (AbstractJmsMessageChannel.this.verboseRequestLoggingEnabled) {
                            AbstractJmsMessageChannel.this.log.debug(StringUtils.format("Received response %s from %s: response payload length is %d", new Object[]{createNetworkResponseFromMessage.getRequestId(), networkRequest.accessMetaData().getFinalRecipient(), Integer.valueOf(length2)}));
                        }
                        if (length2 >= 1048576) {
                            AbstractJmsMessageChannel.this.log.debug(StringUtils.format("Received large network response %s from %s: request type was %s, response payload length is %d", new Object[]{createNetworkResponseFromMessage.getRequestId(), networkRequest.accessMetaData().getFinalRecipient(), networkRequest.getMessageType(), Integer.valueOf(length2)}));
                        } else if (length >= 1048576) {
                            AbstractJmsMessageChannel.this.log.debug(StringUtils.format("Received network response %s for a large request sent to %s: request type was %s, response payload length is %d", new Object[]{createNetworkResponseFromMessage.getRequestId(), networkRequest.accessMetaData().getFinalRecipient(), networkRequest.getMessageType(), Integer.valueOf(length2)}));
                        }
                        messageChannelResponseHandler.onResponseAvailable(createNetworkResponseFromMessage);
                    } catch (JMSException e) {
                        messageChannelResponseHandler.onResponseAvailable(NetworkResponseFactory.generateResponseForErrorDuringDelivery(networkRequest, AbstractJmsMessageChannel.this.localNodeId, LogUtils.logExceptionWithStacktraceAndAssignUniqueMarker(AbstractJmsMessageChannel.this.log, "JMS exception while parsing response message", e)));
                    }
                }

                @Override // de.rcenvironment.core.communication.transport.jms.common.NonBlockingResponseInboxConsumer.JmsResponseCallback
                public void onTimeoutReached() {
                    AbstractJmsMessageChannel.this.log.debug(StringUtils.format("Timeout reached while waiting for response to request '%s' of type '%s'", new Object[]{networkRequest.getRequestId(), networkRequest.getMessageType()}));
                    messageChannelResponseHandler.onResponseAvailable(NetworkResponseFactory.generateResponseForTimeoutWaitingForResponse(networkRequest, AbstractJmsMessageChannel.this.localNodeId));
                }

                @Override // de.rcenvironment.core.communication.transport.jms.common.NonBlockingResponseInboxConsumer.JmsResponseCallback
                public void onChannelClosed() {
                    AbstractJmsMessageChannel.this.log.debug(StringUtils.format("Message channel closed while waiting for response to request '%s' of type '%s'", new Object[]{networkRequest.getRequestId(), networkRequest.getMessageType()}));
                    messageChannelResponseHandler.onResponseAvailable(NetworkResponseFactory.generateResponseForChannelCloseWhileWaitingForResponse(networkRequest, AbstractJmsMessageChannel.this.localNodeId, (String) null));
                }
            }, i);
        } catch (JMSException e) {
            messageChannelResponseHandler.onChannelBroken(networkRequest, this);
            messageChannelResponseHandler.onResponseAvailable(NetworkResponseFactory.generateResponseForErrorDuringDelivery(networkRequest, this.localNodeId, LogUtils.logErrorAndAssignUniqueMarker(this.log, StringUtils.format("Error sending JMS message via channel %s; channel will be marked as broken (exception: %s) ", new Object[]{getChannelId(), e.toString()}))));
        }
    }

    private void sendRequest(Session session, Message message, Queue queue) throws JMSException {
        JmsProtocolUtils.sendWithTransientProducer(session, message, queue);
    }

    private Message receiveResponse(Session session, int i, TemporaryQueue temporaryQueue) throws JMSException, TimeoutException {
        MessageConsumer createConsumer = session.createConsumer(temporaryQueue);
        try {
            Message receive = createConsumer.receive(i);
            if (receive != null) {
                return receive;
            }
            MessageChannelState state = getState();
            if (state == MessageChannelState.CLOSED || state == MessageChannelState.MARKED_AS_BROKEN) {
                throw new TimeoutException(StringUtils.format("Received JMS exception while waiting for a response from message channel %s (on queue %s), which is already %s", new Object[]{getChannelId(), temporaryQueue.getQueueName(), state}));
            }
            throw new TimeoutException(StringUtils.format("Timeout (%d ms) exceeded while waiting for a response from message channel %s (on queue %s), which is in state %s", new Object[]{Integer.valueOf(i), getChannelId(), temporaryQueue.getQueueName(), state}));
        } finally {
            createConsumer.close();
        }
    }
}
