package org.apache.nifi.remote;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.exception.UnreachableClusterException;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/remote/StandardRemoteGroupPort.class */
public class StandardRemoteGroupPort extends RemoteGroupPort {
    /** User-agent name constant; not referenced by the code in this class. */
    public static final String USER_AGENT = "NiFi-Site-to-Site";
    /** Content-type constant; not referenced by the code in this class. */
    public static final String CONTENT_TYPE = "application/octet-stream";
    /** GZIP compression level constant; not referenced by the code in this class. */
    public static final int GZIP_COMPRESSION_LEVEL = 1;
    /** Category passed to the remote group's event reporter when a communication failure is reported. */
    private static final String CATEGORY = "Site to Site";
    /** Remote process group this port belongs to; source of URIs, timeouts, proxy and transport settings. */
    private final RemoteProcessGroup remoteGroup;
    /** Compression flag handed to the client builder in onSchedulingStart(). */
    private final AtomicBoolean useCompression;
    /** Requested batch count; null means no count is requested from the client builder. */
    private final AtomicReference<Integer> batchCount;
    /** Requested batch size as a data-size string parsed in onSchedulingStart(); may be null. */
    private final AtomicReference<String> batchSize;
    /** Requested batch duration as a time-period string parsed in onSchedulingStart(); may be null. */
    private final AtomicReference<String> batchDuration;
    /** Cleared by onTrigger() when an UnknownPortException is caught; consulted by isValid(). */
    private final AtomicBoolean targetExists;
    /** Cleared by onTrigger() when a PortNotRunningException is caught. */
    private final AtomicBoolean targetRunning;
    /** SSL context handed to the client builder. */
    private final SSLContext sslContext;
    /** Direction passed to SiteToSiteClient.createTransaction() in onTrigger(). */
    private final TransferDirection transferDirection;
    /** Properties used to locate the persistent-state directory for the peer persistence file. */
    private final NiFiProperties nifiProperties;
    /** Identifier of the remote port; when null, getTargetIdentifier() falls back to getIdentifier(). */
    private volatile String targetId;
    /** Client built in onSchedulingStart() and closed in shutdown(); empty until the first build. */
    private final AtomicReference<SiteToSiteClient> clientRef;
    /** Fallback send-batch duration (500 ms, in nanoseconds) used when the client config has no positive preferred duration. */
    private static final long BATCH_SEND_NANOS = TimeUnit.MILLISECONDS.toNanos(500);
    private static final Logger logger = LoggerFactory.getLogger(StandardRemoteGroupPort.class);

    /**
     * Returns the client most recently stored by {@link #onSchedulingStart()}.
     *
     * @return the current client, or {@code null} if none has been stored yet
     */
    SiteToSiteClient getSiteToSiteClient() {
        final SiteToSiteClient current = clientRef.get();
        return current;
    }

    /**
     * Creates a remote group port attached to the given remote process group.
     *
     * @param str first value forwarded to the superclass constructor (presumably this port's identifier; confirm against RemoteGroupPort)
     * @param str2 identifier of the target port; may be {@code null}, in which case
     *             {@link #getTargetIdentifier()} falls back to {@link #getIdentifier()}
     * @param str3 second value forwarded to the superclass constructor (presumably the port name; confirm)
     * @param processGroup forwarded to the superclass constructor
     * @param remoteProcessGroup remote process group this port communicates through
     * @param transferDirection direction used when creating Site-to-Site transactions
     * @param connectableType forwarded to the superclass constructor
     * @param sSLContext SSL context handed to the Site-to-Site client builder
     * @param processScheduler forwarded to the superclass constructor
     * @param niFiProperties properties used to locate the peer persistence file
     */
    public StandardRemoteGroupPort(String str, String str2, String str3, ProcessGroup processGroup, RemoteProcessGroup remoteProcessGroup, TransferDirection transferDirection, ConnectableType connectableType, SSLContext sSLContext, ProcessScheduler processScheduler, NiFiProperties niFiProperties) {
        super(str, str3, processGroup, connectableType, processScheduler);

        // Collaborators supplied by the caller.
        this.remoteGroup = remoteProcessGroup;
        this.transferDirection = transferDirection;
        this.sslContext = sSLContext;
        this.nifiProperties = niFiProperties;
        this.targetId = str2;

        // Mutable state: the target is assumed to exist and be running until a transfer says otherwise.
        this.clientRef = new AtomicReference<>();
        this.targetExists = new AtomicBoolean(true);
        this.targetRunning = new AtomicBoolean(true);
        this.useCompression = new AtomicBoolean(false);
        this.batchCount = new AtomicReference<>();
        this.batchSize = new AtomicReference<>();
        this.batchDuration = new AtomicReference<>();

        setScheduldingPeriod("1 nanos");
    }

