Skip to content

Commit

Permalink
Allow perpetual cron schedules (#256)
Browse files Browse the repository at this point in the history
# Goals

Allow cron schedules to keep running when they run out of deals

# Implementation

- add attribute to the model
- add ability to set attribute on CreateSchedule 
- when jobs run out of deals, if they are cron and set to perpetual,
they don't get marked complete
  • Loading branch information
hannahhoward committed Sep 1, 2023
1 parent ab00b34 commit fd11e6d
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 69 deletions.
3 changes: 3 additions & 0 deletions client/swagger/models/model_schedule.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions client/swagger/models/schedule_create_request.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions docs/swagger/docs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions docs/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -10383,6 +10383,9 @@
"scheduleCron": {
"type": "string"
},
"scheduleCronPerpetual": {
"type": "boolean"
},
"scheduleDealNumber": {
"type": "integer"
},
Expand Down Expand Up @@ -10617,6 +10620,10 @@
"description": "Schedule cron patter",
"type": "string"
},
"scheduleCronPerpetual": {
"description": "Whether a cron schedule should run in definitely",
"type": "boolean"
},
"scheduleDealNumber": {
"description": "Number of deals per scheduled time",
"type": "integer"
Expand Down
5 changes: 5 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5068,6 +5068,8 @@ definitions:
type: string
scheduleCron:
type: string
scheduleCronPerpetual:
type: boolean
scheduleDealNumber:
type: integer
scheduleDealSize:
Expand Down Expand Up @@ -5237,6 +5239,9 @@ definitions:
scheduleCron:
description: Schedule cron patter
type: string
scheduleCronPerpetual:
description: Whether a cron schedule should run in definitely
type: boolean
scheduleDealNumber:
description: Number of deals per scheduled time
type: integer
Expand Down
86 changes: 44 additions & 42 deletions handler/deal/schedule/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,27 @@ import (

//nolint:lll
type CreateRequest struct {
DatasetName string `json:"datasetName" validation:"required"` // Dataset name
Provider string `json:"provider" validation:"required"` // Provider
HTTPHeaders []string `json:"httpHeaders"` // http headers to be passed with the request (i.e. key=value)
URLTemplate string `json:"urlTemplate"` // URL template with PIECE_CID placeholder for boost to fetch the CAR file, i.e. http://127.0.0.1/piece/{PIECE_CID}.car
PricePerGBEpoch float64 `default:"0" json:"pricePerGbEpoch"` // Price in FIL per GiB per epoch
PricePerGB float64 `default:"0" json:"pricePerGb"` // Price in FIL per GiB
PricePerDeal float64 `default:"0" json:"pricePerDeal"` // Price in FIL per deal
Verified bool `default:"true" json:"verified"` // Whether the deal should be verified
IPNI bool `default:"true" json:"ipni"` // Whether the deal should be IPNI
KeepUnsealed bool `default:"true" json:"keepUnsealed"` // Whether the deal should be kept unsealed
StartDelay string `default:"72h" json:"startDelay"` // Deal start delay in epoch or in duration format, i.e. 1000, 72h
Duration string `default:"12840h" json:"duration"` // Duration in epoch or in duration format, i.e. 1500000, 2400h
ScheduleCron string `json:"scheduleCron"` // Schedule cron patter
ScheduleDealNumber int `json:"scheduleDealNumber"` // Number of deals per scheduled time
TotalDealNumber int `json:"totalDealNumber"` // Total number of deals
ScheduleDealSize string `json:"scheduleDealSize"` // Size of deals per schedule trigger in human readable format, i.e. 100 TiB
TotalDealSize string `json:"totalDealSize"` // Total size of deals in human readable format, i.e. 100 TiB
Notes string `json:"notes"` // Notes
MaxPendingDealSize string `json:"maxPendingDealSize"` // Max pending deal size in human readable format, i.e. 100 TiB
MaxPendingDealNumber int `json:"maxPendingDealNumber"` // Max pending deal number
DatasetName string `json:"datasetName" validation:"required"` // Dataset name
Provider string `json:"provider" validation:"required"` // Provider
HTTPHeaders []string `json:"httpHeaders"` // http headers to be passed with the request (i.e. key=value)
URLTemplate string `json:"urlTemplate"` // URL template with PIECE_CID placeholder for boost to fetch the CAR file, i.e. http://127.0.0.1/piece/{PIECE_CID}.car
PricePerGBEpoch float64 `default:"0" json:"pricePerGbEpoch"` // Price in FIL per GiB per epoch
PricePerGB float64 `default:"0" json:"pricePerGb"` // Price in FIL per GiB
PricePerDeal float64 `default:"0" json:"pricePerDeal"` // Price in FIL per deal
Verified bool `default:"true" json:"verified"` // Whether the deal should be verified
IPNI bool `default:"true" json:"ipni"` // Whether the deal should be IPNI
KeepUnsealed bool `default:"true" json:"keepUnsealed"` // Whether the deal should be kept unsealed
StartDelay string `default:"72h" json:"startDelay"` // Deal start delay in epoch or in duration format, i.e. 1000, 72h
Duration string `default:"12840h" json:"duration"` // Duration in epoch or in duration format, i.e. 1500000, 2400h
ScheduleCron string `json:"scheduleCron"` // Schedule cron patter
ScheduleCronPerpetual bool `json:"scheduleCronPerpetual"` // Whether a cron schedule should run in definitely
ScheduleDealNumber int `json:"scheduleDealNumber"` // Number of deals per scheduled time
TotalDealNumber int `json:"totalDealNumber"` // Total number of deals
ScheduleDealSize string `json:"scheduleDealSize"` // Size of deals per schedule trigger in human readable format, i.e. 100 TiB
TotalDealSize string `json:"totalDealSize"` // Total size of deals in human readable format, i.e. 100 TiB
Notes string `json:"notes"` // Notes
MaxPendingDealSize string `json:"maxPendingDealSize"` // Max pending deal size in human readable format, i.e. 100 TiB
MaxPendingDealNumber int `json:"maxPendingDealNumber"` // Max pending deal number
//nolint:tagliatelle
AllowedPieceCIDs []string `json:"allowedPieceCids"` // Allowed piece CIDs in this schedule
}
Expand Down Expand Up @@ -160,28 +161,29 @@ func createHandler(
}

schedule := model.Schedule{
DatasetID: dataset.ID,
URLTemplate: request.URLTemplate,
HTTPHeaders: request.HTTPHeaders,
Provider: request.Provider,
TotalDealNumber: request.TotalDealNumber,
TotalDealSize: int64(totalDealSize),
Verified: request.Verified,
KeepUnsealed: request.KeepUnsealed,
AnnounceToIPNI: request.IPNI,
StartDelay: startDelay,
Duration: duration,
State: model.ScheduleActive,
ScheduleDealNumber: request.ScheduleDealNumber,
ScheduleDealSize: int64(scheduleDealSize),
MaxPendingDealNumber: request.MaxPendingDealNumber,
MaxPendingDealSize: int64(pendingDealSize),
Notes: request.Notes,
AllowedPieceCIDs: request.AllowedPieceCIDs,
ScheduleCron: scheduleCron,
PricePerGBEpoch: request.PricePerGBEpoch,
PricePerGB: request.PricePerGB,
PricePerDeal: request.PricePerDeal,
DatasetID: dataset.ID,
URLTemplate: request.URLTemplate,
HTTPHeaders: request.HTTPHeaders,
Provider: request.Provider,
TotalDealNumber: request.TotalDealNumber,
TotalDealSize: int64(totalDealSize),
Verified: request.Verified,
KeepUnsealed: request.KeepUnsealed,
AnnounceToIPNI: request.IPNI,
StartDelay: startDelay,
Duration: duration,
State: model.ScheduleActive,
ScheduleDealNumber: request.ScheduleDealNumber,
ScheduleDealSize: int64(scheduleDealSize),
MaxPendingDealNumber: request.MaxPendingDealNumber,
MaxPendingDealSize: int64(pendingDealSize),
Notes: request.Notes,
AllowedPieceCIDs: request.AllowedPieceCIDs,
ScheduleCron: scheduleCron,
ScheduleCronPerpetual: request.ScheduleCronPerpetual,
PricePerGBEpoch: request.PricePerGBEpoch,
PricePerGB: request.PricePerGB,
PricePerDeal: request.PricePerDeal,
}

if err := database.DoRetry(ctx, func() error {
Expand Down
55 changes: 28 additions & 27 deletions model/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,33 +81,34 @@ func (d Deal) Key() string {
}

type Schedule struct {
ID uint32 `gorm:"primaryKey" json:"id"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
DatasetID uint32 `json:"datasetId"`
Dataset *Dataset `gorm:"foreignKey:DatasetID;constraint:OnDelete:CASCADE" json:"dataset,omitempty" swaggerignore:"true"`
URLTemplate string `json:"urlTemplate"`
HTTPHeaders StringSlice `gorm:"type:JSON" json:"httpHeaders"`
Provider string `json:"provider"`
PricePerGBEpoch float64 `json:"pricePerGbEpoch"`
PricePerGB float64 `json:"pricePerGb"`
PricePerDeal float64 `json:"pricePerDeal"`
TotalDealNumber int `json:"totalDealNumber"`
TotalDealSize int64 `json:"totalDealSize"`
Verified bool `json:"verified"`
KeepUnsealed bool `json:"keepUnsealed"`
AnnounceToIPNI bool `json:"announceToIpni"`
StartDelay time.Duration `json:"startDelay" swaggertype:"primitive,integer"`
Duration time.Duration `json:"duration" swaggertype:"primitive,integer"`
State ScheduleState `gorm:"index" json:"state"`
ScheduleCron string `json:"scheduleCron"`
ScheduleDealNumber int `json:"scheduleDealNumber"`
ScheduleDealSize int64 `json:"scheduleDealSize"`
MaxPendingDealNumber int `json:"maxPendingDealNumber"`
MaxPendingDealSize int64 `json:"maxPendingDealSize"`
Notes string `json:"notes"`
ErrorMessage string `json:"errorMessage"`
AllowedPieceCIDs StringSlice `gorm:"type:JSON" json:"allowedPieceCids"`
ID uint32 `gorm:"primaryKey" json:"id"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
DatasetID uint32 `json:"datasetId"`
Dataset *Dataset `gorm:"foreignKey:DatasetID;constraint:OnDelete:CASCADE" json:"dataset,omitempty" swaggerignore:"true"`
URLTemplate string `json:"urlTemplate"`
HTTPHeaders StringSlice `gorm:"type:JSON" json:"httpHeaders"`
Provider string `json:"provider"`
PricePerGBEpoch float64 `json:"pricePerGbEpoch"`
PricePerGB float64 `json:"pricePerGb"`
PricePerDeal float64 `json:"pricePerDeal"`
TotalDealNumber int `json:"totalDealNumber"`
TotalDealSize int64 `json:"totalDealSize"`
Verified bool `json:"verified"`
KeepUnsealed bool `json:"keepUnsealed"`
AnnounceToIPNI bool `json:"announceToIpni"`
StartDelay time.Duration `json:"startDelay" swaggertype:"primitive,integer"`
Duration time.Duration `json:"duration" swaggertype:"primitive,integer"`
State ScheduleState `gorm:"index" json:"state"`
ScheduleCron string `json:"scheduleCron"`
ScheduleCronPerpetual bool `json:"scheduleCronPerpetual"`
ScheduleDealNumber int `json:"scheduleDealNumber"`
ScheduleDealSize int64 `json:"scheduleDealSize"`
MaxPendingDealNumber int `json:"maxPendingDealNumber"`
MaxPendingDealSize int64 `json:"maxPendingDealSize"`
Notes string `json:"notes"`
ErrorMessage string `json:"errorMessage"`
AllowedPieceCIDs StringSlice `gorm:"type:JSON" json:"allowedPieceCids"`
}

type Wallet struct {
Expand Down
4 changes: 4 additions & 0 deletions service/dealpusher/dealpusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,10 @@ func (d *DealPusher) runSchedule(ctx context.Context, schedule *model.Schedule)
})).First(&car).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
Logger.Infow("no more pieces to send deal", "schedule_id", schedule.ID)
// we're out of deals to schedule, but if we're running a perpetual cron, we simply put things on hold till next cron
if schedule.ScheduleCron != "" && schedule.ScheduleCronPerpetual {
return "", nil
}
return model.ScheduleCompleted, nil
}
if err != nil {
Expand Down

1 comment on commit fd11e6d

@xinaxu
Copy link
Contributor

@xinaxu xinaxu commented on fd11e6d Sep 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Included in #221

Please sign in to comment.