package de.rcenvironment.core.communication.transport.jms.activemq.internal;

import de.rcenvironment.core.communication.channel.ServerContactPoint;
import de.rcenvironment.core.communication.model.NetworkContactPoint;
import de.rcenvironment.core.communication.transport.jms.common.InitialInboxConsumer;
import de.rcenvironment.core.communication.transport.jms.common.JmsBroker;
import de.rcenvironment.core.communication.transport.jms.common.JmsProtocolConstants;
import de.rcenvironment.core.communication.transport.jms.common.JmsProtocolUtils;
import de.rcenvironment.core.communication.transport.jms.common.RemoteInitiatedMessageChannelFactory;
import de.rcenvironment.core.communication.transport.jms.common.RequestInboxConsumer;
import de.rcenvironment.core.communication.transport.spi.MessageChannelEndpointHandler;
import de.rcenvironment.core.toolkitbridge.transitional.ConcurrencyUtils;
import de.rcenvironment.core.utils.common.StringUtils;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncTaskService;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:de/rcenvironment/core/communication/transport/jms/activemq/internal/ActiveMQBroker.class */
/**
 * {@link JmsBroker} implementation backed by an embedded, non-persistent ActiveMQ
 * {@link BrokerService}.
 *
 * <p>The broker is created with two connectors: a {@code tcp://host:port} connector derived from
 * the {@link ServerContactPoint}'s network contact point, and a {@code vm://} connector that this
 * class uses for its own local connection. On {@link #start()}, one {@link InitialInboxConsumer}
 * and a configurable number of {@link RequestInboxConsumer}s are handed to the shared
 * {@link AsyncTaskService}; on {@link #stop()}, one shutdown message per consumer is sent to the
 * respective queues before the local connection and the broker are torn down.
 *
 * <p>The number of request inbox consumers defaults to 1 and can be overridden with the system
 * property {@code jms.numRequestConsumers}.
 *
 * <p>NOTE(review): the mutable fields are neither volatile nor guarded by a lock; this class
 * appears to expect {@code start()} and {@code stop()} to be called sequentially - confirm
 * against the callers.
 */
public class ActiveMQBroker implements JmsBroker {

    /** Pause between announcing the shutdown to the queue consumers and closing the connection. */
    private static final int SHUTDOWN_WAIT_AFTER_ANNOUNCE_MSEC = 1000;

    /** Limit applied to the broker's temp usage (50 MiB). */
    private static final long ACTIVEMQ_TEMPORARY_STORE_LIMIT = 50L * 1024 * 1024;

    /** System property that overrides the number of request inbox consumers. */
    private static final String PROPERTY_NUM_REQUEST_CONSUMERS = "jms.numRequestConsumers";

    private static final int DEFAULT_NUM_REQUEST_CONSUMERS = 1;

    /** JVM-wide counter used to give every request inbox consumer task a distinct id. */
    private static final AtomicInteger sharedInboxConsumerIdGenerator = new AtomicInteger();

    private final String brokerName;

    private final String externalUrl;

    private final String jvmLocalUrl;

    /** Set by {@link #start()}; {@code null} until then. */
    private BrokerService brokerService;

    /** Set by {@link #start()}; {@code null} until then. */
    private Connection localBrokerConnection;

    private final ServerContactPoint scp;

    private final RemoteInitiatedMessageChannelFactory remoteInitiatedConnectionFactory;

    private final int numRequestConsumers;

    private ActiveMQConnectionFilterPlugin connectionFilterPlugin;

    private final AsyncTaskService threadPool = ConcurrencyUtils.getAsyncTaskService();

    private final Log log = LogFactory.getLog(getClass());

    /**
     * Derives the broker name and connector URLs from the given contact point and reads the
     * request consumer count from the system properties. Does not start anything.
     *
     * @param serverContactPoint the contact point providing host, port, connection filter and
     *        endpoint handler
     * @param remoteInitiatedMessageChannelFactory the factory passed on to the
     *        {@link InitialInboxConsumer}
     */
    public ActiveMQBroker(ServerContactPoint serverContactPoint, RemoteInitiatedMessageChannelFactory remoteInitiatedMessageChannelFactory) {
        this.scp = serverContactPoint;
        this.remoteInitiatedConnectionFactory = remoteInitiatedMessageChannelFactory;
        NetworkContactPoint networkContactPoint = serverContactPoint.getNetworkContactPoint();
        int port = networkContactPoint.getPort();
        String host = networkContactPoint.getHost();
        this.brokerName = "RCE_ActiveMQ_" + host + "_" + port;
        this.externalUrl = "tcp://" + host + ":" + port;
        this.jvmLocalUrl = "vm://" + this.brokerName;
        this.numRequestConsumers = resolveNumRequestConsumers();
    }