    /**
     * Returns the identifier of the target port.
     *
     * @return the explicitly configured target identifier, or this port's own identifier when none is set
     */
    public String getTargetIdentifier() {
        // Read the volatile field once so the null check and the returned value agree.
        final String configuredTarget = targetId;
        if (configuredTarget != null) {
            return configuredTarget;
        }
        return getIdentifier();
    }

    /**
     * Sets the identifier of the target port.
     *
     * @param str the new target identifier; {@code null} makes {@link #getTargetIdentifier()} fall back to this port's identifier
     */
    public void setTargetIdentifier(String str) {
        targetId = str;
    }

    /**
     * Builds the location of the peer persistence file, named {@code <id>_<PROTOCOL>.peers}, inside the
     * persistent-state directory reported by the given properties.
     *
     * @param str identifier used as the first part of the file name
     * @param niFiProperties properties supplying the persistent-state directory
     * @param siteToSiteTransportProtocol transport protocol whose enum name forms the second part of the file name
     * @return the peer persistence file (not created by this method)
     */
    private static File getPeerPersistenceFile(String str, NiFiProperties niFiProperties, SiteToSiteTransportProtocol siteToSiteTransportProtocol) {
        return new File(
                niFiProperties.getPersistentStateDirectory(),
                str + "_" + siteToSiteTransportProtocol.name() + ".peers");
    }

    /**
     * Reports the last known running state of the target port.
     *
     * @return {@code false} once a transfer has failed with a PortNotRunningException or the flag was
     *         cleared through {@link #setTargetRunning(boolean)}; {@code true} otherwise
     */
    public boolean isTargetRunning() {
        return targetRunning.get();
    }

    /**
     * Records whether the target port is running.
     *
     * @param z the new running state
     */
    public void setTargetRunning(boolean z) {
        targetRunning.set(z);
    }

    /**
     * Indicates whether this port is triggered without queued input.
     *
     * @return {@code true} only when this port is a remote output port
     */
    public boolean isTriggerWhenEmpty() {
        return ConnectableType.REMOTE_OUTPUT_PORT == getConnectableType();
    }

    /**
     * Builds the authorization resource for this port.
     *
     * @return a component resource of type {@code InputPort} for a remote input port and
     *         {@code OutputPort} otherwise, carrying this port's identifier and name
     */
    public Resource getResource() {
        final ResourceType resourceType;
        if (ConnectableType.REMOTE_INPUT_PORT.equals(getConnectableType())) {
            resourceType = ResourceType.InputPort;
        } else {
            resourceType = ResourceType.OutputPort;
        }
        return ResourceFactory.getComponentResource(resourceType, getIdentifier(), getName());
    }

    /**
     * Returns the parent used for authorization.
     *
     * @return the remote process group this port belongs to
     */
    public Authorizable getParentAuthorizable() {
        final RemoteProcessGroup parent = getRemoteProcessGroup();
        return parent;
    }

    /**
     * Shuts the port down and closes the current Site-to-Site client, if one has been built.
     * A failure to close the client is logged as a warning and not propagated.
     */
    public void shutdown() {
        super.shutdown();

        final SiteToSiteClient client = getSiteToSiteClient();
        if (client == null) {
            return;
        }

        try {
            client.close();
        } catch (final IOException ioe) {
            // Pass the description for the "{}" placeholder and the exception as the trailing argument.
            // With the exception as the only argument, SLF4J resolves to warn(String, Throwable) and
            // the placeholder is logged literally.
            logger.warn("Failed to properly shutdown Site-to-Site Client due to {}", ioe.toString(), ioe);
        }
    }

