summaryrefslogtreecommitdiff
path: root/jurt
diff options
context:
space:
mode:
authorRüdiger Timm <rt@openoffice.org>2006-12-01 13:54:09 +0000
committerRüdiger Timm <rt@openoffice.org>2006-12-01 13:54:09 +0000
commit15cc2f0db99c900ad3bc8d70f11976b8338e3280 (patch)
treeabc6538e048bc8cb00ce20c08613e6421be8a8aa /jurt
parent165cf111953e79b200bf5f57bb09e0c636ccec2b (diff)
INTEGRATION: CWS sb23 (1.15.26); FILE MERGED
2006/08/18 16:22:50 sb 1.15.26.11: RESYNC: (1.15-1.16); FILE MERGED 2005/03/15 16:38:13 sb 1.15.26.10: #88601# Minor fix. 2005/03/15 10:29:36 sb 1.15.26.9: #88601# Support for current context in Java URP. 2005/03/14 16:28:49 sb 1.15.26.8: #i35277# Queue release requests to improve performance. 2005/03/14 15:00:50 sb 1.15.26.7: #i35277# Further cleanup. 2005/03/14 10:55:37 sb 1.15.26.6: #i35277# Further cleanup. 2005/03/11 14:03:39 sb 1.15.26.5: #i35277# Improve URP performance by always sending asynchronous release requests, regardless of forceSynchronous. 2005/02/18 16:48:43 sb 1.15.26.4: #i35277# More cleanup; first part of supporting CurrentContext in URP. 2005/02/18 09:21:49 sb 1.15.26.3: #i35277# More cleanup; functionality moved from java_remote_bridge to urp, so that urp can autonomously handle protocol property requests. 2005/02/16 16:54:44 sb 1.15.26.2: #i35277# More cleanup. 2004/10/11 10:21:14 sb 1.15.26.1: #i35277# Cleand up (as a prerequisite to doing any substantial changes).
Diffstat (limited to 'jurt')
-rw-r--r--jurt/com/sun/star/lib/uno/protocols/urp/urp.java1173
1 files changed, 636 insertions, 537 deletions
diff --git a/jurt/com/sun/star/lib/uno/protocols/urp/urp.java b/jurt/com/sun/star/lib/uno/protocols/urp/urp.java
index 7722d745e352..f816de74e63a 100644
--- a/jurt/com/sun/star/lib/uno/protocols/urp/urp.java
+++ b/jurt/com/sun/star/lib/uno/protocols/urp/urp.java
@@ -4,9 +4,9 @@
*
* $RCSfile: urp.java,v $
*
- * $Revision: 1.16 $
+ * $Revision: 1.17 $
*
- * last change: $Author: rt $ $Date: 2005-09-07 19:04:14 $
+ * last change: $Author: rt $ $Date: 2006-12-01 14:54:09 $
*
* The Contents of this file are made available subject to
* the terms of GNU Lesser General Public License Version 2.1.
@@ -32,612 +32,711 @@
* MA 02111-1307 USA
*
************************************************************************/
-package com.sun.star.lib.uno.protocols.urp;
-
-import java.lang.reflect.Array;
-
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-
-import com.sun.star.container.NoSuchElementException;
-import com.sun.star.uno.IBridge;
+package com.sun.star.lib.uno.protocols.urp;
-import com.sun.star.lib.uno.environments.remote.IMessage;
-import com.sun.star.lib.uno.environments.remote.Protocol;
+import com.sun.star.bridge.InvalidProtocolChangeException;
+import com.sun.star.bridge.ProtocolProperty;
+import com.sun.star.bridge.XProtocolProperties;
+import com.sun.star.lib.uno.environments.remote.IProtocol;
+import com.sun.star.lib.uno.environments.remote.Message;
import com.sun.star.lib.uno.environments.remote.ThreadId;
-
-import com.sun.star.uno.IMethodDescription;
-import com.sun.star.uno.ITypeDescription;
-
-
+import com.sun.star.lib.uno.typedesc.MethodDescription;
import com.sun.star.lib.uno.typedesc.TypeDescription;
-
import com.sun.star.uno.Any;
-import com.sun.star.uno.UnoRuntime;
+import com.sun.star.uno.IBridge;
+import com.sun.star.uno.IMethodDescription;
+import com.sun.star.uno.ITypeDescription;
import com.sun.star.uno.Type;
-
-
-/**
- * This class implements the complete urp protocol
- * from uno.
- * <p>
- * @version $Revision: 1.16 $ $ $Date: 2005-09-07 19:04:14 $
- * @author Kay Ramme
- * @since UDK1.0
- */
-public class urp extends Protocol {
- /**
- * When set to true, enables various debugging output.
- */
- static public final boolean DEBUG = false;
-
- static private final ITypeDescription __emptyITypeDescArray[] = new ITypeDescription[0];
- static private final short __cache_size = 256;
-
-
- protected IBridge _iBridge;
-
- private String _in_oid;
- private ITypeDescription _in_interface;
- private ThreadId _in_threadId;
-
- private String _out_oid;
- private ITypeDescription _out_interface;
- private ThreadId _out_threadId;
-
- private int _message_count;
- private boolean _ignore_cache;
- private Marshal _marshal;
- private Unmarshal _unmarshal;
-
- private String _operationContainer[] = new String[1];
- private Object _paramsContainer[][] = new Object[1][];
- private boolean _synchronContainer[] = new boolean[1];
- private boolean _mustReplyContainer[] = new boolean[1];
- private boolean _exceptionContainer[] = new boolean[1];
-
- static private final byte BIG_HEADER = (byte)0x80;
- // big header flags
- static private final byte REQUEST = 0x40;
- static private final byte NEWTYPE = 0x20;
- static private final byte NEWOID = 0x10;
- static private final byte NEWTID = 0x08;
- static private final byte LONGMETHODID = 0x04;
- static private final byte IGNORECACHE = 0x02;
- static private final byte MOREFLAGS = 0x01;
-
- // MOREFLAGS flags
- static private final byte MUSTREPLY = (byte)0x80;
- static private final byte SYNCHRONOUSE = (byte)0x40;
-
- static private final byte DIR_MID = 0x40;
- static private final byte EXCEPTION = 0x20;
-
-
-
- public urp(IBridge iBridge) {
- _iBridge = iBridge;
-
- _marshal = new Marshal(iBridge, __cache_size);
- _unmarshal = new Unmarshal(iBridge, __cache_size);
- }
-
-
- /**
- * Gets the name of the protocol.
- * <p>
- * @result the name of the protocol (iiop)
- */
- public String getName() {
- return "urp";
+import com.sun.star.uno.TypeClass;
+import com.sun.star.uno.UnoRuntime;
+import com.sun.star.uno.XCurrentContext;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+// This class internally relies on the availability of Java UNO type information
+// for the interface type com.sun.star.bridge.XProtocolProperties, even though
+// URP itself does not rely on that type.
+
+public final class urp implements IProtocol {
+ public urp(
+ IBridge bridge, String attributes, InputStream input,
+ OutputStream output)
+ {
+ this.input = new DataInputStream(input);
+ this.output = new DataOutputStream(output);
+ marshal = new Marshal(bridge, CACHE_SIZE);
+ unmarshal = new Unmarshal(bridge, CACHE_SIZE);
+ forceSynchronous = parseAttributes(attributes);
}
- private Object readReply(int header, boolean exception[]) {
- if((header & NEWTID) != 0) // new thread id ?
- _in_threadId = _unmarshal.readThreadId();
-
- // get the out signature and parameter array of the reply
- Object objects[] = (Object[])removePendingRequest(_in_threadId);
- Object param[] = (Object[])objects[0];
- ITypeDescription signature[] = (ITypeDescription[])objects[1];
- TypeDescription resultType = (TypeDescription)objects[2];
-
- exception[0] = (header & EXCEPTION) != 0;
- if(exception[0]) {// Exception? So the reply has an any as the result
- signature = __emptyITypeDescArray;
- try {
- resultType = TypeDescription.getTypeDescription("any");
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("this cannot happen: " + e);
+ // @see IProtocol#init
+ public void init() throws IOException {
+ synchronized (monitor) {
+ if (state == STATE_INITIAL0) {
+ sendRequestChange();
}
}
-
- // read the result object
- Object result = null;
- if(resultType != null)
- result = _unmarshal.readValue(resultType);
-
- // read the out parameters
- for(int i = 0; i < signature.length; ++ i) {
- if(signature[i] != null) // is this an out parameter
- Array.set(
- param[i], 0,
- _unmarshal.readValue(
- (TypeDescription) signature[i].getComponentType()));
- }
-
- if(DEBUG) System.err.println("##### " + getClass().getName() + ".readReply:" + result);
-
- return result;
}
-
- private Object []readParams(IMethodDescription iMethodDescription) {
- ITypeDescription in_sig[] = iMethodDescription.getInSignature();
- ITypeDescription out_sig[] = iMethodDescription.getOutSignature();
-
- Object params[] = new Object[in_sig.length];
-
- for(int i = 0; i < params.length; ++ i) {
- if(in_sig[i] != null) // is it an in parameter?
- if(out_sig[i] != null) {// is it also an out -> inout?
- Object inout = Array.newInstance(out_sig[i].getComponentType().getZClass(), 1);
- Array.set(
- inout, 0,
- _unmarshal.readValue(
- (TypeDescription) out_sig[i].getComponentType()));
- params[i] = inout;
+ // @see IProtocol#readMessage
+ public Message readMessage() throws IOException {
+ for (;;) {
+ if (!unmarshal.hasMore()) {
+ unmarshal.reset(readBlock());
+ if (!unmarshal.hasMore()) {
+ throw new IOException("closeConnection message received");
}
- else // it is only an in parameter
- params[i] = _unmarshal.readValue(
- (TypeDescription) in_sig[i]);
- else // it is only an out parameter, so provide the holder
- params[i] = Array.newInstance(out_sig[i].getComponentType().getZClass(), 1);
+ }
+ UrpMessage msg;
+ int header = unmarshal.read8Bit();
+ if ((header & HEADER_LONGHEADER) != 0) {
+ if ((header & HEADER_REQUEST) != 0) {
+ msg = readLongRequest(header);
+ } else {
+ msg = readReply(header);
+ }
+ } else {
+ msg = readShortRequest(header);
+ }
+ if (msg.isInternal()) {
+ handleInternalMessage(msg);
+ } else {
+ return msg;
+ }
}
-
- return params;
}
-
- private void readShortRequest(int header, String operation[], Object param[][], boolean synchron[]) {
- ++ _requestsRecieved;
-
- int methodId;
- if ((header & DIR_MID) != 0) {
- methodId = ((header & 0x3F) << 8) | _unmarshal.read8Bit();
- } else {
- methodId = header & 0x3F;
+ // @see IProtocol#writeRequest
+ public boolean writeRequest(
+ String oid, TypeDescription type, String function, ThreadId tid,
+ Object[] arguments)
+ throws IOException
+ {
+ if (oid.equals(PROPERTIES_OID)) {
+ throw new IllegalArgumentException("illegal OID " + oid);
}
-
- IMethodDescription iMethodDescription = _in_interface.getMethodDescription(methodId);
- operation[0] = iMethodDescription.getName();
-
- synchron[0] = !iMethodDescription.isOneway();
-
- param[0] = readParams(iMethodDescription);
-
- if(synchron[0]) { // if the request is synchron, it is pending
- putPendingReply(_in_threadId, new Object[]{param[0], iMethodDescription.getOutSignature(), iMethodDescription.getReturnSignature()/*, mMDesc*/});
+ synchronized (monitor) {
+ while (!initialized) {
+ try {
+ monitor.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e.toString());
+ }
+ }
+ return writeRequest(false, oid, type, function, tid, arguments);
}
-
- if(DEBUG) System.err.println("##### " + getClass().getName() + ".readShortRequest:" + _in_oid + " " + operation[0] + " " + synchron[0]);
}
- private void readLongRequest(int header, String operation[], Object param[][], boolean synchron[], boolean mustReply[]) {
- ++ _requestsRecieved;
-
- // read the extended flags
- if((header & MOREFLAGS) != 0) {// is there an extended flags byte?
- int exFlags = _unmarshal.read8Bit();
-
- mustReply[0] = (exFlags & MUSTREPLY) != 0;
- synchron[0] = (exFlags & SYNCHRONOUSE) != 0;
- }
-
- int methodId;
- if ((header & LONGMETHODID) != 0) {
- methodId = _unmarshal.read16Bit();
- } else {
- methodId = _unmarshal.read8Bit();
+ // @see IProtocol#writeReply
+ public void writeReply(boolean exception, ThreadId tid, Object result)
+ throws IOException
+ {
+ synchronized (output) {
+ writeQueuedReleases();
+ int header = HEADER_LONGHEADER;
+ PendingRequests.Item pending = pendingIn.pop(tid);
+ TypeDescription resultType;
+ ITypeDescription[] argTypes;
+ Object[] args;
+ if (exception) {
+ header |= HEADER_EXCEPTION;
+ resultType = TypeDescription.getTypeDescription(TypeClass.ANY);
+ argTypes = null;
+ args = null;
+ } else {
+ resultType = (TypeDescription)
+ pending.function.getReturnSignature();
+ argTypes = pending.function.getOutSignature();
+ args = pending.arguments;
+ }
+ if (!tid.equals(outL1Tid)) {
+ header |= HEADER_NEWTID;
+ outL1Tid = tid;
+ } else {
+ tid = null;
+ }
+ marshal.write8Bit(header);
+ if (tid != null) {
+ marshal.writeThreadId(tid);
+ }
+ marshal.writeValue(resultType, result);
+ if (argTypes != null) {
+ for (int i = 0; i < argTypes.length; ++i) {
+ if (argTypes[i] != null) {
+ marshal.writeValue(
+ (TypeDescription) argTypes[i].getComponentType(),
+ Array.get(args[i], 0));
+ }
+ }
+ }
+ writeBlock(true);
}
-
- if((header & NEWTYPE) != 0)
- _in_interface = _unmarshal.readType();
-
- IMethodDescription iMethodDescription = _in_interface.getMethodDescription(methodId);
-
- if((header & MOREFLAGS) == 0) // no ex flags, so get info from typeinfo
- synchron[0] = !iMethodDescription.isOneway();
-
- if((header & NEWOID) != 0) // new oid?
- _in_oid = _unmarshal.readObjectId();
-
- if((header & NEWTID) != 0) // new thread id ?
- _in_threadId = _unmarshal.readThreadId();
-
- _ignore_cache = ((header & IGNORECACHE) != 0); // do not use cache for this request?
-
- operation[0] = iMethodDescription.getName();
-
- param[0] = readParams(iMethodDescription);
-
- if(synchron[0]) // if the request is synchron, the reply is pending
- putPendingReply(_in_threadId, new Object[]{param[0], iMethodDescription.getOutSignature(), iMethodDescription.getReturnSignature()/*, mMDesc*/});
-
- if(DEBUG) System.err.println("##### " + getClass().getName() + ".readLongRequest:" + _in_oid + " " + operation[0] + " " + synchron[0]);
}
- private Object readMessage(String operation[], Object param[], boolean synchron[], boolean mustReply[], boolean exception[]) {
- int header = _unmarshal.read8Bit();
-
- Class signature[];
- Object result = null;
-
- if((header & BIG_HEADER) != 0) { // full header?
- if((header & REQUEST) != 0) // a request ?
- readLongRequest(header, operation, (Object [][])param, synchron, mustReply);
- else // a reply
- result = readReply(header, exception);
+ private void sendRequestChange() throws IOException {
+ if (propertiesTid == null) {
+ propertiesTid = ThreadId.createFresh();
}
- else // only a short request header
- readShortRequest(header, operation, (Object[][])param, synchron);
-
- if(synchron[0]) // synchron implies MUSTREPLY
- mustReply[0] = true;
-
-
- if(DEBUG) System.err.println("##### " + getClass().getName() + ".readMessage:" + _in_oid + " " + operation[0] + " " + _in_threadId + " " + param[0] + " " + result);
-
- return result;
+ random = new Random().nextInt();
+ writeRequest(
+ true, PROPERTIES_OID,
+ TypeDescription.getTypeDescription(XProtocolProperties.class),
+ PROPERTIES_FUN_REQUEST_CHANGE, propertiesTid,
+ new Object[] { new Integer(random) });
+ state = STATE_REQUESTED;
}
- public void writeRequest(String oid,
- TypeDescription zInterface,
- String operation,
- ThreadId threadId,
- Object params[],
- Boolean synchron[],
- Boolean mustReply[])
- {
- if(DEBUG) System.err.println("##### " + getClass().getName() + ".writeRequest:" + oid + " " + zInterface + " " + operation);
-
- ++ _requestsSend;
- ++ _message_count;
-
- IMethodDescription iMethodDescription = zInterface.getMethodDescription(operation);
-
- byte header = 0;
- boolean bigHeader = false;
-
- if(_out_oid == null || !oid.equals(_out_oid)) { // change the oid?
- header |= NEWOID;
-
- _out_oid = oid;
- bigHeader = true;
+ private void handleInternalMessage(Message message) throws IOException {
+ if (message.isRequest()) {
+ String t = message.getType().getTypeName();
+ if (!t.equals("com.sun.star.bridge.XProtocolProperties")) {
+ throw new IOException(
+ "read URP protocol properties request with unsupported"
+ + " type " + t);
+ }
+ int fid = message.getMethod().getIndex();
+ switch (fid) {
+ case PROPERTIES_FID_REQUEST_CHANGE:
+ checkSynchronousPropertyRequest(message);
+ synchronized (monitor) {
+ switch (state) {
+ case STATE_INITIAL0:
+ case STATE_INITIAL:
+ writeReply(
+ false, message.getThreadId(), new Integer(1));
+ state = STATE_WAIT;
+ break;
+ case STATE_REQUESTED:
+ int n
+ = ((Integer) message.getArguments()[0]).intValue();
+ if (random < n) {
+ writeReply(
+ false, message.getThreadId(), new Integer(1));
+ state = STATE_WAIT;
+ } else if (random == n) {
+ writeReply(
+ false, message.getThreadId(), new Integer(-1));
+ state = STATE_INITIAL;
+ sendRequestChange();
+ } else {
+ writeReply(
+ false, message.getThreadId(), new Integer(0));
+ }
+ break;
+ default:
+ writeReply(
+ true, message.getThreadId(),
+ new com.sun.star.uno.RuntimeException(
+ "read URP protocol properties requestChange"
+ + " request in illegal state"));
+ break;
+ }
+ }
+ break;
+ case PROPERTIES_FID_COMMIT_CHANGE:
+ checkSynchronousPropertyRequest(message);
+ synchronized (monitor) {
+ if (state == STATE_WAIT) {
+ ProtocolProperty[] p = (ProtocolProperty[])
+ message.getArguments()[0];
+ boolean ok = true;
+ boolean cc = false;
+ int i = 0;
+ for (; i < p.length; ++i) {
+ if (p[i].Name.equals(PROPERTY_CURRENT_CONTEXT)) {
+ cc = true;
+ } else {
+ ok = false;
+ break;
+ }
+ }
+ if (ok) {
+ writeReply(false, message.getThreadId(), null);
+ } else {
+ writeReply(
+ true, message.getThreadId(),
+ new InvalidProtocolChangeException(
+ "", null, p[i], 1));
+ }
+ state = STATE_INITIAL;
+ if (!initialized) {
+ if (cc) {
+ currentContext = true;
+ initialized = true;
+ monitor.notifyAll();
+ } else {
+ sendRequestChange();
+ }
+ }
+ } else {
+ writeReply(
+ true, message.getThreadId(),
+ new com.sun.star.uno.RuntimeException(
+ "read URP protocol properties commitChange"
+ + " request in illegal state"));
+ }
+ }
+ break;
+ default:
+ throw new IOException(
+ "read URP protocol properties request with unsupported"
+ + " function ID " + fid);
+ }
+ } else {
+ synchronized (monitor) {
+ if (state == STATE_COMMITTED) {
+ // commitChange reply:
+ if (!message.isAbnormalTermination()) {
+ currentContext = true;
+ }
+ state = STATE_INITIAL;
+ initialized = true;
+ monitor.notifyAll();
+ } else {
+ // requestChange reply:
+ if (message.isAbnormalTermination()) {
+ // remote side probably does not support negotiation:
+ state = STATE_INITIAL;
+ initialized = true;
+ monitor.notifyAll();
+ } else {
+ int n = ((Integer) message.getResult()).intValue();
+ switch (n) {
+ case -1:
+ case 0:
+ break;
+ case 1:
+ writeRequest(
+ true, PROPERTIES_OID,
+ TypeDescription.getTypeDescription(
+ XProtocolProperties.class),
+ PROPERTIES_FUN_COMMIT_CHANGE, propertiesTid,
+ new Object[] {
+ new ProtocolProperty[] {
+ new ProtocolProperty(
+ PROPERTY_CURRENT_CONTEXT,
+ Any.VOID) } });
+ state = STATE_COMMITTED;
+ break;
+ default:
+ throw new IOException(
+ "read URP protocol properties "
+ + PROPERTIES_FUN_REQUEST_CHANGE
+ + " reply with illegal return value " + n);
+ }
+ }
+ }
+ }
}
- else
- oid = null;
-
-
- if(_out_interface == null || !_out_interface.equals(zInterface)) { // change interface
- header |= NEWTYPE;
+ }
- _out_interface = zInterface;
- bigHeader = true;
+ private void checkSynchronousPropertyRequest(Message message)
+ throws IOException
+ {
+ if (!message.isSynchronous()) {
+ throw new IOException(
+ "read URP protocol properties request for synchronous function"
+ + " marked as not SYNCHRONOUS");
}
- else
- zInterface = null;
-
- if(_out_threadId == null || !_out_threadId.equals(threadId)) { // change thread id
- header |= NEWTID;
+ }
- _out_threadId = threadId;
+ private byte[] readBlock() throws IOException {
+ int size = input.readInt();
+ input.readInt(); // ignore count
+ byte[] bytes = new byte[size];
+ input.readFully(bytes);
+ return bytes;
+ }
- bigHeader = true;
+ private UrpMessage readLongRequest(int header) throws IOException {
+ boolean sync = false;
+ if ((header & HEADER_MOREFLAGS) != 0) {
+ if (unmarshal.read8Bit() != (HEADER_MUSTREPLY | HEADER_SYNCHRONOUS))
+ {
+ throw new IOException(
+ "read URP request with bad MUSTREPLY/SYNCHRONOUS byte");
+ }
+ sync = true;
}
- else
- threadId = null;
-
- boolean hasExFlags = false;
-
- // if synchron is provided, test if it differs from declaration
- if(synchron[0] != null) {
- if(iMethodDescription.isOneway() == synchron[0].booleanValue()) {
- bigHeader = true;
- hasExFlags = true;
+ int funId = (header & HEADER_FUNCTIONID16) != 0
+ ? unmarshal.read16Bit() : unmarshal.read8Bit();
+ if ((header & HEADER_NEWTYPE) != 0) {
+ inL1Type = unmarshal.readType();
+ if (inL1Type.getTypeClass() != TypeClass.INTERFACE) {
+ throw new IOException(
+ "read URP request with non-interface type " + inL1Type);
}
}
- else
- synchron[0] = new Boolean(!iMethodDescription.isOneway());
-
- // if mustReply is provided and if it differs from synchron
- // then we have to write it
- if(mustReply[0] != null && (mustReply[0] != synchron[0])) {
- bigHeader = true;
- hasExFlags = true;
+ if ((header & HEADER_NEWOID) != 0) {
+ inL1Oid = unmarshal.readObjectId();
}
- else
- mustReply[0] = synchron[0];
-
- // Long request headers can handle 16-bit method IDs, and short request
- // headers can handle 14-bit method IDs:
- int methodId = iMethodDescription.getIndex();
- if (methodId < 0 || methodId >= 0x10000) {
- throw new IllegalArgumentException(
- "Method ID " + methodId + " out of range");
- } else if (methodId >= 0xC000) {
- bigHeader = true;
+ if ((header & HEADER_NEWTID) != 0) {
+ inL1Tid = unmarshal.readThreadId();
}
+ //TODO: check HEADER_IGNORECACHE
+ return readRequest(funId, sync);
+ }
- if(bigHeader) { // something has changed, send big header
- header |= BIG_HEADER; // big header
- header |= REQUEST;
- header |= hasExFlags ? MOREFLAGS : 0;
-
- if(methodId > 255)
- header |= LONGMETHODID;
-
- _marshal.write8Bit(header);
-
- if(hasExFlags) {// are there extended flags to write?
- byte exFlags = 0;
-
- exFlags |= synchron[0].booleanValue() ? SYNCHRONOUSE : 0;
- exFlags |= mustReply[0].booleanValue() ? MUSTREPLY : 0;
+ private UrpMessage readShortRequest(int header) {
+ int funId = (header & HEADER_FUNCTIONID14) != 0
+ ? ((header & HEADER_FUNCTIONID) << 8) | unmarshal.read8Bit()
+ : header & HEADER_FUNCTIONID;
+ return readRequest(funId, false);
+ }
- _marshal.write8Bit(exFlags);
+ private UrpMessage readRequest(int functionId, boolean forcedSynchronous) {
+ boolean internal = PROPERTIES_OID.equals(inL1Oid);
+ // inL1Oid may be null in XInstanceProvider.getInstance("")
+ XCurrentContext cc =
+ (currentContext && !internal
+ && functionId != MethodDescription.ID_RELEASE)
+ ? (XCurrentContext) unmarshal.readInterface(
+ new Type(XCurrentContext.class))
+ : null;
+ IMethodDescription desc = inL1Type.getMethodDescription(functionId);
+ ITypeDescription[] inSig = desc.getInSignature();
+ ITypeDescription[] outSig = desc.getOutSignature();
+ Object[] args = new Object[inSig.length];
+ for (int i = 0; i < args.length; ++i) {
+ if (inSig[i] != null) {
+ if (outSig[i] != null) {
+ Object inout = Array.newInstance(
+ outSig[i].getComponentType().getZClass(), 1);
+ Array.set(
+ inout, 0,
+ unmarshal.readValue(
+ (TypeDescription) outSig[i].getComponentType()));
+ args[i] = inout;
+ } else {
+ args[i] = unmarshal.readValue((TypeDescription) inSig[i]);
+ }
+ } else {
+ args[i] = Array.newInstance(
+ outSig[i].getComponentType().getZClass(), 1);
}
-
- // write the method id
- if(methodId > 255)
- _marshal.write16Bit(methodId);
- else
- _marshal.write8Bit(methodId);
-
- if(zInterface != null) // has the interface changed? -> write it
- _marshal.writeType(zInterface);
-
- if(oid != null) // has the oid changed? -> write it
- _marshal.writeObjectId(_out_oid);
-
- if(threadId != null) // has the thread id changed? -> write it
- _marshal.writeThreadId(threadId);
}
- else { // simple request
- if(methodId <= 0x2f) // does the method id fit in the header?
- _marshal.write8Bit(methodId);
- else { // no
- header |= DIR_MID;
- header |= methodId >> 8;
-
- _marshal.write8Bit(header);
- _marshal.write8Bit(methodId);
- }
+ boolean sync = forcedSynchronous || !desc.isOneway();
+ if (sync) {
+ pendingIn.push(
+ inL1Tid, new PendingRequests.Item(internal, desc, args));
}
+ return new UrpMessage(
+ inL1Tid, true, inL1Oid, inL1Type, desc, sync, cc, false, null, args,
+ internal);
+ }
- // write the in parameters
- ITypeDescription in_sig[] = iMethodDescription.getInSignature();
- ITypeDescription out_sig[] = iMethodDescription.getOutSignature();
- for(int i = 0; i < in_sig.length; ++ i) {
- if(in_sig[i] != null) { // is it an in parameter?
- if(out_sig[i] != null) // is it also an out parameter?
- _marshal.writeValue(
- (TypeDescription) out_sig[i].getComponentType(),
- ((Object[]) params[i])[0]);
- else // in only
- _marshal.writeValue((TypeDescription) in_sig[i], params[i]);
+ private UrpMessage readReply(int header) {
+ if ((header & HEADER_NEWTID) != 0) {
+ inL1Tid = unmarshal.readThreadId();
+ }
+ PendingRequests.Item pending = pendingOut.pop(inL1Tid);
+ TypeDescription resultType;
+ ITypeDescription[] argTypes;
+ Object[] args;
+ boolean exception = (header & HEADER_EXCEPTION) != 0;
+ if (exception) {
+ resultType = TypeDescription.getTypeDescription(TypeClass.ANY);
+ argTypes = null;
+ args = null;
+ } else {
+ resultType = (TypeDescription)
+ pending.function.getReturnSignature();
+ argTypes = pending.function.getOutSignature();
+ args = pending.arguments;
+ }
+ Object result = resultType == null
+ ? null : unmarshal.readValue(resultType);
+ if (argTypes != null) {
+ for (int i = 0; i < argTypes.length; ++i) {
+ if (argTypes[i] != null) {
+ Array.set(
+ args[i], 0,
+ unmarshal.readValue(
+ (TypeDescription) argTypes[i].getComponentType()));
+ }
}
}
-
- if(synchron[0].booleanValue()) // if we are waiting for a reply, the reply is pending
- putPendingRequest(_out_threadId, new Object[]{params, out_sig, iMethodDescription.getReturnSignature()});
+ return new UrpMessage(
+ inL1Tid, false, null, null, null, false, null, exception, result,
+ args, pending.internal);
}
- public void writeReply(boolean exception, ThreadId threadId, Object result) {
- if(DEBUG) System.err.println("##### " + getClass().getName() + ".writeReply:" + exception + " " + threadId + " " + result);
-
- ++ _message_count;
-
- Object objects[] = (Object[])removePendingReply(threadId);
- Object params[] = (Object[])objects[0];
- ITypeDescription signature[] = (ITypeDescription[])objects[1];
- TypeDescription resType = (TypeDescription)objects[2];
-
- byte header = BIG_HEADER; // big header
-
- if(exception) { // has an exception occurred?
- header |= EXCEPTION;
-
- signature = __emptyITypeDescArray;
- try {
- resType = TypeDescription.getTypeDescription("any");
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("this cannot happen: " + e);
+ private boolean writeRequest(
+ boolean internal, String oid, TypeDescription type, String function,
+ ThreadId tid, Object[] arguments)
+ throws IOException
+ {
+ IMethodDescription desc = type.getMethodDescription(function);
+ synchronized (output) {
+ if (desc.getIndex() == MethodDescription.ID_RELEASE
+ && releaseQueue.size() < MAX_RELEASE_QUEUE_SIZE)
+ {
+ releaseQueue.add(
+ new QueuedRelease(internal, oid, type, desc, tid));
+ return false;
+ } else {
+ writeQueuedReleases();
+ return writeRequest(
+ internal, oid, type, desc, tid, arguments, true);
}
}
-
- if(_out_threadId == null || !_out_threadId.equals(threadId)) { // change thread id ?
- header |= NEWTID;
-
- _out_threadId = threadId;
- }
- else
- threadId = null;
-
- _marshal.write8Bit(header);
-
- if(threadId != null) // has the thread id changed? -> write it
- _marshal.writeThreadId(threadId);
-
- // write the result
- _marshal.writeValue(resType, result);
-
- // write the out parameters
- for(int i = 0; i < signature.length; ++ i)
- if(signature[i] != null)
- _marshal.writeValue(
- (TypeDescription) signature[i].getComponentType(),
- Array.get(params[i], 0));
- }
-
-
- private byte []readBlock(DataInput dataInput) throws IOException {
- int size = dataInput.readInt();
- int message_count = dataInput.readInt();
-
- byte bytes[] = new byte[size];
-
- dataInput.readFully(bytes);
-
- if(DEBUG) System.err.println("##### " + getClass().getName() + ".readBlock: size:" + size + " message count:" + message_count);
-
- return bytes;
}
- private void writeBlock(DataOutput out, byte[] data, int messageCount)
+ private boolean writeRequest(
+ boolean internal, String oid, TypeDescription type,
+ IMethodDescription desc, ThreadId tid, Object[] arguments,
+ boolean flush)
throws IOException
{
- out.writeInt(data.length);
- out.writeInt(messageCount);
- out.write(data);
- }
-
- static class Message implements IMessage {
- String _oid;
- Object _result;
- ITypeDescription _iTypeDescription;
- String _operation;
- ThreadId _threadId;
- boolean _synchron;
- boolean _mustReply;
- boolean _exception;
- Object _params[];
-
- Message(String oid,
- Object result,
- ITypeDescription iTypeDescription,
- String operation,
- ThreadId threadId,
- boolean synchron,
- boolean mustReply,
- boolean exception,
- Object params[])
- {
- _oid = oid;
- _result = result;
- _iTypeDescription = iTypeDescription;
- _operation = operation;
- _threadId = threadId;
- _synchron = synchron;
- _mustReply = mustReply;
- _exception = exception;
- _params = params;
+ int funId = desc.getIndex();
+ if (funId < 0 || funId > MAX_FUNCTIONID16) {
+ throw new IllegalArgumentException(
+ "function ID " + funId + " out of range");
}
-
- public String getOperation() {
- return _operation;
+ boolean forceSync = forceSynchronous
+ && funId != MethodDescription.ID_RELEASE;
+ boolean moreFlags = forceSync && desc.isOneway();
+ boolean longHeader = moreFlags;
+ int header = 0;
+ if (!type.equals(outL1Type)) {
+ longHeader = true;
+ header |= HEADER_NEWTYPE;
+ outL1Type = type;
+ } else {
+ type = null;
}
-
- public ThreadId getThreadId() {
- return _threadId;
+ if (!oid.equals(outL1Oid)) {
+ longHeader = true;
+ header |= HEADER_NEWOID;
+ outL1Oid = oid;
+ } else {
+ oid = null;
}
-
- public ITypeDescription getInterface() {
- return _iTypeDescription;
+ if (!tid.equals(outL1Tid)) {
+ longHeader = true;
+ header |= HEADER_NEWTID;
+ outL1Tid = tid;
+ } else {
+ tid = null;
}
-
- public boolean isSynchron() {
- return _synchron;
+ if (funId > MAX_FUNCTIONID14) {
+ longHeader = true;
}
-
- public boolean mustReply() {
- return _mustReply;
+ if (longHeader) {
+ header |= HEADER_LONGHEADER | HEADER_REQUEST;
+ if (funId > MAX_FUNCTIONID8) {
+ header |= HEADER_FUNCTIONID16;
+ }
+ if (moreFlags) {
+ header |= HEADER_MOREFLAGS;
+ }
+ marshal.write8Bit(header);
+ if (moreFlags) {
+ marshal.write8Bit(HEADER_MUSTREPLY | HEADER_SYNCHRONOUS);
+ }
+ if (funId > MAX_FUNCTIONID8) {
+ marshal.write16Bit(funId);
+ } else {
+ marshal.write8Bit(funId);
+ }
+ if (type != null) {
+ marshal.writeType(type);
+ }
+ if (oid != null) {
+ marshal.writeObjectId(oid);
+ }
+ if (tid != null) {
+ marshal.writeThreadId(tid);
+ }
+ } else {
+ if (funId > HEADER_FUNCTIONID) {
+ marshal.write8Bit(HEADER_FUNCTIONID14 | (funId >> 8));
+ }
+ marshal.write8Bit(funId);
}
-
- public boolean isException() {
- return _exception;
+ if (currentContext && !internal
+ && funId != MethodDescription.ID_RELEASE)
+ {
+ marshal.writeInterface(
+ UnoRuntime.getCurrentContext(),
+ new Type(XCurrentContext.class));
}
-
- public String getOid() {
- return _oid;
+ ITypeDescription[] inSig = desc.getInSignature();
+ ITypeDescription[] outSig = desc.getOutSignature();
+ for (int i = 0; i < inSig.length; ++i) {
+ if (inSig[i] != null) {
+ if (outSig[i] != null) {
+ marshal.writeValue(
+ (TypeDescription) outSig[i].getComponentType(),
+ ((Object[]) arguments[i])[0]);
+ } else {
+ marshal.writeValue(
+ (TypeDescription) inSig[i], arguments[i]);
+ }
+ }
}
+ boolean sync = forceSync || !desc.isOneway();
+ if (sync) {
+ pendingOut.push(
+ outL1Tid, new PendingRequests.Item(internal, desc, arguments));
+ }
+ writeBlock(flush);
+ return sync;
+ }
- public Object getData(Object params[][]) {
- params[0] = _params;
-
- return _result;
+ private void writeBlock(boolean flush) throws IOException {
+ byte[] data = marshal.reset();
+ output.writeInt(data.length);
+ output.writeInt(1);
+ output.write(data);
+ if (flush) {
+ output.flush();
}
}
- /**
- * reads a job from the given stream.
- * <p>
- * @return thread read job.
- * @see com.sun.star.lib.uno.environments.remote.Job
- */
- public IMessage readMessage(InputStream inputStream) throws IOException {
- IMessage iMessage = null;
-
- DataInput dataInput = new DataInputStream( inputStream );
- while(iMessage == null) { // try hard to get a message
- if(!_unmarshal.hasMore()) { // the last block is empty, get a new one
- byte bytes[] = readBlock(dataInput);
- _unmarshal.reset(bytes);
- }
+ private void writeQueuedReleases() throws IOException {
+ for (int i = releaseQueue.size(); i > 0;) {
+ --i;
+ QueuedRelease r = (QueuedRelease) releaseQueue.get(i);
+ writeRequest(
+ r.internal, r.objectId, r.type, r.method, r.threadId, null,
+ false);
+ releaseQueue.remove(i);
+ }
+ }
- if(!_unmarshal.hasMore()) // we already got a new block and there are still no bytes left? -> a close message
- throw new java.io.IOException("connection close message received");
-
- else {
- Object result = readMessage(_operationContainer, _paramsContainer, _synchronContainer,
- _mustReplyContainer, _exceptionContainer);
-
- if(_operationContainer[0] == null) { // a reply ?
- iMessage = new Message(null, // oid
- result, // object
- null, // interface
- null, // operation
- _in_threadId,
- false,
- false,
- _exceptionContainer[0],
- _paramsContainer[0]);
+ private static boolean parseAttributes(String attributes) {
+ boolean forceSynchronous = true;
+ if (attributes != null) {
+ StringTokenizer t = new StringTokenizer(attributes, ",");
+ while (t.hasMoreTokens()) {
+ String a = t.nextToken();
+ String v = null;
+ int i = a.indexOf('=');
+ if (i >= 0) {
+ v = a.substring(i + 1);
+ a = a.substring(0, i);
}
- else { // a request
- iMessage = new Message(_in_oid,
- null,
- _in_interface,
- _operationContainer[0],
- _in_threadId,
- _synchronContainer[0],
- _mustReplyContainer[0],
- false,
- _paramsContainer[0]);
+ if (a.equalsIgnoreCase("ForceSynchronous")) {
+ forceSynchronous = parseBooleanAttributeValue(a, v);
+ } else if (a.equalsIgnoreCase("negotiate")) {
+ // Ignored:
+ parseBooleanAttributeValue(a, v);
+ } else {
+ throw new IllegalArgumentException(
+ "unknown protocol attribute " + a);
}
- _operationContainer[0] = null;
- _paramsContainer[0] = null;
- _synchronContainer[0] = false;
- _exceptionContainer[0] = false;
- _mustReplyContainer[0] = false;
}
- }
-
- return iMessage;
+ }
+ return forceSynchronous;
}
+ private static boolean parseBooleanAttributeValue(
+ String attribute, String value)
+ {
+ if (value == null) {
+ throw new IllegalArgumentException(
+ "missing value for protocol attribute " + attribute);
+ }
+ if (value.equals("0")) {
+ return false;
+ } else if (value.equals("1")) {
+ return true;
+ } else {
+ throw new IllegalArgumentException(
+ "bad value " + value + " for protocol attribute " + attribute);
+ }
+ }
- public void flush(DataOutput dataOutput) throws IOException {
- if(_message_count > 0) {
- writeBlock(dataOutput, _marshal.reset(), _message_count);
-
- _message_count = 0;
+ private static final class QueuedRelease {
+ public QueuedRelease(
+ boolean internal, String objectId, TypeDescription type,
+ IMethodDescription method, ThreadId threadId)
+ {
+ this.internal = internal;
+ this.objectId = objectId;
+ this.type = type;
+ this.method = method;
+ this.threadId = threadId;
}
+
+ public final boolean internal;
+ public final String objectId;
+ public final TypeDescription type;
+ public final IMethodDescription method;
+ public final ThreadId threadId;
}
-}
+ private static final String PROPERTIES_OID = "UrpProtocolProperties";
+ private static final int PROPERTIES_FID_REQUEST_CHANGE = 4;
+ private static final String PROPERTIES_FUN_REQUEST_CHANGE = "requestChange";
+ private static final int PROPERTIES_FID_COMMIT_CHANGE = 5;
+ private static final String PROPERTIES_FUN_COMMIT_CHANGE = "commitChange";
+ private static final String PROPERTY_CURRENT_CONTEXT = "CurrentContext";
+
+ private static final short CACHE_SIZE = 256;
+
+ private static final int HEADER_LONGHEADER = 0x80;
+ private static final int HEADER_REQUEST = 0x40;
+ private static final int HEADER_NEWTYPE = 0x20;
+ private static final int HEADER_NEWOID = 0x10;
+ private static final int HEADER_NEWTID = 0x08;
+ private static final int HEADER_FUNCTIONID16 = 0x04;
+ private static final int HEADER_IGNORECACHE = 0x02;
+ private static final int HEADER_MOREFLAGS = 0x01;
+ private static final int HEADER_MUSTREPLY = 0x80;
+ private static final int HEADER_SYNCHRONOUS = 0x40;
+ private static final int HEADER_FUNCTIONID14 = 0x40;
+ private static final int HEADER_FUNCTIONID = 0x3F;
+ private static final int HEADER_EXCEPTION = 0x20;
+
+ private static final int MAX_FUNCTIONID16 = 0xFFFF;
+ private static final int MAX_FUNCTIONID14 = 0x3FFF;
+ private static final int MAX_FUNCTIONID8 = 0xFF;
+
+ private static final int STATE_INITIAL0 = 0;
+ private static final int STATE_INITIAL = 1;
+ private static final int STATE_REQUESTED = 2;
+ private static final int STATE_COMMITTED = 3;
+ private static final int STATE_WAIT = 4;
+
+ private static final int MAX_RELEASE_QUEUE_SIZE = 100;
+
+ private final DataInput input;
+ private final DataOutputStream output;
+
+ private final Marshal marshal;
+ private final Unmarshal unmarshal;
+
+ private final boolean forceSynchronous;
+
+ private final PendingRequests pendingIn = new PendingRequests();
+ private final PendingRequests pendingOut = new PendingRequests();
+
+ private final Object monitor = new Object();
+ private int state = STATE_INITIAL0;
+ private boolean initialized = false;
+ private ThreadId propertiesTid = null;
+ private int random;
+ private boolean currentContext = false;
+
+ private ThreadId inL1Tid = null;
+ private String inL1Oid = null;
+ private TypeDescription inL1Type = null;
+
+ private ThreadId outL1Tid = null;
+ private String outL1Oid = null;
+ private ITypeDescription outL1Type = null;
+
+ private final ArrayList releaseQueue = new ArrayList(); // of QueuedRelease
+}