    /**
     * Starts the embedded broker, opens the JVM-local connection to it and spawns the inbox
     * consumers.
     *
     * <p>If this method fails after the broker itself was started, the broker keeps running;
     * {@link #stop()} can still be called to shut down whatever was started.
     *
     * @throws Exception if the broker or the local connection cannot be started
     */
    @Override
    public void start() throws Exception {
        connectionFilterPlugin = new ActiveMQConnectionFilterPlugin();
        connectionFilterPlugin.setFilter(scp.getConnectionFilter());
        brokerService = createTransientEmbeddedBroker(brokerName, connectionFilterPlugin, externalUrl, jvmLocalUrl);
        brokerService.start();
        NetworkContactPoint networkContactPoint = scp.getNetworkContactPoint();
        log.info(StringUtils.format("Listening for standard connections on %s:%d",
            new Object[] { networkContactPoint.getHost(), Integer.valueOf(networkContactPoint.getPort()) }));

        localBrokerConnection = new ActiveMQConnectionFactory(jvmLocalUrl).createConnection();
        localBrokerConnection.setExceptionListener(new ExceptionListener() {

            @Override
            public void onException(JMSException jMSException) {
                handleAsyncJMSException(jMSException);
            }
        });
        localBrokerConnection.start();
        spawnInboxConsumers(getLocalConnection());
    }

