Skip to content

Commit

Permalink
Merge pull request #1594 from jumpserver/dev
Browse files Browse the repository at this point in the history
v4.4.0
  • Loading branch information
BaiJiangJie authored Nov 21, 2024
2 parents 67ccf14 + 4e200cf commit caa4ccd
Show file tree
Hide file tree
Showing 69 changed files with 6,364 additions and 4,812 deletions.
28 changes: 28 additions & 0 deletions .github/workflows/llm-code-review.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: LLM Code Review

permissions:
contents: read
pull-requests: write

on:
pull_request:
types: [opened, reopened, synchronize]

jobs:
llm-code-review:
runs-on: ubuntu-latest
steps:
- uses: fit2cloud/LLM-CodeReview-Action@main
env:
GITHUB_TOKEN: ${{ secrets.FIT2CLOUDRD_LLM_CODE_REVIEW_TOKEN }}
OPENAI_API_KEY: ${{ secrets.ALIYUN_LLM_API_KEY }}
LANGUAGE: English
OPENAI_API_ENDPOINT: https://dashscope.aliyuncs.com/compatible-mode/v1
MODEL: qwen2-1.5b-instruct
PROMPT: "Please check the following code differences for any irregularities, potential issues, or optimization suggestions, and provide your answers in English."
top_p: 1
temperature: 1
# max_tokens: 10000
MAX_PATCH_LENGTH: 10000
IGNORE_PATTERNS: "/node_modules,*.md,/dist,/.github"
FILE_PATTERNS: "*.java,*.go,*.py,*.vue,*.ts,*.js,*.css,*.scss,*.html"
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM jumpserver/koko-base:20241011_023915 AS stage-build
FROM jumpserver/koko-base:20241030_063234 AS stage-build

WORKDIR /opt/koko
ARG TARGETARCH
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile-base
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ RUN set -ex \

WORKDIR /opt/koko