    /**
     * Builds a new Site-to-Site client from the remote process group's current settings and stores it
     * for use by {@code onTrigger}.
     *
     * <p>The batch count, size and duration are only requested from the builder when configured.
     * Batch size and duration strings are trimmed first, so a whitespace-only value is treated as
     * "not configured" rather than failing to parse.</p>
     */
    public void onSchedulingStart() {
        super.onSchedulingStart();

        final SiteToSiteClient.Builder clientBuilder = new SiteToSiteClient.Builder()
                .urls(SiteToSiteRestApiClient.parseClusterUrls(this.remoteGroup.getTargetUris()))
                .portIdentifier(getTargetIdentifier())
                .sslContext(this.sslContext)
                .useCompression(isUseCompression())
                .eventReporter(this.remoteGroup.getEventReporter())
                .peerPersistenceFile(getPeerPersistenceFile(getIdentifier(), this.nifiProperties, this.remoteGroup.getTransportProtocol()))
                // The remote group's yield duration doubles as the node penalization period.
                .nodePenalizationPeriod(FormatUtils.getTimeDuration(this.remoteGroup.getYieldDuration(), TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
                .timeout(this.remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
                .transportProtocol(this.remoteGroup.getTransportProtocol())
                .httpProxy(new HttpProxy(this.remoteGroup.getProxyHost(), this.remoteGroup.getProxyPort(), this.remoteGroup.getProxyUser(), this.remoteGroup.getProxyPassword()))
                .localAddress(this.remoteGroup.getLocalAddress());

        final Integer requestedCount = getBatchCount();
        if (requestedCount != null) {
            clientBuilder.requestBatchCount(requestedCount.intValue());
        }

        final String rawSize = getBatchSize();
        final String requestedSize = rawSize == null ? "" : rawSize.trim();
        if (!requestedSize.isEmpty()) {
            // Keep the full long range: narrowing to int would corrupt sizes above Integer.MAX_VALUE bytes.
            clientBuilder.requestBatchSize(DataUnit.parseDataSize(requestedSize, DataUnit.B).longValue());
        }

        final String rawDuration = getBatchDuration();
        final String requestedDuration = rawDuration == null ? "" : rawDuration.trim();
        if (!requestedDuration.isEmpty()) {
            clientBuilder.requestBatchDuration(FormatUtils.getTimeDuration(requestedDuration, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
        }

        this.clientRef.set(clientBuilder.build());
    }

    /**
     * Performs one Site-to-Site exchange: sends queued FlowFiles when this is a remote input port,
     * otherwise receives FlowFiles from the remote instance.
     *
     * <p>Returns without doing anything when the remote group is not transmitting, or when sending
     * and there is nothing queued. Communication failures are logged, reported to the remote group's
     * event reporter and cause the session to be rolled back; they are not propagated to the caller.</p>
     *
     * @param processContext context used to yield when no work can be done or the remote side is unavailable
     * @param processSession session supplying the FlowFiles to send or receiving the created ones
     */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) {
        if (!this.remoteGroup.isTransmitting()) {
            logger.debug("{} {} is not transmitting; will not send/receive", this, this.remoteGroup);
            return;
        }

        final boolean sending = getConnectableType() == ConnectableType.REMOTE_INPUT_PORT;
        if (sending && processSession.getQueueSize().getObjectCount() == 0) {
            logger.debug("{} No data to send", this);
            return;
        }

        final String url = getRemoteProcessGroup().getTargetUri();

        // When sending, claim the first FlowFile before opening a transaction so that no remote
        // transaction is created if the queue turns out to be empty.
        final FlowFile firstFlowFile = sending ? processSession.get() : null;
        if (sending && firstFlowFile == null) {
            return;
        }

        // The handlers below are ordered from most specific to most general: the Site-to-Site
        // exceptions must be tested before the generic IOException handler, otherwise their
        // dedicated handling (yielding, flagging the port as missing or stopped) is never reached.
        try {
            final Transaction transaction = getSiteToSiteClient().createTransaction(this.transferDirection);
            if (transaction == null) {
                logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this);
                processSession.rollback();
                processContext.yield();
                return;
            }

            try {
                if (sending) {
                    transferFlowFiles(transaction, processContext, processSession, firstFlowFile);
                } else if (receiveFlowFiles(transaction, processContext, processSession) == 0) {
                    // Nothing was available remotely; back off instead of polling again immediately.
                    processContext.yield();
                }
                processSession.commit();
            } catch (final Throwable t) {
                final String message = String.format("%s failed to communicate with remote NiFi instance due to %s", this, t.toString());
                logger.error("{} failed to communicate with remote NiFi instance due to {}", this, t.toString());
                if (logger.isDebugEnabled()) {
                    logger.error("", t);
                }
                this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
                transaction.error();
                processSession.rollback();
            }
        } catch (final PortNotRunningException e) {
            processContext.yield();
            this.targetRunning.set(false);
            final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port is not in a valid state", this, url);
            logger.error(message);
            processSession.rollback();
            this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
        } catch (final UnknownPortException e) {
            processContext.yield();
            this.targetExists.set(false);
            final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port no longer exists", this, url);
            logger.error(message);
            processSession.rollback();
            this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
        } catch (final UnreachableClusterException e) {
            processContext.yield();
            final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString());
            logger.error(message);
            processSession.rollback();
            this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
        } catch (final IOException e) {
            final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString());
            logger.error(message);
            if (logger.isDebugEnabled()) {
                logger.error("", e);
            }
            processSession.rollback();
            this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
        }
    }

    /**
     * Returns the yield period of this port.
     *
     * @return the yield duration configured on the remote process group
     */
    public String getYieldPeriod() {
        final String groupYieldDuration = remoteGroup.getYieldDuration();
        return groupYieldDuration;
    }

    /**
     * Sends the given FlowFile, followed by further FlowFiles pulled from the session, over the
     * transaction until a batch limit is reached or the queue is empty, then confirms and completes
     * the transaction and commits the session.
     *
     * <p>The batch ends when the client's preferred count or byte size (if positive) is reached, or
     * when the preferred duration (falling back to {@code BATCH_SEND_NANOS}) has elapsed. Each sent
     * FlowFile gets a provenance SEND event and is removed from the session. On any exception the
     * session is rolled back and the exception is rethrown.</p>
     *
     * @param transaction transaction to send through
     * @param processContext not used by this method
     * @param processSession session the FlowFiles are read from and removed from
     * @param flowFile first FlowFile to send; must not be {@code null}
     * @return the number of distinct FlowFiles sent
     * @throws IOException if reading or sending fails
     * @throws ProtocolException declared for protocol-level failures raised by the transaction
     */
    private int transferFlowFiles(final Transaction transaction, ProcessContext processContext, ProcessSession processSession, FlowFile flowFile) throws IOException, ProtocolException {
        try {
            final String remoteDn = transaction.getCommunicant().getDistinguishedName();
            final long batchStartNanos = System.nanoTime();
            final StopWatch stopWatch = new StopWatch(true);

            final SiteToSiteClientConfig clientConfig = getSiteToSiteClient().getConfig();
            final long maxBatchBytes = clientConfig.getPreferredBatchSize();
            final int maxBatchCount = clientConfig.getPreferredBatchCount();
            final long preferredNanos = clientConfig.getPreferredBatchDuration(TimeUnit.NANOSECONDS);
            final long maxBatchNanos = preferredNanos > 0 ? preferredNanos : BATCH_SEND_NANOS;

            final HashSet<FlowFile> sent = new HashSet<>();
            long bytesSent = 0L;

            FlowFile next = flowFile;
            do {
                final FlowFile toSend = next;

                final long sendStartNanos = System.nanoTime();
                processSession.read(toSend, in ->
                        transaction.send(new StandardDataPacket(toSend.getAttributes(), in, toSend.getSize())));
                final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - sendStartNanos);

                sent.add(toSend);
                bytesSent += toSend.getSize();
                logger.debug("{} Sent {} to {}", this, toSend, transaction.getCommunicant().getUrl());

                final String transitUri = transaction.getCommunicant().createTransitUri(toSend.getAttribute(CoreAttributes.UUID.key()));
                final FlowFile tagged = processSession.putAttribute(toSend, SiteToSiteAttributes.S2S_PORT_ID.key(), getTargetIdentifier());
                processSession.getProvenanceReporter().send(tagged, transitUri, "Remote DN=" + remoteDn, sendMillis, false);
                processSession.remove(tagged);

                // Decide whether the batch continues; the session is polled only when no limit has been hit.
                if (maxBatchCount > 0 && sent.size() >= maxBatchCount) {
                    next = null;
                } else if (maxBatchBytes > 0 && bytesSent >= maxBatchBytes) {
                    next = null;
                } else if (System.nanoTime() - batchStartNanos >= maxBatchNanos) {
                    next = null;
                } else {
                    next = processSession.get();
                }
            } while (next != null);

            transaction.confirm();

            stopWatch.stop();
            final String dataRate = stopWatch.calculateDataRate(bytesSent);
            final long elapsedMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
            final String dataSize = FormatUtils.formatDataSize(bytesSent);

            transaction.complete();
            processSession.commit();

            // List the individual FlowFiles only for small batches to keep the log line readable.
            final String sentDescription = sent.size() < 20 ? sent.toString() : sent.size() + " FlowFiles";
            logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}",
                    this, sentDescription, dataSize, transaction.getCommunicant().getUrl(), elapsedMillis, dataRate);

