Skip to content

Commit

Permalink
feat: allow usage of tcp socket between agent and worker (#818)
Browse files Browse the repository at this point in the history
# Motivation

Java does not support grpc communication between the worker and the
agent over unix sockets, this PR builds on the changes introduced in
aneoconsulting/ArmoniK.Api#583 to enable the
option to use a tcp socket instead.

# Description

Mimic the changes in
aneoconsulting/ArmoniK.Api#583 and enlarged the
kestrel configuration options in the `AgentHandler` such that it also
accepts to listen on a tcp socket. In addition this PR separates the
volumes used for shared data and unix socket. Before the PR both of them
where mounted on `\cache`, the updated mount point for shared data is
`\comm`.

# Testing

Terraform deployment and `just` file have been modified to take into
account a new variable `socket_type`, that can be set to either
`unixdomainsocket` or `tcp` so the deployment configures which socket
type should be employed accordingly. With these changes the same set of
tests that ran for unix sockets can be executed but using a tcp socket
instead .

# Impact

The desired life cycle behavior in a deployment with the whole
infrastructure is that the worker starts before the agent and stops
after it. This is currently enforced by checking the absence/existence
of the unix socket created by the agent. With the changes of this PR, in
the case of choosing a tcp socket for the connection, the unix socket is
not created. Hence, the infra should be adapted accordingly to guarantee
the desired worker/agent life cycle. One option is to transform the
worker container in Kubernetes POD into a sidecar container, but this
will require the direct use of Helm charts since the Kubernetes provider
we use for terraform does not support this yet.

# Additional Information

Not applicable

# Checklist

- [x] My code adheres to the coding and style guidelines of the project.
- [x] I have performed a self-review of my code.
- [ ] I have commented my code, particularly in hard-to-understand
areas.
- [ ] I have made corresponding changes to the documentation.
- [x] I have thoroughly tested my modifications and added tests when
necessary.
- [x] Tests pass locally and in the CI.
- [ ] I have assessed the performance impact of my modifications.
  • Loading branch information
jfonseca-aneo authored Jan 20, 2025
2 parents 0ad9ee2 + b460bf9 commit 51fd13a
Show file tree
Hide file tree
Showing 18 changed files with 131 additions and 48 deletions.
62 changes: 41 additions & 21 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -441,20 +441,40 @@ jobs:
strategy:
fail-fast: false
matrix:
queue:
- activemq
- rabbitmq
- rabbitmq091
- pubsub
- sqs
object:
- redis
- minio
- embed
log-level:
- Information
- Verbose
name: HtcMock ${{ matrix.queue }} ${{ matrix.object }} ${{ matrix.log-level }}
target:
- { queue: activemq, object: redis, log-level: Information, socket_type: unixdomainsocket }
- { queue: rabbitmq, object: redis, log-level: Information, socket_type: unixdomainsocket }
- { queue: rabbitmq091, object: redis, log-level: Information, socket_type: unixdomainsocket }
- { queue: pubsub, object: redis, log-level: Information, socket_type: unixdomainsocket }
- { queue: sqs, object: redis, log-level: Information, socket_type: unixdomainsocket }

- { queue: activemq, object: redis, log-level: Information, socket_type: tcp }
- { queue: rabbitmq, object: redis, log-level: Information, socket_type: tcp }
- { queue: rabbitmq091, object: redis, log-level: Information, socket_type: tcp }
- { queue: pubsub, object: redis, log-level: Information, socket_type: tcp }
- { queue: sqs, object: redis, log-level: Information, socket_type: tcp }

- { queue: activemq, object: redis, log-level: Verbose, socket_type: unixdomainsocket }
- { queue: rabbitmq, object: redis, log-level: Verbose, socket_type: unixdomainsocket }
- { queue: rabbitmq091, object: redis, log-level: Verbose, socket_type: unixdomainsocket }
- { queue: pubsub, object: redis, log-level: Verbose, socket_type: unixdomainsocket }
- { queue: sqs, object: redis, log-level: Verbose, socket_type: unixdomainsocket }

- { queue: activemq, object: minio, log-level: Information, socket_type: unixdomainsocket }
- { queue: rabbitmq, object: minio, log-level: Information, socket_type: unixdomainsocket }
- { queue: rabbitmq091, object: minio, log-level: Information, socket_type: unixdomainsocket }
- { queue: pubsub, object: minio, log-level: Information, socket_type: unixdomainsocket }
- { queue: sqs, object: minio, log-level: Information, socket_type: unixdomainsocket }

- { queue: activemq, object: embed, log-level: Information, socket_type: unixdomainsocket }
- { queue: rabbitmq, object: embed, log-level: Information, socket_type: unixdomainsocket }
- { queue: rabbitmq091, object: embed, log-level: Information, socket_type: unixdomainsocket }
- { queue: pubsub, object: embed, log-level: Information, socket_type: unixdomainsocket }
- { queue: sqs, object: embed, log-level: Information, socket_type: unixdomainsocket }

- { queue: activemq, object: redis, log-level: Information, socket_type: unixdomainsocket }

name: HtcMock ${{ matrix.target.queue }} ${{ matrix.target.object }} ${{ matrix.target.log-level }} ${{ matrix.target.socket_type }}
steps:
- name: Checkout
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4
Expand All @@ -478,7 +498,7 @@ jobs:
- name: Deploy Core
run: |
MONITOR_PREFIX="monitor/deploy/" tools/retry.sh -w 30 -- tools/monitor.sh \
just log_level=${{ matrix.log-level }} tag=${VERSION} queue=${{ matrix.queue }} object=${{ matrix.object }} worker=htcmock deploy
just log_level=${{ matrix.target.log-level }} tag=${VERSION} queue=${{ matrix.target.queue }} object=${{ matrix.target.object }} socket_type=${{ matrix.target.socket_type }} worker=htcmock deploy
sleep 10
- name: Print And Time Metrics
Expand Down Expand Up @@ -547,7 +567,7 @@ jobs:
- name: Run HtcMock test 1000 tasks 1 level
timeout-minutes: 3
if: ${{ matrix.log-level != 'Verbose' }}
if: ${{ matrix.target.log-level != 'Verbose' }}
run: |
MONITOR_PREFIX="monitor/htcmock-1000-1/" tools/monitor.sh \
docker run --net armonik_network --rm \
Expand All @@ -562,7 +582,7 @@ jobs:
- name: Run HtcMock test 1000 tasks 4 levels
timeout-minutes: 3
if: ${{ matrix.log-level != 'Verbose' }}
if: ${{ matrix.target.log-level != 'Verbose' }}
run: |
MONITOR_PREFIX="monitor/htcmock-1000-4/" tools/monitor.sh \
docker run --net armonik_network --rm \
Expand All @@ -586,8 +606,8 @@ jobs:
run: |
export AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }}
export AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }}
docker cp fluentd:/armonik-logs.json - | gzip -c | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.queue }}-${{ matrix.object }}-${{ matrix.log-level }}.json.gz
tar -czf - monitor/ | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.queue }}-${{ matrix.object }}-${{ matrix.log-level }}-monitor.tar.gz
docker cp fluentd:/armonik-logs.json - | gzip -c | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.target.queue }}-${{ matrix.target.object }}-${{ matrix.target.log-level }}-${{ matrix.target.socket_type }}.json.gz
tar -czf - monitor/ | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.target.queue }}-${{ matrix.target.object }}-${{ matrix.target.log-level }}-${{ matrix.target.socket_type }}-monitor.tar.gz
- name: Collect docker container logs
uses: jwalton/gh-docker-logs@2741064ab9d7af54b0b1ffb6076cf64c16f0220e # v2
Expand All @@ -599,14 +619,14 @@ jobs:
run: |
export AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }}
export AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }}
tar -cvf - ./container-logs | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.queue }}-${{ matrix.object }}-${{ matrix.log-level }}-container-logs.tar.gz
tar -cvf - ./container-logs | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.target.queue }}-${{ matrix.target.object }}-${{ matrix.target.log-level }}-${{ matrix.target.socket_type }}-container-logs.tar.gz
- name: Export and upload database
if: always()
run: |
export AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }}
export AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }}
bash tools/export_mongodb.sh
tar -cvf - *.json | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.queue }}-${{ matrix.object }}-${{ matrix.log-level }}-database.tar.gz
tar -cvf - *.json | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.target.queue }}-${{ matrix.target.object }}-${{ matrix.target.log-level }}-${{ matrix.target.socket_type }}-database.tar.gz
testWindowsDocker:
needs:
Expand Down
2 changes: 1 addition & 1 deletion Common/src/ArmoniK.Core.Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@


