package de.rcenvironment.core.component.workflow.execution.api;

import de.rcenvironment.core.communication.api.CommunicationService;
import de.rcenvironment.core.communication.common.InstanceNodeSessionId;
import de.rcenvironment.core.communication.common.NodeIdentifierUtils;
import de.rcenvironment.core.communication.management.WorkflowHostService;
import de.rcenvironment.core.notification.DistributedNotificationService;
import de.rcenvironment.core.toolkitbridge.transitional.ConcurrencyUtils;
import de.rcenvironment.core.utils.common.StringUtils;
import de.rcenvironment.core.utils.common.rpc.RemoteOperationException;
import de.rcenvironment.core.utils.incubator.DebugSettings;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncExceptionListener;
import de.rcenvironment.toolkit.modules.concurrency.api.CallablesGroup;
import de.rcenvironment.toolkit.modules.concurrency.api.TaskDescription;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:de/rcenvironment/core/component/workflow/execution/api/GenericSubscriptionManager.class */
public class GenericSubscriptionManager {
    private static final String NOTIFICATION_PATTERN_WILDCARD = ".*";
    private final GenericSubscriptionEventProcessor eventProcessor;
    private final WorkflowHostService workflowHostService;
    private final CommunicationService communicationService;
    private final DistributedNotificationService notificationService;
    private final Set<String> subscribedIds = new HashSet();
    private final boolean verboseLogging = DebugSettings.getVerboseLoggingEnabled(getClass());
    private final Log log = LogFactory.getLog(getClass());

    public GenericSubscriptionManager(GenericSubscriptionEventProcessor genericSubscriptionEventProcessor, CommunicationService communicationService, WorkflowHostService workflowHostService, DistributedNotificationService distributedNotificationService) {
        this.eventProcessor = genericSubscriptionEventProcessor;
        this.communicationService = communicationService;
        this.workflowHostService = workflowHostService;
        this.notificationService = distributedNotificationService;
    }

    private Set<String> updateSubscribedIds() {
        HashSet hashSet = new HashSet();
        Iterator it = this.workflowHostService.getWorkflowHostNodesAndSelf().iterator();
        while (it.hasNext()) {
            hashSet.add(((InstanceNodeSessionId) it.next()).getInstanceNodeSessionIdString());
        }
        HashSet hashSet2 = new HashSet(hashSet);
        hashSet2.removeAll(this.subscribedIds);
        this.subscribedIds.retainAll(hashSet);
        return hashSet2;
    }

    public synchronized void updateSubscriptionsForPrefixes(String[] strArr) {
        Set<String> updateSubscribedIds = updateSubscribedIds();
        CallablesGroup createCallablesGroup = ConcurrencyUtils.getFactory().createCallablesGroup(Void.class);
        for (final String str : updateSubscribedIds) {
            final InstanceNodeSessionId parseInstanceNodeSessionIdStringWithExceptionWrapping = NodeIdentifierUtils.parseInstanceNodeSessionIdStringWithExceptionWrapping(str);
            for (final String str2 : strArr) {
                createCallablesGroup.add(new Callable<Void>() { // from class: de.rcenvironment.core.component.workflow.execution.api.GenericSubscriptionManager.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    /* JADX WARN: Multi-variable type inference failed */
                    /* JADX WARN: Type inference failed for: r0v14 */
                    /* JADX WARN: Type inference failed for: r0v8, types: [java.util.Set<java.lang.String>] */
                    /* JADX WARN: Type inference failed for: r0v9, types: [java.lang.Throwable] */
                    @Override // java.util.concurrent.Callable
                    @TaskDescription("Distributed console/input model notification subscriptions")
                    public Void call() throws Exception {
                        GenericSubscriptionManager.this.retrieveMissedNotifications(parseInstanceNodeSessionIdStringWithExceptionWrapping, GenericSubscriptionManager.this.notificationService.subscribe(StringUtils.format("%s%s:.*", new Object[]{str2, parseInstanceNodeSessionIdStringWithExceptionWrapping.getInstanceNodeIdString()}), GenericSubscriptionManager.this.eventProcessor, parseInstanceNodeSessionIdStringWithExceptionWrapping));
                        ?? r0 = GenericSubscriptionManager.this.subscribedIds;
                        synchronized (r0) {
                            GenericSubscriptionManager.this.subscribedIds.add(str);
                            r0 = r0;
                            return null;
                        }
                    }
                });
            }
        }
        createCallablesGroup.executeParallel(new AsyncExceptionListener() { // from class: de.rcenvironment.core.component.workflow.execution.api.GenericSubscriptionManager.2
            public void onAsyncException(Exception exc) {
                Throwable cause = (exc.getClass() != ExecutionException.class || exc.getCause() == null) ? exc : exc.getCause();
                if (cause.getCause() == null) {
                    GenericSubscriptionManager.this.log.warn("Asynchronous exception during parallel console/input model notification subscriptions: " + cause.toString());
                } else {
                    GenericSubscriptionManager.this.log.error("Asynchronous exception during parallel console/input model notification subscriptions", cause);
                }
            }
        });
    }

    private void retrieveMissedNotifications(InstanceNodeSessionId instanceNodeSessionId, Map<String, Long> map) throws RemoteOperationException {
        for (String str : map.keySet()) {
            Long l = map.get(str);
            if (l.longValue() != -1) {
                this.eventProcessor.setNumberOfLastMissingNotification(str, instanceNodeSessionId.getInstanceNodeSessionIdString(), l);
                if (this.verboseLogging) {
                    this.log.debug(StringUtils.format("Starting to fetch stored notifications for id %s from node %s", new Object[]{str, instanceNodeSessionId}));
                }
                Map notifications = this.notificationService.getNotifications(str, instanceNodeSessionId);
                if (this.verboseLogging) {
                    this.log.debug(StringUtils.format("Received %d stored notification entries for id %s from node %s", new Object[]{Integer.valueOf(notifications.size()), str, instanceNodeSessionId}));
                    for (Map.Entry entry : notifications.entrySet()) {
                        this.log.debug(StringUtils.format("  Received %d notifications for topic %s", new Object[]{Integer.valueOf(((List) entry.getValue()).size()), entry.getKey()}));
                    }
                }
                Iterator it = notifications.values().iterator();
                while (it.hasNext()) {
                    this.eventProcessor.receiveBatchedNotifications((List) it.next());
                }
            }
        }
    }
}
