aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md1
-rw-r--r--api/src/main/java/io/opencensus/trace/export/ExportComponent.java7
-rw-r--r--impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java57
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java2
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java3
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java6
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java5
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java2
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/export/SpanExporterImpl.java22
-rw-r--r--impl_core/src/test/java/io/opencensus/implcore/trace/export/SpanExporterImplTest.java91
10 files changed, 167 insertions, 29 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 756d3b41..03fb4bb7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## Unreleased
+- Adds Tracing.getExportComponent().shutdown() for use within application shutdown hooks.
## 0.13.2 - 2018-05-08
- Map http attributes to Stackdriver format (fix [#1153](https://github.com/census-instrumentation/opencensus-java/issues/1153)).
diff --git a/api/src/main/java/io/opencensus/trace/export/ExportComponent.java b/api/src/main/java/io/opencensus/trace/export/ExportComponent.java
index 63447a76..bf06089c 100644
--- a/api/src/main/java/io/opencensus/trace/export/ExportComponent.java
+++ b/api/src/main/java/io/opencensus/trace/export/ExportComponent.java
@@ -66,6 +66,13 @@ public abstract class ExportComponent {
*/
public abstract SampledSpanStore getSampledSpanStore();
+ /**
+ * Will shutdown this ExportComponent after flushing any pending spans.
+ *
+ * @since 0.13
+ */
+ public void shutdown() {}
+
private static final class NoopExportComponent extends ExportComponent {
private final SampledSpanStore noopSampledSpanStore =
SampledSpanStore.newNoopSampledSpanStore();
diff --git a/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java b/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java
index 0c58cb59..5145ca3b 100644
--- a/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java
+++ b/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java
@@ -24,6 +24,9 @@ import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import io.opencensus.implcore.internal.DaemonThreadFactory;
import io.opencensus.implcore.internal.EventQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -85,6 +88,9 @@ import javax.annotation.concurrent.ThreadSafe;
*/
@ThreadSafe
public final class DisruptorEventQueue implements EventQueue {
+
+ private static final Logger logger = Logger.getLogger(DisruptorEventQueue.class.getName());
+
// Number of events that can be enqueued at any one time. If more than this are enqueued,
// then subsequent attempts to enqueue new entries will block.
// TODO(aveitch): consider making this a parameter to the constructor, so the queue can be
@@ -99,9 +105,11 @@ public final class DisruptorEventQueue implements EventQueue {
// Ring Buffer for the {@link Disruptor} that underlies the queue.
private final RingBuffer<DisruptorEvent> ringBuffer;
+ private volatile DisruptorEnqueuer enqueuer;
+
// Creates a new EventQueue. Private to prevent creation of non-singleton instance.
// Suppress warnings for disruptor.handleEventsWith.
- @SuppressWarnings({"unchecked"})
+ @SuppressWarnings({"unchecked", "nullness"})
private DisruptorEventQueue() {
// Create new Disruptor for processing. Note that Disruptor creates a single thread per
// consumer (see https://github.com/LMAX-Exchange/disruptor/issues/121 for details);
@@ -116,6 +124,20 @@ public final class DisruptorEventQueue implements EventQueue {
disruptor.handleEventsWith(DisruptorEventHandler.INSTANCE);
disruptor.start();
ringBuffer = disruptor.getRingBuffer();
+
+ enqueuer =
+ new DisruptorEnqueuer() {
+ @Override
+ public void enqueue(Entry entry) {
+ long sequence = ringBuffer.next();
+ try {
+ DisruptorEvent event = ringBuffer.get(sequence);
+ event.setEntry(entry);
+ } finally {
+ ringBuffer.publish(sequence);
+ }
+ }
+ };
}
/**
@@ -134,17 +156,36 @@ public final class DisruptorEventQueue implements EventQueue {
*/
@Override
public void enqueue(Entry entry) {
- long sequence = ringBuffer.next();
- try {
- DisruptorEvent event = ringBuffer.get(sequence);
- event.setEntry(entry);
- } finally {
- ringBuffer.publish(sequence);
- }
+ enqueuer.enqueue(entry);
+ }
+
+ /** Shuts down the underlying disruptor. */
+ @Override
+ public void shutdown() {
+ enqueuer =
+ new DisruptorEnqueuer() {
+ final AtomicBoolean logged = new AtomicBoolean(false);
+
+ @Override
+ public void enqueue(Entry entry) {
+ if (!logged.getAndSet(true)) {
+ logger.log(Level.INFO, "Attempted to enqueue entry after Disruptor shutdown.");
+ }
+ }
+ };
+
+ disruptor.shutdown();
+ }
+
+ // Allows this event queue to safely shutdown by not enqueuing events on the ring buffer
+ private abstract static class DisruptorEnqueuer {
+
+ public abstract void enqueue(Entry entry);
}
// An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry.
private static final class DisruptorEvent {
+
// TODO(bdrutu): Investigate if volatile is needed. This object is shared between threads so
// intuitively this variable must be volatile.
@Nullable private volatile Entry entry = null;
diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java b/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java
index 967d9b89..6eb1149a 100644
--- a/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java
+++ b/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java
@@ -20,6 +20,8 @@ package io.opencensus.implcore.internal;
public interface EventQueue {
void enqueue(Entry entry);
+ void shutdown();
+
/**
* Base interface to be used for all entries in {@link EventQueue}. For example usage, see {@code
* DisruptorEventQueue}.
diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java b/impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java
index 297ecd4c..58c61c89 100644
--- a/impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java
+++ b/impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java
@@ -26,4 +26,7 @@ public class SimpleEventQueue implements EventQueue {
public void enqueue(Entry entry) {
entry.process();
}
+
+ @Override
+ public void shutdown() {}
}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java
index e77d1f8e..19817380 100644
--- a/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java
@@ -47,6 +47,12 @@ public final class ExportComponentImpl extends ExportComponent {
return sampledSpanStore;
}
+ @Override
+ public void shutdown() {
+ sampledSpanStore.shutdown();
+ spanExporter.shutdown();
+ }
+
/**
* Returns a new {@code ExportComponentImpl} that has valid instances for {@link RunningSpanStore}
* and {@link SampledSpanStore}.
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java
index 1bdb3f41..fe0132d8 100644
--- a/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java
@@ -275,6 +275,11 @@ public final class InProcessSampledSpanStoreImpl extends SampledSpanStoreImpl {
eventQueue.enqueue(new RegisterSpanNameEvent(this, spanNames));
}
+ @Override
+ protected void shutdown() {
+ eventQueue.shutdown();
+ }
+
private void internaltRegisterSpanNamesForCollection(Collection<String> spanNames) {
synchronized (samples) {
for (String spanName : spanNames) {
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java
index 0c83a05a..302d5cd3 100644
--- a/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java
@@ -45,6 +45,8 @@ public abstract class SampledSpanStoreImpl extends SampledSpanStore {
*/
public abstract void considerForSampling(SpanImpl span);
+ protected void shutdown() {}
+
private static final class NoopSampledSpanStoreImpl extends SampledSpanStoreImpl {
private static final Summary EMPTY_SUMMARY =
Summary.create(Collections.<String, PerSpanNameSummary>emptyMap());
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/SpanExporterImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SpanExporterImpl.java
index e702d38c..12940739 100644
--- a/impl_core/src/main/java/io/opencensus/implcore/trace/export/SpanExporterImpl.java
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SpanExporterImpl.java
@@ -74,6 +74,15 @@ public final class SpanExporterImpl extends SpanExporter {
worker.unregisterHandler(name);
}
+ protected void flush() {
+ worker.flush();
+ }
+
+ protected void shutdown() {
+ flush();
+ workerThread.interrupt();
+ }
+
private SpanExporterImpl(Worker worker) {
this.workerThread =
new DaemonThreadFactory("ExportComponent.ServiceExporterThread").newThread(worker);
@@ -188,5 +197,18 @@ public final class SpanExporterImpl extends SpanExporter {
}
}
}
+
+ void flush() {
+ List<SpanImpl> spansCopy;
+ synchronized (monitor) {
+ spansCopy = new ArrayList<SpanImpl>(spans);
+ spans.clear();
+ }
+
+ final List<SpanData> spanDataList = fromSpanImplToSpanData(spansCopy);
+ if (!spanDataList.isEmpty()) {
+ onBatchExport(spanDataList);
+ }
+ }
}
}
diff --git a/impl_core/src/test/java/io/opencensus/implcore/trace/export/SpanExporterImplTest.java b/impl_core/src/test/java/io/opencensus/implcore/trace/export/SpanExporterImplTest.java
index ee030277..9de9c67c 100644
--- a/impl_core/src/test/java/io/opencensus/implcore/trace/export/SpanExporterImplTest.java
+++ b/impl_core/src/test/java/io/opencensus/implcore/trace/export/SpanExporterImplTest.java
@@ -59,10 +59,7 @@ public class SpanExporterImplTest {
private final SpanContext notSampledSpanContext =
SpanContext.create(
TraceId.generateRandomId(random), SpanId.generateRandomId(random), TraceOptions.DEFAULT);
- private final SpanExporterImpl spanExporter = SpanExporterImpl.create(4, Duration.create(1, 0));
private final RunningSpanStoreImpl runningSpanStore = new InProcessRunningSpanStoreImpl();
- private final StartEndHandler startEndHandler =
- new StartEndHandlerImpl(spanExporter, runningSpanStore, null, new SimpleEventQueue());
private EnumSet<Options> recordSpanOptions = EnumSet.of(Options.RECORD_EVENTS);
private final TestHandler serviceHandler = new TestHandler();
@Mock private Handler mockServiceHandler;
@@ -70,11 +67,9 @@ public class SpanExporterImplTest {
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
-
- spanExporter.registerHandler("test.service", serviceHandler);
}
- private SpanImpl createSampledEndedSpan(String spanName) {
+ private SpanImpl createSampledEndedSpan(StartEndHandler startEndHandler, String spanName) {
SpanImpl span =
SpanImpl.startSpan(
sampledSpanContext,
@@ -90,7 +85,7 @@ public class SpanExporterImplTest {
return span;
}
- private SpanImpl createNotSampledEndedSpan(String spanName) {
+ private SpanImpl createNotSampledEndedSpan(StartEndHandler startEndHandler, String spanName) {
SpanImpl span =
SpanImpl.startSpan(
notSampledSpanContext,
@@ -108,20 +103,32 @@ public class SpanExporterImplTest {
@Test
public void exportDifferentSampledSpans() {
- SpanImpl span1 = createSampledEndedSpan(SPAN_NAME_1);
- SpanImpl span2 = createSampledEndedSpan(SPAN_NAME_2);
+ SpanExporterImpl spanExporter = SpanExporterImpl.create(4, Duration.create(1, 0));
+ StartEndHandler startEndHandler =
+ new StartEndHandlerImpl(spanExporter, runningSpanStore, null, new SimpleEventQueue());
+
+ spanExporter.registerHandler("test.service", serviceHandler);
+
+ SpanImpl span1 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1);
+ SpanImpl span2 = createSampledEndedSpan(startEndHandler, SPAN_NAME_2);
List<SpanData> exported = serviceHandler.waitForExport(2);
assertThat(exported).containsExactly(span1.toSpanData(), span2.toSpanData());
}
@Test
public void exportMoreSpansThanTheBufferSize() {
- SpanImpl span1 = createSampledEndedSpan(SPAN_NAME_1);
- SpanImpl span2 = createSampledEndedSpan(SPAN_NAME_1);
- SpanImpl span3 = createSampledEndedSpan(SPAN_NAME_1);
- SpanImpl span4 = createSampledEndedSpan(SPAN_NAME_1);
- SpanImpl span5 = createSampledEndedSpan(SPAN_NAME_1);
- SpanImpl span6 = createSampledEndedSpan(SPAN_NAME_1);
+ SpanExporterImpl spanExporter = SpanExporterImpl.create(4, Duration.create(1, 0));
+ StartEndHandler startEndHandler =
+ new StartEndHandlerImpl(spanExporter, runningSpanStore, null, new SimpleEventQueue());
+
+ spanExporter.registerHandler("test.service", serviceHandler);
+
+ SpanImpl span1 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1);
+ SpanImpl span2 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1);
+ SpanImpl span3 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1);
+ SpanImpl span4 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1);
+ SpanImpl span5 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1);
+ SpanImpl span6 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1);
List<SpanData> exported = serviceHandler.waitForExport(6);
assertThat(exported)
.containsExactly(
@@ -135,6 +142,10 @@ public class SpanExporterImplTest {
@Test
public void interruptWorkerThreadStops() throws InterruptedException {
+ SpanExporterImpl spanExporter = SpanExporterImpl.create(4, Duration.create(1, 0));
+
+ spanExporter.registerHandler("test.service", serviceHandler);
+
Thread serviceExporterThread = spanExporter.getServiceExporterThread();
serviceExporterThread.interrupt();
// Test that the worker thread will stop.
@@ -146,22 +157,35 @@ public class SpanExporterImplTest {
doThrow(new IllegalArgumentException("No export for you."))
.when(mockServiceHandler)
.export(anyListOf(SpanData.class));
+
+ SpanExporterImpl spanExporter = SpanExporterImpl.create(4, Duration.create(1, 0));
+ StartEndHandler startEndHandler =
+ new StartEndHandlerImpl(spanExporter, runningSpanStore, null, new SimpleEventQueue());
+
+ spanExporter.registerHandler("test.service", serviceHandler);
+
spanExporter.registerHandler("mock.service", mockServiceHandler);
- SpanImpl span1 = createSampledEndedSpan(SPAN_NAME_1);
+ SpanImpl span1 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1);
List<SpanData> exported = serviceHandler.waitForExport(1);
assertThat(exported).containsExactly(span1.toSpanData());
// Continue to export after the exception was received.
- SpanImpl span2 = createSampledEndedSpan(SPAN_NAME_1);
+ SpanImpl span2 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1);
exported = serviceHandler.waitForExport(1);
assertThat(exported).containsExactly(span2.toSpanData());
}
@Test
public void exportSpansToMultipleServices() {
+ SpanExporterImpl spanExporter = SpanExporterImpl.create(4, Duration.create(1, 0));
+ StartEndHandler startEndHandler =
+ new StartEndHandlerImpl(spanExporter, runningSpanStore, null, new SimpleEventQueue());
+
+ spanExporter.registerHandler("test.service", serviceHandler);
+
TestHandler serviceHandler2 = new TestHandler();
spanExporter.registerHandler("test.service2", serviceHandler2);
- SpanImpl span1 = createSampledEndedSpan(SPAN_NAME_1);
- SpanImpl span2 = createSampledEndedSpan(SPAN_NAME_2);
+ SpanImpl span1 = createSampledEndedSpan(startEndHandler, SPAN_NAME_1);
+ SpanImpl span2 = createSampledEndedSpan(startEndHandler, SPAN_NAME_2);
List<SpanData> exported1 = serviceHandler.waitForExport(2);
List<SpanData> exported2 = serviceHandler2.waitForExport(2);
assertThat(exported1).containsExactly(span1.toSpanData(), span2.toSpanData());
@@ -170,8 +194,14 @@ public class SpanExporterImplTest {
@Test
public void exportNotSampledSpans() {
- SpanImpl span1 = createNotSampledEndedSpan(SPAN_NAME_1);
- SpanImpl span2 = createSampledEndedSpan(SPAN_NAME_2);
+ SpanExporterImpl spanExporter = SpanExporterImpl.create(4, Duration.create(1, 0));
+ StartEndHandler startEndHandler =
+ new StartEndHandlerImpl(spanExporter, runningSpanStore, null, new SimpleEventQueue());
+
+ spanExporter.registerHandler("test.service", serviceHandler);
+
+ SpanImpl span1 = createNotSampledEndedSpan(startEndHandler, SPAN_NAME_1);
+ SpanImpl span2 = createSampledEndedSpan(startEndHandler, SPAN_NAME_2);
// Spans are recorded and exported in the same order as they are ended, we test that a non
// sampled span is not exported by creating and ending a sampled span after a non sampled span
// and checking that the first exported span is the sampled span (the non sampled did not get
@@ -182,4 +212,23 @@ public class SpanExporterImplTest {
assertThat(exported).doesNotContain(span1.toSpanData());
assertThat(exported).containsExactly(span2.toSpanData());
}
+
+ @Test(timeout = 10000L)
+ public void exportNotSampledSpansFlushed() {
+ // Set the export delay to zero, for no timeout, in order to confirm the #flush() below works
+ SpanExporterImpl spanExporter = SpanExporterImpl.create(4, Duration.create(0, 0));
+ StartEndHandler startEndHandler =
+ new StartEndHandlerImpl(spanExporter, runningSpanStore, null, new SimpleEventQueue());
+
+ spanExporter.registerHandler("test.service", serviceHandler);
+
+ SpanImpl span2 = createSampledEndedSpan(startEndHandler, SPAN_NAME_2);
+
+ // Force a flush, without this, the #waitForExport() call below would block indefinitely.
+ spanExporter.flush();
+
+ List<SpanData> exported = serviceHandler.waitForExport(1);
+
+ assertThat(exported).containsExactly(span2.toSpanData());
+ }
}