<ItemGroup>
<PackageReference Include="ArmoniK.Api.Core" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Core" Version="3.23.0" />
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
26 changes: 21 additions & 5 deletions Common/src/Pollster/AgentHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,29 @@ public AgentHandler(LoggerInit loggerInit,

builder.WebHost.ConfigureKestrel(options =>
{
if (File.Exists(computePlaneOptions.AgentChannel.Address))
var address = computePlaneOptions.AgentChannel.Address;
switch (computePlaneOptions.AgentChannel.SocketType)
{
File.Delete(computePlaneOptions.AgentChannel.Address);
case GrpcSocketType.UnixDomainSocket:
{
if (File.Exists(address))
{
File.Delete(address);
}

options.ListenUnixSocket(address,
listenOptions => listenOptions.Protocols = HttpProtocols.Http2);
break;
}
case GrpcSocketType.Tcp:
var uri = new Uri(address);
options.ListenAnyIP(uri.Port,
listenOptions => listenOptions.Protocols = HttpProtocols.Http2);

break;
default:
throw new InvalidOperationException("Socket type unknown");
}

options.ListenUnixSocket(computePlaneOptions.AgentChannel.Address,
listenOptions => listenOptions.Protocols = HttpProtocols.Http2);
});

app_ = builder.Build();
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM mcr.microsoft.com/dotnet/aspnet:8.0 as base-linux
RUN groupadd --gid 5000 armonikuser && useradd --home-dir /home/armonikuser --create-home --uid 5000 --gid 5000 --shell /bin/sh --skel /dev/null armonikuser
RUN mkdir /cache /local_storage && chown armonikuser: /cache /local_storage
RUN mkdir /cache /local_storage /comm && chown armonikuser: /cache /local_storage /comm
USER armonikuser
ENTRYPOINT [ "dotnet" ]

Expand Down
2 changes: 1 addition & 1 deletion Tests/Bench/Server/src/ArmoniK.Samples.Bench.Server.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Worker" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Worker" Version="3.23.0" />
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
2 changes: 1 addition & 1 deletion Tests/Common/Client/src/ArmoniK.Core.Common.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Client" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Client" Version="3.23.0" />
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Client" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Client" Version="3.23.0" />
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
<PackageReference Include="Serilog.Settings.Configuration" Version="8.0.4" />
<PackageReference Include="Serilog.Sinks.Console" Version="6.0.0" />
<PackageReference Include="Serilog.Formatting.Compact" Version="3.0.0" />
<PackageReference Include="ArmoniK.Api.Client" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Client" Version="3.23.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Worker" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Worker" Version="3.23.0" />
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Worker" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Worker" Version="3.23.0" />
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Client" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Client" Version="3.23.0" />
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Worker" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Worker" Version="3.23.0" />
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
7 changes: 5 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ ingress := "true"
prometheus := "true"
grafana := "true"
seq := "true"
socket_type := "unixdomainsocket"

# Export them as terraform environment variables
export TF_VAR_core_tag := tag
Expand All @@ -30,7 +31,7 @@ export TF_VAR_num_partitions := partitions
export TF_VAR_enable_grafana := grafana
export TF_VAR_enable_seq := seq
export TF_VAR_enable_prometheus := prometheus

export TF_VAR_socket_type := socket_type

# Sets the queue
export TF_VAR_queue_storage := if queue == "rabbitmq" {
Expand Down Expand Up @@ -137,7 +138,7 @@ export TF_VAR_mongodb_params:= if os_family() == "windows" {
}

export TF_VAR_compute_plane:= if os_family() == "windows" {
'{ "polling_agent" : { "image" : "' + image_polling_agent + '", "shared_socket" : "c:/cache", "shared_data" : "c:/cache" }, "worker" = {}}'
'{ "polling_agent" : { "image" : "' + image_polling_agent + '", "shared_socket" : "c:/cache", "shared_data" : "c:/comm" }, "worker" = {}}'
} else {
'{ "polling_agent" : { "image" : "' + image_polling_agent + '" }, "worker" = {}}'
}
Expand Down Expand Up @@ -189,6 +190,8 @@ _usage:
local_images: Let terraform build the docker images locally (default = false)
socket_type: Socket type used by agent and worker to communicate (default = unixdomainsocket)
IMPORTANT: In order to properly destroy the resources created you should call the recipe destroy with the
same parameters used for deploy
EOF
Expand Down
1 change: 1 addition & 0 deletions terraform/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ module "compute_plane" {
core_tag = local.compute_plane.tag
polling_agent = local.compute_plane.polling_agent
worker = local.compute_plane.worker
socket_type = var.socket_type
generated_env_vars = local.environment
volumes = local.volumes
network = docker_network.armonik.id
Expand Down
4 changes: 4 additions & 0 deletions terraform/modules/compute_plane/inputs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ variable "core_tag" {
type = string
}

variable "socket_type" {
type = string
}

variable "polling_agent" {
type = object({
name = string,
Expand Down
13 changes: 9 additions & 4 deletions terraform/modules/compute_plane/locals.tf
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ locals {
"Amqp__PartitionId=TestPartition${local.partition_chooser}",
"PubSub__PartitionId=TestPartition${local.partition_chooser}",
]
worker_tcp = format("%s://%s:%s", "http", "${var.worker.name}${var.replica_counter}", "10667")
worker_socket = format("%s/%s", var.polling_agent.shared_socket, "armonik_worker.sock")
agent_tcp = format("%s://%s:%s", "http", "${var.polling_agent.name}${var.replica_counter}", "10666")
agent_socket = format("%s/%s", var.polling_agent.shared_socket, "armonik_agent.sock")
common_env = [
"ComputePlane__WorkerChannel__SocketType=unixdomainsocket",
"ComputePlane__WorkerChannel__Address=${var.polling_agent.shared_socket}/armonik_worker.sock",
"ComputePlane__AgentChannel__SocketType=unixdomainsocket",
"ComputePlane__AgentChannel__Address=${var.polling_agent.shared_socket}/armonik_agent.sock",
"ComputePlane__WorkerChannel__SocketType=${var.socket_type}",
"ComputePlane__WorkerChannel__Address=${var.socket_type == "tcp" ? local.worker_tcp : local.worker_socket}",
"ComputePlane__AgentChannel__SocketType=${var.socket_type}",
"ComputePlane__AgentChannel__Address=${var.socket_type == "tcp" ? local.agent_tcp : local.agent_socket}"
]
gen_env = [for k, v in var.generated_env_vars : "${k}=${v}"]
polling_agent_name = "${var.polling_agent.name}${var.replica_counter}"
socket_vol = var.socket_type == "tcp" ? {} : { (var.polling_agent.shared_socket) = one(docker_volume.socket_vol[*].name) }
}
34 changes: 29 additions & 5 deletions terraform/modules/compute_plane/main.tf
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
resource "docker_volume" "socket_vol" {
name = "socket_vol${var.replica_counter}"
count = var.socket_type == "tcp" ? 0 : 1
name = "socket_vol${var.replica_counter}"

}

resource "docker_volume" "comm_vol" {
name = "comm_vol${var.replica_counter}"
}

resource "docker_image" "worker" {
Expand Down Expand Up @@ -27,10 +33,19 @@ resource "docker_container" "worker" {
external = var.worker.port + var.replica_counter
}

dynamic "mounts" {
for_each = local.socket_vol
content {
type = "volume"
target = mounts.key
source = mounts.value
}
}

mounts {
type = "volume"
target = var.polling_agent.shared_socket
source = docker_volume.socket_vol.name
target = var.polling_agent.shared_data
source = docker_volume.comm_vol.name
}
}

Expand Down Expand Up @@ -59,8 +74,17 @@ resource "docker_container" "polling_agent" {

mounts {
type = "volume"
target = var.polling_agent.shared_socket
source = docker_volume.socket_vol.name
target = var.polling_agent.shared_data
source = docker_volume.comm_vol.name
}

dynamic "mounts" {
for_each = local.socket_vol
content {
type = "volume"
target = mounts.key
source = mounts.value
}
}

restart = "unless-stopped"
Expand Down
12 changes: 11 additions & 1 deletion terraform/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ variable "worker_image" {
default = "dockerhubaneo/armonik_core_htcmock_test_worker"
}

variable "socket_type" {
type = string
description = "Socket type used by agent and worker to communicate"
validation {
condition = can(regex("^(unixdomainsocket|tcp)$", var.socket_type))
error_message = "Socket must be either unixdomainsocket or tcp"
}
default = "unixdomainsocket"
}

variable "compute_plane" {
type = object({
worker = object({
Expand All @@ -125,7 +135,7 @@ variable "compute_plane" {
// env for the agent and env for the worker
// They will be used for both
shared_socket = optional(string, "/cache")
shared_data = optional(string, "/cache")
shared_data = optional(string, "/comm")
})
})
default = {
Expand Down

0 comments on commit 51fd13a

Please sign in to comment.