Skip to content

Commit

Permalink
fix: circleci plugin pagination (#7770)
Browse files Browse the repository at this point in the history
* fix(circleci-plugin): correct page-token query param name

* fix(circleci-plugin): set default page size for circleci api requests to 20

* refactor(circleci-plugin): extract common api pagination functions to shared source file

---------

Co-authored-by: Lynwee <[email protected]>
  • Loading branch information
Nickcw6 and d4x1 authored Jul 25, 2024
1 parent 45db215 commit 7ac45c7
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 92 deletions.
40 changes: 9 additions & 31 deletions backend/plugins/circleci/tasks/job_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ limitations under the License.
package tasks

import (
"encoding/json"
"reflect"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/circleci/models"
"net/http"
"net/url"
"reflect"
)

const RAW_JOB_TABLE = "circleci_api_jobs"
Expand Down Expand Up @@ -63,33 +61,13 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
}

collector, err := api.NewApiCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job",
Input: iterator,
GetNextPageCustomData: func(prevReqData *api.RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) {
res := CircleciPageTokenResp[any]{}
err := api.UnmarshalResponse(prevPageResponse, &res)
if err != nil {
return nil, err
}
if res.NextPageToken == "" {
return nil, api.ErrFinishCollect
}
return res.NextPageToken, nil
},
Query: func(reqData *api.RequestData) (url.Values, errors.Error) {
query := url.Values{}
if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" {
query.Set("page_token", reqData.CustomData.(string))
}
return query, nil
},
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
data := CircleciPageTokenResp[[]json.RawMessage]{}
err := api.UnmarshalResponse(res, &data)
return data.Items, err
},
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job",
Input: iterator,
GetNextPageCustomData: ExtractNextPageToken,
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
})
if err != nil {
logger.Error(err, "collect jobs error")
Expand Down
37 changes: 7 additions & 30 deletions backend/plugins/circleci/tasks/pipeline_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@ limitations under the License.
package tasks

import (
"encoding/json"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"net/http"
"net/url"
)

const RAW_PIPELINE_TABLE = "circleci_api_pipelines"
Expand All @@ -43,33 +40,13 @@ func CollectPipelines(taskCtx plugin.SubTaskContext) errors.Error {
logger := taskCtx.GetLogger()
logger.Info("collect pipelines")
collector, err := api.NewApiCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
UrlTemplate: "/v2/project/{{ .Params.ProjectSlug }}/pipeline",
PageSize: int(data.Options.PageSize),
GetNextPageCustomData: func(prevReqData *api.RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) {
res := CircleciPageTokenResp[any]{}
err := api.UnmarshalResponse(prevPageResponse, &res)
if err != nil {
return nil, err
}
if res.NextPageToken == "" {
return nil, api.ErrFinishCollect
}
return res.NextPageToken, nil
},
Query: func(reqData *api.RequestData) (url.Values, errors.Error) {
query := url.Values{}
if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" {
query.Set("page_token", reqData.CustomData.(string))
}
return query, nil
},
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
data := CircleciPageTokenResp[[]json.RawMessage]{}
err := api.UnmarshalResponse(res, &data)
return data.Items, err
},
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
UrlTemplate: "/v2/project/{{ .Params.ProjectSlug }}/pipeline",
PageSize: int(data.Options.PageSize),
GetNextPageCustomData: ExtractNextPageToken,
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
})
if err != nil {
logger.Error(err, "collect pipelines error")
Expand Down
30 changes: 30 additions & 0 deletions backend/plugins/circleci/tasks/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ limitations under the License.
package tasks

import (
"encoding/json"
"net/http"
"net/url"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models/domainlayer/didgen"
Expand Down Expand Up @@ -91,3 +95,29 @@ func findPipelineById(db dal.Dal, id string) (*models.CircleciPipeline, errors.E
}
return pipeline, nil
}

func ExtractNextPageToken(prevReqData *api.RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) {
res := CircleciPageTokenResp[any]{}
err := api.UnmarshalResponse(prevPageResponse, &res)
if err != nil {
return nil, err
}
if res.NextPageToken == "" {
return nil, api.ErrFinishCollect
}
return res.NextPageToken, nil
}

func BuildQueryParamsWithPageToken(reqData *api.RequestData) (url.Values, errors.Error) {
query := url.Values{}
if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" {
query.Set("page-token", pageToken)
}
return query, nil
}

func ParseCircleciPageTokenResp(res *http.Response) ([]json.RawMessage, errors.Error) {
data := CircleciPageTokenResp[[]json.RawMessage]{}
err := api.UnmarshalResponse(res, &data)
return data.Items, err
}
3 changes: 3 additions & 0 deletions backend/plugins/circleci/tasks/task_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,8 @@ func DecodeAndValidateTaskOptions(options map[string]interface{}) (*CircleciOpti
if op.ConnectionId == 0 {
return nil, errors.Default.New("connectionId is invalid")
}
if op.PageSize == 0 {
op.PageSize = 20 // CircleCI API default page size
}
return &op, nil
}
40 changes: 9 additions & 31 deletions backend/plugins/circleci/tasks/workflow_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ limitations under the License.
package tasks

import (
"encoding/json"
"reflect"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/circleci/models"
"net/http"
"net/url"
"reflect"
)

const RAW_WORKFLOW_TABLE = "circleci_api_workflows"
Expand Down Expand Up @@ -63,33 +61,13 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext) errors.Error {
}

collector, err := api.NewApiCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
UrlTemplate: "/v2/pipeline/{{ .Input.Id }}/workflow",
Input: iterator,
GetNextPageCustomData: func(prevReqData *api.RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) {
res := CircleciPageTokenResp[any]{}
err := api.UnmarshalResponse(prevPageResponse, &res)
if err != nil {
return nil, err
}
if res.NextPageToken == "" {
return nil, api.ErrFinishCollect
}
return res.NextPageToken, nil
},
Query: func(reqData *api.RequestData) (url.Values, errors.Error) {
query := url.Values{}
if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" {
query.Set("page_token", reqData.CustomData.(string))
}
return query, nil
},
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
data := CircleciPageTokenResp[[]json.RawMessage]{}
err := api.UnmarshalResponse(res, &data)
return data.Items, err
},
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
UrlTemplate: "/v2/pipeline/{{ .Input.Id }}/workflow",
Input: iterator,
GetNextPageCustomData: ExtractNextPageToken,
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
})
if err != nil {
logger.Error(err, "collect workflows error")
Expand Down

0 comments on commit 7ac45c7

Please sign in to comment.