Skip to content

Commit

Permalink
[receiver/sqlqueryreceiver] Fix tracking results by timestamp column (#…
Browse files Browse the repository at this point in the history
…35195)

**Description:**

Formats retrieved time columns with milliseconds precision, so they are
not reprocessed when used as a tracking_column

**Link to tracking Issue:** #35194

**Testing:** Added integration test, updated test data

**Documentation:** n/a

Closes #35194
  • Loading branch information
Grandys authored Sep 30, 2024
1 parent 2225ccc commit 4c6d6a5
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 10 deletions.
27 changes: 27 additions & 0 deletions .chloggen/sqlqueryreceiver-fix-ts-tracking-column.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: sqlqueryreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix reprocessing of logs when tracking_column type is timestamp

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35194]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion internal/sqlquery/row_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func newRowScanner(colTypes []colType) *rowScanner {
}
format := "%v"
if t, isTime := v.(time.Time); isTime {
return t.Format(time.RFC3339), nil
return t.Format(time.RFC3339Nano), nil
}
if reflect.TypeOf(v).Kind() == reflect.Slice {
// The Postgres driver returns a []uint8 (ascii string) for decimal and numeric types,
Expand Down
47 changes: 47 additions & 0 deletions receiver/sqlqueryreceiver/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,53 @@ func TestPostgresIntegrationLogsTrackingWithoutStorage(t *testing.T) {
testAllSimpleLogs(t, consumer.AllLogs())
}

func TestPostgresIntegrationLogsTrackingByTimestampColumnWithoutStorage(t *testing.T) {
// Start Postgres container.
externalPort := "15432"
dbContainer := startPostgresDbContainer(t, externalPort)
defer func() {
require.NoError(t, dbContainer.Terminate(context.Background()))
}()

// Start the SQL Query receiver.
receiverCreateSettings := receivertest.NewNopSettings()
receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings)
config.CollectionInterval = 100 * time.Millisecond
config.Queries = []sqlquery.Query{
{
SQL: "select * from simple_logs where insert_time > $1 order by insert_time asc",
Logs: []sqlquery.LogsCfg{
{
BodyColumn: "body",
AttributeColumns: []string{"attribute"},
},
},
TrackingColumn: "insert_time",
TrackingStartValue: "2022-06-03 21:00:00+00",
},
}
host := componenttest.NewNopHost()
err := receiver.Start(context.Background(), host)
require.NoError(t, err)

// Verify there's 5 logs received.
require.Eventuallyf(
t,
func() bool {
return consumer.LogRecordCount() > 0
},
1*time.Minute,
500*time.Millisecond,
"failed to receive more than 0 logs",
)
require.Equal(t, 5, consumer.LogRecordCount())
testAllSimpleLogs(t, consumer.AllLogs())

// Stop the SQL Query receiver.
err = receiver.Shutdown(context.Background())
require.NoError(t, err)
}

func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) {
// start Postgres container
externalPort := "15431"
Expand Down
6 changes: 3 additions & 3 deletions receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ create table simple_logs

insert into simple_logs (id, insert_time, body, attribute) values
(1, '2022-06-03 21:59:26', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -', 'TLSv1.2'),
(2, '2022-06-03 21:59:26', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -', 'TLSv1'),
(3, '2022-06-03 21:59:29', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -', 'TLSv1.2'),
(2, '2022-06-03 21:59:26.692991', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -', 'TLSv1'),
(3, '2022-06-03 21:59:29.212212', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -', 'TLSv1.2'),
(4, '2022-06-03 21:59:31', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -', 'TLSv1'),
(5, '2022-06-03 21:59:31', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -', 'TLSv1.2');
(5, '2022-06-03 21:59:31.332121', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -', 'TLSv1.2');
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ grant select on simple_logs to otel;
insert into simple_logs (id, insert_time, body, attribute) values
(1, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -', 'TLSv1.2');
insert into simple_logs (id, insert_time, body, attribute) values
(2, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -', 'TLSv1');
(2, TIMESTAMP '2022-06-03 21:59:26.692991 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -', 'TLSv1');
insert into simple_logs (id, insert_time, body, attribute) values
(3, TIMESTAMP '2022-06-03 21:59:29 +00:00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -', 'TLSv1.2');
(3, TIMESTAMP '2022-06-03 21:59:29.212212 +00:00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -', 'TLSv1.2');
insert into simple_logs (id, insert_time, body, attribute) values
(4, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -', 'TLSv1');
insert into simple_logs (id, insert_time, body, attribute) values
(5, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -', 'TLSv1.2');
(5, TIMESTAMP '2022-06-03 21:59:31.332121 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -', 'TLSv1.2');
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ grant select, insert on simple_logs to otel;

insert into simple_logs (id, insert_time, body, attribute) values
(1, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -', 'TLSv1.2'),
(2, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -', 'TLSv1'),
(3, '2022-06-03 21:59:29+00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -', 'TLSv1.2'),
(2, '2022-06-03 21:59:26.692991+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -', 'TLSv1'),
(3, '2022-06-03 21:59:29.212212+00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -', 'TLSv1.2'),
(4, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -', 'TLSv1'),
(5, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -', 'TLSv1.2');
(5, '2022-06-03 21:59:31.332121+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -', 'TLSv1.2');

0 comments on commit 4c6d6a5

Please sign in to comment.