summaryrefslogtreecommitdiffstats
path: root/java/gov/nist/javax/sip/parser/Pipeline.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/gov/nist/javax/sip/parser/Pipeline.java')
-rw-r--r--java/gov/nist/javax/sip/parser/Pipeline.java196
1 files changed, 196 insertions, 0 deletions
diff --git a/java/gov/nist/javax/sip/parser/Pipeline.java b/java/gov/nist/javax/sip/parser/Pipeline.java
new file mode 100644
index 0000000..26dbfb6
--- /dev/null
+++ b/java/gov/nist/javax/sip/parser/Pipeline.java
@@ -0,0 +1,196 @@
+/*
+ * Conditions Of Use
+ *
+ * This software was developed by employees of the National Institute of
+ * Standards and Technology (NIST), an agency of the Federal Government.
+ * Pursuant to title 15 Untied States Code Section 105, works of NIST
+ * employees are not subject to copyright protection in the United States
+ * and are considered to be in the public domain. As a result, a formal
+ * license is not needed to use the software.
+ *
+ * This software is provided by NIST as a service and is expressly
+ * provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED
+ * OR STATUTORY, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT
+ * AND DATA ACCURACY. NIST does not warrant or make any representations
+ * regarding the use of the software or the results thereof, including but
+ * not limited to the correctness, accuracy, reliability or usefulness of
+ * the software.
+ *
+ * Permission to use this software is contingent upon your acceptance
+ * of the terms of this agreement
+ *
+ * .
+ *
+ */
+package gov.nist.javax.sip.parser;
+
+import gov.nist.core.InternalErrorHandler;
+import gov.nist.javax.sip.stack.SIPStackTimerTask;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Input class for the pipelined parser. Buffer all bytes read from the socket
+ * and make them available to the message parser.
+ *
+ * @author M. Ranganathan (Contains a bug fix contributed by Rob Daugherty (
+ * Lucent Technologies) )
+ *
+ */
+
+public class Pipeline extends InputStream {
+ private LinkedList buffList;
+
+ private Buffer currentBuffer;
+
+ private boolean isClosed;
+
+ private Timer timer;
+
+ private InputStream pipe;
+
+ private int readTimeout;
+
+ private TimerTask myTimerTask;
+
+ class MyTimer extends SIPStackTimerTask {
+ Pipeline pipeline;
+
+ private boolean isCancelled;
+
+ protected MyTimer(Pipeline pipeline) {
+ this.pipeline = pipeline;
+ }
+
+ protected void runTask() {
+ if (this.isCancelled)
+ return;
+
+ try {
+ pipeline.close();
+ } catch (IOException ex) {
+ InternalErrorHandler.handleException(ex);
+ }
+ }
+
+ public boolean cancel() {
+ boolean retval = super.cancel();
+ this.isCancelled = true;
+ return retval;
+ }
+
+ }
+
+ class Buffer {
+ byte[] bytes;
+
+ int length;
+
+ int ptr;
+
+ public Buffer(byte[] bytes, int length) {
+ ptr = 0;
+ this.length = length;
+ this.bytes = bytes;
+ }
+
+ public int getNextByte() {
+ int retval = bytes[ptr++] & 0xFF;
+ return retval;
+ }
+
+ }
+
+ public void startTimer() {
+ if (this.readTimeout == -1)
+ return;
+ // TODO make this a tunable number. For now 4 seconds
+ // between reads seems reasonable upper limit.
+ this.myTimerTask = new MyTimer(this);
+ this.timer.schedule(this.myTimerTask, this.readTimeout);
+ }
+
+ public void stopTimer() {
+ if (this.readTimeout == -1)
+ return;
+ if (this.myTimerTask != null)
+ this.myTimerTask.cancel();
+ }
+
+ public Pipeline(InputStream pipe, int readTimeout, Timer timer) {
+ // pipe is the Socket stream
+ // this is recorded here to implement a timeout.
+ this.timer = timer;
+ this.pipe = pipe;
+ buffList = new LinkedList();
+ this.readTimeout = readTimeout;
+ }
+
+ public void write(byte[] bytes, int start, int length) throws IOException {
+ if (this.isClosed)
+ throw new IOException("Closed!!");
+ Buffer buff = new Buffer(bytes, length);
+ buff.ptr = start;
+ synchronized (this.buffList) {
+ buffList.add(buff);
+ buffList.notifyAll();
+ }
+ }
+
+ public void write(byte[] bytes) throws IOException {
+ if (this.isClosed)
+ throw new IOException("Closed!!");
+ Buffer buff = new Buffer(bytes, bytes.length);
+ synchronized (this.buffList) {
+ buffList.add(buff);
+ buffList.notifyAll();
+ }
+ }
+
+ public void close() throws IOException {
+ this.isClosed = true;
+ synchronized (this.buffList) {
+ this.buffList.notifyAll();
+ }
+
+ // JvB: added
+ this.pipe.close();
+ }
+
+ public int read() throws IOException {
+ // if (this.isClosed) return -1;
+ synchronized (this.buffList) {
+ if (currentBuffer != null
+ && currentBuffer.ptr < currentBuffer.length) {
+ int retval = currentBuffer.getNextByte();
+ if (currentBuffer.ptr == currentBuffer.length)
+ this.currentBuffer = null;
+ return retval;
+ }
+ // Bug fix contributed by Rob Daugherty.
+ if (this.isClosed && this.buffList.isEmpty())
+ return -1;
+ try {
+ // wait till something is posted.
+ while (this.buffList.isEmpty()) {
+ this.buffList.wait();
+ if (this.isClosed)
+ return -1;
+ }
+ currentBuffer = (Buffer) this.buffList.removeFirst();
+ int retval = currentBuffer.getNextByte();
+ if (currentBuffer.ptr == currentBuffer.length)
+ this.currentBuffer = null;
+ return retval;
+ } catch (InterruptedException ex) {
+ throw new IOException(ex.getMessage());
+ } catch (NoSuchElementException ex) {
+ ex.printStackTrace();
+ throw new IOException(ex.getMessage());
+ }
+ }
+ }
+
+}