package de.rcenvironment.core.datamanagement.internal;

import de.rcenvironment.core.authorization.AuthorizationException;
import de.rcenvironment.core.communication.api.CommunicationService;
import de.rcenvironment.core.communication.api.PlatformService;
import de.rcenvironment.core.communication.common.CommunicationException;
import de.rcenvironment.core.communication.common.NetworkDestination;
import de.rcenvironment.core.communication.common.ResolvableNodeId;
import de.rcenvironment.core.datamanagement.FileDataService;
import de.rcenvironment.core.datamanagement.RemotableFileDataService;
import de.rcenvironment.core.datamanagement.backend.DataBackend;
import de.rcenvironment.core.datamanagement.commons.BinaryReference;
import de.rcenvironment.core.datamanagement.commons.DataReference;
import de.rcenvironment.core.datamanagement.commons.MetaDataSet;
import de.rcenvironment.core.toolkitbridge.transitional.ConcurrencyUtils;
import de.rcenvironment.core.utils.common.StringUtils;
import de.rcenvironment.core.utils.common.rpc.RemoteOperationException;
import de.rcenvironment.toolkit.modules.concurrency.api.TaskDescription;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.Exchanger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.framework.BundleContext;

/* loaded from: input_file:de/rcenvironment/core/datamanagement/internal/FileDataServiceImpl.class */
public class FileDataServiceImpl implements FileDataService {
    private static final String UPLOAD_BLOCK_SIZE_PROPERTY = "communication.uploadBlockSize";
    private static final int DOWNLOAD_STREAM_BUFFER_SIZE = 262144;
    private static final int DEFAULT_UPLOAD_CHUNK_SIZE = 262144;
    private static final int MINIMUM_UPLOAD_CHUNK_SIZE = 8192;
    private static final long CHUNK_UPLOAD_TIME_WARNING_THRESHOLD_MSEC = 25000;
    private static final int REMOTE_REFERENCE_POLLING_INTERVAL_MSEC = 1000;
    private static final int END_OF_STREAM_MARKER = -1;
    private static final String DRCE_DEACTIVATE_SINGLE_STEP_UPDATE = "rce.upload.deactivateSingleStepUpdate";
    private PlatformService platformService;
    private final Log log = LogFactory.getLog(getClass());
    private final int uploadChunkSize;
    private CommunicationService communicationService;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:de/rcenvironment/core/datamanagement/internal/FileDataServiceImpl$AsyncBufferUploader.class */
    public abstract class AsyncBufferUploader implements Runnable {
        private int bufferSize;
        private Exchanger<ChunkBuffer> exchanger = new Exchanger<>();

        AsyncBufferUploader(int i) {
            this.bufferSize = i;
        }

        @Override // java.lang.Runnable
        @TaskDescription("Async upload")
        public void run() {
            ChunkBuffer chunkBuffer = new ChunkBuffer(this.bufferSize);
            while (true) {
                try {
                    ChunkBuffer exchange = this.exchanger.exchange(chunkBuffer);
                    if (exchange == null) {
                        FileDataServiceImpl.this.log.debug("Async uploader shutting down");
                        return;
                    }
                    try {
                        sendSingleBuffer(exchange);
                        chunkBuffer = exchange;
                    } catch (IOException e) {
                        FileDataServiceImpl.this.log.debug("I/O exception during async upload", e);
                        exchange.setExceptionOnWrite(e);
                        this.exchanger.exchange(exchange);
                        return;
                    }
                } catch (InterruptedException unused) {
                    FileDataServiceImpl.this.log.warn("Async upload thread interrupted");
                    return;
                }
            }
        }

        protected abstract void sendSingleBuffer(ChunkBuffer chunkBuffer) throws IOException;

        ChunkBuffer getInitialEmptyBuffer() {
            return new ChunkBuffer(this.bufferSize);
        }

        ChunkBuffer swapBuffersWhenReady(ChunkBuffer chunkBuffer) throws InterruptedException, IOException {
            ChunkBuffer exchange = this.exchanger.exchange(chunkBuffer);
            IOException exceptionOnWrite = exchange.getExceptionOnWrite();
            if (exceptionOnWrite != null) {
                throw exceptionOnWrite;
            }
            return exchange;
        }

