-
Notifications
You must be signed in to change notification settings - Fork 66
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: index.ts to async-first #412
Merged
Merged
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit
Hold shift + click to select a range
d24868e
index.ts to async-first
AVaksman de99495
clean up imports
AVaksman c0972c8
add test for coverage
AVaksman 4c9a8ad
add assert
AVaksman 4dcc4f6
Merge branch 'master' into index_to_async-first
JustinBeckwith 50ecc8e
minor fix and more tests
AVaksman a4dbbcc
one more test for coverage
AVaksman 69c7a77
Merge branch 'master' into index_to_async-first
AVaksman ac60d9a
Merge branch 'master' into index_to_async-first
AVaksman 9032d46
Merge branch 'master' into index_to_async-first
AVaksman 8e12071
getEntries and getSinks to async-first
AVaksman 0acdbda
Merge branch 'master' into index_to_async-first
AVaksman 83d4c3c
Merge branch 'master' into index_to_async-first
JustinBeckwith b1f5ca8
Merge branch 'master' into index_to_async-first
JustinBeckwith f0edf13
skip setProjectId() for getSinks() and getSinksStream()
AVaksman 2ad927b
Merge branch 'master' into index_to_async-first
AVaksman 4d05e81
skip setProjectId() for getEntries() and getEntriesStream()
AVaksman bf88c40
add optional log param to GetEntriesRequest
AVaksman 72fefc1
combine user provided filter and logName filter
AVaksman c09ce54
Merge branch 'master' into index_to_async-first
AVaksman 78d2caf
Merge branch 'master' into index_to_async-first
AVaksman fef29a9
Merge branch 'master' into index_to_async-first
JustinBeckwith bd2e618
Merge remote-tracking branch 'upstream/master' into index_to_async-first
AVaksman 3a1edac
improve types
AVaksman f65892f
lint
AVaksman 6621c18
pass system-test
AVaksman f098a22
Merge branch 'master' into index_to_async-first
AVaksman 4b7d69e
Merge remote-tracking branch 'master' into index_to_async-first
AVaksman b92fff3
Merge branch 'master' into index_to_async-first
AVaksman 7771526
Merge branch 'master' into index_to_async-first
sduskis File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,7 @@ | |
import * as common from '@google-cloud/common-grpc'; | ||
import {paginator} from '@google-cloud/paginator'; | ||
import {replaceProjectIdToken} from '@google-cloud/projectify'; | ||
import {promisifyAll} from '@google-cloud/promisify'; | ||
import {callbackifyAll, promisifyAll} from '@google-cloud/promisify'; | ||
import * as arrify from 'arrify'; | ||
import * as extend from 'extend'; | ||
import {GoogleAuth} from 'google-auth-library'; | ||
|
@@ -224,6 +224,8 @@ class Logging { | |
options: LoggingOptions; | ||
projectId: string; | ||
detectedResource?: object; | ||
configService?: typeof v2.ConfigServiceV2Client; | ||
loggingService?: typeof v2.LoggingServiceV2Client; | ||
|
||
constructor(options?: LoggingOptions) { | ||
// Determine what scopes are needed. | ||
|
@@ -252,6 +254,8 @@ class Logging { | |
this.auth = new GoogleAuth(options_); | ||
this.options = options_; | ||
this.projectId = this.options.projectId || '{{projectId}}'; | ||
this.configService = new v2.ConfigServiceV2Client(this.options); | ||
this.loggingService = new v2.LoggingServiceV2Client(this.options); | ||
} | ||
/** | ||
* Config to set for the sink. Not all available options are listed here, see | ||
|
@@ -336,49 +340,34 @@ class Logging { | |
createSink( | ||
name: string, config: CreateSinkRequest, | ||
callback: CreateSinkCallback): void; | ||
createSink( | ||
name: string, config: CreateSinkRequest, | ||
callback?: CreateSinkCallback): Promise<[Sink, LogSink]>|void { | ||
const self = this; | ||
if (!is.string(name)) { | ||
async createSink(name: string, config: CreateSinkRequest): | ||
Promise<[Sink, LogSink]> { | ||
if (typeof name !== 'string') { | ||
throw new Error('A sink name must be provided.'); | ||
} | ||
if (!is.object(config)) { | ||
if (typeof config !== 'object') { | ||
throw new Error('A sink configuration object must be provided.'); | ||
} | ||
if (common.util.isCustomType(config.destination, 'bigquery/dataset')) { | ||
this.setAclForDataset_(name, config, callback!); | ||
return; | ||
await this.setAclForDataset_(config); | ||
} | ||
if (common.util.isCustomType(config.destination, 'pubsub/topic')) { | ||
this.setAclForTopic_(name, config, callback!); | ||
return; | ||
await this.setAclForTopic_(config); | ||
} | ||
if (common.util.isCustomType(config.destination, 'storage/bucket')) { | ||
this.setAclForBucket_(name, config, callback!); | ||
return; | ||
await this.setAclForBucket_(config); | ||
} | ||
const reqOpts = { | ||
parent: 'projects/' + this.projectId, | ||
sink: extend({}, config, {name}), | ||
}; | ||
delete reqOpts.sink.gaxOptions; | ||
this.request( | ||
{ | ||
client: 'ConfigServiceV2Client', | ||
method: 'createSink', | ||
reqOpts, | ||
gaxOpts: config.gaxOptions, | ||
}, | ||
(err, resp) => { | ||
if (err) { | ||
callback!(err, null, resp); | ||
return; | ||
} | ||
const sink = self.sink(resp.name); | ||
sink.metadata = resp; | ||
callback!(null, sink, resp); | ||
}); | ||
await this.setProjectId(reqOpts); | ||
const [resp] = | ||
await this.configService.createSink(reqOpts, config.gaxOptions); | ||
const sink = this.sink(resp.name); | ||
sink.metadata = resp; | ||
return [sink, resp]; | ||
} | ||
|
||
/** | ||
|
@@ -615,17 +604,42 @@ class Logging { | |
autoPaginate: options.autoPaginate, | ||
}, | ||
options.gaxOptions); | ||
requestStream = self.request({ | ||
client: 'LoggingServiceV2Client', | ||
method: 'listLogEntriesStream', | ||
reqOpts, | ||
gaxOpts: gaxOptions, | ||
|
||
let gaxStream: ClientReadableStream<LogEntry>; | ||
requestStream = streamEvents<Duplex>(through.obj()); | ||
(requestStream as AbortableDuplex).abort = () => { | ||
if (gaxStream && gaxStream.cancel) { | ||
gaxStream.cancel(); | ||
} | ||
}; | ||
|
||
requestStream.once('reading', () => { | ||
// tslint:disable-next-line no-any | ||
if ((global as any).GCLOUD_SANDBOX_ENV) { | ||
return through.obj(); | ||
} | ||
self.setProjectId(reqOpts).then(() => { | ||
try { | ||
gaxStream = | ||
this.loggingService.listLogEntriesStream(reqOpts, gaxOptions); | ||
} catch (error) { | ||
requestStream.destroy(error); | ||
} | ||
gaxStream | ||
.on('error', | ||
err => { | ||
requestStream.destroy(err); | ||
}) | ||
.pipe(requestStream); | ||
}); | ||
return; | ||
}); | ||
// tslint:disable-next-line no-any | ||
(userStream as any).setPipeline(requestStream, toEntryStream); | ||
}); | ||
return userStream; | ||
} | ||
|
||
/** | ||
* Query object for listing sinks. | ||
* | ||
|
@@ -756,7 +770,7 @@ class Logging { | |
const self = this; | ||
options = options || {}; | ||
let requestStream: Duplex; | ||
const userStream = streamEvents(pumpify.obj()); | ||
const userStream = streamEvents<Duplex>(pumpify.obj()); | ||
(userStream as AbortableDuplex).abort = () => { | ||
if (requestStream) { | ||
(requestStream as AbortableDuplex).abort(); | ||
|
@@ -777,11 +791,33 @@ class Logging { | |
autoPaginate: options.autoPaginate, | ||
}, | ||
options.gaxOptions); | ||
requestStream = self.request({ | ||
client: 'ConfigServiceV2Client', | ||
method: 'listSinksStream', | ||
reqOpts, | ||
gaxOpts: gaxOptions, | ||
|
||
let gaxStream: ClientReadableStream<LogSink>; | ||
requestStream = streamEvents<Duplex>(through.obj()); | ||
(requestStream as AbortableDuplex).abort = () => { | ||
if (gaxStream && gaxStream.cancel) { | ||
gaxStream.cancel(); | ||
} | ||
}; | ||
requestStream.once('reading', () => { | ||
// tslint:disable-next-line no-any | ||
if ((global as any).GCLOUD_SANDBOX_ENV) { | ||
return through.obj(); | ||
} | ||
self.setProjectId(reqOpts).then(() => { | ||
try { | ||
gaxStream = this.configService.listSinksStream(reqOpts, gaxOptions); | ||
} catch (error) { | ||
requestStream.destroy(error); | ||
} | ||
gaxStream | ||
.on('error', | ||
err => { | ||
requestStream.destroy(err); | ||
}) | ||
.pipe(requestStream); | ||
}); | ||
return; | ||
}); | ||
// tslint:disable-next-line no-any | ||
(userStream as any).setPipeline(requestStream, toSinkStream); | ||
|
@@ -919,22 +955,11 @@ class Logging { | |
* | ||
* @private | ||
*/ | ||
setAclForBucket_( | ||
name: string, config: CreateSinkRequest, callback: CreateSinkCallback) { | ||
const self = this; | ||
async setAclForBucket_(config: CreateSinkRequest) { | ||
const bucket = config.destination as Bucket; | ||
// tslint:disable-next-line no-any | ||
(bucket.acl.owners as any) | ||
.addGroup( | ||
'[email protected]', | ||
(err: Error, apiResp: request.Response) => { | ||
if (err) { | ||
callback(err, null, apiResp); | ||
return; | ||
} | ||
config.destination = 'storage.googleapis.com/' + bucket.name; | ||
self.createSink(name, config, callback); | ||
}); | ||
await (bucket.acl.owners as any).addGroup('[email protected]'); | ||
config.destination = 'storage.googleapis.com/' + bucket.name; | ||
} | ||
|
||
/** | ||
|
@@ -946,37 +971,22 @@ class Logging { | |
* | ||
* @private | ||
*/ | ||
setAclForDataset_( | ||
name: string, config: CreateSinkRequest, callback: CreateSinkCallback) { | ||
const self = this; | ||
async setAclForDataset_(config: CreateSinkRequest) { | ||
const dataset = config.destination as Dataset; | ||
dataset.getMetadata((err, metadata, apiResp) => { | ||
if (err) { | ||
callback(err, null, apiResp); | ||
return; | ||
} | ||
// tslint:disable-next-line no-any | ||
const access = ([] as any[]).slice.call(arrify(metadata.access)); | ||
access.push({ | ||
role: 'WRITER', | ||
groupByEmail: '[email protected]', | ||
}); | ||
dataset.setMetadata( | ||
{ | ||
access, | ||
}, | ||
(err, apiResp) => { | ||
if (err) { | ||
callback(err, null, apiResp); | ||
return; | ||
} | ||
const baseUrl = 'bigquery.googleapis.com'; | ||
const pId = (dataset.parent as BigQuery).projectId; | ||
const dId = dataset.id; | ||
config.destination = `${baseUrl}/projects/${pId}/datasets/${dId}`; | ||
self.createSink(name, config, callback); | ||
}); | ||
const [metadata] = await dataset.getMetadata(); | ||
// tslint:disable-next-line no-any | ||
const access = ([] as any[]).slice.call(arrify(metadata.access)); | ||
access.push({ | ||
role: 'WRITER', | ||
groupByEmail: '[email protected]', | ||
}); | ||
await dataset.setMetadata({ | ||
access, | ||
}); | ||
const baseUrl = 'bigquery.googleapis.com'; | ||
const pId = (dataset.parent as BigQuery).projectId; | ||
const dId = dataset.id; | ||
config.destination = `${baseUrl}/projects/${pId}/datasets/${dId}`; | ||
} | ||
|
||
/** | ||
|
@@ -988,34 +998,36 @@ class Logging { | |
* | ||
* @private | ||
*/ | ||
setAclForTopic_( | ||
name: string, config: CreateSinkRequest, callback: CreateSinkCallback) { | ||
const self = this; | ||
async setAclForTopic_(config: CreateSinkRequest) { | ||
const topic = config.destination as Topic; | ||
topic.iam.getPolicy((err, policy) => { | ||
if (err) { | ||
callback(err, null); | ||
return; | ||
} | ||
policy!.bindings = arrify(policy!.bindings); | ||
policy!.bindings.push({ | ||
role: 'roles/pubsub.publisher', | ||
members: ['serviceAccount:[email protected]'], | ||
}); | ||
topic.iam.setPolicy(policy!, (err, policy) => { | ||
if (err) { | ||
callback(err, null); | ||
return; | ||
} | ||
const baseUrl = 'pubsub.googleapis.com'; | ||
const topicName = topic.name; | ||
config.destination = `${baseUrl}/${topicName}`; | ||
self.createSink(name, config, callback); | ||
}); | ||
const [policy] = await topic.iam.getPolicy(); | ||
policy.bindings = arrify(policy.bindings); | ||
policy!.bindings.push({ | ||
role: 'roles/pubsub.publisher', | ||
members: ['serviceAccount:[email protected]'], | ||
}); | ||
await topic.iam.setPolicy(policy); | ||
const baseUrl = 'pubsub.googleapis.com'; | ||
const topicName = topic.name; | ||
config.destination = `${baseUrl}/${topicName}`; | ||
} | ||
|
||
async setProjectId(reqOpts: {}) { | ||
if (this.projectId === '{{projectId}}') { | ||
this.projectId = await this.auth.getProjectId(); | ||
} | ||
reqOpts = replaceProjectIdToken(reqOpts, this.projectId!); | ||
} | ||
} | ||
|
||
/*! Developer Documentation | ||
* All async methods (except for streams) will execute a callback in the event | ||
* that a callback is provided. | ||
*/ | ||
callbackifyAll(Logging, { | ||
exclude: ['request', 'getEntries', 'getSinks'], | ||
}); | ||
|
||
/*! Developer Documentation | ||
* | ||
* These methods can be auto-paginated. | ||
|
@@ -1028,7 +1040,7 @@ paginator.extend(Logging, ['getEntries', 'getSinks']); | |
* that a callback is omitted. | ||
*/ | ||
promisifyAll(Logging, { | ||
exclude: ['entry', 'log', 'request', 'sink'], | ||
exclude: ['entry', 'log', 'request', 'sink', 'createSink'], | ||
}); | ||
|
||
/** | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's awesome seeing this pattern, where we can get the project ID in an un-obtrusive way with await 👍 I think we have a chance to ditch the
{{projectId}}
placeholder token here altogether. If we just get the project ID before we build thereqOpts
object (L#360), then we knowthis.projectId
is the right value-- no need to use the token for later replacement, since if we can't get it now, we can't get it later, either.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tagging related issue which seeks to destroy the placeholder: googleapis/nodejs-common#10
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is dependent on objects from other libraries
storage.bucket
,bigQuery.dataset
orpubsub.topic
which could still have{{projectId}}
placeholder. I would have to still runreplaceProjectIdToken
after building thereqOpts
object. So I placed it after, to have it in one place for both.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, that is reasonable. I believe this specific method is unique, so I might have picked a bad one to make this suggestion on. For the others, could we skip the
setProjectId()
call, and instead just put the real projectId right where we need it in the reqOpts?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stephenplusplus
It looks like I can skip
setProjectId()
getSinks
andgetSinksStream
since these methods do not depend on theSink
constructor.I am leaving
getEntries
andgetEntriesStream
as is since these guys do depend on theLog
constructor and I would still need to toreplaceProjectIdToken
foroptions.filter
nodejs-logging/src/log.ts
Lines 525 to 530 in 81138d6
from
nodejs-logging/src/log.ts
Lines 108 to 110 in 81138d6
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For
getEntries
/getEntriesStream
, I think we can do the refresh right there:The mission for a while has been to eliminate that token altogether, but it was the absence of async/await that has prevented it. But now that we have it, anywhere
this.formattedName_
is used, it should be replaced with code similar to the above, in this file and others where possible. Just a thought, but maybeformatName()
should be repurposed to do the project ID fetching as well.