Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update interfaces and structs for admin use #2533

Merged
merged 2 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 34 additions & 34 deletions common/persistence/serialization/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,12 @@ func (e *DeserializationError) Error() string {
}

func (t *serializerImpl) ShardInfoToBlob(info *persistencespb.ShardInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) ShardInfoFromBlob(data *commonpb.DataBlob, clusterName string) (*persistencespb.ShardInfo, error) {
shardInfo := &persistencespb.ShardInfo{}
err := proto3DecodeBlob(data, shardInfo)
err := ProtoDecodeBlob(data, shardInfo)

if err != nil {
return nil, err
Expand Down Expand Up @@ -317,124 +317,124 @@ func (t *serializerImpl) ShardInfoFromBlob(data *commonpb.DataBlob, clusterName
}

func (t *serializerImpl) NamespaceDetailToBlob(info *persistencespb.NamespaceDetail, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) NamespaceDetailFromBlob(data *commonpb.DataBlob) (*persistencespb.NamespaceDetail, error) {
result := &persistencespb.NamespaceDetail{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) HistoryTreeInfoToBlob(info *persistencespb.HistoryTreeInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) HistoryTreeInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.HistoryTreeInfo, error) {
result := &persistencespb.HistoryTreeInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) HistoryBranchToBlob(info *persistencespb.HistoryBranch, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) HistoryBranchFromBlob(data *commonpb.DataBlob) (*persistencespb.HistoryBranch, error) {
result := &persistencespb.HistoryBranch{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) WorkflowExecutionInfoToBlob(info *persistencespb.WorkflowExecutionInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) WorkflowExecutionInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.WorkflowExecutionInfo, error) {
result := &persistencespb.WorkflowExecutionInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) WorkflowExecutionStateToBlob(info *persistencespb.WorkflowExecutionState, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) WorkflowExecutionStateFromBlob(data *commonpb.DataBlob) (*persistencespb.WorkflowExecutionState, error) {
result := &persistencespb.WorkflowExecutionState{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) ActivityInfoToBlob(info *persistencespb.ActivityInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) ActivityInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.ActivityInfo, error) {
result := &persistencespb.ActivityInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) ChildExecutionInfoToBlob(info *persistencespb.ChildExecutionInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) ChildExecutionInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.ChildExecutionInfo, error) {
result := &persistencespb.ChildExecutionInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) SignalInfoToBlob(info *persistencespb.SignalInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) SignalInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.SignalInfo, error) {
result := &persistencespb.SignalInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) RequestCancelInfoToBlob(info *persistencespb.RequestCancelInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) RequestCancelInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.RequestCancelInfo, error) {
result := &persistencespb.RequestCancelInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) TimerInfoToBlob(info *persistencespb.TimerInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) TimerInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.TimerInfo, error) {
result := &persistencespb.TimerInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) TaskInfoToBlob(info *persistencespb.AllocatedTaskInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) TaskInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.AllocatedTaskInfo, error) {
result := &persistencespb.AllocatedTaskInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) TaskQueueInfoToBlob(info *persistencespb.TaskQueueInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) TaskQueueInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.TaskQueueInfo, error) {
result := &persistencespb.TaskQueueInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) ChecksumToBlob(checksum *persistencespb.Checksum, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
// nil is replaced with empty object because it is not supported for "checksum" field in DB.
if checksum == nil {
checksum = &persistencespb.Checksum{}
}
return proto3EncodeBlob(checksum, encodingType)
return ProtoEncodeBlob(checksum, encodingType)
}

