Skip to content

Commit

Permalink
Updating samples (#63)
Browse files Browse the repository at this point in the history
* Updating samples to use the new interfaces with LeaseLost and ShardEnded methods
  • Loading branch information
sahilpalvia authored and pfifer committed Mar 6, 2019
1 parent a2be81a commit 8165727
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 22 deletions.
23 changes: 10 additions & 13 deletions samples/basic_sample/consumer/sample_kcl_app.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ permissions and limitations under the License.

'use strict';


var fs = require('fs');
var path = require('path');
var util = require('util');
var kcl = require('../../..');
var logger = require('../../util/logger');
Expand Down Expand Up @@ -66,20 +63,20 @@ function recordProcessor() {
});
},

shutdownRequested: function(shutdownRequestedInput, completeCallback) {
shutdownRequestedInput.checkpointer.checkpoint(function (err) {
leaseLost: function(leaseLostInput, completeCallback) {
log.info(util.format('Lease was lost for ShardId: %s', shardId));
completeCallback();
},

shardEnded: function(shardEndedInput, completeCallback) {
log.info(util.format('ShardId: %s has ended. Will checkpoint now.', shardId));
shardEndedInput.checkpointer.checkpoint(function(err) {
completeCallback();
});
},

shutdown: function(shutdownInput, completeCallback) {
// Checkpoint should only be performed when shutdown reason is TERMINATE.
if (shutdownInput.reason !== 'TERMINATE') {
completeCallback();
return;
}
// Whenever checkpointing, completeCallback should only be invoked once checkpoint is complete.
shutdownInput.checkpointer.checkpoint(function(err) {
shutdownRequested: function(shutdownRequestedInput, completeCallback) {
shutdownRequestedInput.checkpointer.checkpoint(function (err) {
completeCallback();
});
}
Expand Down
22 changes: 13 additions & 9 deletions samples/click_stream_sample/consumer/click_stream_consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,25 +162,29 @@ function clickStreamProcessor(emitter, cfg) {
/**
* Called by the KCL to indicate that this record processor should shut down.
* After the shutdown operation is complete, there will not be any more calls to
* any other functions of this record processor. Note that the shutdown reason
* could be either TERMINATE or ZOMBIE. If ZOMBIE, clients should not
* any other functions of this record processor. If lease is lost, clients should not
* checkpoint because there is possibly another record processor which has
* acquired the lease for this shard. If TERMINATE, then
* acquired the lease for this shard.
*/
leaseLost: function(leaseLostInput, completeCallback) {
completeCallback();
},

/**
* Called by the KCL to indicate that this record processor should shut down.
* After the shutdown operation is complete, there will not be any more calls to
* any other functions of this record processor. If shard has ended, then
* checkpointer.checkpoint() should be called to checkpoint at the end of
* the shard so that this processor will be shut down and new processors
* will be created for the children of this shard.
*/
shutdown: function(shutdownInput, completeCallback) {
if (shutdownInput.reason !== 'TERMINATE') {
completeCallback();
return;
}
shardEnded: function(shardEndedInput, completeCallback) {
// Make sure to emit all remaining buffered data to S3 before shutting down.
commitQueue.push({
key: shardId + '/' + buffer.getFirstSequenceNumber() + '-' + buffer.getLastSequenceNumber(),
sequenceNumber: buffer.getLastSequenceNumber(),
data: buffer.readAndClearRecords(),
checkpointer: shutdownInput.checkpointer
checkpointer: shardEndedInput.checkpointer
}, function(error) {
if (error) {
log.error(util.format('Received error while shutting down: %s', error));
Expand Down

0 comments on commit 8165727

Please sign in to comment.