summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKay Ramme <kr@openoffice.org>2001-05-17 11:55:06 +0000
committerKay Ramme <kr@openoffice.org>2001-05-17 11:55:06 +0000
commitee19c132d5a001abaec5df0af2b7bd400c4b6a0d (patch)
tree1bebeee8475c8c4c561346601ed3b70cffc54ab9
parent18d2636e5f058e7e1b0a5dd759251487d3b8a9f9 (diff)
renamed ThreadID; changed to new ThreadPool interface (#87110#)
-rw-r--r--jurt/com/sun/star/lib/uno/environments/remote/IThreadPool.java58
-rw-r--r--jurt/com/sun/star/lib/uno/environments/remote/JavaThreadPool.java324
-rw-r--r--jurt/com/sun/star/lib/uno/environments/remote/JobQueue.java321
-rw-r--r--jurt/com/sun/star/lib/uno/environments/remote/makefile.mk37
-rw-r--r--jurt/test/com/sun/star/lib/uno/environments/remote/JobQueue_Test.java479
-rw-r--r--jurt/test/com/sun/star/lib/uno/environments/remote/ThreadPool_Test.java740
-rw-r--r--jurt/test/com/sun/star/lib/uno/environments/remote/makefile.mk6
7 files changed, 958 insertions, 1007 deletions
diff --git a/jurt/com/sun/star/lib/uno/environments/remote/IThreadPool.java b/jurt/com/sun/star/lib/uno/environments/remote/IThreadPool.java
index a08ce6689f97..3fee90ec8fe1 100644
--- a/jurt/com/sun/star/lib/uno/environments/remote/IThreadPool.java
+++ b/jurt/com/sun/star/lib/uno/environments/remote/IThreadPool.java
@@ -2,9 +2,9 @@
*
* $RCSfile: IThreadPool.java,v $
*
- * $Revision: 1.2 $
+ * $Revision: 1.3 $
*
- * last change: $Author: kr $ $Date: 2001-05-04 11:56:03 $
+ * last change: $Author: kr $ $Date: 2001-05-17 12:55:05 $
*
* The Contents of this file are made available subject to the terms of
* either of the following licenses
@@ -65,58 +65,58 @@ package com.sun.star.lib.uno.environments.remote;
* This interface is an abstraction of the various
* threadpool implementations.
* <p>
- * @version $Revision: 1.2 $ $ $Date: 2001-05-04 11:56:03 $
+ * @version $Revision: 1.3 $ $ $Date: 2001-05-17 12:55:05 $
* @author Joerg Budischewski
- * @see com.sun.star.lib.uno.environments.remote.ThreadPool
+ * @author Kay Ramme
+ * @see com.sun.star.lib.uno.environments.remote.ThreadPoolFactory
+ * @see com.sun.star.lib.uno.environments.remote.IThreadPoolFactory
* @since UDK1.0
*/
public interface IThreadPool {
/**
- * Retrieves the global threadId for the current thread.
+ * Attaches this thread to the thread pool.
* <p>
- * @return the thread id
+ * @see #enter
*/
- public ThreadID getThreadId();
-
+ public void attach();
/**
- * Adds a jobQueue for the current thread to the threadpool.
- * Requests are now put into this queue.
- * <p>
- * @param disposeId the dipose id with which the thread can be interrupted while staying in the queue
+ * Detaches this thread from the thread pool.
* @see #enter
*/
- public void addThread(Object disposeId);
+ public void detach();
/**
- * Removes the jobQueue for the current thread.
+ * Lets this thread enter the thread pool.
+ * This thread then executes all jobs put via
+ * <code>putJob</code> until a reply job arrives.
+ * <p>
+ * @see #putJob
*/
- public void removeThread();
+ public Object enter() throws Throwable;
/**
- * Queues a job into the jobQueue of the thread belonging to the jobs threadId.
+ * Queues a job into the jobQueue of the thread belonging
+ * to the jobs threadId.
* <p>
* @param job the job
- * @param disposeId the dispose id
*/
- public void putJob(Job job, Object disposeId);
+ public void putJob(Job job);
/**
- * Lets the current thread enter the ThreadPool.
- * The thread then dispatches all jobs and leaves
- * the ThreadPool when it gets a reply job.
+ * Disposes this thread pool, thus releasing
+ * all threads by throwing the given
+ * <code>Throwable</code>.
* <p>
+ * @param throwing the Throwable
*/
- public Object enter() throws Throwable;
+ public void dispose(Throwable throwable);
+
/**
- * Interrupts all threads which have associated the dispose id.
- * <p>
- * @param disposeId the dispose id
+ * Destroys the thread pool and tries
+ * to join all created threads immediatly.
*/
- public void dispose(Object disposeId);
-
-
- public void stopDispose(Object disposeId);
+ public void destroy();
}
diff --git a/jurt/com/sun/star/lib/uno/environments/remote/JavaThreadPool.java b/jurt/com/sun/star/lib/uno/environments/remote/JavaThreadPool.java
index 87b6f5f066ae..8113687a0ad5 100644
--- a/jurt/com/sun/star/lib/uno/environments/remote/JavaThreadPool.java
+++ b/jurt/com/sun/star/lib/uno/environments/remote/JavaThreadPool.java
@@ -2,9 +2,9 @@
*
* $RCSfile: JavaThreadPool.java,v $
*
- * $Revision: 1.7 $
+ * $Revision: 1.8 $
*
- * last change: $Author: kr $ $Date: 2001-05-04 11:56:03 $
+ * last change: $Author: kr $ $Date: 2001-05-17 12:55:05 $
*
* The Contents of this file are made available subject to the terms of
* either of the following licenses
@@ -62,7 +62,6 @@
package com.sun.star.lib.uno.environments.remote;
-import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import java.util.Hashtable;
@@ -72,7 +71,7 @@ import com.sun.star.uno.UnoRuntime;
/**
* This class implements a java thread pool.
* <p>
- * @version $Revision: 1.7 $ $ $Date: 2001-05-04 11:56:03 $
+ * @version $Revision: 1.8 $ $ $Date: 2001-05-17 12:55:05 $
* @author Kay Ramme
* @see com.sun.star.uno.UnoRuntime
* @see com.sun.star.lib.uno.environments.remote.ThreadPool
@@ -85,299 +84,84 @@ public class JavaThreadPool implements IThreadPool {
/**
* When set to true, enables various debugging output.
*/
- public static final boolean DEBUG = false;
+ private static final boolean DEBUG = false;
+ JavaThreadPoolFactory _javaThreadPoolFactory;
- protected Hashtable _jobQueues = new Hashtable();
- protected Hashtable _disposeIds = new Hashtable();
- protected boolean _disposed = false;
-
-
-// public JavaThreadPool() {
-// new Thread() {
-// public void run() {
-// try {
-// while(true) {
-// list();
-
-// Thread.sleep(5000);
-// }
-// }
-// catch(InterruptedException interruptedException) {
-// System.err.println("lister interrupted:" + interruptedException);
-// }
-// }
-// }.start();
-// }
-
- /**
- * For debugging, lists the jobqueues
- */
- synchronized void list() {
- Enumeration elements = _jobQueues.elements();
-
- System.err.println("##### ThreadPool.list:");
- while(elements.hasMoreElements()) {
- System.err.println(" - " + elements.nextElement());
- }
- }
-
- /**
- * Gets the <code>ThreadID</code> of the given thread.
- * <p>
- * @return the thread id
- * @param thread the thread
- * @see com.sun.star.lib.uno.environments.remote.ThreadID
- */
- static public ThreadID getThreadId(Thread thread) {
- ThreadID threadId = null;
-
- if(thread instanceof JobQueue.JobDispatcher)
- threadId = ((JobQueue.JobDispatcher)thread).getThreadId();
- else {
- try {
- threadId = new ThreadID(UnoRuntime.generateOid(thread).getBytes("UTF8"));
- }
- catch(UnsupportedEncodingException unsupportedEncodingException) {
- throw new com.sun.star.uno.RuntimeException("JavaThreadPool.getThreadId - unexpected: " + unsupportedEncodingException.toString());
- }
- }
-
- if(DEBUG) System.err.println("##### ThreadPool.getThreadId:" + threadId);
-
- return threadId;
+ JavaThreadPool(JavaThreadPoolFactory javaThreadPoolFactory) {
+ _javaThreadPoolFactory = javaThreadPoolFactory;
}
- /**
- * Gets the <code>ThreadID</code> of this thread.
- * Implements the method of <code>IThreadPool</code>
- * <p>
- * @return the thread id
- * @see com.sun.star.lib.uno.environments.remote.IThreadPool#getThreadId
- */
- public ThreadID getThreadId() {
- if(_disposed) throw new RuntimeException("ThreadPool.getThreadId - is disposed");
+ public void attach() {
+ ThreadId threadId = _javaThreadPoolFactory.getThreadId();
- return getThreadId(Thread.currentThread());
- }
+ if(DEBUG) System.err.println("##### " + getClass().getName() + ".attach - id:" + threadId);
+ // we don't have to synchronize here
+ // cause the thread can attach itself
+ // not concurrently
+ JobQueue jobQueue = _javaThreadPoolFactory.getJobQueue(threadId);
+ if(jobQueue == null)
+ jobQueue = new JobQueue(_javaThreadPoolFactory, threadId, false);
- public void removeJobQueue(ThreadID threadId) {
- _jobQueues.remove(threadId);
- _disposeIds.remove(threadId);
- }
-
- public void addJobQueue(ThreadID threadId, JobQueue jobQueue/*, Object disposeId*/) {
- if(_disposed) throw new RuntimeException("ThreadPool.addThread(" + threadId + ") - is disposed");
-
- if(DEBUG) System.err.println("##### ThreadPool.addThread:" + threadId);
-
- _jobQueues.put(threadId, jobQueue);
-// _disposeIds.put(threadId, disposeId);
- }
-
- /**
- * Adds a <code>JobQueue</code> for the given thread under the given <code>ThreadID</code>
- * with the given disposeId.
- * <p>
- * @param createWorkerThread create a JobQueue with or without worker thread
- * @param threadId the thread id to use
- * @param disposeId the dispose id
- */
- public JobQueue addThread(boolean createWorkerThread, ThreadID threadId, Object disposeId, JobQueue syncQueue) {
- if(_disposed) throw new RuntimeException("ThreadPool.addThread(" + threadId + ") - is disposed");
-
- if(DEBUG) System.err.println("##### ThreadPool.addThread:" + threadId);
-
- JobQueue jobQueue = null;
- synchronized(this) {
- jobQueue = (JobQueue)_jobQueues.get(threadId);
- if(jobQueue == null) {
- if(syncQueue != null)
- jobQueue = new JobQueue(this, threadId, syncQueue);
- else
- jobQueue = new JobQueue(this, threadId, createWorkerThread);
-
- if(disposeId != null)
- _disposeIds.put(threadId, disposeId);
- }
- }
+ // acquiring the jobQueue registers it at the ThreadPoolFactory
jobQueue.acquire();
-
- return jobQueue;
- }
-
- /**
- * Adds a jobQueue for the current thread to the threadpool.
- * Requests are now put into this queue.
- * Implements the method of <code>IThreadPool</code>
- * <p>
- * @param disposeId the dipose id with which the thread can be interrupted while staying in the queue
- * @see #enter
- * @see com.sun.star.lib.uno.environments.remote.IThreadPool#addThread
- */
- public void addThread(Object disposeId) {
- if(_disposed) throw new RuntimeException("ThreadPool.addThread - is disposed");
-
- addThread(false, getThreadId(Thread.currentThread()), disposeId, null);
- }
-
- /**
- * Gives the <code>JobQueue</code> for the given threadId.
- * <p>
- * @return the job queue
- * @param threadId the thread id
- * @see com.sun.star.lib.uno.environments.remote.ThreadID
- * @see com.sun.star.lib.uno.environments.remote.JobQueue
- */
- public JobQueue getJobQueue(ThreadID threadId) {
- return (JobQueue)_jobQueues.get(threadId);
}
- /**
- * Removes the <code>JobQueue</code> for the given threadId.
- * <p>
- * @param threadId the thread id
- * @see com.sun.star.lib.uno.environments.remote.ThreadID
- * @see com.sun.star.lib.uno.environments.remote.JobQueue
- */
- public void removeThread(ThreadID threadId) {
- if(_disposed) throw new RuntimeException("ThreadPool.removeThread - is disposed");
-
- if(DEBUG) System.err.println("##### ThreadPool.removeThread:" + threadId);
-
- JobQueue jobQueue = (JobQueue)_jobQueues.get(threadId);
-
- if(jobQueue != null)
- jobQueue.release();
- }
-
- /**
- * Removes the jobQueue for the current thread.
- * Implements the method of <code>IThreadPool</code>
- * <p>
- * @see com.sun.star.lib.uno.environments.remote.IThreadPool#removeThread
- */
- public void removeThread() {
- if(_disposed) throw new RuntimeException("ThreadPool.removeQueue - is disposed");
+ public void detach() {
+ ThreadId threadId = _javaThreadPoolFactory.getThreadId();
- removeThread(getThreadId());
+ JobQueue jobQueue = _javaThreadPoolFactory.getJobQueue(threadId);
+ // releasing the jobQueue deregisters it from the ThreadPoolFactory
+ jobQueue.release();
}
- /**
- * Queues a job into the jobQueue of the thread belonging to the jobs threadId.
- * Implements the method of <code>IThreadPool</code>
- * <p>
- * @param job the job
- * @param disposeId the dispose id
- * @see com.sun.star.lib.uno.environments.remote.IThreadPool#putJob
- */
- private Object _syncPutJob = new Object();
-
- public void putJob(Job job, Object disposeId) {
- if(_disposed) throw new RuntimeException("ThreadPool.putJob - is disposed");
-
- JobQueue jobQueue = null;
-
- ThreadID threadId = job.getThreadId();
-
- if(DEBUG) System.err.println("#### ThreadPool.putJob:" + threadId + " " + job + " " + _jobQueues);
+ public Object enter() throws Throwable {
+ ThreadId threadId = _javaThreadPoolFactory.getThreadId();
- synchronized(_syncPutJob) {
- jobQueue = (JobQueue)_jobQueues.get(threadId);
- if(jobQueue == null) {
- if(job.getOperation() == null) // a reply? and no thread for it?
- throw new RuntimeException(getClass().getName() + ".putJob - no thread for reply " + threadId);
+ JobQueue jobQueue = _javaThreadPoolFactory.getJobQueue(threadId);
- jobQueue = new JobQueue(this, threadId, true);
- }
- jobQueue.putJob(job, disposeId);
- }
+ return jobQueue.enter(this);
}
- /**
- * Enters the <code>ThreadPool</code> under the given thread id.
- * Waits for a reply job or an exception.
- * <p>
- * @result the result of final reply
- * @param threadId the thread id to use
- */
- public Object enter(int waitTime, ThreadID threadId) throws Throwable {
- if(_disposed) throw new RuntimeException("ThreadPool.enter - is disposed");
+ public void putJob(Job job) {
+ if(DEBUG) System.err.println("##### " + getClass().getName() + ".putJob:" + job.isSynchron() + " " + job.getThreadId());
- JobQueue jobQueue = (JobQueue)_jobQueues.get(threadId);
+ if(job.isSynchron() || job.getOperation() == null) { // note: replys must be synchron
+ JobQueue jobQueue = _javaThreadPoolFactory.getJobQueue(job.getThreadId());
- Object object = null;
+ // this has not be synchronized, cause
+ // sync jobs can only come over one bridge
+ // (cause the thread blocks on other side)
+ if(jobQueue == null)
+ jobQueue = new JobQueue(_javaThreadPoolFactory, job.getThreadId(), true);
- try {
- object = jobQueue.enter(waitTime, _disposeIds.get(threadId));
+ // put job acquires the queue and registers it at the ThreadPoolFactory
+ jobQueue.putJob(job, this);
}
- finally {
- removeThread(threadId);
- }
-
- return object;
- }
-
- /**
- * Lets the current thread enter the ThreadPool.
- * The thread then dispatches all jobs and leaves
- * the ThreadPool when it gets a reply job.
- * Implements the method of <code>IThreadPool</code>
- * <p>
- * @see com.sun.star.lib.uno.environments.remote.IThreadPool#enter
- */
- public Object enter(int waitTime) throws Throwable {
- return enter(waitTime, getThreadId());
- }
-
- public Object enter() throws Throwable {
- return enter(0);
- }
-
- /**
- * Interrupts all threads which have associated the dispose id.
- * Implements the method of <code>IThreadPool</code>
- * <p>
- * @param disposeId the dispose id
- * @see com.sun.star.lib.uno.environments.remote.IThreadPool#dispose
- */
- public void dispose(Object disposeId) {
- if(DEBUG) System.err.println("##### " + getClass().getName() + ".dispose:" + disposeId);
- // clear all jobqueues
- /*synchronized(_jobQueues)*/ {
- Enumeration elements = _jobQueues.elements();
- while(elements.hasMoreElements()) {
- JobQueue jobQueue = (JobQueue)elements.nextElement();
- jobQueue.interrupt(disposeId);
+ else {
+ // this has to be synchronized, cause
+ // async jobs of the same thread can come
+ // over different bridges
+ synchronized(_javaThreadPoolFactory) {
+ JobQueue async_jobQueue = _javaThreadPoolFactory.getAsyncJobQueue(job.getThreadId());
+
+ // ensure there is jobQueue
+ if(async_jobQueue == null) // so, there is really no async queue
+ async_jobQueue = new JobQueue(_javaThreadPoolFactory, job.getThreadId());
+
+ // put job acquires the queue and registers it at the ThreadPoolFactory
+ async_jobQueue.putJob(job, this);
}
}
}
- /**
- * Stops interrupting all jobs queued by the given bridge.
- * Implements the method of <IThreadPool>.
- */
- public synchronized void stopDispose(Object disposeId) {
+ public void dispose(Throwable throwable) {
+ if(DEBUG) System.err.println("##### " + getClass().getName() + ".dispose:" + throwable);
+ _javaThreadPoolFactory.dispose(this, throwable);
}
- synchronized void dispose() {
- if(_disposed) throw new RuntimeException("ThreadPool.dispose - is disposed");
-
- _disposed = true;
-
- if(_jobQueues.size() > 0)
- System.err.println("Warning! ThreadPool.dipose - there are active JobQueus:" + _jobQueues.size());
-
- // clear all jobqueues
- Enumeration elements = _jobQueues.elements();
- while(elements.hasMoreElements())
- ((JobQueue)elements.nextElement()).clear();
-
- _jobQueues.clear();
- _jobQueues.notifyAll();
-
- _jobQueues = null;
+ public void destroy() {
}
}
diff --git a/jurt/com/sun/star/lib/uno/environments/remote/JobQueue.java b/jurt/com/sun/star/lib/uno/environments/remote/JobQueue.java
index 7cd515c0fe44..55af366bb608 100644
--- a/jurt/com/sun/star/lib/uno/environments/remote/JobQueue.java
+++ b/jurt/com/sun/star/lib/uno/environments/remote/JobQueue.java
@@ -2,9 +2,9 @@
*
* $RCSfile: JobQueue.java,v $
*
- * $Revision: 1.13 $
+ * $Revision: 1.14 $
*
- * last change: $Author: kr $ $Date: 2001-05-04 11:56:03 $
+ * last change: $Author: kr $ $Date: 2001-05-17 12:55:05 $
*
* The Contents of this file are made available subject to the terms of
* either of the following licenses
@@ -83,7 +83,7 @@ import com.sun.star.uno.UnoRuntime;
* (put by <code>putjob</code>) into the async queue, which is only
* known by the sync queue.
* <p>
- * @version $Revision: 1.13 $ $ $Date: 2001-05-04 11:56:03 $
+ * @version $Revision: 1.14 $ $ $Date: 2001-05-17 12:55:05 $
* @author Kay Ramme
* @see com.sun.star.lib.uno.environments.remote.ThreadPool
* @see com.sun.star.lib.uno.environments.remote.Job
@@ -94,7 +94,7 @@ public class JobQueue {
/**
* When set to true, enables various debugging output.
*/
- public static final boolean DEBUG = false;
+ private static final boolean DEBUG = false;
/**
* E.g. to get privleges for security managers, it is
@@ -102,75 +102,66 @@ public class JobQueue {
*/
static public IInvokeHook __JobDispatcher_run_hook;
- static protected int __instances;
-
protected Job _head; // the head of the job list
protected Job _tail; // the tail of the job list
- protected Job _current; // the current executing job
- protected ThreadID _threadId; // the thread id of the queue
- protected int _add_count = 0; // the stack deepness
+ protected ThreadId _threadId; // the thread id of the queue
+ protected int _ref_count = 0; // the stack deepness
protected boolean _createThread; // create a worker thread, if needed
protected boolean _createThread_now; // create a worker thread, if needed
protected Thread _worker_thread; // the thread that does the jobs
- protected Hashtable _disposeIds = new Hashtable(); // disposeIds for disposing
+ protected Object _disposeId; // the active dispose id
+ protected Object _doDispose = null;
+ protected Throwable _throwable;
protected JobQueue _async_jobQueue; // chaining job qeueus for asyncs
protected JobQueue _sync_jobQueue; // chaining job qeueus for syncs
protected boolean _active = false;
- protected JavaThreadPool _javaThreadPool;
-
- // statistics
- protected int _async_threads_created;
- protected int _sync_threads_created;
- protected int _async_jobs_queued;
- protected int _sync_jobs_queued;
-
- class MutableInt {
- int _value;
- }
+ protected JavaThreadPoolFactory _javaThreadPoolFactory;
/**
* A thread for dispatching jobs
*/
class JobDispatcher extends Thread implements IInvokable {
- JobDispatcher() {
+ Object _disposeId;
+
+ JobDispatcher(Object disposeId) {
if(DEBUG) System.err.println("JobQueue$JobDispatcher.<init>:" + _threadId);
- if(_sync_jobQueue == null)
- ++ _sync_threads_created;
- else
- ++ _sync_jobQueue._async_threads_created;
+ _disposeId = disposeId;
}
- ThreadID getThreadId() {
+ ThreadId getThreadId() {
return _threadId;
}
public Object invoke(Object params[]) {
try {
- _javaThreadPool.enter(5000);
+ enter(2000, _disposeId);
}
- catch(Throwable exception) {
-// catch(java.lang.Exception exception) {
+ catch(Throwable throwable) {
if(_head != null || _active) { // there was a job in progress, so give a stack
- System.err.println(getClass().getName() + " - exception occurred:" + exception);
- exception.printStackTrace(System.err);
+ System.err.println(getClass().getName() + " - exception occurred:" + throwable);
+ throwable.printStackTrace(System.err);
}
}
+ finally {
+ release();
+ }
return null;
}
public void run() {
- if(DEBUG) System.err.println("ThreadPool$JobDispatcher.run");
+ if(DEBUG) System.err.println("ThreadPool$JobDispatcher.run: " + Thread.currentThread());
if(__JobDispatcher_run_hook != null) {
try {
__JobDispatcher_run_hook.invoke(this, null);
+
}
catch(Exception exception) { // should not fly
System.err.println(getClass().getName() + " - unexpected: method >doWork< threw an exception - " + exception);
@@ -179,9 +170,16 @@ public class JobQueue {
else
invoke(null);
- // dispose the jobQueue
-// dispose();
if(DEBUG) System.err.println("##### " + getClass().getName() + ".run - exit:" + _threadId);
+
+// try {
+// Object object = new Object();
+// synchronized(object) {
+// object.wait();
+// }
+// }
+// catch(InterruptedException interruptedException) {
+// }
}
}
@@ -194,15 +192,23 @@ public class JobQueue {
* @param sync_jobQueue the sync queue this async queue belongs to
* @see com.sun.star.lib.uno.environments.remote.ThreadID
*/
- JobQueue(JavaThreadPool javaThreadPool, ThreadID threadId, JobQueue sync_jobQueue) {
- ++ __instances;
+ JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId) {
+ _javaThreadPoolFactory = javaThreadPoolFactory;
+ _threadId = new ThreadId();
+
+ _sync_jobQueue = javaThreadPoolFactory.getJobQueue(threadId);
+ if(_sync_jobQueue == null) {
+ _sync_jobQueue = new JobQueue(javaThreadPoolFactory, threadId, true);
+ _sync_jobQueue.acquire();
+ }
+
+ _sync_jobQueue._async_jobQueue = this;
- _javaThreadPool = javaThreadPool;
- _threadId = threadId;
- _sync_jobQueue = sync_jobQueue;
_createThread = true;
_createThread_now = true;
+ acquire();
+
if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" + _threadId);
}
@@ -213,10 +219,8 @@ public class JobQueue {
* @param createThread if true, the queue creates a worker thread if needed
* @see com.sun.star.lib.uno.environments.remote.ThreadID
*/
- JobQueue(JavaThreadPool javaThreadPool, ThreadID threadId, boolean createThread){
- ++ __instances;
-
- _javaThreadPool = javaThreadPool;
+ JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread){
+ _javaThreadPoolFactory = javaThreadPoolFactory;
_threadId = threadId;
_createThread = createThread;
_createThread_now = createThread;
@@ -225,100 +229,34 @@ public class JobQueue {
}
/**
- * Gives the currently dispatched job.
- * <p>
- * @return the dispatched job
- * @see com.sun.star.lib.uno.environments.remote.Job
- */
- Job getCurrentJob() {
- return _current;
- }
-
- /**
* Gives the thread id of this queue
* <p>
* @return the thread id
* @see com.sun.star.lib.uno.environments.remote.ThreadID
*/
- ThreadID getThreadId() {
+ ThreadId getThreadId() {
return _threadId;
}
- /**
- * Gives the dispatcher thread
- * <p>
- * @return the thread
- */
- Thread getThread() {
- return _worker_thread;
- }
-
-
synchronized void acquire() {
- if(_add_count <= 0)
- _javaThreadPool.addJobQueue(_threadId, this);
+ if(_ref_count <= 0)
+ _javaThreadPoolFactory.addJobQueue(this);
- ++ _add_count;
+ ++ _ref_count;
}
synchronized void release() {
- -- _add_count;
-
- if(_add_count <= 0) {
- _javaThreadPool.removeJobQueue(_threadId);
- dispose();
- }
- }
+ -- _ref_count;
- /**
- * Adds a dispose id.
- * <p>
- * @return the count of how often the id has already been added
- * @param disposeId the dispose id
- */
- private synchronized MutableInt addDisposeId(Object disposeId) {
- MutableInt disposeId_count = null;
+ if(_ref_count <= 0) {
+ _javaThreadPoolFactory.removeJobQueue(this);
- if(disposeId != null) {
- if(DEBUG) System.err.println("##### " + getClass().getName() + " " + this +".addDisposeId:" + disposeId);
- disposeId_count = (MutableInt)_disposeIds.get(disposeId);
- if(disposeId_count == null) {
- disposeId_count = new MutableInt();
- _disposeIds.put(disposeId, disposeId_count);
+ if(_sync_jobQueue != null) {
+ _sync_jobQueue._async_jobQueue = null;
+ _sync_jobQueue.release();
}
-
- ++ disposeId_count._value;
-
- if(DEBUG) System.err.println("##### " + getClass().getName() + ".addDisposeId value:" + disposeId_count._value);
-
}
-
- return disposeId_count;
- }
-
- /**
- * Removes a dispose id.
- * <p>
- * @param disposeId the dispose id
- * @param disposeId_count
- */
- private synchronized void removeDisposeId(Object disposeId, MutableInt disposeId_count) {
- if(disposeId != null) {
- if(DEBUG) System.err.println("##### " + getClass().getName() + ".removeDisposeId:" + disposeId + " " + disposeId_count);
-
- if(disposeId_count == null)
- disposeId_count = (MutableInt)_disposeIds.get(disposeId);
-
- if(disposeId_count != null) {
- -- disposeId_count._value;
- if(disposeId_count._value <= 0)
- _disposeIds.remove(disposeId);
- }
- if(DEBUG) System.err.println("##### " + getClass().getName() + ".removeDisposeId value:" + disposeId_count._value);
-
- }
-
}
/**
@@ -327,19 +265,28 @@ public class JobQueue {
* @return a job or null if timed out
* @param waitTime the maximum amount of time to wait for a job
*/
- private Job removeJob(int waitTime) {
+ private Job removeJob(int waitTime) throws Throwable {
if(DEBUG) System.err.println("##### " + getClass().getName() + ".removeJob:" + _head + " " + _threadId);
Job job = null;
synchronized (this) {
// wait max. waitTime time for a job to enter the queue
boolean waited = false;
- while(_head == null && waitTime >= 0 && !waited) {
+ while(_head == null && (waitTime == 0 || !waited)) {
+ if(_doDispose == _disposeId) {
+ _doDispose = null;
+ throw _throwable;
+ }
+
+ // notify sync queues
+ notifyAll();
+
try {
+ // wait for new job
wait(waitTime);
}
catch(InterruptedException interruptedException) {
- throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - caught:" + interruptedException);
+ throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException);
}
// signal that we have already waited once
@@ -348,29 +295,34 @@ public class JobQueue {
if(_head != null) {
- _current = _head;
+ Job current = _head;
_head = _head._next;
if(_head == null)
_tail = null;
- job = _current;
+ job = current;
_active = true;
}
}
// always wait for asynchron jobqueue to be finished !
- if( job != null && _async_jobQueue != null )
- {
+ if(job != null && _async_jobQueue != null) {
synchronized(_async_jobQueue) {
// wait for async queue to be empty and last job to be done
while(_async_jobQueue._active || _async_jobQueue._head != null) {
if(DEBUG) System.err.println("waiting for async:" + _async_jobQueue._head + " " + _async_jobQueue._worker_thread);
+
+ if(_doDispose == _disposeId) {
+ _doDispose = null;
+ throw _throwable;
+ }
+
try {
- _async_jobQueue.wait(10);
+ _async_jobQueue.wait();
}
catch(InterruptedException interruptedException) {
- throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - caught:" + interruptedException);
+ throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException);
}
}
}
@@ -385,42 +337,9 @@ public class JobQueue {
* @param job the job
* @param disposeId a dispose id
*/
- public void putJob(Job job, Object disposeId) {
- if(job.getOperation() == null || job.isSynchron()) { // if job is a reply or is sync
- // fill the sync queue (this)
- _putJob(job, disposeId);
-
- ++ _sync_jobs_queued;
- }
- else {
- synchronized(this) {
- // create the async JobQueue ?
- if(_async_jobQueue == null) {
- _async_jobQueue = new JobQueue(_javaThreadPool, new ThreadID(_threadId), this);
- }
-
- // fill the async queue, async queue are intentionally not disposed
- _async_jobQueue._putJob(job, null);
-
- ++ _async_jobs_queued;
- }
- }
- }
-
- /**
- * Puts a job into the queue.
- * <p>
- * @param job the job
- * @param disposeId a dispose id
- */
- private synchronized void _putJob(Job job, Object disposeId) {
+ synchronized void putJob(Job job, Object disposeId) {
if(DEBUG) System.err.println("##### " + getClass().getName() + ".putJob todoes: " + " job:" + job);
- // Hold the dispose id at the, to be able to remove the dispose id
- // once the job has been executed.
- job._disposeId = disposeId;
- addDisposeId(disposeId);
-
if(_tail != null)
_tail._next = job;
else
@@ -431,11 +350,9 @@ public class JobQueue {
if(_worker_thread == null && _createThread && _createThread_now) { // if there is no thread, which dispatches and if shall create one, create one
acquire();
- if(_sync_jobQueue != null)
- _sync_jobQueue.acquire();
_createThread_now = false;
- new JobDispatcher().start();
+ new JobDispatcher(disposeId).start();
}
// always notify possible waiters
@@ -464,7 +381,8 @@ public class JobQueue {
boolean quit = false;
- MutableInt disposeId_count = addDisposeId(disposeId);
+ Object hold_disposeId = _disposeId;
+ _disposeId = disposeId;
Object result = null;
@@ -473,6 +391,7 @@ public class JobQueue {
while(!quit) {
Job job = null;
+
try {
job = removeJob(waitTime);
@@ -482,7 +401,6 @@ public class JobQueue {
}
finally {
_active = false;
- removeDisposeId(job._disposeId, null);
}
if(job.isFinal()) {
@@ -507,7 +425,7 @@ public class JobQueue {
_createThread_now = true;
- removeDisposeId(disposeId, disposeId_count);
+ _disposeId = hold_disposeId;
if(_sync_jobQueue != null)
notifyAll(); // notify waiters (e.g. this is an asyncQueue and there is a sync waiting)
@@ -528,67 +446,16 @@ public class JobQueue {
* <p>
* @param disposeId the dispose id
*/
- synchronized void interrupt(Object disposeId) {
- MutableInt disposeId_count = (MutableInt)_disposeIds.get(disposeId);
+ synchronized void dispose(Object disposeId, Throwable throwable) {
+ if(_sync_jobQueue == null) { // dispose only sync queues
+ _doDispose = disposeId;
+ _throwable = throwable;
- if(DEBUG) System.err.println("##### " + getClass().getName() + " " + this + ".interrupt:" + disposeId + " " + disposeId_count);
+ // get thread out of wait and let it throw the throwable
+ if(DEBUG) System.err.println(getClass().getName() + ".dispose - notifying thread");
- if(disposeId_count != null && _worker_thread != null) { //
- _worker_thread.interrupt();
+ notifyAll();
}
}
-
- /**
- * The finalizer decreases the instance count
- */
- public void finalize() {
- -- __instances;
- }
-
- /**
- * Prints statistics abourt the queue
- */
- void printStats() {
- System.err.println("threads created all: " + (_sync_threads_created + _async_threads_created)
- + " asyncs: " + _async_threads_created
- + " syncs: " + _sync_threads_created);
- System.err.println("jobs pub - all: " + (_async_jobs_queued + _sync_jobs_queued)
- + " asyncs: " + _async_jobs_queued
- + " syncs: " + _sync_jobs_queued);
- }
-
- public String toString() {
- String string = "jobQueue: " + _threadId.toString();
-
- Enumeration elements = _disposeIds.keys();
- while(elements.hasMoreElements()) {
- string += " " + elements.nextElement();
- }
-
- return string;
- }
-
-
- void dispose() {
- if(_sync_jobQueue != null) {
- _sync_jobQueue.release();
- }
- }
-
- /**
- * Clears the queue
- */
- synchronized void clear() {
- if(_head != null)
- System.err.println("JobQueue.dispose - jobs left");
-
- _head = _tail = _current = null;
- _worker_thread = null;
-
-// _threadId = null;
-// _async_jobQueue = null;
-
-// notify(); // wakes up all threads, which are waiting for jobs
- }
}
diff --git a/jurt/com/sun/star/lib/uno/environments/remote/makefile.mk b/jurt/com/sun/star/lib/uno/environments/remote/makefile.mk
index 1fec03e7d7ba..9c09251a3807 100644
--- a/jurt/com/sun/star/lib/uno/environments/remote/makefile.mk
+++ b/jurt/com/sun/star/lib/uno/environments/remote/makefile.mk
@@ -2,9 +2,9 @@
#
# $RCSfile: makefile.mk,v $
#
-# $Revision: 1.2 $
+# $Revision: 1.3 $
#
-# last change: $Author: jsc $ $Date: 2000-11-08 13:06:48 $
+# last change: $Author: kr $ $Date: 2001-05-17 12:55:05 $
#
# The Contents of this file are made available subject to the terms of
# either of the following licenses
@@ -71,22 +71,23 @@ TARGET = com_sun_star_lib_uno_environments_remote
# --- Files --------------------------------------------------------
-JAVACLASSFILES= \
- $(CLASSDIR)$/$(PACKAGE)$/IMarshal.class \
- $(CLASSDIR)$/$(PACKAGE)$/IMessage.class \
- $(CLASSDIR)$/$(PACKAGE)$/IUnmarshal.class \
- $(CLASSDIR)$/$(PACKAGE)$/IProtocol.class \
- $(CLASSDIR)$/$(PACKAGE)$/Job.class \
- $(CLASSDIR)$/$(PACKAGE)$/JobQueue.class \
- $(CLASSDIR)$/$(PACKAGE)$/IReceiver.class \
- $(CLASSDIR)$/$(PACKAGE)$/ThreadID.class \
- $(CLASSDIR)$/$(PACKAGE)$/IThreadPool.class \
- $(CLASSDIR)$/$(PACKAGE)$/JavaThreadPool.class \
- $(CLASSDIR)$/$(PACKAGE)$/Protocol.class \
- $(CLASSDIR)$/$(PACKAGE)$/ThreadPool.class \
- $(CLASSDIR)$/$(PACKAGE)$/remote_environment.class \
- $(CLASSDIR)$/$(PACKAGE)$/INativeCallback.class \
- $(CLASSDIR)$/$(PACKAGE)$/NativeThreadPoolWrapper.class
+JAVACLASSFILES= \
+ $(CLASSDIR)$/$(PACKAGE)$/IMarshal.class \
+ $(CLASSDIR)$/$(PACKAGE)$/IMessage.class \
+ $(CLASSDIR)$/$(PACKAGE)$/IUnmarshal.class \
+ $(CLASSDIR)$/$(PACKAGE)$/IProtocol.class \
+ $(CLASSDIR)$/$(PACKAGE)$/Job.class \
+ $(CLASSDIR)$/$(PACKAGE)$/JobQueue.class \
+ $(CLASSDIR)$/$(PACKAGE)$/IReceiver.class \
+ $(CLASSDIR)$/$(PACKAGE)$/Protocol.class \
+ $(CLASSDIR)$/$(PACKAGE)$/ThreadId.class \
+ $(CLASSDIR)$/$(PACKAGE)$/IThreadPool.class \
+ $(CLASSDIR)$/$(PACKAGE)$/IThreadPoolFactory.class \
+ $(CLASSDIR)$/$(PACKAGE)$/ThreadPoolFactory.class \
+ $(CLASSDIR)$/$(PACKAGE)$/JavaThreadPoolFactory.class \
+ $(CLASSDIR)$/$(PACKAGE)$/JavaThreadPool.class \
+ $(CLASSDIR)$/$(PACKAGE)$/NativeThreadPoolFactory.class \
+ $(CLASSDIR)$/$(PACKAGE)$/remote_environment.class
# --- Targets ------------------------------------------------------
diff --git a/jurt/test/com/sun/star/lib/uno/environments/remote/JobQueue_Test.java b/jurt/test/com/sun/star/lib/uno/environments/remote/JobQueue_Test.java
index be16bf706187..ac671757dbe0 100644
--- a/jurt/test/com/sun/star/lib/uno/environments/remote/JobQueue_Test.java
+++ b/jurt/test/com/sun/star/lib/uno/environments/remote/JobQueue_Test.java
@@ -2,9 +2,9 @@
*
* $RCSfile: JobQueue_Test.java,v $
*
- * $Revision: 1.6 $
+ * $Revision: 1.7 $
*
- * last change: $Author: kr $ $Date: 2001-05-04 12:03:56 $
+ * last change: $Author: kr $ $Date: 2001-05-17 12:55:06 $
*
* The Contents of this file are made available subject to the terms of
* either of the following licenses
@@ -76,351 +76,362 @@ import com.sun.star.uno.UnoRuntime;
public class JobQueue_Test {
- static class MyContext {
- }
-
- static JavaThreadPool __javaThreadPool = new JavaThreadPool();
+ /**
+ * When set to true, enables various debugging output.
+ */
+ private static final boolean DEBUG = false;
- static class MyImpl implements MyInterface {
- int _received_requestId;
- int _send_requestId;
- boolean _passed = true;
- Object _context = new MyContext();
+ static JavaThreadPoolFactory __javaThreadPoolFactory = new JavaThreadPoolFactory();
+ static IReceiver __iReceiver = new MyReceiver();
+ static Object __disposeId = new Object();
+ static TypeDescription __workAt_td = TypeDescription.getTypeDescription(IWorkAt.class);
- public int getNext() {
- return _send_requestId ++;
- }
+// static class Receiver implements IReceiver {
+// public void sendReply(boolean exception, ThreadID threadId, Object result) {
+// // System.err.println(getClass().getName() + ".sendReply " + threadId + " " + result);
+// }
+// }
- public Object doSomething(Object param) throws InterruptedException {
- // synchronized(this) {
-// long waitTime = (long)(Math.random() * 100);
-// if(waitTime > 0)
-// wait(waitTime); // simulate work
-// }
- _passed = _passed && (((Integer)param).intValue() == _received_requestId);
- if(!_passed)
- throw new NullPointerException("blblbl: " + param + " " + _received_requestId);
+ static class TestThread extends Thread {
+ ThreadId _threadId = __javaThreadPoolFactory.getThreadId(this);
+ JobQueue _jobQueue;
+ Object _disposeId = new Object();
+ int _waitTime_before_enter;
+ String _message;
- ++ _received_requestId;
+ TestThread(int waitTime_before_enter) {
+ _waitTime_before_enter = waitTime_before_enter;
+ _jobQueue = new JobQueue(__javaThreadPoolFactory, _threadId, false);
+ }
-// synchronized(this) {
-// long waitTime = (long)(Math.random() * 100);
-// if(waitTime > 0)
-// wait(waitTime); // simulate work
-// }
+ TestThread(JobQueue jobQueue) {
+ _jobQueue = jobQueue;
- return "blabla";
}
- public Object syncCall(Object param) throws Throwable {
- Object object = doSomething(param);
+ public void run() {
- // send a request to ourself
+ synchronized(this) {
+ notify();
+ }
- ThreadPool.addThread(_context);
- Job job = new Job(this, new MyReceiver(null), // receiver
- new MyMessage(true,
- TypeDescription.getTypeDescription(MyInterface.class),
- UnoRuntime.generateOid(this),
- JavaThreadPool.getThreadId(Thread.currentThread()),
- this,
- null,
- null));
+ try {
+ if(_waitTime_before_enter != 0)
+ Thread.sleep(_waitTime_before_enter);
- ThreadPool.putJob(job, null);
-// System.err.println("entering ...");
- ThreadPool.enter();
-// System.err.println("left");
+ if(DEBUG)System.err.println("entering queue");
- return object;
+ _jobQueue.enter(_disposeId);
+ }
+ catch(Throwable throwable) {
+ if(DEBUG) {
+ System.err.println("throwable:" + throwable);
+ throwable.printStackTrace();
+ }
- }
+ _message = throwable.getMessage();
+ }
- public Object asyncCall(Object param) throws Exception {
- return doSomething(param);
- }
+ synchronized(this) {
+ if(DEBUG) System.err.println("dying");
- void finish() {
- _passed = _passed && (_send_requestId == _received_requestId);
+ // notify the listeners that we are dying
+ notifyAll();
+ }
}
}
- static Object __lock = new Object();
- // this is for testing dispose
- static class MyImpl2 implements MyInterface {
- int _received_requestId;
- int _send_requestId;
- boolean _passed = true;
- Object _context = new MyContext();
+ static boolean test_thread_leaves_jobQueue_on_dispose(Vector vector, int waitTime_before_enter) throws Throwable {
+ boolean passed = true;
+
+ System.err.println("\t\ttest_thread_leaves_jobQueue_on_dispose with enter time:" + waitTime_before_enter);
+ TestThread testThread = new TestThread(waitTime_before_enter);
+ synchronized(testThread) {
+ testThread.start();
- public int getNext() {
- return _send_requestId ++;
+ testThread.wait();
}
- boolean waitForException() {
- boolean result = false;
+ String message = "xcxxxxxxxx";
- try {
- synchronized(this) {
- notify(); // notify the tester that we entered the call
- wait(); // wait for tester to tell us to leave
- }
- }
- catch(InterruptedException interruptedException) {
- result = true;
- }
+ synchronized(testThread) {
+ testThread._jobQueue.dispose(testThread._disposeId, new RuntimeException(message));
+
+ if(DEBUG) System.err.println("waiting for TestThread to die");
- return result;
+ testThread.wait();
}
- public Object syncCall(Object param) throws Exception{
- System.err.println("\tsyncCall - waiting for exception...");
- boolean occurred = waitForException();
- System.err.println("\toccurred (should):" + occurred);
+ testThread.join();
- _passed = _passed && occurred;
+ passed = testThread._message.equals(message);
+ if(!passed)
+ vector.addElement("test_thread_leaves_jobQueue_on_dispose - not passed: message != " + message + ", instead:" + testThread._message);
- synchronized(this) {
- notify();
- }
+ System.err.println("\t\tpassed? " + passed);
- return "hallo";
- }
+ return passed;
+ }
- public Object asyncCall(Object param) throws Exception {
- System.err.println("\tasyncCall - waiting for exception...");
- boolean occurred = waitForException();
- System.err.println("\toccurred (should not):" + occurred);
+ static boolean test_thread_leaves_jobQueue_on_reply(Vector vector, int waitTime_before_enter) throws Throwable {
+ boolean passed = true;
- _passed = _passed && !occurred;
+ System.err.println("\t\ttest_thread_leaves_jobQueue_on_reply:" + waitTime_before_enter);
+ TestThread testThread = new TestThread(waitTime_before_enter);
+ synchronized(testThread) {
+ testThread.start();
- synchronized(this) {
- notify();
- }
+ testThread.wait();
+ }
- return "hallo";
+ synchronized(testThread) {
+ if(DEBUG) System.err.println("waiting for TestThread to die");
+
+ // put reply job
+ testThread._jobQueue.putJob(new Job(null, __iReceiver, new MyMessage(true, __workAt_td, "oid", null, null, null, null)), null);
+
+ testThread.wait();
}
+
+ testThread.join();
+
+ System.err.println("\t\tpassed? " + passed);
+
+ return passed;
}
+ static void test_send_request(WorkAt workAt, String operation, JobQueue jobQueue) throws Throwable {
+ IMessage iMessage = new MyMessage(true, __workAt_td, "oid", null, null, operation, null);
- static void sendAsyncJobs(int jobs, JobQueue jobQueue, MyReceiver myReceiver, ThreadID threadID, MyInterface myImpl, Object context) throws Exception {
- // sending asynchrones calls
- System.err.println("\tsending " + jobs + " asynchrones calls...");
+ jobQueue.putJob(new Job(workAt, __iReceiver, iMessage), __disposeId);
+ }
- for(int i = 0; i < jobs; ++ i) {
- MyMessage myMessage = new MyMessage(false,
- TypeDescription.getTypeDescription(MyInterface.class),
- UnoRuntime.generateOid(myImpl),
- threadID,
- myImpl,
- "asyncCall",
- new Object[]{new Integer(myImpl.getNext())});
+ static void test_send_requests(WorkAt workAt, String operation, JobQueue jobQueue) throws Throwable {
+ IMessage iMessage = new MyMessage(true, __workAt_td, "oid", null, null, operation, null);
- Job job = new Job(myImpl, myReceiver, myMessage);
+ for(int i = 0; i < WorkAt.MESSAGES; ++ i) {
+ Thread.yield(); // force scheduling
+ jobQueue.putJob(new Job(workAt, __iReceiver, iMessage), __disposeId);
+ }
+ }
+
+ static boolean test_execute_jobs(Vector vector, JobQueue jobQueue) throws Throwable {
+ boolean passed = true;
- jobQueue.putJob(job, context);
+ WorkAt workAt = new WorkAt();
+
+ test_send_requests(workAt, "increment", jobQueue);
+
+ synchronized(workAt) {
+ jobQueue.putJob(new Job(workAt, __iReceiver, new MyMessage(true, __workAt_td, "oid", null, null, "notifyme", null)), null);
+
+ while(!workAt._notified)
+ workAt.wait();
}
+
+ passed = workAt._counter == WorkAt.MESSAGES;
+ if(!passed)
+ vector.addElement("test_execute_jobs - not passed: workAt._counter == 20, instead:" + workAt._counter);
+
+ return passed;
}
- static void sendSyncJobs(int jobs, JobQueue jobQueue, MyReceiver myReceiver, ThreadID threadID, MyInterface myImpl, Thread thread, Object context, boolean wait) throws Throwable {
- // sending synchronous calls
- System.err.println("\tsending " + jobs + " synchrones calls...");
- for(int i = 0; i < jobs; ++ i) {
- MyMessage myMessage = new MyMessage(true,
- TypeDescription.getTypeDescription(MyInterface.class),
- UnoRuntime.generateOid(myImpl),
- threadID,
- myImpl,
- "syncCall",
- new Object[]{new Integer(myImpl.getNext())});
+ static boolean test_static_thread_executes_jobs(Vector vector, int waitTime_before_enter) throws Throwable {
+ boolean passed = true;
+ System.err.println("\t\ttest_static_thread_executes_jobs:" + waitTime_before_enter);
+ TestThread testThread = new TestThread(waitTime_before_enter);
+ synchronized(testThread) {
+ testThread.start();
- Job job_do = new Job(myImpl, myReceiver, myMessage);
+ testThread.wait();
+ }
- job_do._disposeId = context;
- if(thread == null) {
- synchronized(myReceiver) {
- jobQueue.putJob(job_do, context);
+ passed = test_execute_jobs(vector, testThread._jobQueue);
- if(wait) { // wait for the answer?
- myReceiver.wait();
- }
- }
- }
- else {
- jobQueue.putJob(job_do, context);
-
- myMessage = new MyMessage(true,
- TypeDescription.getTypeDescription(MyInterface.class),
- UnoRuntime.generateOid(myImpl),
- threadID,
- myImpl,
- null,
- null);
- Job job_return = new Job(myImpl, myReceiver, myMessage);
-
- jobQueue.putJob(job_return, context);
- jobQueue.enter(context);
- }
+ testThread._jobQueue.dispose(testThread._disposeId, new RuntimeException("xxxxxxxxxxxxx"));
+
+ synchronized(testThread) {
+ if(DEBUG) System.err.println("waiting for TestThread to die");
+
+ testThread.wait();
}
+
+ testThread.join();
+
+ System.err.println("\t\tpassed? " + passed);
+
+ return passed;
}
- static public boolean test_without_thread() throws Throwable { // this is like sending jobs from remote
- boolean passed[] = new boolean[]{true};
- System.err.println("doing test_without_thread ...");
+ static boolean test_dynamic_thread_executes_job(Vector vector) throws Throwable {
+ boolean passed = true;
- ThreadID threadID = new ThreadID("test_thread_id".getBytes());
+ Object disposeId = new Object();
- Object context = new MyContext();
+ System.err.println("\t\ttest_dynamic_thread_executes_job:");
- JobQueue jobQueue = new JobQueue(__javaThreadPool, threadID, true);
- MyImpl myImpl = new MyImpl();
- MyReceiver myReceiver = new MyReceiver(passed);
+ JobQueue jobQueue = new JobQueue(__javaThreadPoolFactory, new ThreadId(), true);
+ passed = test_execute_jobs(vector, jobQueue);
- do {
- sendAsyncJobs((int)(Math.random() * 10 + 1), jobQueue, myReceiver, threadID, myImpl, context);
- sendSyncJobs((int)(Math.random() * 10 + 1), jobQueue, myReceiver, threadID, myImpl, null, context, true);
- sendAsyncJobs((int)(Math.random() * 10 + 1), jobQueue, myReceiver, threadID, myImpl, context);
- sendSyncJobs((int)(Math.random() * 10 + 1), jobQueue, myReceiver, threadID, myImpl, null, context, true);
- }
- while(Math.random() > 0.25);
+ System.err.println("\t\tpassed? " + passed);
+ return passed;
+ }
- myImpl.finish();
+ static boolean test_async_jobQueue(Vector vector, WorkAt workAt, JobQueue async_jobQueue, ThreadId threadId) throws Throwable {
+ boolean passed = true;
- passed[0] = passed[0] && myImpl._passed;
+ // put slow async calls
+ if(DEBUG) System.err.println("\t\t\tputting asyncs:");
+ test_send_requests(workAt, "asyncCall", async_jobQueue);
- System.err.println("test_without_thread - passed? " + passed[0]);
+ // put fast sync calls
+ if(DEBUG) System.err.println("\t\t\tputting syncs:");
+ test_send_requests(workAt, "syncCall", __javaThreadPoolFactory.getJobQueue(threadId));
- jobQueue.printStats();
- return passed[0];
+ // wait until all is done
+ synchronized(workAt) {
+ async_jobQueue._sync_jobQueue.putJob(new Job(workAt, __iReceiver, new MyMessage(true, __workAt_td, "oid", null, null, "notifyme", null)), null);
+
+ while(!workAt._notified)
+ workAt.wait();
+ }
+
+ passed = passed && workAt.passedAsyncTest(vector);
+ if(!passed)
+ vector.addElement("workAt did not pass async test (sync overtook async)");
+
+ return passed;
}
- static public boolean test_with_thread() throws Throwable {
- boolean passed[] = new boolean[]{true};
+ static boolean test_static_thread_executes_asyncs(Vector vector) throws Throwable {
+ boolean passed = true;
- System.err.println("doing test_with_thread ...");
+ System.err.println("\t\ttest_static_thread_executes_asyncs:");
- ThreadID threadID = new ThreadID("test_thread_id".getBytes());
+ TestThread testThread = new TestThread(null);
- Object context = new MyContext();
+ // create an async queue
+ JobQueue async_jobQueue = new JobQueue(__javaThreadPoolFactory, testThread._threadId);
+ boolean tmp_passed = async_jobQueue._ref_count == 1;
+ passed = passed && tmp_passed;
- Thread thread = Thread.currentThread();
- JobQueue jobQueue = new JobQueue(__javaThreadPool, threadID, false);
- MyImpl myImpl = new MyImpl();
- MyReceiver myReceiver = new MyReceiver(passed);
+ testThread._jobQueue = __javaThreadPoolFactory.getJobQueue(testThread._threadId);
+ tmp_passed = testThread._jobQueue._ref_count == 1;
+ passed = passed && tmp_passed;
- do {
- sendAsyncJobs((int)(Math.random() * 10 + 1), jobQueue, myReceiver, threadID, myImpl, context);
- sendSyncJobs((int)(Math.random() * 10 + 1), jobQueue, myReceiver, threadID, myImpl, thread, context, true);
- sendAsyncJobs((int)(Math.random() * 10 + 1), jobQueue, myReceiver, threadID, myImpl, context);
- sendSyncJobs((int)(Math.random() * 10 + 1), jobQueue, myReceiver, threadID, myImpl, thread, context, true);
+
+ synchronized(testThread) {
+ testThread.start();
+
+ testThread.wait();
}
- while(Math.random() > 0.25);
+ WorkAt workAt = new WorkAt();
+
+ tmp_passed = test_async_jobQueue(vector, workAt, async_jobQueue, testThread._threadId);
+ passed = passed && passed;
- myImpl.finish();
+ testThread._jobQueue.dispose(testThread._disposeId, new RuntimeException("xxxxxxxxxxxxx"));
+ testThread.join();
- passed[0] = passed[0] && myImpl._passed;
+ tmp_passed = workAt._async_counter == WorkAt.MESSAGES;
+ passed = passed && tmp_passed;
- System.err.println("test_with_thread - passed? " + passed[0]);
+ tmp_passed = workAt._sync_counter == WorkAt.MESSAGES;
+ passed = passed && tmp_passed;
- jobQueue.printStats();
+ System.err.println("\t\tpassed? " + passed);
- return passed[0];
+ return passed;
}
+ static boolean test_dynamic_thread_executes_asyncs(Vector vector) throws Throwable {
+ boolean passed = true;
+ System.err.println("\t\ttest_dynamic_thread_executes_asyncs:");
- static public boolean test_disposing() throws Throwable {
- boolean passed[] = new boolean[]{true};
- System.err.println("doing test_disposing ...");
+ ThreadId threadId = new ThreadId();
+ JobQueue async_jobQueue = new JobQueue(__javaThreadPoolFactory, threadId);
- ThreadID threadID = new ThreadID("test_thread_id".getBytes());
+ WorkAt workAt = new WorkAt();
- Object context = new MyContext();
+ boolean tmp_passed = test_async_jobQueue(vector, workAt, async_jobQueue, threadId);
+ passed = passed && tmp_passed;
- JobQueue jobQueue = new JobQueue(__javaThreadPool, threadID, true);
- MyImpl2 myImpl = new MyImpl2();
- MyReceiver myReceiver = new MyReceiver(passed);
+ tmp_passed = workAt._async_counter == WorkAt.MESSAGES;
+ if(vector != null && !tmp_passed)
+ vector.addElement("test_dynamic_thread_executes_asyncs - not passed: worAt._async_counter == " + WorkAt.MESSAGES + ", instead:" + workAt._async_counter);
+ passed = passed && tmp_passed;
+ tmp_passed = workAt._sync_counter == WorkAt.MESSAGES;
+ if(!tmp_passed)
+ vector.addElement("test_dynamic_thread_executes_asyncs - not passed: worAt._sync_counter == " + WorkAt.MESSAGES + ",instead:" + workAt._sync_counter);
- // see if asyncs are interruptable, they should not be
- synchronized(myImpl) {
- sendAsyncJobs(1, jobQueue, myReceiver, threadID, myImpl, context);
- myImpl.wait();
- myImpl.notify();
+ passed = passed && tmp_passed;
- jobQueue.interrupt(context);
+ System.err.println("\t\tpassed? " + passed);
- myImpl.notify();
- myImpl.wait();
- }
+ return passed;
+ }
- // see if syncs are interruptable, they should be
- synchronized(myImpl) {
- sendSyncJobs(1, jobQueue, myReceiver, threadID, myImpl, null, context, false);
- myImpl.wait();
- myImpl.notify();
+ static public boolean test(Vector vector) throws Throwable {
+ System.err.println("\tJobQueue test:");
- jobQueue.interrupt(context);
+ boolean passed = true;
- myImpl.notify();
- myImpl.wait();
- }
+ boolean tmp_passed = test_thread_leaves_jobQueue_on_dispose(vector, 0);
+ passed = passed && tmp_passed;
- passed[0] = passed[0] && myImpl._passed;
+ tmp_passed = test_thread_leaves_jobQueue_on_dispose(vector, 5000);
+ passed = passed && tmp_passed;
- System.err.println("test_disposing - passed? " + passed[0]);
- jobQueue.printStats();
+ tmp_passed = test_thread_leaves_jobQueue_on_reply(vector, 0);
+ passed = passed && tmp_passed;
+ tmp_passed = test_thread_leaves_jobQueue_on_reply(vector, 5000);
+ passed = passed && tmp_passed;
- return passed[0];
- }
+ tmp_passed = test_static_thread_executes_jobs(vector, 0);
+ passed = passed && tmp_passed;
+ tmp_passed = test_static_thread_executes_jobs(vector, 5000);
+ passed = passed && tmp_passed;
- static public boolean test(Vector notpassed) throws Throwable {
- boolean passed = true;
+ tmp_passed = test_dynamic_thread_executes_job(vector);
+ passed = passed && tmp_passed;
+
- passed = passed && test_without_thread();
- if(!passed && notpassed != null)
- notpassed.addElement("JobQueue_Test - test_without_thread passed?" + passed);
+ tmp_passed = test_static_thread_executes_asyncs(vector);
+ passed = passed && tmp_passed;
- passed = passed && test_with_thread();
- if(!passed && notpassed != null)
- notpassed.addElement("JobQueue_Test - test_with_thread passed?" + passed);
- passed = passed && test_disposing();
- if(!passed && notpassed != null)
- notpassed.addElement("JobQueue_Test - test_disposing passed?" + passed);
+ tmp_passed = test_dynamic_thread_executes_asyncs(vector);
+ passed = passed && tmp_passed;
+ System.err.println("\tpassed? " + passed);
return passed;
}
static public void main(String args[]) throws Throwable {
- if(args.length == 0)
- test(null);
-
- else if(args[0].equals("test_disposing"))
- test_disposing();
-
- else if(args[0].equals("test_without_thread"))
- test_without_thread();
+ Vector vector = new Vector();
+ test(vector);
- else if(args[0].equals("test_with_thread"))
- test_with_thread();
+ for(int i = 0; i < vector.size(); ++ i)
+ System.err.println((String)vector.elementAt(i));
}
}
diff --git a/jurt/test/com/sun/star/lib/uno/environments/remote/ThreadPool_Test.java b/jurt/test/com/sun/star/lib/uno/environments/remote/ThreadPool_Test.java
index f176934e2a8b..c49de89777cc 100644
--- a/jurt/test/com/sun/star/lib/uno/environments/remote/ThreadPool_Test.java
+++ b/jurt/test/com/sun/star/lib/uno/environments/remote/ThreadPool_Test.java
@@ -2,9 +2,9 @@
*
* $RCSfile: ThreadPool_Test.java,v $
*
- * $Revision: 1.5 $
+ * $Revision: 1.6 $
*
- * last change: $Author: kr $ $Date: 2001-05-04 12:03:56 $
+ * last change: $Author: kr $ $Date: 2001-05-17 12:55:06 $
*
* The Contents of this file are made available subject to the terms of
* either of the following licenses
@@ -77,332 +77,618 @@ import com.sun.star.uno.UnoRuntime;
public class ThreadPool_Test {
- static int __requestId = 0;
- static int __running_thread_count;
-
- static interface IReadyListener {
- void readyEvent();
- }
+ /**
+ * When set to true, enables various debugging output.
+ */
+ private static final boolean DEBUG = false;
+ static IThreadPool __iThreadPool = null;
+ static IReceiver __iReceiver = new MyReceiver();
+ static TypeDescription __workAt_td = TypeDescription.getTypeDescription(IWorkAt.class);
+ static Object __disposeId = new Object();
- static class MyImpl implements MyInterface {
- int _received_requestId;
- int _send_requestId;
- boolean _passed = true;
- IReadyListener _iReadyListener;
- boolean _block;
- MyImpl() {
- }
+ static class TestThread extends Thread {
+ ThreadId _threadId;
+ Object _disposeId = new Object();
+ String _message;
+ IThreadPool _iThreadPool;
- public int getNext() {
- return _send_requestId ++;
+ TestThread() {
+ this(__iThreadPool);
}
- void addReadyListener(IReadyListener iReadyListener) {
- _iReadyListener = iReadyListener;
+ TestThread(IThreadPool iThreadPool) {
+ _iThreadPool = iThreadPool;
}
- public Object syncCall(Object param) throws Exception{
- Object object = doSomething(param);
+ public void run() {
+ _threadId = ThreadPoolFactory.getThreadId();
- // send a request to ourself
-// ThreadPool.addThread(null);
-// Job job = new Job(new MyReceiver(null), // receiver
-// JavaThreadPool.getThreadId(Thread.currentThread()), // threadID
-// __requestId ++, // requestId
-// this, // object
-// null, // operation,
-// new MyMessage(0), // parameter
-// true, // synchron ?
-// null, // exception ?
-// MyInterface.class); // interface
+ try {
+ synchronized(this) {
+ // notify that we are running
+ notify();
+ _iThreadPool.attach();
-// ThreadPool.putJob(job, null);
-// System.err.println("entering ...");
-// ThreadPool.enter();
-// System.err.println("left");
- if(_block) {
- try {
- synchronized(this) {
- System.err.println(this + " waiting for interrupt...");
- wait(); // wait for exception
- }
- }
- catch(InterruptedException interruptedException) {
- System.err.println(this + " succecessfully interrupted - rethrowing...");
- throw interruptedException;
+ // wait until we should continue
+ wait();
}
+
+ if(DEBUG) System.err.println("entering queue");
+
+ _iThreadPool.enter();
}
+ catch(Throwable throwable) {
+ if(DEBUG) throwable.printStackTrace();
+
+ _message = throwable.getMessage();
+ }
+
+ _iThreadPool.detach();
- return object;
+ synchronized(this) {
+ if(DEBUG) System.err.println("dying");
+ // notify the listeners that we are dying
+ notifyAll();
+ }
}
+ }
+
+ static void putJob(IWorkAt iWorkAt, boolean synchron, ThreadId threadId, String operation) {
+ IMessage iMessage = new MyMessage(synchron, __workAt_td, "oid", threadId, null, operation, null);
+
+ __iThreadPool.putJob(new Job(iWorkAt, __iReceiver, iMessage));
+ }
+
+
+ static boolean test_dispose(Vector vector, boolean silent) throws Throwable {
+ boolean passed = true;
+
+ if(!silent)
+ System.err.println("\t\ttest_dispose:");
+
+ IThreadPool iThreadPool = ThreadPoolFactory.createThreadPool();
+ TestThread testThread = new TestThread(iThreadPool);
+
+ ThreadId threadId = null;
+
+ // start the test thread
+ synchronized(testThread) {
+ testThread.start();
+
+ testThread.wait();
- public Object asyncCall(Object param) throws Exception {
- return doSomething(param);
+ threadId = testThread._threadId;
+
+ // let the thread attach and enter the threadpool
+ testThread.notifyAll();
}
- Object doSomething(Object param) throws Exception {
-// synchronized(this) {
-// long waitTime = (long)(Math.random() * 100);
-// if(waitTime > 0)
-// wait(waitTime); // simulate work
-// }
+ String message = "blabla";
- _passed = _passed && (((Integer)param).intValue() == _received_requestId);
+ // terminate the test thread
+ synchronized(testThread) {
+ if(DEBUG) System.err.println("waiting for TestThread to die");
- if(!_passed)
- throw new NullPointerException("blblbl");
+ // put reply job
+ iThreadPool.dispose(new RuntimeException(message));
+ testThread.wait();
+ }
+ testThread.join();
- ++ _received_requestId;
+ passed = testThread._message.equals(message);
- if(_iReadyListener != null)
- _iReadyListener.readyEvent();
+ if(!silent)
+ System.err.println("\t\tpassed? " + passed);
-// synchronized(this) {
-// long waitTime = (long)(Math.random() * 100);
-// if(waitTime > 0)
-// wait(waitTime); // simulate work
-// }
+ return passed;
+ }
- return "blabla";
+
+
+ static boolean test_thread_async(Vector vector, boolean silent) throws Throwable {
+ boolean passed = true;
+
+ if(!silent)
+ System.err.println("\t\ttest_thread_async:");
+
+ WorkAt workAt = new WorkAt();
+
+
+ ThreadId threadId = new ThreadId();
+
+ // queue asyncs
+ for(int i = 0; i < WorkAt.MESSAGES; ++ i) {
+ Thread.yield(); // force scheduling
+ putJob(workAt, false, threadId, "increment");
}
- void finish() {
- _passed = _passed && (_send_requestId == _received_requestId);
+ synchronized(workAt) {
+ putJob(workAt, false, threadId, "notifyme");
+
+ while(!workAt._notified)
+ workAt.wait();
+ }
+
+ passed = workAt._counter == WorkAt.MESSAGES;
+
+ if(!silent)
+ System.err.println("\t\tpassed? " + passed);
+
+ return passed;
+ }
+
+ static boolean test_dynamic_thread_sync(Vector vector, boolean silent) throws Throwable {
+ boolean passed = true;
+
+ if(!silent)
+ System.err.println("\t\t test_dynamic_thread_sync:");
+
+ WorkAt workAt = new WorkAt();
+
+
+ ThreadId threadId = new ThreadId();
+
+ // queue asyncs
+ for(int i = 0; i < WorkAt.MESSAGES; ++ i) {
+ Thread.yield(); // force scheduling
+ putJob(workAt, true, threadId, "increment");
}
- boolean pendingRequests() {
- return _send_requestId != _received_requestId;
+ synchronized(workAt) {
+ putJob(workAt, true, threadId, "notifyme");
+
+ while(!workAt._notified)
+ workAt.wait();
}
+
+ passed = workAt._counter == WorkAt.MESSAGES;
+
+ if(!silent)
+ System.err.println("\t\tpassed? " + passed);
+
+ return passed;
}
+ static boolean test_static_thread_sync(Vector vector, boolean silent) throws Throwable {
+ boolean passed = true;
+
+ if(!silent)
+ System.err.println("\t\t test_static_thread_sync:");
+
+ WorkAt workAt = new WorkAt();
+
+ TestThread testThread = new TestThread();
+
+ ThreadId threadId = null;
+
+ // start the test thread
+ synchronized(testThread) {
+ testThread.start();
+
+ testThread.wait();
+ threadId = testThread._threadId;
- static void sendJobs(int jobs, MyReceiver myReceiver, ThreadID threadID, MyImpl myImpl, boolean synchron, boolean finish, Object disposeId) throws Exception {
- // sending synchronous calls
- System.err.println("sending " + jobs + " " + synchron + " calls...");
-
- for(int i = 0; i < jobs; ++ i) {
- MyMessage myMessage = new MyMessage(synchron,
- TypeDescription.getTypeDescription(MyInterface.class),
- UnoRuntime.generateOid(myImpl),
- threadID,
- myImpl,
- finish ? null : (synchron ? "syncCall": "asyncCall"),
- new Object[]{new Integer(myImpl.getNext())});
-
- Job job = new Job(myImpl, myReceiver, myMessage);
-// Job job = new Job(UnoRuntime.generateOid(myImpl),
-// myReceiver, // receiver
-// threadID, // threadID
-// __requestId ++, // requestId
-// myImpl, // object
-// finish ? null : (synchron ? "syncCall": "asyncCall"), // operation,
-// myMessage, // parameter
-// synchron, // synchron ?
-// null, // exception ?
-// MyInterface.class); // interface
-
- ThreadPool.putJob(job, disposeId);
+ // let the thread attach and enter the threadpool
+ testThread.notifyAll();
}
+
+
+ // queue syncs
+ for(int i = 0; i < WorkAt.MESSAGES; ++ i) {
+ Thread.yield(); // force scheduling
+ putJob(workAt, true, threadId, "increment");
+ }
+
+
+ // terminate the test thread
+ synchronized(testThread) {
+ if(DEBUG) System.err.println("waiting for TestThread to die");
+
+ // put reply job
+ putJob(workAt, true, threadId, null);
+
+ testThread.wait();
+ }
+
+ testThread.join();
+
+
+ passed = workAt._counter == WorkAt.MESSAGES;
+
+ if(!silent)
+ System.err.println("\t\tpassed? " + passed);
+
+ return passed;
}
+ static boolean test_dynamic_thread_async_sync_order(Vector vector, boolean silent) throws Throwable {
+ boolean passed = true;
+ if(!silent)
+ System.err.println("\t\ttest_dynamic_thread_async_sync_order:");
- static class Worker_with_Thread extends Thread {
- MyImpl _myImpl = new MyImpl();
- MyReceiver _myReceiver = new MyReceiver(null);
- boolean _started = false;
+ WorkAt workAt = new WorkAt();
- public void run() {
- System.err.println("WorkerThread - started");
- try {
- ThreadPool.addThread(null);
- _started = true;
- ThreadPool.enter();
+ ThreadId threadId = new ThreadId();
-// _myImpl.finish();
- }
- catch(Throwable exception) {
- System.err.println("WorkerThread - exception:" + exception);
- exception.printStackTrace();
- }
- System.err.println("WorkerThread - finished - passed:" + _myImpl._passed);
+ // queue asyncs
+ for(int i = 0; i < WorkAt.MESSAGES; ++ i) {
+ Thread.yield(); // force scheduling
+ putJob(workAt, false, threadId, "asyncCall");
}
+
+ // queue syncs
+ for(int i = 0; i < WorkAt.MESSAGES; ++ i) {
+ Thread.yield(); // force scheduling
+ putJob(workAt, true, threadId, "syncCall");
+ }
+
+
+ synchronized(workAt) {
+ putJob(workAt, true, threadId, "notifyme");
+
+ while(!workAt._notified)
+ workAt.wait();
+ }
+
+ passed = workAt.passedAsyncTest(vector);
+
+ if(!silent)
+ System.err.println("\t\tpassed? " + passed);
+
+ return passed;
}
- static class RemoteObject implements IReadyListener {
- MyImpl _myImpl = new MyImpl();
- MyReceiver _myReceiver = new MyReceiver(null);
- ThreadID _threadID = new ThreadID(UnoRuntime.generateOid(this).getBytes());
- boolean _finished = false;
- {
- _myImpl.addReadyListener(this);
+ static boolean test_static_thread_async_sync_order(Vector vector, boolean silent) throws Throwable {
+ boolean passed = true;
+
+ if(!silent)
+ System.err.println("\t\ttest_static_thread_async_sync_order:");
+
+ WorkAt workAt = new WorkAt();
+
+ TestThread testThread = new TestThread();
+
+ // start the test thread
+ synchronized(testThread) {
+ testThread.start();
+
+ testThread.wait();
}
- public synchronized void readyEvent() {
- _finished = true;
- notifyAll();
+ ThreadId threadId = testThread._threadId;
+
+ // queue asyncs
+ for(int i = 0; i < WorkAt.MESSAGES; ++ i) {
+ Thread.yield(); // force scheduling
+ putJob(workAt, false, threadId, "asyncCall");
}
+
+
+ // let the thread attach and enter the threadpool
+ synchronized(testThread) {
+ testThread.notifyAll();
+ }
+
+
+ // queue syncs
+ for(int i = 0; i < WorkAt.MESSAGES; ++ i) {
+ Thread.yield(); // force scheduling
+ putJob(workAt, true, threadId, "syncCall");
+ }
+
+
+ // terminate the test thread
+ synchronized(testThread) {
+ if(DEBUG) System.err.println("waiting for TestThread to die");
+
+ // put reply job
+ putJob(workAt, true, threadId, null);
+
+ testThread.wait();
+ }
+
+ testThread.join();
+
+ passed = workAt.passedAsyncTest(vector);
+
+ if(!silent)
+ System.err.println("\t\tpassed? " + passed);
+
+ return passed;
}
- static Vector __threads = new Vector();
+ static boolean test_stress(Vector vector) throws Throwable {
+ boolean passed = true;
+
+ System.err.println("\t\ttest_stress:");
+ WorkAt workAt = new WorkAt();
- static class SenderThread extends Thread {
- boolean _quit = false;
- Object _disposeId;
+ for(int i = 0; i < WorkAt.MESSAGES; ++ i) {
+ Thread.yield(); // force scheduling
+ ThreadId threadID = new ThreadId();
- SenderThread(Object disposeId) {
- _disposeId = disposeId;
+ putJob(workAt, true, threadID, "increment");
+ putJob(workAt, false, threadID, "increment");
}
- public void run() {
- try {
- while(!_quit && __threads.size() > 0) {
- Enumeration elements = __threads.elements();
- while(elements.hasMoreElements() && !_quit) {
- Object object = elements.nextElement();
-
- if(object instanceof Worker_with_Thread) {
- Worker_with_Thread thread = (Worker_with_Thread)object;
-
- if(thread._started && !thread._myImpl.pendingRequests()) {
- sendJobs((int)(Math.random() * 50 + 1), thread._myReceiver, JavaThreadPool.getThreadId(thread), thread._myImpl, false, false, _disposeId);
- sendJobs((int)(1), thread._myReceiver, JavaThreadPool.getThreadId(thread), thread._myImpl, true, false, _disposeId);
-
- if(Math.random() > 0.95) {
- sendJobs(1, thread._myReceiver, JavaThreadPool.getThreadId(thread), thread._myImpl, true, true, _disposeId); // finish
-
- __threads.removeElement(thread);
- }
- }
- }
- else {
- RemoteObject remoteObject = (RemoteObject)object;
-
- if(!remoteObject._myImpl.pendingRequests()) {
- sendJobs((int)(Math.random() * 50 + 1), remoteObject._myReceiver, remoteObject._threadID, remoteObject._myImpl, false, false, _disposeId);
- sendJobs((int)(Math.random() * 50 + 1), remoteObject._myReceiver, remoteObject._threadID, remoteObject._myImpl, true, false, _disposeId);
-
- if(Math.random() > 0.95) {
- __threads.removeElement(remoteObject);
- }
- }
- }
+
+ synchronized(workAt) {
+ while(workAt._counter < (2 * WorkAt.MESSAGES))
+ workAt.wait();
+ }
+
+
+ class Stress1 extends Thread {
+ Vector _vector;
+ boolean _passed = true;
+
+ Stress1(Vector vector) {
+ _vector = vector;
+ }
+
+ public void run() {
+ try {
+ for(int i = 0; i < 50; ++ i) {
+ boolean tmp_passed = test_thread_async(_vector, true);
+
+ _passed = _passed && tmp_passed;
}
- synchronized(this) {
- notify();
+ }
+ catch(Throwable throwable) {
+ System.err.println(throwable);
+ throwable.printStackTrace();
+ }
+ }
+ };
+
+
+ Stress1 stress1 = new Stress1(vector);
+ stress1.start();
+
+ class Stress2 extends Thread {
+ Vector _vector;
+ boolean _passed = true;
+
+ Stress2(Vector vector) {
+ _vector = vector;
+ }
+
+ public void run() {
+ try {
+ for(int i = 0; i < 50; ++ i) {
+ boolean tmp_passed = test_dynamic_thread_sync(_vector, true);
+
+ _passed = _passed && tmp_passed;
}
-// Thread.sleep((int)(Math.random() * 100));
+ }
+ catch(Throwable throwable) {
+ System.err.println(throwable);
+ throwable.printStackTrace();
}
}
- catch(Exception exception) {
- System.err.println("SenderThread - exception:" + exception);
- exception.printStackTrace();
+ };
+
+
+ Stress2 stress2 = new Stress2(vector);
+ stress2.start();
+
+
+
+ class Stress3 extends Thread {
+ Vector _vector;
+ boolean _passed = true;
+
+ Stress3(Vector vector) {
+ _vector = vector;
}
- }
- }
- static public boolean test_with_thread() throws InterruptedException {
- Object disposeId = new Integer(0);
+ public void run() {
+ try {
+ for(int i = 0; i < 50; ++ i) {
+ boolean tmp_passed = test_static_thread_sync(_vector, true);
+
+ _passed = _passed && tmp_passed;
+ }
+ }
+ catch(Throwable throwable) {
+ System.err.println(throwable);
+ throwable.printStackTrace();
+ }
+ }
+ };
- int blockers = 0;
- SenderThread senderThread = new SenderThread(disposeId);
-// senderThread.start();
- boolean started = false;
- Vector threads = new Vector();
+ Stress3 stress3 = new Stress3(vector);
+ stress3.start();
- do {
- ++ __running_thread_count;
- Object object = null;
- if(Math.random() > 1.25) {
- Thread thread = new Worker_with_Thread();
- thread.start();
- object = thread;
+ class Stress4 extends Thread {
+ Vector _vector;
+ boolean _passed = true;
+
+ Stress4(Vector vector) {
+ _vector = vector;
}
- else {
- object = new RemoteObject();
- if(Math.random() > 0.70) {
- ((RemoteObject)object)._myImpl._block = true;
- ++ blockers;
+ public void run() {
+ try {
+ for(int i = 0; i < 50; ++ i) {
+ boolean tmp_passed = test_dynamic_thread_async_sync_order(_vector, true);
+
+ _passed = _passed && tmp_passed;
+ }
+ }
+ catch(Throwable throwable) {
+ System.err.println(throwable);
+ throwable.printStackTrace();
}
}
+ };
- __threads.addElement(object);
- threads.addElement(object);
- if(!started) {
- started = true;
- senderThread.start();
+ Stress4 stress4 = new Stress4(vector);
+ stress4.start();
+
+
+ class Stress5 extends Thread {
+ Vector _vector;
+ boolean _passed = true;
+
+ Stress5(Vector vector) {
+ _vector = vector;
}
- Thread.sleep((int)(Math.random() * 1000));
- }
- while(Math.random() > 0.05);
+ public void run() {
+ try {
+ for(int i = 0; i < 50; ++ i) {
+ boolean tmp_passed = test_static_thread_async_sync_order(_vector, true);
+ _passed = _passed && tmp_passed;
+ }
+ }
+ catch(Throwable throwable) {
+ System.err.println(throwable);
+ throwable.printStackTrace();
+ }
+ }
+ };
- System.err.println("waiting for SenderThread to die...");
- senderThread._quit = true;
- senderThread.join();
- System.err.println("disposing ThreadPool for id " + disposeId + " with " + blockers + " blocked ...");
- ThreadPool.dispose(disposeId);
+ Stress5 stress5 = new Stress5(vector);
+ stress5.start();
- boolean passed = true;
- // wait for all threads
- System.err.println("joining all threads ...");
- Enumeration elements = threads.elements();
- while(elements.hasMoreElements()) {
- Object object = elements.nextElement();
- if(object instanceof Worker_with_Thread) {
- Worker_with_Thread thread = (Worker_with_Thread)object;
- thread.join();
+ class Stress6 extends Thread {
+ Vector _vector;
+ boolean _passed = true;
- passed = passed && thread._myImpl._passed;
+ Stress6(Vector vector) {
+ _vector = vector;
}
- else {
- RemoteObject remoteObject = (RemoteObject)object;
- synchronized(remoteObject) {
- while(!remoteObject._finished && !remoteObject._myImpl._block && remoteObject._myImpl._send_requestId > 0)
- remoteObject.wait(100);
+
+ public void run() {
+ for(int i = 0; i < 500; ++ i) {
+// Thread.sleep(500);
+ try {
+ boolean tmp_passed = test_dispose(_vector, true);
+
+ _passed = _passed && tmp_passed;
+ }
+ catch(Throwable throwable) {
+ System.err.println(throwable);
+ throwable.printStackTrace();
+
+ _passed = false;
+ _vector.addElement("Stress6 - exception:" + throwable);
+ }
}
}
- }
+ };
+
+
+ Stress6 stress6 = new Stress6(vector);
+ stress6.start();
- System.err.println("test_with_thread - passed? " + passed);
+
+
+
+ stress1.join();
+ stress2.join();
+ stress3.join();
+ stress4.join();
+ stress5.join();
+
+ if(!stress1._passed)
+ vector.addElement("Stress1 not passed");
+
+ if(!stress2._passed)
+ vector.addElement("Stress2 not passed");
+
+ if(!stress3._passed)
+ vector.addElement("Stress3 not passed");
+
+ if(!stress4._passed)
+ vector.addElement("Stress4 not passed");
+
+ if(!stress5._passed)
+ vector.addElement("Stress5 not passed");
+
+ if(!stress6._passed)
+ vector.addElement("Stress6 not passed");
+
+ passed = passed && stress1._passed;
+ passed = passed && stress2._passed;
+ passed = passed && stress3._passed;
+ passed = passed && stress4._passed;
+ passed = passed && stress5._passed;
+ passed = passed && stress6._passed;
+
+ System.err.println("\t\tpassed? " + passed);
return passed;
}
- static public boolean test(Vector notpassed) throws Exception {
+ static public boolean test(Vector vector) throws Throwable {
+ __iThreadPool = ThreadPoolFactory.createThreadPool();
+
+ System.err.println("\tThreadPool test:");
+
boolean passed = true;
+ boolean tmp_passed = false;
- passed = passed && test_with_thread();
- if(!passed && notpassed != null)
- notpassed.addElement("ThreadPool_Test - test_with_thread passed?" + passed);
+ tmp_passed = test_dispose(vector, false);
+ passed = passed && tmp_passed;
+ tmp_passed = test_thread_async(vector, false);
+ passed = passed && tmp_passed;
+
+ tmp_passed = test_dynamic_thread_sync(vector, false);
+ passed = passed && tmp_passed;
+
+ tmp_passed = test_static_thread_sync(vector, false);
+ passed = passed && tmp_passed;
+
+ tmp_passed = test_dynamic_thread_async_sync_order(vector, false);
+ passed = passed && tmp_passed;
+
+ tmp_passed = test_static_thread_async_sync_order(vector, false);
+ passed = passed && tmp_passed;
+
+ tmp_passed = test_stress(vector);
+ passed = passed && tmp_passed;
+
+ System.err.println("\tpassed? " + passed);
return passed;
}
- static public void main(String args[]) throws Exception {
- test(null);
+ static public void main(String args[]) throws Throwable {
+ Object object = com.sun.star.comp.helper.RegistryServiceFactory.create("/usr/local2/kr/udkapi.rdb");
+
+ Vector vector = new Vector();
+
+ test(vector);
+
+ for(int i = 0; i < vector.size(); ++ i)
+ System.err.println("---:" + vector.elementAt(i));
}
}
+
+
+
diff --git a/jurt/test/com/sun/star/lib/uno/environments/remote/makefile.mk b/jurt/test/com/sun/star/lib/uno/environments/remote/makefile.mk
index 2da83725b1de..aa709973b93c 100644
--- a/jurt/test/com/sun/star/lib/uno/environments/remote/makefile.mk
+++ b/jurt/test/com/sun/star/lib/uno/environments/remote/makefile.mk
@@ -2,9 +2,9 @@
#
# $RCSfile: makefile.mk,v $
#
-# $Revision: 1.2 $
+# $Revision: 1.3 $
#
-# last change: $Author: kr $ $Date: 2001-01-17 10:06:35 $
+# last change: $Author: kr $ $Date: 2001-05-17 12:55:06 $
#
# The Contents of this file are made available subject to the terms of
# either of the following licenses
@@ -77,6 +77,8 @@ JAVACLASSFILES= \
$(CLASSDIR)$/$(PACKAGE)$/JobQueue_Test.class \
$(CLASSDIR)$/$(PACKAGE)$/MyMessage.class \
$(CLASSDIR)$/$(PACKAGE)$/MyReceiver.class \
+ $(CLASSDIR)$/$(PACKAGE)$/IWorkAt.class \
+ $(CLASSDIR)$/$(PACKAGE)$/WorkAt.class \
$(CLASSDIR)$/$(PACKAGE)$/ThreadPool_Test.class
# --- Targets ------------------------------------------------------