diff --git a/.gitignore b/.gitignore
index 51f1a20..0504a28 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,3 +14,6 @@ target/
*.swp
*.diff
*.patch
+
+*.iml
+.idea
diff --git a/pom.xml b/pom.xml
index 126c89c..0014d68 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
org.apache.activemq
activemq-parent
- 5.7.0
+ 5.15.2
activemq-store-mongodb
@@ -22,7 +22,7 @@
org.mongodb
mongo-java-driver
- 2.10.1
+ 3.6.0
org.slf4j
@@ -35,7 +35,7 @@
${project.groupId}
- activemq-core
+ activemq-all
false
@@ -47,17 +47,7 @@
org.apache.geronimo.specs
geronimo-annotation_1.0_spec
true
-
-
- commons-pool
- commons-pool
- true
-
-
- commons-dbcp
- commons-dbcp
- 1.4
-
+
mysql
mysql-connector-java
@@ -75,11 +65,6 @@
xbean-spring
true
-
org.springframework
@@ -146,11 +131,6 @@
commons-io
test
-
diff --git a/src/main/java/io/github/kimmking/activemq/store/mongodb/MongoDBHelper.java b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongoDBHelper.java
new file mode 100644
index 0000000..a6b3157
--- /dev/null
+++ b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongoDBHelper.java
@@ -0,0 +1,236 @@
+package io.github.kimmking.activemq.store.mongodb;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.mongodb.*;
+import com.mongodb.client.DistinctIterable;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.Sorts;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
+
+public class MongoDBHelper {
+
+ protected static final String MSGS = "ACTIVEMQ_MSGS";
+ protected static final String ACKS = "ACTIVEMQ_ACKS";
+ protected static final String LOCK = "ACTIVEMQ_LOCK";
+
+ protected static final String MSG_COLUMN = "MSG";
+ protected static final String DEST_COLUMN = "CONTAINER";
+
+ private static final Logger LOG = LoggerFactory.getLogger(MongoDBHelper.class);
+
+ MongoClient mongoClient;
+ MongoDatabase mongoDatabase;
+ WireFormat wireFormat;
+
+
+ public MongoDBHelper(String host, int port, String dbName,String userName,String password, WireFormat wireFormat) {
+ LOG.info("Connect to MongoDB[" + host + ":" + port + ":" + dbName + "]");
+
+ ServerAddress serverAddress = new ServerAddress(host,port);
+ List addrs = new ArrayList();
+ addrs.add(serverAddress);
+ List credentials = new ArrayList();
+
+ if(!StringUtils.isEmpty(userName)) {
+ MongoCredential credential = MongoCredential.createScramSha1Credential(userName, dbName, password.toCharArray());
+ credentials.add(credential);
+ }
+
+ this.mongoClient = new MongoClient(addrs,credentials);
+ this.mongoDatabase = mongoClient.getDatabase(dbName);
+
+ this.wireFormat = wireFormat;
+ }
+
+ public MongoCollection getMsgsCollection() {
+ return this.mongoDatabase.getCollection(MSGS);
+ }
+
+ public MongoCollection getAcksCollection() {
+ return this.mongoDatabase.getCollection(ACKS);
+ }
+
+ public MongoCollection getLockCollection() {
+ return this.mongoDatabase.getCollection(LOCK);
+ }
+
+ public Boolean addMessage(Message message) throws IOException {
+ BasicDBObject bo = new BasicDBObject();
+ MessageId messageId = message.getMessageId();
+ // Serialize the Message..
+
+ List documents = new ArrayList();
+
+
+ byte data[];
+ try {
+ ByteSequence packet = wireFormat.marshal(message);
+ data = ByteSequenceData.toByteArray(packet);
+ } catch (IOException e) {
+ throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
+ }
+ Document document = new Document("ID", messageId.getBrokerSequenceId())
+ .append(DEST_COLUMN, message.getDestination().getQualifiedName())
+ .append("MSGID_PROD", messageId.getProducerId().toString())
+ .append("MSGID_SEQ", messageId.getProducerSequenceId())
+ .append("EXPIRATION", message.getExpiration())
+ .append(MSG_COLUMN, data)
+ .append("PRIORITY", message.getPriority());
+
+ documents.add(document);
+ getMsgsCollection().insertOne(document);
+
+ return true;
+ }
+
+ public static void main(String[] args) {
+
+ }
+
+ public void close() {
+ this.mongoClient.close();
+ }
+
+ public Message getMessage(MessageId messageId) throws IOException {
+
+// BasicDBObject bo = new BasicDBObject();
+// bo.append("MSGID_PROD", messageId.getProducerId().toString());
+// bo.append("MSGID_SEQ", messageId.getProducerSequenceId());
+ FindIterable findIterable = getMsgsCollection().find(Filters.and(
+ Filters.eq("MSGID_SEQ", messageId.getProducerSequenceId()),
+ Filters.eq("MSGID_PROD",messageId.getProducerId().toString())
+ )).limit(1);
+ Document document = findIterable.first();
+ if (document == null)
+ return null;
+ byte[] data = (byte[]) document.get(MSG_COLUMN);
+ if (data == null)
+ return null;
+
+ Message answer = null;
+ try {
+ answer = (Message) wireFormat.unmarshal(new ByteSequence(data));
+ } catch (IOException e) {
+ throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
+ }
+ return answer;
+ }
+
+ public int count() {
+ return (int) getMsgsCollection().count();
+ }
+
+ public synchronized void removeMessage(ActiveMQDestination destination, MessageAck ack) {
+ MessageId messageId = ack.getLastMessageId();
+ ack.getDestination();
+
+ FindIterable findIterable = getMsgsCollection().find(Filters.and(
+ Filters.eq(DEST_COLUMN, destination.getQualifiedName()),
+ Filters.eq("MSGID_SEQ", messageId.getProducerSequenceId()),
+ Filters.eq("MSGID_PROD",messageId.getProducerId().toString())
+ )).limit(1);
+ Document document = findIterable.first();
+ if(document != null){
+ Object sequenceId = document.get("ID");
+ getMsgsCollection().deleteOne(Filters.eq("ID",sequenceId));
+ //.remove(new BasicDBObject("ID",sequenceId));
+ //getMsgsCollection().remove(new BasicDBObject("ID",new BasicDBObject("$lte",sequenceId)));
+ }else{
+ LOG.error(document.toString() + " is not found.");
+ }
+
+ }
+
+ public synchronized void removeAllMessages() {
+ getMsgsCollection().drop();
+ getAcksCollection().drop();
+ getLockCollection().drop();
+ }
+
+ public Message findOne() throws IOException {
+ FindIterable findIterable = getMsgsCollection().find().limit(1);
+ Document document = findIterable.first();
+ if (document == null)
+ return null;
+ byte[] data = (byte[]) document.get(MSG_COLUMN);
+ if (data == null)
+ return null;
+
+ Message answer = null;
+ try {
+ answer = (Message) wireFormat.unmarshal(new ByteSequence(data));
+ } catch (IOException e) {
+ throw IOExceptionSupport.create("Failed to broker message in container: " + e, e);
+ }
+ return answer;
+ }
+
+ public List find(int limit, String container, long sequenceId) throws IOException {
+ List msgs = new ArrayList(limit);
+
+// BasicDBObject bo = new BasicDBObject();
+// bo.append(DEST_COLUMN, container);
+// bo.append("ID", new BasicDBObject("$gt",sequenceId));
+
+ FindIterable findIterable = getMsgsCollection().find(Filters.and(
+ Filters.eq(DEST_COLUMN, container),
+ Filters.gt("ID", sequenceId)
+ )).sort(Sorts.ascending("ID")).limit(limit);
+
+ //DBCursor c = getMsgsCollection().find(bo).sort(new BasicDBObject("ID",1)).limit(limit);
+ while (findIterable.iterator().hasNext()) {
+ Document o = findIterable.iterator().next();
+ if (o == null)
+ return null;
+ byte[] data = (byte[]) o.get(MSG_COLUMN);
+ if (data == null)
+ return null;
+ Message answer = null;
+ try {
+ answer = (Message) wireFormat.unmarshal(new ByteSequence(data));
+ msgs.add(answer);
+ } catch (IOException e) {
+ throw IOExceptionSupport.create("Failed to broker message in container: " + e, e);
+ }
+ }
+ return trim(msgs);
+ }
+
+ private List trim(List msgs2) {
+ if(msgs2 == null || msgs2.size() == 0)
+ return null;
+ Message m = msgs2.get(msgs2.size()-1);
+ while(m==null && msgs2.size() > 0){
+ m = msgs2.remove(msgs2.size()-1);
+ }
+ return msgs2;
+ }
+
+ @SuppressWarnings("unchecked")
+ public List findDestinations() {
+ List dists = new ArrayList();
+ DistinctIterable iterable = getMsgsCollection().distinct(DEST_COLUMN,String.class);
+ while (iterable.iterator().hasNext()) {
+ dists.add(iterable.iterator().next());
+ }
+ return dists;
+ }
+
+}
diff --git a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbMessageStore.java b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbMessageStore.java
similarity index 70%
rename from src/main/java/org/qsoft/activemq/store/mongodb/MongodbMessageStore.java
rename to src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbMessageStore.java
index 7a08a25..92eda52 100644
--- a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbMessageStore.java
+++ b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbMessageStore.java
@@ -1,4 +1,4 @@
-package org.qsoft.activemq.store.mongodb;
+package io.github.kimmking.activemq.store.mongodb;
import java.io.IOException;
import java.util.List;
@@ -30,54 +30,54 @@ public MongodbMessageStore(ActiveMQDestination destination, WireFormat wireForma
@Override
public void addMessage(ConnectionContext context, Message message) throws IOException {
- if(LOG.isDebugEnabled())
- LOG.debug("MongodbMessageStore.addMessage: " + message);
+ if(LOG.isInfoEnabled())
+ LOG.info("MongodbMessageStore.addMessage: " + message);
this.helper.addMessage(message);
}
@Override
public Message getMessage(MessageId identity) throws IOException {
- if(LOG.isDebugEnabled())
- LOG.debug("MongodbMessageStore.getMessage:{0}", identity);
+ if(LOG.isInfoEnabled())
+ LOG.info("MongodbMessageStore.getMessage:{0}", identity);
return this.helper.getMessage(identity);
}
@Override
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
- if(LOG.isDebugEnabled())
- LOG.debug("MongodbMessageStore.removeMessage: " + context + "," + ack);
+ if(LOG.isInfoEnabled())
+ LOG.info("MongodbMessageStore.removeMessage: " + context + "," + ack);
this.helper.removeMessage(this.getDestination(), ack);
}
@Override
public void removeAllMessages(ConnectionContext context) throws IOException {
- if(LOG.isDebugEnabled())
- LOG.debug("MongodbMessageStore.removeAllMessages");
+ if(LOG.isInfoEnabled())
+ LOG.info("MongodbMessageStore.removeAllMessages");
this.helper.removeAllMessages();
}
@Override
public void recover(MessageRecoveryListener container) throws Exception {
- if(LOG.isDebugEnabled())
- LOG.debug("MongodbMessageStore.recover: " + container);
+ if(LOG.isInfoEnabled())
+ LOG.info("MongodbMessageStore.recover: " + container);
// TODO ? what is this
}
@Override
public int getMessageCount() throws IOException {
- LOG.debug("MongodbMessageStore.getMessageCount");
+ LOG.info("MongodbMessageStore.getMessageCount:"+this.helper.count());
return this.helper.count();
}
@Override
public void resetBatching() {
- LOG.debug("MongodbMessageStore.resetBatching");
+ LOG.info("MongodbMessageStore.resetBatching");
}
@Override
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
- if(LOG.isDebugEnabled())
- LOG.debug("MongodbMessageStore.recoverNextMessages: " + maxReturned + " from " + lastRecoveredSequenceId.get());
+ if(LOG.isInfoEnabled())
+ LOG.info("MongodbMessageStore.recoverNextMessages: " + maxReturned + " from " + lastRecoveredSequenceId.get());
//long start = System.currentTimeMillis();
List msgs = this.helper.find(maxReturned, this.getDestination().getQualifiedName(), lastRecoveredSequenceId.get());
//long end1 = System.currentTimeMillis();
@@ -87,12 +87,12 @@ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listene
listener.recoverMessage(message);
lastRecoveredSequenceId.set(message.getMessageId().getBrokerSequenceId());
}
- if(LOG.isDebugEnabled())
- LOG.debug("MongodbMessageStore.recoverNextMessages: " + msgs.size() + " ~ " + this.getDestination().getQualifiedName());
+ if(LOG.isInfoEnabled())
+ LOG.info("MongodbMessageStore.recoverNextMessages: " + msgs.size() + " ~ " + this.getDestination().getQualifiedName());
}
else{
- if(LOG.isDebugEnabled())
- LOG.debug("MongodbMessageStore.recoverNextMessages: NONE ~ " + this.getDestination().getQualifiedName());
+ if(LOG.isInfoEnabled())
+ LOG.info("MongodbMessageStore.recoverNextMessages: NONE ~ " + this.getDestination().getQualifiedName());
}
// long end = System.currentTimeMillis();
// System.out.println((end1-start)+" " + (end-end1));
diff --git a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbPersistenceAdapter.java b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbPersistenceAdapter.java
similarity index 88%
rename from src/main/java/org/qsoft/activemq/store/mongodb/MongodbPersistenceAdapter.java
rename to src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbPersistenceAdapter.java
index 93de7c4..f769292 100644
--- a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbPersistenceAdapter.java
+++ b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbPersistenceAdapter.java
@@ -1,4 +1,4 @@
-package org.qsoft.activemq.store.mongodb;
+package io.github.kimmking.activemq.store.mongodb;
import java.io.File;
import java.io.IOException;
@@ -10,6 +10,7 @@
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@@ -37,6 +38,8 @@ public class MongodbPersistenceAdapter implements PersistenceAdapter, BrokerServ
private String host;
private int port;
private String db;
+ private String user;
+ private String password;
// private BrokerService brokerService;
@@ -64,9 +67,25 @@ public void setDb(String db) {
this.db = db;
}
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
@Override
public void start() throws Exception {
- helper = new MongoDBHelper(host, port, db, wireFormat);
+ helper = new MongoDBHelper(host, port, db, user, password, wireFormat);
}
@Override
@@ -113,6 +132,11 @@ public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) thro
return store;
}
+ @Override
+ public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+ return null;
+ }
+
@Override
public void removeQueueMessageStore(ActiveMQQueue destination) {
if(LOG.isDebugEnabled())
@@ -165,6 +189,11 @@ public long getLastProducerSequenceId(ProducerId id) throws IOException {
return 0;
}
+ @Override
+ public void allowIOResumption() {
+
+ }
+
public String toString() {
return "MongodbPersistenceAdapter(" + host + ":" + port + "/" + db + ")";
}
diff --git a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbTopicMessageStore.java b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbTopicMessageStore.java
similarity index 84%
rename from src/main/java/org/qsoft/activemq/store/mongodb/MongodbTopicMessageStore.java
rename to src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbTopicMessageStore.java
index 3be53aa..e926340 100644
--- a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbTopicMessageStore.java
+++ b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbTopicMessageStore.java
@@ -1,23 +1,20 @@
-package org.qsoft.activemq.store.mongodb;
+package io.github.kimmking.activemq.store.mongodb;
import java.io.IOException;
-import java.util.concurrent.Future;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MongodbTopicMessageStore extends MongodbMessageStore implements
- TopicMessageStore {
+public class MongodbTopicMessageStore extends MongodbMessageStore implements TopicMessageStore {
private static final Logger LOG = LoggerFactory
.getLogger(MongodbTopicMessageStore.class);
@@ -68,6 +65,16 @@ public int getMessageCount(String clientId, String subscriberName)
return 0;
}
+ @Override
+ public long getMessageSize(String s, String s1) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
+ return null;
+ }
+
@Override
public SubscriptionInfo lookupSubscription(String clientId,
String subscriptionName) throws IOException {
@@ -82,9 +89,10 @@ public SubscriptionInfo[] getAllSubscriptions() throws IOException {
}
@Override
- public void addSubsciption(SubscriptionInfo subscriptionInfo,
- boolean retroactive) throws IOException {
+ public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
LOG.debug("MongodbTopicMessageStore.addSubsciption");
}
+
+
}
diff --git a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbTransactionStore.java b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbTransactionStore.java
similarity index 95%
rename from src/main/java/org/qsoft/activemq/store/mongodb/MongodbTransactionStore.java
rename to src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbTransactionStore.java
index f9ac02a..1763bde 100644
--- a/src/main/java/org/qsoft/activemq/store/mongodb/MongodbTransactionStore.java
+++ b/src/main/java/io/github/kimmking/activemq/store/mongodb/MongodbTransactionStore.java
@@ -1,4 +1,4 @@
-package org.qsoft.activemq.store.mongodb;
+package io.github.kimmking.activemq.store.mongodb;
import java.io.IOException;
diff --git a/src/main/java/org/qsoft/activemq/store/mongodb/MongoDBHelper.java b/src/main/java/org/qsoft/activemq/store/mongodb/MongoDBHelper.java
deleted file mode 100644
index 212be39..0000000
--- a/src/main/java/org/qsoft/activemq/store/mongodb/MongoDBHelper.java
+++ /dev/null
@@ -1,209 +0,0 @@
-package org.qsoft.activemq.store.mongodb;
-
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.ByteSequenceData;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.wireformat.WireFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.mongodb.BasicDBObject;
-import com.mongodb.DB;
-import com.mongodb.DBCollection;
-import com.mongodb.DBCursor;
-import com.mongodb.DBObject;
-import com.mongodb.Mongo;
-import com.mongodb.MongoException;
-
-public class MongoDBHelper {
-
- protected static final String MSGS = "ACTIVEMQ_MSGS";
- protected static final String ACKS = "ACTIVEMQ_ACKS";
- protected static final String LOCK = "ACTIVEMQ_LOCK";
-
- protected static final String MSG_COLUMN = "MSG";
- protected static final String DEST_COLUMN = "CONTAINER";
-
- private static final Logger LOG = LoggerFactory.getLogger(MongoDBHelper.class);
-
- Mongo mongo;
- DB db;
- WireFormat wireFormat;
-
- public MongoDBHelper(String host, int port, String dbName, WireFormat wireFormat) {
- LOG.info("Connect to MongoDB[" + host + ":" + port + ":" + dbName + "]");
- try {
- mongo = new Mongo(host, port);
- db = mongo.getDB(dbName);
- } catch (UnknownHostException e) {
- LOG.error("error host.", e);
- throw new RuntimeException(e);
- } catch (MongoException e) {
- LOG.error("MongoException.", e);
- throw new RuntimeException(e);
- }
-
- this.wireFormat = wireFormat;
- }
-
- public DBCollection getMsgsCollection() {
- return this.db.getCollection(MSGS);
- }
-
- public DBCollection getAcksCollection() {
- return this.db.getCollection(ACKS);
- }
-
- public DBCollection getLockCollection() {
- return this.db.getCollection(LOCK);
- }
-
- public Boolean addMessage(Message message) throws IOException {
- BasicDBObject bo = new BasicDBObject();
- MessageId messageId = message.getMessageId();
- // Serialize the Message..
- byte data[];
- try {
- ByteSequence packet = wireFormat.marshal(message);
- data = ByteSequenceData.toByteArray(packet);
- } catch (IOException e) {
- throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
- }
- bo.append("ID", messageId.getBrokerSequenceId());
- bo.append(DEST_COLUMN, message.getDestination().getQualifiedName());
- bo.append("MSGID_PROD", messageId.getProducerId().toString());
- bo.append("MSGID_SEQ", messageId.getProducerSequenceId());
- bo.append("EXPIRATION", message.getExpiration());
- bo.append(MSG_COLUMN, data);
- bo.append("PRIORITY", message.getPriority());
- getMsgsCollection().save(bo);
-
- return true;
- }
-
- public static void main(String[] args) {
-
- }
-
- public void close() {
- this.mongo.close();
- }
-
- public Message getMessage(MessageId messageId) throws IOException {
-
- BasicDBObject bo = new BasicDBObject();
- bo.append("MSGID_PROD", messageId.getProducerId().toString());
- bo.append("MSGID_SEQ", messageId.getProducerSequenceId());
- DBObject o = getMsgsCollection().findOne(bo);
- if (o == null)
- return null;
- byte[] data = (byte[]) o.get(MSG_COLUMN);
- if (data == null)
- return null;
-
- Message answer = null;
- try {
- answer = (Message) wireFormat.unmarshal(new ByteSequence(data));
- } catch (IOException e) {
- throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
- }
- return answer;
- }
-
- public int count() {
- return (int) getMsgsCollection().count();
- }
-
- public synchronized void removeMessage(ActiveMQDestination destination, MessageAck ack) {
- MessageId messageId = ack.getLastMessageId();
- ack.getDestination();
- BasicDBObject bo = new BasicDBObject();
- bo.append(DEST_COLUMN, destination.getQualifiedName());
- bo.append("MSGID_PROD", messageId.getProducerId().toString());
- bo.append("MSGID_SEQ", messageId.getProducerSequenceId());
- DBObject o = getMsgsCollection().findOne(bo);
- if(o != null){
- Object sequenceId = o.get("ID");
- getMsgsCollection().remove(new BasicDBObject("ID",sequenceId));
- //getMsgsCollection().remove(new BasicDBObject("ID",new BasicDBObject("$lte",sequenceId)));
- }else{
- LOG.error(bo.toString() + " is not found.");
- }
-
- }
-
- public synchronized void removeAllMessages() {
- getMsgsCollection().drop();
- getAcksCollection().drop();
- getLockCollection().drop();
- }
-
- public Message findOne() throws IOException {
- DBObject o = getMsgsCollection().findOne();
- if (o == null)
- return null;
- byte[] data = (byte[]) o.get(MSG_COLUMN);
- if (data == null)
- return null;
-
- Message answer = null;
- try {
- answer = (Message) wireFormat.unmarshal(new ByteSequence(data));
- } catch (IOException e) {
- throw IOExceptionSupport.create("Failed to broker message in container: " + e, e);
- }
- return answer;
- }
-
- public List find(int limit, String container, long sequenceId) throws IOException {
- List msgs = new ArrayList(limit);
-
- BasicDBObject bo = new BasicDBObject();
- bo.append(DEST_COLUMN, container);
- bo.append("ID", new BasicDBObject("$gt",sequenceId));
-
- DBCursor c = getMsgsCollection().find(bo).sort(new BasicDBObject("ID",1)).limit(limit);
- while (c.hasNext()) {
- DBObject o = c.next();
- if (o == null)
- return null;
- byte[] data = (byte[]) o.get(MSG_COLUMN);
- if (data == null)
- return null;
- Message answer = null;
- try {
- answer = (Message) wireFormat.unmarshal(new ByteSequence(data));
- msgs.add(answer);
- } catch (IOException e) {
- throw IOExceptionSupport.create("Failed to broker message in container: " + e, e);
- }
- }
- return trim(msgs);
- }
-
- private List trim(List msgs2) {
- if(msgs2 == null || msgs2.size() == 0)
- return null;
- Message m = msgs2.get(msgs2.size()-1);
- while(m==null && msgs2.size() > 0){
- m = msgs2.remove(msgs2.size()-1);
- }
- return msgs2;
- }
-
- @SuppressWarnings("unchecked")
- public List findDestinations() {
- List dists = getMsgsCollection().distinct(DEST_COLUMN);
- return dists;
- }
-
-}
diff --git a/src/main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.apache.org/schema/core b/src/main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.apache.org/schema/core
index 70e9cd8..83f9c89 100644
--- a/src/main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.apache.org/schema/core
+++ b/src/main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.apache.org/schema/core
@@ -314,6 +314,6 @@ vmQueueCursor = org.apache.activemq.broker.region.policy.VMPendingQueueMessageSt
xaConnectionFactory = org.apache.activemq.spring.ActiveMQXAConnectionFactory
xaConnectionFactory.initMethod = afterPropertiesSet
-mongodb = org.qsoft.activemq.store.mongodb.MongodbPersistenceAdapter
+mongodb = io.github.kimmking.activemq.store.mongodb.MongodbPersistenceAdapter
diff --git a/src/main/resources/activemq.xml b/src/main/resources/activemq.xml
index 7cdb6bd..1598a04 100644
--- a/src/main/resources/activemq.xml
+++ b/src/main/resources/activemq.xml
@@ -16,11 +16,11 @@
-->
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
@@ -41,7 +41,7 @@
-
+
diff --git a/src/main/resources/activemq.xsd b/src/main/resources/activemq.xsd
index 682cda4..6ac6c46 100644
--- a/src/main/resources/activemq.xsd
+++ b/src/main/resources/activemq.xsd
@@ -4544,6 +4544,20 @@ batch)
+
+
+
+
+
+
+
+
+
+
diff --git a/src/test/java/org/qsoft/activemq/test/TestReceiver.java b/src/test/java/io/github/kimmking/activemq/test/TestReceiver.java
similarity index 92%
rename from src/test/java/org/qsoft/activemq/test/TestReceiver.java
rename to src/test/java/io/github/kimmking/activemq/test/TestReceiver.java
index 7d68525..ee475b6 100644
--- a/src/test/java/org/qsoft/activemq/test/TestReceiver.java
+++ b/src/test/java/io/github/kimmking/activemq/test/TestReceiver.java
@@ -1,4 +1,4 @@
-package org.qsoft.activemq.test;
+package io.github.kimmking.activemq.test;
import java.util.concurrent.atomic.AtomicInteger;
@@ -24,8 +24,8 @@ public class TestReceiver {
*/
public static void main(String[] args) {
- int a = 1;
- if(a== 1)
+ int a = 0;
+ if(a== 0)
listen();
else
receive();
@@ -85,12 +85,12 @@ private static void listen() {
// init connection factory with activemq
QueueConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
// specify the destination
- Queue queue = new ActiveMQQueue("kk.mysql");
+ Queue queue = new ActiveMQQueue("kk.mongo");
// create connection,session,consumer and receive message
ActiveMQConnection conn = (ActiveMQConnection) factory.createQueueConnection();
- conn.setOptimizeAcknowledge(true);
- conn.setOptimizeAcknowledgeTimeOut(4000);
- conn.setOptimizedAckScheduledAckInterval(2000);
+ //conn.setOptimizeAcknowledge(true);
+ //conn.setOptimizeAcknowledgeTimeOut(4000);
+ //conn.setOptimizedAckScheduledAckInterval(2000);
//conn.setSendAcksAsync(true);
conn.start();
final int count = 10000;
@@ -103,7 +103,7 @@ private static void listen() {
MessageListener listenerA1 = new MessageListener(){
public void onMessage(Message message) {
int a = a1.getAndIncrement();
- if( a % 100 == 0)
+ if( a % 2 == 0)
{
times[1] = times[0];
times[0] = System.currentTimeMillis();
@@ -124,6 +124,7 @@ public void onMessage(Message message) {
// }
}};
receiverA1.setMessageListener(listenerA1 );
+ sessionA1.run();
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
diff --git a/src/test/java/org/qsoft/activemq/test/TestSender.java b/src/test/java/io/github/kimmking/activemq/test/TestSender.java
similarity index 96%
rename from src/test/java/org/qsoft/activemq/test/TestSender.java
rename to src/test/java/io/github/kimmking/activemq/test/TestSender.java
index 7fbc2e8..424241f 100644
--- a/src/test/java/org/qsoft/activemq/test/TestSender.java
+++ b/src/test/java/io/github/kimmking/activemq/test/TestSender.java
@@ -1,4 +1,4 @@
-package org.qsoft.activemq.test;
+package io.github.kimmking.activemq.test;
import javax.jms.BytesMessage;
import javax.jms.Queue;
@@ -34,7 +34,7 @@ public static void main(String[] args) {
QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender sender = session.createSender(queue);
long start = System.currentTimeMillis();
- int count = 10000;
+ int count = 10;
int size = 1024;
for (int i = 0; i < count; i++) {
//String msgText = "testMessage-" + i;
diff --git a/src/test/java/org/qsoft/activemq/test/TestServer.java b/src/test/java/io/github/kimmking/activemq/test/TestServer.java
similarity index 98%
rename from src/test/java/org/qsoft/activemq/test/TestServer.java
rename to src/test/java/io/github/kimmking/activemq/test/TestServer.java
index 7b21356..abf743d 100644
--- a/src/test/java/org/qsoft/activemq/test/TestServer.java
+++ b/src/test/java/io/github/kimmking/activemq/test/TestServer.java
@@ -1,4 +1,4 @@
-package org.qsoft.activemq.test;
+package io.github.kimmking.activemq.test;
import java.io.BufferedReader;
import java.io.InputStreamReader;