Skip to content

Commit

Permalink
[Fleet][Kafka] When compression is enabled, "None" shouldn't be an op…
Browse files Browse the repository at this point in the history
  • Loading branch information
szwarckonrad authored Aug 22, 2023
1 parent 9c83d2e commit c2a552e
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 11 deletions.
2 changes: 0 additions & 2 deletions x-pack/plugins/fleet/cypress/e2e/fleet_settings_outputs.cy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,6 @@ describe('Outputs', () => {
// Compression
cy.getBySel(SETTINGS_OUTPUTS_KAFKA.COMPRESSION_CODEC_INPUT).should('not.exist');
cy.getBySel(SETTINGS_OUTPUTS_KAFKA.COMPRESSION_SWITCH).click();
cy.getBySel(SETTINGS_OUTPUTS_KAFKA.COMPRESSION_LEVEL_INPUT).should('not.exist');
cy.getBySel(SETTINGS_OUTPUTS_KAFKA.COMPRESSION_CODEC_INPUT).select('gzip');
cy.getBySel(SETTINGS_OUTPUTS_KAFKA.COMPRESSION_LEVEL_INPUT).should('exist');
cy.getBySel(SETTINGS_OUTPUTS_KAFKA.COMPRESSION_LEVEL_INPUT).select('1');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ export const OutputFormKafkaCompression: React.FunctionComponent<{

const kafkaCompressionTypeOptions = useMemo(
() =>
(Object.keys(kafkaCompressionType) as Array<keyof typeof kafkaCompressionType>).map(
(key) => ({
(Object.keys(kafkaCompressionType) as Array<keyof typeof kafkaCompressionType>)
.filter((c) => c !== 'None')
.map((key) => ({
text: kafkaCompressionType[key],
label: kafkaCompressionType[key],
})
),
})),
[]
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,9 @@ export function useOutputForm(onSucess: () => void, output?: Output) {
isDisabled('compression_level')
);
const kafkaCompressionCodecInput = useInput(
kafkaOutput?.compression ?? kafkaCompressionType.None,
kafkaOutput?.compression && kafkaOutput.compression !== kafkaCompressionType.None
? kafkaOutput.compression
: kafkaCompressionType.Gzip,
undefined,
isDisabled('compression')
);
Expand Down Expand Up @@ -642,8 +644,11 @@ export function useOutputForm(onSucess: () => void, output?: Output) {
client_id: kafkaClientIdInput.value || undefined,
version: kafkaVersionInput.value,
...(kafkaKeyInput.value ? { key: kafkaKeyInput.value } : {}),
compression: kafkaCompressionCodecInput.value,
...(kafkaCompressionCodecInput.value === kafkaCompressionType.Gzip
compression: kafkaCompressionInput.value
? kafkaCompressionCodecInput.value
: kafkaCompressionType.None,
...(kafkaCompressionInput.value &&
kafkaCompressionCodecInput.value === kafkaCompressionType.Gzip
? {
compression_level: parseIntegerIfStringDefined(
kafkaCompressionLevelInput.value
Expand Down Expand Up @@ -785,6 +790,7 @@ export function useOutputForm(onSucess: () => void, output?: Output) {
loadBalanceEnabledInput.value,
typeInput.value,
kafkaSslCertificateAuthoritiesInput.value,
kafkaCompressionInput.value,
nameInput.value,
kafkaHostsInput.value,
defaultOutputInput.value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import type {
} from '../../types';
import type { FullAgentPolicyOutputPermissions, PackageInfo } from '../../../common/types';
import { agentPolicyService } from '../agent_policy';
import { dataTypes, outputType } from '../../../common/constants';
import { dataTypes, kafkaCompressionType, outputType } from '../../../common/constants';
import { DEFAULT_OUTPUT } from '../../constants';

import { getPackageInfo } from '../epm/packages';
Expand Down Expand Up @@ -344,7 +344,7 @@ export function transformOutputToFullPolicyOutput(
version,
key,
compression,
compression_level,
...(compression === kafkaCompressionType.Gzip ? { compression_level } : {}),
...(username ? { username } : {}),
...(password ? { password } : {}),
...(sasl ? { sasl } : {}),
Expand Down
5 changes: 5 additions & 0 deletions x-pack/plugins/fleet/server/services/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,11 @@ class OutputService {
) {
updateData.compression_level = 4;
}
if (data.compression && data.compression !== kafkaCompressionType.Gzip) {
// Clear compression level if compression is not gzip
updateData.compression_level = null;
}

if (!data.client_id) {
updateData.client_id = 'Elastic';
}
Expand Down

0 comments on commit c2a552e

Please sign in to comment.