Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow usage of tcp socket between agent and worker #818

Merged
merged 9 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 41 additions & 21 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -441,20 +441,40 @@ jobs:
strategy:
fail-fast: false
matrix:
queue:
- activemq
- rabbitmq
- rabbitmq091
- pubsub
- sqs
object:
- redis
- minio
- embed
log-level:
- Information
- Verbose
name: HtcMock ${{ matrix.queue }} ${{ matrix.object }} ${{ matrix.log-level }}
target:
- { queue: activemq, object: redis, log-level: Information, socket_type: unixdomainsocket }
- { queue: rabbitmq, object: redis, log-level: Information, socket_type: unixdomainsocket }
- { queue: rabbitmq091, object: redis, log-level: Information, socket_type: unixdomainsocket }
- { queue: pubsub, object: redis, log-level: Information, socket_type: unixdomainsocket }
- { queue: sqs, object: redis, log-level: Information, socket_type: unixdomainsocket }

- { queue: activemq, object: redis, log-level: Information, socket_type: tcp }
- { queue: rabbitmq, object: redis, log-level: Information, socket_type: tcp }
- { queue: rabbitmq091, object: redis, log-level: Information, socket_type: tcp }
- { queue: pubsub, object: redis, log-level: Information, socket_type: tcp }
- { queue: sqs, object: redis, log-level: Information, socket_type: tcp }

- { queue: activemq, object: redis, log-level: Verbose, socket_type: unixdomainsocket }
- { queue: rabbitmq, object: redis, log-level: Verbose, socket_type: unixdomainsocket }
- { queue: rabbitmq091, object: redis, log-level: Verbose, socket_type: unixdomainsocket }
- { queue: pubsub, object: redis, log-level: Verbose, socket_type: unixdomainsocket }
- { queue: sqs, object: redis, log-level: Verbose, socket_type: unixdomainsocket }

- { queue: activemq, object: minio, log-level: Information, socket_type: unixdomainsocket }
- { queue: rabbitmq, object: minio, log-level: Information, socket_type: unixdomainsocket }
- { queue: rabbitmq091, object: minio, log-level: Information, socket_type: unixdomainsocket }
- { queue: pubsub, object: minio, log-level: Information, socket_type: unixdomainsocket }
- { queue: sqs, object: minio, log-level: Information, socket_type: unixdomainsocket }

- { queue: activemq, object: embed, log-level: Information, socket_type: unixdomainsocket }
- { queue: rabbitmq, object: embed, log-level: Information, socket_type: unixdomainsocket }
- { queue: rabbitmq091, object: embed, log-level: Information, socket_type: unixdomainsocket }
- { queue: pubsub, object: embed, log-level: Information, socket_type: unixdomainsocket }
- { queue: sqs, object: embed, log-level: Information, socket_type: unixdomainsocket }

- { queue: activemq, object: redis, log-level: Information, socket_type: unixdomainsocket }

name: HtcMock ${{ matrix.target.queue }} ${{ matrix.target.object }} ${{ matrix.target.log-level }} ${{ matrix.target.socket_type }}
steps:
- name: Checkout
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4
Expand All @@ -478,7 +498,7 @@ jobs:
- name: Deploy Core
run: |
MONITOR_PREFIX="monitor/deploy/" tools/retry.sh -w 30 -- tools/monitor.sh \
just log_level=${{ matrix.log-level }} tag=${VERSION} queue=${{ matrix.queue }} object=${{ matrix.object }} worker=htcmock deploy
just log_level=${{ matrix.target.log-level }} tag=${VERSION} queue=${{ matrix.target.queue }} object=${{ matrix.target.object }} socket_type=${{ matrix.target.socket_type }} worker=htcmock deploy
sleep 10

- name: Print And Time Metrics
Expand Down Expand Up @@ -547,7 +567,7 @@ jobs:

- name: Run HtcMock test 1000 tasks 1 level
timeout-minutes: 3
if: ${{ matrix.log-level != 'Verbose' }}
if: ${{ matrix.target.log-level != 'Verbose' }}
run: |
MONITOR_PREFIX="monitor/htcmock-1000-1/" tools/monitor.sh \
docker run --net armonik_network --rm \
Expand All @@ -562,7 +582,7 @@ jobs:

