JaegerSpanHandler.java
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.synapse.aspects.flow.statistics.opentracing.management.handling.span;
import io.jaegertracing.internal.JaegerTracer;
import io.opentracing.Span;
import org.apache.synapse.ContinuationState;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SequenceType;
import org.apache.synapse.aspects.ComponentType;
import org.apache.synapse.aspects.flow.statistics.data.raw.BasicStatisticDataUnit;
import org.apache.synapse.aspects.flow.statistics.data.raw.StatisticDataUnit;
import org.apache.synapse.aspects.flow.statistics.opentracing.management.helpers.SpanTagger;
import org.apache.synapse.aspects.flow.statistics.opentracing.management.helpers.TracingUtils;
import org.apache.synapse.aspects.flow.statistics.opentracing.management.parentresolving.ParentResolver;
import org.apache.synapse.aspects.flow.statistics.opentracing.management.scoping.TracingScope;
import org.apache.synapse.aspects.flow.statistics.opentracing.management.scoping.TracingScopeManager;
import org.apache.synapse.aspects.flow.statistics.opentracing.stores.SpanStore;
import org.apache.synapse.aspects.flow.statistics.opentracing.models.SpanWrapper;
import org.apache.synapse.aspects.flow.statistics.opentracing.models.ContinuationStateSequenceInfo;
import org.apache.synapse.continuation.SeqContinuationState;
import java.util.List;
import java.util.Objects;
/**
* Controls Jaeger spans, with respect to various events received during Synapse message flow.
*/
public class JaegerSpanHandler implements OpenTracingSpanHandler {
/**
* The tracer object, that is used to hold all the Jaeger spans.
*/
private JaegerTracer tracer;
/**
* Manages tracing scopes.
* Useful during cases like when an API is called within Proxy service.
*/
private TracingScopeManager tracingScopeManager;
public JaegerSpanHandler(JaegerTracer tracer, TracingScopeManager tracingScopeManager) {
this.tracer = tracer;
this.tracingScopeManager = tracingScopeManager;
}
@Override
public void handleOpenEntryEvent(StatisticDataUnit statisticDataUnit, MessageContext synCtx) {
startSpanOrBufferSequenceContinuationState(statisticDataUnit, synCtx);
}
@Override
public void handleOpenChildEntryEvent(StatisticDataUnit statisticDataUnit, MessageContext synCtx) {
startSpanOrBufferSequenceContinuationState(statisticDataUnit, synCtx);
}
@Override
public void handleOpenFlowContinuableEvent(StatisticDataUnit statisticDataUnit, MessageContext synCtx) {
startSpanOrBufferSequenceContinuationState(statisticDataUnit, synCtx);
}
@Override
public void handleOpenFlowSplittingEvent(StatisticDataUnit statisticDataUnit, MessageContext synCtx) {
startSpanOrBufferSequenceContinuationState(statisticDataUnit, synCtx);
}
@Override
public void handleOpenFlowAggregateEvent(StatisticDataUnit statisticDataUnit, MessageContext synCtx) {
startSpanOrBufferSequenceContinuationState(statisticDataUnit, synCtx);
}
/**
* Starts a span during a statistic data unit collection.
* When the statistic data unit collection is for a sequence whose state can be stacked by the
* ContinuationStackManager, the statistic data unit will be buffered until a stack modification event occurs
* upon which, the span can be started.
* @param statisticDataUnit Reported statistic data unit object.
* @param synCtx Message context.
*/
private void startSpanOrBufferSequenceContinuationState(StatisticDataUnit statisticDataUnit,
MessageContext synCtx) {
TracingScope tracingScope = tracingScopeManager.getTracingScope(synCtx);
synchronized (tracingScope.getSpanStore()) {
if (!isContinuationStateApplicable(statisticDataUnit)) {
startSpan(statisticDataUnit, synCtx, tracingScope.getSpanStore());
} else {
// Will begin during addSeqContinuationState
bufferSequenceContinuationState(statisticDataUnit, tracingScope.getSpanStore());
}
}
}
/**
* Returns whether the given statistic data unit represents a sequence,
* where continuation state stack management is applicable.
* This is used to buffer open events until continuation state stack insertion events occur.
* @param statisticDataUnit Statistic unit object.
* @return Whether continuation state stack is applicable for the statistic data unit.
*/
private boolean isContinuationStateApplicable(StatisticDataUnit statisticDataUnit) {
return statisticDataUnit.getComponentType() == ComponentType.SEQUENCE &&
(SequenceType.PROXY_INSEQ.toString().equals(statisticDataUnit.getComponentName()) ||
SequenceType.PROXY_OUTSEQ.toString().equals(statisticDataUnit.getComponentName()) ||
SequenceType.API_INSEQ.toString().equals(statisticDataUnit.getComponentName()) ||
SequenceType.API_OUTSEQ.toString().equals(statisticDataUnit.getComponentName()));
}
/**
* Starts a span, and stores necessary information in the span store to retrieve them back when needed.
* @param statisticDataUnit Statistic data unit object, which was collected during a statistic event.
* @param synCtx Message context.
* @param spanStore Span store object.
*/
private void startSpan(StatisticDataUnit statisticDataUnit, MessageContext synCtx, SpanStore spanStore) {
SpanWrapper parentSpanWrapper = ParentResolver.resolveParent(statisticDataUnit, spanStore, synCtx);
Span parentSpan = null;
if (parentSpanWrapper != null) {
parentSpan = parentSpanWrapper.getSpan();
}
Span span = tracer.buildSpan(statisticDataUnit.getComponentName()).asChildOf(parentSpan).start();
String spanId = TracingUtils.extractId(statisticDataUnit);
SpanWrapper spanWrapper = spanStore.addSpanWrapper(spanId, span, statisticDataUnit, parentSpanWrapper, synCtx);
if (isOuterLevelSpan(statisticDataUnit, spanStore)) {
spanStore.assignOuterLevelSpan(spanWrapper);
}
}
/**
* Buffers the given statistic data unit which is reported by an open event,
* until an appropriate continuation stack event is reported.
* A continuation event does not have the information about statistic data units to start and stop spans,
* and that information can only be obtained from this buffered open event.
* @param statisticDataUnit Statistic data unit object.
* @param spanStore Span store object.
*/
private void bufferSequenceContinuationState(StatisticDataUnit statisticDataUnit, SpanStore spanStore) {
ContinuationStateSequenceInfo continuationStateSequenceInfo =
new ContinuationStateSequenceInfo(statisticDataUnit);
spanStore.addContinuationStateSequenceInfo(continuationStateSequenceInfo);
}
/**
* Returns whether the given statistic data unit belongs to a component, which represents an outer level span.
* An outer level span is the super parent span for an entire tracing scope.
* The provided span store should not already have an assigned outer level span, in order to check by type.
* @param statisticDataUnit Statistic data unit object.
* @param spanStore Span store object.
* @return Whether the given statistic data unit denotes an outer level span.
*/
private boolean isOuterLevelSpan(StatisticDataUnit statisticDataUnit, SpanStore spanStore) {
return spanStore.getOuterLevelSpanWrapper() == null &&
(statisticDataUnit.getComponentType() == ComponentType.PROXYSERVICE ||
statisticDataUnit.getComponentType() == ComponentType.API);
}
@Override
public void handleOpenFlowAsynchronousEvent(BasicStatisticDataUnit statisticDataUnit, MessageContext synCtx) {
// Absorb
}
@Override
public void handleOpenContinuationEvents(BasicStatisticDataUnit statisticDataUnit, MessageContext synCtx) {
// Absorb
}
@Override
public void handleCloseEntryEvent(BasicStatisticDataUnit basicStatisticDataUnit, MessageContext synCtx) {
handleCloseEvent(basicStatisticDataUnit, synCtx);
}
@Override
public void handleCloseFlowForcefully(BasicStatisticDataUnit basicStatisticDataUnit, MessageContext synCtx) {
handleCloseEvent(basicStatisticDataUnit, synCtx);
}
private void handleCloseEvent(BasicStatisticDataUnit basicStatisticDataUnit, MessageContext synCtx) {
TracingScope tracingScope = tracingScopeManager.getTracingScope(synCtx);
synchronized (tracingScope.getSpanStore()) {
if (!isBufferedForContinuationState(basicStatisticDataUnit, tracingScope.getSpanStore())) {
finishSpan(basicStatisticDataUnit, synCtx, tracingScope.getSpanStore(), tracingScope);
}
// Else: Absorb. Will end during pop from stack
}
}
/**
* Returns whether the given basic statistic data unit has been buffered to consider the continuation state.
* In such cases, This will be used to absorb and skip the close event.
* @param basicStatisticDataUnit Basic statistic unit object.
* @param spanStore Span store object.
* @return Whether the given basic statistic data unit has been buffered to consider the
* continuation state.
*/
private boolean isBufferedForContinuationState(BasicStatisticDataUnit basicStatisticDataUnit, SpanStore spanStore) {
return spanStore.hasContinuationStateSequenceInfoWithId(TracingUtils.extractId(basicStatisticDataUnit));
}
@Override
public void handleTryEndFlow(BasicStatisticDataUnit basicStatisticDataUnit, MessageContext synCtx) {
// Absorb
}
/**
* Ends a span which is related to the provided basic statistic data unit, and performs necessary updates or
* removals in the provided span store.
* @param basicStatisticDataUnit Basic statistic data unit object, which was collected during a statistic event.
* @param synCtx Message context.
* @param spanStore Span store object.
* @param tracingScope The tracing scope of the appropriate span.
*/
private void finishSpan(BasicStatisticDataUnit basicStatisticDataUnit,
MessageContext synCtx,
SpanStore spanStore,
TracingScope tracingScope) {
String spanWrapperId = TracingUtils.extractId(basicStatisticDataUnit);
SpanWrapper spanWrapper = spanStore.getSpanWrapper(spanWrapperId);
if (!Objects.equals(spanWrapper, spanStore.getOuterLevelSpanWrapper())) {
// A non-outer level span
spanStore.finishSpan(spanWrapper);
} else {
// An outer level span
if (tracingScope.isEventCollectionFinished(synCtx)) {
cleanupContinuationStateSequences(spanStore);
spanStore.finishSpan(spanWrapper);
tracingScopeManager.cleanupTracingScope(tracingScope.getTracingScopeId());
}
// Else - Absorb. Will be handled when all the callbacks are completed
}
}
/**
* Cleans up remaining unfinished continuation state sequences before ending the outer level span.
* @param spanStore Span store object.
*/
private void cleanupContinuationStateSequences(SpanStore spanStore) {
if (!spanStore.getContinuationStateSequenceInfos().isEmpty()) {
List<ContinuationStateSequenceInfo> continuationStateSequences =
spanStore.getContinuationStateSequenceInfos();
for (ContinuationStateSequenceInfo continuationStateSequence : continuationStateSequences) {
finishSpanForContinuationStateSequence(continuationStateSequence, spanStore);
}
continuationStateSequences.clear();
}
}
/**
* Finishes a span, which has been added as a continuation state sequence.
* @param continuationStateSequenceInfo Object that contains information about the continuation state sequence.
* @param spanStore Span store object.
*/
private void finishSpanForContinuationStateSequence(ContinuationStateSequenceInfo continuationStateSequenceInfo,
SpanStore spanStore) {
String spanWrapperId = continuationStateSequenceInfo.getSpanReferenceId();
SpanWrapper spanWrapper = spanStore.getSpanWrapper(spanWrapperId);
spanStore.finishSpan(spanWrapper);
}
@Override
public void handleAddCallback(MessageContext messageContext, String callbackId) {
TracingScope tracingScope = tracingScopeManager.getTracingScope(messageContext);
tracingScope.addCallback();
}
@Override
public void handleCallbackCompletionEvent(MessageContext oldMessageContext, String callbackId) {
handleCallbackFinishEvent(oldMessageContext);
}
@Override
public void handleUpdateParentsForCallback(MessageContext oldMessageContext, String callbackId) {
// Absorb. Callback handling completion will be reported after handling the specific message.
}
@Override
public void handleReportCallbackHandlingCompletion(MessageContext synapseOutMsgCtx, String callbackId) {
handleCallbackFinishEvent(synapseOutMsgCtx);
}
private void handleCallbackFinishEvent(MessageContext messageContext) {
TracingScope tracingScope = tracingScopeManager.getTracingScope(messageContext);
tracingScope.removeCallback();
// The last callback received in a scope will finish the outer level span
if (tracingScope.isEventCollectionFinished(messageContext)) {
synchronized (tracingScope.getSpanStore()) {
cleanupContinuationStateSequences(tracingScope.getSpanStore());
SpanWrapper outerLevelSpanWrapper = tracingScope.getSpanStore().getOuterLevelSpanWrapper();
tracingScope.getSpanStore().finishSpan(outerLevelSpanWrapper);
tracingScopeManager.cleanupTracingScope(tracingScope.getTracingScopeId());
}
}
}
@Override
public void handleStateStackInsertion(MessageContext synCtx, String seqName, SequenceType seqType) {
TracingScope tracingScope = tracingScopeManager.getTracingScope(synCtx);
synchronized (tracingScope.getSpanStore()) {
ContinuationStateSequenceInfo continuationStateSequenceInfo =
findContinuationStateSequenceInfo(seqType, tracingScope.getSpanStore(),false);
if (continuationStateSequenceInfo != null) {
StatisticDataUnit statisticDataUnit = continuationStateSequenceInfo.getStatisticDataUnit();
continuationStateSequenceInfo.setSpanActive(true);
startSpan(statisticDataUnit, synCtx, tracingScope.getSpanStore());
}
}
}
@Override
public void handleStateStackRemoval(ContinuationState continuationState, MessageContext synCtx) {
TracingScope tracingScope = tracingScopeManager.getTracingScope(synCtx);
synchronized (tracingScope.getSpanStore()) {
if (continuationState instanceof SeqContinuationState) { // No other type will be kept track of
ContinuationStateSequenceInfo continuationStateSequenceInfo =
findContinuationStateSequenceInfo(
((SeqContinuationState)continuationState).getSeqType(),
tracingScope.getSpanStore(),
true);
if (continuationStateSequenceInfo != null) {
continuationStateSequenceInfo.setSpanActive(false);
finishSpanForContinuationStateSequence(continuationStateSequenceInfo, tracingScope.getSpanStore());
tracingScope.getSpanStore().getContinuationStateSequenceInfos()
.remove(continuationStateSequenceInfo);
}
}
}
}
/**
* Finds the appropriate continuation state sequence which contains the statistic data unit information - from
* the buffer, when a continuation state stack event has been reported.
* This method does the correlation between a statistic data unit and a continuation state stack event, in order to
* start or end a span.
* @param seqType Type of the sequence.
* @param spanStore Span store object.
* @param desiredSpanActiveState Whether the span related to this continuation state sequence
* should be already active or not.
*
* False: Set during continuation state stack insertion, which denotes
* "Find the next span that has not been started yet".
*
* True: Set during continuation state stack removal, which denotes
* "Find the next span that is currently active".
*
* This flag is helpful in cases where multiple copies (denoting reliant states)
* of the same sequence has to be referred correctly, in scenarios like Iterate
* mediator.
* @return The found continuation state sequence info. Null when not found.
*/
private ContinuationStateSequenceInfo findContinuationStateSequenceInfo(SequenceType seqType,
SpanStore spanStore,
boolean desiredSpanActiveState) {
for (ContinuationStateSequenceInfo continuationStateSequenceInfo :
spanStore.getContinuationStateSequenceInfos()) {
if (seqType.toString().equals(continuationStateSequenceInfo.getStatisticDataUnit().getComponentName()) &&
(continuationStateSequenceInfo.isSpanActive() == desiredSpanActiveState)) {
return continuationStateSequenceInfo;
}
}
return null;
}
@Override
public void handleStateStackClearance(MessageContext synCtx) {
TracingScope tracingScope = tracingScopeManager.getTracingScope(synCtx);
synchronized (tracingScope.getSpanStore()) {
List<ContinuationStateSequenceInfo> stackedSequences =
tracingScope.getSpanStore().getContinuationStateSequenceInfos();
for (ContinuationStateSequenceInfo stackedSequence : stackedSequences) {
finishSpanForContinuationStateSequence(stackedSequence, tracingScope.getSpanStore());
}
stackedSequences.clear();
}
}
}