
refactor kafkaMdm to manage its own offsets. #296

Merged
18 commits merged into master from kafkaMdmNoConsumerGroup on Sep 6, 2016

Conversation

woodsaj (Member) commented Aug 25, 2016

  • removes use of sarama-cluster.
  • As kafkaMdm currently always consumes from all partitions, using
    a consumerGroup just adds complexity without any upside.
  • Instead of storing partition offsets in kafka, this change now tracks the
    offsets in a leveldb database. Offsets are flushed to the database every 5 seconds.
  • In addition to tracking the offsets, this commit also makes it
    easier to specify where a consumer should start from. The options
    are:
    • newest: the newest data available.
    • oldest: the oldest data available.
    • last: the last committed offset.
    • <duration>: a time.Duration amount of time ago to start from, e.g.
      "30m" would start consuming from data written 30 minutes ago (see the sketch below).

@Dieterbe (Contributor) commented:

note that DurationVars, if invalid, just get a value of 0ns. So the values still need to be validated (which I did for the es retry-duration at ca0ff63).

case "oldest":
case "newest":
default:
_, err := time.ParseDuration(offset)
why don't we set a package-level variable here, so that we don't have to re-parse in Start()
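
A minimal sketch of that suggestion (hedged: ConfigProcess, Start, offsetStr and offsetDuration are illustrative names, not necessarily the plugin's actual identifiers): validate and parse the duration once at config time, keep the result in a package-level variable, and reuse it in Start() instead of re-parsing.

```go
// Illustrative only: parse the duration once, reuse it later.
package kafkamdm

import (
	"log"
	"time"
)

// Package-level state set once at config time.
var (
	offsetStr      string        // raw value of the "offset" setting, e.g. "newest", "last", "30m"
	offsetDuration time.Duration // parsed form, only meaningful when offsetStr is a duration
)

// ConfigProcess validates the setting and parses the duration exactly once.
func ConfigProcess() {
	switch offsetStr {
	case "oldest", "newest", "last":
		// keywords, nothing to parse
	default:
		var err error
		offsetDuration, err = time.ParseDuration(offsetStr)
		if err != nil {
			log.Fatalf("kafka-mdm: invalid offset %q: %s", offsetStr, err)
		}
	}
}

// Start reuses the already-parsed value instead of calling ParseDuration again.
func Start() {
	if offsetDuration != 0 {
		startFrom := time.Now().Add(-offsetDuration)
		_ = startFrom // look up the kafka offset for this timestamp and consume from there
	}
}
```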

Dieterbe (Contributor) commented Aug 25, 2016

few remarks, but overall 👍

this will fix #236

also don't forget to update the "doesn't work yet" note in the install guides in the kafka section,
and the example config, as well as the docker/package config.

woodsaj and others added 14 commits August 26, 2016 12:55

  • This allows us to use the same offsetMgr for both ingestion and
    the clusterHandler.

  • was always seeing something like:

        ~/g/s/g/r/metrictank ❯❯❯ govendor status
        The following packages are missing or modified locally:
            github.com/shopify/sarama

    note that we, and sarama-cluster, use github.com/Shopify/sarama as the import
    path. vendor.json had both shopify and Shopify in the json, but only
    the one with caps in the vendor dir.

  • by calling os.MkdirAll() on empty directories
    we would get 'no such file or directory'

  • let's default to '' (working dir) for dev builds
    and /var/lib/metrictank for docker and packages (see the sketch below)
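
A minimal Go sketch of what those last two commit messages describe (hedged: openOffsetDB, the dataDir argument and the "kafka-mdm-offsets" subdirectory are made-up for illustration, and the github.com/syndtr/goleveldb binding is assumed): create the directory with os.MkdirAll() before opening the leveldb store, so a fresh setup doesn't fail with 'no such file or directory'.

```go
// Hypothetical sketch: ensure the offset-db directory exists before opening it.
package main

import (
	"log"
	"os"
	"path/filepath"

	"github.com/syndtr/goleveldb/leveldb"
)

// openOffsetDB opens the offset store under dataDir. dataDir would default to
// "" (the working directory) for dev builds and to /var/lib/metrictank for
// docker/package installs, per the commit messages above.
func openOffsetDB(dataDir string) (*leveldb.DB, error) {
	dbPath := filepath.Join(dataDir, "kafka-mdm-offsets")
	// create the directory (and any parents) if it doesn't exist yet
	if err := os.MkdirAll(dbPath, 0755); err != nil {
		return nil, err
	}
	return leveldb.OpenFile(dbPath, nil)
}

func main() {
	db, err := openOffsetDB("") // dev default: working directory
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```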
@Dieterbe merged commit 3b5f787 into master Sep 6, 2016
@Dieterbe deleted the kafkaMdmNoConsumerGroup branch December 15, 2017 19:52