Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor data node process state machine #21210

Open
wants to merge 60 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
8292e51
Initial reworked, simplified abstract node startup
moesterheld Aug 14, 2024
17e4b3a
use Graylog CmdLineTool in data node
moesterheld Aug 14, 2024
c2c7d71
remove unused pid file declaration
moesterheld Aug 14, 2024
271ea0c
remove freshInstallation check as it doesn't make sense since it chec…
moesterheld Aug 14, 2024
60db00d
move mongo preflight check into regular preflight checks since it doe…
moesterheld Aug 14, 2024
a440a7c
remove migration feature for data node
moesterheld Aug 14, 2024
77c361d
code cleanup
moesterheld Aug 14, 2024
7440e65
remove unused local flag, code cleaup
moesterheld Aug 14, 2024
cd5eb96
rename server to datanode for better clarification
moesterheld Aug 14, 2024
7e61307
fix feature flag test to use config for env/sys props
moesterheld Aug 14, 2024
8420a3e
remove Graylog path configuration from data node configuration
moesterheld Aug 14, 2024
9dc7534
temporary fix for excluding zstd library fix from data node
moesterheld Aug 14, 2024
82a0a19
add requireexplicitbindings for new node types
moesterheld Aug 14, 2024
13435d6
Merge branch 'master' into refactor/abstract-node-startup
moesterheld Aug 16, 2024
0cbad48
remove unneeded settings bean
moesterheld Aug 16, 2024
9c1cfe3
Merge branch 'master' into refactor/abstract-node-startup
todvora Aug 28, 2024
1f315bf
Merge branch 'master' into refactor/abstract-node-startup
moesterheld Sep 23, 2024
ac8ff99
Merge branch 'master' into refactor/abstract-node-startup
moesterheld Oct 17, 2024
881e1eb
do not use `pluginLoaderConfig` for fixing zstd temp directory
moesterheld Oct 18, 2024
c2c48e1
do tls protocols configuration and setting of netty defaults only in …
moesterheld Oct 18, 2024
5a7089c
Merge branch 'master' into refactor/abstract-node-startup
moesterheld Nov 18, 2024
79a4ef4
add selective loading of plugins for a specific node type
moesterheld Nov 20, 2024
2aaf10e
add trino node plugin skeleton
moesterheld Nov 20, 2024
fb678d3
Merge branch 'master' into refactor/abstract-node-startup
bernd Nov 21, 2024
7087af4
remove ExampleCommand, replace with MinimalNode test
moesterheld Nov 22, 2024
4d22665
change forbidden method invocation
moesterheld Nov 22, 2024
bf01a31
adjust log
moesterheld Nov 22, 2024
1f3fc78
Change cmdline option description
moesterheld Nov 22, 2024
3492040
Change cmdline option description
moesterheld Nov 22, 2024
861f0e8
Change cmdline option description
moesterheld Nov 22, 2024
9ea12dd
make usage of NodeIdFile configurable
moesterheld Nov 25, 2024
7910ccf
Merge branch 'master' into refactor/abstract-node-startup
moesterheld Nov 26, 2024
2cc4aa8
adjust ServerStatus and provide common node command test
moesterheld Nov 26, 2024
f6abde4
Merge branch 'refactor/abstract-node-startup' into trino-node-plugin
moesterheld Nov 26, 2024
8332bbd
add missing method to test interface
moesterheld Nov 26, 2024
dab9b56
code cleanup
moesterheld Nov 28, 2024
0dc9999
make configuration a field, remove unused constructor
moesterheld Nov 28, 2024
a6ccc9b
code cleanup
moesterheld Nov 28, 2024
7eb0b08
use parent field
moesterheld Nov 28, 2024
7be3252
make Datanode and Server extend AbstractNodeCommand
moesterheld Nov 29, 2024
1a4e19b
Merge branch 'master' into refactor/abstract-node-startup
moesterheld Dec 4, 2024
6fafa14
change field names to camel case
moesterheld Dec 5, 2024
f702c6e
remove redundant bindings, make journal commands extends AbstractNode…
moesterheld Dec 5, 2024
d39ca69
remove static field
moesterheld Dec 5, 2024
03b8c74
Merge branch 'master' into refactor/abstract-node-startup
moesterheld Dec 5, 2024
fa2ecbf
remove redundant bindings
moesterheld Dec 5, 2024
1b2663c
Merge branch 'refactor/abstract-node-startup' into trino-node-plugin
moesterheld Dec 6, 2024
8bb8887
move plugin path configuration out of PathConfiguration to not requir…
moesterheld Dec 6, 2024
c246d8b
always applySecuritySettings
moesterheld Dec 10, 2024
a203f34
add javadocs to GraylogNodeModule
moesterheld Dec 10, 2024
f445c4c
Merge branch 'master' into refactor/abstract-node-startup
moesterheld Dec 10, 2024
a2a376a
Merge branch 'refactor/abstract-node-startup' into trino-node-plugin
moesterheld Dec 10, 2024
1ba2ffd
add javadoc
moesterheld Dec 10, 2024
5114c00
Merge branch 'master' into trino-node-plugin
moesterheld Dec 12, 2024
dc19d65
add test for plugin filtering
moesterheld Dec 12, 2024
bf122a2
code cleanup
moesterheld Dec 12, 2024
d2eca10
change path resolution
moesterheld Dec 16, 2024
1bcfa9e
Merge branch 'master' into trino-node-plugin
moesterheld Dec 16, 2024
14ef711
Merge branch 'master' into trino-node-plugin
moesterheld Dec 17, 2024
5400eb8
add an abstraction layer for state machine handling to avoid duplicat…
moesterheld Dec 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions data-node/src/main/java/org/graylog/datanode/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.graylog2.configuration.Documentation;
import org.graylog2.plugin.Tools;
import org.graylog2.shared.SuppressForbidden;
import org.graylog2.shared.plugins.PluginLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -670,4 +671,15 @@ public String getEnvironmentVariablePrefix() {
public String getSystemPropertyPrefix() {
return "graylog.datanode.";
}

@Override
public boolean withPlugins() {
return true;
}

@Override
public PluginLoader.NodeType getPluginNodeType() {
return PluginLoader.NodeType.DATA_NODE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@
import org.graylog.datanode.opensearch.configuration.beans.impl.OpensearchDefaultConfigFilesBean;
import org.graylog.datanode.opensearch.configuration.beans.impl.OpensearchSecurityConfigurationBean;
import org.graylog.datanode.opensearch.configuration.beans.impl.SearchableSnapshotsConfigurationBean;
import org.graylog.datanode.opensearch.statemachine.OpensearchEvent;
import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.graylog.datanode.opensearch.statemachine.OpensearchStateMachine;
import org.graylog.datanode.opensearch.statemachine.OpensearchStateMachineProvider;
import org.graylog.datanode.opensearch.statemachine.tracer.ClusterNodeStateTracer;
import org.graylog.datanode.opensearch.statemachine.tracer.OpensearchStateMachineTransitionLogger;
import org.graylog.datanode.opensearch.statemachine.tracer.OpensearchWatchdog;
import org.graylog.datanode.opensearch.statemachine.tracer.StateMachineTracer;
import org.graylog.datanode.opensearch.statemachine.tracer.StateMachineTransitionLogger;
import org.graylog.datanode.process.configuration.beans.DatanodeConfigurationBean;
import org.graylog.datanode.process.statemachine.tracer.StateMachineTracer;

public class OpensearchProcessBindings extends AbstractModule {

Expand Down Expand Up @@ -82,10 +84,12 @@ protected void configure() {
bind(DatanodeTrustManagerProvider.class);

// tracer
Multibinder<StateMachineTracer> tracerBinder = Multibinder.newSetBinder(binder(), StateMachineTracer.class);
TypeLiteral<StateMachineTracer<OpensearchState, OpensearchEvent>> tracerType = new TypeLiteral<>() {};
Multibinder<StateMachineTracer<OpensearchState, OpensearchEvent>> tracerBinder =
Multibinder.newSetBinder(binder(), tracerType);
tracerBinder.addBinding().to(ClusterNodeStateTracer.class).asEagerSingleton();
tracerBinder.addBinding().to(OpensearchWatchdog.class).asEagerSingleton();
tracerBinder.addBinding().to(StateMachineTransitionLogger.class).asEagerSingleton();
tracerBinder.addBinding().to(OpensearchStateMachineTransitionLogger.class).asEagerSingleton();
tracerBinder.addBinding().to(ConfigureMetricsIndexSettings.class).asEagerSingleton();

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.graylog.datanode.opensearch.OpensearchProcess;
import org.graylog.datanode.opensearch.statemachine.OpensearchEvent;
import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.graylog.datanode.opensearch.statemachine.tracer.StateMachineTracer;
import org.graylog.datanode.periodicals.MetricsCollector;
import org.graylog.datanode.process.statemachine.tracer.StateMachineTracer;
import org.graylog.storage.opensearch2.DataStreamAdapterOS2;
import org.graylog.storage.opensearch2.ism.IsmApi;
import org.graylog2.cluster.nodes.DataNodeDto;
Expand All @@ -50,7 +50,7 @@
import java.util.Map;
import java.util.stream.Collectors;

public class ConfigureMetricsIndexSettings implements StateMachineTracer {
public class ConfigureMetricsIndexSettings implements StateMachineTracer<OpensearchState, OpensearchEvent> {

private final Logger log = LoggerFactory.getLogger(ConfigureMetricsIndexSettings.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ private static List<DatanodeConfigFile> copyFromJar(Path configRelativePath, URI
}

private static List<DatanodeConfigFile> copyFromLocalFs(Path configRelativePath) throws URISyntaxException, IOException {
final Path resourcesRoot = Paths.get(OpensearchDefaultConfigFilesBean.class.getResource("/").toURI());
final Path source = resourcesRoot.resolve(configRelativePath);
final Path source = Paths.get(OpensearchDefaultConfigFilesBean.class.getResource("/" + configRelativePath).toURI());
return collectRecursively(source);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,14 @@
*/
package org.graylog.datanode.opensearch.statemachine;

import com.github.oxo42.stateless4j.StateMachine;
import com.github.oxo42.stateless4j.StateMachineConfig;
import org.graylog.datanode.opensearch.OpensearchProcess;
import org.graylog.datanode.opensearch.statemachine.tracer.StateMachineTracer;
import org.graylog.datanode.opensearch.statemachine.tracer.StateMachineTracerAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.graylog.datanode.process.statemachine.ProcessStateMachine;
import org.graylog.datanode.process.statemachine.tracer.StateMachineTracer;

import java.util.Set;

public class OpensearchStateMachine extends StateMachine<OpensearchState, OpensearchEvent> {

private static final Logger LOG = LoggerFactory.getLogger(OpensearchStateMachine.class);
public class OpensearchStateMachine extends ProcessStateMachine<OpensearchState, OpensearchEvent> {

/**
* How many times can the OS rest api call fail before we switch to the failed state
Expand All @@ -37,14 +32,13 @@ public class OpensearchStateMachine extends StateMachine<OpensearchState, Opense
public static final int MAX_REST_STARTUP_FAILURES = 5;
public static final int MAX_REBOOT_FAILURES = 3;

StateMachineTracerAggregator tracerAggregator = new StateMachineTracerAggregator();

public OpensearchStateMachine(OpensearchState initialState, StateMachineConfig<OpensearchState, OpensearchEvent> config) {
super(initialState, config);
setTrace(tracerAggregator);
public OpensearchStateMachine(OpensearchState initialState,
StateMachineConfig<OpensearchState, OpensearchEvent> config,
Set<StateMachineTracer<OpensearchState, OpensearchEvent>> tracer) {
super(initialState, config, tracer);
}

public static OpensearchStateMachine createNew(OpensearchProcess process, Set<StateMachineTracer> tracer) {
public static OpensearchStateMachine createNew(OpensearchProcess process, Set<StateMachineTracer<OpensearchState, OpensearchEvent>> tracer) {
final FailuresCounter restFailureCounter = FailuresCounter.oneBased(MAX_REST_TEMPORARY_FAILURES);
final FailuresCounter startupFailuresCounter = FailuresCounter.oneBased(MAX_REST_STARTUP_FAILURES);
final FailuresCounter rebootCounter = FailuresCounter.oneBased(MAX_REBOOT_FAILURES);
Expand Down Expand Up @@ -130,29 +124,12 @@ public static OpensearchStateMachine createNew(OpensearchProcess process, Set<St
.permit(OpensearchEvent.RESET, OpensearchState.WAITING_FOR_CONFIGURATION, process::reset)
.ignore(OpensearchEvent.PROCESS_STOPPED);

OpensearchStateMachine stateMachine = new OpensearchStateMachine(OpensearchState.WAITING_FOR_CONFIGURATION, config);
tracer.forEach(t -> {
t.setStateMachine(stateMachine);
stateMachine.getTracerAggregator().addTracer(t);
});
return stateMachine;
}

public StateMachineTracerAggregator getTracerAggregator() {
return tracerAggregator;
}

private void fire(OpensearchEvent trigger, OpensearchEvent errorEvent) {
try {
super.fire(trigger);
} catch (Exception e) {
LOG.error("Failed to fire event " + trigger, e);
super.fire(errorEvent);
}
return new OpensearchStateMachine(OpensearchState.WAITING_FOR_CONFIGURATION, config, tracer);
}

@Override
public void fire(OpensearchEvent trigger) {
fire(trigger, OpensearchEvent.HEALTH_CHECK_FAILED);
protected OpensearchEvent getErrorEvent() {
return OpensearchEvent.HEALTH_CHECK_FAILED;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import org.graylog.datanode.opensearch.OpensearchProcess;
import org.graylog.datanode.opensearch.statemachine.tracer.StateMachineTracer;
import org.graylog.datanode.process.statemachine.tracer.StateMachineTracer;

import java.util.Set;

public class OpensearchStateMachineProvider implements Provider<OpensearchStateMachine> {
private final OpensearchStateMachine opensearchStateMachine;

@Inject
public OpensearchStateMachineProvider(Set<StateMachineTracer> tracer, OpensearchProcess process) {
public OpensearchStateMachineProvider(Set<StateMachineTracer<OpensearchState, OpensearchEvent>> tracer, OpensearchProcess process) {
this.opensearchStateMachine = OpensearchStateMachine.createNew(process, tracer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import jakarta.inject.Inject;
import org.graylog.datanode.opensearch.statemachine.OpensearchEvent;
import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.graylog.datanode.process.statemachine.tracer.StateMachineTracer;
import org.graylog2.cluster.NodeNotFoundException;
import org.graylog2.cluster.nodes.DataNodeDto;
import org.graylog2.cluster.nodes.NodeService;
Expand All @@ -27,7 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterNodeStateTracer implements StateMachineTracer {
public class ClusterNodeStateTracer implements StateMachineTracer<OpensearchState, OpensearchEvent> {

private final Logger log = LoggerFactory.getLogger(ClusterNodeStateTracer.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,9 @@
*/
package org.graylog.datanode.opensearch.statemachine.tracer;

import com.github.oxo42.stateless4j.delegates.Trace;
import org.graylog.datanode.opensearch.statemachine.OpensearchEvent;
import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.graylog.datanode.opensearch.statemachine.OpensearchStateMachine;

/**
* The tracer allows to observe triggered event (before) and transitions (after) of the {@link OpensearchStateMachine}
*/
public interface StateMachineTracer extends Trace<OpensearchState, OpensearchEvent> {

default void setStateMachine(OpensearchStateMachine stateMachine) {
}
import org.graylog.datanode.process.statemachine.tracer.StateMachineTransitionLogger;

public class OpensearchStateMachineTransitionLogger extends StateMachineTransitionLogger<OpensearchState, OpensearchEvent> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,21 @@
*/
package org.graylog.datanode.opensearch.statemachine.tracer;

import com.github.oxo42.stateless4j.StateMachine;
import jakarta.inject.Inject;
import org.graylog.datanode.opensearch.statemachine.FailuresCounter;
import org.graylog.datanode.opensearch.statemachine.OpensearchEvent;
import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.graylog.datanode.opensearch.statemachine.OpensearchStateMachine;
import org.graylog.datanode.process.statemachine.tracer.StateMachineTracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This process watchdog follows transitions of the state machine and will try to restart the process in case of termination.
* If the process is actually stopped, it won't restart it and will automatically deactivate itself.
*/
public class OpensearchWatchdog implements StateMachineTracer {
public class OpensearchWatchdog implements StateMachineTracer<OpensearchState, OpensearchEvent> {

private static final Logger LOG = LoggerFactory.getLogger(OpensearchWatchdog.class);

Expand Down Expand Up @@ -97,7 +99,7 @@ public boolean isActive() {
}

@Override
public void setStateMachine(OpensearchStateMachine stateMachine) {
this.stateMachine = stateMachine;
public void setStateMachine(StateMachine<OpensearchState, OpensearchEvent> stateMachine) {
this.stateMachine = (OpensearchStateMachine) stateMachine;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@
*/
package org.graylog.datanode.process;

import com.github.oxo42.stateless4j.delegates.Trace;
import org.graylog.datanode.opensearch.statemachine.OpensearchEvent;
import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.graylog.datanode.opensearch.statemachine.tracer.StateMachineTracer;

public interface ManagableProcess<T, EVENT, STATE> {

void configure(T configuration);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.process.statemachine;

import com.github.oxo42.stateless4j.StateMachine;
import com.github.oxo42.stateless4j.StateMachineConfig;
import org.graylog.datanode.process.statemachine.tracer.StateMachineTracer;
import org.graylog.datanode.process.statemachine.tracer.StateMachineTracerAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;

public abstract class ProcessStateMachine<STATE, EVENT> extends StateMachine<STATE, EVENT> {

private final Logger LOG = LoggerFactory.getLogger(ProcessStateMachine.class);

private final StateMachineTracerAggregator<STATE, EVENT> tracerAggregator = new StateMachineTracerAggregator<>();

public ProcessStateMachine(STATE initialState,
StateMachineConfig<STATE, EVENT> config,
Set<StateMachineTracer<STATE, EVENT>> tracer) {
super(initialState, config);
addTracer(tracer);
setTrace(tracerAggregator);
}

private void addTracer(Set<StateMachineTracer<STATE, EVENT>> tracers) {
tracers.forEach(tracer -> {
tracer.setStateMachine(this);
tracerAggregator.addTracer(tracer);
});
}

protected abstract EVENT getErrorEvent();

private void fire(EVENT trigger, EVENT errorEvent) {
try {
super.fire(trigger);
} catch (Exception e) {
LOG.error("Failed to fire event " + trigger, e);
super.fire(errorEvent);
}
}

@Override
public void fire(EVENT trigger) {
fire(trigger, getErrorEvent());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.process.statemachine.tracer;

import com.github.oxo42.stateless4j.StateMachine;
import com.github.oxo42.stateless4j.delegates.Trace;

public interface StateMachineTracer<STATE, EVENT> extends Trace<STATE, EVENT> {

default void setStateMachine(StateMachine<STATE, EVENT> stateMachine) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,30 @@
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.opensearch.statemachine.tracer;

import org.graylog.datanode.opensearch.statemachine.OpensearchEvent;
import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.graylog.datanode.opensearch.statemachine.OpensearchStateMachine;
package org.graylog.datanode.process.statemachine.tracer;

import java.util.LinkedList;
import java.util.List;

public class StateMachineTracerAggregator implements StateMachineTracer {
public class StateMachineTracerAggregator<STATE, EVENT> implements StateMachineTracer<STATE, EVENT> {

private final List<StateMachineTracer> delegates = new LinkedList<>();
private final List<StateMachineTracer<STATE, EVENT>> delegates = new LinkedList<>();

public void addTracer(StateMachineTracer tracer) {
public void addTracer(StateMachineTracer<STATE, EVENT> tracer) {
delegates.add(tracer);
}

public void removeTracer(StateMachineTracer tracer) {
public void removeTracer(StateMachineTracer<STATE, EVENT> tracer) {
delegates.remove(tracer);
}

@Override
public void trigger(OpensearchEvent processEvent) {
public void trigger(EVENT processEvent) {
delegates.forEach(d -> d.trigger(processEvent));
}

@Override
public void transition(OpensearchEvent processEvent, OpensearchState s1, OpensearchState s2) {
public void transition(EVENT processEvent, STATE s1, STATE s2) {
delegates.forEach(d -> d.transition(processEvent, s1, s2));
}

Expand Down
Loading
Loading