From 31bb5714d697abdc6cfa86b786c157323fc3ef30 Mon Sep 17 00:00:00 2001
From: Georg Traar <georg@crate.io>
Date: Mon, 5 Aug 2024 10:56:54 +0200
Subject: [PATCH] Ignore samples with different values with same timestamp

---
 CHANGES.rst |  5 +++++
 crate.go    | 14 ++++++--------
 2 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/CHANGES.rst b/CHANGES.rst
index 86e5ca1..957bfaf 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -5,6 +5,11 @@ CHANGES for CrateDB Prometheus Adapter
 Unreleased
 ==========
 
+- Changed the behavior to ignore samples with identical timestamps but different
+  values during ingestion. This change aligns with Prometheus' behavior, as
+  duplicates should not occur. However, since this enforcement is recent,
+  there may be setups with existing duplicates; thus, we now ignore them to
+  prevent errors.
 
 2024-01-23 0.5.1
 ================
diff --git a/crate.go b/crate.go
index 564b5b4..920a749 100644
--- a/crate.go
+++ b/crate.go
@@ -13,7 +13,7 @@ import (
 	"github.com/prometheus/common/model"
 )
 
-const crateWriteStatement = `INSERT INTO metrics ("labels", "labels_hash", "timestamp", "value", "valueRaw") VALUES ($1, $2, $3, $4, $5)`
+const crateWriteStatement = `INSERT INTO metrics ("labels", "labels_hash", "timestamp", "value", "valueRaw") VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING`
 
 type crateRow struct {
 	labels     model.Metric
@@ -112,10 +112,9 @@ func newCrateEndpoint(ep *endpointConfig) *crateEndpoint {
 }
 
 func (c *crateEndpoint) endpoint() endpoint.Endpoint {
-	/**
-	 * Initialize connection pools lazily here instead of in `newCrateEndpoint()`,
-	 * so that the adapter does not crash on startup if the endpoint is unavailable.
-	**/
+
+	// Initialize connection pools lazily here instead of in `newCrateEndpoint()`,
+	// so that the adapter does not crash on startup if the endpoint is unavailable.
 	return func(ctx context.Context, request interface{}) (response interface{}, err error) {
 
 		// Initialize database connection pools.
@@ -134,9 +133,8 @@ func (c *crateEndpoint) endpoint() endpoint.Endpoint {
 }
 
 func (c *crateEndpoint) createPools(ctx context.Context) (err error) {
-	/**
-	 * Initialize two connection pools, one for read/write each.
-	**/
+
+	// Initialize two connection pools, one for read/write each.
 	c.readPool, err = createPoolWithPoolSize(ctx, c.poolConf.Copy(), c.readPoolSize)
 	if c.readPool == nil {
 		c.readPool, err = createPoolWithPoolSize(ctx, c.poolConf.Copy(), c.readPoolSize)