- name: Run HtcMock test 1000 tasks 4 levels
timeout-minutes: 3
if: ${{ matrix.log-level != 'Verbose' }}
if: ${{ matrix.target.log-level != 'Verbose' }}
run: |
MONITOR_PREFIX="monitor/htcmock-1000-4/" tools/monitor.sh \
docker run --net armonik_network --rm \
Expand All @@ -586,8 +606,8 @@ jobs:
run: |
export AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }}
export AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }}
docker cp fluentd:/armonik-logs.json - | gzip -c | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.queue }}-${{ matrix.object }}-${{ matrix.log-level }}.json.gz
tar -czf - monitor/ | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.queue }}-${{ matrix.object }}-${{ matrix.log-level }}-monitor.tar.gz
docker cp fluentd:/armonik-logs.json - | gzip -c | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.target.queue }}-${{ matrix.target.object }}-${{ matrix.target.log-level }}-${{ matrix.target.socket_type }}.json.gz
tar -czf - monitor/ | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.target.queue }}-${{ matrix.target.object }}-${{ matrix.target.log-level }}-${{ matrix.target.socket_type }}-monitor.tar.gz

- name: Collect docker container logs
uses: jwalton/gh-docker-logs@2741064ab9d7af54b0b1ffb6076cf64c16f0220e # v2
Expand All @@ -599,14 +619,14 @@ jobs:
run: |
export AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }}
export AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }}
tar -cvf - ./container-logs | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.queue }}-${{ matrix.object }}-${{ matrix.log-level }}-container-logs.tar.gz
tar -cvf - ./container-logs | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.target.queue }}-${{ matrix.target.object }}-${{ matrix.target.log-level }}-${{ matrix.target.socket_type }}-container-logs.tar.gz
- name: Export and upload database
if: always()
run: |
export AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }}
export AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }}
bash tools/export_mongodb.sh
tar -cvf - *.json | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.queue }}-${{ matrix.object }}-${{ matrix.log-level }}-database.tar.gz
tar -cvf - *.json | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.target.queue }}-${{ matrix.target.object }}-${{ matrix.target.log-level }}-${{ matrix.target.socket_type }}-database.tar.gz

testWindowsDocker:
needs:
Expand Down
2 changes: 1 addition & 1 deletion Common/src/ArmoniK.Core.Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@


