
[Filestream] migrate state from previous ID to current #42624

Open · wants to merge 8 commits into base: main

Conversation

@belimawr (Contributor) commented Feb 7, 2025

Proposed commit message

This commit enables Filestream inputs to migrate file states from previous IDs to the current ID. This is done by adding a previous_ids entry to the input configuration. We look in the store for all states that match an active file under one of the previous IDs and migrate each state to the new ID. The migrated old states are marked for removal from the store.
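The migration described above can be sketched roughly as follows. This is a simplified, illustrative model only: the map-based "store", the function name, and the key layout handling are hypothetical stand-ins for Filebeat's real registry store, but the key format (filestream::<input-id>::<file-identity>) matches the log entries shown later in this PR.

```go
package main

import (
	"fmt"
	"strings"
)

// migrateStates re-keys every state stored under one of the previous input
// IDs to the current input ID and reports the old keys, which the real
// implementation marks for removal from the registry. The map-based store
// here is a hypothetical simplification.
func migrateStates(store map[string]any, currentID string, previousIDs []string) (removed []string) {
	newPrefix := fmt.Sprintf("filestream::%s::", currentID)
	for _, prev := range previousIDs {
		oldPrefix := fmt.Sprintf("filestream::%s::", prev)
		for key, state := range store {
			if !strings.HasPrefix(key, oldPrefix) {
				continue
			}
			// Migrate the cursor/state to the new ID and drop the old entry.
			newKey := newPrefix + strings.TrimPrefix(key, oldPrefix)
			store[newKey] = state
			delete(store, key)
			removed = append(removed, key)
		}
	}
	return removed
}

func main() {
	store := map[string]any{
		"filestream::first-id::fingerprint::abc":  map[string]int64{"offset": 309},
		"filestream::unrelated::fingerprint::def": map[string]int64{"offset": 10},
	}
	removed := migrateStates(store, "second-id", []string{"first-id"})
	fmt.Println("removed:", removed)
	fmt.Println("migrated state:", store["filestream::second-id::fingerprint::abc"])
}
```

Note that only states whose keys match one of the previous IDs are touched; states belonging to other inputs are left as-is.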

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Disruptive User Impact

Author's Checklist

  • Test changing file identity and ID at the same time.
  • Investigate any possible side effects of migrating ID + file identity at the same time.

How to test this PR locally

Run the tests

cd filebeat
go test -tags=integration -run=TestFilestreamCanMigrateID  -v ./tests/integration

Manual test

  1. Create a file with more than 1kb in size: docker run -it --rm mingrammer/flog -f rfc5424 -n 15 > /tmp/flog.log

  2. Run Filebeat with the following configuration:

    filebeat.yml

    filebeat.inputs:
      - type: filestream
        id: "first-id"
        paths:
          - /tmp/flog.log
    
    queue.mem:
      flush.timeout: 0s
    
    output.file:
      path: ${path.home}
      filename: "output-file"
      rotate_on_startup: false
    
    filebeat.registry:
      cleanup_interval: 5s
      flush: 1s
    
    logging:
      level: debug
      selectors:
        - input
        - input.filestream
        - input.filestream.prospector
      metrics:
        enabled: false
    

  3. Wait until the file is fully ingested (15 lines in the output file)

  4. Stop Filebeat

  5. Optionally, remove the logs directory; this makes the next steps easier: rm -rf logs

  6. Run Filebeat with the following configuration (note the change in id and the new previous_ids):

    filebeat.yml

    filebeat.inputs:
      - type: filestream
        id: "second-id"
        paths:
          - /tmp/flog.log
        previous_ids:
          - "first-id"
    
    queue.mem:
      flush.timeout: 0s
    
    output.file:
      path: ${path.home}
      filename: "output-file"
      rotate_on_startup: false
    
    filebeat.registry:
      cleanup_interval: 5s
      flush: 1s
    
    logging:
      level: debug
      selectors:
        - input
        - input.filestream
        - input.filestream.prospector
      metrics:
        enabled: false
    

  7. Wait until Filebeat has read the file to the end. Look for End of file reached: /tmp/flog.log; Backoff now. in the logs of the second run.

  8. Ensure no new data was added to the output file

  9. You can also look for log entries like:

    Migrating input ID: 'filestream::first-id::fingerprint::6fb3cb6c565bdba1354f64a42dd47ef937964019400dd571f25c2cd13a9fb5be' -> 'filestream::second-id::fingerprint::6fb3cb6c565bdba1354f64a42dd47ef937964019400dd571f25c2cd13a9fb5be'
    migrated entry in registry from 'filestream::first-id::fingerprint::6fb3cb6c565bdba1354f64a42dd47ef937964019400dd571f25c2cd13a9fb5be' to 'filestream::second-id::fingerprint::6fb3cb6c565bdba1354f64a42dd47ef937964019400dd571f25c2cd13a9fb5be'. Cursor: map[offset:309]
    

Related issues


Logs

{
  "log.level": "info",
  "@timestamp": "2025-02-07T15:12:34.224-0500",
  "log.logger": "input.filestream.prospector",
  "log.origin": {
    "function": "github.com/elastic/beats/v7/filebeat/input/filestream.(*fileProspector).Init.func4",
    "file.name": "filestream/prospector.go",
    "file.line": 214
  },
  "message": "Migrating input ID: 'filestream::first-id::fingerprint::6fb3cb6c565bdba1354f64a42dd47ef937964019400dd571f25c2cd13a9fb5be' -> 'filestream::second-id::fingerprint::6fb3cb6c565bdba1354f64a42dd47ef937964019400dd571f25c2cd13a9fb5be'",
  "service.name": "filebeat",
  "filestream_id": "second-id",
  "ecs.version": "1.6.0"
}
{
  "log.level": "info",
  "@timestamp": "2025-02-07T15:12:34.224-0500",
  "log.logger": "input",
  "log.origin": {
    "function": "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile.(*sourceStore).updateIdentifiers",
    "file.name": "input-logfile/store.go",
    "file.line": 273
  },
  "message": "migrated entry in registry from 'filestream::first-id::fingerprint::6fb3cb6c565bdba1354f64a42dd47ef937964019400dd571f25c2cd13a9fb5be' to 'filestream::second-id::fingerprint::6fb3cb6c565bdba1354f64a42dd47ef937964019400dd571f25c2cd13a9fb5be'. Cursor: map[offset:309]",
  "service.name": "filebeat",
  "input_type": "filestream",
  "ecs.version": "1.6.0"
}
{
  "log.level": "info",
  "@timestamp": "2025-02-07T15:12:34.224-0500",
  "log.logger": "input.filestream.prospector",
  "log.origin": {
    "function": "github.com/elastic/beats/v7/filebeat/input/filestream.(*fileProspector).Init.func4",
    "file.name": "filestream/prospector.go",
    "file.line": 214
  },
  "message": "Migrating input ID: 'filestream::first-id::fingerprint::db8399294e69089070405b13d4f057672f3852fa8e0f56ce4b6c92398aef1b6a' -> 'filestream::second-id::fingerprint::db8399294e69089070405b13d4f057672f3852fa8e0f56ce4b6c92398aef1b6a'",
  "service.name": "filebeat",
  "filestream_id": "second-id",
  "ecs.version": "1.6.0"
}
{
  "log.level": "info",
  "@timestamp": "2025-02-07T15:12:34.224-0500",
  "log.logger": "input",
  "log.origin": {
    "function": "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile.(*sourceStore).updateIdentifiers",
    "file.name": "input-logfile/store.go",
    "file.line": 273
  },
  "message": "migrated entry in registry from 'filestream::first-id::fingerprint::db8399294e69089070405b13d4f057672f3852fa8e0f56ce4b6c92398aef1b6a' to 'filestream::second-id::fingerprint::db8399294e69089070405b13d4f057672f3852fa8e0f56ce4b6c92398aef1b6a'. Cursor: map[offset:296]",
  "service.name": "filebeat",
  "input_type": "filestream",
  "ecs.version": "1.6.0"
}

@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Feb 7, 2025
@belimawr belimawr self-assigned this Feb 7, 2025
@belimawr belimawr added Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team backport-skip Skip notification from the automated backport with mergify and removed needs_team Indicates that the issue/PR needs a Team:* label labels Feb 7, 2025
@belimawr belimawr marked this pull request as ready for review February 10, 2025 20:18
@belimawr belimawr requested a review from a team as a code owner February 10, 2025 20:18
@belimawr belimawr requested review from AndersonQ and rdner February 10, 2025 20:18
@elasticmachine (Collaborator) commented:

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

Comment on lines +531 to +533
if err != nil {
t.Fatalf("cannot get absolute path for 'testdata': %s", err)
}
Member commented:
[NIT | Consistency]
you could use require.NoError

Comment on lines +559 to +560
// eofMsg := fmt.Sprintf("End of file reached: %s; Backoff now.", logFilepath)
// filebeat.WaitForLogs(eofMsg, time.Second*10, "EOF was not reached")
Member commented:
leftover from a refactor or debug?

Labels
backport-skip Skip notification from the automated backport with mergify Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team
Development

Successfully merging this pull request may close these issues.

Allow changing filestream IDs without duplication by providing the previous ID values
3 participants