diff options
Diffstat (limited to 'java/gov/nist/javax/sip/parser/Pipeline.java')
-rw-r--r-- | java/gov/nist/javax/sip/parser/Pipeline.java | 196 |
1 files changed, 196 insertions, 0 deletions
diff --git a/java/gov/nist/javax/sip/parser/Pipeline.java b/java/gov/nist/javax/sip/parser/Pipeline.java new file mode 100644 index 0000000..26dbfb6 --- /dev/null +++ b/java/gov/nist/javax/sip/parser/Pipeline.java @@ -0,0 +1,196 @@ +/* + * Conditions Of Use + * + * This software was developed by employees of the National Institute of + * Standards and Technology (NIST), an agency of the Federal Government. + * Pursuant to title 15 Untied States Code Section 105, works of NIST + * employees are not subject to copyright protection in the United States + * and are considered to be in the public domain. As a result, a formal + * license is not needed to use the software. + * + * This software is provided by NIST as a service and is expressly + * provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED + * OR STATUTORY, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT + * AND DATA ACCURACY. NIST does not warrant or make any representations + * regarding the use of the software or the results thereof, including but + * not limited to the correctness, accuracy, reliability or usefulness of + * the software. + * + * Permission to use this software is contingent upon your acceptance + * of the terms of this agreement + * + * . + * + */ +package gov.nist.javax.sip.parser; + +import gov.nist.core.InternalErrorHandler; +import gov.nist.javax.sip.stack.SIPStackTimerTask; + +import java.io.*; +import java.util.*; + +/** + * Input class for the pipelined parser. Buffer all bytes read from the socket + * and make them available to the message parser. + * + * @author M. Ranganathan (Contains a bug fix contributed by Rob Daugherty ( + * Lucent Technologies) ) + * + */ + +public class Pipeline extends InputStream { + private LinkedList buffList; + + private Buffer currentBuffer; + + private boolean isClosed; + + private Timer timer; + + private InputStream pipe; + + private int readTimeout; + + private TimerTask myTimerTask; + + class MyTimer extends SIPStackTimerTask { + Pipeline pipeline; + + private boolean isCancelled; + + protected MyTimer(Pipeline pipeline) { + this.pipeline = pipeline; + } + + protected void runTask() { + if (this.isCancelled) + return; + + try { + pipeline.close(); + } catch (IOException ex) { + InternalErrorHandler.handleException(ex); + } + } + + public boolean cancel() { + boolean retval = super.cancel(); + this.isCancelled = true; + return retval; + } + + } + + class Buffer { + byte[] bytes; + + int length; + + int ptr; + + public Buffer(byte[] bytes, int length) { + ptr = 0; + this.length = length; + this.bytes = bytes; + } + + public int getNextByte() { + int retval = bytes[ptr++] & 0xFF; + return retval; + } + + } + + public void startTimer() { + if (this.readTimeout == -1) + return; + // TODO make this a tunable number. For now 4 seconds + // between reads seems reasonable upper limit. + this.myTimerTask = new MyTimer(this); + this.timer.schedule(this.myTimerTask, this.readTimeout); + } + + public void stopTimer() { + if (this.readTimeout == -1) + return; + if (this.myTimerTask != null) + this.myTimerTask.cancel(); + } + + public Pipeline(InputStream pipe, int readTimeout, Timer timer) { + // pipe is the Socket stream + // this is recorded here to implement a timeout. + this.timer = timer; + this.pipe = pipe; + buffList = new LinkedList(); + this.readTimeout = readTimeout; + } + + public void write(byte[] bytes, int start, int length) throws IOException { + if (this.isClosed) + throw new IOException("Closed!!"); + Buffer buff = new Buffer(bytes, length); + buff.ptr = start; + synchronized (this.buffList) { + buffList.add(buff); + buffList.notifyAll(); + } + } + + public void write(byte[] bytes) throws IOException { + if (this.isClosed) + throw new IOException("Closed!!"); + Buffer buff = new Buffer(bytes, bytes.length); + synchronized (this.buffList) { + buffList.add(buff); + buffList.notifyAll(); + } + } + + public void close() throws IOException { + this.isClosed = true; + synchronized (this.buffList) { + this.buffList.notifyAll(); + } + + // JvB: added + this.pipe.close(); + } + + public int read() throws IOException { + // if (this.isClosed) return -1; + synchronized (this.buffList) { + if (currentBuffer != null + && currentBuffer.ptr < currentBuffer.length) { + int retval = currentBuffer.getNextByte(); + if (currentBuffer.ptr == currentBuffer.length) + this.currentBuffer = null; + return retval; + } + // Bug fix contributed by Rob Daugherty. + if (this.isClosed && this.buffList.isEmpty()) + return -1; + try { + // wait till something is posted. + while (this.buffList.isEmpty()) { + this.buffList.wait(); + if (this.isClosed) + return -1; + } + currentBuffer = (Buffer) this.buffList.removeFirst(); + int retval = currentBuffer.getNextByte(); + if (currentBuffer.ptr == currentBuffer.length) + this.currentBuffer = null; + return retval; + } catch (InterruptedException ex) { + throw new IOException(ex.getMessage()); + } catch (NoSuchElementException ex) { + ex.printStackTrace(); + throw new IOException(ex.getMessage()); + } + } + } + +} |