            return sent.size();
        } catch (final Exception failure) {
            processSession.rollback();
            throw failure;
        }
    }

    /**
     * Receives data packets from the transaction until it reports no more, creating one FlowFile per
     * packet, then confirms the transaction, commits the session and completes the transaction.
     *
     * <p>Each FlowFile carries the packet's attributes plus the Site-to-Site host, address and port-id
     * attributes, gets a provenance RECEIVE event and is transferred to the anonymous relationship.</p>
     *
     * @param transaction transaction to receive from
     * @param processContext not used by this method
     * @param processSession session in which the received FlowFiles are created
     * @return the number of FlowFiles received
     * @throws IOException if receiving or importing content fails
     * @throws ProtocolException declared for protocol-level failures raised by the transaction
     */
    private int receiveFlowFiles(Transaction transaction, ProcessContext processContext, ProcessSession processSession) throws IOException, ProtocolException {
        final String remoteDn = transaction.getCommunicant().getDistinguishedName();
        final StopWatch stopWatch = new StopWatch(true);
        final HashSet<FlowFile> received = new HashSet<>();
        long bytesReceived = 0L;

        long packetStartNanos = System.nanoTime();
        DataPacket packet = transaction.receive();
        while (packet != null) {
            FlowFile flowFile = processSession.create();
            flowFile = processSession.putAllAttributes(flowFile, packet.getAttributes());

            // Record where the data came from; unknown host or port is spelled out rather than left blank.
            final Communicant peer = transaction.getCommunicant();
            final String host = StringUtils.isEmpty(peer.getHost()) ? "unknown" : peer.getHost();
            final String port = peer.getPort() < 0 ? "unknown" : String.valueOf(peer.getPort());

            final HashMap<String, String> s2sAttributes = new HashMap<>(2);
            s2sAttributes.put(SiteToSiteAttributes.S2S_HOST.key(), host);
            s2sAttributes.put(SiteToSiteAttributes.S2S_ADDRESS.key(), host + ":" + port);
            s2sAttributes.put(SiteToSiteAttributes.S2S_PORT_ID.key(), getTargetIdentifier());

            final InputStream content = packet.getData();
            flowFile = processSession.putAllAttributes(flowFile, s2sAttributes);
            flowFile = processSession.importFrom(content, flowFile);
            final long receiveNanos = System.nanoTime() - packetStartNanos;
            received.add(flowFile);

            String sourceUuid = packet.getAttributes().get(CoreAttributes.UUID.key());
            if (sourceUuid == null) {
                sourceUuid = "<Unknown Identifier>";
            }

            processSession.getProvenanceReporter().receive(
                    flowFile,
                    transaction.getCommunicant().createTransitUri(sourceUuid),
                    "urn:nifi:" + sourceUuid,
                    "Remote DN=" + remoteDn,
                    TimeUnit.NANOSECONDS.toMillis(receiveNanos));
            processSession.transfer(flowFile, Relationship.ANONYMOUS);
            bytesReceived += packet.getSize();

            packetStartNanos = System.nanoTime();
            packet = transaction.receive();
        }

        transaction.confirm();
        processSession.commit();
        transaction.complete();

        if (!received.isEmpty()) {
            stopWatch.stop();
            // List the individual FlowFiles only for small batches to keep the log line readable.
            final String receivedDescription = received.size() < 20 ? received.toString() : received.size() + " FlowFiles";
            logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}",
                    this,
                    receivedDescription,
                    FormatUtils.formatDataSize(bytesReceived),
                    transaction.getCommunicant().getUrl(),
                    stopWatch.getDuration(TimeUnit.MILLISECONDS),
                    stopWatch.calculateDataRate(bytesReceived));
        }
        return received.size();
    }

    /**
     * Reports whether the target port is believed to exist.
     *
     * @return {@code false} once a transfer has failed with an UnknownPortException or the flag was
     *         cleared through {@link #setTargetExists(boolean)}; {@code true} otherwise
     */
    public boolean getTargetExists() {
        return targetExists.get();
    }

    /**
     * Checks whether this port is currently valid.
     *
     * @return {@code false} if the target port is flagged as missing, if this is a remote output port
     *         without any connection on the anonymous relationship, or if any validation result of the
     *         remote process group is invalid; {@code true} otherwise
     */
    public boolean isValid() {
        if (!targetExists.get()) {
            return false;
        }

        final boolean isOutputPort = getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT;
        if (isOutputPort && getConnections(Relationship.ANONYMOUS).isEmpty()) {
            return false;
        }

        return remoteGroup.validate().stream().allMatch(ValidationResult::isValid);
    }

    /**
     * Collects the validation problems of this port. Problems are only reported while the port is
     * in the {@code STOPPED} scheduled state, and at most one problem is returned.
     *
     * @return a mutable collection that is empty, or holds a single invalid result describing either
     *         a missing target port or a remote output port without outbound connections
     */
    public Collection<ValidationResult> getValidationErrors() {
        final ArrayList<ValidationResult> problems = new ArrayList<>();
        if (getScheduledState() != ScheduledState.STOPPED) {
            return problems;
        }

        if (!targetExists.get()) {
            problems.add(new ValidationResult.Builder()
                    .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
                    .subject(String.format("Remote port '%s'", getName()))
                    .valid(false)
                    .build());
        } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
            problems.add(new ValidationResult.Builder()
                    .explanation(String.format("Port '%s' has no outbound connections", getName()))
                    .subject(String.format("Remote port '%s'", getName()))
                    .valid(false)
                    .build());
        }
        return problems;
    }

    /**
     * Verifies that this port may be started.
     *
     * @throws IllegalStateException if this is a remote input port without incoming connections, or
     *         if the remote process group reports an invalid validation result (the first invalid
     *         result is included in the message)
     */
    public void verifyCanStart() {
        super.verifyCanStart();

        final boolean isInputPort = getConnectableType() == ConnectableType.REMOTE_INPUT_PORT;
        if (isInputPort && getIncomingConnections().isEmpty()) {
            throw new IllegalStateException("Port " + getName() + " has no incoming connections");
        }

        // Fail on the first invalid result, in the order the remote group reports them.
        for (final ValidationResult result : remoteGroup.validate()) {
            if (!result.isValid()) {
                throw new IllegalStateException("Remote Process Group is not valid: " + result.toString());
            }
        }
    }

    /**
     * Sets whether compression is requested the next time the Site-to-Site client is built.
     *
     * @param z {@code true} to request compression
     */
    public void setUseCompression(boolean z) {
        useCompression.set(z);
    }

    /**
     * Reports whether compression is requested.
     *
     * @return the current compression flag
     */
    public boolean isUseCompression() {
        return useCompression.get();
    }

    /**
     * Returns the requested batch count.
     *
     * @return the configured count, or {@code null} when none is set
     */
    public Integer getBatchCount() {
        return batchCount.get();
    }

    /**
     * Sets the requested batch count, applied the next time the Site-to-Site client is built.
     *
     * @param num the count to request, or {@code null} to request none
     */
    public void setBatchCount(Integer num) {
        batchCount.set(num);
    }

    /**
     * Returns the requested batch size.
     *
     * @return the configured data-size string, or {@code null} when none is set
     */
    public String getBatchSize() {
        return batchSize.get();
    }

    /**
     * Sets the requested batch size, applied the next time the Site-to-Site client is built.
     *
     * @param str the data-size string to request, or {@code null} to request none
     */
    public void setBatchSize(String str) {
        batchSize.set(str);
    }

    /**
     * Returns the requested batch duration.
     *
     * @return the configured time-period string, or {@code null} when none is set
     */
    public String getBatchDuration() {
        return batchDuration.get();
    }

    /**
     * Sets the requested batch duration, applied the next time the Site-to-Site client is built.
     *
     * @param str the time-period string to request, or {@code null} to request none
     */
    public void setBatchDuration(String str) {
        batchDuration.set(str);
    }

    /**
     * Describes this port by its name and the remote group's target URIs.
     *
     * @return a string of the form {@code RemoteGroupPort[name=<name>,targets=<uris>]}
     */
    public String toString() {
        final StringBuilder description = new StringBuilder("RemoteGroupPort[name=");
        description.append(getName());
        description.append(",targets=");
        description.append(remoteGroup.getTargetUris());
        description.append("]");
        return description.toString();
    }

    /**
     * Returns the remote process group this port belongs to.
     *
     * @return the remote process group supplied at construction
     */
    public RemoteProcessGroup getRemoteProcessGroup() {
        return remoteGroup;
    }

    /**
     * Derives the transfer direction from the connectable type. The direction supplied at
     * construction is not consulted here.
     *
     * @return {@code SEND} for a remote input port, {@code RECEIVE} otherwise
     */
    public TransferDirection getTransferDirection() {
        if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) {
            return TransferDirection.SEND;
        }
        return TransferDirection.RECEIVE;
    }

    /**
     * Records whether the target port exists.
     *
     * @param z the new existence state
     */
    public void setTargetExists(boolean z) {
        targetExists.set(z);
    }

    /**
     * Removes the given connection and, if the target port is flagged as missing and this port has
     * no connections left in either direction, asks the remote group to drop this port.
     *
     * @param connection the connection to remove
     * @throws IllegalArgumentException declared; raised by the superclass implementation, if at all
     * @throws IllegalStateException declared; raised by the superclass implementation, if at all
     */
    public void removeConnection(Connection connection) throws IllegalArgumentException, IllegalStateException {
        super.removeConnection(connection);

        final boolean orphaned = !getTargetExists() && !hasIncomingConnection() && getConnections().isEmpty();
        if (orphaned) {
            remoteGroup.removeNonExistentPort(this);
        }
    }

    /**
     * Returns the scheduling strategy of this port.
     *
     * @return always {@code SchedulingStrategy.TIMER_DRIVEN}
     */
    public SchedulingStrategy getSchedulingStrategy() {
        return SchedulingStrategy.TIMER_DRIVEN;
    }

    /**
     * Reports whether this port is side-effect free.
     *
     * @return always {@code false}
     */
    public boolean isSideEffectFree() {
        return false;
    }

    /**
     * Returns the component type label of this port.
     *
     * @return the literal string {@code "RemoteGroupPort"}
     */
    public String getComponentType() {
        return "RemoteGroupPort";
    }
}