<ItemGroup>
<PackageReference Include="ArmoniK.Api.Core" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Core" Version="3.23.0-jfagenttcp.25.585b499" />
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
26 changes: 21 additions & 5 deletions Common/src/Pollster/AgentHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,29 @@ public AgentHandler(LoggerInit loggerInit,

builder.WebHost.ConfigureKestrel(options =>
{
if (File.Exists(computePlaneOptions.AgentChannel.Address))
var address = computePlaneOptions.AgentChannel.Address;
switch (computePlaneOptions.AgentChannel.SocketType)
{
File.Delete(computePlaneOptions.AgentChannel.Address);
case GrpcSocketType.UnixDomainSocket:
{
if (File.Exists(address))
{
File.Delete(address);
}

options.ListenUnixSocket(address,
listenOptions => listenOptions.Protocols = HttpProtocols.Http2);
break;
}
case GrpcSocketType.Tcp:
var uri = new Uri(address);
options.ListenAnyIP(uri.Port,
listenOptions => listenOptions.Protocols = HttpProtocols.Http2);

break;
default:
throw new InvalidOperationException("Socket type unknown");
}

options.ListenUnixSocket(computePlaneOptions.AgentChannel.Address,
listenOptions => listenOptions.Protocols = HttpProtocols.Http2);
});

app_ = builder.Build();
Expand Down
2 changes: 1 addition & 1 deletion Tests/Bench/Server/src/ArmoniK.Samples.Bench.Server.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Worker" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Worker" Version="3.23.0-jfagenttcp.25.585b499" />
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
2 changes: 1 addition & 1 deletion Tests/Common/Client/src/ArmoniK.Core.Common.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Client" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Client" Version="3.23.0-jfagenttcp.25.585b499" />
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Client" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Client" Version="3.23.0-jfagenttcp.25.585b499" />
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
<PackageReference Include="Serilog.Settings.Configuration" Version="8.0.4" />
<PackageReference Include="Serilog.Sinks.Console" Version="6.0.0" />
<PackageReference Include="Serilog.Formatting.Compact" Version="3.0.0" />
<PackageReference Include="ArmoniK.Api.Client" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Client" Version="3.23.0-jfagenttcp.25.585b499" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Worker" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Worker" Version="3.23.0-jfagenttcp.25.585b499" />
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Worker" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Worker" Version="3.23.0-jfagenttcp.25.585b499" />
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Client" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Client" Version="3.23.0-jfagenttcp.25.585b499" />
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Worker" Version="3.22.0" />
<PackageReference Include="ArmoniK.Api.Worker" Version="3.23.0-jfagenttcp.25.585b499" />
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
3 changes: 2 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ ingress := "true"
prometheus := "true"
grafana := "true"
seq := "true"
socket_type := "unixdomainsocket"

# Export them as terraform environment variables
export TF_VAR_core_tag := tag
Expand All @@ -30,7 +31,7 @@ export TF_VAR_num_partitions := partitions
export TF_VAR_enable_grafana := grafana
export TF_VAR_enable_seq := seq
export TF_VAR_enable_prometheus := prometheus

export TF_VAR_socket_type := socket_type

# Sets the queue
export TF_VAR_queue_storage := if queue == "rabbitmq" {
Expand Down
1 change: 1 addition & 0 deletions terraform/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ module "compute_plane" {
core_tag = local.compute_plane.tag
polling_agent = local.compute_plane.polling_agent
worker = local.compute_plane.worker
socket_type = var.socket_type
generated_env_vars = local.environment
volumes = local.volumes
network = docker_network.armonik.id
Expand Down
4 changes: 4 additions & 0 deletions terraform/modules/compute_plane/inputs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ variable "core_tag" {
type = string
}

variable "socket_type" {
type = string
}

variable "polling_agent" {
type = object({
name = string,
Expand Down
12 changes: 8 additions & 4 deletions terraform/modules/compute_plane/locals.tf
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ locals {
"Amqp__PartitionId=TestPartition${local.partition_chooser}",
"PubSub__PartitionId=TestPartition${local.partition_chooser}",
]
worker_tcp = format("%s://%s:%s", "http", "${var.worker.name}${var.replica_counter}", "10667")
worker_socket = format("%s/%s", var.polling_agent.shared_socket, "armonik_worker.sock")
agent_tcp = format("%s://%s:%s", "http", "${var.polling_agent.name}${var.replica_counter}", "10666")
agent_socket = format("%s/%s", var.polling_agent.shared_socket, "armonik_agent.sock")
common_env = [
"ComputePlane__WorkerChannel__SocketType=unixdomainsocket",
"ComputePlane__WorkerChannel__Address=${var.polling_agent.shared_socket}/armonik_worker.sock",
"ComputePlane__AgentChannel__SocketType=unixdomainsocket",
"ComputePlane__AgentChannel__Address=${var.polling_agent.shared_socket}/armonik_agent.sock",
"ComputePlane__WorkerChannel__SocketType=${var.socket_type}",
"ComputePlane__WorkerChannel__Address=${var.socket_type == "tcp" ? local.worker_tcp : local.worker_socket}",
"ComputePlane__AgentChannel__SocketType=${var.socket_type}",
"ComputePlane__AgentChannel__Address=${var.socket_type == "tcp" ? local.agent_tcp : local.agent_socket}"
]
gen_env = [for k, v in var.generated_env_vars : "${k}=${v}"]
polling_agent_name = "${var.polling_agent.name}${var.replica_counter}"
Expand Down
10 changes: 10 additions & 0 deletions terraform/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ variable "worker_image" {
default = "dockerhubaneo/armonik_core_htcmock_test_worker"
}

variable "socket_type" {
type = string
description = "Socket type used by agent and worker to communicate"
validation {
condition = can(regex("^(unixdomainsocket|tcp)$", var.socket_type))
error_message = "Socket must be either unixdomainsocket or tcp"
}
default = "unixdomaisocket"
jfonseca-aneo marked this conversation as resolved.
Show resolved Hide resolved
}

variable "compute_plane" {
type = object({
worker = object({
Expand Down
Loading