ARG MONGOSH_VERSION=2.2.12
ARG MONGOSH_VERSION=2.3.2
RUN set -ex \
&& mkdir -p /opt/koko/lib \
&& \
Expand Down
10 changes: 10 additions & 0 deletions pkg/auth/user_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func (u *UserAuthClient) Authenticate(ctx context.Context) (user model.User, aut
logger.Errorf("User %s Authenticate err: %s", u.Opts.Username, err)
return
}
unsupportedMfaTypes := map[string]bool{
"face": true,
"FACE": true,
}
if resp.Err != "" {
switch resp.Err {
case ErrLoginConfirmWait:
Expand All @@ -44,6 +48,12 @@ func (u *UserAuthClient) Authenticate(ctx context.Context) (user model.User, aut
case ErrMFARequired:
u.mfaTypes = nil
for _, choiceType := range resp.Data.Choices {
if _, ok := unsupportedMfaTypes[choiceType]; ok {
logger.Infof("User %s login need MFA, skip %s as it not supported", u.Opts.Username,
choiceType)
continue
}

u.authOptions[choiceType] = authOptions{
MFAType: choiceType,
Url: resp.Data.Url,
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func getDefaultConfig() Config {

EnableLocalPortForward: false,
EnableVscodeSupport: false,
DisableInputAsCommand: true,
}

}
Expand Down
37 changes: 19 additions & 18 deletions pkg/handler/sftp.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,24 @@ func (s *SftpHandler) Filewrite(r *sftp.Request) (io.WriterAt, error) {
if err != nil {
return nil, err
}

go func() {
<-r.Context().Done()

fileInfo, err2 := f.Stat()
if err2 != nil {
logger.Errorf("Get file %s stat err: %s", r.Filepath, err2)
return
}

if err1 := s.recorder.ChunkedRecord(f.FTPLog, f, 0, fileInfo.Size()); err1 != nil {
logger.Errorf("Record file %s err: %s", r.Filepath, err1)
}

if err := f.Close(); err != nil {
logger.Errorf("Remote sftp file %s close err: %s", r.Filepath, err)
}
logger.Infof("Sftp file write %s done", r.Filepath)
s.recorder.FinishFTPFile(f.FTPLog.ID)
}()
return NewWriterAt(f, s.recorder), err
}
Expand All @@ -100,20 +111,18 @@ func (s *SftpHandler) Fileread(r *sftp.Request) (io.ReaderAt, error) {
return nil, err
}

if err1 := s.recorder.ChunkedRecord(f.FTPLog, f, 0, fileInfo.Size()); err1 != nil {
logger.Errorf("Record file %s err: %s", r.Filepath, err1)
}

// 重置文件指针
_, _ = f.Seek(0, io.SeekStart)
go func() {
<-r.Context().Done()

if err1 := s.recorder.ChunkedRecord(f.FTPLog, f, 0, fileInfo.Size()); err1 != nil {
logger.Errorf("Record file %s err: %s", r.Filepath, err1)
}

if err2 := f.Close(); err2 != nil {
logger.Errorf("Remote sftp file %s close err: %s", r.Filepath, err2)
}
logger.Infof("Sftp File read %s done", r.Filepath)
s.recorder.FinishFTPFile(f.FTPLog.ID)

logger.Infof("Sftp File read %s done", r.Filepath)
}()
// 包裹一层,兼容 WinSCP 目录的批量下载
return NewReaderAt(f), err
Expand Down Expand Up @@ -153,18 +162,10 @@ type clientReadWritAt struct {
}

func (c *clientReadWritAt) WriteAt(p []byte, off int64) (n int, err error) {
c.mu.Lock()
defer c.mu.Unlock()
if err1 := c.recorder.RecordFtpChunk(c.f.FTPLog, p, off); err1 != nil {
logger.Errorf("Record write err: %s", err1)
}
_, _ = c.f.Seek(off, 0)
return c.f.Write(p)
return c.f.WriteAt(p, off)
}

func (c *clientReadWritAt) ReadAt(p []byte, off int64) (n int, err error) {
c.mu.Lock()
defer c.mu.Unlock()
return c.f.ReadAt(p, off)
}

Expand Down
50 changes: 35 additions & 15 deletions pkg/httpd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,37 +54,57 @@ func (c *Client) Read(p []byte) (n int, err error) {

// 向客户端发送数据进行1毫秒的防抖处理
func (c *Client) Write(p []byte) (n int, err error) {
c.bufferMutex.Lock()
defer c.bufferMutex.Unlock()
category := ""
connectToken := c.Conn.ConnectToken
if connectToken != nil {
category = connectToken.Platform.Category.Value
}

c.buffer.Write(p)
if category == "database" {
c.bufferMutex.Lock()
c.buffer.Write(p)
c.bufferMutex.Unlock()

if c.timer == nil {
c.timer = time.AfterFunc(time.Millisecond, c.flushBuffer)
}
return len(p), nil

if c.timer == nil {
c.timer = time.AfterFunc(time.Millisecond, c.flushBuffer)
}

messageType := TerminalBinary
if c.KubernetesId != "" {
messageType = TerminalK8SBinary
}

msg := Message{
Id: c.Conn.Uuid,
Type: messageType,
Raw: p,
KubernetesId: c.KubernetesId,
}
c.Conn.SendMessage(&msg)
return len(p), nil
}

func (c *Client) flushBuffer() {
c.bufferMutex.Lock()
defer c.bufferMutex.Unlock()
messageType := TerminalBinary
if c.KubernetesId != "" {
messageType = TerminalK8SBinary
}

if c.buffer.Len() > 0 {
msg := Message{
Id: c.Conn.Uuid,
Type: messageType,
Raw: c.buffer.Bytes(),
KubernetesId: c.KubernetesId,
Id: c.Conn.Uuid,
Type: TerminalBinary,
Raw: c.buffer.Bytes(),
}
c.Conn.SendMessage(&msg)
c.buffer.Reset()
}
c.timer.Stop()
c.timer = nil

if c.buffer.Len() == 0 && c.timer != nil {
c.timer.Stop()
c.timer = nil
}
}

func (c *Client) Pty() ssh.Pty {
Expand Down
13 changes: 7 additions & 6 deletions pkg/httpd/tty.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,12 @@ func (h *tty) sendK8SCloseMessage(KubernetesId string) {
h.ws.SendMessage(&closedMsg)
}

func (h *tty) sendSessionMessage(data string) {
func (h *tty) sendSessionMessage(data string, KubernetesId string) {
msg := Message{
Id: h.ws.Uuid,
Type: TerminalSession,
Data: data,
Id: h.ws.Uuid,
Type: TerminalSession,
Data: data,
KubernetesId: KubernetesId,
}
h.ws.SendMessage(&msg)
}
Expand Down Expand Up @@ -158,7 +159,7 @@ func (h *tty) validateAndInitSession(msg *Message) (TerminalConnectData, error)
Session: &sessionDetail,
}
data, _ := json.Marshal(sessionInfo)
h.sendSessionMessage(string(data))
h.sendSessionMessage(string(data), msg.KubernetesId)
}
return connectInfo, nil
}
Expand Down Expand Up @@ -458,7 +459,7 @@ func (h *tty) proxy(wg *sync.WaitGroup, client *Client) {
}
srv.OnSessionInfo = func(info *proxy.SessionInfo) {
data, _ := json.Marshal(info)
h.sendSessionMessage(string(data))
h.sendSessionMessage(string(data), client.KubernetesId)
}
srv.Proxy()
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/koko/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ func uploadRemainReplay(jmsService *service.JMService) {
logger.Errorf("Update session %s activity log failed: %s", id, err1)
}
}
if len(allRemainFiles) == 0 {
logger.Info("No remain replay file to upload")
return
}

logger.Infof("Start upload remain %d replay files 10 min later ", len(allRemainFiles))
time.Sleep(10 * time.Minute)

for absPath, remainReplay := range allRemainFiles {
absGzPath := absPath
Expand Down Expand Up @@ -115,6 +122,12 @@ func uploadRemainFTPFile(jmsService *service.JMService) {
}
return nil
})
if len(allRemainFiles) == 0 {
logger.Info("No remain ftp file to upload")
return
}
logger.Infof("Start upload remain %d ftp files 10 min later ", len(allRemainFiles))
time.Sleep(10 * time.Minute)

for absPath, remainFTPFile := range allRemainFiles {
absGzPath := absPath
Expand Down
37 changes: 1 addition & 36 deletions pkg/proxy/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,24 +351,6 @@ func (r *FTPFileRecorder) CreateFTPFileInfo(logData *model.FTPLog) (info *FTPFil
return info, nil
}

func (r *FTPFileRecorder) RecordFtpChunk(ftpLog *model.FTPLog, p []byte, off int64) (err error) {
if r.isNullStorage() {
return
}
info := r.getFTPFile(ftpLog.ID)
if info == nil {
info, err = r.CreateFTPFileInfo(ftpLog)
}
if err != nil {
return
}
if info.isExceedWrittenSize() {
logger.Errorf("FTP file %s is exceeds the max limit and discard it", ftpLog.ID)
return nil
}
return info.WriteChunk(p, off)
}

func (r *FTPFileRecorder) FinishFTPFile(id string) {
info := r.getFTPFile(id)
if info == nil {
Expand Down Expand Up @@ -409,7 +391,7 @@ func (r *FTPFileRecorder) ChunkedRecord(ftpLog *model.FTPLog, readerAt io.Reader
return err
}

if info.isExceedWrittenSize() {
if info.isExceedWrittenSize() || totalSize >= info.maxWrittenSize {
logger.Errorf("FTP file %s is exceeds the max limit and discard it", ftpLog.ID)
return nil
}
Expand Down Expand Up @@ -495,23 +477,6 @@ type FTPFileInfo struct {
writtenBytes int64
}

func (f *FTPFileInfo) WriteChunk(p []byte, off int64) error {
var (
nw int
err error
)
_, err = f.fd.Seek(off, io.SeekStart)
if err != nil {
return err
}
nw, err = f.fd.Write(p)
if nw > 0 {
f.writtenBytes += int64(nw)
}
return err

}

func (f *FTPFileInfo) WriteFromReader(r io.Reader) error {
buf := make([]byte, 32*1024)
var err error
Expand Down
13 changes: 3 additions & 10 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,6 @@ func NewServer(conn UserConnection, jmsService *service.JMService, opts ...Conne
_, err2 := jmsService.CreateSession(*apiSession)
return err2
},
ConnectedSuccessCallback: func() error {
return jmsService.SessionSuccess(apiSession.ID)
},
ConnectedFailedCallback: func(err error) error {
return jmsService.SessionFailed(apiSession.ID, err)
},
Expand Down Expand Up @@ -150,10 +147,9 @@ type Server struct {

cacheSSHConnection *srvconn.SSHConnection

CreateSessionCallback func() error
ConnectedSuccessCallback func() error
ConnectedFailedCallback func(err error) error
DisConnectedCallback func() error
CreateSessionCallback func() error
ConnectedFailedCallback func(err error) error
DisConnectedCallback func() error

keyboardMode int32

Expand Down Expand Up @@ -1005,9 +1001,6 @@ func (s *Server) Proxy() {
}

logger.Infof("Conn[%s] create session %s success", s.UserConn.ID(), s.ID)
if err2 := s.ConnectedSuccessCallback(); err2 != nil {
logger.Errorf("Conn[%s] update session %s err: %s", s.UserConn.ID(), s.ID, err2)
}
if s.OnSessionInfo != nil {
actions := s.connOpts.authInfo.Actions
tokenConnOpts := s.connOpts.authInfo.ConnectOptions
Expand Down
Loading

0 comments on commit caa4ccd

Please sign in to comment.