Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Add a consolidated multi-pod, multi-container log query #1689

Merged
merged 4 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,21 @@
## Prerequisites

- laf cluster installed locally or remotely (~/.kube/config)
- telepresence installed (see https://www.telepresence.io/reference/install)
- minio client installed (see https://min.io/download#)
- telepresence installed (see <https://www.telepresence.io/reference/install>)
- minio client installed (see <https://min.io/download#>)

## Start service locally

```bash
cd server/

# Install telepresence traffic manager (only
# telepresence version v2.16.1
# Install telepresence traffic manager
telepresence helm install
# Connect your computer to laf-dev cluster
telepresence connect
# Connect local server to laf server cluster
telepresence intercept laf-server -n laf-system -p 3000:3000 -e $(pwd)/.env
# Connect your computer to laf-dev cluster (namespace laf-system)
telepresence connect -n laf-system
# Connect local server to laf server cluster (namespace laf-system)
telepresence intercept laf-server -p 3000:3000 -e $(pwd)/.env

npm install
npm run watch
Expand All @@ -49,7 +50,7 @@ npm run watch
> Clean up

```bash
telepresence leave laf-server-laf-system
telepresence leave laf-server
```

## Troubleshooting
Expand Down
1 change: 1 addition & 0 deletions server/src/application/application.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import { PodController } from './pod.controller'
ApplicationConfigurationService,
EnvironmentVariableService,
BundleService,
PodService,
],
})
export class ApplicationModule {}
20 changes: 13 additions & 7 deletions server/src/application/dto/pod.dto.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
import { ApiProperty } from '@nestjs/swagger'
import { IsNotEmpty, IsArray, IsString } from 'class-validator'

export class PodNamesDto {
export class PodNameListDto {
@ApiProperty()
@IsNotEmpty()
@IsString()
appid: string

@ApiProperty({
description: 'List of pod identifiers',
example: ['pod1', 'pod2'],
})
@IsNotEmpty()
@IsArray()
@IsString({ each: true })
pods: string[]
podNameList: string[]
}

export class ContainerNameListDto {
@ApiProperty()
podName: string

@ApiProperty({
description: 'List of container identifiers',
example: ['container1', 'container2'],
})
containerNameList: string[]
}
45 changes: 36 additions & 9 deletions server/src/application/pod.controller.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
import { Controller, Get, Logger, Param, UseGuards } from '@nestjs/common'
import {
Controller,
Get,
Logger,
Param,
Query,
UseGuards,
} from '@nestjs/common'
import { ApiBearerAuth, ApiOperation, ApiTags } from '@nestjs/swagger'
import { ApplicationAuthGuard } from 'src/authentication/application.auth.guard'
import { JwtAuthGuard } from 'src/authentication/jwt.auth.guard'
import { ApiResponseArray, ResponseUtil } from 'src/utils/response'
import { PodNamesDto } from './dto/pod.dto'
import { ApiResponseObject, ResponseUtil } from 'src/utils/response'
import { ContainerNameListDto, PodNameListDto } from './dto/pod.dto'
import { PodService } from './pod.service'

@ApiTags('Application')
@ApiBearerAuth('Authorization')
@Controller('apps/:appid/pods')
@Controller('apps/:appid/pod')
export class PodController {
private readonly logger = new Logger(PodController.name)

Expand All @@ -19,14 +26,34 @@ export class PodController {
* @param appid
* @returns
*/
@ApiResponseArray(PodNamesDto)
@ApiResponseObject(PodNameListDto)
@ApiOperation({ summary: 'Get app all pod name' })
@UseGuards(JwtAuthGuard, ApplicationAuthGuard)
@Get()
async get(@Param('appid') appid: string) {
const podNames: PodNamesDto = await this.podService.getPodNameListByAppid(
appid,
)
async getPodNameList(@Param('appid') appid: string) {
const podNames: PodNameListDto =
await this.podService.getPodNameListByAppid(appid)
return ResponseUtil.ok(podNames)
}

/**
* Get pod's containers
* @param appid
* @returns
*/
@ApiResponseObject(ContainerNameListDto)
@ApiOperation({ summary: "Get pod's containers" })
@UseGuards(JwtAuthGuard, ApplicationAuthGuard)
@Get('container')
async getContainerNameList(
@Param('appid') appid: string,
@Query('podName') podName: string,
) {
if (!podName) {
return ResponseUtil.error('no podName')
}
const containerNames: ContainerNameListDto =
await this.podService.getContainerNameListByPodName(appid, podName)
return ResponseUtil.ok(containerNames)
}
}
27 changes: 23 additions & 4 deletions server/src/application/pod.service.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { V1PodList } from '@kubernetes/client-node'
import { V1Pod, V1PodList } from '@kubernetes/client-node'
import { Injectable, Logger } from '@nestjs/common'
import { ClusterService } from 'src/region/cluster/cluster.service'
import { RegionService } from 'src/region/region.service'
import { GetApplicationNamespace } from 'src/utils/getter'
import http from 'http'
import { PodNamesDto } from './dto/pod.dto'
import { PodNameListDto, ContainerNameListDto } from './dto/pod.dto'
import { LABEL_KEY_APP_ID } from 'src/constants'

@Injectable()
Expand All @@ -28,10 +28,29 @@ export class PodService {
undefined,
`${LABEL_KEY_APP_ID}=${appid}`,
)
const podNames: PodNamesDto = { appid: appid, pods: [] }
const podNames: PodNameListDto = { appid: appid, podNameList: [] }
for (const item of res.body.items) {
podNames.pods.push(item.metadata.name)
podNames.podNameList.push(item.metadata.name)
}
return podNames
}

async getContainerNameListByPodName(appid: string, podName: string) {
const region = await this.regionService.findByAppId(appid)
const namespaceOfApp = GetApplicationNamespace(region, appid)
const coreV1Api = this.cluster.makeCoreV1Api(region)

const res: { response: http.IncomingMessage; body: V1Pod } =
await coreV1Api.readNamespacedPod(podName, namespaceOfApp)

const containerNameList =
res.body.spec.containers?.map((container) => container.name) || []

const containerNames: ContainerNameListDto = {
podName: podName,
containerNameList: containerNameList,
}

return containerNames
}
}
76 changes: 59 additions & 17 deletions server/src/log/log.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
UseGuards,
Sse,
} from '@nestjs/common'
import http from 'http'
import { ApiBearerAuth, ApiOperation, ApiQuery, ApiTags } from '@nestjs/swagger'
import { FunctionService } from '../function/function.service'
import { ApiResponsePagination, ResponseUtil } from '../utils/response'
Expand All @@ -18,7 +19,8 @@ import { Log } from '@kubernetes/client-node'
import { GetApplicationNamespace } from 'src/utils/getter'
import { RegionService } from 'src/region/region.service'
import { ClusterService } from 'src/region/cluster/cluster.service'
import { Observable } from 'rxjs'
import { Observable, async } from 'rxjs'
import { PodService } from 'src/application/pod.service'

@ApiBearerAuth('Authorization')
@Controller('apps/:appid/logs')
Expand All @@ -29,6 +31,7 @@ export class LogController {
private readonly funcService: FunctionService,
private readonly regionService: RegionService,
private readonly clusterService: ClusterService,
private readonly podService: PodService,
) {}

/**
Expand Down Expand Up @@ -98,59 +101,98 @@ export class LogController {
@Sse(':podName')
async streamLogs(
@Param('podName') podName: string,
@Query('containerName') containerName: string,
@Param('appid') appid: string,
) {
if (!containerName) {
containerName = appid
}
let podNameList: string[] = undefined
if (podName === 'all') {
podNameList = (await this.podService.getPodNameListByAppid(appid))
.podNameList
}

const region = await this.regionService.findByAppId(appid)
const namespaceOfApp = GetApplicationNamespace(region, appid)
const kc = this.clusterService.loadKubeConfig(region)

return new Observable<MessageEvent>((subscriber) => {
const logStream = new PassThrough()
const combinedLogStream = new PassThrough()
const logs = new Log(kc)
let k8sResponse

const streamsEnded = new Set<string>()

const destroyStream = () => {
if (k8sResponse) {
k8sResponse?.destroy()
}
logStream?.removeAllListeners()
logStream?.destroy()
combinedLogStream?.removeAllListeners()
combinedLogStream?.destroy()
}

logStream.on('data', (chunk) => {
combinedLogStream.on('data', (chunk) => {
subscriber.next(chunk.toString() as MessageEvent)
})

logStream.on('error', (error) => {
this.logger.error('stream error', error)
combinedLogStream.on('error', (error) => {
this.logger.error('Combined stream error', error)
subscriber.error(error)
destroyStream()
})

logStream.on('end', () => {
combinedLogStream.on('end', () => {
subscriber.complete()
destroyStream()
})
;(async () => {

const fetchLog = async (podName: string) => {
let k8sResponse: http.IncomingMessage | undefined
const podLogStream = new PassThrough()
streamsEnded.add(podName)

try {
k8sResponse = await logs.log(
namespaceOfApp,
podName,
appid,
logStream,
containerName,
podLogStream,
{
follow: true,
previous: false,
pretty: false,
timestamps: false,
tailLines: 1000,
},
)
podLogStream.pipe(combinedLogStream, { end: false })

podLogStream.on('error', (error) => {
combinedLogStream.emit('error', error)
podLogStream.removeAllListeners()
podLogStream.destroy()
})

podLogStream.once('end', () => {
streamsEnded.delete(podName)
if (streamsEnded.size === 0) {
combinedLogStream.end()
}
})
} catch (error) {
this.logger.error('Failed to get logs', error)
this.logger.error(`Failed to get logs for pod ${podName}`, error)
subscriber.error(error)
k8sResponse?.destroy()
podLogStream.removeAllListeners()
podLogStream.destroy()
destroyStream()
}
})()
}

if (podNameList && podNameList.length > 0) {
podNameList.forEach((podName) => {
fetchLog(podName)
})
} else {
fetchLog(podName)
}
// Clean up when the client disconnects
return () => destroyStream()
})
Expand Down