package de.rcenvironment.core.component.workflow.execution.impl;

import de.rcenvironment.core.component.execution.api.ConsoleRow;
import de.rcenvironment.core.component.workflow.execution.api.GenericSubscriptionEventProcessor;
import de.rcenvironment.core.component.workflow.execution.internal.ConsoleRowProcessor;
import de.rcenvironment.core.notification.Notification;
import de.rcenvironment.core.toolkitbridge.transitional.ConcurrencyUtils;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncCallback;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncCallbackExceptionPolicy;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncOrderedCallbackManager;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:de/rcenvironment/core/component/workflow/execution/impl/ConsoleSubscriptionEventProcessor.class */
/**
 * Subscription event processor that extracts {@link ConsoleRow} payloads from collected
 * notifications and hands them, batch by batch, to the registered {@link ConsoleRowProcessor}s.
 *
 * <p>Delivery to the processors is delegated to an {@link AsyncOrderedCallbackManager}, so
 * {@link #processCollectedNotifications(List)} only enqueues work and does not invoke the
 * processors itself.
 */
public class ConsoleSubscriptionEventProcessor extends GenericSubscriptionEventProcessor {
    private static final long serialVersionUID = 5521705555312627039L;

    // NOTE(review): both fields are transient and initialized inline; if an instance of this
    // class is ever restored via Java deserialization they would presumably be null, because
    // field initializers are not re-run -- confirm that instances are only used locally.
    private final transient Log log = LogFactory.getLog(getClass());
    private final transient AsyncOrderedCallbackManager<ConsoleRowProcessor> callbackManager = ConcurrencyUtils.getFactory().createAsyncOrderedCallbackManager(AsyncCallbackExceptionPolicy.LOG_AND_PROCEED);

    /**
     * Creates a processor that forwards console rows to the given receivers.
     *
     * @param consoleRowProcessorArr the receivers to register as listeners; each one is added
     *        to the internal callback manager in the given order
     */
    public ConsoleSubscriptionEventProcessor(ConsoleRowProcessor... consoleRowProcessorArr) {
        for (ConsoleRowProcessor consoleRowProcessor : consoleRowProcessorArr) {
            this.callbackManager.addListener(consoleRowProcessor);
        }
    }

    /**
     * Collects all {@link ConsoleRow} bodies from the given notifications, stamps each row with
     * the number taken from its notification header, and enqueues a single callback that passes
     * the resulting batch to every registered {@link ConsoleRowProcessor}.
     *
     * <p>Notifications whose body is not a {@link ConsoleRow} (including a {@code null} body)
     * are skipped with a warning. The callback is enqueued even when the batch is empty.
     *
     * @param list the notifications to process
     */
    @Override // de.rcenvironment.core.component.workflow.execution.api.GenericSubscriptionEventProcessor
    protected void processCollectedNotifications(List<Notification> list) {
        // Final because the anonymous callback below captures it.
        final List<ConsoleRow> consoleRows = new ArrayList<ConsoleRow>();
        for (Notification notification : list) {
            Serializable body = notification.getBody();
            if (body instanceof ConsoleRow) {
                // Reuse the already type-checked body instead of fetching it a second time.
                ConsoleRow consoleRow = (ConsoleRow) body;
                consoleRow.setIndex(notification.getHeader().getNumber());
                consoleRows.add(consoleRow);
            } else {
                // A null body also ends up here; avoid dereferencing it while building the message.
                Object bodyType = body == null ? "null" : body.getClass();
                this.log.warn("Received unexpected notification of type " + bodyType + " for topic " + notification.getHeader().getNotificationIdentifier());
            }
        }
        this.callbackManager.enqueueCallback(new AsyncCallback<ConsoleRowProcessor>() { // from class: de.rcenvironment.core.component.workflow.execution.impl.ConsoleSubscriptionEventProcessor.1
            public void performCallback(ConsoleRowProcessor consoleRowProcessor) {
                consoleRowProcessor.processConsoleRows(consoleRows);
            }
        });
    }
}
