Skip to content

Commit

Permalink
Merge pull request #158 from disney/develop
Browse files Browse the repository at this point in the history
Merge dev fixes up to tpch-support
  • Loading branch information
guymolinari authored Dec 19, 2024
2 parents b5a4b8f + 1d8dcbf commit e6e760b
Show file tree
Hide file tree
Showing 7 changed files with 9 additions and 119 deletions.
4 changes: 0 additions & 4 deletions Docker/kinesis_entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ fi
if [ -n "$CHECKPOINT_TABLE" ]
then
BOOL_FLAGS=${BOOL_FLAGS}" --checkpoint-table=${CHECKPOINT_TABLE}"
if [ -n "$POST_CHECKPOINT_INIT_DELAY" ]
then
BOOL_FLAGS=${BOOL_FLAGS}" --post-checkpoint-init-delay=${POST_CHECKPOINT_INIT_DELAY}"
fi
fi
if [ -n "$LOG_LEVEL" ]
then
Expand Down
5 changes: 0 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ require (
github.com/aws/aws-sdk-go-v2 v1.32.6
github.com/aws/aws-sdk-go-v2/config v1.28.6
github.com/aws/aws-sdk-go-v2/credentials v1.17.47
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0
github.com/aws/aws-sdk-go-v2/service/kinesis v1.32.7
github.com/aws/aws-sdk-go-v2/service/s3 v1.71.0
github.com/aws/aws-sdk-go-v2/service/sts v1.33.2
Expand Down Expand Up @@ -77,17 +75,14 @@ require (
github.com/apache/thrift v0.14.2 // indirect
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.20 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.43 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.25 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.8 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.6 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.6 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.6 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.24.7 // indirect
Expand Down
80 changes: 0 additions & 80 deletions go.sum

Large diffs are not rendered by default.

29 changes: 7 additions & 22 deletions quanta-kinesis-consumer-lib/q-kinesis-lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ type Main struct {
AssumeRoleArn string
AssumeRoleArnRegion string
Deaggregate bool
PostCheckpointInitDelay int
Collate bool
ShardKey string
ProtoConfig string
Expand Down Expand Up @@ -180,32 +179,19 @@ func (m *Main) Init(customEndpoint string) (int, error) {
return 0, err
}
shardCount := len(shout.Shards)
initializedShardsInDB := false
foundCheckpointRecords := false
if db != nil && m.InitialPos != "TRIM_HORIZON" {
for _, v := range shout.Shards {
seq, _ := db.GetCheckpoint(*streamName, *v.ShardId)
if seq != "" && seq != "0" {
foundCheckpointRecords = true
continue
}
sequenceRange := *v.SequenceNumberRange
sequenceNumber := *sequenceRange.StartingSequenceNumber
if sequenceRange.EndingSequenceNumber != nil {
sequenceNumber = *sequenceRange.EndingSequenceNumber
}
if sequenceNumber == "" {
sequenceNumber = *sequenceRange.StartingSequenceNumber
}
err := db.SetCheckpoint(*streamName, *v.ShardId, sequenceNumber)
//u.Debugf("Initializing checkpoint for shard %s.%s, SEQ = %s", *streamName, *v.ShardId,
// sequenceNumber)
if err != nil {
return 0, fmt.Errorf("failed to set inital checkpoint, %v", err)
}
initializedShardsInDB = true
}
}
if initializedShardsInDB {
time.Sleep(time.Duration(m.PostCheckpointInitDelay) * time.Second)
if m.InitialPos == "LATEST" && foundCheckpointRecords {
u.Errorf("Checkpoint enabled and records exist. Shard iterator is 'LATEST' setting it to 'AFTER_SEQUENCE_NUMBER'.")
m.InitialPos = "AFTER_SEQUENCE_NUMBER"
}

m.metrics = cloudwatch.New(sess)
Expand Down Expand Up @@ -360,9 +346,8 @@ func (m *Main) MainProcessingLoop() error {
u.Warnf("Received Cancellation.")
}
if m.InitialPos == "TRIM_HORIZON" {
u.Error("can't re-initialize 'in-place' if set to TRIM_HORIZON, exiting")
// os.Exit(1)
return fmt.Errorf("can't re-initialize 'in-place' if set to TRIM_HORIZON")
u.Error("initial position was TRIM_HORIZON, re-initializing with set to LATEST ")
m.InitialPos = "LATEST"
}
u.Warnf("Re-initializing.")
var err error
Expand Down
6 changes: 0 additions & 6 deletions quanta-kinesis-consumer/quanta-kinesis-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func main() {
deaggregate := app.Flag("deaggregate", "Incoming payload records are aggregated.").Bool()
scanInterval := app.Flag("scan-interval", "Scan interval (milliseconds)").Default("1000").Int()
protoPath := app.Flag("proto-path", "Path to protobuf descriptor files root directory.").String()
postCheckpointInitDelay := app.Flag("post-checkpoint-init-delay", "Delay seconds after checkpoint table init.").Default("30").Int()
logLevel := app.Flag("log-level", "Log Level [ERROR, WARN, INFO, DEBUG]").Default("WARN").String()

kingpin.MustParse(app.Parse(os.Args[1:]))
Expand All @@ -66,7 +65,6 @@ func main() {
main.ScanInterval = *scanInterval
main.Port = int(*port)
main.ConsulAddr = *consul
main.PostCheckpointInitDelay = *postCheckpointInitDelay

log.Printf("Set Logging level to %v.", *logLevel)
log.Printf("Kinesis stream %v.", main.Stream)
Expand Down Expand Up @@ -95,10 +93,6 @@ func main() {
os.Exit(1)
}
log.Printf("DynamoDB checkpoint table name [%s]", main.CheckpointTable)
if main.InitialPos == "LATEST" {
u.Errorf("Checkpoint enabled. Shard iterator is 'LATEST' setting it to 'AFTER_SEQUENCE_NUMBER'.")
main.InitialPos = "AFTER_SEQUENCE_NUMBER"
}
}

if shardKey != nil {
Expand Down
2 changes: 1 addition & 1 deletion sink/s3sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (s *S3ParquetSink) Open(ctx *plan.Context, bucketpath string, params map[st
case value.BoolType:
s.md[i] = fmt.Sprintf("name=%s, type=BOOLEAN", v.As)
default:
s.md[i] = fmt.Sprintf("name=%s, type=UTF8, encoding=PLAIN_DICTIONARY", v.As)
s.md[i] = fmt.Sprintf("name=%s, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY", v.As)
}
}

Expand Down
2 changes: 1 addition & 1 deletion version.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
quanta 0.9.15-rc-6
quanta 0.9.15-rc-8


0 comments on commit e6e760b

Please sign in to comment.