Skip to content

Commit

Permalink
Some adjustments for Storage Driver and SourceManager to run on a nodejs
Browse files Browse the repository at this point in the history
server
We want to use some methods from these 2 packages in our Latitude cloud
and we have to make it work
  • Loading branch information
andresgutgon committed Jun 19, 2024
1 parent 1b25797 commit 8393d36
Show file tree
Hide file tree
Showing 13 changed files with 114 additions and 54 deletions.
6 changes: 6 additions & 0 deletions .changeset/wet-bikes-check.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@latitude-data/source-manager": patch
"@latitude-data/storage-driver": patch
---

Dry resolve secrets helper function
19 changes: 7 additions & 12 deletions apps/server/src/lib/server/storageDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,21 @@ import {
StorageType,
getStorageDriver,
} from '@latitude-data/storage-driver'
import { resolveSecrets } from '@latitude-data/source-manager'

const DEFAULT_STORAGE_CONFIG = {
type: StorageType.disk,
path: STORAGE_DIR,
}

function readStorageConfig(): StorageDriverConfig {
if (!fs.existsSync(APP_CONFIG_PATH)) {
return DEFAULT_STORAGE_CONFIG
}

const file = fs.readFileSync(APP_CONFIG_PATH, 'utf8')
try {
const latitudeJson = JSON.parse(file)
const storageConfig = latitudeJson?.storage ?? {}
if (!storageConfig.type) return DEFAULT_STORAGE_CONFIG
return storageConfig
} catch (e) {
return DEFAULT_STORAGE_CONFIG
}
const latitudeJson = resolveSecrets({ unresolvedSecrets: JSON.parse(file) })
const storageConfig = latitudeJson?.storage

if (!storageConfig) return DEFAULT_STORAGE_CONFIG

return storageConfig
}

const storageDriver = getStorageDriver(readStorageConfig())
Expand Down
1 change: 1 addition & 0 deletions packages/cli/display_table/rollup.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export default {
external: [
'react',
'ink',
'v8',
'ink-spinner',
'chokidar',
'@latitude-data/source-manager',
Expand Down
1 change: 1 addition & 0 deletions packages/source_manager/rollup.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export default [
'fs',
'path',
'crypto',
'stream',
'uuid',
'dotenv/config',
'@latitude-data/sql-compiler',
Expand Down
1 change: 1 addition & 0 deletions packages/source_manager/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export * from './types'
export { default as SourceManager } from './manager'
export * from './source'
export * from './baseConnector'
export * from './utils'
export { CONNECTOR_PACKAGES } from './baseConnector/connectorFactory'
export { default as TestConnectorInternal } from './testConnector'
export {
Expand Down
5 changes: 4 additions & 1 deletion packages/source_manager/src/manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { createHash } from 'crypto'
import { ParquetWriter } from '@dsnp/parquetjs'
import { WriteStreamMinimal } from '@dsnp/parquetjs/dist/lib/util'
import { buildParquetSchema } from './parquetUtils'
import { Writable } from 'stream'

const MATERIALIZED_DIR_IN_STORAGE = 'materialized'

Expand Down Expand Up @@ -201,7 +202,9 @@ export default class SourceManager {
const startTime = performance.now()
const compiled = await source.compileQuery({ queryPath, params: {} })

const stream = await this.materializedStorage.createWriteStream(filename)
const stream = (await this.materializedStorage.createWriteStream(
filename,
)) as Writable

let writer: ParquetWriter
const ROW_GROUP_SIZE = 4096 // How many rows are in the ParquetWriter file buffer at a time
Expand Down
16 changes: 4 additions & 12 deletions packages/source_manager/src/source/readConfig.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import fs from 'fs'
import yaml from 'yaml'
import { SourceSchema, SourceFileNotFoundError } from '@/types'
import { resolveSecrets } from '@/utils'

export class InvalidSourceConfigError extends Error {
constructor(message: string) {
Expand All @@ -15,18 +16,9 @@ export default function readSourceConfig(sourcePath: string): SourceSchema {
}

const file = fs.readFileSync(sourcePath, 'utf8')
const config = yaml.parse(file, (_, value) => {
// if key starts with 'LATITUDE__', replace it with the environment variable
if (typeof value === 'string' && value.startsWith('LATITUDE__')) {
if (process.env[value]) return process.env[value]

throw new Error(`
Invalid configuration. Environment variable ${value} was not found in the environment. You can review how to set up secret source credentials in the documentation: https://docs.latitude.so/sources/credentials
`)
} else {
return value
}
})
const config = resolveSecrets({
unresolvedSecrets: yaml.parse(file),
}) as unknown as SourceSchema

// Validation requirements
if (!config?.type) {
Expand Down
31 changes: 31 additions & 0 deletions packages/source_manager/src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
export function resolveSecrets({
unresolvedSecrets,
}: {
unresolvedSecrets: Record<string, unknown>
}) {
return Object.entries(unresolvedSecrets).reduce(
(acc, [key, value]) => {
if (typeof value === 'object') {
acc[key] = resolveSecrets({
unresolvedSecrets: value as Record<string, unknown>,
})
return acc
}

if (typeof value === 'string' && value.startsWith('LATITUDE__')) {
if (process.env[value]) {
acc[key] = process.env[value]
return acc
}

throw new Error(`
Invalid configuration. Environment variable ${value} was not found in the environment.
`)
}

acc[key] = value
return acc
},
{} as Record<string, unknown>,
)
}
3 changes: 3 additions & 0 deletions packages/storage_driver/rollup.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ export default {
],
external: [
'fs',
'@aws-sdk/client-s3',
'@aws-sdk/lib-storage',
'stream',
'path',
],
}
8 changes: 7 additions & 1 deletion packages/storage_driver/src/drivers/base.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import { FileStat } from '$/types'
import { FileStat, StorageType } from '$/types'
import { Writable } from 'stream'

export abstract class StorageDriver {
constructor() {}

public abstract type: StorageType

get isS3Driver(): boolean {
return this.type === StorageType.s3
}

abstract resolveUrl(path: string): Promise<string>

abstract listFiles(path: string, recursive: boolean): Promise<string[]>
Expand Down
3 changes: 2 additions & 1 deletion packages/storage_driver/src/drivers/disk/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { FileStat } from '$/types'
import { FileStat, StorageType } from '$/types'
import { StorageDriver } from '$/drivers/base'
import fs from 'fs'
import path from 'path'
Expand All @@ -9,6 +9,7 @@ export type DiskDriverConfig = {
}

export class DiskDriver extends StorageDriver {
public type = StorageType.disk
private root: string

constructor(config: DiskDriverConfig) {
Expand Down
3 changes: 2 additions & 1 deletion packages/storage_driver/src/drivers/s3/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { FileStat } from '$/types'
import { FileStat, StorageType } from '$/types'
import { StorageDriver } from '$/drivers/base'
import { Writable, Readable } from 'stream'
import {
Expand All @@ -20,6 +20,7 @@ export type S3DriverConfig = {
}

export class S3Driver extends StorageDriver {
public type = StorageType.s3
private client: S3Client
private bucket: string

Expand Down
Loading

0 comments on commit 8393d36

Please sign in to comment.