/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.stateless.core;

import java.io.InputStream;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.flow.BatchSize;
import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile;
import org.apache.nifi.stateless.core.AbstractStatelessComponent;
import org.apache.nifi.stateless.core.SLF4JComponentLog;
import org.apache.nifi.stateless.core.StatelessComponent;
import org.apache.nifi.stateless.core.StatelessConnectionContext;
import org.apache.nifi.stateless.core.StatelessFlowFile;
import org.apache.nifi.stateless.core.StatelessPassThroughConnectionContext;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FormatUtils;

public class StatelessRemoteOutputPort
extends AbstractStatelessComponent {
    private final SiteToSiteClient client;
    private final String url;
    private final String name;
    private final ComponentLog logger = new SLF4JComponentLog(this);
    private final StatelessConnectionContext connectionContext = new StatelessPassThroughConnectionContext();

    public StatelessRemoteOutputPort(VersionedRemoteProcessGroup rpg, VersionedRemoteGroupPort remotePort, SSLContext sslContext) {
        long batchMillis;
        long batchBytes;
        int batchCount;
        String timeout = rpg.getCommunicationsTimeout();
        long timeoutMillis = FormatUtils.getTimeDuration((String)timeout, (TimeUnit)TimeUnit.MILLISECONDS);
        this.url = rpg.getTargetUris();
        this.name = remotePort.getName();
        BatchSize batchSize = remotePort.getBatchSize();
        if (batchSize == null) {
            batchCount = 1;
            batchBytes = 1L;
            batchMillis = 1L;
        } else {
            batchCount = batchSize.getCount() == null ? 1 : batchSize.getCount();
            batchBytes = batchSize.getSize() == null ? 1L : DataUnit.parseDataSize((String)batchSize.getSize(), (DataUnit)DataUnit.B).longValue();
            batchMillis = batchSize.getDuration() == null ? 1L : FormatUtils.getTimeDuration((String)batchSize.getDuration(), (TimeUnit)TimeUnit.MILLISECONDS);
        }
        this.client = new SiteToSiteClient.Builder().portName(remotePort.getName()).timeout(timeoutMillis, TimeUnit.MILLISECONDS).requestBatchCount(batchCount).requestBatchDuration(batchMillis, TimeUnit.MILLISECONDS).requestBatchSize(batchBytes).transportProtocol(SiteToSiteTransportProtocol.valueOf((String)rpg.getTransportProtocol())).url(rpg.getTargetUris()).sslContext(sslContext).useCompression(remotePort.isUseCompression().booleanValue()).eventReporter(EventReporter.NO_OP).build();
    }

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.emptySet();
    }

    @Override
    protected StatelessConnectionContext getContext() {
        return this.connectionContext;
    }

    @Override
    protected ComponentLog getLogger() {
        return this.logger;
    }

    @Override
    public void shutdown() {
    }

    @Override
    public void enqueueAll(Queue<StatelessFlowFile> list) {
        throw new UnsupportedOperationException("Cannot enqueue FlowFiles for a Remote Output Port");
    }

    @Override
    public boolean runRecursive(Queue<InMemoryFlowFile> queue) {
        try {
            DataPacket dataPacket;
            Transaction transaction = this.client.createTransaction(TransferDirection.RECEIVE);
            if (transaction == null) {
                this.getLogger().error("Unable to create a transaction for Remote Process Group {} to pull from port {}", new Object[]{this.url, this.name});
                return false;
            }
            LinkedList<StatelessFlowFile> destinationQueue = new LinkedList<StatelessFlowFile>();
            while ((dataPacket = transaction.receive()) != null) {
                Map attributes = dataPacket.getAttributes();
                InputStream in = dataPacket.getData();
                byte[] buffer = new byte[(int)dataPacket.getSize()];
                StreamUtils.fillBuffer((InputStream)in, (byte[])buffer);
                StatelessFlowFile receivedFlowFile = new StatelessFlowFile(buffer, (Map<String, String>)attributes, true);
                destinationQueue.add(receivedFlowFile);
                for (StatelessComponent childComponent : this.getChildren().get(Relationship.ANONYMOUS)) {
                    childComponent.enqueueAll(destinationQueue);
                    childComponent.runRecursive(queue);
                }
                destinationQueue.clear();
            }
            transaction.confirm();
            transaction.complete();
        }
        catch (Exception e) {
            this.getLogger().error("Failed to receive FlowFile via site-to-site", (Throwable)e);
            return false;
        }
        return true;
    }

    @Override
    public boolean isMaterializeContent() {
        return false;
    }
}

