Cluster management (membership & cache query) #231

Closed
AkihiroSuda opened this issue Dec 19, 2017 · 12 comments

@AkihiroSuda
Member

Design

Membership

  • Worker nodes periodically report the following info to the master:
    -- worker ID: a unique string within the cluster, e.g. workerhost01-containerd-overlay
    -- connection info for connecting to the worker from the master: implementation-specific, e.g. tcp://workerhost01:12345 or unix://run/buildkit/instance01.sock
    --- support for UNIX sockets should be useful for testing purposes
    --- not unique; can be shared among multiple workers (the master puts the worker ID in all request messages)
    -- performance stats: loadavg, disk quota usage, and so on
    -- annotations

e.g.

{
  "worker_id": "workerhost01-containerd-overlay",
  "connections":[
    {
      "type": "grpc.v0",
      "socket": "tcp://workerhost01.12345"
    }
  ],
  "stats":[
    {
      "type": "cpu.v0",
      "loadavg": [0.01, 0.02, 0.01]
    }
  ],
  "annotations": {
    "os": "linux",
    "arch": "amd64",
    "executor": "containerd",
    "snapshotter": "overlay",
    "com.example.userspecific": "blahblahblah",
  }
}
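
For illustration, a Go representation of this report might look roughly like the following (the type and field names are assumptions for this sketch, not an existing BuildKit API):

package main

// membershipReport sketches the payload a worker would periodically send to
// the master; names here are illustrative only.
type membershipReport struct {
	WorkerID    string            `json:"worker_id"`   // unique in the cluster
	Connections []connectionInfo  `json:"connections"` // how the master can reach this worker
	Stats       []stat            `json:"stats"`       // loadavg, disk quota usage, ...
	Annotations map[string]string `json:"annotations"` // os, arch, executor, snapshotter, user keys
}

type connectionInfo struct {
	Type   string `json:"type"`   // e.g. "grpc.v0"
	Socket string `json:"socket"` // e.g. "tcp://workerhost01:12345" or a unix:// path
}

type stat struct {
	Type    string    `json:"type"`              // e.g. "cpu.v0"
	LoadAvg []float64 `json:"loadavg,omitempty"` // 1/5/15-minute load averages
}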

Cache query

  • With the connection info above, the master can ask a worker whether it has the cache for a given CacheKey.
    -- the answer does not need to be 100% accurate
    -- how to transfer the cache data is another topic: Cache transfer #224
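
As a rough sketch (the interface and method names below are hypothetical, not part of any existing API), the query surface could be as small as:

package main

import "context"

// cacheQuerier is a hypothetical view the master has of a worker's cache.
// The result is advisory only; the worker may have pruned the entry since.
type cacheQuerier interface {
	// HasCache reports whether the worker currently holds a result for the
	// given cache key. Occasional false answers are tolerated by design.
	HasCache(ctx context.Context, cacheKey string) (bool, error)
}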

Initial naive implementation

  • Stateless master
    -- When the master dies, the orchestrator (k8s/swarm) restarts it (and the membership info will be lost)
    -- Multiple masters could be started, but there would be no connection between masters

  • Worker connects to the master using gRPC (see the sketch after this list)
    -- the master address(es) can be specified via a daemon CLI flag: --join tcp://master:12345

  • Master connects to all workers using gRPC for querying cache existence
    -- does not scale to dozens of nodes, but probably acceptable for the initial work
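
A minimal sketch of the worker side of this naive design, assuming a hypothetical Membership/Report RPC (the reportSender interface stands in for generated stubs that do not exist yet):

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// reportSender wraps the hypothetical Membership.Report RPC so this sketch
// does not depend on generated protobuf stubs.
type reportSender interface {
	// Report pushes one membership report (the JSON structure shown above)
	// to the master.
	Report(ctx context.Context) error
}

// joinAndReport dials the master given via --join (masterAddr is a plain
// host:port here; the tcp:// prefix from the flag would be stripped before
// dialing) and pushes a membership report on every tick until the context
// is cancelled.
func joinAndReport(ctx context.Context, masterAddr string, newSender func(*grpc.ClientConn) reportSender, interval time.Duration) error {
	conn, err := grpc.Dial(masterAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return err
	}
	defer conn.Close()

	sender := newSender(conn)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := sender.Report(ctx); err != nil {
				// A failed report is not fatal; the worker keeps retrying and
				// the master simply sees stale membership info until it succeeds.
				log.Printf("membership report failed: %v", err)
			}
		}
	}
}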

Future possible implementation

  • Use IPFS (or just the libp2p DHT library) for querying cache existence (and also for transfer)?
    -- Membership state could be saved to IPFS as well?
    -- or Infinit? (is it still active?)
@tonistiigi
Member

connection info for connecting to the worker from the master:

We should only need a one-way connection. If the worker connects to the manager, there is no need for the manager to connect to the worker. If connecting happens the other way, the manager already needs to know the address (set by config/control API).

not unique; can be shared among multiple workers (the master puts the worker ID in all request messages)

I don't understand that. The worker ID should be unique, and the manager already talks directly to a single worker, so there is no value in putting it in the request.

Cache query

see #224 (comment)

@AkihiroSuda
Member Author

If the worker connects to the manager, there is no need for the manager to connect to the worker.

How can the manager know the cache map?
I understand we can use the socket for direct communication in the opposite direction, but I'm not sure gRPC provides an API for that.

I don't understand that. The worker ID should be unique, and the manager already talks directly to a single worker, so there is no value in putting it in the request.

The worker ID is unique, but the connection address is shared across multiple workers in a single daemon.

@tonistiigi
Member

tonistiigi commented Dec 20, 2017

How can the manager know the cache map?

The worker sends it to the manager (the same way as this membership report). Or it could open up a bidirectional stream, but I think it is better to keep this async.

The worker ID is unique, but the connection address is shared across multiple workers in a single daemon.

Connection address to what, and for what process? Even if the manager were to connect to a worker, why/how would two workers have the same address?

@AkihiroSuda
Member Author

The worker sends it to the manager

Does it scale?

Connection address to what, and for what process?

To the worker, for cache transfer purposes.

Even if the manager were to connect to a worker, why/how would two workers have the same address?

Because we put multiple workers in a single daemon.
If we don't like this design, we should revert the recent changes.

@tonistiigi
Member

tonistiigi commented Dec 20, 2017

Does it scale?

Why shouldn't it? It is just a metadata database. I think the sync call doesn't scale because possibly thousands of these calls have to happen for a single build, and if every one of them makes a new request to every worker, it requires a very fast and stable network.

To the worker, for cache transfer purposes.

As discussed above, there should be only one connection; if there is already a connection for sending this membership status, we should just use that. It is the same model that the swarmkit agent uses to pick up tasks from the manager, for example.

Because we put multiple workers in a single daemon.

What do you call a daemon here? We have a single-manager (=solver), multiple-worker model. We should think of them as fully independent. A daemon is just a way of running a binary. If you are talking about the local workers that buildkitd starts automatically, then they can either not use gRPC, use io.Pipe, or launch separate worker binaries automatically that each dial back to the manager (or, if the manager dials, each worker listens on a separate UNIX socket).

We are far from introducing the gRPC connection yet. The current builtin list of workers is a good start. Let's define a clean Go interface between the solver+workercontroller and the workers (e.g. so that we don't expect both of them to have access to the sessionmanager) and make it so that the build can use the builtin workers and transfer data between them. Then it will be easy to switch to doing this interface over gRPC.
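
A rough sketch of what such a Go interface could look like (all names and signatures here are assumptions for illustration, not an interface BuildKit actually defines):

package main

import "context"

// Worker sketches the boundary between the solver/workercontroller and a
// worker. It deliberately takes no sessionmanager: anything session-related
// would be passed in explicitly so the two sides stay independent.
type Worker interface {
	// ID uniquely identifies the worker in the cluster.
	ID() string
	// Labels returns static metadata (os, arch, executor, snapshotter, ...).
	Labels() map[string]string
	// CacheMap returns the cache keys the worker would compute for an op.
	CacheMap(ctx context.Context, op Op) (CacheMap, error)
	// Exec runs an op on the given input refs and returns the output refs.
	Exec(ctx context.Context, op Op, inputs []Ref) ([]Ref, error)
	// ImportRef tells the worker to pull a ref from an external location
	// (e.g. another worker), keeping transfers under the manager's control.
	ImportRef(ctx context.Context, desc RefDescriptor) (Ref, error)
}

// Op, CacheMap, Ref and RefDescriptor stand in for the real solver types.
type (
	Op            struct{}
	CacheMap      struct{}
	Ref           interface{ ID() string }
	RefDescriptor struct{}
)

// WorkerController keeps track of the workers available to the solver.
type WorkerController interface {
	List(ctx context.Context) ([]Worker, error)
	Get(ctx context.Context, id string) (Worker, error)
}

The point of the sketch is only that the solver never reaches into worker internals directly, so swapping a builtin implementation for a gRPC client later stays mechanical.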

@tonistiigi
Member

Does it scale?

It is also likely that this only happens on startup (and after garbage collection). As the manager is the only object ever setting new records in this metadata (instructioncache.Set), it already knows about the recent changes.

@AkihiroSuda
Member Author

OK, thank you for the explanation, but don't we still need the worker connection address for worker-to-worker direct cache transfer?

@tonistiigi
Member

don't we still need the worker connection address for worker-to-worker direct cache transfer?

Good point. We could then do it the other way around, so that only the manager connects to the worker and there is no way for workers to connect to the manager. The manager would connect and subscribe to these membership events. The worker address is unique, and every worker has its own address. It is a bit more complicated to bootstrap with this method.

Also, the data transfer should still be controlled by the manager: a worker cannot randomly connect to another worker; instead, the manager sends a specific instruction to a worker on how to pull a specific ref from an external location (another worker).

We could also (at least initially) transfer through the manager. I'm sure a network person could figure out a STUN/ICE tunnel to get the best performance automatically, but I have no such background. We could also make it so that transfer through the manager is always supported, but if you want p2p you can launch a worker with specific options that enable external connections.

@tonistiigi
Member

@AkihiroSuda

Rough proposal for the remote worker API

This API is exposed by a remote worker. The manager is a client of this API.

// Worker exposes the containerd content API. There may be a need to rethink
// the gc logic, e.g. some root labels may need to be managed by the server or
// a session may need to be set.
containerd.services.content.v1.Content

service Control {
	rpc DiskUsage(DiskUsageRequest) returns (DiskUsageResponse);
	rpc Prune(PruneRequest) returns (stream UsageRecord);
	
	// Session forwards the session to the manager (and optionally from there to the client).
	// The Session call happens per build, not per worker. The session ID may not be the same
	// as the one used for communicating with the client.
	// Logging/tracing is also handled through the session.
	rpc Session(stream BytesMessage) returns (stream BytesMessage);
	
	rpc CacheMap(CacheMapRequest) returns (CacheMapResponse);
	rpc Exec(ExecRequest) returns (ExecResponse);
	
	rpc Info(InfoRequest) returns (InfoResponse);
	
	rpc TakeRef(TakeRefRequest) returns (TakeRefResponse);
	rpc ReleaseRef(ReleaseRefRequest) returns (ReleaseRefResponse);
	
	// ExecProcess is used for launching frontends. Implements stdio on the stream.
	// Maybe use types from the containerd API.
	rpc ExecProcess(stream BytesMessage) returns (stream BytesMessage);
	
	// Nested build are also implemented using the session that returns the gateway
	// frontend API.
	
	// from gateway
	rpc ResolveImageConfig(ResolveImageConfigRequest) returns (ResolveImageConfigResponse);
	// rpc ReadFile(ReadFileRequest) returns (ReadFileResponse);
	rpc ContentHash(ContentHashRequest) returns (ContentHashResponse);
}

message CacheMapRequest {
	pb.Op op;
	int index;
	string session;
}

message CacheMapResponse {
	CacheMap map;
	bool final;
}

message ExecRequest {
	pb.Op op;
	repeated Ref refs;
	string session;
}

message ExecResponse {
	repeated Ref refs;
}

message InfoRequest {
}

message InfoResponse {
	string id;
	map<string, string> labels; 
	repeated types.Platform platforms;
	
	systemstats stats;
}

// Manager takes a ref if it wants to hold onto it. A timeout must be set
// to avoid refs not getting released (or same can be achieved with a session).
// Also need to figure out how this plays with content store gc and if the manager
// needs to update ref metadata.
message TakeRefRequest {
	oneof id {
	string refID;
	string snapshotID;
	repeated digest.Digest blobs;
	}
	int timeout;
	bool includeBlobs; // if false then blobs may be empty in the return value
	string session;
}
message TakeRefResponse {
	Ref ref;
	int timeout;
}

message ReleaseRefRequest {
	string refID;
}

message ReleaseRefResponse {
}

message Ref {
	string id;
	string snapshotID;
	repeated digest.Digest blobs;
}

Dialing

I propose considering dialing and exposing sockets later, as there are many options here that would be easier to evaluate on a working product. Generally speaking, for the workers to exchange data directly, they would either need to open a port or use a service to communicate, and that would happen through mTLS. But the details for this are not that important atm. Currently, we could start by just saying that something starts a worker process and uses stdio to communicate with it. E.g.

buildkitd --agent

This starts a worker daemon that doesn't have a control API/solver but instead exposes the remote worker API on stdio. It could also be a separate binary. In the first implementation, the manager would just call these programs (for example through SSH or Docker for node switching).
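
For illustration, a minimal sketch of how the manager could tunnel gRPC over such an agent's stdio (the --agent flag is the proposal above; the stdioConn adapter is an assumption, while grpc.WithContextDialer is an existing grpc-go option):

package main

import (
	"context"
	"io"
	"net"
	"os/exec"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// stdioConn adapts a child process's stdin/stdout to net.Conn so that a gRPC
// client connection can run over it instead of a TCP or UNIX socket.
type stdioConn struct {
	io.Reader      // the agent's stdout
	io.WriteCloser // the agent's stdin
}

func (stdioConn) LocalAddr() net.Addr              { return stdioAddr{} }
func (stdioConn) RemoteAddr() net.Addr             { return stdioAddr{} }
func (stdioConn) SetDeadline(time.Time) error      { return nil } // pipes: no deadlines
func (stdioConn) SetReadDeadline(time.Time) error  { return nil }
func (stdioConn) SetWriteDeadline(time.Time) error { return nil }

type stdioAddr struct{}

func (stdioAddr) Network() string { return "stdio" }
func (stdioAddr) String() string  { return "stdio" }

// dialAgent starts `buildkitd --agent` and returns a gRPC client connection
// tunneled over the process's stdio. A real implementation would also handle
// cmd.Wait and shutdown.
func dialAgent(ctx context.Context) (*grpc.ClientConn, error) {
	cmd := exec.CommandContext(ctx, "buildkitd", "--agent")
	stdin, err := cmd.StdinPipe()
	if err != nil {
		return nil, err
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return nil, err
	}
	if err := cmd.Start(); err != nil {
		return nil, err
	}
	conn := stdioConn{Reader: stdout, WriteCloser: stdin}
	return grpc.DialContext(ctx, "passthrough:///buildkitd-agent",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
			// Every (re)dial returns the same stdio pipe; good enough for a sketch.
			return conn, nil
		}),
	)
}

Transport security (the mTLS mentioned above) would only come into play once real sockets are exposed.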

Exporters

Exporters as an abstraction do not belong to the worker directly, but there needs to be a way to return data out of the worker references so it can be used by the exporter. The exporters that work on content blobs already have access to the data through the references and the contentstore.

Picking a worker

The correct worker-picking logic needs to be defined separately and will probably need a lot of tweaking against real-world cases. It is probably a combination of an LRU key-value map in the manager, which helps predict the same workers for similar workloads, and the result of an Info request showing the current state of the worker. This could be a learning component in the future.
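
A hedged sketch of that combination, assuming hypothetical types (worker IDs as plain strings, load figures coming from the Info/stats reports, and container/list standing in for a real LRU library):

package main

import "container/list"

// workerPicker combines a small LRU of "cache key digest -> last worker" with
// the latest load each worker reported; everything here is illustrative only.
type workerPicker struct {
	capacity int
	order    *list.List               // most recently used digests at the front
	byKey    map[string]*list.Element // digest -> element holding an lruEntry
	loadAvg  map[string]float64       // workerID -> last reported 1-minute loadavg
}

type lruEntry struct {
	digest   string
	workerID string
}

func newWorkerPicker(capacity int) *workerPicker {
	return &workerPicker{
		capacity: capacity,
		order:    list.New(),
		byKey:    map[string]*list.Element{},
		loadAvg:  map[string]float64{},
	}
}

// Observe records that workerID produced (or is known to hold) the result
// for digest, evicting the least recently used entry if needed.
func (p *workerPicker) Observe(digest, workerID string) {
	if el, ok := p.byKey[digest]; ok {
		el.Value = lruEntry{digest, workerID}
		p.order.MoveToFront(el)
		return
	}
	p.byKey[digest] = p.order.PushFront(lruEntry{digest, workerID})
	if p.order.Len() > p.capacity {
		oldest := p.order.Back()
		delete(p.byKey, oldest.Value.(lruEntry).digest)
		p.order.Remove(oldest)
	}
}

// UpdateLoad stores the latest load figure from a worker's Info/stats report.
func (p *workerPicker) UpdateLoad(workerID string, load float64) {
	p.loadAvg[workerID] = load
}

// Pick prefers the worker that last handled this digest (likely warm cache)
// if it is still available, otherwise the least loaded worker. Returns ""
// when no workers are available.
func (p *workerPicker) Pick(digest string, workers []string) string {
	if el, ok := p.byKey[digest]; ok {
		remembered := el.Value.(lruEntry).workerID
		for _, w := range workers {
			if w == remembered {
				p.order.MoveToFront(el)
				return remembered
			}
		}
	}
	best, bestLoad := "", 0.0
	for _, w := range workers {
		if l := p.loadAvg[w]; best == "" || l < bestLoad {
			best, bestLoad = w, l
		}
	}
	return best
}

Keeping this map only in memory matches the stateless-master assumption above: if the manager restarts, picking quality degrades but correctness does not.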

@AkihiroSuda
Member Author

👍

  • To support multiple managers, I guess we need to put a ManagerID in TakeRefRequest and ReleaseRefRequest?
  • IIRC, last year we discussed making workers dial managers? Do we still want to keep this design? (Yes, the manager can still call worker gRPC over a stream with this design.)

@tonistiigi
Member

IIRC, last year we discussed making workers dial managers? Do we still want to keep this design? (Yes, the manager can still call worker gRPC over a stream with this design.)

I'm not completely sure, so I left it open. As you say yourself, it doesn't really change the other design, so we could just do it on a fixed connection/stdio initially. Bootstrapping may be easier in some cases when workers dial and there is only a single port to open for a cluster. OTOH, if workers already exposed a port for p2p, it wouldn't make sense for the manager to do it as well. Also, some upgrades could be easier if the manager connects, as it can just dial into a new set of workers on restart.

@AkihiroSuda
Member Author

Closing, as we have the Kubernetes driver in buildx now.
