001 package org.cocome.tradingsystem.testdriver; 002 003 import java.io.Serializable; 004 import java.util.concurrent.TimeoutException; 005 006 import javax.jms.JMSException; 007 import javax.jms.Message; 008 import javax.jms.MessageListener; 009 import javax.jms.ObjectMessage; 010 import javax.jms.TopicSubscriber; 011 012 import org.cocome.tradingsystem.systests.interfaces.IUpdateReceiver; 013 014 /** 015 * This is the base for all classes implementing the IUpdateReceiver. The idea 016 * is to listen on a specified channel and keep only messages of certain types. 017 * 018 * @author Benjamin Hummel 019 * @author $Author: hummel $ 020 * @version $Rev: 63 $ 021 * @levd.rating GREEN Rev: 63 022 */ 023 public class UpdateReceiver implements IUpdateReceiver, MessageListener { 024 025 /** The last message (of those we are interested in) received. */ 026 private Serializable lastRelevantMessage = null; 027 028 /** 029 * A flag indicating if this is a new message or if this has been read 030 * before. 031 */ 032 private boolean isNewMessage = false; 033 034 /** The array of message types we are interested in. */ 035 private final Class<?>[] relevantMessages; 036 037 /** 038 * Creates a new update receiver. 039 * 040 * @param subscriber 041 * the subscriber to get the messages from. 042 * @param relevantMessageClasses 043 * the classes of messages we are interested in. 044 */ 045 public UpdateReceiver(TopicSubscriber subscriber, 046 Class<?>... relevantMessageClasses) throws JMSException { 047 subscriber.setMessageListener(this); 048 this.relevantMessages = relevantMessageClasses; 049 } 050 051 /** {@inheritDoc} */ 052 public synchronized void waitForUpdate(int maxMilliseconds) 053 throws TimeoutException { 054 055 try { 056 // multiplying by 10 makes the tests slightly slower, however I have 057 // a chance of getting them work on my system (even with so many 058 // processes running). 059 this.wait(10 * maxMilliseconds); 060 } catch (InterruptedException e) { 061 // should not happen 062 } 063 064 if (!isNewMessage) { 065 throw new TimeoutException(); 066 } 067 } 068 069 /** 070 * Signals that the next expected message should be of the given type. This 071 * checks if such a message is available, and waits again for a short time 072 * to get the chance of receiving the missing message. This is mostly used 073 * to circumvent situations where two messages are sent at the (nearly) same 074 * time. 075 * 076 * @param messageType 077 * the type of the message expected. 078 */ 079 protected void expectMessage(Class<? extends Serializable> messageType) { 080 if (lastRelevantMessage == null) { 081 return; 082 } 083 084 // This could lead to an infinite loop, if there are too many messages. 085 // However in the cash desk (where this is used) the 086 // number of messages is small. 087 while (!messageType.isAssignableFrom(lastRelevantMessage.getClass())) { 088 // throw away old message 089 isNewMessage = false; 090 try { 091 waitForUpdate(100); 092 } catch (TimeoutException e) { 093 // there are no more messages, so stop now 094 return; 095 } 096 } 097 } 098 099 /** Returns the last relevant message received. */ 100 protected synchronized Serializable getLastRelevantMessage() { 101 isNewMessage = false; 102 return lastRelevantMessage; 103 } 104 105 /** {@inheritDoc} */ 106 public synchronized void onMessage(Message message) { 107 108 if (!(message instanceof ObjectMessage)) { 109 return; 110 } 111 112 ObjectMessage objMessage = (ObjectMessage) message; 113 Serializable eventObject; 114 try { 115 eventObject = objMessage.getObject(); 116 } catch (JMSException e) { 117 // nothing much we can do here 118 return; 119 } 120 121 for (Class<?> relevantMessage : relevantMessages) { 122 if (relevantMessage.isAssignableFrom(eventObject.getClass())) { 123 isNewMessage = true; 124 lastRelevantMessage = eventObject; 125 this.notify(); 126 return; 127 } 128 } 129 } 130 }