Merge branch 'master' into feature/res-853-creating-ipfs-pr-for-pl-to-review
smrz2001 authored Jan 23, 2023
2 parents 1fe1734 + 9327ee6 commit 609c31b
Showing 29 changed files with 1,190 additions and 707 deletions.
4 changes: 4 additions & 0 deletions .circleci/main.yml
@@ -228,6 +228,8 @@ jobs:
- image: cimg/go:1.19.1-node
parallelism: 4
resource_class: 2xlarge+
environment:
GO_IPFS_DIST_URL: https://dist-ipfs-tech.ipns.cf-ipfs.com # TODO remove this line when https://github.com/protocol/bifrost-infra/issues/2300 is closed
steps:
- *make_out_dirs
- attach_workspace:
@@ -328,6 +330,8 @@ jobs:
ipfs-webui:
executor: node-browsers
resource_class: 2xlarge+
environment:
GO_IPFS_DIST_URL: https://dist-ipfs-tech.ipns.cf-ipfs.com # TODO remove this line when https://github.com/protocol/bifrost-infra/issues/2300 is closed
steps:
- *make_out_dirs
- attach_workspace:
2 changes: 2 additions & 0 deletions .github/workflows/build.yml
@@ -78,6 +78,7 @@ jobs:
LIBP2P_TCP_REUSEPORT: false
LIBP2P_ALLOW_WEAK_RSA_KEYS: 1
IPFS_GO_EXEC: ${{ github.workspace }}/cmd/ipfs/ipfs
GO_IPFS_DIST_URL: https://dist-ipfs-tech.ipns.cf-ipfs.com # TODO: remove this line when https://github.com/protocol/bifrost-infra/issues/2300 is closed
working-directory: interop
go-ipfs-api:
needs: [prepare]
@@ -161,6 +162,7 @@ jobs:
TRAVIS: 1
GIT_PAGER: cat
IPFS_CHECK_RCMGR_DEFAULTS: 1
GO_IPFS_DIST_URL: https://dist-ipfs-tech.ipns.cf-ipfs.com # TODO: remove this line when https://github.com/protocol/bifrost-infra/issues/2300 is closed
defaults:
run:
shell: bash
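
Both CI configs point go-ipfs distribution downloads at the Cloudflare-hosted IPNS mirror while the upstream host is affected by the linked bifrost-infra issue. A minimal sketch of how tooling typically consumes such an override; the GO_IPFS_DIST_URL variable name comes from this diff, while the helper, the default URL, and the fallback behavior are assumptions for illustration:

package main

import (
	"fmt"
	"os"
)

// distBaseURL returns the base URL for downloading go-ipfs release
// artifacts, honoring the GO_IPFS_DIST_URL override set in the CI
// environments above. The default URL below is an assumption, not
// something stated in this diff.
func distBaseURL() string {
	if u := os.Getenv("GO_IPFS_DIST_URL"); u != "" {
		return u
	}
	return "https://dist.ipfs.tech"
}

func main() {
	fmt.Println("downloading dists from:", distBaseURL())
}
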
4 changes: 4 additions & 0 deletions config/init.go
@@ -110,6 +110,10 @@ const DefaultConnMgrGracePeriod = time.Second * 20
// type.
const DefaultConnMgrType = "basic"

// DefaultResourceMgrMinInboundConns is a MAGIC number that is probably a good
// enough number of inbound conns to be a good network citizen.
const DefaultResourceMgrMinInboundConns = 800

func addressesConfig() Addresses {
return Addresses{
Swarm: []string{
21 changes: 11 additions & 10 deletions core/corehttp/gateway_handler.go
@@ -418,9 +418,9 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request

// Support custom response formats passed via ?format or Accept HTTP header
switch responseFormat {
case "":
switch resolvedPath.Cid().Prefix().Codec {
case uint64(mc.Json), uint64(mc.DagJson), uint64(mc.Cbor), uint64(mc.DagCbor):
case "", "application/json", "application/cbor":
switch mc.Code(resolvedPath.Cid().Prefix().Codec) {
case mc.Json, mc.DagJson, mc.Cbor, mc.DagCbor:
logger.Debugw("serving codec", "path", contentPath)
i.serveCodec(r.Context(), w, r, resolvedPath, contentPath, begin, responseFormat)
default:
@@ -441,14 +441,13 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request
logger.Debugw("serving tar file", "path", contentPath)
i.serveTAR(r.Context(), w, r, resolvedPath, contentPath, begin, logger)
return
case "application/json", "application/vnd.ipld.dag-json",
"application/cbor", "application/vnd.ipld.dag-cbor":
case "application/vnd.ipld.dag-json", "application/vnd.ipld.dag-cbor":
logger.Debugw("serving codec", "path", contentPath)
i.serveCodec(r.Context(), w, r, resolvedPath, contentPath, begin, responseFormat)
return
default: // catch-all for unsupported application/vnd.*
err := fmt.Errorf("unsupported format %q", responseFormat)
webError(w, "failed respond with requested content type", err, http.StatusBadRequest)
webError(w, "failed to respond with requested content type", err, http.StatusBadRequest)
return
}
}
@@ -878,14 +877,14 @@ func customResponseFormat(r *http.Request) (mediaType string, params map[string]
return "application/vnd.ipld.car", nil, nil
case "tar":
return "application/x-tar", nil, nil
case "dag-json":
return "application/vnd.ipld.dag-json", nil, nil
case "json":
return "application/json", nil, nil
case "dag-cbor":
return "application/vnd.ipld.dag-cbor", nil, nil
case "cbor":
return "application/cbor", nil, nil
case "dag-json":
return "application/vnd.ipld.dag-json", nil, nil
case "dag-cbor":
return "application/vnd.ipld.dag-cbor", nil, nil
}
}
// Browsers and other user agents will send Accept header with generic types like:
@@ -908,6 +907,8 @@ func customResponseFormat(r *http.Request) (mediaType string, params map[string]
}
}
}
// If none of the special-cased content types is found, return an empty string
// to indicate that the default, implicit UnixFS response should be prepared
return "", nil, nil
}

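
Taken together, the gateway changes above route plain application/json and application/cbor requests to the codec handler alongside the dag-json and dag-cbor variants, with the ?format= shortcuts mapping accordingly. A minimal client-side sketch against a local gateway; the address and CID are placeholders, not values from this diff:

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Placeholder address and CID, purely for illustration.
	const gateway = "http://127.0.0.1:8080"
	const path = "/ipfs/bafyExampleDagCborCid" // not a real CID

	// An explicit ?format= parameter is mapped by customResponseFormat to a
	// content type, e.g. format=dag-json -> application/vnd.ipld.dag-json.
	if resp, err := http.Get(gateway + path + "?format=dag-json"); err == nil {
		fmt.Println("format=dag-json ->", resp.Header.Get("Content-Type"))
		resp.Body.Close()
	}

	// The Accept header takes the same route; after this change, plain
	// application/json and application/cbor also reach the codec handler.
	req, err := http.NewRequest(http.MethodGet, gateway+path, nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Accept", "application/json")
	if resp, err := http.DefaultClient.Do(req); err == nil {
		fmt.Println("Accept: application/json ->", resp.Header.Get("Content-Type"))
		resp.Body.Close()
	}
}
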
83 changes: 41 additions & 42 deletions core/corehttp/gateway_handler_codec.go
@@ -25,22 +25,25 @@ import (

// codecToContentType maps the supported IPLD codecs to the HTTP Content
// Type they should have.
var codecToContentType = map[uint64]string{
uint64(mc.Json): "application/json",
uint64(mc.Cbor): "application/cbor",
uint64(mc.DagJson): "application/vnd.ipld.dag-json",
uint64(mc.DagCbor): "application/vnd.ipld.dag-cbor",
var codecToContentType = map[mc.Code]string{
mc.Json: "application/json",
mc.Cbor: "application/cbor",
mc.DagJson: "application/vnd.ipld.dag-json",
mc.DagCbor: "application/vnd.ipld.dag-cbor",
}

// contentTypeToCodecs maps the HTTP Content Type to the respective
// possible codecs. If the original data is in one of those codecs,
// we stream the raw bytes. Otherwise, we encode in the last codec
// of the list.
var contentTypeToCodecs = map[string][]uint64{
"application/json": {uint64(mc.Json), uint64(mc.DagJson)},
"application/vnd.ipld.dag-json": {uint64(mc.DagJson)},
"application/cbor": {uint64(mc.Cbor), uint64(mc.DagCbor)},
"application/vnd.ipld.dag-cbor": {uint64(mc.DagCbor)},
// contentTypeToRaw maps the HTTP Content Type to the respective codec that
// allows raw response without any conversion.
var contentTypeToRaw = map[string][]mc.Code{
"application/json": {mc.Json, mc.DagJson},
"application/cbor": {mc.Cbor, mc.DagCbor},
}

// contentTypeToCodec maps the HTTP Content Type to the respective codec. We
// only add here the codecs that we want to convert to and from.
var contentTypeToCodec = map[string]mc.Code{
"application/vnd.ipld.dag-json": mc.DagJson,
"application/vnd.ipld.dag-cbor": mc.DagCbor,
}

// contentTypeToExtension maps the HTTP Content Type to the respective file
@@ -56,7 +59,7 @@ func (i *gatewayHandler) serveCodec(ctx context.Context, w http.ResponseWriter,
ctx, span := tracing.Span(ctx, "Gateway", "ServeCodec", trace.WithAttributes(attribute.String("path", resolvedPath.String()), attribute.String("requestedContentType", requestedContentType)))
defer span.End()

cidCodec := resolvedPath.Cid().Prefix().Codec
cidCodec := mc.Code(resolvedPath.Cid().Prefix().Codec)
responseContentType := requestedContentType

// If the resolved path still has some remainder, return error for now.
@@ -90,50 +93,44 @@ func (i *gatewayHandler) serveCodec(ctx context.Context, w http.ResponseWriter,
// No content type is specified by the user (via Accept, or format=). However,
// we support this format. Let's handle it.
if requestedContentType == "" {
isDAG := cidCodec == uint64(mc.DagJson) || cidCodec == uint64(mc.DagCbor)
isDAG := cidCodec == mc.DagJson || cidCodec == mc.DagCbor
acceptsHTML := strings.Contains(r.Header.Get("Accept"), "text/html")
download := r.URL.Query().Get("download") == "true"

if isDAG && acceptsHTML && !download {
i.serveCodecHTML(ctx, w, r, resolvedPath, contentPath)
} else {
// This covers CIDs with codec 'json' and 'cbor' as those do not have
// an explicit requested content type.
i.serveCodecRaw(ctx, w, r, resolvedPath, contentPath, name, modtime)
}

return
}

// Otherwise, the user has requested a specific content type. Let's first get
// the codecs that can be used with this content type.
codecs, ok := contentTypeToCodecs[requestedContentType]
// If DAG-JSON or DAG-CBOR was requested using the corresponding plain content type,
// return the raw block as-is, without conversion
skipCodecs, ok := contentTypeToRaw[requestedContentType]
if ok {
for _, skipCodec := range skipCodecs {
if skipCodec == cidCodec {
i.serveCodecRaw(ctx, w, r, resolvedPath, contentPath, name, modtime)
return
}
}
}

// Otherwise, the user has requested a specific content type (a DAG-* variant).
// Let's first get the codecs that can be used with this content type.
toCodec, ok := contentTypeToCodec[requestedContentType]
if !ok {
// This is never supposed to happen unless the function is called with wrong parameters.
err := fmt.Errorf("unsupported content type: %s", requestedContentType)
webError(w, err.Error(), err, http.StatusInternalServerError)
return
}

// If we need to convert, use the last codec (strict dag- variant)
toCodec := codecs[len(codecs)-1]

// If the requested content type has "dag-", ALWAYS go through the encoding
// process in order to validate the content.
if strings.Contains(requestedContentType, "dag-") {
i.serveCodecConverted(ctx, w, r, resolvedPath, contentPath, toCodec, modtime)
return
}

// Otherwise, check if the data is encoded with the requested content type.
// If so, we can directly stream the raw data. serveRawBlock cannot be directly
// used here as it sets different headers.
for _, codec := range codecs {
if resolvedPath.Cid().Prefix().Codec == codec {
i.serveCodecRaw(ctx, w, r, resolvedPath, contentPath, name, modtime)
return
}
}

// Finally, if nothing of the above is true, we have to actually convert the codec.
// This handles DAG-* conversions and validations.
i.serveCodecConverted(ctx, w, r, resolvedPath, contentPath, toCodec, modtime)
}

@@ -165,6 +162,7 @@ func (i *gatewayHandler) serveCodecHTML(ctx context.Context, w http.ResponseWrit
}
}

// serveCodecRaw returns the raw block without any conversion
func (i *gatewayHandler) serveCodecRaw(ctx context.Context, w http.ResponseWriter, r *http.Request, resolvedPath ipath.Resolved, contentPath ipath.Path, name string, modtime time.Time) {
blockCid := resolvedPath.Cid()
blockReader, err := i.api.Block().Get(ctx, resolvedPath)
@@ -184,7 +182,8 @@ func (i *gatewayHandler) serveCodecRaw(ctx context.Context, w http.ResponseWrite
_, _, _ = ServeContent(w, r, name, modtime, content)
}

func (i *gatewayHandler) serveCodecConverted(ctx context.Context, w http.ResponseWriter, r *http.Request, resolvedPath ipath.Resolved, contentPath ipath.Path, toCodec uint64, modtime time.Time) {
// serveCodecConverted returns payload converted to codec specified in toCodec
func (i *gatewayHandler) serveCodecConverted(ctx context.Context, w http.ResponseWriter, r *http.Request, resolvedPath ipath.Resolved, contentPath ipath.Path, toCodec mc.Code, modtime time.Time) {
obj, err := i.api.Dag().Get(ctx, resolvedPath.Cid())
if err != nil {
webError(w, "ipfs dag get "+html.EscapeString(resolvedPath.String()), err, http.StatusInternalServerError)
Expand All @@ -199,7 +198,7 @@ func (i *gatewayHandler) serveCodecConverted(ctx context.Context, w http.Respons
}
finalNode := universal.(ipld.Node)

encoder, err := multicodec.LookupEncoder(toCodec)
encoder, err := multicodec.LookupEncoder(uint64(toCodec))
if err != nil {
webError(w, err.Error(), err, http.StatusInternalServerError)
return
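
The maps above split the old contentTypeToCodecs table into a raw-passthrough table and a conversion table. The sketch below restates the resulting dispatch decision outside the handler, using the same go-multicodec codes; the decide helper is illustrative and not part of this change:

package main

import (
	"fmt"

	mc "github.com/multiformats/go-multicodec"
)

// Simplified restatement of the tables used by serveCodec above.
var contentTypeToRaw = map[string][]mc.Code{
	"application/json": {mc.Json, mc.DagJson},
	"application/cbor": {mc.Cbor, mc.DagCbor},
}

var contentTypeToCodec = map[string]mc.Code{
	"application/vnd.ipld.dag-json": mc.DagJson,
	"application/vnd.ipld.dag-cbor": mc.DagCbor,
}

// decide reports whether a block with cidCodec can be streamed raw for the
// requested content type, or must be re-encoded into toCodec.
func decide(cidCodec mc.Code, contentType string) (raw bool, toCodec mc.Code, err error) {
	if codecs, ok := contentTypeToRaw[contentType]; ok {
		for _, c := range codecs {
			if c == cidCodec {
				return true, 0, nil
			}
		}
	}
	toCodec, ok := contentTypeToCodec[contentType]
	if !ok {
		return false, 0, fmt.Errorf("unsupported content type: %s", contentType)
	}
	return false, toCodec, nil
}

func main() {
	raw, _, _ := decide(mc.DagCbor, "application/cbor")
	fmt.Println("dag-cbor block, application/cbor requested -> raw passthrough:", raw)

	raw, to, _ := decide(mc.Cbor, "application/vnd.ipld.dag-cbor")
	fmt.Println("cbor block, dag-cbor requested -> raw:", raw, "convert to:", to)
}
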
58 changes: 53 additions & 5 deletions core/node/libp2p/rcmgr.go
@@ -52,7 +52,8 @@ func ResourceManager(cfg config.SwarmConfig) interface{} {
return nil, opts, fmt.Errorf("opening IPFS_PATH: %w", err)
}

limitConfig, err := createDefaultLimitConfig(cfg)
var limitConfig rcmgr.LimitConfig
defaultComputedLimitConfig, err := createDefaultLimitConfig(cfg)
if err != nil {
return nil, opts, err
}
@@ -61,10 +62,19 @@ func ResourceManager(cfg config.SwarmConfig) interface{} {
// is documented in docs/config.md.
// Any changes here should be reflected there.
if cfg.ResourceMgr.Limits != nil {
l := *cfg.ResourceMgr.Limits
// This effectively overrides the computed default LimitConfig with any vlues from cfg.ResourceMgr.Limits
l.Apply(limitConfig)
limitConfig = l
userSuppliedOverrideLimitConfig := *cfg.ResourceMgr.Limits
// This effectively overrides the computed default LimitConfig with any non-zero values from cfg.ResourceMgr.Limits.
// Because of how Apply works, any 0 value for a user-supplied override
// will be overridden with a computed default value.
// There currently isn't a way for a user to supply a 0-value override.
userSuppliedOverrideLimitConfig.Apply(defaultComputedLimitConfig)
limitConfig = userSuppliedOverrideLimitConfig
} else {
limitConfig = defaultComputedLimitConfig
}

if err := ensureConnMgrMakeSenseVsResourceMgr(limitConfig, cfg.ConnMgr); err != nil {
return nil, opts, err
}

limiter := rcmgr.NewFixedLimiter(limitConfig)
@@ -598,3 +608,41 @@ func NetResetLimit(mgr network.ResourceManager, repo repo.Repo, scope string) (r

return result, nil
}

func ensureConnMgrMakeSenseVsResourceMgr(rcm rcmgr.LimitConfig, cmgr config.ConnMgr) error {
if cmgr.Type.WithDefault(config.DefaultConnMgrType) == "none" {
return nil // none connmgr, no checks to do
}
highWater := cmgr.HighWater.WithDefault(config.DefaultConnMgrHighWater)
if rcm.System.ConnsInbound <= rcm.System.Conns {
if int64(rcm.System.ConnsInbound) <= highWater {
// nolint
return fmt.Errorf(`
Unable to initialize libp2p due to conflicting limit configuration:
ResourceMgr.Limits.System.ConnsInbound (%d) must be bigger than ConnMgr.HighWater (%d)
`, rcm.System.ConnsInbound, highWater)
}
} else if int64(rcm.System.Conns) <= highWater {
// nolint
return fmt.Errorf(`
Unable to initialize libp2p due to conflicting limit configuration:
ResourceMgr.Limits.System.Conns (%d) must be bigger than ConnMgr.HighWater (%d)
`, rcm.System.Conns, highWater)
}
if rcm.System.StreamsInbound <= rcm.System.Streams {
if int64(rcm.System.StreamsInbound) <= highWater {
// nolint
return fmt.Errorf(`
Unable to initialize libp2p due to conflicting limit configuration:
ResourceMgr.Limits.System.StreamsInbound (%d) must be bigger than ConnMgr.HighWater (%d)
`, rcm.System.StreamsInbound, highWater)
}
} else if int64(rcm.System.Streams) <= highWater {
// nolint
return fmt.Errorf(`
Unable to initialize libp2p due to conflicting limit configuration:
ResourceMgr.Limits.System.Streams (%d) must be bigger than ConnMgr.HighWater (%d)
`, rcm.System.Streams, highWater)
}
return nil
}
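
ensureConnMgrMakeSenseVsResourceMgr rejects configurations where the resource manager would throttle inbound activity below the connection manager's high-water mark. A stripped-down restatement of the connection check using plain ints, showing which combinations pass and fail; the numbers are purely illustrative:

package main

import "fmt"

// checkConnLimits mirrors the connection branch of the check above: the
// effective limit is the smaller of ConnsInbound and Conns, and it must
// exceed ConnMgr.HighWater.
func checkConnLimits(connsInbound, conns int, highWater int64) error {
	limit := int64(conns)
	if connsInbound <= conns {
		limit = int64(connsInbound)
	}
	if limit <= highWater {
		return fmt.Errorf("system inbound connection limit (%d) must be bigger than ConnMgr.HighWater (%d)", limit, highWater)
	}
	return nil
}

func main() {
	// With the new defaults, ConnsInbound is at least max(2*HighWater, 800),
	// so a modest HighWater (96 here, purely illustrative) passes easily.
	fmt.Println(checkConnLimits(800, 1024, 96)) // <nil>

	// A user override that drops ConnsInbound below HighWater now fails fast
	// instead of silently starving the connection manager.
	fmt.Println(checkConnLimits(64, 1024, 96)) // error
}
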
19 changes: 19 additions & 0 deletions core/node/libp2p/rcmgr_defaults.go
@@ -186,5 +186,24 @@ Run 'ipfs swarm limit all' to see the resulting limits.

defaultLimitConfig := scalingLimitConfig.Scale(int64(maxMemory), int(numFD))

// Simple checks to override autoscaling, ensuring the limits make sense versus the connmgr values.
// There are ways to break this, but this should catch most problems already.
// We might improve this in the future.
// See: https://github.com/ipfs/kubo/issues/9545
if cfg.ConnMgr.Type.WithDefault(config.DefaultConnMgrType) != "none" {
maxInboundConns := int64(defaultLimitConfig.System.ConnsInbound)
if connmgrHighWaterTimesTwo := cfg.ConnMgr.HighWater.WithDefault(config.DefaultConnMgrHighWater) * 2; maxInboundConns < connmgrHighWaterTimesTwo {
maxInboundConns = connmgrHighWaterTimesTwo
}

if maxInboundConns < config.DefaultResourceMgrMinInboundConns {
maxInboundConns = config.DefaultResourceMgrMinInboundConns
}

// Scale System.StreamsInbound as well, but use the existing ratio of StreamsInbound to ConnsInbound
defaultLimitConfig.System.StreamsInbound = int(maxInboundConns * int64(defaultLimitConfig.System.StreamsInbound) / int64(defaultLimitConfig.System.ConnsInbound))
defaultLimitConfig.System.ConnsInbound = int(maxInboundConns)
}

return defaultLimitConfig, nil
}
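
The override above lifts the autoscaled inbound connection limit to at least twice ConnMgr.HighWater and never below the new DefaultResourceMgrMinInboundConns floor of 800, then rescales inbound streams to preserve the original streams-per-connection ratio. A worked example with made-up autoscaled values:

package main

import "fmt"

// overrideInbound restates the arithmetic above with plain ints:
// raise inbound conns to max(autoscaled, 2*HighWater, 800) and scale
// inbound streams by the same factor.
func overrideInbound(connsInbound, streamsInbound int, highWater int64) (int, int) {
	maxInboundConns := int64(connsInbound)
	if hw2 := highWater * 2; maxInboundConns < hw2 {
		maxInboundConns = hw2
	}
	const minInboundConns = 800 // config.DefaultResourceMgrMinInboundConns
	if maxInboundConns < minInboundConns {
		maxInboundConns = minInboundConns
	}
	newStreams := int(maxInboundConns * int64(streamsInbound) / int64(connsInbound))
	return int(maxInboundConns), newStreams
}

func main() {
	// Purely illustrative numbers, not the actual autoscaled values:
	// 123 inbound conns with HighWater 96 get raised to the 800 floor,
	// and 1968 inbound streams scale to 1968*800/123 = 12800.
	fmt.Println(overrideInbound(123, 1968, 96))
}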