func (t *serializerImpl) ChecksumFromBlob(data *commonpb.DataBlob) (*persistencespb.Checksum, error) {
result := &persistencespb.Checksum{}
err := proto3DecodeBlob(data, result)
err := ProtoDecodeBlob(data, result)
if err != nil || result.GetFlavor() == enumsspb.CHECKSUM_FLAVOR_UNSPECIFIED {
// If result is an empty struct (Flavor is unspecified), replace it with nil, because everywhere in the code checksum is pointer type.
return nil, err
Expand All @@ -452,15 +452,15 @@ func (t *serializerImpl) QueueMetadataFromBlob(data *commonpb.DataBlob) (*persis
}

func (t *serializerImpl) ReplicationTaskToBlob(replicationTask *replicationspb.ReplicationTask, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(replicationTask, encodingType)
return ProtoEncodeBlob(replicationTask, encodingType)
}

func (t *serializerImpl) ReplicationTaskFromBlob(data *commonpb.DataBlob) (*replicationspb.ReplicationTask, error) {
result := &replicationspb.ReplicationTask{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func proto3DecodeBlob(data *commonpb.DataBlob, result proto.Message) error {
func ProtoDecodeBlob(data *commonpb.DataBlob, result proto.Message) error {
if data == nil {
// TODO: should we return nil or error?
return NewDeserializationError("cannot decode nil")
Expand Down Expand Up @@ -490,7 +490,7 @@ func decodeBlob(data *commonpb.DataBlob, result proto.Message) error {
case enumspb.ENCODING_TYPE_JSON:
return codec.NewJSONPBEncoder().Decode(data.Data, result)
case enumspb.ENCODING_TYPE_PROTO3:
return proto3DecodeBlob(data, result)
return ProtoDecodeBlob(data, result)
default:
return NewUnknownEncodingTypeError(data.EncodingType)
}
Expand All @@ -515,13 +515,13 @@ func encodeBlob(o proto.Message, encoding enumspb.EncodingType) (*commonpb.DataB
EncodingType: enumspb.ENCODING_TYPE_JSON,
}, nil
case enumspb.ENCODING_TYPE_PROTO3:
return proto3EncodeBlob(o, enumspb.ENCODING_TYPE_PROTO3)
return ProtoEncodeBlob(o, enumspb.ENCODING_TYPE_PROTO3)
default:
return nil, NewUnknownEncodingTypeError(encoding)
}
}

func proto3EncodeBlob(m proto.Message, encoding enumspb.EncodingType) (*commonpb.DataBlob, error) {
func ProtoEncodeBlob(m proto.Message, encoding enumspb.EncodingType) (*commonpb.DataBlob, error) {
if encoding != enumspb.ENCODING_TYPE_PROTO3 {
return nil, NewUnknownEncodingTypeError(encoding)
}
Expand Down
4 changes: 2 additions & 2 deletions service/history/historyEngineInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ type (
getFinishedChan() <-chan struct{}
readTimerTasks() ([]tasks.Task, tasks.Task, bool, error)
completeTimerTask(time.Time, int64)
getAckLevel() timerKey
getReadLevel() timerKey
getAckLevel() tasks.Key
getReadLevel() tasks.Key
updateAckLevel() error
}
)
8 changes: 4 additions & 4 deletions service/history/historyEngineInterfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions service/history/queueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ func newQueueProcessorBase(

var taskProcessor *taskProcessor
if !options.EnablePriorityTaskProcessor() {
taskProcessorOptions := taskProcessorOptions{
queueSize: options.BatchSize(),
workerCount: options.WorkerCount(),
taskProcessorOptions := TaskProcessorOptions{
QueueSize: options.BatchSize(),
WorkerCount: options.WorkerCount(),
}
taskProcessor = newTaskProcessor(taskProcessorOptions, shard, historyCache, logger)
taskProcessor = NewTaskProcessor(taskProcessorOptions, shard, historyCache, logger)
}

p := &queueProcessorBase{
Expand Down Expand Up @@ -141,7 +141,7 @@ func (p *queueProcessorBase) Start() {
defer p.logger.Info("", tag.LifeCycleStarted, tag.ComponentTransferQueue)

if p.taskProcessor != nil {
p.taskProcessor.start()
p.taskProcessor.Start()
}
p.shutdownWG.Add(1)
p.notifyNewTask()
Expand All @@ -164,7 +164,7 @@ func (p *queueProcessorBase) Stop() {
}

if p.taskProcessor != nil {
p.taskProcessor.stop()
p.taskProcessor.Stop()
}
}

Expand Down Expand Up @@ -280,7 +280,7 @@ func (p *queueProcessorBase) submitTask(
) bool {

return p.taskProcessor.addTask(
newTaskInfo(
NewTaskInfo(
p.processor,
taskInfo,
initializeLoggerForTask(p.shard.GetShardID(), taskInfo, p.logger),
Expand Down
22 changes: 11 additions & 11 deletions service/history/taskProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ const (
)

type (
taskProcessorOptions struct {
queueSize int
workerCount int
TaskProcessorOptions struct {
QueueSize int
WorkerCount int
}

taskInfo struct {
Expand Down Expand Up @@ -92,7 +92,7 @@ type (
}
)

func newTaskInfo(
func NewTaskInfo(
processor taskExecutor,
task tasks.Task,
logger log.Logger,
Expand All @@ -109,36 +109,36 @@ func newTaskInfo(
}
}

func newTaskProcessor(
options taskProcessorOptions,
func NewTaskProcessor(
options TaskProcessorOptions,
shard shard.Context,
historyCache workflow.Cache,
logger log.Logger,
) *taskProcessor {

var workerNotificationChs []chan struct{}
for index := 0; index < options.workerCount; index++ {
for index := 0; index < options.WorkerCount; index++ {
workerNotificationChs = append(workerNotificationChs, make(chan struct{}, 1))
}

base := &taskProcessor{
shard: shard,
cache: historyCache,
shutdownCh: make(chan struct{}),
tasksCh: make(chan *taskInfo, options.queueSize),
tasksCh: make(chan *taskInfo, options.QueueSize),
config: shard.GetConfig(),
logger: logger,
metricsClient: shard.GetMetricsClient(),
timeSource: shard.GetTimeSource(),
workerNotificationChans: workerNotificationChs,
retryPolicy: common.CreatePersistenceRetryPolicy(),
numOfWorker: options.workerCount,
numOfWorker: options.WorkerCount,
}

return base
}

func (t *taskProcessor) start() {
func (t *taskProcessor) Start() {
for i := 0; i < t.numOfWorker; i++ {
t.workerWG.Add(1)
notificationChan := t.workerNotificationChans[i]
Expand All @@ -147,7 +147,7 @@ func (t *taskProcessor) start() {
t.logger.Info("Task processor started.")
}

func (t *taskProcessor) stop() {
func (t *taskProcessor) Stop() {
close(t.shutdownCh)
if success := common.AwaitWaitGroup(&t.workerWG, time.Minute); !success {
t.logger.Warn("Task processor timed out on shutdown.")
Expand Down
Loading