Skip to content

Commit

Permalink
Merge pull request #123 from volcengine/feat/limit
Browse files Browse the repository at this point in the history
Feat/limit
  • Loading branch information
zpp12354321 authored Aug 30, 2023
2 parents cb17815 + d891331 commit f4074d0
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 202 deletions.
2 changes: 1 addition & 1 deletion common/common_volcengine_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,5 @@ func (c *Config) Client() (*SdkClient, error) {

func init() {
InitLocks()
InitSyncLimit()
//InitSyncLimit()
}
223 changes: 103 additions & 120 deletions common/common_volcengine_dispatcher.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package common

import (
"context"
"fmt"
"time"

re "github.com/hashicorp/terraform-plugin-sdk/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"golang.org/x/sync/semaphore"
"golang.org/x/time/rate"
)

type Dispatcher struct {
rateInfo *RateInfo
//rateInfo *RateInfo
}

var defaultDispatcher *Dispatcher
Expand All @@ -25,46 +22,32 @@ func DefaultDispatcher() *Dispatcher {
return defaultDispatcher
}

type RateInfo struct {
Create *Rate
Read *Rate
Update *Rate
Delete *Rate
Data *Rate
}
type Rate struct {
Limiter *rate.Limiter
Semaphore *semaphore.Weighted
}
func (d *Dispatcher) initDispatcher(resourceService ResourceService, resourceDate *schema.ResourceData, resource *schema.Resource) {

func NewRateLimitDispatcher(r *RateInfo) *Dispatcher {
return &Dispatcher{
rateInfo: r,
}
}

func (d *Dispatcher) Create(resourceService ResourceService, resourceDate *schema.ResourceData, resource *schema.Resource) (err error) {
defer func() {
if d.rateInfo != nil && d.rateInfo.Create != nil && d.rateInfo.Create.Semaphore != nil {
d.rateInfo.Create.Semaphore.Release(1)
}
}()
if d.rateInfo != nil && d.rateInfo.Create != nil {
ctx := context.Background()
if d.rateInfo.Create.Limiter != nil {
err = d.rateInfo.Create.Limiter.Wait(ctx)
if err != nil {
return err
}
}
if d.rateInfo.Create.Semaphore != nil {
err = d.rateInfo.Create.Semaphore.Acquire(ctx, 1)
if err != nil {
return err
}
}

}
//defer func() {
// if d.rateInfo != nil && d.rateInfo.Create != nil && d.rateInfo.Create.Semaphore != nil {
// d.rateInfo.Create.Semaphore.Release(1)
// }
//}()
//if d.rateInfo != nil && d.rateInfo.Create != nil {
// ctx := context.Background()
// if d.rateInfo.Create.Limiter != nil {
// err = d.rateInfo.Create.Limiter.Wait(ctx)
// if err != nil {
// return err
// }
// }
// if d.rateInfo.Create.Semaphore != nil {
// err = d.rateInfo.Create.Semaphore.Acquire(ctx, 1)
// if err != nil {
// return err
// }
// }
//
//}
callbacks := resourceService.CreateResource(resourceDate, resource)
var calls []SdkCall
for _, callback := range callbacks {
Expand All @@ -81,26 +64,26 @@ func (d *Dispatcher) Create(resourceService ResourceService, resourceDate *schem
}

func (d *Dispatcher) Update(resourceService ResourceService, resourceDate *schema.ResourceData, resource *schema.Resource) (err error) {
defer func() {
if d.rateInfo != nil && d.rateInfo.Update != nil && d.rateInfo.Update.Semaphore != nil {
d.rateInfo.Update.Semaphore.Release(1)
}
}()
if d.rateInfo != nil && d.rateInfo.Update != nil {
ctx := context.Background()
if d.rateInfo.Update.Limiter != nil {
err = d.rateInfo.Update.Limiter.Wait(ctx)
if err != nil {
return err
}
}
if d.rateInfo.Update.Semaphore != nil {
err = d.rateInfo.Update.Semaphore.Acquire(ctx, 1)
if err != nil {
return err
}
}
}
//defer func() {
// if d.rateInfo != nil && d.rateInfo.Update != nil && d.rateInfo.Update.Semaphore != nil {
// d.rateInfo.Update.Semaphore.Release(1)
// }
//}()
//if d.rateInfo != nil && d.rateInfo.Update != nil {
// ctx := context.Background()
// if d.rateInfo.Update.Limiter != nil {
// err = d.rateInfo.Update.Limiter.Wait(ctx)
// if err != nil {
// return err
// }
// }
// if d.rateInfo.Update.Semaphore != nil {
// err = d.rateInfo.Update.Semaphore.Acquire(ctx, 1)
// if err != nil {
// return err
// }
// }
//}
var callbacks []Callback
if projectUpdateEnabled, ok := resourceService.(ProjectUpdateEnabled); ok {
projectUpdateCallback := NewProjectService(resourceService.GetClient()).ModifyProject(projectUpdateEnabled.ProjectTrn(),
Expand All @@ -124,26 +107,26 @@ func (d *Dispatcher) Update(resourceService ResourceService, resourceDate *schem
}

func (d *Dispatcher) Read(resourceService ResourceService, resourceDate *schema.ResourceData, resource *schema.Resource) (err error) {
defer func() {
if d.rateInfo != nil && d.rateInfo.Read != nil && d.rateInfo.Read.Semaphore != nil {
d.rateInfo.Read.Semaphore.Release(1)
}
}()
if d.rateInfo != nil && d.rateInfo.Read != nil {
ctx := context.Background()
if d.rateInfo.Read.Limiter != nil {
err = d.rateInfo.Read.Limiter.Wait(ctx)
if err != nil {
return err
}
}
if d.rateInfo.Read.Semaphore != nil {
err = d.rateInfo.Read.Semaphore.Acquire(ctx, 1)
if err != nil {
return err
}
}
}
//defer func() {
// if d.rateInfo != nil && d.rateInfo.Read != nil && d.rateInfo.Read.Semaphore != nil {
// d.rateInfo.Read.Semaphore.Release(1)
// }
//}()
//if d.rateInfo != nil && d.rateInfo.Read != nil {
// ctx := context.Background()
// if d.rateInfo.Read.Limiter != nil {
// err = d.rateInfo.Read.Limiter.Wait(ctx)
// if err != nil {
// return err
// }
// }
// if d.rateInfo.Read.Semaphore != nil {
// err = d.rateInfo.Read.Semaphore.Acquire(ctx, 1)
// if err != nil {
// return err
// }
// }
//}

var (
instance map[string]interface{}
Expand Down Expand Up @@ -191,26 +174,26 @@ func (d *Dispatcher) Read(resourceService ResourceService, resourceDate *schema.
}

func (d *Dispatcher) Delete(resourceService ResourceService, resourceDate *schema.ResourceData, resource *schema.Resource) (err error) {
defer func() {
if d.rateInfo != nil && d.rateInfo.Delete != nil && d.rateInfo.Delete.Semaphore != nil {
d.rateInfo.Delete.Semaphore.Release(1)
}
}()
if d.rateInfo != nil && d.rateInfo.Delete != nil {
ctx := context.Background()
if d.rateInfo.Delete.Limiter != nil {
err = d.rateInfo.Delete.Limiter.Wait(ctx)
if err != nil {
return err
}
}
if d.rateInfo.Delete.Semaphore != nil {
err = d.rateInfo.Delete.Semaphore.Acquire(ctx, 1)
if err != nil {
return err
}
}
}
//defer func() {
// if d.rateInfo != nil && d.rateInfo.Delete != nil && d.rateInfo.Delete.Semaphore != nil {
// d.rateInfo.Delete.Semaphore.Release(1)
// }
//}()
//if d.rateInfo != nil && d.rateInfo.Delete != nil {
// ctx := context.Background()
// if d.rateInfo.Delete.Limiter != nil {
// err = d.rateInfo.Delete.Limiter.Wait(ctx)
// if err != nil {
// return err
// }
// }
// if d.rateInfo.Delete.Semaphore != nil {
// err = d.rateInfo.Delete.Semaphore.Acquire(ctx, 1)
// if err != nil {
// return err
// }
// }
//}
var (
callbacks []Callback
unsubscribeInfo *UnsubscribeInfo
Expand Down Expand Up @@ -251,26 +234,26 @@ func (d *Dispatcher) Data(resourceService ResourceService, resourceDate *schema.
condition map[string]interface{}
collection []interface{}
)
defer func() {
if d.rateInfo != nil && d.rateInfo.Data != nil && d.rateInfo.Data.Semaphore != nil {
d.rateInfo.Data.Semaphore.Release(1)
}
}()
if d.rateInfo != nil && d.rateInfo.Data != nil {
ctx := context.Background()
if d.rateInfo.Data.Limiter != nil {
err = d.rateInfo.Data.Limiter.Wait(ctx)
if err != nil {
return err
}
}
if d.rateInfo.Data.Semaphore != nil {
err = d.rateInfo.Data.Semaphore.Acquire(ctx, 1)
if err != nil {
return err
}
}
}
//defer func() {
// if d.rateInfo != nil && d.rateInfo.Data != nil && d.rateInfo.Data.Semaphore != nil {
// d.rateInfo.Data.Semaphore.Release(1)
// }
//}()
//if d.rateInfo != nil && d.rateInfo.Data != nil {
// ctx := context.Background()
// if d.rateInfo.Data.Limiter != nil {
// err = d.rateInfo.Data.Limiter.Wait(ctx)
// if err != nil {
// return err
// }
// }
// if d.rateInfo.Data.Semaphore != nil {
// err = d.rateInfo.Data.Semaphore.Acquire(ctx, 1)
// if err != nil {
// return err
// }
// }
//}
info = resourceService.DatasourceResources(resourceDate, resource)
condition, err = DataSourceToRequest(resourceDate, resource, info)
if err != nil {
Expand Down
45 changes: 45 additions & 0 deletions common/common_volcengine_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package common

import (
"fmt"

"golang.org/x/sync/semaphore"
"golang.org/x/time/rate"
)

var rateInfoMap map[string]*Rate

func init() {
rateInfoMap = map[string]*Rate{
"ecs.RunInstances.2020-04-01": {
Limiter: rate.NewLimiter(4, 10),
Semaphore: semaphore.NewWeighted(14),
},
"ecs.DescribeInstances.2020-04-01": {
Limiter: rate.NewLimiter(4, 10),
Semaphore: semaphore.NewWeighted(14),
},
"ecs.DeleteInstance.2020-04-01": {
Limiter: rate.NewLimiter(4, 10),
Semaphore: semaphore.NewWeighted(10),
},
"vpc.DescribeNetworkInterfaces.2020-04-01": {
Limiter: rate.NewLimiter(4, 10),
Semaphore: semaphore.NewWeighted(10),
},
"vpc.DescribeSubnets.2020-04-01": {
Limiter: rate.NewLimiter(4, 10),
Semaphore: semaphore.NewWeighted(10),
},
}
}

type Rate struct {
Limiter *rate.Limiter
Semaphore *semaphore.Weighted
}

func GetRateInfoMap(svc, action, version string) *Rate {
key := fmt.Sprintf("%s.%s.%s", svc, action, version)
return rateInfoMap[key]
}
32 changes: 13 additions & 19 deletions common/common_volcengine_sync_limit.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
package common

import (
"context"

"golang.org/x/sync/semaphore"
)

var syncSemaphore *semaphore.Weighted

func InitSyncLimit() {
syncSemaphore = semaphore.NewWeighted(10)
}

func Acquire() {
_ = syncSemaphore.Acquire(context.Background(), 1)
}

func Release() {
syncSemaphore.Release(1)
}
//var syncSemaphore *semaphore.Weighted
//
//func InitSyncLimit() {
// syncSemaphore = semaphore.NewWeighted(10)
//}
//
//func Acquire() {
// _ = syncSemaphore.Acquire(context.Background(), 1)
//}
//
//func Release() {
// syncSemaphore.Release(1)
//}
Loading

0 comments on commit f4074d0

Please sign in to comment.