    /**
     * Announces the shutdown to the queue consumers, waits briefly, then closes the local
     * connection and stops the broker.
     *
     * <p>The connection and the broker are shut down exactly once, regardless of whether the
     * announcement succeeds. Errors are logged, not thrown. If the waiting thread is interrupted,
     * the shutdown still completes and the thread's interrupt status is restored afterwards.
     * Calling this method on a broker that was never started is a no-op.
     */
    @Override
    public void stop() {
        if (brokerService == null) {
            // start() was never called, or failed before the broker was created
            log.debug("Ignoring stop request for JMS broker " + brokerName + " as it was not started");
            return;
        }
        boolean interrupted = false;
        try {
            if (localBrokerConnection != null) {
                announceShutdownToConsumers();
                // presumably gives the consumers time to process the shutdown messages
                // before their connection is closed underneath them
                Thread.sleep(SHUTDOWN_WAIT_AFTER_ANNOUNCE_MSEC);
            }
        } catch (InterruptedException e) {
            interrupted = true;
            log.error("Interrupted while waiting for queue shutdown", e);
        } catch (JMSException e) {
            log.error("Error while shutting down queue listeners", e);
        } finally {
            closeLocalConnection();
            stopBrokerService();
            if (interrupted) {
                // restored only after the cleanup so that the close/stop calls above
                // do not run with a pending interrupt
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return the JVM-local connection to the embedded broker, or {@code null} if
     *         {@link #start()} has not created it yet
     */
    @Override
    public Connection getLocalConnection() {
        return localBrokerConnection;
    }

    /**
     * Reads the request consumer count from the system property, falling back to the default
     * when the property is unset, not an integer, or less than one (a broker without any
     * request consumer would never process a request).
     */
    private int resolveNumRequestConsumers() {
        String configuredValue = System.getProperty(PROPERTY_NUM_REQUEST_CONSUMERS);
        if (configuredValue == null) {
            return DEFAULT_NUM_REQUEST_CONSUMERS;
        }
        try {
            int parsedValue = Integer.parseInt(configuredValue);
            if (parsedValue >= 1) {
                return parsedValue;
            }
        } catch (NumberFormatException ignored) {
            // handled by the shared warning below
        }
        log.warn("Ignoring invalid property value: " + configuredValue);
        return DEFAULT_NUM_REQUEST_CONSUMERS;
    }

    /**
     * Sends one shutdown message to the initial inbox queue and one per request consumer to the
     * shared request inbox queue, using a short-lived session on the local connection.
     */
    private void announceShutdownToConsumers() throws JMSException {
        Session shutdownSession = localBrokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            log.debug("Sending internal queue shutdown commands");
            // NOTE(review): "secToken" looks like a placeholder security token - confirm against JmsProtocolUtils
            Message shutdownMessage = JmsProtocolUtils.createQueueShutdownMessage(shutdownSession, "secToken");
            // producer without a fixed destination; the target queue is given per send() call
            MessageProducer producer = shutdownSession.createProducer((Destination) null);
            JmsProtocolUtils.configureMessageProducer(producer);
            producer.send(shutdownSession.createQueue(JmsProtocolConstants.QUEUE_NAME_INITIAL_BROKER_INBOX), shutdownMessage);
            Destination requestInbox = shutdownSession.createQueue(JmsProtocolConstants.QUEUE_NAME_C2B_REQUEST_INBOX);
            // all request consumers share one queue, so one message is sent per consumer
            for (int i = 0; i < numRequestConsumers; i++) {
                producer.send(requestInbox, shutdownMessage);
            }
        } finally {
            shutdownSession.close();
        }
    }

    /** Closes the local broker connection if there is one; a failure is logged as a warning. */
    private void closeLocalConnection() {
        if (localBrokerConnection == null) {
            return;
        }
        try {
            localBrokerConnection.close();
        } catch (JMSException e) {
            log.warn("Error closing local connection to broker " + brokerService.getBrokerName(), e);
        }
    }

    /** Stops the embedded broker; a failure is logged as a warning. */
    private void stopBrokerService() {
        try {
            log.info("Shutting down server port " + scp.getNetworkContactPoint().getPort());
            brokerService.stop();
            log.debug("Stopped JMS broker " + brokerService.getBrokerName());
        } catch (Exception e) {
            log.warn("Error shutting down JMS broker " + brokerService.getBrokerName(), e);
        }
    }

    /**
     * Creates (but does not start) a non-persistent broker without JMX, with the given connection
     * filter plugin installed and one connector per given URL.
     *
     * @param str the broker name
     * @param activeMQConnectionFilterPlugin the plugin to install as the broker's only plugin
     * @param strArr the connector URLs to add
     * @return the configured, not yet started broker
     * @throws Exception if a connector cannot be added
     */
    private static BrokerService createTransientEmbeddedBroker(String str, ActiveMQConnectionFilterPlugin activeMQConnectionFilterPlugin, String... strArr) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName(str);
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.getSystemUsage().getTempUsage().setLimit(ACTIVEMQ_TEMPORARY_STORE_LIMIT);
        broker.setPlugins(new BrokerPlugin[] { activeMQConnectionFilterPlugin });
        for (String connectorUrl : strArr) {
            broker.addConnector(connectorUrl);
        }
        return broker;
    }

    /**
     * Submits the initial inbox consumer and {@code numRequestConsumers} request inbox consumers
     * to the thread pool. All of them share the given connection.
     */
    private void spawnInboxConsumers(Connection connection) throws JMSException {
        log.debug("Spawning initial inbox consumer for " + scp.toString());
        threadPool.execute("JMS Network Transport: Incoming connection listener",
            new InitialInboxConsumer(connection, scp, remoteInitiatedConnectionFactory));

        log.debug("Spawning " + numRequestConsumers + " request inbox consumer(s) for " + scp);
        MessageChannelEndpointHandler endpointHandler = scp.getEndpointHandler();
        for (int workerIndex = 1; workerIndex <= numRequestConsumers; workerIndex++) {
            // the first number is unique across all brokers in this JVM, the second is local to this broker
            String taskId = StringUtils.format("Shared C2B Request Inbox Consumer #%d (worker #%d for %s')",
                new Object[] { Integer.valueOf(sharedInboxConsumerIdGenerator.incrementAndGet()), Integer.valueOf(workerIndex), scp.toString() });
            threadPool.execute("JMS Network Transport: Incoming request listener", taskId,
                new RequestInboxConsumer(JmsProtocolConstants.QUEUE_NAME_C2B_REQUEST_INBOX, connection, endpointHandler));
        }
    }

    /**
     * Logs an exception reported asynchronously on the local broker connection. Two known
     * message patterns are only logged at debug level, without a stack trace; everything else
     * is logged as a warning.
     */
    private void handleAsyncJMSException(JMSException jMSException) {
        String exceptionText = jMSException.toString();
        boolean knownFollowUpError = exceptionText.contains("The destination temp-queue")
            || exceptionText.contains("Cannot remove session that had not been registered");
        if (knownFollowUpError) {
            log.debug("Asynchronous JMS exception (usually a follow-up error of a broken connection): " + exceptionText);
        } else {
            log.warn("Asynchronous JMS exception in local broker connection", jMSException);
        }
    }
}
