diff options
10 files changed, 167 insertions, 29 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 756d3b41..03fb4bb7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## Unreleased +- Adds Tracing.getExportComponent().shutdown() for use within application shutdown hooks. ## 0.13.2 - 2018-05-08 - Map http attributes to Stackdriver format (fix [#1153](https://github.com/census-instrumentation/opencensus-java/issues/1153)). diff --git a/api/src/main/java/io/opencensus/trace/export/ExportComponent.java b/api/src/main/java/io/opencensus/trace/export/ExportComponent.java index 63447a76..bf06089c 100644 --- a/api/src/main/java/io/opencensus/trace/export/ExportComponent.java +++ b/api/src/main/java/io/opencensus/trace/export/ExportComponent.java @@ -66,6 +66,13 @@ public abstract class ExportComponent { */ public abstract SampledSpanStore getSampledSpanStore(); + /** + * Will shutdown this ExportComponent after flushing any pending spans. + * + * @since 0.13 + */ + public void shutdown() {} + private static final class NoopExportComponent extends ExportComponent { private final SampledSpanStore noopSampledSpanStore = SampledSpanStore.newNoopSampledSpanStore(); diff --git a/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java b/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java index 0c58cb59..5145ca3b 100644 --- a/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java +++ b/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java @@ -24,6 +24,9 @@ import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import io.opencensus.implcore.internal.DaemonThreadFactory; import io.opencensus.implcore.internal.EventQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -85,6 +88,9 @@ import javax.annotation.concurrent.ThreadSafe; */ @ThreadSafe public final class DisruptorEventQueue implements EventQueue { + + private static final Logger logger = Logger.getLogger(DisruptorEventQueue.class.getName()); + // Number of events that can be enqueued at any one time. If more than this are enqueued, // then subsequent attempts to enqueue new entries will block. // TODO(aveitch): consider making this a parameter to the constructor, so the queue can be @@ -99,9 +105,11 @@ public final class DisruptorEventQueue implements EventQueue { // Ring Buffer for the {@link Disruptor} that underlies the queue. private final RingBuffer<DisruptorEvent> ringBuffer; + private volatile DisruptorEnqueuer enqueuer; + // Creates a new EventQueue. Private to prevent creation of non-singleton instance. // Suppress warnings for disruptor.handleEventsWith. - @SuppressWarnings({"unchecked"}) + @SuppressWarnings({"unchecked", "nullness"}) private DisruptorEventQueue() { // Create new Disruptor for processing. Note that Disruptor creates a single thread per // consumer (see https://github.com/LMAX-Exchange/disruptor/issues/121 for details); @@ -116,6 +124,20 @@ public final class DisruptorEventQueue implements EventQueue { disruptor.handleEventsWith(DisruptorEventHandler.INSTANCE); disruptor.start(); ringBuffer = disruptor.getRingBuffer(); + + enqueuer = + new DisruptorEnqueuer() { + @Override + public void enqueue(Entry entry) { + long sequence = ringBuffer.next(); + try { + DisruptorEvent event = ringBuffer.get(sequence); + event.setEntry(entry); + } finally { + ringBuffer.publish(sequence); + } + } + }; } /** @@ -134,17 +156,36 @@ public final class DisruptorEventQueue implements EventQueue { */ @Override public void enqueue(Entry entry) { - long sequence = ringBuffer.next(); - try { - DisruptorEvent event = ringBuffer.get(sequence); - event.setEntry(entry); - } finally { - ringBuffer.publish(sequence); - } + enqueuer.enqueue(entry); + } + + /** Shuts down the underlying disruptor. */ + @Override + public void shutdown() { + enqueuer = + new DisruptorEnqueuer() { + final AtomicBoolean logged = new AtomicBoolean(false); + + @Override + public void enqueue(Entry entry) { + if (!logged.getAndSet(true)) { + logger.log(Level.INFO, "Attempted to enqueue entry after Disruptor shutdown."); + } + } + }; + + disruptor.shutdown(); + } + + // Allows this event queue to safely shutdown by not enqueuing events on the ring buffer + private abstract static class DisruptorEnqueuer { + + public abstract void enqueue(Entry entry); } // An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry. private static final class DisruptorEvent { + // TODO(bdrutu): Investigate if volatile is needed. This object is shared between threads so // intuitively this variable must be volatile. @Nullable private volatile Entry entry = null; diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java b/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java index 967d9b89..6eb1149a 100644 --- a/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java @@ -20,6 +20,8 @@ package io.opencensus.implcore.internal; public interface EventQueue { void enqueue(Entry entry); + void shutdown(); + /** * Base interface to be used for all entries in {@link EventQueue}. For example usage, see {@code * DisruptorEventQueue}. diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java b/impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java index 297ecd4c..58c61c89 100644 --- a/impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java @@ -26,4 +26,7 @@ public class SimpleEventQueue implements EventQueue { public void enqueue(Entry entry) { entry.process(); } + + @Override + public void shutdown() {} } diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java index e77d1f8e..19817380 100644 --- a/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java @@ -47,6 +47,12 @@ public final class ExportComponentImpl extends ExportComponent { return sampledSpanStore; } + @Override + public void shutdown() { + sampledSpanStore.shutdown(); + spanExporter.shutdown(); + } + /** * Returns a new {@code ExportComponentImpl} that has valid instances for {@link RunningSpanStore} * and {@link SampledSpanStore}. diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java index 1bdb3f41..fe0132d8 100644 --- a/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java @@ -275,6 +275,11 @@ public final class InProcessSampledSpanStoreImpl extends SampledSpanStoreImpl { eventQueue.enqueue(new RegisterSpanNameEvent(this, spanNames)); } + @Override + protected void shutdown() { + eventQueue.shutdown(); + } + private void internaltRegisterSpanNamesForCollection(Collection<String> spanNames) { synchronized (samples) { for (String spanName : spanNames) { diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java index 0c83a05a..302d5cd3 100644 --- a/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java @@ -45,6 +45,8 @@ public abstract class SampledSpanStoreImpl extends SampledSpanStore { */ public abstract void considerForSampling(SpanImpl span); + protected void shutdown() {} + private static final class NoopSampledSpanStoreImpl extends SampledSpanStoreImpl { private static final Summary EMPTY_SUMMARY = Summary.create(Collections.<String, PerSpanNameSummary>emptyMap()); diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/SpanExporterImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SpanExporterImpl.java index e702d38c..12940739 100644 --- a/impl_core/src/main/java/io/opencensus/implcore/trace/export/SpanExporterImpl.java +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SpanExporterImpl.java @@ -74,6 +74,15 @@ public final class SpanExporterImpl extends SpanExporter { worker.unregisterHandler(name); } + protected void flush() { + worker.flush(); + } + + protected void shutdown() { + flush(); + workerThread.interrupt(); + } + private SpanExporterImpl(Worker worker) { this.workerThread = new DaemonThreadFactory("ExportComponent.ServiceExporterThread").newThread(worker); @@ -188,5 +197,18 @@ public final class SpanExporterImpl extends SpanExporter { } } } + + void flush() { + List<SpanImpl> spansCopy; + synchronized (monitor) { + spansCopy = new ArrayList<SpanImpl>(spans); + spans.clear(); + } + + final List<SpanData> spanDataList = fromSpanImplToSpanData(spansCopy); + if (!spanDataList.isEmpty()) { + onBatchExport(spanDataList); + } + } } } diff --git a/impl_core/src/test/java/io/opencensus/implcore/trace/export/SpanExporterImplTest.java b/impl_core/src/test/java/io/opencensus/implcore/trace/export/SpanExporterImplTest.java index ee030277..9de9c67c 100644 --- a/impl_core/src/test/java/io/opencensus/implcore/trace/export/SpanExporterImplTest.java +++ b/impl_core/src/test/java/io/opencensus/implcore/trace/export/SpanExporterImplTest.java @@ -59,10 +59,7 @@ public class SpanExporterImplTest { private final SpanContext notSampledSpanContext = SpanContext.create( TraceId.generateRandomId(random), SpanId.generateRandomId(random), TraceOptions.DEFAULT); - private final SpanExporterImpl spanExporter = SpanExporterImpl.create(4, Duration.create(1, 0)); private final RunningSpanStoreImpl runningSpanStore = new InProcessRunningSpanStoreImpl(); - private final StartEndHandler startEndHandler = - new StartEndHandlerImpl(spanExporter, runningSpanStore, null, new SimpleEventQueue()); private EnumSet<Options> recordSpanOptions = EnumSet.of(Options.RECORD_EVENTS); private final TestHandler serviceHandler = new TestHandler(); @Mock private Handler mockServiceHandler; @@ -70,11 +67,9 @@ public class SpanExporterImplTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - - spanExporter.registerHandler("test.service", serviceHandler); } - private SpanImpl createSampledEndedSpan(String spanName) { + private SpanImpl createSampledEndedSpan(StartEndHandler startEndHandler, String spanName) { SpanImpl span = SpanImpl.startSpan( sampledSpanContext, @@ -90,7 +85,7 @@ public class SpanExporterImplTest { return span; } - private SpanImpl createNotSampledEndedSpan(String spanName) { + private SpanImpl createNotSampledEndedSpan(StartEndHandler startEndHandler, String spanName) { SpanImpl span = SpanImpl.startSpan( notSampledSpanContext, @@ -108,20 +103,32 @@ public class SpanExporterImplTest { @Test public void exportDifferentSampledSpans() { - SpanImpl span1 = createSampledEndedSpan(SPAN_NAME_1); - SpanImpl span2 = createSampledEndedSpan(SPAN_NAME_2); + SpanExporterImpl spanExporter = SpanExporterImpl.create(4, Duration.create(1, 0)); + StartEndHandler startEndHandler = + new StartEndHandlerImpl(spanExporter, runningSpanStore, null, new SimpleEventQueue()); + + spanExporter.registerHandler("test.service", serviceHandler); + + SpanImpl span1 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1); + SpanImpl span2 = createSampledEndedSpan(startEndHandler, SPAN_NAME_2); List<SpanData> exported = serviceHandler.waitForExport(2); assertThat(exported).containsExactly(span1.toSpanData(), span2.toSpanData()); } @Test public void exportMoreSpansThanTheBufferSize() { - SpanImpl span1 = createSampledEndedSpan(SPAN_NAME_1); - SpanImpl span2 = createSampledEndedSpan(SPAN_NAME_1); - SpanImpl span3 = createSampledEndedSpan(SPAN_NAME_1); - SpanImpl span4 = createSampledEndedSpan(SPAN_NAME_1); - SpanImpl span5 = createSampledEndedSpan(SPAN_NAME_1); - SpanImpl span6 = createSampledEndedSpan(SPAN_NAME_1); + SpanExporterImpl spanExporter = SpanExporterImpl.create(4, Duration.create(1, 0)); + StartEndHandler startEndHandler = + new StartEndHandlerImpl(spanExporter, runningSpanStore, null, new SimpleEventQueue()); + + spanExporter.registerHandler("test.service", serviceHandler); + + SpanImpl span1 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1); + SpanImpl span2 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1); + SpanImpl span3 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1); + SpanImpl span4 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1); + SpanImpl span5 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1); + SpanImpl span6 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1); List<SpanData> exported = serviceHandler.waitForExport(6); assertThat(exported) .containsExactly( @@ -135,6 +142,10 @@ public class SpanExporterImplTest { @Test public void interruptWorkerThreadStops() throws InterruptedException { + SpanExporterImpl spanExporter = SpanExporterImpl.create(4, Duration.create(1, 0)); + + spanExporter.registerHandler("test.service", serviceHandler); + Thread serviceExporterThread = spanExporter.getServiceExporterThread(); serviceExporterThread.interrupt(); // Test that the worker thread will stop. @@ -146,22 +157,35 @@ public class SpanExporterImplTest { doThrow(new IllegalArgumentException("No export for you.")) .when(mockServiceHandler) .export(anyListOf(SpanData.class)); + + SpanExporterImpl spanExporter = SpanExporterImpl.create(4, Duration.create(1, 0)); + StartEndHandler startEndHandler = + new StartEndHandlerImpl(spanExporter, runningSpanStore, null, new SimpleEventQueue()); + + spanExporter.registerHandler("test.service", serviceHandler); + spanExporter.registerHandler("mock.service", mockServiceHandler); - SpanImpl span1 = createSampledEndedSpan(SPAN_NAME_1); + SpanImpl span1 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1); List<SpanData> exported = serviceHandler.waitForExport(1); assertThat(exported).containsExactly(span1.toSpanData()); // Continue to export after the exception was received. - SpanImpl span2 = createSampledEndedSpan(SPAN_NAME_1); + SpanImpl span2 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1); exported = serviceHandler.waitForExport(1); assertThat(exported).containsExactly(span2.toSpanData()); } @Test public void exportSpansToMultipleServices() { + SpanExporterImpl spanExporter = SpanExporterImpl.create(4, Duration.create(1, 0)); + StartEndHandler startEndHandler = + new StartEndHandlerImpl(spanExporter, runningSpanStore, null, new SimpleEventQueue()); + + spanExporter.registerHandler("test.service", serviceHandler); + TestHandler serviceHandler2 = new TestHandler(); spanExporter.registerHandler("test.service2", serviceHandler2); - SpanImpl span1 = createSampledEndedSpan(SPAN_NAME_1); - SpanImpl span2 = createSampledEndedSpan(SPAN_NAME_2); + SpanImpl span1 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1); + SpanImpl span2 = createSampledEndedSpan(startEndHandler, SPAN_NAME_2); List<SpanData> exported1 = serviceHandler.waitForExport(2); List<SpanData> exported2 = serviceHandler2.waitForExport(2); assertThat(exported1).containsExactly(span1.toSpanData(), span2.toSpanData()); @@ -170,8 +194,14 @@ public class SpanExporterImplTest { @Test public void exportNotSampledSpans() { - SpanImpl span1 = createNotSampledEndedSpan(SPAN_NAME_1); - SpanImpl span2 = createSampledEndedSpan(SPAN_NAME_2); + SpanExporterImpl spanExporter = SpanExporterImpl.create(4, Duration.create(1, 0)); + StartEndHandler startEndHandler = + new StartEndHandlerImpl(spanExporter, runningSpanStore, null, new SimpleEventQueue()); + + spanExporter.registerHandler("test.service", serviceHandler); + + SpanImpl span1 = createNotSampledEndedSpan(startEndHandler, SPAN_NAME_1); + SpanImpl span2 = createSampledEndedSpan(startEndHandler, SPAN_NAME_2); // Spans are recorded and exported in the same order as they are ended, we test that a non // sampled span is not exported by creating and ending a sampled span after a non sampled span // and checking that the first exported span is the sampled span (the non sampled did not get @@ -182,4 +212,23 @@ public class SpanExporterImplTest { assertThat(exported).doesNotContain(span1.toSpanData()); assertThat(exported).containsExactly(span2.toSpanData()); } + + @Test(timeout = 10000L) + public void exportNotSampledSpansFlushed() { + // Set the export delay to zero, for no timeout, in order to confirm the #flush() below works + SpanExporterImpl spanExporter = SpanExporterImpl.create(4, Duration.create(0, 0)); + StartEndHandler startEndHandler = + new StartEndHandlerImpl(spanExporter, runningSpanStore, null, new SimpleEventQueue()); + + spanExporter.registerHandler("test.service", serviceHandler); + + SpanImpl span2 = createSampledEndedSpan(startEndHandler, SPAN_NAME_2); + + // Force a flush, without this, the #waitForExport() call below would block indefinitely. + spanExporter.flush(); + + List<SpanData> exported = serviceHandler.waitForExport(1); + + assertThat(exported).containsExactly(span2.toSpanData()); + } } |
