-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmain.go
130 lines (114 loc) · 3.89 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package main
import (
"context"
"net/http"
"os"
"syscall"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/profiler"
"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/configmap"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/entrypoint"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/config"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry/datadog"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry/metric/prometheus"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry/pprof"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)
const (
ServiceName = "stream"
ENVPrefix = "STREAM_"
)
func main() {
entrypoint.Run(run, config.New(), entrypoint.Config{ENVPrefix: ENVPrefix})
}
func run(ctx context.Context, cfg config.Config, posArgs []string) error {
// Create logger
logger := log.NewServiceLogger(os.Stdout, cfg.DebugLog) // nolint:forbidigo
// Get list of enabled components
components, err := stream.ParseComponentsList(posArgs)
if err == nil {
logger.Infof(ctx, "enabled components: %s", components.String())
} else {
return err
}
// Dump configuration, sensitive values are masked
dump, err := configmap.NewDumper().Dump(cfg).AsJSON(false)
if err == nil {
logger.Infof(ctx, "configuration: %s", string(dump))
} else {
return err
}
// Create process abstraction
proc := servicectx.New(servicectx.WithLogger(logger))
// PProf profiler
if cfg.PProf.Enabled {
logger.Infof(ctx, `PProf profiler enabled, listening on %q`, cfg.PProf.Listen)
srv := pprof.NewHTTPServer(cfg.PProf.Listen)
go func() {
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Errorf(ctx, `PProf HTTP server error: %s`, err)
}
}()
defer func() {
if err := srv.Close(); err != nil {
logger.Errorf(ctx, `cannot stop PProf HTTP server: %s`, err)
}
}()
}
// Datadog profiler
if cfg.Datadog.Enabled && cfg.Datadog.Profiler.Enabled {
logger.Infof(ctx, "Datadog profiler enabled")
if err := profiler.Start(profiler.WithProfileTypes(cfg.Datadog.Profiler.ProfilerTypes()...)); err != nil {
return err
}
defer profiler.Stop()
}
// Setup telemetry
tel, err := telemetry.New(
func() (trace.TracerProvider, error) {
if cfg.Datadog.Enabled {
return datadog.NewTracerProvider(
logger, proc,
tracer.WithGlobalTag("stream.components", components.String()),
tracer.WithRuntimeMetrics(),
tracer.WithSamplingRules([]tracer.SamplingRule{tracer.RateRule(1.0)}),
tracer.WithAnalyticsRate(1.0),
tracer.WithDebugMode(cfg.Datadog.Debug),
), nil
}
return nil, nil
},
func() (metric.MeterProvider, error) {
return prometheus.ServeMetrics(ctx, cfg.Metrics, logger, proc, ServiceName)
},
)
if err != nil {
return err
}
// Check max opened files limit
var limit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
logger.Warnf(ctx, `cannot get opened file descriptors limit value: %s`, err)
} else if limit.Cur < 10000 {
logger.Warnf(ctx, `opened file descriptors limit is too small: %d`, limit.Cur)
}
// Create dependencies
d, err := dependencies.NewServiceScope(ctx, cfg, proc, logger, tel, os.Stdout, os.Stderr) //nolint:forbidigo
if err != nil {
return err
}
// Start service components
if err := stream.StartComponents(ctx, d, cfg, components...); err != nil {
return err
}
// Wait for the service shutdown
proc.WaitForShutdown()
return nil
}