diff --git a/cmd/conduit/root/pipelines/describe.go b/cmd/conduit/root/pipelines/describe.go new file mode 100644 index 000000000..2c3d5406f --- /dev/null +++ b/cmd/conduit/root/pipelines/describe.go @@ -0,0 +1,176 @@ +// Copyright © 2025 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipelines + +import ( + "context" + "fmt" + "strings" + + "github.com/conduitio/conduit/cmd/conduit/api" + "github.com/conduitio/conduit/cmd/conduit/cecdysis" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + apiv1 "github.com/conduitio/conduit/proto/api/v1" + "github.com/conduitio/ecdysis" +) + +var ( + _ cecdysis.CommandWithExecuteWithClient = (*DescribeCommand)(nil) + _ ecdysis.CommandWithAliases = (*DescribeCommand)(nil) + _ ecdysis.CommandWithDocs = (*DescribeCommand)(nil) + _ ecdysis.CommandWithArgs = (*DescribeCommand)(nil) +) + +type DescribeArgs struct { + PipelineID string +} +type DescribeCommand struct { + args DescribeArgs +} + +func (c *DescribeCommand) Docs() ecdysis.Docs { + return ecdysis.Docs{ + Short: "Describe an existing pipeline", + Long: `This command requires Conduit to be already running since it will list all pipelines registered +by Conduit. This will depend on the configured pipelines directory, which by default is /pipelines; however, it could +be configured via --pipelines.path at the time of running Conduit.`, + Example: "conduit pipelines describe\nconduit pipelines desc", + } +} + +func (c *DescribeCommand) Aliases() []string { return []string{"desc"} } + +func (c *DescribeCommand) Usage() string { return "describe" } + +func (c *DescribeCommand) Args(args []string) error { + if len(args) == 0 { + return cerrors.Errorf("requires a pipeline ID") + } + + if len(args) > 1 { + return cerrors.Errorf("too many arguments") + } + + c.args.PipelineID = args[0] + return nil +} + +func (c *DescribeCommand) ExecuteWithClient(ctx context.Context, client *api.Client) error { + pipelineResp, err := client.PipelineServiceClient.GetPipeline(ctx, &apiv1.GetPipelineRequest{ + Id: c.args.PipelineID, + }) + if err != nil { + return fmt.Errorf("failed to get pipeline: %w", err) + } + + // needed to show processors in connectors too + connectorsResp, err := client.ConnectorServiceClient.ListConnectors(ctx, &apiv1.ListConnectorsRequest{ + PipelineId: c.args.PipelineID, + }) + if err != nil { + return fmt.Errorf("failed to list connectors for pipeline %s: %w", c.args.PipelineID, err) + } + + dlq, err := client.PipelineServiceClient.GetDLQ(ctx, &apiv1.GetDLQRequest{ + Id: c.args.PipelineID, + }) + if err != nil { + return fmt.Errorf("failed to fetch DLQ for pipeline %s: %w", c.args.PipelineID, err) + } + + err = displayPipeline(ctx, pipelineResp.Pipeline, connectorsResp.Connectors, dlq.Dlq) + if err != nil { + return fmt.Errorf("failed to display pipeline %s: %w", c.args.PipelineID, err) + } + + return nil +} + +func displayPipeline(ctx context.Context, pipeline *apiv1.Pipeline, connectors []*apiv1.Connector, dlq *apiv1.Pipeline_DLQ) error { + cobraCmd := ecdysis.CobraCmdFromContext(ctx) + w := cobraCmd.OutOrStdout() + var b strings.Builder + + // ID + fmt.Fprintf(&b, "ID: %s\n", pipeline.Id) + + // State + if pipeline.State != nil { + fmt.Fprintf(&b, "Status: %s\n", getPipelineStatus(pipeline)) + if pipeline.State.Error != "" { + fmt.Fprintf(&b, "Error: %s\n", pipeline.State.Error) + } + } + + // Config + if pipeline.Config != nil { + fmt.Fprintf(&b, "Name: %s\n", pipeline.Config.Name) + // no new line after description, as it's always added + // when parsed from the YAML config file + fmt.Fprintf(&b, "Description: %s", pipeline.Config.Description) + } + + // Connectors + b.WriteString("Sources:\n") + printConnectors(&b, connectors, apiv1.Connector_TYPE_SOURCE) + + printProcessors(&b, pipeline.ProcessorIds, 0) + + b.WriteString("Destinations:\n") + printConnectors(&b, connectors, apiv1.Connector_TYPE_DESTINATION) + + printDLQ(&b, dlq) + + // Timestamps + if pipeline.CreatedAt != nil { + fmt.Fprintf(&b, "Created At: %s\n", printTime(pipeline.CreatedAt)) + } + if pipeline.UpdatedAt != nil { + fmt.Fprintf(&b, "Updated At: %s\n", pipeline.UpdatedAt.AsTime().Format("2006-01-02T15:04:05Z")) + } + + // Write the complete string to the writer + _, err := w.Write([]byte(b.String())) + if err != nil { + return fmt.Errorf("writing output: %w", err) + } + + return nil +} + +func printDLQ(b *strings.Builder, dlq *apiv1.Pipeline_DLQ) { + b.WriteString("Dead-letter queue:\n") + fmt.Fprintf(b, "%sPlugin: %s\n", indentation(1), dlq.Plugin) +} + +func printConnectors(b *strings.Builder, connectors []*apiv1.Connector, connType apiv1.Connector_Type) { + for _, conn := range connectors { + if conn.Type == connType { + fmt.Fprintf(b, "%s- %s (%s)\n", indentation(1), conn.Id, conn.Plugin) + printProcessors(b, conn.ProcessorIds, 2) + } + } +} + +func printProcessors(b *strings.Builder, ids []string, indent int) { + if len(ids) == 0 { + return + } + + fmt.Fprintf(b, "%sProcessors:\n", indentation(indent)) + for _, id := range ids { + fmt.Fprintf(b, "%s- %s\n", indentation(indent+1), id) + } +} diff --git a/cmd/conduit/root/pipelines/list.go b/cmd/conduit/root/pipelines/list.go index 92f3e355c..7200e3e56 100644 --- a/cmd/conduit/root/pipelines/list.go +++ b/cmd/conduit/root/pipelines/list.go @@ -76,10 +76,10 @@ func displayPipelines(pipelines []*apiv1.Pipeline) { for _, p := range pipelines { r := []*simpletable.Cell{ - {Align: simpletable.AlignLeft, Text: p.Id}, - {Align: simpletable.AlignLeft, Text: p.State.Status.String()}, - {Align: simpletable.AlignLeft, Text: p.CreatedAt.AsTime().String()}, - {Align: simpletable.AlignLeft, Text: p.UpdatedAt.AsTime().String()}, + {Align: simpletable.AlignRight, Text: p.Id}, + {Align: simpletable.AlignLeft, Text: getPipelineStatus(p)}, + {Align: simpletable.AlignLeft, Text: printTime(p.CreatedAt)}, + {Align: simpletable.AlignLeft, Text: printTime(p.UpdatedAt)}, } table.Body.Cells = append(table.Body.Cells, r) diff --git a/cmd/conduit/root/pipelines/pipelines.go b/cmd/conduit/root/pipelines/pipelines.go index 0ae75ea5f..b55291c3e 100644 --- a/cmd/conduit/root/pipelines/pipelines.go +++ b/cmd/conduit/root/pipelines/pipelines.go @@ -32,6 +32,7 @@ func (c *PipelinesCommand) SubCommands() []ecdysis.Command { return []ecdysis.Command{ &InitCommand{}, &ListCommand{}, + &DescribeCommand{}, } } diff --git a/cmd/conduit/root/pipelines/print_utils.go b/cmd/conduit/root/pipelines/print_utils.go new file mode 100644 index 000000000..b4471d745 --- /dev/null +++ b/cmd/conduit/root/pipelines/print_utils.go @@ -0,0 +1,40 @@ +// Copyright © 2025 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipelines + +import ( + "strings" + + apiv1 "github.com/conduitio/conduit/proto/api/v1" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func indentation(level int) string { + return strings.Repeat(" ", level) +} + +func getPipelineStatus(pipeline *apiv1.Pipeline) string { + return prettyProtoEnum("STATUS_", pipeline.State.Status.String()) +} + +func prettyProtoEnum(prefix, protoEnum string) string { + return strings.ToLower( + strings.ReplaceAll(protoEnum, prefix, ""), + ) +} + +func printTime(ts *timestamppb.Timestamp) string { + return ts.AsTime().Format("2006-01-02T15:04:05Z") +} diff --git a/examples/pipelines/file-to-file.yaml b/examples/pipelines/file-to-file.yaml index b77df8f4c..0e40dad95 100644 --- a/examples/pipelines/file-to-file.yaml +++ b/examples/pipelines/file-to-file.yaml @@ -1,4 +1,4 @@ -version: 2.0 +version: 2.2 pipelines: - id: file-to-file status: running diff --git a/examples/pipelines/multiple-destinations.yaml b/examples/pipelines/multiple-destinations.yaml index 64c0837f7..ed3ae53f9 100644 --- a/examples/pipelines/multiple-destinations.yaml +++ b/examples/pipelines/multiple-destinations.yaml @@ -11,7 +11,10 @@ pipelines: plugin: builtin:generator settings: format.type: "structured" - format.options: "id:int,name:string,company:string,trial:bool" + format.options.id: "int" + format.options.name: "string" + format.options.company: "string" + format.options.trial: "bool" recordCount: "1" - id: file-destination-1 type: destination diff --git a/examples/pipelines/multiple-sources-with-processor.yml b/examples/pipelines/multiple-sources-with-processor.yml index 248f64f0c..cb389cc91 100644 --- a/examples/pipelines/multiple-sources-with-processor.yml +++ b/examples/pipelines/multiple-sources-with-processor.yml @@ -1,6 +1,6 @@ version: 2.2 pipelines: - - id: add-department + - id: multiple-source-with-processor status: running description: > An example pipeline that reads data (imaginary employees) from two generator @@ -13,14 +13,20 @@ pipelines: plugin: builtin:generator settings: format.type: "structured" - format.options: "id:int,name:string,company:string,trial:bool" + format.options.id: "int" + format.options.name: "string" + format.options.company: "string" + format.options.trial: "bool" recordCount: "1" - id: employees-2 type: source plugin: builtin:generator settings: format.type: "structured" - format.options: "id:int,name:string,company:string,trial:bool" + format.options.id: "int" + format.options.name: "string" + format.options.company: "string" + format.options.trial: "bool" recordCount: "2" - id: file-destination type: destination diff --git a/examples/pipelines/pipeline-with-dlq.yaml b/examples/pipelines/pipeline-with-dlq.yaml index aa31301a3..8b6c79875 100644 --- a/examples/pipelines/pipeline-with-dlq.yaml +++ b/examples/pipelines/pipeline-with-dlq.yaml @@ -1,4 +1,4 @@ -version: 2.2 +version: "2.2" pipelines: - id: pipeline-with-dlq status: running @@ -13,7 +13,8 @@ pipelines: plugin: builtin:generator settings: format.type: "structured" - format.options: "id:int,name:string" + format.options.id: "int" + format.options.name: "string" recordCount: "1" processors: - id: convert-name diff --git a/examples/pipelines/pipeline-with-standalone-connector.yaml b/examples/pipelines/pipeline-with-standalone-connector.yaml index 1770712a5..b945e2117 100644 --- a/examples/pipelines/pipeline-with-standalone-connector.yaml +++ b/examples/pipelines/pipeline-with-standalone-connector.yaml @@ -1,6 +1,6 @@ version: 2.2 pipelines: - - id: file-to-file + - id: chaos-to-file status: running description: > An example pipeline reading from the standalone chaos connector and writing into a file destination.