-
Notifications
You must be signed in to change notification settings - Fork 8.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Fleet] Package policy upgrade telemetry with sender #115180
Conversation
…ibana into upgrade-telemetry
Pinging @elastic/fleet (Team:Fleet) |
@elasticmachine merge upstream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The biggest architectural decision I'd like to discuss before moving forward with this is how we want to organize our different events. With the current state of this PR, we'll be adding additional channels for each event type that we implement. Concerns I have with this:
- It requires that we create a new indexer for each channel since I believe indexers can only read from a single channel
- Each channel is stored separately in the GCP storage bucket. Would this make other types of processing that we want to do in the future difficult? For example, using tools like Apache Spark.
Alternatively, we could use a single channel for all Fleet telemetry events and use a well-known field name, such as event.type
(to borrow from ECS), to describe the schema of the event's properties. This would allow us to use a single indexer for all of our events, which I assume would involve less boilerplate in creating all the necessary bits to stand up a new indexer. Questions I have with this alternative approach:
- If we use a single channel and single indexer, can we still ingest the documents into separate indices depending on the
event.type
? - How practical would it be to use a single index with dynamic mappings instead? We could even define in our collection API which fields will be dynamically mapped and which will not. This would allow us to more rapidly add new event types without having to touch the indexer at all. For instance, we could index events in this type of format:
{
"event.type": "package_policy_upgrade_result",
"cluster_uuid": "...",
"cluster_name": "...",
"event.properties.mapped": {
// We could have `dynamic: true` specified for these fields, with the option to explicitly map them in the indexer when the dynamic mappings are not good enough
// All field names have the event.type prepended to avoid overlapping field names
"package_policy_upgrade_result.package_name": "system",
"package_policy_upgrade_result.current_version": "1.1.0",
...
},
"event.properties.raw": {
// In the mappings these fields would not be mapped by default, but could be mapped later or used by runtime fields
"package_policy_upgrade_result.error": "...",
}
}
The JavaScript API for queuing such events would be something like:
sender.queueEvent('package_policy_upgrade_result', {
mapped: {
package_version: 'system',
current_version: '1.1.0',
},
raw: {
error: '...' // just an example, we may want to actually map this particular field
}
}
@@ -146,7 +146,8 @@ export const updatePackagePolicyHandler: RequestHandler< | |||
esClient, | |||
request.params.packagePolicyId, | |||
{ ...newData, package: pkg, inputs }, | |||
{ user } | |||
{ user }, | |||
packagePolicy.package?.version |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we switch to named parameters / object on this API? The list is quite long and it's not obvious what each arg is for.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit cautious of changing this, the update
method is being called from a few plugins outside of fleet like apm and security_solution.
`Package policy successfully upgraded ${JSON.stringify(upgradeMeta)}`, | ||
upgradeMeta | ||
if (packagePolicy.package.version !== currentVersion) { | ||
appContextService.getLogger().info(`Package policy upgraded successfully`); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: let's go ahead and keep the detailed log info as well, but maybe it should be at the debug level instead?
@@ -184,6 +193,8 @@ export class FleetPlugin | |||
this.kibanaBranch = this.initializerContext.env.packageInfo.branch; | |||
this.logger = this.initializerContext.logger.get(); | |||
this.configInitialValue = this.initializerContext.config.get(); | |||
this.telemetryEventsSender = new TelemetryEventsSender(this.logger); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should use a child logger here so the logs are clearly labeled as coming from our telemetry code. Right now this is done inside the TelemetryEventsSender class, but I think it's more clear and explicit if we do it from this level.
this.telemetryEventsSender = new TelemetryEventsSender(this.logger); | |
this.telemetryEventsSender = new TelemetryEventsSender(this.logger.get('telemetry_events')); |
return; | ||
} | ||
|
||
const clusterInfo = await this.receiver?.fetchClusterInfo(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I quite understand the purpose of the receiver class or it's naming (what is it receiving?). Should we just move this fetchClusterInfo
function in this class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
security had a lot more functionality in receiver, I can simplify here since we have only fetchClusterInfo
} | ||
} | ||
|
||
public queueTelemetryEvents(events: TelemetryEvent[], channel: string) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be a good idea to improve the type safety here to ensure that events passed in conform to an expected type for the channel. I also slightly prefer having the channel name as the first argument to better match other popular telemetry APIs. Maybe something like this:
interface FleetTelemetryChannelEvents {
// channel name => event type
'fleet-upgrades': PackagePolicyUpgradeUsage;
}
type FleetTelemetryChannel = keyof FleetTelemetryChannelEvents;
public queueTelemetryEvents<T extends FleetTelemetryChannel>(
channel: T,
events: Array<FleetTelemetryChannelEvents[T]>
) { ... }
private isOptedIn?: boolean = true; // Assume true until the first check | ||
|
||
constructor(logger: Logger) { | ||
this.logger = logger.get('telemetry_events'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my comment in server/plugin.ts
. I think we should create the child logger from the calling code rather than do this here.
// do not add events with same id | ||
events = events.filter( | ||
(event) => !this.queue.find((qItem) => qItem.id && event.id && qItem.id === event.id) | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this de-duplication functionality? Can we also support events without ids?
It's worth noting that this de-duplication only works if the duplicate event hasn't been sent yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added this because dry run might run multiple times when someone is navigating to Integration details - Settings page or Upgrade a Policy. I added telemetry at dry run time because that is where the conflicts are checked. This logic is not filtering out events without an id.
It's true that this won't filter out duplicates if the events are already sent. I was thinking to group by cluster uuid when building dashboards in telemetry, because in real world there won't be more than one upgrade of the same package version twice.
We could remove this logic and see how many duplicates are sent in normal usage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, makes more sense now. I'm ok leaving this here in that case, as long as we consider it a best-effort de-duplication / 'spam' prevention and don't rely on it on the analysis side.
because in real world there won't be more than one upgrade of the same package version twice.
nit: since these failures happen on package policy upgrades there may actually be more than one per cluster if they have configured the same integration for multiple policies.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies, I'm only seeing this now.
So I removed the id filtering logic. Instead, I have added an id function on telemetry side to create doc id based on package name, version, status, cluster id and date, so this way we are not going to get multiple docs with the same properties from a cluster each day. Even if there are multiple policies upgraded, they will return the same results on dry run, so hopefully we don't need those extra events.
https://github.com/elastic/telemetry/blob/main/kpi-engine/src/telemetry/fleet.clj#L38
try { | ||
this.logger.debug(`Telemetry URL: ${telemetryUrl}`); | ||
|
||
const toSend: TelemetryEvent[] = cloneDeep(events).map((event) => ({ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO we should do the cloning when the event is queued rather than when it is being sent. That way no modifications to the objects from calling code can cause any problems after they have been queued.
export type SearchTypes = BaseSearchTypes | BaseSearchTypes[] | undefined; | ||
|
||
// For getting cluster info. Copied from telemetry_collection/get_cluster_info.ts | ||
export interface ESClusterInfo { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a type available in @elastic/elasticsearch
that we can just re-use?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I'll update
@joshdover Regarding the support of different events:
AFAIK you need multiple indexers to ingest to separate indices e.g. you see the indexer config has to define the index/alias to ingest into, though you can have multiple indexers reading from the same channel
I'm not sure whether we can specify dynamic mappings in indexer, I'll ask around.
If we go with this unified channel approach, should we rename the channel to something more generic? |
@elasticmachine merge upstream |
As discussed on slack, we will stay with this solution for now, and come back to it when we have more use cases for telemetry to decide whether we want to create a generic message type or use different messages and channels. |
* 2.0. | ||
*/ | ||
|
||
export const TELEMETRY_MAX_BUFFER_SIZE = 100; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nit) It might be more accurate to name this TELEMETRY_MAX_QUEUE_SIZE
|
||
import { loggingSystemMock } from 'src/core/server/mocks'; | ||
|
||
import axios from 'axios'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nit) Group imports from installed packages above imports from local modules.
@mostlyjason @joshdover Discover |
ack: will review tomorrow |
@elasticmachine merge upstream |
💛 Build succeeded, but was flaky
Test Failures
Metrics [docs]Public APIs missing comments
History
To update your PR or re-run it, just comment with: |
* draft of upgrade usage collector * telemetry sender service * fixed tests and types * cleanup * type fix * removed collector * made required field message generic, added test * cleanup * cleanup * cleanup * removed v3-dev as outdated * removed conditional from telemetry url creation * supporting multiple channels in sender * fix types * refactor * using json content type * fix test * simplified telemetry url * fixed type * added back ndjson * moved telemetry to update, added dryrun * fix types * fix prettier * updated after review * fix imports * added error_message field * review fixes Co-authored-by: Kibana Machine <[email protected]>
💚 Backport successful
This backport PR will be merged automatically after passing CI. |
* draft of upgrade usage collector * telemetry sender service * fixed tests and types * cleanup * type fix * removed collector * made required field message generic, added test * cleanup * cleanup * cleanup * removed v3-dev as outdated * removed conditional from telemetry url creation * supporting multiple channels in sender * fix types * refactor * using json content type * fix test * simplified telemetry url * fixed type * added back ndjson * moved telemetry to update, added dryrun * fix types * fix prettier * updated after review * fix imports * added error_message field * review fixes Co-authored-by: Kibana Machine <[email protected]> Co-authored-by: Julia Bardi <[email protected]>
Summary
Collecting telemetry of package policy upgrades: #109870
See latest commits for Event Sender implementation (in this case we don't need collector).
Took it from security_solution and simplified for our use case.
See
sender.ts
To test:
Install older version of packages, e.g. apache-0.3.3 (no conflicts), aws-0.6.1 (conflicts).
Add policies for each and update them.
Aws policy can only be upgraded manually by resolving conflicts.
For event sender:
Add to
kibana.dev.yml
to opt in telemetry and see debug logs:In case of success/failure of upgrades, the debug log should include events sent to telemetry v3 endpoint.
Checklist
Delete any items that are not applicable to this PR.
Risk Matrix
Delete this section if it is not applicable to this PR.
Before closing this PR, invite QA, stakeholders, and other developers to identify risks that should be tested prior to the change/feature release.
When forming the risk matrix, consider some of the following examples and how they may potentially impact the change:
For maintainers