Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jraddaoui committed Mar 18, 2024
0 parents commit f70b04b
Show file tree
Hide file tree
Showing 66 changed files with 5,230 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
**
!cmd
!hack/sampledata/xsd
!internal
!go.mod
!go.sum
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/build
/.tilt.env
1 change: 1 addition & 0 deletions .go-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1.21.6
42 changes: 42 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# syntax = docker/dockerfile:1.4

ARG GO_VERSION

FROM golang:${GO_VERSION}-alpine AS build-go
WORKDIR /src
ENV CGO_ENABLED=0
COPY --link go.* ./
RUN --mount=type=cache,target=/go/pkg/mod go mod download
COPY --link . .

FROM build-go AS build-preprocessing-sfa-worker
ARG VERSION_PATH
ARG VERSION_LONG
ARG VERSION_SHORT
ARG VERSION_GIT_HASH
RUN --mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache/go-build \
go build \
-trimpath \
-ldflags="-X '${VERSION_PATH}.Long=${VERSION_LONG}' -X '${VERSION_PATH}.Short=${VERSION_SHORT}' -X '${VERSION_PATH}.GitCommit=${VERSION_GIT_HASH}'" \
-o /out/preprocessing-sfa-worker \
./cmd/worker

FROM alpine:3.18.2 AS base
ARG USER_ID=1000
ARG GROUP_ID=1000
RUN addgroup -g ${GROUP_ID} -S preprocessing-sfa
RUN adduser -u ${USER_ID} -S -D preprocessing-sfa preprocessing-sfa
USER preprocessing-sfa

