diff options
author | Rüdiger Timm <rt@openoffice.org> | 2006-12-01 13:54:09 +0000 |
---|---|---|
committer | Rüdiger Timm <rt@openoffice.org> | 2006-12-01 13:54:09 +0000 |
commit | 15cc2f0db99c900ad3bc8d70f11976b8338e3280 (patch) | |
tree | abc6538e048bc8cb00ce20c08613e6421be8a8aa /jurt | |
parent | 165cf111953e79b200bf5f57bb09e0c636ccec2b (diff) |
INTEGRATION: CWS sb23 (1.15.26); FILE MERGED
2006/08/18 16:22:50 sb 1.15.26.11: RESYNC: (1.15-1.16); FILE MERGED
2005/03/15 16:38:13 sb 1.15.26.10: #88601# Minor fix.
2005/03/15 10:29:36 sb 1.15.26.9: #88601# Support for current context in Java URP.
2005/03/14 16:28:49 sb 1.15.26.8: #i35277# Queue release requests to improve performance.
2005/03/14 15:00:50 sb 1.15.26.7: #i35277# Further cleanup.
2005/03/14 10:55:37 sb 1.15.26.6: #i35277# Further cleanup.
2005/03/11 14:03:39 sb 1.15.26.5: #i35277# Improve URP performance by always sending asynchronous release requests, regardless of forceSynchronous.
2005/02/18 16:48:43 sb 1.15.26.4: #i35277# More cleanup; first part of supporting CurrentContext in URP.
2005/02/18 09:21:49 sb 1.15.26.3: #i35277# More cleanup; functionality moved from java_remote_bridge to urp, so that urp can autonomously handle protocol property requests.
2005/02/16 16:54:44 sb 1.15.26.2: #i35277# More cleanup.
2004/10/11 10:21:14 sb 1.15.26.1: #i35277# Cleand up (as a prerequisite to doing any substantial changes).
Diffstat (limited to 'jurt')
-rw-r--r-- | jurt/com/sun/star/lib/uno/protocols/urp/urp.java | 1173 |
1 files changed, 636 insertions, 537 deletions
diff --git a/jurt/com/sun/star/lib/uno/protocols/urp/urp.java b/jurt/com/sun/star/lib/uno/protocols/urp/urp.java index 7722d745e352..f816de74e63a 100644 --- a/jurt/com/sun/star/lib/uno/protocols/urp/urp.java +++ b/jurt/com/sun/star/lib/uno/protocols/urp/urp.java @@ -4,9 +4,9 @@ * * $RCSfile: urp.java,v $ * - * $Revision: 1.16 $ + * $Revision: 1.17 $ * - * last change: $Author: rt $ $Date: 2005-09-07 19:04:14 $ + * last change: $Author: rt $ $Date: 2006-12-01 14:54:09 $ * * The Contents of this file are made available subject to * the terms of GNU Lesser General Public License Version 2.1. @@ -32,612 +32,711 @@ * MA 02111-1307 USA * ************************************************************************/ -package com.sun.star.lib.uno.protocols.urp; - -import java.lang.reflect.Array; - -import java.io.IOException; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.InputStream; -import java.io.OutputStream; - - -import com.sun.star.container.NoSuchElementException; -import com.sun.star.uno.IBridge; +package com.sun.star.lib.uno.protocols.urp; -import com.sun.star.lib.uno.environments.remote.IMessage; -import com.sun.star.lib.uno.environments.remote.Protocol; +import com.sun.star.bridge.InvalidProtocolChangeException; +import com.sun.star.bridge.ProtocolProperty; +import com.sun.star.bridge.XProtocolProperties; +import com.sun.star.lib.uno.environments.remote.IProtocol; +import com.sun.star.lib.uno.environments.remote.Message; import com.sun.star.lib.uno.environments.remote.ThreadId; - -import com.sun.star.uno.IMethodDescription; -import com.sun.star.uno.ITypeDescription; - - +import com.sun.star.lib.uno.typedesc.MethodDescription; import com.sun.star.lib.uno.typedesc.TypeDescription; - import com.sun.star.uno.Any; -import com.sun.star.uno.UnoRuntime; +import com.sun.star.uno.IBridge; +import com.sun.star.uno.IMethodDescription; +import com.sun.star.uno.ITypeDescription; import com.sun.star.uno.Type; - - -/** - * This class implements the complete urp protocol - * from uno. - * <p> - * @version $Revision: 1.16 $ $ $Date: 2005-09-07 19:04:14 $ - * @author Kay Ramme - * @since UDK1.0 - */ -public class urp extends Protocol { - /** - * When set to true, enables various debugging output. - */ - static public final boolean DEBUG = false; - - static private final ITypeDescription __emptyITypeDescArray[] = new ITypeDescription[0]; - static private final short __cache_size = 256; - - - protected IBridge _iBridge; - - private String _in_oid; - private ITypeDescription _in_interface; - private ThreadId _in_threadId; - - private String _out_oid; - private ITypeDescription _out_interface; - private ThreadId _out_threadId; - - private int _message_count; - private boolean _ignore_cache; - private Marshal _marshal; - private Unmarshal _unmarshal; - - private String _operationContainer[] = new String[1]; - private Object _paramsContainer[][] = new Object[1][]; - private boolean _synchronContainer[] = new boolean[1]; - private boolean _mustReplyContainer[] = new boolean[1]; - private boolean _exceptionContainer[] = new boolean[1]; - - static private final byte BIG_HEADER = (byte)0x80; - // big header flags - static private final byte REQUEST = 0x40; - static private final byte NEWTYPE = 0x20; - static private final byte NEWOID = 0x10; - static private final byte NEWTID = 0x08; - static private final byte LONGMETHODID = 0x04; - static private final byte IGNORECACHE = 0x02; - static private final byte MOREFLAGS = 0x01; - - // MOREFLAGS flags - static private final byte MUSTREPLY = (byte)0x80; - static private final byte SYNCHRONOUSE = (byte)0x40; - - static private final byte DIR_MID = 0x40; - static private final byte EXCEPTION = 0x20; - - - - public urp(IBridge iBridge) { - _iBridge = iBridge; - - _marshal = new Marshal(iBridge, __cache_size); - _unmarshal = new Unmarshal(iBridge, __cache_size); - } - - - /** - * Gets the name of the protocol. - * <p> - * @result the name of the protocol (iiop) - */ - public String getName() { - return "urp"; +import com.sun.star.uno.TypeClass; +import com.sun.star.uno.UnoRuntime; +import com.sun.star.uno.XCurrentContext; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Random; +import java.util.StringTokenizer; + +// This class internally relies on the availability of Java UNO type information +// for the interface type com.sun.star.bridge.XProtocolProperties, even though +// URP itself does not rely on that type. + +public final class urp implements IProtocol { + public urp( + IBridge bridge, String attributes, InputStream input, + OutputStream output) + { + this.input = new DataInputStream(input); + this.output = new DataOutputStream(output); + marshal = new Marshal(bridge, CACHE_SIZE); + unmarshal = new Unmarshal(bridge, CACHE_SIZE); + forceSynchronous = parseAttributes(attributes); } - private Object readReply(int header, boolean exception[]) { - if((header & NEWTID) != 0) // new thread id ? - _in_threadId = _unmarshal.readThreadId(); - - // get the out signature and parameter array of the reply - Object objects[] = (Object[])removePendingRequest(_in_threadId); - Object param[] = (Object[])objects[0]; - ITypeDescription signature[] = (ITypeDescription[])objects[1]; - TypeDescription resultType = (TypeDescription)objects[2]; - - exception[0] = (header & EXCEPTION) != 0; - if(exception[0]) {// Exception? So the reply has an any as the result - signature = __emptyITypeDescArray; - try { - resultType = TypeDescription.getTypeDescription("any"); - } catch (ClassNotFoundException e) { - throw new RuntimeException("this cannot happen: " + e); + // @see IProtocol#init + public void init() throws IOException { + synchronized (monitor) { + if (state == STATE_INITIAL0) { + sendRequestChange(); } } - - // read the result object - Object result = null; - if(resultType != null) - result = _unmarshal.readValue(resultType); - - // read the out parameters - for(int i = 0; i < signature.length; ++ i) { - if(signature[i] != null) // is this an out parameter - Array.set( - param[i], 0, - _unmarshal.readValue( - (TypeDescription) signature[i].getComponentType())); - } - - if(DEBUG) System.err.println("##### " + getClass().getName() + ".readReply:" + result); - - return result; } - - private Object []readParams(IMethodDescription iMethodDescription) { - ITypeDescription in_sig[] = iMethodDescription.getInSignature(); - ITypeDescription out_sig[] = iMethodDescription.getOutSignature(); - - Object params[] = new Object[in_sig.length]; - - for(int i = 0; i < params.length; ++ i) { - if(in_sig[i] != null) // is it an in parameter? - if(out_sig[i] != null) {// is it also an out -> inout? - Object inout = Array.newInstance(out_sig[i].getComponentType().getZClass(), 1); - Array.set( - inout, 0, - _unmarshal.readValue( - (TypeDescription) out_sig[i].getComponentType())); - params[i] = inout; + // @see IProtocol#readMessage + public Message readMessage() throws IOException { + for (;;) { + if (!unmarshal.hasMore()) { + unmarshal.reset(readBlock()); + if (!unmarshal.hasMore()) { + throw new IOException("closeConnection message received"); } - else // it is only an in parameter - params[i] = _unmarshal.readValue( - (TypeDescription) in_sig[i]); - else // it is only an out parameter, so provide the holder - params[i] = Array.newInstance(out_sig[i].getComponentType().getZClass(), 1); + } + UrpMessage msg; + int header = unmarshal.read8Bit(); + if ((header & HEADER_LONGHEADER) != 0) { + if ((header & HEADER_REQUEST) != 0) { + msg = readLongRequest(header); + } else { + msg = readReply(header); + } + } else { + msg = readShortRequest(header); + } + if (msg.isInternal()) { + handleInternalMessage(msg); + } else { + return msg; + } } - - return params; } - - private void readShortRequest(int header, String operation[], Object param[][], boolean synchron[]) { - ++ _requestsRecieved; - - int methodId; - if ((header & DIR_MID) != 0) { - methodId = ((header & 0x3F) << 8) | _unmarshal.read8Bit(); - } else { - methodId = header & 0x3F; + // @see IProtocol#writeRequest + public boolean writeRequest( + String oid, TypeDescription type, String function, ThreadId tid, + Object[] arguments) + throws IOException + { + if (oid.equals(PROPERTIES_OID)) { + throw new IllegalArgumentException("illegal OID " + oid); } - - IMethodDescription iMethodDescription = _in_interface.getMethodDescription(methodId); - operation[0] = iMethodDescription.getName(); - - synchron[0] = !iMethodDescription.isOneway(); - - param[0] = readParams(iMethodDescription); - - if(synchron[0]) { // if the request is synchron, it is pending - putPendingReply(_in_threadId, new Object[]{param[0], iMethodDescription.getOutSignature(), iMethodDescription.getReturnSignature()/*, mMDesc*/}); + synchronized (monitor) { + while (!initialized) { + try { + monitor.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e.toString()); + } + } + return writeRequest(false, oid, type, function, tid, arguments); } - - if(DEBUG) System.err.println("##### " + getClass().getName() + ".readShortRequest:" + _in_oid + " " + operation[0] + " " + synchron[0]); } - private void readLongRequest(int header, String operation[], Object param[][], boolean synchron[], boolean mustReply[]) { - ++ _requestsRecieved; - - // read the extended flags - if((header & MOREFLAGS) != 0) {// is there an extended flags byte? - int exFlags = _unmarshal.read8Bit(); - - mustReply[0] = (exFlags & MUSTREPLY) != 0; - synchron[0] = (exFlags & SYNCHRONOUSE) != 0; - } - - int methodId; - if ((header & LONGMETHODID) != 0) { - methodId = _unmarshal.read16Bit(); - } else { - methodId = _unmarshal.read8Bit(); + // @see IProtocol#writeReply + public void writeReply(boolean exception, ThreadId tid, Object result) + throws IOException + { + synchronized (output) { + writeQueuedReleases(); + int header = HEADER_LONGHEADER; + PendingRequests.Item pending = pendingIn.pop(tid); + TypeDescription resultType; + ITypeDescription[] argTypes; + Object[] args; + if (exception) { + header |= HEADER_EXCEPTION; + resultType = TypeDescription.getTypeDescription(TypeClass.ANY); + argTypes = null; + args = null; + } else { + resultType = (TypeDescription) + pending.function.getReturnSignature(); + argTypes = pending.function.getOutSignature(); + args = pending.arguments; + } + if (!tid.equals(outL1Tid)) { + header |= HEADER_NEWTID; + outL1Tid = tid; + } else { + tid = null; + } + marshal.write8Bit(header); + if (tid != null) { + marshal.writeThreadId(tid); + } + marshal.writeValue(resultType, result); + if (argTypes != null) { + for (int i = 0; i < argTypes.length; ++i) { + if (argTypes[i] != null) { + marshal.writeValue( + (TypeDescription) argTypes[i].getComponentType(), + Array.get(args[i], 0)); + } + } + } + writeBlock(true); } - - if((header & NEWTYPE) != 0) - _in_interface = _unmarshal.readType(); - - IMethodDescription iMethodDescription = _in_interface.getMethodDescription(methodId); - - if((header & MOREFLAGS) == 0) // no ex flags, so get info from typeinfo - synchron[0] = !iMethodDescription.isOneway(); - - if((header & NEWOID) != 0) // new oid? - _in_oid = _unmarshal.readObjectId(); - - if((header & NEWTID) != 0) // new thread id ? - _in_threadId = _unmarshal.readThreadId(); - - _ignore_cache = ((header & IGNORECACHE) != 0); // do not use cache for this request? - - operation[0] = iMethodDescription.getName(); - - param[0] = readParams(iMethodDescription); - - if(synchron[0]) // if the request is synchron, the reply is pending - putPendingReply(_in_threadId, new Object[]{param[0], iMethodDescription.getOutSignature(), iMethodDescription.getReturnSignature()/*, mMDesc*/}); - - if(DEBUG) System.err.println("##### " + getClass().getName() + ".readLongRequest:" + _in_oid + " " + operation[0] + " " + synchron[0]); } - private Object readMessage(String operation[], Object param[], boolean synchron[], boolean mustReply[], boolean exception[]) { - int header = _unmarshal.read8Bit(); - - Class signature[]; - Object result = null; - - if((header & BIG_HEADER) != 0) { // full header? - if((header & REQUEST) != 0) // a request ? - readLongRequest(header, operation, (Object [][])param, synchron, mustReply); - else // a reply - result = readReply(header, exception); + private void sendRequestChange() throws IOException { + if (propertiesTid == null) { + propertiesTid = ThreadId.createFresh(); } - else // only a short request header - readShortRequest(header, operation, (Object[][])param, synchron); - - if(synchron[0]) // synchron implies MUSTREPLY - mustReply[0] = true; - - - if(DEBUG) System.err.println("##### " + getClass().getName() + ".readMessage:" + _in_oid + " " + operation[0] + " " + _in_threadId + " " + param[0] + " " + result); - - return result; + random = new Random().nextInt(); + writeRequest( + true, PROPERTIES_OID, + TypeDescription.getTypeDescription(XProtocolProperties.class), + PROPERTIES_FUN_REQUEST_CHANGE, propertiesTid, + new Object[] { new Integer(random) }); + state = STATE_REQUESTED; } - public void writeRequest(String oid, - TypeDescription zInterface, - String operation, - ThreadId threadId, - Object params[], - Boolean synchron[], - Boolean mustReply[]) - { - if(DEBUG) System.err.println("##### " + getClass().getName() + ".writeRequest:" + oid + " " + zInterface + " " + operation); - - ++ _requestsSend; - ++ _message_count; - - IMethodDescription iMethodDescription = zInterface.getMethodDescription(operation); - - byte header = 0; - boolean bigHeader = false; - - if(_out_oid == null || !oid.equals(_out_oid)) { // change the oid? - header |= NEWOID; - - _out_oid = oid; - bigHeader = true; + private void handleInternalMessage(Message message) throws IOException { + if (message.isRequest()) { + String t = message.getType().getTypeName(); + if (!t.equals("com.sun.star.bridge.XProtocolProperties")) { + throw new IOException( + "read URP protocol properties request with unsupported" + + " type " + t); + } + int fid = message.getMethod().getIndex(); + switch (fid) { + case PROPERTIES_FID_REQUEST_CHANGE: + checkSynchronousPropertyRequest(message); + synchronized (monitor) { + switch (state) { + case STATE_INITIAL0: + case STATE_INITIAL: + writeReply( + false, message.getThreadId(), new Integer(1)); + state = STATE_WAIT; + break; + case STATE_REQUESTED: + int n + = ((Integer) message.getArguments()[0]).intValue(); + if (random < n) { + writeReply( + false, message.getThreadId(), new Integer(1)); + state = STATE_WAIT; + } else if (random == n) { + writeReply( + false, message.getThreadId(), new Integer(-1)); + state = STATE_INITIAL; + sendRequestChange(); + } else { + writeReply( + false, message.getThreadId(), new Integer(0)); + } + break; + default: + writeReply( + true, message.getThreadId(), + new com.sun.star.uno.RuntimeException( + "read URP protocol properties requestChange" + + " request in illegal state")); + break; + } + } + break; + case PROPERTIES_FID_COMMIT_CHANGE: + checkSynchronousPropertyRequest(message); + synchronized (monitor) { + if (state == STATE_WAIT) { + ProtocolProperty[] p = (ProtocolProperty[]) + message.getArguments()[0]; + boolean ok = true; + boolean cc = false; + int i = 0; + for (; i < p.length; ++i) { + if (p[i].Name.equals(PROPERTY_CURRENT_CONTEXT)) { + cc = true; + } else { + ok = false; + break; + } + } + if (ok) { + writeReply(false, message.getThreadId(), null); + } else { + writeReply( + true, message.getThreadId(), + new InvalidProtocolChangeException( + "", null, p[i], 1)); + } + state = STATE_INITIAL; + if (!initialized) { + if (cc) { + currentContext = true; + initialized = true; + monitor.notifyAll(); + } else { + sendRequestChange(); + } + } + } else { + writeReply( + true, message.getThreadId(), + new com.sun.star.uno.RuntimeException( + "read URP protocol properties commitChange" + + " request in illegal state")); + } + } + break; + default: + throw new IOException( + "read URP protocol properties request with unsupported" + + " function ID " + fid); + } + } else { + synchronized (monitor) { + if (state == STATE_COMMITTED) { + // commitChange reply: + if (!message.isAbnormalTermination()) { + currentContext = true; + } + state = STATE_INITIAL; + initialized = true; + monitor.notifyAll(); + } else { + // requestChange reply: + if (message.isAbnormalTermination()) { + // remote side probably does not support negotiation: + state = STATE_INITIAL; + initialized = true; + monitor.notifyAll(); + } else { + int n = ((Integer) message.getResult()).intValue(); + switch (n) { + case -1: + case 0: + break; + case 1: + writeRequest( + true, PROPERTIES_OID, + TypeDescription.getTypeDescription( + XProtocolProperties.class), + PROPERTIES_FUN_COMMIT_CHANGE, propertiesTid, + new Object[] { + new ProtocolProperty[] { + new ProtocolProperty( + PROPERTY_CURRENT_CONTEXT, + Any.VOID) } }); + state = STATE_COMMITTED; + break; + default: + throw new IOException( + "read URP protocol properties " + + PROPERTIES_FUN_REQUEST_CHANGE + + " reply with illegal return value " + n); + } + } + } + } } - else - oid = null; - - - if(_out_interface == null || !_out_interface.equals(zInterface)) { // change interface - header |= NEWTYPE; + } - _out_interface = zInterface; - bigHeader = true; + private void checkSynchronousPropertyRequest(Message message) + throws IOException + { + if (!message.isSynchronous()) { + throw new IOException( + "read URP protocol properties request for synchronous function" + + " marked as not SYNCHRONOUS"); } - else - zInterface = null; - - if(_out_threadId == null || !_out_threadId.equals(threadId)) { // change thread id - header |= NEWTID; + } - _out_threadId = threadId; + private byte[] readBlock() throws IOException { + int size = input.readInt(); + input.readInt(); // ignore count + byte[] bytes = new byte[size]; + input.readFully(bytes); + return bytes; + } - bigHeader = true; + private UrpMessage readLongRequest(int header) throws IOException { + boolean sync = false; + if ((header & HEADER_MOREFLAGS) != 0) { + if (unmarshal.read8Bit() != (HEADER_MUSTREPLY | HEADER_SYNCHRONOUS)) + { + throw new IOException( + "read URP request with bad MUSTREPLY/SYNCHRONOUS byte"); + } + sync = true; } - else - threadId = null; - - boolean hasExFlags = false; - - // if synchron is provided, test if it differs from declaration - if(synchron[0] != null) { - if(iMethodDescription.isOneway() == synchron[0].booleanValue()) { - bigHeader = true; - hasExFlags = true; + int funId = (header & HEADER_FUNCTIONID16) != 0 + ? unmarshal.read16Bit() : unmarshal.read8Bit(); + if ((header & HEADER_NEWTYPE) != 0) { + inL1Type = unmarshal.readType(); + if (inL1Type.getTypeClass() != TypeClass.INTERFACE) { + throw new IOException( + "read URP request with non-interface type " + inL1Type); } } - else - synchron[0] = new Boolean(!iMethodDescription.isOneway()); - - // if mustReply is provided and if it differs from synchron - // then we have to write it - if(mustReply[0] != null && (mustReply[0] != synchron[0])) { - bigHeader = true; - hasExFlags = true; + if ((header & HEADER_NEWOID) != 0) { + inL1Oid = unmarshal.readObjectId(); } - else - mustReply[0] = synchron[0]; - - // Long request headers can handle 16-bit method IDs, and short request - // headers can handle 14-bit method IDs: - int methodId = iMethodDescription.getIndex(); - if (methodId < 0 || methodId >= 0x10000) { - throw new IllegalArgumentException( - "Method ID " + methodId + " out of range"); - } else if (methodId >= 0xC000) { - bigHeader = true; + if ((header & HEADER_NEWTID) != 0) { + inL1Tid = unmarshal.readThreadId(); } + //TODO: check HEADER_IGNORECACHE + return readRequest(funId, sync); + } - if(bigHeader) { // something has changed, send big header - header |= BIG_HEADER; // big header - header |= REQUEST; - header |= hasExFlags ? MOREFLAGS : 0; - - if(methodId > 255) - header |= LONGMETHODID; - - _marshal.write8Bit(header); - - if(hasExFlags) {// are there extended flags to write? - byte exFlags = 0; - - exFlags |= synchron[0].booleanValue() ? SYNCHRONOUSE : 0; - exFlags |= mustReply[0].booleanValue() ? MUSTREPLY : 0; + private UrpMessage readShortRequest(int header) { + int funId = (header & HEADER_FUNCTIONID14) != 0 + ? ((header & HEADER_FUNCTIONID) << 8) | unmarshal.read8Bit() + : header & HEADER_FUNCTIONID; + return readRequest(funId, false); + } - _marshal.write8Bit(exFlags); + private UrpMessage readRequest(int functionId, boolean forcedSynchronous) { + boolean internal = PROPERTIES_OID.equals(inL1Oid); + // inL1Oid may be null in XInstanceProvider.getInstance("") + XCurrentContext cc = + (currentContext && !internal + && functionId != MethodDescription.ID_RELEASE) + ? (XCurrentContext) unmarshal.readInterface( + new Type(XCurrentContext.class)) + : null; + IMethodDescription desc = inL1Type.getMethodDescription(functionId); + ITypeDescription[] inSig = desc.getInSignature(); + ITypeDescription[] outSig = desc.getOutSignature(); + Object[] args = new Object[inSig.length]; + for (int i = 0; i < args.length; ++i) { + if (inSig[i] != null) { + if (outSig[i] != null) { + Object inout = Array.newInstance( + outSig[i].getComponentType().getZClass(), 1); + Array.set( + inout, 0, + unmarshal.readValue( + (TypeDescription) outSig[i].getComponentType())); + args[i] = inout; + } else { + args[i] = unmarshal.readValue((TypeDescription) inSig[i]); + } + } else { + args[i] = Array.newInstance( + outSig[i].getComponentType().getZClass(), 1); } - - // write the method id - if(methodId > 255) - _marshal.write16Bit(methodId); - else - _marshal.write8Bit(methodId); - - if(zInterface != null) // has the interface changed? -> write it - _marshal.writeType(zInterface); - - if(oid != null) // has the oid changed? -> write it - _marshal.writeObjectId(_out_oid); - - if(threadId != null) // has the thread id changed? -> write it - _marshal.writeThreadId(threadId); } - else { // simple request - if(methodId <= 0x2f) // does the method id fit in the header? - _marshal.write8Bit(methodId); - else { // no - header |= DIR_MID; - header |= methodId >> 8; - - _marshal.write8Bit(header); - _marshal.write8Bit(methodId); - } + boolean sync = forcedSynchronous || !desc.isOneway(); + if (sync) { + pendingIn.push( + inL1Tid, new PendingRequests.Item(internal, desc, args)); } + return new UrpMessage( + inL1Tid, true, inL1Oid, inL1Type, desc, sync, cc, false, null, args, + internal); + } - // write the in parameters - ITypeDescription in_sig[] = iMethodDescription.getInSignature(); - ITypeDescription out_sig[] = iMethodDescription.getOutSignature(); - for(int i = 0; i < in_sig.length; ++ i) { - if(in_sig[i] != null) { // is it an in parameter? - if(out_sig[i] != null) // is it also an out parameter? - _marshal.writeValue( - (TypeDescription) out_sig[i].getComponentType(), - ((Object[]) params[i])[0]); - else // in only - _marshal.writeValue((TypeDescription) in_sig[i], params[i]); + private UrpMessage readReply(int header) { + if ((header & HEADER_NEWTID) != 0) { + inL1Tid = unmarshal.readThreadId(); + } + PendingRequests.Item pending = pendingOut.pop(inL1Tid); + TypeDescription resultType; + ITypeDescription[] argTypes; + Object[] args; + boolean exception = (header & HEADER_EXCEPTION) != 0; + if (exception) { + resultType = TypeDescription.getTypeDescription(TypeClass.ANY); + argTypes = null; + args = null; + } else { + resultType = (TypeDescription) + pending.function.getReturnSignature(); + argTypes = pending.function.getOutSignature(); + args = pending.arguments; + } + Object result = resultType == null + ? null : unmarshal.readValue(resultType); + if (argTypes != null) { + for (int i = 0; i < argTypes.length; ++i) { + if (argTypes[i] != null) { + Array.set( + args[i], 0, + unmarshal.readValue( + (TypeDescription) argTypes[i].getComponentType())); + } } } - - if(synchron[0].booleanValue()) // if we are waiting for a reply, the reply is pending - putPendingRequest(_out_threadId, new Object[]{params, out_sig, iMethodDescription.getReturnSignature()}); + return new UrpMessage( + inL1Tid, false, null, null, null, false, null, exception, result, + args, pending.internal); } - public void writeReply(boolean exception, ThreadId threadId, Object result) { - if(DEBUG) System.err.println("##### " + getClass().getName() + ".writeReply:" + exception + " " + threadId + " " + result); - - ++ _message_count; - - Object objects[] = (Object[])removePendingReply(threadId); - Object params[] = (Object[])objects[0]; - ITypeDescription signature[] = (ITypeDescription[])objects[1]; - TypeDescription resType = (TypeDescription)objects[2]; - - byte header = BIG_HEADER; // big header - - if(exception) { // has an exception occurred? - header |= EXCEPTION; - - signature = __emptyITypeDescArray; - try { - resType = TypeDescription.getTypeDescription("any"); - } catch (ClassNotFoundException e) { - throw new RuntimeException("this cannot happen: " + e); + private boolean writeRequest( + boolean internal, String oid, TypeDescription type, String function, + ThreadId tid, Object[] arguments) + throws IOException + { + IMethodDescription desc = type.getMethodDescription(function); + synchronized (output) { + if (desc.getIndex() == MethodDescription.ID_RELEASE + && releaseQueue.size() < MAX_RELEASE_QUEUE_SIZE) + { + releaseQueue.add( + new QueuedRelease(internal, oid, type, desc, tid)); + return false; + } else { + writeQueuedReleases(); + return writeRequest( + internal, oid, type, desc, tid, arguments, true); } } - - if(_out_threadId == null || !_out_threadId.equals(threadId)) { // change thread id ? - header |= NEWTID; - - _out_threadId = threadId; - } - else - threadId = null; - - _marshal.write8Bit(header); - - if(threadId != null) // has the thread id changed? -> write it - _marshal.writeThreadId(threadId); - - // write the result - _marshal.writeValue(resType, result); - - // write the out parameters - for(int i = 0; i < signature.length; ++ i) - if(signature[i] != null) - _marshal.writeValue( - (TypeDescription) signature[i].getComponentType(), - Array.get(params[i], 0)); - } - - - private byte []readBlock(DataInput dataInput) throws IOException { - int size = dataInput.readInt(); - int message_count = dataInput.readInt(); - - byte bytes[] = new byte[size]; - - dataInput.readFully(bytes); - - if(DEBUG) System.err.println("##### " + getClass().getName() + ".readBlock: size:" + size + " message count:" + message_count); - - return bytes; } - private void writeBlock(DataOutput out, byte[] data, int messageCount) + private boolean writeRequest( + boolean internal, String oid, TypeDescription type, + IMethodDescription desc, ThreadId tid, Object[] arguments, + boolean flush) throws IOException { - out.writeInt(data.length); - out.writeInt(messageCount); - out.write(data); - } - - static class Message implements IMessage { - String _oid; - Object _result; - ITypeDescription _iTypeDescription; - String _operation; - ThreadId _threadId; - boolean _synchron; - boolean _mustReply; - boolean _exception; - Object _params[]; - - Message(String oid, - Object result, - ITypeDescription iTypeDescription, - String operation, - ThreadId threadId, - boolean synchron, - boolean mustReply, - boolean exception, - Object params[]) - { - _oid = oid; - _result = result; - _iTypeDescription = iTypeDescription; - _operation = operation; - _threadId = threadId; - _synchron = synchron; - _mustReply = mustReply; - _exception = exception; - _params = params; + int funId = desc.getIndex(); + if (funId < 0 || funId > MAX_FUNCTIONID16) { + throw new IllegalArgumentException( + "function ID " + funId + " out of range"); } - - public String getOperation() { - return _operation; + boolean forceSync = forceSynchronous + && funId != MethodDescription.ID_RELEASE; + boolean moreFlags = forceSync && desc.isOneway(); + boolean longHeader = moreFlags; + int header = 0; + if (!type.equals(outL1Type)) { + longHeader = true; + header |= HEADER_NEWTYPE; + outL1Type = type; + } else { + type = null; } - - public ThreadId getThreadId() { - return _threadId; + if (!oid.equals(outL1Oid)) { + longHeader = true; + header |= HEADER_NEWOID; + outL1Oid = oid; + } else { + oid = null; } - - public ITypeDescription getInterface() { - return _iTypeDescription; + if (!tid.equals(outL1Tid)) { + longHeader = true; + header |= HEADER_NEWTID; + outL1Tid = tid; + } else { + tid = null; } - - public boolean isSynchron() { - return _synchron; + if (funId > MAX_FUNCTIONID14) { + longHeader = true; } - - public boolean mustReply() { - return _mustReply; + if (longHeader) { + header |= HEADER_LONGHEADER | HEADER_REQUEST; + if (funId > MAX_FUNCTIONID8) { + header |= HEADER_FUNCTIONID16; + } + if (moreFlags) { + header |= HEADER_MOREFLAGS; + } + marshal.write8Bit(header); + if (moreFlags) { + marshal.write8Bit(HEADER_MUSTREPLY | HEADER_SYNCHRONOUS); + } + if (funId > MAX_FUNCTIONID8) { + marshal.write16Bit(funId); + } else { + marshal.write8Bit(funId); + } + if (type != null) { + marshal.writeType(type); + } + if (oid != null) { + marshal.writeObjectId(oid); + } + if (tid != null) { + marshal.writeThreadId(tid); + } + } else { + if (funId > HEADER_FUNCTIONID) { + marshal.write8Bit(HEADER_FUNCTIONID14 | (funId >> 8)); + } + marshal.write8Bit(funId); } - - public boolean isException() { - return _exception; + if (currentContext && !internal + && funId != MethodDescription.ID_RELEASE) + { + marshal.writeInterface( + UnoRuntime.getCurrentContext(), + new Type(XCurrentContext.class)); } - - public String getOid() { - return _oid; + ITypeDescription[] inSig = desc.getInSignature(); + ITypeDescription[] outSig = desc.getOutSignature(); + for (int i = 0; i < inSig.length; ++i) { + if (inSig[i] != null) { + if (outSig[i] != null) { + marshal.writeValue( + (TypeDescription) outSig[i].getComponentType(), + ((Object[]) arguments[i])[0]); + } else { + marshal.writeValue( + (TypeDescription) inSig[i], arguments[i]); + } + } } + boolean sync = forceSync || !desc.isOneway(); + if (sync) { + pendingOut.push( + outL1Tid, new PendingRequests.Item(internal, desc, arguments)); + } + writeBlock(flush); + return sync; + } - public Object getData(Object params[][]) { - params[0] = _params; - - return _result; + private void writeBlock(boolean flush) throws IOException { + byte[] data = marshal.reset(); + output.writeInt(data.length); + output.writeInt(1); + output.write(data); + if (flush) { + output.flush(); } } - /** - * reads a job from the given stream. - * <p> - * @return thread read job. - * @see com.sun.star.lib.uno.environments.remote.Job - */ - public IMessage readMessage(InputStream inputStream) throws IOException { - IMessage iMessage = null; - - DataInput dataInput = new DataInputStream( inputStream ); - while(iMessage == null) { // try hard to get a message - if(!_unmarshal.hasMore()) { // the last block is empty, get a new one - byte bytes[] = readBlock(dataInput); - _unmarshal.reset(bytes); - } + private void writeQueuedReleases() throws IOException { + for (int i = releaseQueue.size(); i > 0;) { + --i; + QueuedRelease r = (QueuedRelease) releaseQueue.get(i); + writeRequest( + r.internal, r.objectId, r.type, r.method, r.threadId, null, + false); + releaseQueue.remove(i); + } + } - if(!_unmarshal.hasMore()) // we already got a new block and there are still no bytes left? -> a close message - throw new java.io.IOException("connection close message received"); - - else { - Object result = readMessage(_operationContainer, _paramsContainer, _synchronContainer, - _mustReplyContainer, _exceptionContainer); - - if(_operationContainer[0] == null) { // a reply ? - iMessage = new Message(null, // oid - result, // object - null, // interface - null, // operation - _in_threadId, - false, - false, - _exceptionContainer[0], - _paramsContainer[0]); + private static boolean parseAttributes(String attributes) { + boolean forceSynchronous = true; + if (attributes != null) { + StringTokenizer t = new StringTokenizer(attributes, ","); + while (t.hasMoreTokens()) { + String a = t.nextToken(); + String v = null; + int i = a.indexOf('='); + if (i >= 0) { + v = a.substring(i + 1); + a = a.substring(0, i); } - else { // a request - iMessage = new Message(_in_oid, - null, - _in_interface, - _operationContainer[0], - _in_threadId, - _synchronContainer[0], - _mustReplyContainer[0], - false, - _paramsContainer[0]); + if (a.equalsIgnoreCase("ForceSynchronous")) { + forceSynchronous = parseBooleanAttributeValue(a, v); + } else if (a.equalsIgnoreCase("negotiate")) { + // Ignored: + parseBooleanAttributeValue(a, v); + } else { + throw new IllegalArgumentException( + "unknown protocol attribute " + a); } - _operationContainer[0] = null; - _paramsContainer[0] = null; - _synchronContainer[0] = false; - _exceptionContainer[0] = false; - _mustReplyContainer[0] = false; } - } - - return iMessage; + } + return forceSynchronous; } + private static boolean parseBooleanAttributeValue( + String attribute, String value) + { + if (value == null) { + throw new IllegalArgumentException( + "missing value for protocol attribute " + attribute); + } + if (value.equals("0")) { + return false; + } else if (value.equals("1")) { + return true; + } else { + throw new IllegalArgumentException( + "bad value " + value + " for protocol attribute " + attribute); + } + } - public void flush(DataOutput dataOutput) throws IOException { - if(_message_count > 0) { - writeBlock(dataOutput, _marshal.reset(), _message_count); - - _message_count = 0; + private static final class QueuedRelease { + public QueuedRelease( + boolean internal, String objectId, TypeDescription type, + IMethodDescription method, ThreadId threadId) + { + this.internal = internal; + this.objectId = objectId; + this.type = type; + this.method = method; + this.threadId = threadId; } + + public final boolean internal; + public final String objectId; + public final TypeDescription type; + public final IMethodDescription method; + public final ThreadId threadId; } -} + private static final String PROPERTIES_OID = "UrpProtocolProperties"; + private static final int PROPERTIES_FID_REQUEST_CHANGE = 4; + private static final String PROPERTIES_FUN_REQUEST_CHANGE = "requestChange"; + private static final int PROPERTIES_FID_COMMIT_CHANGE = 5; + private static final String PROPERTIES_FUN_COMMIT_CHANGE = "commitChange"; + private static final String PROPERTY_CURRENT_CONTEXT = "CurrentContext"; + + private static final short CACHE_SIZE = 256; + + private static final int HEADER_LONGHEADER = 0x80; + private static final int HEADER_REQUEST = 0x40; + private static final int HEADER_NEWTYPE = 0x20; + private static final int HEADER_NEWOID = 0x10; + private static final int HEADER_NEWTID = 0x08; + private static final int HEADER_FUNCTIONID16 = 0x04; + private static final int HEADER_IGNORECACHE = 0x02; + private static final int HEADER_MOREFLAGS = 0x01; + private static final int HEADER_MUSTREPLY = 0x80; + private static final int HEADER_SYNCHRONOUS = 0x40; + private static final int HEADER_FUNCTIONID14 = 0x40; + private static final int HEADER_FUNCTIONID = 0x3F; + private static final int HEADER_EXCEPTION = 0x20; + + private static final int MAX_FUNCTIONID16 = 0xFFFF; + private static final int MAX_FUNCTIONID14 = 0x3FFF; + private static final int MAX_FUNCTIONID8 = 0xFF; + + private static final int STATE_INITIAL0 = 0; + private static final int STATE_INITIAL = 1; + private static final int STATE_REQUESTED = 2; + private static final int STATE_COMMITTED = 3; + private static final int STATE_WAIT = 4; + + private static final int MAX_RELEASE_QUEUE_SIZE = 100; + + private final DataInput input; + private final DataOutputStream output; + + private final Marshal marshal; + private final Unmarshal unmarshal; + + private final boolean forceSynchronous; + + private final PendingRequests pendingIn = new PendingRequests(); + private final PendingRequests pendingOut = new PendingRequests(); + + private final Object monitor = new Object(); + private int state = STATE_INITIAL0; + private boolean initialized = false; + private ThreadId propertiesTid = null; + private int random; + private boolean currentContext = false; + + private ThreadId inL1Tid = null; + private String inL1Oid = null; + private TypeDescription inL1Type = null; + + private ThreadId outL1Tid = null; + private String outL1Oid = null; + private ITypeDescription outL1Type = null; + + private final ArrayList releaseQueue = new ArrayList(); // of QueuedRelease +} |