        void shutDown() throws InterruptedException, IOException {
            IOException exceptionOnWrite = this.exchanger.exchange(null).getExceptionOnWrite();
            if (exceptionOnWrite != null) {
                throw exceptionOnWrite;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:de/rcenvironment/core/datamanagement/internal/FileDataServiceImpl$ChunkBuffer.class */
    public final class ChunkBuffer {
        private byte[] buffer;
        private int contentSize = 0;
        private IOException exceptionOnWrite;

        ChunkBuffer(int i) {
            this.buffer = new byte[i];
        }

        public void setExceptionOnWrite(IOException iOException) {
            this.exceptionOnWrite = iOException;
        }

        public IOException getExceptionOnWrite() {
            return this.exceptionOnWrite;
        }

        public int getContentSize() {
            return this.contentSize;
        }

        public byte[] getContentSizeBuffer() {
            return this.contentSize == this.buffer.length ? this.buffer : Arrays.copyOf(this.buffer, this.contentSize);
        }

        public boolean fillFromStream(InputStream inputStream) throws IOException {
            int read = inputStream.read(this.buffer);
            if (read == 0) {
                throw new IllegalStateException("Read zero bytes");
            }
            if (read >= 0) {
                this.contentSize = read;
            } else {
                this.contentSize = 0;
            }
            while (read < this.buffer.length && read != FileDataServiceImpl.END_OF_STREAM_MARKER) {
                read = inputStream.read(this.buffer, this.contentSize, this.buffer.length - this.contentSize);
                if (read == 0) {
                    throw new IllegalStateException("Read zero bytes");
                }
                if (read > 0) {
                    this.contentSize += read;
                }
            }
            return read != FileDataServiceImpl.END_OF_STREAM_MARKER;
        }
    }

    public FileDataServiceImpl() {
        int i = 262144;
        String property = System.getProperty(UPLOAD_BLOCK_SIZE_PROPERTY);
        if (property != null) {
            try {
                int parseInt = Integer.parseInt(property);
                if (parseInt >= MINIMUM_UPLOAD_CHUNK_SIZE) {
                    i = parseInt;
                } else {
                    this.log.error("Invalid upload block size specified: minimum value is 8192");
                }
            } catch (NumberFormatException e) {
                this.log.error("Failed to parse communication.uploadBlockSize setting; using default", e);
            }
        }
        this.uploadChunkSize = i;
        this.log.debug("Using remote upload block size " + this.uploadChunkSize);
    }

    protected void activate(BundleContext bundleContext) {
    }

    protected void bindCommunicationService(CommunicationService communicationService) {
        this.communicationService = communicationService;
    }

    protected void bindPlatformService(PlatformService platformService) {
        this.platformService = platformService;
    }

    @Override // de.rcenvironment.core.datamanagement.FileDataService
    public InputStream getStreamFromDataReference(DataReference dataReference) throws AuthorizationException, CommunicationException {
        return getStreamFromDataReference(dataReference, true);
    }

    @Override // de.rcenvironment.core.datamanagement.FileDataService
    public InputStream getStreamFromDataReference(DataReference dataReference, boolean z) throws AuthorizationException, CommunicationException {
        try {
            return new BufferedInputStream(getRemoteFileDataService(dataReference.getStorageNodeId()).getStreamFromDataReference(dataReference, Boolean.valueOf(!this.platformService.matchesLocalInstance(dataReference.getStorageNodeId())), Boolean.valueOf(z)), 262144);
        } catch (RemoteOperationException e) {
            throw new CommunicationException(String.valueOf(StringUtils.format("Failed to get stream from data reference from remote node %s: ", new Object[]{dataReference.getStorageNodeId()})) + e.getMessage());
        }
    }

    @Override // de.rcenvironment.core.datamanagement.FileDataService
    public DataReference newReferenceFromStream(InputStream inputStream, MetaDataSet metaDataSet, NetworkDestination networkDestination) throws AuthorizationException, IOException, InterruptedException, CommunicationException {
        return newReferenceFromStream(inputStream, metaDataSet, networkDestination, false);
    }

    @Override // de.rcenvironment.core.datamanagement.FileDataService
    public DataReference newReferenceFromStream(InputStream inputStream, MetaDataSet metaDataSet, NetworkDestination networkDestination, boolean z) throws AuthorizationException, IOException, InterruptedException, CommunicationException {
        if (networkDestination == null) {
            networkDestination = this.platformService.getLocalInstanceNodeSessionId();
        }
        if (!(networkDestination instanceof ResolvableNodeId) || !this.platformService.matchesLocalInstance((ResolvableNodeId) networkDestination)) {
            return performRemoteUpload(inputStream, metaDataSet, networkDestination, z);
        }
        try {
            return getRemoteFileDataService(networkDestination).newReferenceFromStream(inputStream, metaDataSet, Boolean.valueOf(z));
        } catch (RemoteOperationException e) {
            throw new CommunicationException(String.valueOf(StringUtils.format("Failed to create new data reference from stream from remote node @%s: ", new Object[]{networkDestination})) + e.getMessage());
        }
    }

    private DataReference performRemoteUpload(InputStream inputStream, MetaDataSet metaDataSet, final NetworkDestination networkDestination, boolean z) throws InterruptedException, IOException, CommunicationException {
        DataReference pollUploadForDataReference;
        final RemotableFileDataService remotableFileDataService = (RemotableFileDataService) this.communicationService.getRemotableService(RemotableFileDataService.class, networkDestination);
        try {
            ChunkBuffer chunkBuffer = new ChunkBuffer(this.uploadChunkSize);
            chunkBuffer.fillFromStream(inputStream);
            if (!System.getProperties().containsKey(DRCE_DEACTIVATE_SINGLE_STEP_UPDATE) && chunkBuffer.getContentSize() < this.uploadChunkSize) {
                this.log.debug("Data to upload is smaller than chunk size: performing upload in single step.");
                return remotableFileDataService.uploadInSingleStep(chunkBuffer.getContentSizeBuffer(), metaDataSet, Boolean.valueOf(z));
            }
            final String initializeUpload = remotableFileDataService.initializeUpload();
            if (initializeUpload == null) {
                throw new NullPointerException("Received null upload id");
            }
            this.log.debug("Received remote upload id " + initializeUpload);
            AsyncBufferUploader asyncBufferUploader = new AsyncBufferUploader(this, this.uploadChunkSize) { // from class: de.rcenvironment.core.datamanagement.internal.FileDataServiceImpl.1
                private long localTotalSize = 0;

                @Override // de.rcenvironment.core.datamanagement.internal.FileDataServiceImpl.AsyncBufferUploader
                protected void sendSingleBuffer(ChunkBuffer chunkBuffer2) throws IOException {
                    long currentTimeMillis = System.currentTimeMillis();
                    try {
                        long appendToUpload = remotableFileDataService.appendToUpload(initializeUpload, chunkBuffer2.getContentSizeBuffer());
                        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                        if (currentTimeMillis2 > FileDataServiceImpl.CHUNK_UPLOAD_TIME_WARNING_THRESHOLD_MSEC) {
                            this.log.warn(StringUtils.format("Uploading a data block of %d bytes took %d msec", new Object[]{Integer.valueOf(chunkBuffer2.getContentSize()), Long.valueOf(currentTimeMillis2)}));
                        }
                        this.localTotalSize += chunkBuffer2.getContentSize();
                        if (this.localTotalSize != appendToUpload) {
                            throw new IllegalStateException("Consistency error: Local and remote write counts are not equal!");
                        }
                    } catch (RemoteOperationException e) {
                        throw new RuntimeException(String.valueOf(StringUtils.format("Failed to create new data reference from stream from remote node @%s: ", new Object[]{networkDestination})) + e.getMessage());
                    }
                }
            };
            ConcurrencyUtils.getAsyncTaskService().execute("Async upload", asyncBufferUploader);
            long j = 0;
            do {
                j += chunkBuffer.getContentSize();
                chunkBuffer = asyncBufferUploader.swapBuffersWhenReady(chunkBuffer);
            } while (chunkBuffer.fillFromStream(inputStream));
            if (chunkBuffer.getContentSize() > 0) {
                j += chunkBuffer.getContentSize();
                asyncBufferUploader.swapBuffersWhenReady(chunkBuffer);
            }
            asyncBufferUploader.shutDown();
            remotableFileDataService.finishUpload(initializeUpload, metaDataSet, Boolean.valueOf(z));
            this.log.debug(StringUtils.format("Finished uploading %d bytes for upload id %s; polling for remote data reference", new Object[]{Long.valueOf(j), initializeUpload}));
            do {
                Thread.sleep(1000L);
                pollUploadForDataReference = remotableFileDataService.pollUploadForDataReference(initializeUpload);
            } while (pollUploadForDataReference == null);
            this.log.debug("Received remote data reference for upload id " + initializeUpload);
            return pollUploadForDataReference;
        } catch (IOException e) {
            throw new IOException("Error uploading file", e);
        } catch (RemoteOperationException e2) {
            throw new CommunicationException(String.valueOf(StringUtils.format("Failed to perform remote upload to node @%s: ", new Object[]{networkDestination})) + e2.getMessage());
        }
    }

    @Override // de.rcenvironment.core.datamanagement.FileDataService
    public void deleteReference(DataReference dataReference) throws CommunicationException {
        try {
            Iterator<BinaryReference> it = dataReference.getBinaryReferences().iterator();
            while (it.hasNext()) {
                getRemoteFileDataService(dataReference.getStorageNodeId()).deleteReference(it.next().getBinaryReferenceKey());
            }
        } catch (RemoteOperationException e) {
            throw new CommunicationException(String.valueOf(StringUtils.format("Failed to delete data reference on remote node %s: ", new Object[]{dataReference.getStorageNodeId()})) + e.getMessage());
        }
    }

    private RemotableFileDataService getRemoteFileDataService(NetworkDestination networkDestination) throws RemoteOperationException {
        return (RemotableFileDataService) this.communicationService.getRemotableService(RemotableFileDataService.class, networkDestination);
    }

    @Override // de.rcenvironment.core.datamanagement.FileDataService
    public void deleteReference(String str) throws RemoteOperationException {
        DataBackend dataBackend = BackendSupport.getDataBackend();
        dataBackend.delete(dataBackend.suggestLocation(UUID.fromString(str)));
    }
}
