Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add x-shard-key to APIs #72

Merged
merged 2 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions Sources/Core/Auth.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import NIOCore
class AuthClientInterceptor<Request, Response>: ClientInterceptor<Request, Response> {
let apiKey: String?
let token: String?
let docKey: String?

init(apiKey: String? = nil, token: String? = nil) {
init(apiKey: String? = nil, token: String? = nil, docKey: String? = nil) {
self.apiKey = apiKey
self.token = token
self.docKey = docKey
}

override func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?, context: ClientInterceptorContext<Request, Response>) {
Expand All @@ -34,6 +36,14 @@ class AuthClientInterceptor<Request, Response>: ClientInterceptor<Request, Respo
case .metadata(var header):
if let apiKey {
header.add(name: "x-api-key", value: apiKey)

var shardKey = "\(apiKey)"

if let docKey = self.docKey, docKey.isEmpty == false {
shardKey += "/\(docKey)"
}

header.add(name: "x-shard-key", value: shardKey)
}

if let token {
Expand All @@ -53,10 +63,16 @@ class AuthClientInterceptor<Request, Response>: ClientInterceptor<Request, Respo
final class AuthClientInterceptors: YorkieServiceClientInterceptorFactoryProtocol {
let apiKey: String?
let token: String?
let docKey: String?

init(apiKey: String? = nil, token: String? = nil) {
init(apiKey: String? = nil, token: String? = nil, docKey: String? = nil) {
self.apiKey = apiKey
self.token = token
self.docKey = docKey
}

func docKeyChangedInterceptors(_ docKey: String?) -> AuthClientInterceptors {
AuthClientInterceptors(apiKey: self.apiKey, token: self.token, docKey: docKey)
}

func makeActivateClientInterceptors() -> [GRPC.ClientInterceptor<ActivateClientRequest, ActivateClientResponse>] {
Expand All @@ -68,26 +84,26 @@ final class AuthClientInterceptors: YorkieServiceClientInterceptorFactoryProtoco
}

func makeUpdatePresenceInterceptors() -> [GRPC.ClientInterceptor<UpdatePresenceRequest, UpdatePresenceResponse>] {
[AuthClientInterceptor<UpdatePresenceRequest, UpdatePresenceResponse>(apiKey: self.apiKey, token: self.token)]
[AuthClientInterceptor<UpdatePresenceRequest, UpdatePresenceResponse>(apiKey: self.apiKey, token: self.token, docKey: self.docKey)]
}

func makeAttachDocumentInterceptors() -> [GRPC.ClientInterceptor<AttachDocumentRequest, AttachDocumentResponse>] {
[AuthClientInterceptor<AttachDocumentRequest, AttachDocumentResponse>(apiKey: self.apiKey, token: self.token)]
[AuthClientInterceptor<AttachDocumentRequest, AttachDocumentResponse>(apiKey: self.apiKey, token: self.token, docKey: self.docKey)]
}

func makeDetachDocumentInterceptors() -> [GRPC.ClientInterceptor<DetachDocumentRequest, DetachDocumentResponse>] {
[AuthClientInterceptor<DetachDocumentRequest, DetachDocumentResponse>(apiKey: self.apiKey, token: self.token)]
[AuthClientInterceptor<DetachDocumentRequest, DetachDocumentResponse>(apiKey: self.apiKey, token: self.token, docKey: self.docKey)]
}

func makeWatchDocumentInterceptors() -> [GRPC.ClientInterceptor<WatchDocumentRequest, WatchDocumentResponse>] {
[AuthClientInterceptor<WatchDocumentRequest, WatchDocumentResponse>(apiKey: self.apiKey, token: self.token)]
[AuthClientInterceptor<WatchDocumentRequest, WatchDocumentResponse>(apiKey: self.apiKey, token: self.token, docKey: self.docKey)]
}

func makeRemoveDocumentInterceptors() -> [GRPC.ClientInterceptor<Yorkie_V1_RemoveDocumentRequest, Yorkie_V1_RemoveDocumentResponse>] {
[AuthClientInterceptor<RemoveDocumentRequest, RemoveDocumentResponse>(apiKey: self.apiKey, token: self.token)]
[AuthClientInterceptor<RemoveDocumentRequest, RemoveDocumentResponse>(apiKey: self.apiKey, token: self.token, docKey: self.docKey)]
}

func makePushPullChangesInterceptors() -> [GRPC.ClientInterceptor<Yorkie_V1_PushPullChangesRequest, Yorkie_V1_PushPullChangesResponse>] {
[AuthClientInterceptor<PushPullChangeRequest, PushPullChangeResponse>(apiKey: self.apiKey, token: self.token)]
[AuthClientInterceptor<PushPullChangeRequest, PushPullChangeResponse>(apiKey: self.apiKey, token: self.token, docKey: self.docKey)]
}
}
19 changes: 16 additions & 3 deletions Sources/Core/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public actor Client {
private let reconnectStreamDelay: Int
private let maximumAttachmentTimeout: Int

private let rpcClient: YorkieServiceAsyncClient
private var rpcClient: YorkieServiceAsyncClient

private let group: EventLoopGroup

Expand Down Expand Up @@ -230,6 +230,7 @@ public actor Client {
activateRequest.clientKey = self.key

do {
self.changeDocKeyOfAuthInterceptors(nil)
let activateResponse = try await self.rpcClient.activateClient(activateRequest, callOptions: nil)

self.id = activateResponse.clientID.toHexString
Expand Down Expand Up @@ -267,6 +268,7 @@ public actor Client {
deactivateRequest.clientID = clientIDData

do {
self.changeDocKeyOfAuthInterceptors(nil)
_ = try await self.rpcClient.deactivateClient(deactivateRequest)
} catch {
Logger.error("Failed to request deactivate client(\(self.key)).", error: error)
Expand Down Expand Up @@ -311,6 +313,7 @@ public actor Client {

self.semaphoresForInitialzation[docKey] = semaphore

self.changeDocKeyOfAuthInterceptors(docKey)
let result = try await self.rpcClient.attachDocument(attachDocumentRequest)

let pack = try Converter.fromChangePack(result.changePack)
Expand Down Expand Up @@ -368,6 +371,7 @@ public actor Client {
detachDocumentRequest.changePack = Converter.toChangePack(pack: await doc.createChangePack())

do {
self.changeDocKeyOfAuthInterceptors(doc.getKey())
let result = try await self.rpcClient.detachDocument(detachDocumentRequest)

let pack = try Converter.fromChangePack(result.changePack)
Expand Down Expand Up @@ -427,6 +431,7 @@ public actor Client {
removeDocumentRequest.changePack = Converter.toChangePack(pack: await doc.createChangePack(true))

do {
self.changeDocKeyOfAuthInterceptors(doc.getKey())
let result = try await self.rpcClient.removeDocument(removeDocumentRequest)

let pack = try Converter.fromChangePack(result.changePack)
Expand Down Expand Up @@ -523,6 +528,7 @@ public actor Client {
self.sendPeerChangeEvent(.presenceChanged, [docKey], id)

do {
self.changeDocKeyOfAuthInterceptors(docKey)
_ = try await self.rpcClient.updatePresence(updatePresenceRequest)
Logger.info("[UM] c\"\(self.key)\" updated")
} catch {
Expand Down Expand Up @@ -633,6 +639,7 @@ public actor Client {
request.client = Converter.toClient(id: id, presence: self.presenceInfo)
request.documentID = docID

self.changeDocKeyOfAuthInterceptors(docKey)
self.attachmentMap[docKey]?.remoteWatchStream = self.rpcClient.makeWatchDocumentCall(request)

let event = StreamConnectionStatusChangedEvent(value: .connected)
Expand Down Expand Up @@ -791,19 +798,21 @@ public actor Client {
pushPullRequest.documentID = attachment.docID

do {
let docKey = doc.getKey()

self.changeDocKeyOfAuthInterceptors(docKey)
let response = try await self.rpcClient.pushPullChanges(pushPullRequest)

let responsePack = try Converter.fromChangePack(response.changePack)
try await doc.applyChangePack(pack: responsePack)

if await doc.status == .removed {
self.attachmentMap.removeValue(forKey: doc.getKey())
self.attachmentMap.removeValue(forKey: docKey)
}

let event = DocumentSyncedEvent(value: .synced)
self.eventStream.send(event)

let docKey = doc.getKey()
let remoteSize = responsePack.getChangeSize()
Logger.info("[PP] c:\"\(self.key)\" sync d:\"\(docKey)\", push:\(localSize) pull:\(remoteSize) cp:\(responsePack.getCheckpoint().structureAsString)")

Expand All @@ -814,4 +823,8 @@ public actor Client {
throw error
}
}

private func changeDocKeyOfAuthInterceptors(_ docKey: String?) {
self.rpcClient.interceptors = (self.rpcClient.interceptors as? AuthClientInterceptors)?.docKeyChangedInterceptors(docKey)
}
}