From 9dde5776357e9a0a13cc0a5eaed6310a5ba10299 Mon Sep 17 00:00:00 2001 From: kimmking Date: Sat, 23 Dec 2017 22:31:48 +0800 Subject: [PATCH] initialize code --- .gitignore | 3 + pom.xml | 28 +-- .../activemq/store/mongodb/MongoDBHelper.java | 236 ++++++++++++++++++ .../store/mongodb/MongodbMessageStore.java | 38 +-- .../mongodb/MongodbPersistenceAdapter.java | 33 ++- .../mongodb/MongodbTopicMessageStore.java | 24 +- .../mongodb/MongodbTransactionStore.java | 2 +- .../activemq/store/mongodb/MongoDBHelper.java | 209 ---------------- .../http/activemq.apache.org/schema/core | 2 +- src/main/resources/activemq.xml | 12 +- src/main/resources/activemq.xsd | 14 ++ .../kimmking}/activemq/test/TestReceiver.java | 17 +- .../kimmking}/activemq/test/TestSender.java | 4 +- .../kimmking}/activemq/test/TestServer.java | 2 +- 14 files changed, 343 insertions(+), 281 deletions(-) create mode 100644 src/main/java/io/github/kimmking/activemq/store/mongodb/MongoDBHelper.java rename src/main/java/{org/qsoft => io/github/kimmking}/activemq/store/mongodb/MongodbMessageStore.java (70%) rename src/main/java/{org/qsoft => io/github/kimmking}/activemq/store/mongodb/MongodbPersistenceAdapter.java (88%) rename src/main/java/{org/qsoft => io/github/kimmking}/activemq/store/mongodb/MongodbTopicMessageStore.java (84%) rename src/main/java/{org/qsoft => io/github/kimmking}/activemq/store/mongodb/MongodbTransactionStore.java (95%) delete mode 100644 src/main/java/org/qsoft/activemq/store/mongodb/MongoDBHelper.java rename src/test/java/{org/qsoft => io/github/kimmking}/activemq/test/TestReceiver.java (92%) rename src/test/java/{org/qsoft => io/github/kimmking}/activemq/test/TestSender.java (96%) rename src/test/java/{org/qsoft => io/github/kimmking}/activemq/test/TestServer.java (98%) diff --git a/.gitignore b/.gitignore index 51f1a20..0504a28 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,6 @@ target/ *.swp *.diff *.patch + +*.iml +.idea diff --git a/pom.xml b/pom.xml index 126c89c..0014d68 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ org.apache.activemq activemq-parent - 5.7.0 + 5.15.2 activemq-store-mongodb @@ -22,7 +22,7 @@ org.mongodb mongo-java-driver - 2.10.1 + 3.6.0 org.slf4j @@ -35,7 +35,7 @@ ${project.groupId} - activemq-core + activemq-all false @@ -47,17 +47,7 @@ org.apache.geronimo.specs geronimo-annotation_1.0_spec true - - - commons-pool - commons-pool - true - - - commons-dbcp - commons-dbcp - 1.4 - + mysql mysql-connector-java @@ -75,11 +65,6 @@ xbean-spring true - org.springframework @@ -146,11 +131,6 @@ commons-io test - diff --git a/src/main/java/io/github/kimmking/activemq/store/mongodb/MongoDBHelper.java b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongoDBHelper.java new file mode 100644 index 0000000..a6b3157 --- /dev/null +++ b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongoDBHelper.java @@ -0,0 +1,236 @@ +package io.github.kimmking.activemq.store.mongodb; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import com.mongodb.*; +import com.mongodb.client.DistinctIterable; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.Sorts; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.ByteSequenceData; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.wireformat.WireFormat; +import org.bson.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; + +public class MongoDBHelper { + + protected static final String MSGS = "ACTIVEMQ_MSGS"; + protected static final String ACKS = "ACTIVEMQ_ACKS"; + protected static final String LOCK = "ACTIVEMQ_LOCK"; + + protected static final String MSG_COLUMN = "MSG"; + protected static final String DEST_COLUMN = "CONTAINER"; + + private static final Logger LOG = LoggerFactory.getLogger(MongoDBHelper.class); + + MongoClient mongoClient; + MongoDatabase mongoDatabase; + WireFormat wireFormat; + + + public MongoDBHelper(String host, int port, String dbName,String userName,String password, WireFormat wireFormat) { + LOG.info("Connect to MongoDB[" + host + ":" + port + ":" + dbName + "]"); + + ServerAddress serverAddress = new ServerAddress(host,port); + List addrs = new ArrayList(); + addrs.add(serverAddress); + List credentials = new ArrayList(); + + if(!StringUtils.isEmpty(userName)) { + MongoCredential credential = MongoCredential.createScramSha1Credential(userName, dbName, password.toCharArray()); + credentials.add(credential); + } + + this.mongoClient = new MongoClient(addrs,credentials); + this.mongoDatabase = mongoClient.getDatabase(dbName); + + this.wireFormat = wireFormat; + } + + public MongoCollection getMsgsCollection() { + return this.mongoDatabase.getCollection(MSGS); + } + + public MongoCollection getAcksCollection() { + return this.mongoDatabase.getCollection(ACKS); + } + + public MongoCollection getLockCollection() { + return this.mongoDatabase.getCollection(LOCK); + } + + public Boolean addMessage(Message message) throws IOException { + BasicDBObject bo = new BasicDBObject(); + MessageId messageId = message.getMessageId(); + // Serialize the Message.. + + List documents = new ArrayList(); + + + byte data[]; + try { + ByteSequence packet = wireFormat.marshal(message); + data = ByteSequenceData.toByteArray(packet); + } catch (IOException e) { + throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); + } + Document document = new Document("ID", messageId.getBrokerSequenceId()) + .append(DEST_COLUMN, message.getDestination().getQualifiedName()) + .append("MSGID_PROD", messageId.getProducerId().toString()) + .append("MSGID_SEQ", messageId.getProducerSequenceId()) + .append("EXPIRATION", message.getExpiration()) + .append(MSG_COLUMN, data) + .append("PRIORITY", message.getPriority()); + + documents.add(document); + getMsgsCollection().insertOne(document); + + return true; + } + + public static void main(String[] args) { + + } + + public void close() { + this.mongoClient.close(); + } + + public Message getMessage(MessageId messageId) throws IOException { + +// BasicDBObject bo = new BasicDBObject(); +// bo.append("MSGID_PROD", messageId.getProducerId().toString()); +// bo.append("MSGID_SEQ", messageId.getProducerSequenceId()); + FindIterable findIterable = getMsgsCollection().find(Filters.and( + Filters.eq("MSGID_SEQ", messageId.getProducerSequenceId()), + Filters.eq("MSGID_PROD",messageId.getProducerId().toString()) + )).limit(1); + Document document = findIterable.first(); + if (document == null) + return null; + byte[] data = (byte[]) document.get(MSG_COLUMN); + if (data == null) + return null; + + Message answer = null; + try { + answer = (Message) wireFormat.unmarshal(new ByteSequence(data)); + } catch (IOException e) { + throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); + } + return answer; + } + + public int count() { + return (int) getMsgsCollection().count(); + } + + public synchronized void removeMessage(ActiveMQDestination destination, MessageAck ack) { + MessageId messageId = ack.getLastMessageId(); + ack.getDestination(); + + FindIterable findIterable = getMsgsCollection().find(Filters.and( + Filters.eq(DEST_COLUMN, destination.getQualifiedName()), + Filters.eq("MSGID_SEQ", messageId.getProducerSequenceId()), + Filters.eq("MSGID_PROD",messageId.getProducerId().toString()) + )).limit(1); + Document document = findIterable.first(); + if(document != null){ + Object sequenceId = document.get("ID"); + getMsgsCollection().deleteOne(Filters.eq("ID",sequenceId)); + //.remove(new BasicDBObject("ID",sequenceId)); + //getMsgsCollection().remove(new BasicDBObject("ID",new BasicDBObject("$lte",sequenceId))); + }else{ + LOG.error(document.toString() + " is not found."); + } + + } + + public synchronized void removeAllMessages() { + getMsgsCollection().drop(); + getAcksCollection().drop(); + getLockCollection().drop(); + } + + public Message findOne() throws IOException { + FindIterable findIterable = getMsgsCollection().find().limit(1); + Document document = findIterable.first(); + if (document == null) + return null; + byte[] data = (byte[]) document.get(MSG_COLUMN); + if (data == null) + return null; + + Message answer = null; + try { + answer = (Message) wireFormat.unmarshal(new ByteSequence(data)); + } catch (IOException e) { + throw IOExceptionSupport.create("Failed to broker message in container: " + e, e); + } + return answer; + } + + public List find(int limit, String container, long sequenceId) throws IOException { + List msgs = new ArrayList(limit); + +// BasicDBObject bo = new BasicDBObject(); +// bo.append(DEST_COLUMN, container); +// bo.append("ID", new BasicDBObject("$gt",sequenceId)); + + FindIterable findIterable = getMsgsCollection().find(Filters.and( + Filters.eq(DEST_COLUMN, container), + Filters.gt("ID", sequenceId) + )).sort(Sorts.ascending("ID")).limit(limit); + + //DBCursor c = getMsgsCollection().find(bo).sort(new BasicDBObject("ID",1)).limit(limit); + while (findIterable.iterator().hasNext()) { + Document o = findIterable.iterator().next(); + if (o == null) + return null; + byte[] data = (byte[]) o.get(MSG_COLUMN); + if (data == null) + return null; + Message answer = null; + try { + answer = (Message) wireFormat.unmarshal(new ByteSequence(data)); + msgs.add(answer); + } catch (IOException e) { + throw IOExceptionSupport.create("Failed to broker message in container: " + e, e); + } + } + return trim(msgs); + } + + private List trim(List msgs2) { + if(msgs2 == null || msgs2.size() == 0) + return null; + Message m = msgs2.get(msgs2.size()-1); + while(m==null && msgs2.size() > 0){ + m = msgs2.remove(msgs2.size()-1); + } + return msgs2; + } + + @SuppressWarnings("unchecked") + public List findDestinations() { + List dists = new ArrayList(); + DistinctIterable iterable = getMsgsCollection().distinct(DEST_COLUMN,String.class); + while (iterable.iterator().hasNext()) { + dists.add(iterable.iterator().next()); + } + return dists; + } + +} diff --git a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbMessageStore.java b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbMessageStore.java similarity index 70% rename from src/main/java/org/qsoft/activemq/store/mongodb/MongodbMessageStore.java rename to src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbMessageStore.java index 7a08a25..92eda52 100644 --- a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbMessageStore.java +++ b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbMessageStore.java @@ -1,4 +1,4 @@ -package org.qsoft.activemq.store.mongodb; +package io.github.kimmking.activemq.store.mongodb; import java.io.IOException; import java.util.List; @@ -30,54 +30,54 @@ public MongodbMessageStore(ActiveMQDestination destination, WireFormat wireForma @Override public void addMessage(ConnectionContext context, Message message) throws IOException { - if(LOG.isDebugEnabled()) - LOG.debug("MongodbMessageStore.addMessage: " + message); + if(LOG.isInfoEnabled()) + LOG.info("MongodbMessageStore.addMessage: " + message); this.helper.addMessage(message); } @Override public Message getMessage(MessageId identity) throws IOException { - if(LOG.isDebugEnabled()) - LOG.debug("MongodbMessageStore.getMessage:{0}", identity); + if(LOG.isInfoEnabled()) + LOG.info("MongodbMessageStore.getMessage:{0}", identity); return this.helper.getMessage(identity); } @Override public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { - if(LOG.isDebugEnabled()) - LOG.debug("MongodbMessageStore.removeMessage: " + context + "," + ack); + if(LOG.isInfoEnabled()) + LOG.info("MongodbMessageStore.removeMessage: " + context + "," + ack); this.helper.removeMessage(this.getDestination(), ack); } @Override public void removeAllMessages(ConnectionContext context) throws IOException { - if(LOG.isDebugEnabled()) - LOG.debug("MongodbMessageStore.removeAllMessages"); + if(LOG.isInfoEnabled()) + LOG.info("MongodbMessageStore.removeAllMessages"); this.helper.removeAllMessages(); } @Override public void recover(MessageRecoveryListener container) throws Exception { - if(LOG.isDebugEnabled()) - LOG.debug("MongodbMessageStore.recover: " + container); + if(LOG.isInfoEnabled()) + LOG.info("MongodbMessageStore.recover: " + container); // TODO ? what is this } @Override public int getMessageCount() throws IOException { - LOG.debug("MongodbMessageStore.getMessageCount"); + LOG.info("MongodbMessageStore.getMessageCount:"+this.helper.count()); return this.helper.count(); } @Override public void resetBatching() { - LOG.debug("MongodbMessageStore.resetBatching"); + LOG.info("MongodbMessageStore.resetBatching"); } @Override public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { - if(LOG.isDebugEnabled()) - LOG.debug("MongodbMessageStore.recoverNextMessages: " + maxReturned + " from " + lastRecoveredSequenceId.get()); + if(LOG.isInfoEnabled()) + LOG.info("MongodbMessageStore.recoverNextMessages: " + maxReturned + " from " + lastRecoveredSequenceId.get()); //long start = System.currentTimeMillis(); List msgs = this.helper.find(maxReturned, this.getDestination().getQualifiedName(), lastRecoveredSequenceId.get()); //long end1 = System.currentTimeMillis(); @@ -87,12 +87,12 @@ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listene listener.recoverMessage(message); lastRecoveredSequenceId.set(message.getMessageId().getBrokerSequenceId()); } - if(LOG.isDebugEnabled()) - LOG.debug("MongodbMessageStore.recoverNextMessages: " + msgs.size() + " ~ " + this.getDestination().getQualifiedName()); + if(LOG.isInfoEnabled()) + LOG.info("MongodbMessageStore.recoverNextMessages: " + msgs.size() + " ~ " + this.getDestination().getQualifiedName()); } else{ - if(LOG.isDebugEnabled()) - LOG.debug("MongodbMessageStore.recoverNextMessages: NONE ~ " + this.getDestination().getQualifiedName()); + if(LOG.isInfoEnabled()) + LOG.info("MongodbMessageStore.recoverNextMessages: NONE ~ " + this.getDestination().getQualifiedName()); } // long end = System.currentTimeMillis(); // System.out.println((end1-start)+" " + (end-end1)); diff --git a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbPersistenceAdapter.java b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbPersistenceAdapter.java similarity index 88% rename from src/main/java/org/qsoft/activemq/store/mongodb/MongodbPersistenceAdapter.java rename to src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbPersistenceAdapter.java index 93de7c4..f769292 100644 --- a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbPersistenceAdapter.java +++ b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbPersistenceAdapter.java @@ -1,4 +1,4 @@ -package org.qsoft.activemq.store.mongodb; +package io.github.kimmking.activemq.store.mongodb; import java.io.File; import java.io.IOException; @@ -10,6 +10,7 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -37,6 +38,8 @@ public class MongodbPersistenceAdapter implements PersistenceAdapter, BrokerServ private String host; private int port; private String db; + private String user; + private String password; // private BrokerService brokerService; @@ -64,9 +67,25 @@ public void setDb(String db) { this.db = db; } + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + @Override public void start() throws Exception { - helper = new MongoDBHelper(host, port, db, wireFormat); + helper = new MongoDBHelper(host, port, db, user, password, wireFormat); } @Override @@ -113,6 +132,11 @@ public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) thro return store; } + @Override + public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { + return null; + } + @Override public void removeQueueMessageStore(ActiveMQQueue destination) { if(LOG.isDebugEnabled()) @@ -165,6 +189,11 @@ public long getLastProducerSequenceId(ProducerId id) throws IOException { return 0; } + @Override + public void allowIOResumption() { + + } + public String toString() { return "MongodbPersistenceAdapter(" + host + ":" + port + "/" + db + ")"; } diff --git a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbTopicMessageStore.java b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbTopicMessageStore.java similarity index 84% rename from src/main/java/org/qsoft/activemq/store/mongodb/MongodbTopicMessageStore.java rename to src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbTopicMessageStore.java index 3be53aa..e926340 100644 --- a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbTopicMessageStore.java +++ b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbTopicMessageStore.java @@ -1,23 +1,20 @@ -package org.qsoft.activemq.store.mongodb; +package io.github.kimmking.activemq.store.mongodb; import java.io.IOException; -import java.util.concurrent.Future; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStoreSubscriptionStatistics; import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MongodbTopicMessageStore extends MongodbMessageStore implements - TopicMessageStore { +public class MongodbTopicMessageStore extends MongodbMessageStore implements TopicMessageStore { private static final Logger LOG = LoggerFactory .getLogger(MongodbTopicMessageStore.class); @@ -68,6 +65,16 @@ public int getMessageCount(String clientId, String subscriberName) return 0; } + @Override + public long getMessageSize(String s, String s1) throws IOException { + return 0; + } + + @Override + public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { + return null; + } + @Override public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { @@ -82,9 +89,10 @@ public SubscriptionInfo[] getAllSubscriptions() throws IOException { } @Override - public void addSubsciption(SubscriptionInfo subscriptionInfo, - boolean retroactive) throws IOException { + public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { LOG.debug("MongodbTopicMessageStore.addSubsciption"); } + + } diff --git a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbTransactionStore.java b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbTransactionStore.java similarity index 95% rename from src/main/java/org/qsoft/activemq/store/mongodb/MongodbTransactionStore.java rename to src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbTransactionStore.java index f9ac02a..1763bde 100644 --- a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbTransactionStore.java +++ b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbTransactionStore.java @@ -1,4 +1,4 @@ -package org.qsoft.activemq.store.mongodb; +package io.github.kimmking.activemq.store.mongodb; import java.io.IOException; diff --git a/src/main/java/org/qsoft/activemq/store/mongodb/MongoDBHelper.java b/src/main/java/org/qsoft/activemq/store/mongodb/MongoDBHelper.java deleted file mode 100644 index 212be39..0000000 --- a/src/main/java/org/qsoft/activemq/store/mongodb/MongoDBHelper.java +++ /dev/null @@ -1,209 +0,0 @@ -package org.qsoft.activemq.store.mongodb; - -import java.io.IOException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.util.ByteSequenceData; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.wireformat.WireFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.mongodb.BasicDBObject; -import com.mongodb.DB; -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; -import com.mongodb.Mongo; -import com.mongodb.MongoException; - -public class MongoDBHelper { - - protected static final String MSGS = "ACTIVEMQ_MSGS"; - protected static final String ACKS = "ACTIVEMQ_ACKS"; - protected static final String LOCK = "ACTIVEMQ_LOCK"; - - protected static final String MSG_COLUMN = "MSG"; - protected static final String DEST_COLUMN = "CONTAINER"; - - private static final Logger LOG = LoggerFactory.getLogger(MongoDBHelper.class); - - Mongo mongo; - DB db; - WireFormat wireFormat; - - public MongoDBHelper(String host, int port, String dbName, WireFormat wireFormat) { - LOG.info("Connect to MongoDB[" + host + ":" + port + ":" + dbName + "]"); - try { - mongo = new Mongo(host, port); - db = mongo.getDB(dbName); - } catch (UnknownHostException e) { - LOG.error("error host.", e); - throw new RuntimeException(e); - } catch (MongoException e) { - LOG.error("MongoException.", e); - throw new RuntimeException(e); - } - - this.wireFormat = wireFormat; - } - - public DBCollection getMsgsCollection() { - return this.db.getCollection(MSGS); - } - - public DBCollection getAcksCollection() { - return this.db.getCollection(ACKS); - } - - public DBCollection getLockCollection() { - return this.db.getCollection(LOCK); - } - - public Boolean addMessage(Message message) throws IOException { - BasicDBObject bo = new BasicDBObject(); - MessageId messageId = message.getMessageId(); - // Serialize the Message.. - byte data[]; - try { - ByteSequence packet = wireFormat.marshal(message); - data = ByteSequenceData.toByteArray(packet); - } catch (IOException e) { - throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); - } - bo.append("ID", messageId.getBrokerSequenceId()); - bo.append(DEST_COLUMN, message.getDestination().getQualifiedName()); - bo.append("MSGID_PROD", messageId.getProducerId().toString()); - bo.append("MSGID_SEQ", messageId.getProducerSequenceId()); - bo.append("EXPIRATION", message.getExpiration()); - bo.append(MSG_COLUMN, data); - bo.append("PRIORITY", message.getPriority()); - getMsgsCollection().save(bo); - - return true; - } - - public static void main(String[] args) { - - } - - public void close() { - this.mongo.close(); - } - - public Message getMessage(MessageId messageId) throws IOException { - - BasicDBObject bo = new BasicDBObject(); - bo.append("MSGID_PROD", messageId.getProducerId().toString()); - bo.append("MSGID_SEQ", messageId.getProducerSequenceId()); - DBObject o = getMsgsCollection().findOne(bo); - if (o == null) - return null; - byte[] data = (byte[]) o.get(MSG_COLUMN); - if (data == null) - return null; - - Message answer = null; - try { - answer = (Message) wireFormat.unmarshal(new ByteSequence(data)); - } catch (IOException e) { - throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); - } - return answer; - } - - public int count() { - return (int) getMsgsCollection().count(); - } - - public synchronized void removeMessage(ActiveMQDestination destination, MessageAck ack) { - MessageId messageId = ack.getLastMessageId(); - ack.getDestination(); - BasicDBObject bo = new BasicDBObject(); - bo.append(DEST_COLUMN, destination.getQualifiedName()); - bo.append("MSGID_PROD", messageId.getProducerId().toString()); - bo.append("MSGID_SEQ", messageId.getProducerSequenceId()); - DBObject o = getMsgsCollection().findOne(bo); - if(o != null){ - Object sequenceId = o.get("ID"); - getMsgsCollection().remove(new BasicDBObject("ID",sequenceId)); - //getMsgsCollection().remove(new BasicDBObject("ID",new BasicDBObject("$lte",sequenceId))); - }else{ - LOG.error(bo.toString() + " is not found."); - } - - } - - public synchronized void removeAllMessages() { - getMsgsCollection().drop(); - getAcksCollection().drop(); - getLockCollection().drop(); - } - - public Message findOne() throws IOException { - DBObject o = getMsgsCollection().findOne(); - if (o == null) - return null; - byte[] data = (byte[]) o.get(MSG_COLUMN); - if (data == null) - return null; - - Message answer = null; - try { - answer = (Message) wireFormat.unmarshal(new ByteSequence(data)); - } catch (IOException e) { - throw IOExceptionSupport.create("Failed to broker message in container: " + e, e); - } - return answer; - } - - public List find(int limit, String container, long sequenceId) throws IOException { - List msgs = new ArrayList(limit); - - BasicDBObject bo = new BasicDBObject(); - bo.append(DEST_COLUMN, container); - bo.append("ID", new BasicDBObject("$gt",sequenceId)); - - DBCursor c = getMsgsCollection().find(bo).sort(new BasicDBObject("ID",1)).limit(limit); - while (c.hasNext()) { - DBObject o = c.next(); - if (o == null) - return null; - byte[] data = (byte[]) o.get(MSG_COLUMN); - if (data == null) - return null; - Message answer = null; - try { - answer = (Message) wireFormat.unmarshal(new ByteSequence(data)); - msgs.add(answer); - } catch (IOException e) { - throw IOExceptionSupport.create("Failed to broker message in container: " + e, e); - } - } - return trim(msgs); - } - - private List trim(List msgs2) { - if(msgs2 == null || msgs2.size() == 0) - return null; - Message m = msgs2.get(msgs2.size()-1); - while(m==null && msgs2.size() > 0){ - m = msgs2.remove(msgs2.size()-1); - } - return msgs2; - } - - @SuppressWarnings("unchecked") - public List findDestinations() { - List dists = getMsgsCollection().distinct(DEST_COLUMN); - return dists; - } - -} diff --git a/src/main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.apache.org/schema/core b/src/main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.apache.org/schema/core index 70e9cd8..83f9c89 100644 --- a/src/main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.apache.org/schema/core +++ b/src/main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.apache.org/schema/core @@ -314,6 +314,6 @@ vmQueueCursor = org.apache.activemq.broker.region.policy.VMPendingQueueMessageSt xaConnectionFactory = org.apache.activemq.spring.ActiveMQXAConnectionFactory xaConnectionFactory.initMethod = afterPropertiesSet -mongodb = org.qsoft.activemq.store.mongodb.MongodbPersistenceAdapter +mongodb = io.github.kimmking.activemq.store.mongodb.MongodbPersistenceAdapter diff --git a/src/main/resources/activemq.xml b/src/main/resources/activemq.xml index 7cdb6bd..1598a04 100644 --- a/src/main/resources/activemq.xml +++ b/src/main/resources/activemq.xml @@ -16,11 +16,11 @@ --> + xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> + @@ -41,7 +41,7 @@ - + diff --git a/src/main/resources/activemq.xsd b/src/main/resources/activemq.xsd index 682cda4..6ac6c46 100644 --- a/src/main/resources/activemq.xsd +++ b/src/main/resources/activemq.xsd @@ -4544,6 +4544,20 @@ batch) + + + + + + + + + + diff --git a/src/test/java/org/qsoft/activemq/test/TestReceiver.java b/src/test/java/io/github/kimmking/activemq/test/TestReceiver.java similarity index 92% rename from src/test/java/org/qsoft/activemq/test/TestReceiver.java rename to src/test/java/io/github/kimmking/activemq/test/TestReceiver.java index 7d68525..ee475b6 100644 --- a/src/test/java/org/qsoft/activemq/test/TestReceiver.java +++ b/src/test/java/io/github/kimmking/activemq/test/TestReceiver.java @@ -1,4 +1,4 @@ -package org.qsoft.activemq.test; +package io.github.kimmking.activemq.test; import java.util.concurrent.atomic.AtomicInteger; @@ -24,8 +24,8 @@ public class TestReceiver { */ public static void main(String[] args) { - int a = 1; - if(a== 1) + int a = 0; + if(a== 0) listen(); else receive(); @@ -85,12 +85,12 @@ private static void listen() { // init connection factory with activemq QueueConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); // specify the destination - Queue queue = new ActiveMQQueue("kk.mysql"); + Queue queue = new ActiveMQQueue("kk.mongo"); // create connection,session,consumer and receive message ActiveMQConnection conn = (ActiveMQConnection) factory.createQueueConnection(); - conn.setOptimizeAcknowledge(true); - conn.setOptimizeAcknowledgeTimeOut(4000); - conn.setOptimizedAckScheduledAckInterval(2000); + //conn.setOptimizeAcknowledge(true); + //conn.setOptimizeAcknowledgeTimeOut(4000); + //conn.setOptimizedAckScheduledAckInterval(2000); //conn.setSendAcksAsync(true); conn.start(); final int count = 10000; @@ -103,7 +103,7 @@ private static void listen() { MessageListener listenerA1 = new MessageListener(){ public void onMessage(Message message) { int a = a1.getAndIncrement(); - if( a % 100 == 0) + if( a % 2 == 0) { times[1] = times[0]; times[0] = System.currentTimeMillis(); @@ -124,6 +124,7 @@ public void onMessage(Message message) { // } }}; receiverA1.setMessageListener(listenerA1 ); + sessionA1.run(); } catch (Exception e) { e.printStackTrace(); System.exit(1); diff --git a/src/test/java/org/qsoft/activemq/test/TestSender.java b/src/test/java/io/github/kimmking/activemq/test/TestSender.java similarity index 96% rename from src/test/java/org/qsoft/activemq/test/TestSender.java rename to src/test/java/io/github/kimmking/activemq/test/TestSender.java index 7fbc2e8..424241f 100644 --- a/src/test/java/org/qsoft/activemq/test/TestSender.java +++ b/src/test/java/io/github/kimmking/activemq/test/TestSender.java @@ -1,4 +1,4 @@ -package org.qsoft.activemq.test; +package io.github.kimmking.activemq.test; import javax.jms.BytesMessage; import javax.jms.Queue; @@ -34,7 +34,7 @@ public static void main(String[] args) { QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); QueueSender sender = session.createSender(queue); long start = System.currentTimeMillis(); - int count = 10000; + int count = 10; int size = 1024; for (int i = 0; i < count; i++) { //String msgText = "testMessage-" + i; diff --git a/src/test/java/org/qsoft/activemq/test/TestServer.java b/src/test/java/io/github/kimmking/activemq/test/TestServer.java similarity index 98% rename from src/test/java/org/qsoft/activemq/test/TestServer.java rename to src/test/java/io/github/kimmking/activemq/test/TestServer.java index 7b21356..abf743d 100644 --- a/src/test/java/org/qsoft/activemq/test/TestServer.java +++ b/src/test/java/io/github/kimmking/activemq/test/TestServer.java @@ -1,4 +1,4 @@ -package org.qsoft.activemq.test; +package io.github.kimmking.activemq.test; import java.io.BufferedReader; import java.io.InputStreamReader;