diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..51f1a20
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,16 @@
+# Package Files #
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..943f9d0
--- /dev/null
+++ b/README.md
@@ -0,0 +1,91 @@
+ActiveMQ Store MongoDB project
+### Introduce
+ This project for creating an ActiveMQ Store by MongoDB.
+### Architecture
+ 1. modify activemq.xsd in activemq-core.jar, insert mongodb node after kahaDB node:
+ 2. add mongodb in META-INF/services/org/apache/xbean/spring/http/activemq.apache.org/schema/core in activemq-core.jar:
+ mongodb = org.qsoft.activemq.store.mongodb.MongodbPersistenceAdapter
+ 3. configure your mongodb server in your activemq.xml:
+ 4. package this project to a jar to reference in your project
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..126c89c
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,180 @@
+ 4.0.0
+ org.apache.activemq
+ activemq-parent
+ 5.7.0
+ activemq-store-mongodb
+ bundle
+ ActiveMQ :: Store :: Mongodb
+ A persistence adapter with mongodb.
+ org.mongodb
+ mongo-java-driver
+ 2.10.1
+ org.slf4j
+ slf4j-api
+ org.apache.geronimo.specs
+ geronimo-jms_1.1_spec
+ ${project.groupId}
+ activemq-core
+ false
+ org.apache.geronimo.specs
+ geronimo-j2ee-management_1.1_spec
+ org.apache.geronimo.specs
+ geronimo-annotation_1.0_spec
+ true
+ commons-pool
+ commons-pool
+ true
+ commons-dbcp
+ commons-dbcp
+ 1.4
+ mysql
+ mysql-connector-java
+ 5.1.15
+ com.thoughtworks.xstream
+ xstream
+ true
+ org.apache.xbean
+ xbean-spring
+ true
+ org.springframework
+ spring-context
+ org.apache.derby
+ derby
+ true
+ xalan
+ xalan
+ true
+ commons-net
+ commons-net
+ junit
+ junit
+ test
+ org.hamcrest
+ hamcrest-all
+ test
+ org.slf4j
+ slf4j-log4j12
+ test
+ log4j
+ log4j
+ test
+ ${project.groupId}
+ activeio-core
+ false
+ test-jar
+ org.springframework
+ spring-jms
+ test
+ org.springframework
+ spring-test
+ test
+ commons-io
+ commons-io
+ test
+ commons-collections
+ commons-collections
+ test
+ commons-primitives
+ commons-primitives
+ test
+ axion
+ axion
+ test
+ regexp
+ regexp
+ test
diff --git a/src/main/java/org/qsoft/activemq/store/mongodb/MongoDBHelper.java b/src/main/java/org/qsoft/activemq/store/mongodb/MongoDBHelper.java
new file mode 100644
index 0000000..212be39
--- /dev/null
+++ b/src/main/java/org/qsoft/activemq/store/mongodb/MongoDBHelper.java
@@ -0,0 +1,209 @@
+package org.qsoft.activemq.store.mongodb;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.Mongo;
+import com.mongodb.MongoException;
+public class MongoDBHelper {
+ protected static final String MSGS = "ACTIVEMQ_MSGS";
+ protected static final String ACKS = "ACTIVEMQ_ACKS";
+ protected static final String LOCK = "ACTIVEMQ_LOCK";
+ protected static final String MSG_COLUMN = "MSG";
+ protected static final String DEST_COLUMN = "CONTAINER";
+ private static final Logger LOG = LoggerFactory.getLogger(MongoDBHelper.class);
+ Mongo mongo;
+ DB db;
+ WireFormat wireFormat;
+ public MongoDBHelper(String host, int port, String dbName, WireFormat wireFormat) {
+ LOG.info("Connect to MongoDB[" + host + ":" + port + ":" + dbName + "]");
+ try {
+ mongo = new Mongo(host, port);
+ db = mongo.getDB(dbName);
+ } catch (UnknownHostException e) {
+ LOG.error("error host.", e);
+ throw new RuntimeException(e);
+ } catch (MongoException e) {
+ LOG.error("MongoException.", e);
+ throw new RuntimeException(e);
+ }
+ this.wireFormat = wireFormat;
+ }
+ public DBCollection getMsgsCollection() {
+ return this.db.getCollection(MSGS);
+ }
+ public DBCollection getAcksCollection() {
+ return this.db.getCollection(ACKS);
+ }
+ public DBCollection getLockCollection() {
+ return this.db.getCollection(LOCK);
+ }
+ public Boolean addMessage(Message message) throws IOException {
+ BasicDBObject bo = new BasicDBObject();
+ MessageId messageId = message.getMessageId();
+ // Serialize the Message..
+ byte data[];
+ try {
+ ByteSequence packet = wireFormat.marshal(message);
+ data = ByteSequenceData.toByteArray(packet);
+ } catch (IOException e) {
+ throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
+ }
+ bo.append("ID", messageId.getBrokerSequenceId());
+ bo.append(DEST_COLUMN, message.getDestination().getQualifiedName());
+ bo.append("MSGID_PROD", messageId.getProducerId().toString());
+ bo.append("MSGID_SEQ", messageId.getProducerSequenceId());
+ bo.append("EXPIRATION", message.getExpiration());
+ bo.append(MSG_COLUMN, data);
+ bo.append("PRIORITY", message.getPriority());
+ getMsgsCollection().save(bo);
+ return true;
+ }
+ public static void main(String[] args) {
+ }
+ public void close() {
+ this.mongo.close();
+ }
+ public Message getMessage(MessageId messageId) throws IOException {
+ BasicDBObject bo = new BasicDBObject();
+ bo.append("MSGID_PROD", messageId.getProducerId().toString());
+ bo.append("MSGID_SEQ", messageId.getProducerSequenceId());
+ DBObject o = getMsgsCollection().findOne(bo);
+ if (o == null)
+ return null;
+ byte[] data = (byte[]) o.get(MSG_COLUMN);
+ if (data == null)
+ return null;
+ Message answer = null;
+ try {
+ answer = (Message) wireFormat.unmarshal(new ByteSequence(data));
+ } catch (IOException e) {
+ throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
+ }
+ return answer;
+ }
+ public int count() {
+ return (int) getMsgsCollection().count();
+ }
+ public synchronized void removeMessage(ActiveMQDestination destination, MessageAck ack) {
+ MessageId messageId = ack.getLastMessageId();
+ ack.getDestination();
+ BasicDBObject bo = new BasicDBObject();
+ bo.append(DEST_COLUMN, destination.getQualifiedName());
+ bo.append("MSGID_PROD", messageId.getProducerId().toString());
+ bo.append("MSGID_SEQ", messageId.getProducerSequenceId());
+ DBObject o = getMsgsCollection().findOne(bo);
+ if(o != null){
+ Object sequenceId = o.get("ID");
+ getMsgsCollection().remove(new BasicDBObject("ID",sequenceId));
+ //getMsgsCollection().remove(new BasicDBObject("ID",new BasicDBObject("$lte",sequenceId)));
+ }else{
+ LOG.error(bo.toString() + " is not found.");
+ }
+ }
+ public synchronized void removeAllMessages() {
+ getMsgsCollection().drop();
+ getAcksCollection().drop();
+ getLockCollection().drop();
+ }
+ public Message findOne() throws IOException {
+ DBObject o = getMsgsCollection().findOne();
+ if (o == null)
+ return null;
+ byte[] data = (byte[]) o.get(MSG_COLUMN);
+ if (data == null)
+ return null;
+ Message answer = null;
+ try {
+ answer = (Message) wireFormat.unmarshal(new ByteSequence(data));
+ } catch (IOException e) {
+ throw IOExceptionSupport.create("Failed to broker message in container: " + e, e);
+ }
+ return answer;
+ }
+ public List find(int limit, String container, long sequenceId) throws IOException {
+ List msgs = new ArrayList(limit);
+ BasicDBObject bo = new BasicDBObject();
+ bo.append(DEST_COLUMN, container);
+ bo.append("ID", new BasicDBObject("$gt",sequenceId));
+ DBCursor c = getMsgsCollection().find(bo).sort(new BasicDBObject("ID",1)).limit(limit);
+ while (c.hasNext()) {
+ DBObject o = c.next();
+ if (o == null)
+ return null;
+ byte[] data = (byte[]) o.get(MSG_COLUMN);
+ if (data == null)
+ return null;
+ Message answer = null;
+ try {
+ answer = (Message) wireFormat.unmarshal(new ByteSequence(data));
+ msgs.add(answer);
+ } catch (IOException e) {
+ throw IOExceptionSupport.create("Failed to broker message in container: " + e, e);
+ }
+ }
+ return trim(msgs);
+ }
+ private List trim(List msgs2) {
+ if(msgs2 == null || msgs2.size() == 0)
+ return null;
+ Message m = msgs2.get(msgs2.size()-1);
+ while(m==null && msgs2.size() > 0){
+ m = msgs2.remove(msgs2.size()-1);
+ }
+ return msgs2;
+ }
+ @SuppressWarnings("unchecked")
+ public List findDestinations() {
+ List dists = getMsgsCollection().distinct(DEST_COLUMN);
+ return dists;
+ }
diff --git a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbMessageStore.java b/src/main/java/org/qsoft/activemq/store/mongodb/MongodbMessageStore.java
new file mode 100644
index 0000000..7a08a25
--- /dev/null
+++ b/src/main/java/org/qsoft/activemq/store/mongodb/MongodbMessageStore.java
@@ -0,0 +1,102 @@
+package org.qsoft.activemq.store.mongodb;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+public class MongodbMessageStore extends AbstractMessageStore {
+ protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1);
+ protected final WireFormat wireFormat;
+ protected final MongoDBHelper helper;
+ private static final Logger LOG = LoggerFactory.getLogger(MongodbMessageStore.class);
+ public MongodbMessageStore(ActiveMQDestination destination, WireFormat wireFormat, MongoDBHelper helper) {
+ super(destination);
+ this.wireFormat = wireFormat;
+ this.helper = helper;
+ }
+ @Override
+ public void addMessage(ConnectionContext context, Message message) throws IOException {
+ if(LOG.isDebugEnabled())
+ LOG.debug("MongodbMessageStore.addMessage: " + message);
+ this.helper.addMessage(message);
+ }
+ @Override
+ public Message getMessage(MessageId identity) throws IOException {
+ if(LOG.isDebugEnabled())
+ LOG.debug("MongodbMessageStore.getMessage:{0}", identity);
+ return this.helper.getMessage(identity);
+ }
+ @Override
+ public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
+ if(LOG.isDebugEnabled())
+ LOG.debug("MongodbMessageStore.removeMessage: " + context + "," + ack);
+ this.helper.removeMessage(this.getDestination(), ack);
+ }
+ @Override
+ public void removeAllMessages(ConnectionContext context) throws IOException {
+ if(LOG.isDebugEnabled())
+ LOG.debug("MongodbMessageStore.removeAllMessages");
+ this.helper.removeAllMessages();
+ }
+ @Override
+ public void recover(MessageRecoveryListener container) throws Exception {
+ if(LOG.isDebugEnabled())
+ LOG.debug("MongodbMessageStore.recover: " + container);
+ // TODO ? what is this
+ }
+ @Override
+ public int getMessageCount() throws IOException {
+ LOG.debug("MongodbMessageStore.getMessageCount");
+ return this.helper.count();
+ }
+ @Override
+ public void resetBatching() {
+ LOG.debug("MongodbMessageStore.resetBatching");
+ }
+ @Override
+ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
+ if(LOG.isDebugEnabled())
+ LOG.debug("MongodbMessageStore.recoverNextMessages: " + maxReturned + " from " + lastRecoveredSequenceId.get());
+ //long start = System.currentTimeMillis();
+ List msgs = this.helper.find(maxReturned, this.getDestination().getQualifiedName(), lastRecoveredSequenceId.get());
+ //long end1 = System.currentTimeMillis();
+ if(msgs != null) {
+ for (Message message : msgs) {
+ listener.recoverMessage(message);
+ lastRecoveredSequenceId.set(message.getMessageId().getBrokerSequenceId());
+ }
+ if(LOG.isDebugEnabled())
+ LOG.debug("MongodbMessageStore.recoverNextMessages: " + msgs.size() + " ~ " + this.getDestination().getQualifiedName());
+ }
+ else{
+ if(LOG.isDebugEnabled())
+ LOG.debug("MongodbMessageStore.recoverNextMessages: NONE ~ " + this.getDestination().getQualifiedName());
+ }
+// long end = System.currentTimeMillis();
+// System.out.println((end1-start)+" " + (end-end1));
+ }
diff --git a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbPersistenceAdapter.java b/src/main/java/org/qsoft/activemq/store/mongodb/MongodbPersistenceAdapter.java
new file mode 100644
index 0000000..93de7c4
--- /dev/null
+++ b/src/main/java/org/qsoft/activemq/store/mongodb/MongodbPersistenceAdapter.java
@@ -0,0 +1,199 @@
+package org.qsoft.activemq.store.mongodb;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+public class MongodbPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
+ private static final Logger LOG = LoggerFactory.getLogger(MongodbPersistenceAdapter.class);
+ ConcurrentHashMap topicStores = new ConcurrentHashMap();
+ ConcurrentHashMap queueStores = new ConcurrentHashMap();
+ protected MongoDBHelper helper;
+ private WireFormat wireFormat = new OpenWireFormat();
+ private String host;
+ private int port;
+ private String db;
+ // private BrokerService brokerService;
+ public String getHost() {
+ return host;
+ }
+ public void setHost(String host) {
+ this.host = host;
+ }
+ public int getPort() {
+ return port;
+ }
+ public void setPort(int port) {
+ this.port = port;
+ }
+ public String getDb() {
+ return db;
+ }
+ public void setDb(String db) {
+ this.db = db;
+ }
+ @Override
+ public void start() throws Exception {
+ helper = new MongoDBHelper(host, port, db, wireFormat);
+ }
+ @Override
+ public void stop() throws Exception {
+ if (this.helper != null)
+ this.helper.close();
+ }
+ @Override
+ public void deleteAllMessages() throws IOException {
+ this.helper.removeAllMessages();
+ }
+ @Override
+ public void setBrokerService(BrokerService brokerService) {
+ // this.brokerService = brokerService;
+ }
+ @Override
+ public Set getDestinations() {
+ Set set = new HashSet();
+ List destinations = this.helper.findDestinations();
+ for(String dest : destinations){
+ set.add(ActiveMQDestination.createDestination(dest, ActiveMQDestination.QUEUE_TYPE));
+ }
+ return set;
+ }
+ @Override
+ public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
+ if(LOG.isDebugEnabled())
+ LOG.debug("Create QueueMessageStore for destination:[" + destination.getQualifiedName() + "]");
+ MongodbMessageStore store = new MongodbMessageStore(destination, wireFormat, helper);
+ this.queueStores.put(destination, store);
+ return store;
+ }
+ @Override
+ public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
+ if(LOG.isDebugEnabled())
+ LOG.debug("Create TopicMessageStore for destination:[" + destination.getQualifiedName() + "]");
+ MongodbTopicMessageStore store = new MongodbTopicMessageStore(destination, wireFormat, helper);
+ this.topicStores.put(destination, store);
+ return store;
+ }
+ @Override
+ public void removeQueueMessageStore(ActiveMQQueue destination) {
+ if(LOG.isDebugEnabled())
+ LOG.debug("Remove QueueMessageStore for destination:[" + destination.getQualifiedName() + "]");
+ this.queueStores.remove(destination);
+ }
+ @Override
+ public void removeTopicMessageStore(ActiveMQTopic destination) {
+ if(LOG.isDebugEnabled())
+ LOG.debug("Remove TopicMessageStore for destination:[" + destination.getQualifiedName() + "]");
+ this.topicStores.remove(destination);
+ }
+ @Override
+ public long size() {
+ // ignore
+ return 0;
+ }
+ @Override
+ public void setUsageManager(SystemUsage usageManager) {
+ // ignore
+ }
+ @Override
+ public void setBrokerName(String brokerName) {
+ // ignore
+ }
+ @Override
+ public void setDirectory(File dir) {
+ // ignore
+ }
+ @Override
+ public void checkpoint(boolean sync) throws IOException {
+ // TODO not supported
+ }
+ @Override
+ public long getLastMessageBrokerSequenceId() throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+ @Override
+ public long getLastProducerSequenceId(ProducerId id) throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+ public String toString() {
+ return "MongodbPersistenceAdapter(" + host + ":" + port + "/" + db + ")";
+ }
+ // // =========== not supported for tx ==========
+ @Override
+ public TransactionStore createTransactionStore() throws IOException {
+ // TODO not supported
+ return new MongodbTransactionStore();
+ }
+ @Override
+ public void beginTransaction(ConnectionContext context) throws IOException {
+ // TODO not supported
+ }
+ @Override
+ public void commitTransaction(ConnectionContext context) throws IOException {
+ // TODO not supported
+ }
+ @Override
+ public void rollbackTransaction(ConnectionContext context) throws IOException {
+ // TODO not supported
+ }
+ @Override
+ public File getDirectory() {
+ return null;
+ }
diff --git a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbTopicMessageStore.java b/src/main/java/org/qsoft/activemq/store/mongodb/MongodbTopicMessageStore.java
new file mode 100644
index 0000000..3be53aa
--- /dev/null
+++ b/src/main/java/org/qsoft/activemq/store/mongodb/MongodbTopicMessageStore.java
@@ -0,0 +1,90 @@
+package org.qsoft.activemq.store.mongodb;
+import java.io.IOException;
+import java.util.concurrent.Future;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+public class MongodbTopicMessageStore extends MongodbMessageStore implements
+ TopicMessageStore {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(MongodbTopicMessageStore.class);
+ public MongodbTopicMessageStore(ActiveMQDestination destination,
+ WireFormat wireFormat, MongoDBHelper helper) {
+ super(destination, wireFormat, helper);
+ }
+ @Override
+ public void acknowledge(ConnectionContext context, String clientId,
+ String subscriptionName, MessageId messageId, MessageAck ack)
+ throws IOException {
+ if(LOG.isDebugEnabled())
+ LOG.debug("MongodbTopicMessageStore.acknowledge: " + clientId + ","
+ + subscriptionName + "," + messageId);
+ }
+ @Override
+ public void deleteSubscription(String clientId, String subscriptionName)
+ throws IOException {
+ LOG.debug("MongodbTopicMessageStore.deleteSubscription");
+ }
+ @Override
+ public void recoverSubscription(String clientId, String subscriptionName,
+ MessageRecoveryListener listener) throws Exception {
+ LOG.debug("MongodbTopicMessageStore.recoverSubscription");
+ }
+ @Override
+ public void recoverNextMessages(String clientId, String subscriptionName,
+ int maxReturned, MessageRecoveryListener listener) throws Exception {
+ LOG.debug("MongodbTopicMessageStore.recoverNextMessages");
+ }
+ @Override
+ public void resetBatching(String clientId, String subscriptionName) {
+ LOG.debug("MongodbTopicMessageStore.resetBatching");
+ }
+ @Override
+ public int getMessageCount(String clientId, String subscriberName)
+ throws IOException {
+ LOG.debug("MongodbTopicMessageStore.getMessageCount");
+ return 0;
+ }
+ @Override
+ public SubscriptionInfo lookupSubscription(String clientId,
+ String subscriptionName) throws IOException {
+ LOG.debug("MongodbTopicMessageStore.lookupSubscription");
+ return null;
+ }
+ @Override
+ public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+ LOG.debug("MongodbTopicMessageStore.getAllSubscriptions");
+ return new SubscriptionInfo[0];
+ }
+ @Override
+ public void addSubsciption(SubscriptionInfo subscriptionInfo,
+ boolean retroactive) throws IOException {
+ LOG.debug("MongodbTopicMessageStore.addSubsciption");
+ }
diff --git a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbTransactionStore.java b/src/main/java/org/qsoft/activemq/store/mongodb/MongodbTransactionStore.java
new file mode 100644
index 0000000..f9ac02a
--- /dev/null
+++ b/src/main/java/org/qsoft/activemq/store/mongodb/MongodbTransactionStore.java
@@ -0,0 +1,54 @@
+package org.qsoft.activemq.store.mongodb;
+import java.io.IOException;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.TransactionRecoveryListener;
+import org.apache.activemq.store.TransactionStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+public class MongodbTransactionStore implements TransactionStore {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(MongodbMessageStore.class);
+ @Override
+ public void start() throws Exception {
+ LOG.debug("MongodbTransactionStore.start");
+ }
+ @Override
+ public void stop() throws Exception {
+ LOG.debug("MongodbTransactionStore.stop");
+ }
+ @Override
+ public void prepare(TransactionId txid) throws IOException {
+ LOG.debug("MongodbTransactionStore.prepare");
+ }
+ @Override
+ public void commit(TransactionId txid, boolean wasPrepared,
+ Runnable preCommit, Runnable postCommit) throws IOException {
+ LOG.debug("MongodbTransactionStore.commit");
+ }
+ @Override
+ public void rollback(TransactionId txid) throws IOException {
+ LOG.debug("MongodbTransactionStore.rollback");
+ }
+ @Override
+ public void recover(TransactionRecoveryListener listener)
+ throws IOException {
+ LOG.debug("MongodbTransactionStore.recover");
+ }
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/broker/broker b/src/main/resources/META-INF/services/org/apache/activemq/broker/broker
new file mode 100644
index 0000000..47b64ca
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/broker/broker
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/broker/properties b/src/main/resources/META-INF/services/org/apache/activemq/broker/properties
new file mode 100644
index 0000000..6b7516b
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/broker/properties
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/broker/xbean b/src/main/resources/META-INF/services/org/apache/activemq/broker/xbean
new file mode 100644
index 0000000..baa4b1e
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/broker/xbean
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/apache_derby_embedded_jdbc_driver b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/apache_derby_embedded_jdbc_driver
new file mode 100644
index 0000000..59008c8
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/apache_derby_embedded_jdbc_driver
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/axion_jdbc_driver b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/axion_jdbc_driver
new file mode 100644
index 0000000..ebd44ae
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/axion_jdbc_driver
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/hsql_database_engine_driver b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/hsql_database_engine_driver
new file mode 100644
index 0000000..5d84ff8
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/hsql_database_engine_driver
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/i-net_sprinta_2000 b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/i-net_sprinta_2000
new file mode 100644
index 0000000..18ae53d
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/i-net_sprinta_2000
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/ibm_db2_jdbc_universal_driver_architecture b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/ibm_db2_jdbc_universal_driver_architecture
new file mode 100644
index 0000000..7c3b0d6
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/ibm_db2_jdbc_universal_driver_architecture
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/ibm_informix_jdbc_driver_for_ibm_informix_dynamic_server b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/ibm_informix_jdbc_driver_for_ibm_informix_dynamic_server
new file mode 100644
index 0000000..14f612a
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/ibm_informix_jdbc_driver_for_ibm_informix_dynamic_server
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/jconnect__tm__for_jdbc__tm_ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/jconnect__tm__for_jdbc__tm_
new file mode 100644
index 0000000..72a2f45
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/jconnect__tm__for_jdbc__tm_
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/jtds_type_4_jdbc_driver_for_ms_sql_server_and_sybase b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/jtds_type_4_jdbc_driver_for_ms_sql_server_and_sybase
new file mode 100644
index 0000000..18ae53d
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/jtds_type_4_jdbc_driver_for_ms_sql_server_and_sybase
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/lock/microsoft_sql_server_2005_jdbc_driver b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/lock/microsoft_sql_server_2005_jdbc_driver
new file mode 100644
index 0000000..8eb77e5
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/lock/microsoft_sql_server_2005_jdbc_driver
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/lock/microsoft_sql_server_jdbc_driver_2_0 b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/lock/microsoft_sql_server_jdbc_driver_2_0
new file mode 100644
index 0000000..8eb77e5
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/lock/microsoft_sql_server_jdbc_driver_2_0
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/lock/microsoft_sql_server_jdbc_driver_3_0 b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/lock/microsoft_sql_server_jdbc_driver_3_0
new file mode 100644
index 0000000..8eb77e5
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/lock/microsoft_sql_server_jdbc_driver_3_0
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/microsoft_sql_server_2005_jdbc_driver b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/microsoft_sql_server_2005_jdbc_driver
new file mode 100644
index 0000000..18ae53d
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/microsoft_sql_server_2005_jdbc_driver
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/microsoft_sql_server_jdbc_driver_2_0 b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/microsoft_sql_server_jdbc_driver_2_0
new file mode 100644
index 0000000..18ae53d
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/microsoft_sql_server_jdbc_driver_2_0
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/microsoft_sql_server_jdbc_driver_3_0 b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/microsoft_sql_server_jdbc_driver_3_0
new file mode 100644
index 0000000..18ae53d
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/microsoft_sql_server_jdbc_driver_3_0
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/mysql-ab_jdbc_driver b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/mysql-ab_jdbc_driver
new file mode 100644
index 0000000..54ddcb4
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/mysql-ab_jdbc_driver
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/oracle_jdbc_driver b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/oracle_jdbc_driver
new file mode 100644
index 0000000..0ab67f6
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/oracle_jdbc_driver
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/postgresql_native_driver b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/postgresql_native_driver
new file mode 100644
index 0000000..d9b4438
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/postgresql_native_driver
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/sap_db b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/sap_db
new file mode 100644
index 0000000..120a4ac
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/sap_db
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/sqlserver b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/sqlserver
new file mode 100644
index 0000000..18ae53d
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/sqlserver
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/transport/discovery b/src/main/resources/META-INF/services/org/apache/activemq/transport/discovery
new file mode 100644
index 0000000..929c938
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/transport/discovery
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/multicast b/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/multicast
new file mode 100644
index 0000000..966830f
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/multicast
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/rendezvous b/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/rendezvous
new file mode 100644
index 0000000..1c0358d
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/rendezvous
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/simple b/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/simple
new file mode 100644
index 0000000..9436f04
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/simple
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/static b/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/static
new file mode 100644
index 0000000..9436f04
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/static
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/transport/failover b/src/main/resources/META-INF/services/org/apache/activemq/transport/failover
new file mode 100644
index 0000000..81c2151
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/transport/failover
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/transport/fanout b/src/main/resources/META-INF/services/org/apache/activemq/transport/fanout
new file mode 100644
index 0000000..62d5dd6
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/transport/fanout
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-advisory-json b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-advisory-json
new file mode 100644
index 0000000..e7fb9ad
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-advisory-json
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-advisory-xml b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-advisory-xml
new file mode 100644
index 0000000..e7fb9ad
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-advisory-xml
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-byte b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-byte
new file mode 100644
index 0000000..e7fb9ad
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-byte
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json
new file mode 100644
index 0000000..e7fb9ad
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-map-json b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-map-json
new file mode 100644
index 0000000..e7fb9ad
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-map-json
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-map-xml b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-map-xml
new file mode 100644
index 0000000..e7fb9ad
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-map-xml
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-object-json b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-object-json
new file mode 100644
index 0000000..e7fb9ad
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-object-json
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-object-xml b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-object-xml
new file mode 100644
index 0000000..e7fb9ad
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-object-xml
@@ -0,0 +1,17 @@
diff --git a/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml
new file mode 100644
index 0000000..e7fb9ad
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+# NOTE: this file is autogenerated by Apache XBean
+# beans
+abortSlowConsumerStrategy = org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy
+amqPersistenceAdapter = org.apache.activemq.store.amq.AMQPersistenceAdapter
+amqPersistenceAdapter.indexPageSize.propertyEditor = org.apache.activemq.util.MemoryPropertyEditor
+amqPersistenceAdapter.maxCheckpointMessageAddSize.propertyEditor = org.apache.activemq.util.MemoryIntPropertyEditor
+amqPersistenceAdapter.maxFileLength.propertyEditor = org.apache.activemq.util.MemoryPropertyEditor
+amqPersistenceAdapter.maxReferenceFileLength.propertyEditor = org.apache.activemq.util.MemoryPropertyEditor
+amqPersistenceAdapterFactory = org.apache.activemq.store.amq.AMQPersistenceAdapterFactory
+authenticationUser = org.apache.activemq.security.AuthenticationUser
+org.apache.activemq.security.AuthenticationUser(java.lang.String,java.lang.String,java.lang.String).parameterNames = username password groups
+authorizationEntry = org.apache.activemq.security.AuthorizationEntry
+authorizationMap = org.apache.activemq.security.DefaultAuthorizationMap
+org.apache.activemq.security.DefaultAuthorizationMap(java.util.List).parameterNames = authorizationEntries
+authorizationPlugin = org.apache.activemq.security.AuthorizationPlugin
+org.apache.activemq.security.AuthorizationPlugin(org.apache.activemq.security.AuthorizationMap).parameterNames = map
+axionJDBCAdapter = org.apache.activemq.store.jdbc.adapter.AxionJDBCAdapter
+blobJDBCAdapter = org.apache.activemq.store.jdbc.adapter.BlobJDBCAdapter
+broker = org.apache.activemq.xbean.XBeanBrokerService
+broker.initMethod = afterPropertiesSet
+broker.destroyMethod = destroy
+brokerService = org.apache.activemq.broker.BrokerService
+bytesJDBCAdapter = org.apache.activemq.store.jdbc.adapter.BytesJDBCAdapter
+commandAgent = org.apache.activemq.broker.util.CommandAgent
+commandAgent.initMethod = start
+commandAgent.destroyMethod = stop
+compositeDemandForwardingBridge = org.apache.activemq.network.CompositeDemandForwardingBridge
+org.apache.activemq.network.CompositeDemandForwardingBridge(org.apache.activemq.network.NetworkBridgeConfiguration,org.apache.activemq.transport.Transport,org.apache.activemq.transport.Transport).parameterNames = configuration localBroker remoteBroker
+compositeQueue = org.apache.activemq.broker.region.virtual.CompositeQueue
+compositeTopic = org.apache.activemq.broker.region.virtual.CompositeTopic
+connectionDotFilePlugin = org.apache.activemq.broker.view.ConnectionDotFilePlugin
+connectionFactory = org.apache.activemq.spring.ActiveMQConnectionFactory
+connectionFactory.initMethod = afterPropertiesSet
+constantPendingMessageLimitStrategy = org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy
+database-locker = org.apache.activemq.store.jdbc.DefaultDatabaseLocker
+org.apache.activemq.store.jdbc.DefaultDatabaseLocker(org.apache.activemq.store.jdbc.JDBCPersistenceAdapter).parameterNames = persistenceAdapter
+db2JDBCAdapter = org.apache.activemq.store.jdbc.adapter.DB2JDBCAdapter
+defaultIOExceptionHandler = org.apache.activemq.util.DefaultIOExceptionHandler
+defaultJDBCAdapter = org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter
+defaultUsageCapacity = org.apache.activemq.usage.DefaultUsageCapacity
+demandForwardingBridge = org.apache.activemq.network.DemandForwardingBridge
+org.apache.activemq.network.DemandForwardingBridge(org.apache.activemq.network.NetworkBridgeConfiguration,org.apache.activemq.transport.Transport,org.apache.activemq.transport.Transport).parameterNames = configuration localBroker remoteBroker
+destinationDotFilePlugin = org.apache.activemq.broker.view.DestinationDotFilePlugin
+destinationEntry = org.apache.activemq.filter.DefaultDestinationMapEntry
+discardingDLQBrokerPlugin = org.apache.activemq.plugin.DiscardingDLQBrokerPlugin
+fileCursor = org.apache.activemq.broker.region.policy.FilePendingSubscriberMessageStoragePolicy
+fileDurableSubscriberCursor = org.apache.activemq.broker.region.policy.FilePendingDurableSubscriberMessageStoragePolicy
+fileQueueCursor = org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy
+filteredDestination = org.apache.activemq.broker.region.virtual.FilteredDestination
+fixedCountSubscriptionRecoveryPolicy = org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy
+fixedSizedSubscriptionRecoveryPolicy = org.apache.activemq.broker.region.policy.FixedSizedSubscriptionRecoveryPolicy
+forcePersistencyModeBroker = org.apache.activemq.plugin.ForcePersistencyModeBroker
+org.apache.activemq.plugin.ForcePersistencyModeBroker(org.apache.activemq.broker.Broker).parameterNames = next
+forcePersistencyModeBrokerPlugin = org.apache.activemq.plugin.ForcePersistencyModeBrokerPlugin
+forwardingBridge = org.apache.activemq.network.ForwardingBridge
+org.apache.activemq.network.ForwardingBridge(org.apache.activemq.transport.Transport,org.apache.activemq.transport.Transport).parameterNames = localBroker remoteBroker
+hsqldb-jdbc-adapter = org.apache.activemq.store.jdbc.adapter.HsqldbJDBCAdapter
+imageBasedJDBCAdaptor = org.apache.activemq.store.jdbc.adapter.ImageBasedJDBCAdaptor
+inboundQueueBridge = org.apache.activemq.network.jms.InboundQueueBridge
+org.apache.activemq.network.jms.InboundQueueBridge(java.lang.String).parameterNames = inboundQueueName
+inboundTopicBridge = org.apache.activemq.network.jms.InboundTopicBridge
+org.apache.activemq.network.jms.InboundTopicBridge(java.lang.String).parameterNames = inboundTopicName
+individualDeadLetterStrategy = org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy
+informixJDBCAdapter = org.apache.activemq.store.jdbc.adapter.InformixJDBCAdapter
+jaasAuthenticationPlugin = org.apache.activemq.security.JaasAuthenticationPlugin
+jaasCertificateAuthenticationPlugin = org.apache.activemq.security.JaasCertificateAuthenticationPlugin
+jaasDualAuthenticationPlugin = org.apache.activemq.security.JaasDualAuthenticationPlugin
+jdbcPersistenceAdapter = org.apache.activemq.store.jdbc.JDBCPersistenceAdapter
+org.apache.activemq.store.jdbc.JDBCPersistenceAdapter(javax.sql.DataSource,org.apache.activemq.wireformat.WireFormat).parameterNames = ds wireFormat
+jmsQueueConnector = org.apache.activemq.network.jms.JmsQueueConnector
+jmsTopicConnector = org.apache.activemq.network.jms.JmsTopicConnector
+journalPersistenceAdapter = org.apache.activemq.store.journal.JournalPersistenceAdapter
+org.apache.activemq.store.journal.JournalPersistenceAdapter(org.apache.activeio.journal.Journal,org.apache.activemq.store.PersistenceAdapter,org.apache.activemq.thread.TaskRunnerFactory).parameterNames = journal longTermPersistence taskRunnerFactory
+journalPersistenceAdapterFactory = org.apache.activemq.store.journal.JournalPersistenceAdapterFactory
+journalPersistenceAdapterFactory.journalLogFileSize.propertyEditor = org.apache.activemq.util.MemoryIntPropertyEditor
+journaledJDBC = org.apache.activemq.store.PersistenceAdapterFactoryBean
+journaledJDBC.journalLogFileSize.propertyEditor = org.apache.activemq.util.MemoryIntPropertyEditor
+kahaDB = org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter
+kahaDB.indexCacheSize.propertyEditor = org.apache.activemq.util.MemoryPropertyEditor
+kahaDB.indexWriteBatchSize.propertyEditor = org.apache.activemq.util.MemoryPropertyEditor
+kahaDB.journalMaxFileLength.propertyEditor = org.apache.activemq.util.MemoryIntPropertyEditor
+kahaDB.journalMaxWriteBatchSize.propertyEditor = org.apache.activemq.util.MemoryPropertyEditor
+kahaPersistenceAdapter = org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter
+kahaPersistenceAdapter.maxDataFileLength.propertyEditor = org.apache.activemq.util.MemoryPropertyEditor
+org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter(java.util.concurrent.atomic.AtomicLong).parameterNames = size
+lDAPAuthorizationMap = org.apache.activemq.security.LDAPAuthorizationMap
+org.apache.activemq.security.LDAPAuthorizationMap(java.util.Map).parameterNames = options
+lastImageSubscriptionRecoveryPolicy = org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy
+ldapNetworkConnector = org.apache.activemq.network.LdapNetworkConnector
+ldapNetworkConnector.prefetchSize.propertyEditor = org.apache.activemq.util.MemoryIntPropertyEditor
+loggingBrokerPlugin = org.apache.activemq.broker.util.LoggingBrokerPlugin
+loggingBrokerPlugin.initMethod = afterPropertiesSet
+managementContext = org.apache.activemq.broker.jmx.ManagementContext
+managementContext.connectorPort.propertyEditor = org.apache.activemq.util.MemoryIntPropertyEditor
+managementContext.rmiServerPort.propertyEditor = org.apache.activemq.util.MemoryIntPropertyEditor
+org.apache.activemq.broker.jmx.ManagementContext(javax.management.MBeanServer).parameterNames = server
+masterConnector = org.apache.activemq.broker.ft.MasterConnector
+org.apache.activemq.broker.ft.MasterConnector(java.lang.String).parameterNames = remoteUri
+maxdb-jdbc-adapter = org.apache.activemq.store.jdbc.adapter.MaxDBJDBCAdapter
+memoryPersistenceAdapter = org.apache.activemq.store.memory.MemoryPersistenceAdapter
+memoryUsage = org.apache.activemq.usage.MemoryUsage
+memoryUsage.limit.propertyEditor = org.apache.activemq.util.MemoryPropertyEditor
+memoryUsage.percentUsageMinDelta.propertyEditor = org.apache.activemq.util.MemoryPropertyEditor
+org.apache.activemq.usage.MemoryUsage(org.apache.activemq.usage.MemoryUsage).parameterNames = parent
+org.apache.activemq.usage.MemoryUsage(java.lang.String).parameterNames = name
+org.apache.activemq.usage.MemoryUsage(org.apache.activemq.usage.MemoryUsage,java.lang.String).parameterNames = parent name
+org.apache.activemq.usage.MemoryUsage(org.apache.activemq.usage.MemoryUsage,java.lang.String,float).parameterNames = parent name portion
+messageGroupHashBucketFactory = org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory
+mirroredQueue = org.apache.activemq.broker.region.virtual.MirroredQueue
+multicastNetworkConnector = org.apache.activemq.network.MulticastNetworkConnector
+multicastNetworkConnector.prefetchSize.propertyEditor = org.apache.activemq.util.MemoryIntPropertyEditor
+org.apache.activemq.network.MulticastNetworkConnector(java.net.URI).parameterNames = remoteURI
+multicastTraceBrokerPlugin = org.apache.activemq.broker.util.MulticastTraceBrokerPlugin
+mysql-jdbc-adapter = org.apache.activemq.store.jdbc.adapter.MySqlJDBCAdapter
+networkConnector = org.apache.activemq.network.DiscoveryNetworkConnector
+networkConnector.prefetchSize.propertyEditor = org.apache.activemq.util.MemoryIntPropertyEditor
+org.apache.activemq.network.DiscoveryNetworkConnector(java.net.URI).parameterNames = discoveryURI
+noSubscriptionRecoveryPolicy = org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy
+oldestMessageEvictionStrategy = org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy
+oldestMessageWithLowestPriorityEvictionStrategy = org.apache.activemq.broker.region.policy.OldestMessageWithLowestPriorityEvictionStrategy
+oracleJDBCAdapter = org.apache.activemq.store.jdbc.adapter.OracleJDBCAdapter
+outboundQueueBridge = org.apache.activemq.network.jms.OutboundQueueBridge
+org.apache.activemq.network.jms.OutboundQueueBridge(java.lang.String).parameterNames = outboundQueueName
+outboundTopicBridge = org.apache.activemq.network.jms.OutboundTopicBridge
+org.apache.activemq.network.jms.OutboundTopicBridge(java.lang.String).parameterNames = outboundTopicName
+pListStore = org.apache.activemq.store.kahadb.plist.PListStore
+policyEntry = org.apache.activemq.broker.region.policy.PolicyEntry
+policyEntry.memoryLimit.propertyEditor = org.apache.activemq.util.MemoryPropertyEditor
+policyMap = org.apache.activemq.broker.region.policy.PolicyMap
+postgresql-jdbc-adapter = org.apache.activemq.store.jdbc.adapter.PostgresqlJDBCAdapter
+prefetchPolicy = org.apache.activemq.ActiveMQPrefetchPolicy
+prefetchRatePendingMessageLimitStrategy = org.apache.activemq.broker.region.policy.PrefetchRatePendingMessageLimitStrategy
+priorityNetworkDispatchPolicy = org.apache.activemq.broker.region.policy.PriorityNetworkDispatchPolicy
+proxyConnector = org.apache.activemq.proxy.ProxyConnector
+queryBasedSubscriptionRecoveryPolicy = org.apache.activemq.broker.region.policy.QueryBasedSubscriptionRecoveryPolicy
+queue = org.apache.activemq.command.ActiveMQQueue
+org.apache.activemq.command.ActiveMQQueue(java.lang.String).parameterNames = name
+queueDispatchSelector = org.apache.activemq.broker.region.QueueDispatchSelector
+org.apache.activemq.broker.region.QueueDispatchSelector(org.apache.activemq.command.ActiveMQDestination).parameterNames = destination
+redeliveryPolicy = org.apache.activemq.RedeliveryPolicy
+roundRobinDispatchPolicy = org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy
+sharedDeadLetterStrategy = org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy
+simpleAuthenticationPlugin = org.apache.activemq.security.SimpleAuthenticationPlugin
+org.apache.activemq.security.SimpleAuthenticationPlugin(java.util.List).parameterNames = users
+simpleAuthorizationMap = org.apache.activemq.security.SimpleAuthorizationMap
+org.apache.activemq.security.SimpleAuthorizationMap(org.apache.activemq.filter.DestinationMap,org.apache.activemq.filter.DestinationMap,org.apache.activemq.filter.DestinationMap).parameterNames = writeACLs readACLs adminACLs
+simpleDispatchPolicy = org.apache.activemq.broker.region.policy.SimpleDispatchPolicy
+simpleDispatchSelector = org.apache.activemq.broker.region.policy.SimpleDispatchSelector
+org.apache.activemq.broker.region.policy.SimpleDispatchSelector(org.apache.activemq.command.ActiveMQDestination).parameterNames = destination
+simpleJmsMessageConvertor = org.apache.activemq.network.jms.SimpleJmsMessageConvertor
+simpleMessageGroupMapFactory = org.apache.activemq.broker.region.group.SimpleMessageGroupMapFactory
+sslContext = org.apache.activemq.spring.SpringSslContext
+sslContext.initMethod = afterPropertiesSet
+statements = org.apache.activemq.store.jdbc.Statements
+statisticsBrokerPlugin = org.apache.activemq.plugin.StatisticsBrokerPlugin
+storeCursor = org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy
+storeDurableSubscriberCursor = org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy
+storeUsage = org.apache.activemq.usage.StoreUsage
+storeUsage.limit.propertyEditor = org.apache.activemq.util.MemoryPropertyEditor
+storeUsage.percentUsageMinDelta.propertyEditor = org.apache.activemq.util.MemoryPropertyEditor
+org.apache.activemq.usage.StoreUsage(java.lang.String,org.apache.activemq.store.PersistenceAdapter).parameterNames = name store
+org.apache.activemq.usage.StoreUsage(org.apache.activemq.usage.StoreUsage,java.lang.String).parameterNames = parent name
+streamJDBCAdapter = org.apache.activemq.store.jdbc.adapter.StreamJDBCAdapter
+strictOrderDispatchPolicy = org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy
+sybase-jdbc-adapter = org.apache.activemq.store.jdbc.adapter.SybaseJDBCAdapter
+systemUsage = org.apache.activemq.usage.SystemUsage
+org.apache.activemq.usage.SystemUsage(java.lang.String,org.apache.activemq.store.PersistenceAdapter,org.apache.activemq.store.kahadb.plist.PListStore).parameterNames = name adapter tempStore
+org.apache.activemq.usage.SystemUsage(org.apache.activemq.usage.SystemUsage,java.lang.String).parameterNames = parent name
+tempDestinationAuthorizationEntry = org.apache.activemq.security.TempDestinationAuthorizationEntry
+tempUsage = org.apache.activemq.usage.TempUsage
+tempUsage.limit.propertyEditor = org.apache.activemq.util.MemoryPropertyEditor
+tempUsage.percentUsageMinDelta.propertyEditor = org.apache.activemq.util.MemoryPropertyEditor
+org.apache.activemq.usage.TempUsage(java.lang.String,org.apache.activemq.store.kahadb.plist.PListStore).parameterNames = name store
+org.apache.activemq.usage.TempUsage(org.apache.activemq.usage.TempUsage,java.lang.String).parameterNames = parent name
+timeStampingBrokerPlugin = org.apache.activemq.broker.util.TimeStampingBrokerPlugin
+timedSubscriptionRecoveryPolicy = org.apache.activemq.broker.region.policy.TimedSubscriptionRecoveryPolicy
+topic = org.apache.activemq.command.ActiveMQTopic
+org.apache.activemq.command.ActiveMQTopic(java.lang.String).parameterNames = name
+traceBrokerPathPlugin = org.apache.activemq.broker.util.TraceBrokerPathPlugin
+transact-database-locker = org.apache.activemq.store.jdbc.adapter.TransactDatabaseLocker
+org.apache.activemq.store.jdbc.adapter.TransactDatabaseLocker(org.apache.activemq.store.jdbc.JDBCPersistenceAdapter).parameterNames = persistenceAdapter
+transact-jdbc-adapter = org.apache.activemq.store.jdbc.adapter.TransactJDBCAdapter
+transportConnector = org.apache.activemq.broker.TransportConnector
+org.apache.activemq.broker.TransportConnector(org.apache.activemq.transport.TransportServer).parameterNames = server
+udpTraceBrokerPlugin = org.apache.activemq.broker.util.UDPTraceBrokerPlugin
+usageCapacity = org.apache.activemq.usage.UsageCapacity
+virtualDestinationInterceptor = org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor
+virtualTopic = org.apache.activemq.broker.region.virtual.VirtualTopic
+vmCursor = org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy
+vmDurableCursor = org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy
+vmQueueCursor = org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy
+xaConnectionFactory = org.apache.activemq.spring.ActiveMQXAConnectionFactory
+xaConnectionFactory.initMethod = afterPropertiesSet
+mongodb = org.qsoft.activemq.store.mongodb.MongodbPersistenceAdapter
+ ]]>
+ Mirrored
+Queues should be supported by default if they have not been
+explicitly configured.
+ ]]>
+ Virtual
+Topics should be supported by default if they have not been
+explicitly configured.
+ ]]>
+ Mirrored
+Queues should be supported by default if they have not been
+explicitly configured.
+ ]]>
+ Virtual
+Topics should be supported by default if they have not been
+explicitly configured.
+ ]]>
+ DOT file creator plugin which
+creates a DOT file showing the current connections
+ ]]>
+ Spring enhanced connection
+factory which will automatically use the Spring bean name as the clientIDPrefix property
+so that connections created have client IDs related to your Spring.xml file for
+easier comprehension from JMX.
+ ]]>
+ Note: access to this clientInternalExceptionListener will not be serialized if it is associated with more than
+on connection (as it will be if more than one connection is subsequently created by this connection factory)
+ ]]>
+ Note: access to this exceptionLinstener will not be serialized if it is associated with more than
+on connection (as it will be if more than one connection is subsequently created by this connection factory)
+ ]]>
+ prefetch
+policy for consumers created by this connection.
+ ]]>
+ connection
+URL used to connect to the ActiveMQ broker.
+ ]]>
+ Note: access to this clientInternalExceptionListener will not be serialized if it is associated with more than
+on connection (as it will be if more than one connection is subsequently created by this connection factory)
+ ]]>
+ dispatched
+synchronously or asynchronously by the broker. For non-durable
+topics for example we typically dispatch synchronously by default to
+minimize context switches which boost performance. However sometimes its
+better to go slower to ensure that a single blocked consumer socket does
+not block delivery to other consumers.
+ ]]>
+ Note: access to this exceptionLinstener will not be serialized if it is associated with more than
+on connection (as it will be if more than one connection is subsequently created by this connection factory)
+ ]]>
+ Message Groups
+ ]]>
+ Nested
+Structures of Map and List objects
+ ]]>
+ prefetch
+policy for consumers created by this connection.
+ ]]>
+ Async Sends which
+adds a massive performance boost; but means that the send() method will
+return immediately whether the message has been sent or not which could
+lead to message loss.
+ ]]>
+ start
+the connection so this option makes the default case to create a
+warning if the user forgets. To disable the warning just set the value to <
+0 (say -1).
+ ]]>
+ sub-classing is
+encouraged to override the default implementation of methods to account for differences in JDBC Driver
+implementations. The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations.
+The databases/JDBC drivers that use this adapter are:
+ ]]>
+file creator plugin which creates a DOT file showing the current topic & queue hierarchies.
+ ]]>
+ ]]>
+ container name field and subscription id field must be reduced to 150 characters.
+Therefore be sure not to use longer names for container name and subscription id than 150 characters.
+ ]]>
+ java.security.auth.login.config system property
+is not defined then it is set to the location of the login.config file on the classpath.
+ ]]>
+ java.security.auth.login.config system property
+is not defined then it is set to the location of the login.config file on the classpath.
+ ]]>
+ java.security.auth.login.config system property
+is not defined then it is set to the location of the login.config file on the classpath.
+ ]]>
+ Master Slave for High
+Availability of messages.
+ ]]>
+ Message
+Groups functionality.
+ ]]>
+ Mirrored
+Queue using a prefix and postfix to define the topic name on which to mirror the queue to.
+ ]]>
+ multicast://address:port
+ ]]>
+Subclassing is encouraged to override the default
+implementation of methods to account for differences
+in JDBC Driver implementations.
+The JDBCAdapter inserts and extracts BLOB data using the
+getBytes()/setBytes() operations.
+The databases/JDBC drivers that use this adapter are:
+ ]]>
+ Message Groups
+ ]]>
+ Message Groups
+ ]]>
+Subclassing is encouraged to override the default
+implementation of methods to account for differences
+in JDBC Driver implementations.
+The JDBCAdapter inserts and extracts BLOB data using the
+getBytes()/setBytes() operations.
+The databases/JDBC drivers that use this adapter are:
+ ]]>
+ Message Groups functionality.
+ ]]>
+ ]]>
+ Virtual Topics.
+ ]]>
+ Virtual
+Topics using a prefix and postfix. The virtual destination creates a
+wildcard that is then used to look up all active queue subscriptions which
+ ]]>
+ Spring enhanced XA connection
+factory which will automatically use the Spring bean name as the clientIDPrefix property
+so that connections created have client IDs related to your Spring.xml file for
+easier comprehension from JMX.
+ ]]>
+ Note: access to this clientInternalExceptionListener will not be serialized if it is associated with more than
+on connection (as it will be if more than one connection is subsequently created by this connection factory)
+ ]]>
+ Note: access to this exceptionLinstener will not be serialized if it is associated with more than
+on connection (as it will be if more than one connection is subsequently created by this connection factory)
+ ]]>
+ prefetch
+policy for consumers created by this connection.
+ ]]>
+ connection
+URL used to connect to the ActiveMQ broker.
+ ]]>
+ Note: access to this clientInternalExceptionListener will not be serialized if it is associated with more than
+on connection (as it will be if more than one connection is subsequently created by this connection factory)
+ ]]>
+ dispatched
+synchronously or asynchronously by the broker. For non-durable
+topics for example we typically dispatch synchronously by default to
+minimize context switches which boost performance. However sometimes its
+better to go slower to ensure that a single blocked consumer socket does
+not block delivery to other consumers.
+ ]]>
+ Note: access to this exceptionLinstener will not be serialized if it is associated with more than
+on connection (as it will be if more than one connection is subsequently created by this connection factory)
+ ]]>
+ Message Groups
+ ]]>
+ Nested
+Structures of Map and List objects
+ ]]>
+ prefetch
+policy for consumers created by this connection.
+ ]]>
+ Async Sends which
+adds a massive performance boost; but means that the send() method will
+return immediately whether the message has been sent or not which could
+lead to message loss.
+ ]]>
+ start
+the connection so this option makes the default case to create a
+warning if the user forgets. To disable the warning just set the value to <
+0 (say -1).
+ ]]>
+select id,container,msgid_prod,msgid_seq,expiration,msg,priority from activemq_msgs;
+select id,time,broker_name from activemq_lock;
+����brokerʵ������������ʱ������һ�� mysql���ݿ���û������ġ�
+log4j.rootLogger=INFO, stdout, logfile
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+# CONSOLE appender
+log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %m%n
+# Log File appender
+log4j.appender.logfile.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+# You can change logger levels here.
\ No newline at end of file
+2���������õ�activemq 5.x�汾
+activemq.xsd �����˶�mongodb�ڵ��schema����
+META-INF\services\org\apache\xbean\spring\http\activemq.apache.org\schema\core ������mongodb��ʵ����
+1\ ������databaselocker
+package kaha;
+import java.net.URI;
+import javax.jms.BytesMessage;
+import javax.jms.Queue;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+public class TestSenderKaha {
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+ try {
+// XBeanBrokerFactory factory = new XBeanBrokerFactory();
+// BrokerService broker = factory.createBroker(new URI("activemq-mysql.xml"));
+// broker.setUseJmx(true);
+// broker.setStartAsync(false);
+// broker.start();
+ QueueConnectionFactory connfactory = new ActiveMQConnectionFactory("tcp://");
+ Queue queue = new ActiveMQQueue("kk.kaha");
+ ActiveMQConnection conn = (ActiveMQConnection) connfactory.createQueueConnection();
+ conn.setOptimizeAcknowledge(true);
+ conn.setUseAsyncSend(true);
+ QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueSender sender = session.createSender(queue);
+ long start = System.currentTimeMillis();
+ int count = 1000000;
+ int size = 128;
+ for (int i = 0; i < count; i++) {
+ //String msgText = "testMessage-" + i;
+ BytesMessage msg = session.createBytesMessage();
+ //msg.writeBytes(createBytesMessage(size));
+ msg.writeBytes(bytes);
+ //if(i%2 == 1)msg.setIntProperty("score", 10);
+ sender.send(msg);
+ }
+ long end = System.currentTimeMillis();
+ System.out.println("send " + count + " messages with " + size + " bytes in " + (end-start)/1000.0 + " s");
+ session.close();
+ conn.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+ static byte[] bytes = createBytesMessage(1024);
+ private static byte[] createBytesMessage(int i) {
+ byte[] bs = new byte[i];
+ for (int j = 0; j < i; j++) {
+ bs[j] = 'A';
+ }
+ return bs;
+ }
+package kaha;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.URI;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+public class TestServerKaha {
+ private static final String url = "tcp://localhost:61616";;
+ private static final String QUEUE_NAME = "kk.kaha";
+ //private static final String TOPIC_NAME = "mysql";
+ public static void main(String[] args) throws Exception {
+ XBeanBrokerFactory factory = new XBeanBrokerFactory();
+ BrokerService broker = factory.createBroker(new URI("activemq-kaha.xml"));
+ // BrokerService broker = new BrokerService();
+ // broker.setBrokerName("kk");
+ // broker.setPersistent(false);
+ // broker.setUseJmx(true);
+ broker.start();
+ while(true){
+ Thread.sleep(1000);
+ }
+// Connection connection = null;
+// ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
+// connection = connectionFactory.createConnection();
+// connection.start();
+// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// Destination queue = session.createQueue(QUEUE_NAME);
+// Destination topic = session.createTopic(TOPIC_NAME);
+// MessageProducer producer = session.createProducer(queue);
+// MessageConsumer receiver = session.createConsumer(queue);
+// MessageProducer producer1 = session.createProducer(topic);
+// MessageConsumer receiver1 = session.createConsumer(topic);
+// try {
+// BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
+// while (true) {
+// String line = reader.readLine();
+// if (line == null)
+// continue;
+// if (line.length() < 5) {
+// System.out.println(" error command: " + line);
+// continue;
+// }
+// if ("quit".equalsIgnoreCase(line)) {
+// System.out.println(line);
+// break;
+// }
+// String[] sline = line.split(" ");
+// String command = sline[0];
+// String content = "";
+// long ct = 10;
+// if (sline.length > 1)
+// content = sline[1];
+// if (sline.length > 2)
+// ct = Integer.parseInt(sline[2]);
+// if ("send".equalsIgnoreCase(command)) {
+// TextMessage message = session.createTextMessage(content);
+// producer.send(message);
+// System.out.println(" send message: " + message);
+// } else if ("recv".equalsIgnoreCase(command)) {
+// try {
+// Message message = receiver.receive(1000);
+// System.out.println(" receive message: " + message);
+// } catch (Exception e) {
+// System.out.println(" error: receive message ");
+// e.printStackTrace();
+// }
+// } else if ("sent".equalsIgnoreCase(command)) {
+// TextMessage message = session.createTextMessage(content);
+// message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+// for (int i = 0; i < ct; i++) {
+// producer1.send(message);
+// System.out.println(i + " send message: " + message);
+// }
+// } else if ("rect".equalsIgnoreCase(command)) {
+// try {
+// for (int i = 0; i < ct; i++) {
+// Message message = receiver1.receive();
+// System.out.println(i +" receive message: " + message);
+// }
+// } catch (Exception e) {
+// System.out.println(" error: receive message ");
+// e.printStackTrace();
+// }
+// } else {
+// System.out.println(" error command: " + line);
+// }
+// }
+// receiver.close();
+// producer.close();
+// receiver1.close();
+// producer1.close();
+// session.close();
+// } catch (Exception e) {
+// e.printStackTrace();
+// } finally {
+// connection.close();
+// }
+// broker.stop();
+ }
+package kk;
+import java.net.URI;
+import javax.jms.BytesMessage;
+import javax.jms.Queue;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+public class TestSenderMysql {
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+ try {
+// XBeanBrokerFactory factory = new XBeanBrokerFactory();
+// BrokerService broker = factory.createBroker(new URI("activemq-mysql.xml"));
+// broker.setUseJmx(true);
+// broker.setStartAsync(false);
+// broker.start();
+ QueueConnectionFactory connfactory = new ActiveMQConnectionFactory("tcp://");
+ Queue queue = new ActiveMQQueue("kk.mysql");
+ ActiveMQConnection conn = (ActiveMQConnection) connfactory.createQueueConnection();
+ conn.setOptimizeAcknowledge(true);
+ conn.setUseAsyncSend(true);
+ QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueSender sender = session.createSender(queue);
+ long start = System.currentTimeMillis();
+ int count = 10000;
+ int size = 1024;
+ for (int i = 0; i < count; i++) {
+ //String msgText = "testMessage-" + i;
+ BytesMessage msg = session.createBytesMessage();
+ msg.writeBytes(createBytesMessage(size));
+ //if(i%2 == 1)msg.setIntProperty("score", 10);
+ sender.send(msg);
+ }
+ long end = System.currentTimeMillis();
+ System.out.println("send " + count + " messages with " + size + " bytes in " + (end-start)/1000.0 + " s");
+ session.close();
+ conn.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+ private static byte[] createBytesMessage(int i) {
+ byte[] bs = new byte[i];
+ for (int j = 0; j < i; j++) {
+ bs[j] = 'A';
+ }
+ return bs;
+ }
+package kk;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.URI;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+public class TestServerMysql {
+ private static final String url = "tcp://localhost:61616";;
+ private static final String QUEUE_NAME = "kk.mysql";
+ //private static final String TOPIC_NAME = "mysql";
+ public static void main(String[] args) throws Exception {
+ XBeanBrokerFactory factory = new XBeanBrokerFactory();
+ BrokerService broker = factory.createBroker(new URI("activemq-mysql.xml"));
+ // BrokerService broker = new BrokerService();
+ // broker.setBrokerName("kk");
+ // broker.setPersistent(false);
+ // broker.setUseJmx(true);
+ broker.start();
+ while(true){
+ Thread.sleep(1000);
+ }
+// Connection connection = null;
+// ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
+// connection = connectionFactory.createConnection();
+// connection.start();
+// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// Destination queue = session.createQueue(QUEUE_NAME);
+// Destination topic = session.createTopic(TOPIC_NAME);
+// MessageProducer producer = session.createProducer(queue);
+// MessageConsumer receiver = session.createConsumer(queue);
+// MessageProducer producer1 = session.createProducer(topic);
+// MessageConsumer receiver1 = session.createConsumer(topic);
+// try {
+// BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
+// while (true) {
+// String line = reader.readLine();
+// if (line == null)
+// continue;
+// if (line.length() < 5) {
+// System.out.println(" error command: " + line);
+// continue;
+// }
+// if ("quit".equalsIgnoreCase(line)) {
+// System.out.println(line);
+// break;
+// }
+// String[] sline = line.split(" ");
+// String command = sline[0];
+// String content = "";
+// long ct = 10;
+// if (sline.length > 1)
+// content = sline[1];
+// if (sline.length > 2)
+// ct = Integer.parseInt(sline[2]);
+// if ("send".equalsIgnoreCase(command)) {
+// TextMessage message = session.createTextMessage(content);
+// producer.send(message);
+// System.out.println(" send message: " + message);
+// } else if ("recv".equalsIgnoreCase(command)) {
+// try {
+// Message message = receiver.receive(1000);
+// System.out.println(" receive message: " + message);
+// } catch (Exception e) {
+// System.out.println(" error: receive message ");
+// e.printStackTrace();
+// }
+// } else if ("sent".equalsIgnoreCase(command)) {
+// TextMessage message = session.createTextMessage(content);
+// message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+// for (int i = 0; i < ct; i++) {
+// producer1.send(message);
+// System.out.println(i + " send message: " + message);
+// }
+// } else if ("rect".equalsIgnoreCase(command)) {
+// try {
+// for (int i = 0; i < ct; i++) {
+// Message message = receiver1.receive();
+// System.out.println(i +" receive message: " + message);
+// }
+// } catch (Exception e) {
+// System.out.println(" error: receive message ");
+// e.printStackTrace();
+// }
+// } else {
+// System.out.println(" error command: " + line);
+// }
+// }
+// receiver.close();
+// producer.close();
+// receiver1.close();
+// producer1.close();
+// session.close();
+// } catch (Exception e) {
+// e.printStackTrace();
+// } finally {
+// connection.close();
+// }
+// broker.stop();
+ }
+package org.qsoft.activemq.test;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+public class TestReceiver {
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+ int a = 1;
+ if(a== 1)
+ listen();
+ else
+ receive();
+ }
+ private static void receive() {
+ try {
+ // init connection factory with activemq
+ QueueConnectionFactory factory = new ActiveMQConnectionFactory("tcp://");
+ // specify the destination
+ Queue queue = new ActiveMQQueue("kk.mongo");
+ // create connection,session,consumer and receive message
+ QueueConnection conn = factory.createQueueConnection();
+ conn.start();
+ QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueReceiver receiver = session.createReceiver(queue);//, "score=10");
+ int index = 0;
+ int count = 10000;
+ final long[] times = new long[3];
+ times[2] = times[0] = System.currentTimeMillis();
+ while (index++ < count) {
+ Message msg = receiver.receive(800);
+ if(msg == null) break;
+ //System.out.println("*********");
+ //System.out.println(msg.getIntProperty("score"));
+ //System.out.println(msg.getText());
+// if((index+1) % 100 == 0)
+// System.out.println((index+1)+ " - " + msg.getJMSMessageID());
+ int a = index;
+ if( a % 100 == 0)
+ {
+ times[1] = times[0];
+ times[0] = System.currentTimeMillis();
+ //System.out.println(times[0] - times[1]);
+ System.out.println((times[0] - times[1]) + " -> " +((a+1)*1000.0)/(times[0] - times[2]));
+ }
+ }
+ long end = System.currentTimeMillis();
+ System.out.println("receive " + (index-1) + " messages in " + (end-times[2])/1000.0 + " s");
+ session.close();
+ conn.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+ private static void listen() {
+ try {
+ // init connection factory with activemq
+ QueueConnectionFactory factory = new ActiveMQConnectionFactory("tcp://");
+ // specify the destination
+ Queue queue = new ActiveMQQueue("kk.mysql");
+ // create connection,session,consumer and receive message
+ ActiveMQConnection conn = (ActiveMQConnection) factory.createQueueConnection();
+ conn.setOptimizeAcknowledge(true);
+ conn.setOptimizeAcknowledgeTimeOut(4000);
+ conn.setOptimizedAckScheduledAckInterval(2000);
+ //conn.setSendAcksAsync(true);
+ conn.start();
+ final int count = 10000;
+ // first receiver on broker1
+ QueueSession sessionA1 = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueReceiver receiverA1 = sessionA1.createReceiver(queue);
+ final AtomicInteger a1 = new AtomicInteger(0);
+ final long[] times = new long[3];
+ times[2] = times[0] = System.currentTimeMillis();
+ MessageListener listenerA1 = new MessageListener(){
+ public void onMessage(Message message) {
+ int a = a1.getAndIncrement();
+ if( a % 100 == 0)
+ {
+ times[1] = times[0];
+ times[0] = System.currentTimeMillis();
+ //System.out.println(times[0] - times[1]);
+ System.out.println((times[0] - times[1]) + " -> " +((a+1)*1000.0)/(times[0] - times[2]));
+ }
+ if(a == count - 1){
+ System.out.println("onMessage " + count + " message for " + (System.currentTimeMillis()-times[2])/1000.0 + " s");
+ }
+// try {
+// System.out.println(aint1.incrementAndGet()+" => A1 receive from kk.mongo: " + ((TextMessage)message).getText());
+// } catch (JMSException e) {
+// e.printStackTrace();
+// }
+ }};
+ receiverA1.setMessageListener(listenerA1 );
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+package org.qsoft.activemq.test;
+import javax.jms.BytesMessage;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+public class TestSender {
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+ try {
+ // init connection factory with activemq
+ QueueConnectionFactory factory = new ActiveMQConnectionFactory("tcp://");
+ // specify the destination
+ Queue queue = new ActiveMQQueue("kk.mongo");
+ // create connection,session,producer and deliver message
+ ActiveMQConnection conn = (ActiveMQConnection) factory.createQueueConnection();
+ conn.setOptimizeAcknowledge(true);
+ conn.setUseAsyncSend(true);
+ //conn.setSendAcksAsync(true);
+ QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueSender sender = session.createSender(queue);
+ long start = System.currentTimeMillis();
+ int count = 10000;
+ int size = 1024;
+ for (int i = 0; i < count; i++) {
+ //String msgText = "testMessage-" + i;
+ BytesMessage msg = session.createBytesMessage();
+ msg.writeBytes(createBytesMessage(size));
+ //if(i%2 == 1)msg.setIntProperty("score", 10);
+ sender.send(msg);
+ }
+ long end = System.currentTimeMillis();
+ System.out.println("send " + count + " messages with " + size + " bytes in " + (end-start)/1000.0 + " s");
+ session.close();
+ conn.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+ private static byte[] createBytesMessage(int i) {
+ byte[] bs = new byte[i];
+ for (int j = 0; j < i; j++) {
+ bs[j] = 'A';
+ }
+ return bs;
+ }
+package org.qsoft.activemq.test;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.URI;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+public class TestServer {
+ private static final String url = "tcp://localhost:61616";;
+ private static final String QUEUE_NAME = "kk.mongo";
+ private static final String TOPIC_NAME = "kk.mongoTopic";
+ public static void main(String[] args) throws Exception {
+ XBeanBrokerFactory factory = new XBeanBrokerFactory();
+ BrokerService broker = factory.createBroker(new URI("activemq.xml"));
+ // BrokerService broker = new BrokerService();
+ // broker.setBrokerName("kk");
+ // broker.setPersistent(false);
+ // broker.setUseJmx(true);
+ broker.start();
+// Connection connection = null;
+// ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
+// connection = connectionFactory.createConnection();
+// connection.start();
+// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// Destination queue = session.createQueue(QUEUE_NAME);
+// Destination topic = session.createTopic(TOPIC_NAME);
+// MessageProducer producer = session.createProducer(queue);
+// MessageConsumer receiver = session.createConsumer(queue);
+// MessageProducer producer1 = session.createProducer(topic);
+// MessageConsumer receiver1 = session.createConsumer(topic);
+// try {
+// BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
+// while (true) {
+// String line = reader.readLine();
+// if (line == null)
+// continue;
+// if (line.length() < 5) {
+// System.out.println(" error command: " + line);
+// continue;
+// }
+// if ("quit".equalsIgnoreCase(line)) {
+// System.out.println(line);
+// break;
+// }
+// String[] sline = line.split(" ");
+// String command = sline[0];
+// String content = "";
+// long ct = 10;
+// if (sline.length > 1)
+// content = sline[1];
+// if (sline.length > 2)
+// ct = Integer.parseInt(sline[2]);
+// if ("send".equalsIgnoreCase(command)) {
+// TextMessage message = session.createTextMessage(content);
+// producer.send(message);
+// System.out.println(" send message: " + message);
+// } else if ("recv".equalsIgnoreCase(command)) {
+// try {
+// Message message = receiver.receive(1000);
+// System.out.println(" receive message: " + message);
+// } catch (Exception e) {
+// System.out.println(" error: receive message ");
+// e.printStackTrace();
+// }
+// } else if ("sent".equalsIgnoreCase(command)) {
+// TextMessage message = session.createTextMessage(content);
+// message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+// for (int i = 0; i < ct; i++) {
+// producer1.send(message);
+// System.out.println(i + " send message: " + message);
+// }
+// } else if ("rect".equalsIgnoreCase(command)) {
+// try {
+// for (int i = 0; i < ct; i++) {
+// Message message = receiver1.receive();
+// System.out.println(i +" receive message: " + message);
+// }
+// } catch (Exception e) {
+// System.out.println(" error: receive message ");
+// e.printStackTrace();
+// }
+// } else {
+// System.out.println(" error command: " + line);
+// }
+// }
+// receiver.close();
+// producer.close();
+// receiver1.close();
+// producer1.close();
+// session.close();
+// } catch (Exception e) {
+// e.printStackTrace();
+// } finally {
+// connection.close();
+// }
+// broker.stop();
+ }