Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLI: Add pipelines describe command #2078

Merged
merged 14 commits into from
Jan 17, 2025
8 changes: 5 additions & 3 deletions cmd/conduit/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
type Client struct {
conn *grpc.ClientConn
apiv1.PipelineServiceClient
apiv1.ConnectorServiceClient
healthgrpc.HealthClient
}

Expand All @@ -40,9 +41,10 @@ func NewClient(ctx context.Context, address string) (*Client, error) {
}

client := &Client{
conn: conn,
PipelineServiceClient: apiv1.NewPipelineServiceClient(conn),
HealthClient: healthgrpc.NewHealthClient(conn),
conn: conn,
PipelineServiceClient: apiv1.NewPipelineServiceClient(conn),
ConnectorServiceClient: apiv1.NewConnectorServiceClient(conn),
HealthClient: healthgrpc.NewHealthClient(conn),
}

if err := client.CheckHealth(ctx, address); err != nil {
Expand Down
173 changes: 173 additions & 0 deletions cmd/conduit/root/pipelines/describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright © 2025 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pipelines

import (
"context"
"fmt"
"strings"

"github.com/conduitio/conduit/cmd/conduit/api"
"github.com/conduitio/conduit/cmd/conduit/cecdysis"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
apiv1 "github.com/conduitio/conduit/proto/api/v1"
"github.com/conduitio/ecdysis"
)

var (
_ cecdysis.CommandWithExecuteWithClient = (*DescribeCommand)(nil)
_ ecdysis.CommandWithAliases = (*DescribeCommand)(nil)
_ ecdysis.CommandWithDocs = (*DescribeCommand)(nil)
raulb marked this conversation as resolved.
Show resolved Hide resolved
)

type DescribeCommand struct {
PipelineID string
raulb marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *DescribeCommand) Docs() ecdysis.Docs {
return ecdysis.Docs{
Short: "Describe an existing ",
raulb marked this conversation as resolved.
Show resolved Hide resolved
Long: `This command requires Conduit to be already running since it will list all pipelines registered
by Conduit. This will depend on the configured pipelines directory, which by default is /pipelines; however, it could
be configured via --pipelines.path at the time of running Conduit.`,
Example: "conduit pipelines describe",
hariso marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (c *DescribeCommand) Aliases() []string { return []string{"desc"} }

func (c *DescribeCommand) Usage() string { return "describe" }

func (c *DescribeCommand) Args(args []string) error {
if len(args) == 0 {
return cerrors.Errorf("requires a pipeline ID")
}

if len(args) > 1 {
return cerrors.Errorf("too many arguments")
}

c.PipelineID = args[0]
return nil
}

func (c *DescribeCommand) ExecuteWithClient(ctx context.Context, client *api.Client) error {
pipelineResp, err := client.PipelineServiceClient.GetPipeline(ctx, &apiv1.GetPipelineRequest{
Id: c.PipelineID,
})
if err != nil {
return fmt.Errorf("failed to list pipelines: %w", err)
hariso marked this conversation as resolved.
Show resolved Hide resolved
}

// needed to show processors in connectors too
connectorsResp, err := client.ConnectorServiceClient.ListConnectors(ctx, &apiv1.ListConnectorsRequest{
PipelineId: c.PipelineID,
})
if err != nil {
return fmt.Errorf("failed to list connectors for pipeline %s: %w", c.PipelineID, err)
}

dlq, err := client.PipelineServiceClient.GetDLQ(ctx, &apiv1.GetDLQRequest{
Id: c.PipelineID,
})
if err != nil {
return fmt.Errorf("failed to fetch DLQ for pipeline %s: %w", c.PipelineID, err)
}

err = displayPipeline(ctx, pipelineResp.Pipeline, connectorsResp.Connectors, dlq.Dlq)
if err != nil {
return fmt.Errorf("failed to display pipeline %s: %w", c.PipelineID, err)
}

return nil
}

func displayPipeline(ctx context.Context, pipeline *apiv1.Pipeline, connectors []*apiv1.Connector, dlq *apiv1.Pipeline_DLQ) error {
cobraCmd := ecdysis.CobraCmdFromContext(ctx)
w := cobraCmd.OutOrStdout()
var b strings.Builder

// ID
fmt.Fprintf(&b, "ID: %s\n", pipeline.Id)

// State
if pipeline.State != nil {
fmt.Fprintf(&b, "Status: %s\n", getPipelineStatus(pipeline))
if pipeline.State.Error != "" {
fmt.Fprintf(&b, "Error: %s\n", pipeline.State.Error)
}
}

// Config
if pipeline.Config != nil {
fmt.Fprintf(&b, "Name: %s\n", pipeline.Config.Name)
fmt.Fprintf(&b, "Description: %s\n", pipeline.Config.Description)
}

// Connectors
b.WriteString("Sources:\n")
printConnectors(&b, connectors, apiv1.Connector_TYPE_SOURCE)

printProcessors(&b, pipeline.ProcessorIds, 0)

b.WriteString("Destinations:\n")
printConnectors(&b, connectors, apiv1.Connector_TYPE_DESTINATION)

b.WriteString("Destinations:\n")
printConnectors(&b, connectors, apiv1.Connector_TYPE_DESTINATION)
hariso marked this conversation as resolved.
Show resolved Hide resolved

printDLQ(&b, dlq)

// Timestamps
if pipeline.CreatedAt != nil {
fmt.Fprintf(&b, "Created At: %s\n", printTime(pipeline.CreatedAt))
}
if pipeline.UpdatedAt != nil {
fmt.Fprintf(&b, "Updated At: %s\n", pipeline.UpdatedAt.AsTime().Format("2006-01-02T15:04:05Z"))
}

// Write the complete string to the writer
_, err := w.Write([]byte(b.String()))
if err != nil {
return fmt.Errorf("writing output: %w", err)
}

return nil
}

func printDLQ(b *strings.Builder, dlq *apiv1.Pipeline_DLQ) {
b.WriteString("Dead-letter queue:\n")
fmt.Fprintf(b, "%sPlugin: %s\n", indentation(1), dlq.Plugin)
}

func printConnectors(b *strings.Builder, connectors []*apiv1.Connector, connType apiv1.Connector_Type) {
for _, conn := range connectors {
if conn.Type == connType {
fmt.Fprintf(b, "%s- %s (%s)\n", indentation(1), conn.Id, conn.Plugin)
printProcessors(b, conn.ProcessorIds, 2)
}
}
}

func printProcessors(b *strings.Builder, ids []string, indent int) {
if len(ids) == 0 {
return
}

fmt.Fprintf(b, "%sProcessors:\n", indentation(indent))
for _, id := range ids {
fmt.Fprintf(b, "%s- %s\n", indentation(indent+1), id)
}
}
6 changes: 3 additions & 3 deletions cmd/conduit/root/pipelines/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func displayPipelines(pipelines []*apiv1.Pipeline) {
for _, p := range pipelines {
r := []*simpletable.Cell{
{Align: simpletable.AlignRight, Text: p.Id},
{Align: simpletable.AlignLeft, Text: p.State.Status.String()},
{Align: simpletable.AlignLeft, Text: p.CreatedAt.AsTime().String()},
{Align: simpletable.AlignLeft, Text: p.UpdatedAt.AsTime().String()},
{Align: simpletable.AlignLeft, Text: getPipelineStatus(p)},
{Align: simpletable.AlignLeft, Text: printTime(p.CreatedAt)},
{Align: simpletable.AlignLeft, Text: printTime(p.UpdatedAt)},
}

table.Body.Cells = append(table.Body.Cells, r)
Expand Down
1 change: 1 addition & 0 deletions cmd/conduit/root/pipelines/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (c *PipelinesCommand) SubCommands() []ecdysis.Command {
return []ecdysis.Command{
&InitCommand{},
&ListCommand{},
&DescribeCommand{},
}
}

Expand Down
40 changes: 40 additions & 0 deletions cmd/conduit/root/pipelines/print_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright © 2025 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pipelines

import (
"strings"

apiv1 "github.com/conduitio/conduit/proto/api/v1"
"google.golang.org/protobuf/types/known/timestamppb"
)

func indentation(level int) string {
return strings.Repeat(" ", level)
}

func getPipelineStatus(pipeline *apiv1.Pipeline) string {
return prettyProtoEnum("STATUS_", pipeline.State.Status.String())
}

func prettyProtoEnum(prefix, protoEnum string) string {
return strings.ToLower(
strings.ReplaceAll(protoEnum, prefix, ""),
)
}

func printTime(ts *timestamppb.Timestamp) string {
return ts.AsTime().Format("2006-01-02T15:04:05Z")
}
2 changes: 1 addition & 1 deletion examples/pipelines/file-to-file.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: 2.0
version: 2.2
pipelines:
hariso marked this conversation as resolved.
Show resolved Hide resolved
- id: file-to-file
status: running
Expand Down
5 changes: 4 additions & 1 deletion examples/pipelines/multiple-destinations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ pipelines:
plugin: builtin:generator
settings:
format.type: "structured"
format.options: "id:int,name:string,company:string,trial:bool"
format.options.id: "int"
hariso marked this conversation as resolved.
Show resolved Hide resolved
format.options.name: "string"
format.options.company: "string"
format.options.trial: "bool"
recordCount: "1"
- id: file-destination-1
type: destination
Expand Down
12 changes: 9 additions & 3 deletions examples/pipelines/multiple-sources-with-processor.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
version: 2.2
pipelines:
- id: add-department
- id: multiple-source-with-processor
status: running
description: >
An example pipeline that reads data (imaginary employees) from two generator
Expand All @@ -13,14 +13,20 @@ pipelines:
plugin: builtin:generator
settings:
format.type: "structured"
format.options: "id:int,name:string,company:string,trial:bool"
format.options.id: "int"
format.options.name: "string"
format.options.company: "string"
format.options.trial: "bool"
recordCount: "1"
- id: employees-2
type: source
plugin: builtin:generator
settings:
format.type: "structured"
format.options: "id:int,name:string,company:string,trial:bool"
format.options.id: "int"
format.options.name: "string"
format.options.company: "string"
format.options.trial: "bool"
recordCount: "2"
- id: file-destination
type: destination
Expand Down
7 changes: 4 additions & 3 deletions examples/pipelines/pipeline-with-dlq.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: 2.2
version: "2.2"
pipelines:
- id: pipeline-with-dlq
status: running
Expand All @@ -13,7 +13,8 @@ pipelines:
plugin: builtin:generator
settings:
format.type: "structured"
format.options: "id:int,name:string"
format.options.id: "int"
format.options.name: "string"
recordCount: "1"
processors:
- id: convert-name
Expand All @@ -31,4 +32,4 @@ pipelines:
settings: # Configure the file plugin used for DLQ
path: "./dlq.out"
window-size: 2 # DLQ nack window size
window-nack-threshold: 1 # DLQ nack window threshold
window-nack-threshold: 1 # DLQ nack window threshold
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
version: 2.2
pipelines:
- id: file-to-file
- id: chaos-to-file
hariso marked this conversation as resolved.
Show resolved Hide resolved
status: running
description: >
An example pipeline reading from the standalone chaos connector and writing into a file destination.
Expand Down
Loading