FROM base AS preprocessing-sfa-worker
ENV PYTHONUNBUFFERED=1
USER root
RUN apk add --update --no-cache python3 && \
ln -sf python3 /usr/bin/python && \
python3 -m ensurepip
USER preprocessing-sfa
RUN pip3 install --no-cache --upgrade pip lxml bagit==v1.8.1
COPY --from=build-preprocessing-sfa-worker --link /src/hack/sampledata/xsd/* /
COPY --from=build-preprocessing-sfa-worker --link /out/preprocessing-sfa-worker /home/preprocessing-sfa/bin/preprocessing-sfa-worker
CMD ["/home/preprocessing-sfa/bin/preprocessing-sfa-worker"]
83 changes: 83 additions & 0 deletions Tiltfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
version_settings(constraint=">=0.22.2")
secret_settings(disable_scrub=True)
load("ext://uibutton", "cmd_button", "text_input")
load('ext://dotenv', 'dotenv')

# Load tilt env file if it exists
dotenv_path = ".tilt.env"
if os.path.exists(dotenv_path):
dotenv(fn=dotenv_path)

# Configure trigger mode
true = ("true", "1", "yes", "t", "y")
trigger_mode = TRIGGER_MODE_MANUAL
if os.environ.get('TRIGGER_MODE_AUTO', '').lower() in true:
trigger_mode = TRIGGER_MODE_AUTO

# Docker images
custom_build(
ref="preprocessing-sfa-worker:dev",
command=["hack/build_docker.sh"],
deps=["."],
)

# Load Kubernetes resources
k8s_yaml(kustomize("hack/kube/overlays/dev"))

# SFA resources
k8s_resource(
"preprocessing-sfa-worker",
labels=["01-SFA"],
trigger_mode=trigger_mode
)

# Other resources
k8s_resource("mysql", port_forwards="3306", labels=["02-Others"])
k8s_resource("temporal", labels=["02-Others"])
k8s_resource("temporal-ui", port_forwards="8080", labels=["02-Others"])

# Tools
k8s_resource(
"mysql-recreate-databases",
labels=["03-Tools"],
auto_init=False,
trigger_mode=TRIGGER_MODE_MANUAL
)
k8s_resource(
"start-workflow",
labels=["03-Tools"],
auto_init=False,
trigger_mode=TRIGGER_MODE_MANUAL
)

# Buttons
cmd_button(
"submit",
argv=[
"sh",
"-c",
'FILENAME=$(basename -- "$LOCAL_PATH"); \
kubectl -n enduro-sdps cp "$LOCAL_PATH" preprocessing-sfa-worker-0:/tmp/"$FILENAME"; \
kubectl -n enduro-sdps delete secret start-workflow-secret --ignore-not-found; \
kubectl -n enduro-sdps create secret generic start-workflow-secret --from-literal=relative_path="$FILENAME"; \
tilt trigger start-workflow;',
],
location="nav",
icon_name="cloud_upload",
text="Submit",
inputs=[text_input("LOCAL_PATH", label="Local path")]
)
cmd_button(
"flush",
argv=[
"sh",
"-c",
"tilt trigger mysql-recreate-databases; \
sleep 5; \
tilt trigger temporal; \
tilt trigger preprocessing-sfa-worker;",
],
location="nav",
icon_name="delete",
text="Flush"
)
32 changes: 32 additions & 0 deletions Tiltfile.enduro
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
version_settings(constraint=">=0.22.2")
secret_settings(disable_scrub=True)
load("ext://uibutton", "cmd_button", "text_input")
load("ext://dotenv", "dotenv")

# Load tilt env file if it exists
dotenv_path = ".tilt.env"
if os.path.exists(dotenv_path):
dotenv(fn=dotenv_path)

# Configure trigger mode
true = ("true", "1", "yes", "t", "y")
trigger_mode = TRIGGER_MODE_MANUAL
if os.environ.get("TRIGGER_MODE_AUTO", "").lower() in true:
trigger_mode = TRIGGER_MODE_AUTO

# Docker images
custom_build(
ref="preprocessing-sfa-worker:dev",
command=["hack/build_docker.sh"],
deps=["."],
)

# Load Kubernetes resources
k8s_yaml(kustomize("hack/kube/overlays/enduro"))

# Preprocessing resources
k8s_resource(
"preprocessing-sfa-worker",
labels=["Preprocessing"],
trigger_mode=trigger_mode
)
79 changes: 79 additions & 0 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package main

import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"runtime"

"github.com/spf13/pflag"
"go.artefactual.dev/tools/log"

"github.com/artefactual-sdps/preprocessing-sfa/cmd/worker/workercmd"
"github.com/artefactual-sdps/preprocessing-sfa/internal/config"
"github.com/artefactual-sdps/preprocessing-sfa/internal/version"
)

func main() {
p := pflag.NewFlagSet(workercmd.Name, pflag.ExitOnError)
p.String("config", "", "Configuration file")
if err := p.Parse(os.Args[1:]); err == flag.ErrHelp {
os.Exit(1)
} else if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}

var cfg config.Configuration
configFile, _ := p.GetString("config")
configFileFound, configFileUsed, err := config.Read(&cfg, configFile)
if err != nil {
fmt.Printf("Failed to read configuration: %v\n", err)
os.Exit(1)
}

logger := log.New(os.Stderr,
log.WithName(workercmd.Name),
log.WithDebug(cfg.Debug),
log.WithLevel(cfg.Verbosity),
)
defer log.Sync(logger)

keys := []interface{}{
"version", version.Long,
"pid", os.Getpid(),
"go", runtime.Version(),
}
if version.GitCommit != "" {
keys = append(keys, "commit", version.GitCommit)
}
logger.Info("Starting...", keys...)

if configFileFound {
logger.Info("Configuration file loaded.", "path", configFileUsed)
} else {
logger.Info("Configuration file not found.")
}

ctx, cancel := context.WithCancel(context.Background())
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() { <-c; cancel() }()

m := workercmd.NewMain(logger, cfg)

if err := m.Run(ctx); err != nil {
_ = m.Close()
os.Exit(1)
}

<-ctx.Done()

if err := m.Close(); err != nil {
fmt.Fprintln(os.Stderr, err)
logger.Error(err, "Failed to close the application.")
os.Exit(1)
}
}
108 changes: 108 additions & 0 deletions cmd/worker/workercmd/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package workercmd

import (
"context"

"github.com/go-logr/logr"
"go.artefactual.dev/tools/temporal"
temporalsdk_activity "go.temporal.io/sdk/activity"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/worker"
temporalsdk_workflow "go.temporal.io/sdk/workflow"

"github.com/artefactual-sdps/preprocessing-sfa/internal/activities"
"github.com/artefactual-sdps/preprocessing-sfa/internal/config"
"github.com/artefactual-sdps/preprocessing-sfa/internal/workflow"
)

const Name = "preprocessing-worker"

type Main struct {
logger logr.Logger
cfg config.Configuration
temporalWorker worker.Worker
temporalClient client.Client
}

func NewMain(logger logr.Logger, cfg config.Configuration) *Main {
return &Main{
logger: logger,
cfg: cfg,
}
}

func (m *Main) Run(ctx context.Context) error {
c, err := client.Dial(client.Options{
HostPort: m.cfg.Temporal.Address,
Namespace: m.cfg.Temporal.Namespace,
Logger: temporal.Logger(m.logger.WithName("temporal")),
})
if err != nil {
m.logger.Error(err, "Unable to create Temporal client.")
return err
}
m.temporalClient = c

w := worker.New(m.temporalClient, m.cfg.Temporal.TaskQueue, worker.Options{
EnableSessionWorker: true,
MaxConcurrentSessionExecutionSize: m.cfg.Worker.MaxConcurrentSessions,
Interceptors: []interceptor.WorkerInterceptor{
temporal.NewLoggerInterceptor(m.logger.WithName("worker")),
},
})
m.temporalWorker = w

w.RegisterWorkflowWithOptions(
workflow.NewPreprocessingWorkflow(m.cfg.SharedPath).Execute,
temporalsdk_workflow.RegisterOptions{Name: m.cfg.Temporal.WorkflowName},
)

w.RegisterActivityWithOptions(
activities.NewExtractPackage().Execute,
temporalsdk_activity.RegisterOptions{Name: activities.ExtractPackageName},
)
w.RegisterActivityWithOptions(
activities.NewCheckSipStructure().Execute,
temporalsdk_activity.RegisterOptions{Name: activities.CheckSipStructureName},
)
w.RegisterActivityWithOptions(
activities.NewAllowedFileFormatsActivity().Execute,
temporalsdk_activity.RegisterOptions{Name: activities.AllowedFileFormatsName},
)
w.RegisterActivityWithOptions(
activities.NewMetadataValidationActivity().Execute,
temporalsdk_activity.RegisterOptions{Name: activities.MetadataValidationName},
)
w.RegisterActivityWithOptions(
activities.NewSipCreationActivity().Execute,
temporalsdk_activity.RegisterOptions{Name: activities.SipCreationName},
)
// w.RegisterActivityWithOptions(
// activities.NewSendToFailedBuckeActivity(ft, fs).Execute,
// temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedBucketName},
// )
w.RegisterActivityWithOptions(
activities.NewRemovePaths().Execute,
temporalsdk_activity.RegisterOptions{Name: activities.RemovePathsName},
)

if err := w.Start(); err != nil {
m.logger.Error(err, "Worker failed to start or fatal error during its execution.")
return err
}

return nil
}

func (m *Main) Close() error {
if m.temporalWorker != nil {
m.temporalWorker.Stop()
}

if m.temporalClient != nil {
m.temporalClient.Close()
}

return nil
}
Loading

0 comments on commit f70b04b

Please sign in to comment.