Error while running example WordCountTopology #58

Open
harsha273 opened this issue Dec 16, 2015 · 1 comment


@harsha273

Guys,

I am quite new to Trident and Cassandra. While running the WordCount example, I get the following exception midway through the run. Any ideas?

DEBUG (com.hmsonline.trident.cql.CassandraCqlMapState:165) - Putting the following keys: [[went, spout1], [and, spout2]] with values: [73, 73]
ERROR (clojure.tools.logging$eval37$fn__43:16) - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException: 1
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77)
    at backtype.storm.daemon.executor$eval5170$fn__5171$fn__5183$fn__5230.invoke(executor.clj:745)
    at backtype.storm.util$async_loop$fn__390.invoke(util.clj:433)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException: 1
    at com.hmsonline.trident.cql.CassandraCqlMapState.checkCassandraException(CassandraCqlMapState.java:227)
    at com.hmsonline.trident.cql.CassandraCqlMapState.multiGet(CassandraCqlMapState.java:157)
    at storm.trident.state.map.CachedMap.multiGet(CachedMap.java:52)
    at storm.trident.state.map.NonTransactionalMap.multiGet(NonTransactionalMap.java:39)
    at storm.trident.state.map.SnapshottableMap.multiGet(SnapshottableMap.java:37)
    at storm.trident.operation.builtin.MapGet.batchRetrieve(MapGet.java:31)
    at storm.trident.operation.builtin.MapGet.batchRetrieve(MapGet.java:28)
    at storm.trident.planner.processor.StateQueryProcessor.finishBatch(StateQueryProcessor.java:84)
    at storm.trident.planner.SubtopologyBolt.finishBatch(SubtopologyBolt.java:152)
    at storm.trident.topology.TridentBoltExecutor.finishBatch(TridentBoltExecutor.java:252)
    at storm.trident.topology.TridentBoltExecutor.checkFinish(TridentBoltExecutor.java:285)
    at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:364)
    at backtype.storm.daemon.executor$eval5170$fn__5171$tuple_action_fn__5173.invoke(executor.clj:630)
    at backtype.storm.daemon.executor$mk_task_receiver$fn__5091.invoke(executor.clj:398)
    at backtype.storm.disruptor$clojure_handler$reify__1894.onEvent(disruptor.clj:58)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:104)
    ... 6 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
    at storm.trident.tuple.TridentTupleView.getValue(TridentTupleView.java:227)
    at storm.trident.tuple.TridentTupleView.get(TridentTupleView.java:222)
    at com.hmsonline.trident.cql.example.wordcount.WordCountAndSourceMapper.retrieve(WordCountAndSourceMapper.java:41)
    at com.hmsonline.trident.cql.example.wordcount.WordCountAndSourceMapper.retrieve(WordCountAndSourceMapper.java:15)
    at com.hmsonline.trident.cql.CassandraCqlMapState.multiGet(CassandraCqlMapState.java:137)
    ... 20 more
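The innermost `Caused by` is the root failure. `WordCountAndSourceMapper.retrieve()` (line 41) calls `TridentTupleView.get(1)` on a key that has only one element, while the DEBUG line shows the state being written with two-element keys such as `[went, spout1]`. The sketch below reproduces that mismatch in plain Java, without Storm or Cassandra. `buildWhereClause` is a hypothetical stand-in for `retrieve()`, and `Arrays.asList` stands in for the array-backed key tuple; both are assumptions for illustration only.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java approximation of the failure; no Storm or Cassandra classes are involved.
class KeyArityDemo {

    // Hypothetical stand-in for WordCountAndSourceMapper.retrieve():
    // it assumes every key has exactly two values, (word, source).
    static String buildWhereClause(List<Object> keys) {
        Object word = keys.get(0);
        // Arrays.asList is array-backed, so a one-element key throws
        // ArrayIndexOutOfBoundsException here, like TridentTupleView does.
        Object source = keys.get(1);
        return "WHERE word = '" + word + "' AND source = '" + source + "'";
    }

    public static void main(String[] args) {
        // Write path: grouped by ("word", "source"), as in the DEBUG line.
        List<Object> writeKey = Arrays.<Object>asList("went", "spout1");
        System.out.println(buildWhereClause(writeKey));

        // Read path: a query key that carries the word only.
        List<Object> readKey = Arrays.<Object>asList("went");
        try {
            buildWhereClause(readKey);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("Same failure as the trace: " + e);
        }
    }
}
```

Running `main` prints the WHERE clause for the two-field key and then catches the exception for the one-field key; the exact exception message varies between JDK versions.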

@gaurchandan

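Actually, the problem is in WordCountTopology: the DRPC query does not pass the source (spout). The counts are stored under a two-part key (word, source), as the DEBUG line shows ([went, spout1]), but the DRPC stream queries the state with the word alone, so WordCountAndSourceMapper.retrieve() calls keys.get(1) on a one-element key and throws ArrayIndexOutOfBoundsException: 1. Here is what I did to overcome this: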

  1. Change the arguments passed to the DRPC client so that each word carries its spout source, separated by a colon:
    client.execute("words", "cat:spout1 dog:spout2 the:spout1 man:spout1");
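  2. Declare my own MultiSplit function, which emits both the word and its source:
    import backtype.storm.tuple.Values;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.tuple.TridentTuple;

    public class MultiSplit extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // Each token looks like "word:source", e.g. "cat:spout1"
            for (String token : tuple.getString(0).split(" ")) {
                if (token.length() > 0) {
                    String[] wordAndSource = token.split(":");
                    // Emit (word, source) in the same order as new Fields("word", "source")
                    collector.emit(new Values(wordAndSource[0], wordAndSource[1]));
                }
            }
        }
    }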
  3. Then use it when building the TridentTopology:

    TridentState wordCounts = topology.newStream("spout1", spout1)
            // spout1 must emit a "source" field alongside "sentence" for this groupBy to work
            .each(new Fields("sentence"), new Split(), new Fields("word"))
            .groupBy(new Fields("word", "source"))
            .persistentAggregate(CassandraCqlMapState.nonTransactional(new WordCountAndSourceMapper()),
                    new IntegerCount(), new Fields("count"))
            .parallelismHint(6);

    topology.newDRPCStream("words", drpc)
            // MultiSplit emits (word, source), so the query key now has two fields
            .each(new Fields("args"), new MultiSplit(), new Fields("word", "source"))
            .groupBy(new Fields("word", "source"))
            .stateQuery(wordCounts, new Fields("word", "source"), new MapGet(), new Fields("count"))
            .each(new Fields("count"), new FilterNull())
            .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

  4. Finally, change WordCountAndSourceMapper: I changed the key indexes in the WHERE clause so they follow the groupBy field order, i.e. keys.get(0) is the word and keys.get(1) is the source:
    @Override
    public Statement retrieve(List<Object> keys) {
        // Retrieve all the columns associated with the (word, source) key
        Select statement = QueryBuilder.select().column(SOURCE_KEY_NAME)
                .column(WORD_KEY_NAME).column(VALUE_NAME)
                .from(KEYSPACE_NAME, TABLE_NAME);
        // Key order matches groupBy(new Fields("word", "source"))
        statement.where(QueryBuilder.eq(WORD_KEY_NAME, keys.get(0)));
        statement.where(QueryBuilder.eq(SOURCE_KEY_NAME, keys.get(1)));
        return statement;
    }
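Taken together, these steps make the DRPC query key line up with the (word, source) key used when the counts were written. The following plain-Java sketch approximates the fixed query path under stated assumptions: an in-memory `Map` keyed by `[word, source]` stands in for the Cassandra-backed state, `parseKeys` mirrors `MultiSplit`, and `sumCounts` mirrors the `MapGet`, `FilterNull` and `Sum` chain. The class, method names and counts are hypothetical, and the guard that skips tokens without exactly one colon is an addition that `MultiSplit` above does not have.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory approximation of the fixed DRPC query path; the Map replaces the Cassandra state.
class WordSourceQuerySketch {

    // Same parsing as MultiSplit: space-separated "word:source" tokens -> [word, source] keys.
    static List<List<String>> parseKeys(String args) {
        List<List<String>> keys = new ArrayList<>();
        for (String token : args.split(" ")) {
            if (token.isEmpty()) {
                continue;
            }
            String[] wordAndSource = token.split(":");
            if (wordAndSource.length != 2) {
                // Hypothetical guard (not in MultiSplit): skip malformed tokens.
                continue;
            }
            // Key order (word, source) matches groupBy(new Fields("word", "source")).
            keys.add(Arrays.asList(wordAndSource[0], wordAndSource[1]));
        }
        return keys;
    }

    // Mirrors MapGet -> FilterNull -> Sum: look up each key, drop misses, add the rest.
    static long sumCounts(Map<List<String>, Integer> counts, String args) {
        long sum = 0;
        for (List<String> key : parseKeys(args)) {
            Integer count = counts.get(key);
            if (count != null) {
                sum += count;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<List<String>, Integer> counts = new HashMap<>();
        counts.put(Arrays.asList("cat", "spout1"), 3);   // hypothetical counts
        counts.put(Arrays.asList("the", "spout1"), 10);
        counts.put(Arrays.asList("the", "spout2"), 7);
        System.out.println(sumCounts(counts, "cat:spout1 dog:spout2 the:spout1 man:spout1")); // prints 13
    }
}
```

With these hypothetical counts the query sums `cat:spout1` (3) and `the:spout1` (10); `dog:spout2` and `man:spout1` have no entry and are skipped, much as `FilterNull` drops null counts. `the:spout2` is not included because the source is part of the key.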

These changes will definitely resolve the issue.
All the best!
