-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add storage based ingester to m3coordinator #1038
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1038 +/- ##
==========================================
- Coverage 77.88% 77.85% -0.04%
==========================================
Files 530 532 +2
Lines 45304 45361 +57
==========================================
+ Hits 35287 35316 +29
- Misses 7799 7819 +20
- Partials 2218 2226 +8
Continue to review full report at Codecov.
|
@@ -0,0 +1,65 @@ | |||
package ingest |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
@@ -0,0 +1,175 @@ | |||
package ingest |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
|
||
// NewIngester creates an ingester. | ||
func NewIngester( | ||
opts *Options, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we should make this non-pointer type? Options should be required it looks like so no need to make pointer type.
// Ingest ingests a metric asynchronously with callback. | ||
func (i *Ingester) Ingest( | ||
id []byte, | ||
metricTimeNanos int64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we perhaps just make this time.Time
considering it's converted to that later anyhow? Less chance of getting the precision wrong if so.
return op.s.Write( | ||
// NB: No timeout is needed for this as the m3db client has a timeout | ||
// configured to it. | ||
context.Background(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm is it worth plumbing a ctx
all the way from the top to here? ctx.Background()
is a bit untrue, the context really should come from the ingester server.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could, but then the server also does not have anything to add to the context. This is mainly to fulfill the storage interface, I'll just make a context whenever a new ingestOp is made like statsdex.
} | ||
op.resetDataPoints() | ||
op.q.Raw = string(op.id) | ||
op.q.Unit = op.sp.Resolution().Precision |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just checked how Precision
gets resolved for say a storage policy of 1m:30d
and it seems like it's going to set Unit of xtime.Minute
which won't be compatible with M3DB (which at most supports a unit/precision of time.Second).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll just use xtime.Millisecond
then, seems that's the default value used in several places in query
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updating the usage in flush_handler.go as well
op.q.Tags = op.q.Tags[:0] | ||
for op.it.Next() { | ||
name, value := op.it.Current() | ||
op.q.Tags = append(op.q.Tags, models.Tag{Name: name, Value: value}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is unsafe to do, this is the bug I just fixed where you can't hold onto the byte refs from the tag iterator since it reuses them.
Ingester ingest.Configuration `yaml:"ingester"` | ||
|
||
// M3msg is the configuration for m3msg server. | ||
M3msg m3msg.Configuration `yaml:"m3msg"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically this should probably be (due to camel casing convention):
M3Msg m3msg.Configuration `yaml:"m3msg"`
@@ -142,7 +143,7 @@ func (w *downsamplerFlushHandlerWriter) Write( | |||
Timestamp: time.Unix(0, mp.TimeNanos), | |||
Value: mp.Value, | |||
}}, | |||
Unit: mp.StoragePolicy.Resolution().Precision, | |||
Unit: xtime.Millisecond, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will have pretty poor compression for things that can be written in with second precision.
Could we please do at least something here where we try to find the best? i.e.
switch {
case time.Duration(nanos) % time.Second == 0:
unit = xtime.Second
case time.Duration(nanos) % time.Millisecond == 0:
unit = xtime.Millisecond
case time.Duration(nanos) % time.Microsecond == 0:
unit = xtime.Microsecond
default:
unit = xtime.Nanosecond
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you update flush_handler.go
to use the new method you created? common.SanitizeUnitForM3DB(mp.StoragePolicy.Resolution().Precision)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, forgot about this
metricTime time.Time, | ||
value float64, | ||
sp policy.StoragePolicy, | ||
callback *m3msg.RefCountedCallback, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add ctx context.Context
as the first argument? (lint forces some context to always be the first arg)
We don't want to use the TODO context each time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, the m3msg server will only be able to pass a TODO
/Background
context each time though, although I can reuse one context there, is it what you wanted?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I think that's ideal, in the future if we have a per-op context then we'll be able to use that. Otherwise here it'll be a single one that can never be specialized. We should at least let the callers into this layer be able to specify their own.
it: serialize.NewMetricTagsIterator(tagDecoder, nil), | ||
p: p, | ||
m: m, | ||
c: context.TODO(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can just be nil if it gets set on each ingest op in .Ingest(...)
.
return err | ||
} | ||
op.resetDataPoints() | ||
op.q.Raw = string(op.id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This actually doesn't need to be set, while you're here can you remove Raw
from the storage.WriteQuery
type? I just checked references and it doesn't get used anywhere except the String()
method on storage.WriteQuery
(which we can replace with q.Tags.String()
or similar instead of q.Raw
).
func (cfg Configuration) newOptions( | ||
appender storage.Appender, | ||
instrumentOptions instrument.Options, | ||
) (*Options, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since you need to derefence this pointer later now for use in the constructor, perhaps make this return type (Options, error)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, was trying to avoid returning Options{}, err
, I can change that
return err | ||
} | ||
op.resetDataPoints() | ||
op.q.Unit = common.SanitizeUnitForM3DB(op.sp.Resolution().Precision) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great 👍
@@ -32,6 +34,8 @@ import ( | |||
"github.com/uber-go/tally" | |||
) | |||
|
|||
var ctx = context.TODO() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of making this global, can you give one to each handler, then pass that to each per consumer handler?
Also it makes more sense to derive from context.Background()
rather than context.TODO()
since we want to signify this was from a background listen server, rather than intending to replace it later.
src/x/common/unit.go
Outdated
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
// THE SOFTWARE. | ||
|
||
package common |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor nit: Can we rename this to convert
? We could rename SanitizeUnitForM3DB
to something like UnitForM3DB(unit xtime.Unit) xtime.Unit
and start putting other conversion functions in here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
No description provided.