public void dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener.get();
try {
clearMessagesInProgress();
clearDispatchList();
synchronized (unconsumedMessages.getMutex()) {
if (!unconsumedMessages.isClosed()) {
if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) {
ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md);
try {
boolean expired = message.isExpired();
if (!expired) {
listener.onMessage(message);
}
afterMessageIsConsumed(md, expired);
} catch (RuntimeException e) {
LOG.error(getConsumerId() + " Exception while processing message: " + md.getMessage().getMessageId(), e);
if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
// schedual redelivery and possible dlq processing
md.setRollbackCause(e);
rollback();
} else {
// Transacted or Client ack: Deliver the
// next message.
afterMessageIsConsumed(md, false);
}
}
} else {
if (!unconsumedMessages.isRunning()) {
// delayed redelivery, ensure it can be re delivered
session.connection.rollbackDuplicate(this, md.getMessage());
}
unconsumedMessages.enqueue(md);
if (availableListener != null) {
availableListener.onMessageAvailable(this);
}
}
} else {
if (!session.isTransacted()) {
LOG.warn("Duplicate dispatch on connection: " + session.getConnection().getConnectionInfo().getConnectionId()
+ " to consumer: " + getConsumerId() + ", ignoring (auto acking) duplicate: " + md);
MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
session.sendAck(ack);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
}
boolean needsPoisonAck = false;
synchronized (deliveredMessages) {
if (previouslyDeliveredMessages != null) {
previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
} else {
// delivery while pending redelivery to another consumer on the same connection
// not waiting for redelivery will help here
needsPoisonAck = true;
}
}
if (needsPoisonAck) {
MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
poisonAck.setFirstMessageId(md.getMessage().getMessageId());
poisonAck.setPoisonCause(new JMSException("Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
+ session.getConnection().getConnectionInfo().getConnectionId()));
LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
+ " consumer on this connection, failoverRedeliveryWaitPeriod="
+ failoverRedeliveryWaitPeriod + ". Message: " + md + ", poisonAck: " + poisonAck);
session.sendAck(poisonAck);
} else {
if (transactedIndividualAck) {
immediateIndividualTransactedAck(md);
} else {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
}
}
}
}
}
}
if (++dispatchedCount % 1000 == 0) {
dispatchedCount = 0;
Thread.yield();
}
} catch (Exception e) {
session.connection.onClientInternalException(e);
}
}
1,进行出队操作截图
2,出队关键类SimplePriorityMessageDispatchChannel
public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel {
private static final Integer MAX_PRIORITY = 10;
private final Object mutex = new Object();
private final LinkedList[] lists;
private boolean closed;
private boolean running;
private int size = 0;
public SimplePriorityMessageDispatchChannel() {
this.lists = new LinkedList[MAX_PRIORITY];
for (int i = 0; i ();
}
}
/*
* (non-Javadoc)
* @see
* org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq
* .command.MessageDispatch)
*/
public void enqueue(MessageDispatch message) {
synchronized (mutex) {
getList(message).addLast(message);
this.size++;
mutex.notify();
}
}
/*
* (non-Javadoc)
* @see
* org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq
* .command.MessageDispatch)
*/
public void enqueueFirst(MessageDispatch message) {
synchronized (mutex) {
getList(message).addFirst(message);
this.size++;
mutex.notify();
}
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#dequeue(long)
*/
public MessageDispatch dequeue(long timeout) throws InterruptedException {
synchronized (mutex) {
// Wait until the consumer is ready to deliver messages.
while (timeout != 0 && !closed && (isEmpty() || !running)) {
if (timeout == -1) {
mutex.wait();
} else {
mutex.wait(timeout);
break;
}
}
if (closed || !running || isEmpty()) {
return null;
}
return removeFirst();
}
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait()
*/
public MessageDispatch dequeueNoWait() {
synchronized (mutex) {
if (closed || !running || isEmpty()) {
return null;
}
return removeFirst();
}
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#start()
*/
public void start() {
synchronized (mutex) {
running = true;
mutex.notifyAll();
}
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#stop()
*/
public void stop() {
synchronized (mutex) {
running = false;
mutex.notifyAll();
}
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#close()
*/
public void close() {
synchronized (mutex) {
if (!closed) {
running = false;
closed = true;
}
mutex.notifyAll();
}
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#clear()
*/
public void clear() {
synchronized (mutex) {
for (int i = 0; i 0) {
for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
LinkedList list = lists[i];
if (!list.isEmpty()) {
this.size--;
return list.removeFirst();
}
}
}
return null;
}
}
3,消费者处理类以及核心属性和方法
public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
protected final ActiveMQSession session;
protected final ConsumerInfo info;
// These are the messages waiting to be delivered to the client
protected final MessageDispatchChannel unconsumedMessages;
// The are the messages that were delivered to the consumer but that have
// not been acknowledged. It's kept in reverse order since we
// Always walk list in reverse order.
private final LinkedList deliveredMessages = new LinkedList();
// track duplicate deliveries in a transaction such that the tx integrity can be validated
private PreviouslyDeliveredMap previouslyDeliveredMessages;
private int deliveredCounter;
private int additionalWindowSize;
private long redeliveryDelay;
private int ackCounter;
private int dispatchedCount;
private final AtomicReference messageListener = new AtomicReference();
private final JMSConsumerStatsImpl stats;
private final String selector;
private boolean synchronizationRegistered;
private final AtomicBoolean started = new AtomicBoolean(false);
private MessageAvailableListener availableListener;
private RedeliveryPolicy redeliveryPolicy;
private boolean optimizeAcknowledge;
private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
private ExecutorService executorService;
private MessageTransformer transformer;
private boolean clearDispatchList;
AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);
private MessageAck pendingAck;
private long lastDeliveredSequenceId;
private IOException failureError;
private long optimizeAckTimestamp = System.currentTimeMillis();
private long optimizeAcknowledgeTimeOut = 0;
private long optimizedAckScheduledAckInterval = 0;
private Runnable optimizedAckTask;
private long failoverRedeliveryWaitPeriod = 0;
private boolean transactedIndividualAck = false;
private boolean nonBlockingRedelivery = false;
public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
String name, String selector, int prefetch,
int maximumPendingMessageCount, boolean noLocal, boolean browser,
boolean dispatchAsync, MessageListener messageListener) throws JMSException {
if (dest == null) {
throw new InvalidDestinationException("Don't understand null destinations");
} else if (dest.getPhysicalName() == null) {
throw new InvalidDestinationException("The destination object was not given a physical name.");
} else if (dest.isTemporary()) {
String physicalName = dest.getPhysicalName();
if (physicalName == null) {
throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
}
String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
if (physicalName.indexOf(connectionID) options = IntrospectionSupport.extractProperties(
new HashMap(dest.getOptions()), "consumer.");
IntrospectionSupport.setProperties(this.info, options);
if (options.size() > 0) {
String msg = "There are " + options.size()
+ " consumer options that couldn't be set on the consumer."
+ " Check the options are spelled correctly."
+ " Unknown parameters=[" + options + "]."
+ " This consumer cannot be started.";
LOG.warn(msg);
throw new ConfigurationException(msg);
}
}
this.info.setDestination(dest);
this.info.setBrowser(browser);
if (selector != null && selector.trim().length() != 0) {
// Validate the selector
SelectorParser.parse(selector);
this.info.setSelector(selector);
this.selector = selector;
} else if (info.getSelector() != null) {
// Validate the selector
SelectorParser.parse(this.info.getSelector());
this.selector = this.info.getSelector();
} else {
this.selector = null;
}
this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
&& !info.isBrowser();
if (this.optimizeAcknowledge) {
this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval());
}
this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery;
if (messageListener != null) {
setMessageListener(messageListener);
}
try {
this.session.addConsumer(this);
this.session.syncSendPacket(info);
} catch (JMSException e) {
this.session.removeConsumer(this);
throw e;
}
if (session.connection.isStarted()) {
start();
}
}
public void setMessageListener(MessageListener listener) throws JMSException {
checkClosed();
if (info.getPrefetchSize() == 0) {
throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
}
if (listener != null) {
boolean wasRunning = session.isRunning();
if (wasRunning) {
session.stop();
}
this.messageListener.set(listener);
session.redispatch(this, unconsumedMessages);
if (wasRunning) {
session.start();
}
} else {
this.messageListener.set(null);
}
}
private MessageDispatch dequeue(long timeout) throws JMSException {
try {
long deadline = 0;
if (timeout > 0) {
deadline = System.currentTimeMillis() + timeout;
}
while (true) {
MessageDispatch md = unconsumedMessages.dequeue(timeout);
if (md == null) {
if (timeout > 0 && !unconsumedMessages.isClosed()) {
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
} else {
if (failureError != null) {
throw JMSExceptionSupport.create(failureError);
} else {
return null;
}
}
} else if (md.getMessage() == null) {
return null;
} else if (md.getMessage().isExpired()) {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " received expired message: " + md);
}
beforeMessageIsConsumed(md);
afterMessageIsConsumed(md, true);
if (timeout > 0) {
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
}
} else {
if (LOG.isTraceEnabled()) {
LOG.trace(getConsumerId() + " received message: " + md);
}
return md;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw JMSExceptionSupport.create(e);
}
}
/**
* Receives the next message produced for this message consumer.
*
* This call blocks indefinitely until a message is produced or until this
* message consumer is closed.
*
* If this receive
is done within a transaction, the consumer
* retains the message until the transaction commits.
*
* @return the next message produced for this message consumer, or null if
* this message consumer is concurrently closed
*/
public Message receive() throws JMSException {
checkClosed();
checkMessageListener();
sendPullCommand(0);
MessageDispatch md = dequeue(-1);
if (md == null) {
return null;
}
beforeMessageIsConsumed(md);
afterMessageIsConsumed(md, false);
return createActiveMQMessage(md);
}
/**
* @param md
* @return
*/
private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
}
if (transformer != null) {
Message transformedMessage = transformer.consumerTransform(session, this, m);
if (transformedMessage != null) {
m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
}
}
if (session.isClientAcknowledge()) {
m.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception {
session.checkClosed();
session.acknowledge();
}
});
} else if (session.isIndividualAcknowledge()) {
m.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception {
session.checkClosed();
acknowledge(md);
}
});
}
return m;
}
/**
* Receives the next message that arrives within the specified timeout
* interval.
*
* This call blocks until a message arrives, the timeout expires, or this
* message consumer is closed. A timeout
of zero never
* expires, and the call blocks indefinitely.
*
* @param timeout the timeout value (in milliseconds), a time out of zero
* never expires.
* @return the next message produced for this message consumer, or null if
* the timeout expires or this message consumer is concurrently
* closed
*/
public Message receive(long timeout) throws JMSException {
checkClosed();
checkMessageListener();
if (timeout == 0) {
return this.receive();
}
sendPullCommand(timeout);
while (timeout > 0) {
MessageDispatch md;
if (info.getPrefetchSize() == 0) {
md = dequeue(-1); // We let the broker let us know when we timeout.
} else {
md = dequeue(timeout);
}
if (md == null) {
return null;
}
beforeMessageIsConsumed(md);
afterMessageIsConsumed(md, false);
return createActiveMQMessage(md);
}
return null;
}
/**
* Receives the next message if one is immediately available.
*
* @return the next message produced for this message consumer, or null if
* one is not available
* @throws JMSException if the JMS provider fails to receive the next
* message due to some internal error.
*/
public Message receiveNoWait() throws JMSException {
checkClosed();
checkMessageListener();
sendPullCommand(-1);
MessageDispatch md;
if (info.getPrefetchSize() == 0) {
md = dequeue(-1); // We let the broker let us know when we
// timeout.
} else {
md = dequeue(0);
}
if (md == null) {
return null;
}
beforeMessageIsConsumed(md);
afterMessageIsConsumed(md, false);
return createActiveMQMessage(md);
}
public void dispose() throws JMSException {
if (!unconsumedMessages.isClosed()) {
// Do we have any acks we need to send out before closing?
// Ack any delivered messages now.
if (!session.getTransacted()) {
deliverAcks();
if (isAutoAcknowledgeBatch()) {
acknowledge();
}
}
if (executorService != null) {
ThreadPoolUtils.shutdownGraceful(executorService, 60000L);
executorService = null;
}
if (optimizedAckTask != null) {
this.session.connection.getScheduler().cancel(optimizedAckTask);
optimizedAckTask = null;
}
if (session.isClientAcknowledge()) {
if (!this.info.isBrowser()) {
// rollback duplicates that aren't acknowledged
List tmp = null;
synchronized (this.deliveredMessages) {
tmp = new ArrayList(this.deliveredMessages);
}
for (MessageDispatch old : tmp) {
this.session.connection.rollbackDuplicate(this, old.getMessage());
}
tmp.clear();
}
}
if (!session.isTransacted()) {
synchronized(deliveredMessages) {
deliveredMessages.clear();
}
}
unconsumedMessages.close();
this.session.removeConsumer(this);
List list = unconsumedMessages.removeAll();
if (!this.info.isBrowser()) {
for (MessageDispatch old : list) {
// ensure we don't filter this as a duplicate
session.connection.rollbackDuplicate(this, old.getMessage());
}
}
}
}
public void dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener.get();
try {
clearMessagesInProgress();
clearDispatchList();
synchronized (unconsumedMessages.getMutex()) {
if (!unconsumedMessages.isClosed()) {
if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) {
ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md);
try {
boolean expired = message.isExpired();
if (!expired) {
listener.onMessage(message);
}
afterMessageIsConsumed(md, expired);
} catch (RuntimeException e) {
LOG.error(getConsumerId() + " Exception while processing message: " + md.getMessage().getMessageId(), e);
if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
// schedual redelivery and possible dlq processing
md.setRollbackCause(e);
rollback();
} else {
// Transacted or Client ack: Deliver the
// next message.
afterMessageIsConsumed(md, false);
}
}
} else {
if (!unconsumedMessages.isRunning()) {
// delayed redelivery, ensure it can be re delivered
session.connection.rollbackDuplicate(this, md.getMessage());
}
unconsumedMessages.enqueue(md);
if (availableListener != null) {
availableListener.onMessageAvailable(this);
}
}
} else {
if (!session.isTransacted()) {
LOG.warn("Duplicate dispatch on connection: " + session.getConnection().getConnectionInfo().getConnectionId()
+ " to consumer: " + getConsumerId() + ", ignoring (auto acking) duplicate: " + md);
MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
session.sendAck(ack);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
}
boolean needsPoisonAck = false;
synchronized (deliveredMessages) {
if (previouslyDeliveredMessages != null) {
previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
} else {
// delivery while pending redelivery to another consumer on the same connection
// not waiting for redelivery will help here
needsPoisonAck = true;
}
}
if (needsPoisonAck) {
MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
poisonAck.setFirstMessageId(md.getMessage().getMessageId());
poisonAck.setPoisonCause(new JMSException("Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
+ session.getConnection().getConnectionInfo().getConnectionId()));
LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
+ " consumer on this connection, failoverRedeliveryWaitPeriod="
+ failoverRedeliveryWaitPeriod + ". Message: " + md + ", poisonAck: " + poisonAck);
session.sendAck(poisonAck);
} else {
if (transactedIndividualAck) {
immediateIndividualTransactedAck(md);
} else {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
}
}
}
}
}
}
if (++dispatchedCount % 1000 == 0) {
dispatchedCount = 0;
Thread.yield();
}
} catch (Exception e) {
session.connection.onClientInternalException(e);
}
}
}
public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
protected final ActiveMQSession session;
protected final ConsumerInfo info;
// These are the messages waiting to be delivered to the client
protected final MessageDispatchChannel unconsumedMessages;
// The are the messages that were delivered to the consumer but that have
// not been acknowledged. It's kept in reverse order since we
// Always walk list in reverse order.
private final LinkedList deliveredMessages = new LinkedList();
// track duplicate deliveries in a transaction such that the tx integrity can be validated
private PreviouslyDeliveredMap previouslyDeliveredMessages;
private int deliveredCounter;
private int additionalWindowSize;
private long redeliveryDelay;
private int ackCounter;
private int dispatchedCount;
private final AtomicReference messageListener = new AtomicReference();
private final JMSConsumerStatsImpl stats;
private final String selector;
private boolean synchronizationRegistered;
private final AtomicBoolean started = new AtomicBoolean(false);
private MessageAvailableListener availableListener;
private RedeliveryPolicy redeliveryPolicy;
private boolean optimizeAcknowledge;
private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
private ExecutorService executorService;
private MessageTransformer transformer;
private boolean clearDispatchList;
AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);
private MessageAck pendingAck;
private long lastDeliveredSequenceId;
private IOException failureError;
private long optimizeAckTimestamp = System.currentTimeMillis();
private long optimizeAcknowledgeTimeOut = 0;
private long optimizedAckScheduledAckInterval = 0;
private Runnable optimizedAckTask;
private long failoverRedeliveryWaitPeriod = 0;
private boolean transactedIndividualAck = false;
private boolean nonBlockingRedelivery = false;
public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
String name, String selector, int prefetch,
int maximumPendingMessageCount, boolean noLocal, boolean browser,
boolean dispatchAsync, MessageListener messageListener) throws JMSException {
if (dest == null) {
throw new InvalidDestinationException("Don't understand null destinations");
} else if (dest.getPhysicalName() == null) {
throw new InvalidDestinationException("The destination object was not given a physical name.");
} else if (dest.isTemporary()) {
String physicalName = dest.getPhysicalName();
if (physicalName == null) {
throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
}
String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
if (physicalName.indexOf(connectionID) options = IntrospectionSupport.extractProperties(
new HashMap(dest.getOptions()), "consumer.");
IntrospectionSupport.setProperties(this.info, options);
if (options.size() > 0) {
String msg = "There are " + options.size()
+ " consumer options that couldn't be set on the consumer."
+ " Check the options are spelled correctly."
+ " Unknown parameters=[" + options + "]."
+ " This consumer cannot be started.";
LOG.warn(msg);
throw new ConfigurationException(msg);
}
}
this.info.setDestination(dest);
this.info.setBrowser(browser);
if (selector != null && selector.trim().length() != 0) {
// Validate the selector
SelectorParser.parse(selector);
this.info.setSelector(selector);
this.selector = selector;
} else if (info.getSelector() != null) {
// Validate the selector
SelectorParser.parse(this.info.getSelector());
this.selector = this.info.getSelector();
} else {
this.selector = null;
}
this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
&& !info.isBrowser();
if (this.optimizeAcknowledge) {
this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval());
}
this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery;
if (messageListener != null) {
setMessageListener(messageListener);
}
try {
this.session.addConsumer(this);
this.session.syncSendPacket(info);
} catch (JMSException e) {
this.session.removeConsumer(this);
throw e;
}
if (session.connection.isStarted()) {
start();
}
}
public void setMessageListener(MessageListener listener) throws JMSException {
checkClosed();
if (info.getPrefetchSize() == 0) {
throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
}
if (listener != null) {
boolean wasRunning = session.isRunning();
if (wasRunning) {
session.stop();
}
this.messageListener.set(listener);
session.redispatch(this, unconsumedMessages);
if (wasRunning) {
session.start();
}
} else {
this.messageListener.set(null);
}
}
private MessageDispatch dequeue(long timeout) throws JMSException {
try {
long deadline = 0;
if (timeout > 0) {
deadline = System.currentTimeMillis() + timeout;
}
while (true) {
MessageDispatch md = unconsumedMessages.dequeue(timeout);
if (md == null) {
if (timeout > 0 && !unconsumedMessages.isClosed()) {
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
} else {
if (failureError != null) {
throw JMSExceptionSupport.create(failureError);
} else {
return null;
}
}
} else if (md.getMessage() == null) {
return null;
} else if (md.getMessage().isExpired()) {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " received expired message: " + md);
}
beforeMessageIsConsumed(md);
afterMessageIsConsumed(md, true);
if (timeout > 0) {
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
}
} else {
if (LOG.isTraceEnabled()) {
LOG.trace(getConsumerId() + " received message: " + md);
}
return md;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw JMSExceptionSupport.create(e);
}
}
/**
* Receives the next message produced for this message consumer.
*
* This call blocks indefinitely until a message is produced or until this
* message consumer is closed.
*
* If this receive
is done within a transaction, the consumer
* retains the message until the transaction commits.
*
* @return the next message produced for this message consumer, or null if
* this message consumer is concurrently closed
*/
public Message receive() throws JMSException {
checkClosed();
checkMessageListener();
sendPullCommand(0);
MessageDispatch md = dequeue(-1);
if (md == null) {
return null;
}
beforeMessageIsConsumed(md);
afterMessageIsConsumed(md, false);
return createActiveMQMessage(md);
}
/**
* @param md
* @return
*/
private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
}
if (transformer != null) {
Message transformedMessage = transformer.consumerTransform(session, this, m);
if (transformedMessage != null) {
m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
}
}
if (session.isClientAcknowledge()) {
m.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception {
session.checkClosed();
session.acknowledge();
}
});
} else if (session.isIndividualAcknowledge()) {
m.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception {
session.checkClosed();
acknowledge(md);
}
});
}
return m;
}
/**
* Receives the next message that arrives within the specified timeout
* interval.
*
* This call blocks until a message arrives, the timeout expires, or this
* message consumer is closed. A timeout
of zero never
* expires, and the call blocks indefinitely.
*
* @param timeout the timeout value (in milliseconds), a time out of zero
* never expires.
* @return the next message produced for this message consumer, or null if
* the timeout expires or this message consumer is concurrently
* closed
*/
public Message receive(long timeout) throws JMSException {
checkClosed();
checkMessageListener();
if (timeout == 0) {
return this.receive();
}
sendPullCommand(timeout);
while (timeout > 0) {
MessageDispatch md;
if (info.getPrefetchSize() == 0) {
md = dequeue(-1); // We let the broker let us know when we timeout.
} else {
md = dequeue(timeout);
}
if (md == null) {
return null;
}
beforeMessageIsConsumed(md);
afterMessageIsConsumed(md, false);
return createActiveMQMessage(md);
}
return null;
}
/**
* Receives the next message if one is immediately available.
*
* @return the next message produced for this message consumer, or null if
* one is not available
* @throws JMSException if the JMS provider fails to receive the next
* message due to some internal error.
*/
public Message receiveNoWait() throws JMSException {
checkClosed();
checkMessageListener();
sendPullCommand(-1);
MessageDispatch md;
if (info.getPrefetchSize() == 0) {
md = dequeue(-1); // We let the broker let us know when we
// timeout.
} else {
md = dequeue(0);
}
if (md == null) {
return null;
}
beforeMessageIsConsumed(md);
afterMessageIsConsumed(md, false);
return createActiveMQMessage(md);
}
public void dispose() throws JMSException {
if (!unconsumedMessages.isClosed()) {
// Do we have any acks we need to send out before closing?
// Ack any delivered messages now.
if (!session.getTransacted()) {
deliverAcks();
if (isAutoAcknowledgeBatch()) {
acknowledge();
}
}
if (executorService != null) {
ThreadPoolUtils.shutdownGraceful(executorService, 60000L);
executorService = null;
}
if (optimizedAckTask != null) {
this.session.connection.getScheduler().cancel(optimizedAckTask);
optimizedAckTask = null;
}
if (session.isClientAcknowledge()) {
if (!this.info.isBrowser()) {
// rollback duplicates that aren't acknowledged
List tmp = null;
synchronized (this.deliveredMessages) {
tmp = new ArrayList(this.deliveredMessages);
}
for (MessageDispatch old : tmp) {
this.session.connection.rollbackDuplicate(this, old.getMessage());
}
tmp.clear();
}
}
if (!session.isTransacted()) {
synchronized(deliveredMessages) {
deliveredMessages.clear();
}
}
unconsumedMessages.close();
this.session.removeConsumer(this);
List list = unconsumedMessages.removeAll();
if (!this.info.isBrowser()) {
for (MessageDispatch old : list) {
// ensure we don't filter this as a duplicate
session.connection.rollbackDuplicate(this, old.getMessage());
}
}
}
}
public void dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener.get();
try {
clearMessagesInProgress();
clearDispatchList();
synchronized (unconsumedMessages.getMutex()) {
if (!unconsumedMessages.isClosed()) {
if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) {
ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md);
try {
boolean expired = message.isExpired();
if (!expired) {
listener.onMessage(message);
}
afterMessageIsConsumed(md, expired);
} catch (RuntimeException e) {
LOG.error(getConsumerId() + " Exception while processing message: " + md.getMessage().getMessageId(), e);
if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
// schedual redelivery and possible dlq processing
md.setRollbackCause(e);
rollback();
} else {
// Transacted or Client ack: Deliver the
// next message.
afterMessageIsConsumed(md, false);
}
}
} else {
if (!unconsumedMessages.isRunning()) {
// delayed redelivery, ensure it can be re delivered
session.connection.rollbackDuplicate(this, md.getMessage());
}
unconsumedMessages.enqueue(md);
if (availableListener != null) {
availableListener.onMessageAvailable(this);
}
}
} else {
if (!session.isTransacted()) {
LOG.warn("Duplicate dispatch on connection: " + session.getConnection().getConnectionInfo().getConnectionId()
+ " to consumer: " + getConsumerId() + ", ignoring (auto acking) duplicate: " + md);
MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
session.sendAck(ack);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
}
boolean needsPoisonAck = false;
synchronized (deliveredMessages) {
if (previouslyDeliveredMessages != null) {
previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
} else {
// delivery while pending redelivery to another consumer on the same connection
// not waiting for redelivery will help here
needsPoisonAck = true;
}
}
if (needsPoisonAck) {
MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
poisonAck.setFirstMessageId(md.getMessage().getMessageId());
poisonAck.setPoisonCause(new JMSException("Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
+ session.getConnection().getConnectionInfo().getConnectionId()));
LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
+ " consumer on this connection, failoverRedeliveryWaitPeriod="
+ failoverRedeliveryWaitPeriod + ". Message: " + md + ", poisonAck: " + poisonAck);
session.sendAck(poisonAck);
} else {
if (transactedIndividualAck) {
immediateIndividualTransactedAck(md);
} else {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
}
}
}
}
}
}
if (++dispatchedCount % 1000 == 0) {
dispatchedCount = 0;
Thread.yield();
}
} catch (Exception e) {
session.connection.onClientInternalException(e);
}
}
}
5,通过以下代码知道,当消费者批量获取消息为0时,则直接在wait上等待唤醒,否则等指定的timeout的时间
MessageDispatch md;
if (info.getPrefetchSize() == 0) {
md = dequeue(-1); // We let the broker let us know when we timeout.
} else {
md = dequeue(timeout);
}
6,消息进入消费者队列方法调用
7,上图的方法调用类
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
/**
* reads packets from a Socket
*/
public void run() {
LOG.trace("TCP consumer thread for " + this + " starting");
this.runnerThread=Thread.currentThread();
try {
while (!isStopped()) {
doRun();
}
} catch (IOException e) {
stoppedLatch.get().countDown();
onException(e);
} catch (Throwable e){
stoppedLatch.get().countDown();
IOException ioe=new IOException("Unexpected error occured: " + e);
ioe.initCause(e);
onException(ioe);
}finally {
stoppedLatch.get().countDown();
}
}
protected void doRun() throws IOException {
try {
Object command = readCommand();
doConsume(command);
} catch (SocketTimeoutException e) {
} catch (InterruptedIOException e) {
}
}
/**
* Process the inbound command
*/
public void doConsume(Object command) {
if (command != null) {
if (transportListener != null) {
transportListener.onCommand(command);
} else {
LOG.error("No transportListener available to process inbound command: " + command);
}
}
}
}
public abstract class AbstractInactivityMonitor extends TransportFilter {
public void onCommand(Object command) {
commandReceived.set(true);
inReceive.set(true);
try {
if (command.getClass() == KeepAliveInfo.class) {
KeepAliveInfo info = (KeepAliveInfo) command;
if (info.isResponseRequired()) {
sendLock.readLock().lock();
try {
info.setResponseRequired(false);
oneway(info);
} catch (IOException e) {
onException(e);
} finally {
sendLock.readLock().unlock();
}
}
} else {
if (command.getClass() == WireFormatInfo.class) {
synchronized (this) {
try {
processInboundWireFormatInfo((WireFormatInfo) command);
} catch (IOException e) {
onException(e);
}
}
}
transportListener.onCommand(command);
}
} finally {
inReceive.set(false);
}
}
}
public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
public void onCommand(final Object o) {
final Command command = (Command)o;
if (!closed.get() && command != null) {
try {
command.visit(new CommandVisitorAdapter() {
@Override
public Response processMessageDispatch(MessageDispatch md) throws Exception {
waitForTransportInterruptionProcessingToComplete();
ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
if (dispatcher != null) {
// Copy in case a embedded broker is dispatching via
// vm://
// md.getMessage() == null to signal end of queue
// browse.
Message msg = md.getMessage();
if (msg != null) {
msg = msg.copy();
msg.setReadOnlyBody(true);
msg.setReadOnlyProperties(true);
msg.setRedeliveryCounter(md.getRedeliveryCounter());
msg.setConnection(ActiveMQConnection.this);
md.setMessage(msg);
}
dispatcher.dispatch(md);
}
return null;
}
@Override
public Response processProducerAck(ProducerAck pa) throws Exception {
if (pa != null && pa.getProducerId() != null) {
ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
if (producer != null) {
producer.onProducerAck(pa);
}
}
return null;
}
@Override
public Response processBrokerInfo(BrokerInfo info) throws Exception {
brokerInfo = info;
brokerInfoReceived.countDown();
optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
return null;
}
@Override
public Response processConnectionError(final ConnectionError error) throws Exception {
executor.execute(new Runnable() {
public void run() {
onAsyncException(error.getException());
}
});
return null;
}
@Override
public Response processControlCommand(ControlCommand command) throws Exception {
onControlCommand(command);
return null;
}
@Override
public Response processConnectionControl(ConnectionControl control) throws Exception {
onConnectionControl((ConnectionControl)command);
return null;
}
@Override
public Response processConsumerControl(ConsumerControl control) throws Exception {
onConsumerControl((ConsumerControl)command);
return null;
}
@Override
public Response processWireFormat(WireFormatInfo info) throws Exception {
onWireFormatInfo((WireFormatInfo)command);
return null;
}
});
} catch (Exception e) {
onClientInternalException(e);
}
}
for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.onCommand(command);
}
}
}
public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
public void dispatch(MessageDispatch messageDispatch) {
try {
executor.execute(messageDispatch);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
connection.onClientInternalException(e);
}
}
}
public class ActiveMQSessionExecutor implements Task {
void execute(MessageDispatch message) throws InterruptedException {
if (!startedOrWarnedThatNotStarted) {
ActiveMQConnection connection = session.connection;
long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout();
if (connection.isStarted() || aboutUnstartedConnectionTimeout aboutUnstartedConnectionTimeout) {
LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection
+ " Received: " + message);
startedOrWarnedThatNotStarted = true;
}
}
}
if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) {
dispatch(message);
} else {
messageQueue.enqueue(message);
wakeup();
}
}
public void wakeup() {
if (!dispatchedBySessionPool) {
if (session.isSessionAsyncDispatch()) {
try {
TaskRunner taskRunner = this.taskRunner;
if (taskRunner == null) {
synchronized (this) {
if (this.taskRunner == null) {
if (!isRunning()) {
// stop has been called
return;
}
this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
"ActiveMQ Session: " + session.getSessionId());
}
taskRunner = this.taskRunner;
}
}
taskRunner.wakeup();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
while (iterate()) {
}
}
}
}
}
public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel {
public void enqueue(MessageDispatch message) {
synchronized (mutex) {
getList(message).addLast(message);
this.size++;
mutex.notify();
}
}
}
8,任务分发截图
9,任务分ActiveMQMessageConsumer发代码
public Message receive(long timeout) throws JMSException {
checkClosed();
checkMessageListener();
if (timeout == 0) {
return this.receive();
}
sendPullCommand(timeout);
while (timeout > 0) {
MessageDispatch md;
if (info.getPrefetchSize() == 0) {
md = dequeue(-1); // We let the broker let us know when we timeout.
} else {
md = dequeue(timeout);
}
if (md == null) {
return null;
}
beforeMessageIsConsumed(md);
afterMessageIsConsumed(md, false);//取出成功及消费
return createActiveMQMessage(md);//返回给业务方法
}
return null;
}
private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
md.setDeliverySequenceId(session.getNextDeliveryId());
lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
if (!isAutoAcknowledgeBatch()) {
synchronized(deliveredMessages) {
deliveredMessages.addFirst(md);
}
if (session.getTransacted()) {
if (transactedIndividualAck) {
immediateIndividualTransactedAck(md);
} else {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
}
}
}
}
if (unconsumedMessages.isClosed()) {
return;
}
if (messageExpired) {
acknowledge(md, MessageAck.DELIVERED_ACK_TYPE);
stats.getExpiredMessageCount().increment();
} else {
stats.onMessage();
if (session.getTransacted()) {
// Do nothing.
} else if (isAutoAcknowledgeEach()) {
if (deliveryingAcknowledgements.compareAndSet(false, true)) {
synchronized (deliveredMessages) {
if (!deliveredMessages.isEmpty()) {
if (optimizeAcknowledge) {
ackCounter++;
// AMQ-3956 evaluate both expired and normal msgs as
// otherwise consumer may get stalled
if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
ackCounter = 0;
session.sendAck(ack);
optimizeAckTimestamp = System.currentTimeMillis();
}
// AMQ-3956 - as further optimization send
// ack for expired msgs when there are any.
// This resets the deliveredCounter to 0 so that
// we won't sent standard acks with every msg just
// because the deliveredCounter just below
// 0.5 * prefetch as used in ackLater()
if (pendingAck != null && deliveredCounter > 0) {
session.sendAck(pendingAck);
pendingAck = null;
deliveredCounter = 0;
}
}
} else {
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack!=null) {
deliveredMessages.clear();
session.sendAck(ack);
}
}
}
}
deliveryingAcknowledgements.set(false);
}
} else if (isAutoAcknowledgeBatch()) {
ackLater(md, MessageAck.STANDARD_ACK_TYPE);
} else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
boolean messageUnackedByConsumer = false;
synchronized (deliveredMessages) {
messageUnackedByConsumer = deliveredMessages.contains(md);
}
if (messageUnackedByConsumer) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
}
}
else {
throw new IllegalStateException("Invalid session state.");
}
}
}
private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
}
if (transformer != null) {
Message transformedMessage = transformer.consumerTransform(session, this, m);
if (transformedMessage != null) {
m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
}
}
if (session.isClientAcknowledge()) {
m.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception {
session.checkClosed();
session.acknowledge();
}
});
} else if (session.isIndividualAcknowledge()) {
m.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception {
session.checkClosed();
acknowledge(md);
}
});
}
return m;
}
备注:
receive通过Object的带时间参数wait方法来实现,多久没有消息就返回null
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.e1idc.net