From b941ea4db6ba5de5daa655243e047e82d134b1b5 Mon Sep 17 00:00:00 2001 From: Prasun Anand Date: Wed, 18 Dec 2024 16:10:57 +0530 Subject: [PATCH] Code cleaning and Improve logging --- content/content_manager.go | 16 +++----- kernel/kernel_manager.go | 22 +++++------ kernel/kernel_session.go | 22 ++++------- kernel/kernel_supervisor.go | 7 ++-- kernel/launcher/launcher.go | 2 +- kernel/provisioner/local_provisioner.go | 2 +- kernel/utils.go | 2 +- kernelspec/kernelspec_manager.go | 11 +++--- websocket/channels.go | 50 +++++-------------------- websocket/kernel_websocket_handler.go | 27 ++++++------- 10 files changed, 59 insertions(+), 102 deletions(-) diff --git a/content/content_manager.go b/content/content_manager.go index 2c55d0d..1ccb84e 100644 --- a/content/content_manager.go +++ b/content/content_manager.go @@ -15,7 +15,7 @@ import ( ) func GetContent(relativePath string, contentType string, format string, hash int) models.ContentModel { - log.Info().Msgf("getting content for path : %s", relativePath) + log.Debug().Msgf("getting content for path : %s", relativePath) // get path info osPath := GetOSPath(relativePath) info, err := os.Lstat(osPath) @@ -26,7 +26,7 @@ func GetContent(relativePath string, contentType string, format string, hash int var model models.ContentModel - log.Info().Msgf("%t", info.IsDir()) + log.Debug().Msgf("Is directory %t", info.IsDir()) if info.IsDir() { model = getDirectoryModel(relativePath) } else { @@ -42,7 +42,6 @@ func GetContent(relativePath string, contentType string, format string, hash int } func getNotebookModel(path string) models.ContentModel { - log.Info().Msg("Notebook model") // fmt.Println(path) osPath := GetOSPath(path) @@ -83,7 +82,7 @@ func nbformatReads(data string, version int, capture_validation_error bool) OutN } func getDirectoryModel(relativePath string) models.ContentModel { - log.Info().Msgf("relative path %s", relativePath) + log.Debug().Msgf("relative path %s", relativePath) abspath := GetOSPath(relativePath) info, err := os.Lstat(abspath) @@ -97,7 +96,6 @@ func getDirectoryModel(relativePath string) models.ContentModel { Path: relativePath, Last_modified: info.ModTime().GoString(), } - // fmt.Println(path) dir, err := os.Open(abspath) if err != nil { @@ -119,8 +117,6 @@ func getDirectoryModel(relativePath string) models.ContentModel { } func getFileModel(abspath, relativePath, fileName string) models.ContentModel { - // log.Info().Msgf("abs path %s", abspath) - // log.Info().Msgf("relative path %s", relativePath) os_path := filepath.Join(abspath, fileName) @@ -175,8 +171,8 @@ func getFileModelWithContent(path string) models.ContentModel { func read_file2(path string, fileName string) string { extension := filepath.Ext(fileName) - log.Info().Msgf("reading path extension: %s", extension) - log.Info().Msgf("reading path: %s", path) + log.Debug().Msgf("reading path extension: %s", extension) + log.Debug().Msgf("reading path: %s", path) file, err := os.ReadFile(path) if err != nil { panic(err) @@ -188,7 +184,7 @@ func read_file2(path string, fileName string) string { } func read_file(path string) string { - log.Info().Msgf("reading path: %s", path) + log.Debug().Msgf("reading path: %s", path) file, err := os.ReadFile(path) if err != nil { panic(err) diff --git a/kernel/kernel_manager.go b/kernel/kernel_manager.go index 90a999e..6e514f7 100644 --- a/kernel/kernel_manager.go +++ b/kernel/kernel_manager.go @@ -74,7 +74,7 @@ func (km *KernelManager) asyncPrestartKernel(kernelName string) ([]string, map[s kw := km.preLaunch() kernelCmd := kw["cmd"].([]string) - log.Info().Msgf("kenelName: %s", kernelName) + log.Debug().Msgf("kenelName: %s", kernelName) return kernelCmd, kw } @@ -92,20 +92,17 @@ func isLocalIP(ip string) bool { *********************************************************************/ func (km *KernelManager) asyncLaunchKernel(kernelCmd []string, kw map[string]interface{}) { - // log.Info().Msgf("kw : %s", kw) - // log.Info().Msgf("cmd : %s", kernelCmd) - ConnectionInfo := km.Provisioner.LaunchKernel(kernelCmd, kw, km.ConnectionFile) - log.Info().Msgf("connectionInfo: %s", ConnectionInfo) + log.Debug().Msgf("connectionInfo: %s", ConnectionInfo) } func (km *KernelManager) preLaunch() map[string]interface{} { if km.ConnectionInfo.Transport == "tcp" && !isLocalIP(km.ConnectionInfo.IP) { - log.Info().Msg("Can only launch a kernel on a local interface.") + log.Debug().Msg("Can only launch a kernel on a local interface.") } - // log.Info().Msgf("cache ports: %t", km.CachePorts) - // log.Info().Msgf("km.Provisioner.PortsCached %t", km.Provisioner.PortsCached) + log.Debug().Msgf("cache ports: %t", km.CachePorts) + log.Debug().Msgf("km.Provisioner.PortsCached %t", km.Provisioner.PortsCached) if km.CachePorts && !km.Provisioner.PortsCached { km.ConnectionInfo.ShellPort, _ = findAvailablePort() @@ -113,16 +110,15 @@ func (km *KernelManager) preLaunch() map[string]interface{} { km.ConnectionInfo.StdinPort, _ = findAvailablePort() km.ConnectionInfo.HbPort, _ = findAvailablePort() km.ConnectionInfo.ControlPort, _ = findAvailablePort() - log.Info().Msgf("connectionInfo : %+v", km.ConnectionInfo) + log.Debug().Msgf("connectionInfo : %+v", km.ConnectionInfo) } - log.Info().Msgf("km.ConnectionFile : %+v", km.ConnectionFile) + log.Debug().Msgf("km.ConnectionFile : %+v", km.ConnectionFile) km.writeConnectionFile(km.ConnectionFile) - // km.writeConnectionFile("kernelConnection.json") kernelCmd := km.formatKernelCmd() - log.Info().Msgf("kernel cmd is %s", kernelCmd) - // the following one is from super + log.Debug().Msgf("kernel cmd is %s", kernelCmd) + env := make(map[string]interface{}) env["cmd"] = kernelCmd env["env"] = os.Environ() diff --git a/kernel/kernel_session.go b/kernel/kernel_session.go index 49d3385..518649d 100644 --- a/kernel/kernel_session.go +++ b/kernel/kernel_session.go @@ -95,26 +95,20 @@ func (ks *KernelSession) Send( metadata map[string]interface{}, ) Message { - log.Info().Msg("================sending the message===============") - var msg Message switch v := msgOrType.(type) { case Message: - // log.Print("received Message type") msg = v if buffers == nil { - // fill the buffer with msg buffers buffers = msg.Buffers } case string: - // kernel info request goes - // log.Print("received String type") msg = ks.createMsg(v, content, parent, header, metadata) default: - log.Info().Msgf("msg_or_type must be of type Message or string, got %T", v) + log.Debug().Msgf("msg_or_type must be of type Message or string, got %T", v) } - log.Info().Msgf("message is %+v", msg) + log.Debug().Msgf("message is %+v", msg) if ks.CheckPid && os.Getpid() != ks.Pid { log.Info().Msgf("WARNING: attempted to send message from fork %+v", msg) @@ -161,9 +155,9 @@ func (ks *KernelSession) Send( tracker, _ = stream.SendMessage(toSend) if ks.Debug { - log.Info().Msgf("Message: %s\n", msg.MsgId) - log.Info().Msgf("ToSend: %s\n", toSend) - log.Info().Msgf("Buffers: %s\n", buffers) + log.Debug().Msgf("Message: %s\n", msg.MsgId) + log.Debug().Msgf("ToSend: %s\n", toSend) + log.Debug().Msgf("Buffers: %s\n", buffers) } msg.Tracker = tracker @@ -189,12 +183,12 @@ func (ks *KernelSession) serialize(msg Message, ident [][]byte) [][]byte { json_packer(msg.Content), // []byte("kernel_info_request"), } to_send := [][]byte{} - // log.Info().Msgf("real message is %s", realMessage) + log.Debug().Msgf("real message is %s", realMessage) signature := ks.sign(realMessage) to_send = append(to_send, []byte(DELIM)) to_send = append(to_send, []byte(signature)) to_send = append(to_send, realMessage...) - // log.Info().Msgf("after signing message is %s", realMessage) + log.Debug().Msgf("after signing message is %s", realMessage) return to_send } @@ -268,7 +262,7 @@ func (ks *KernelSession) deserialize( // message.Buffers = msgList[5:] // Debug print - fmt.Printf("Message: %+v\n", message) + log.Debug().Msgf("Message: %+v\n", message) // Adapt to the current version (implement as needed) // message = adapt(message) diff --git a/kernel/kernel_supervisor.go b/kernel/kernel_supervisor.go index 1323205..2cfcec1 100644 --- a/kernel/kernel_supervisor.go +++ b/kernel/kernel_supervisor.go @@ -2,7 +2,6 @@ package kernel import ( "fmt" - "log" "os" "path/filepath" "sync" @@ -12,6 +11,8 @@ import ( "github.com/zasper-io/zasper/models" "github.com/google/uuid" + + "github.com/rs/zerolog/log" ) var ZasperPendingKernels map[string]KernelManager @@ -99,7 +100,7 @@ func StartKernelManager(kernelPath string, kernelName string, env map[string]str kernelId := uuid.New().String() km, kernel_name, kernel_id := createKernelManager(kernelName, kernelId) - log.Println(km, kernel_name, kernel_id) + log.Debug().Msgf("%v | %v | %v ", km, kernel_name, kernel_id) km.StartKernel(kernelName) @@ -130,7 +131,7 @@ func createKernelManager(kernelName string, kernelId string) (KernelManager, str km.ConnectionInfo.Transport = "tcp" km.ConnectionInfo.IP = "127.0.0.1" km.Session = getSession() - log.Println("session is", km.Session) + log.Info().Msgf("session is %v", km.Session) return km, kernelName, kernelId } diff --git a/kernel/launcher/launcher.go b/kernel/launcher/launcher.go index f4fbf47..e7077bb 100644 --- a/kernel/launcher/launcher.go +++ b/kernel/launcher/launcher.go @@ -52,6 +52,6 @@ func LaunchKernel(kernelCmd []string, kw map[string]interface{}, connFile string } }() - log.Info().Msg("Process started successfully") + log.Debug().Msg("Process started successfully") return cmd.Process } diff --git a/kernel/provisioner/local_provisioner.go b/kernel/provisioner/local_provisioner.go index ecc105b..272791c 100644 --- a/kernel/provisioner/local_provisioner.go +++ b/kernel/provisioner/local_provisioner.go @@ -24,6 +24,6 @@ type LocalProvisioner struct { func (provisioner *LocalProvisioner) LaunchKernel(kernelCmd []string, kw map[string]interface{}, connFile string) KernelConnectionInfo { process := launcher.LaunchKernel(kernelCmd, kw, connFile) provisioner.Pid = process.Pid - log.Info().Msgf("kernel launched with pid: %d", process.Pid) + log.Debug().Msgf("kernel launched with pid: %d", process.Pid) return provisioner.ConnectionInfo } diff --git a/kernel/utils.go b/kernel/utils.go index 2ab7e2b..ffc81e0 100644 --- a/kernel/utils.go +++ b/kernel/utils.go @@ -58,7 +58,7 @@ func findAvailablePort() (int, error) { if portExists(port) { continue } - // log.Info().Msgf("check port %d", port) + log.Debug().Msgf("check port %d", port) l, err := net.Listen("tcp", ":"+strconv.Itoa(port)) if err == nil { currentlyUsedPorts = append(currentlyUsedPorts, port) diff --git a/kernelspec/kernelspec_manager.go b/kernelspec/kernelspec_manager.go index 4a88504..56205d2 100644 --- a/kernelspec/kernelspec_manager.go +++ b/kernelspec/kernelspec_manager.go @@ -3,12 +3,13 @@ package kernelspec import ( "encoding/json" "fmt" - "log" "os" "path/filepath" "strings" "github.com/zasper-io/zasper/core" + + "github.com/rs/zerolog/log" ) type KernelspecResponse struct { @@ -99,14 +100,14 @@ func fromResourceDir(resourceDir string) KernelSpecJsonData { */ kernelFile := filepath.Join(resourceDir, "kernel.json") - log.Println("loading file", kernelFile) + log.Info().Msgf("loading file %s", kernelFile) byteValue, _ := os.ReadFile(kernelFile) var kernelSpecJsonData KernelSpecJsonData err := json.Unmarshal(byteValue, &kernelSpecJsonData) if err != nil { - log.Println("error encountered") + log.Info().Msg("error encountered") } log.Print(kernelSpecJsonData) kernelSpecJsonData.ResourceDir = resourceDir @@ -192,10 +193,10 @@ func getKernelDirs() []string { func listKernelsIn(kernelDir string) map[string]string { dir, err := os.Open(kernelDir) if err != nil { - log.Println("No kernels found in", kernelDir) + log.Debug().Msgf("No kernels found in %s", kernelDir) return nil } - log.Println("kernels found in", kernelDir) + log.Info().Msgf("kernels found in %s", kernelDir) files, err := dir.Readdir(0) if err != nil { fmt.Println(err) diff --git a/websocket/channels.go b/websocket/channels.go index 332ff40..f15d594 100644 --- a/websocket/channels.go +++ b/websocket/channels.go @@ -194,20 +194,19 @@ func (kwsConn *KernelWebSocketConnection) nudge() { if socket.Socket == nil { continue } - fmt.Print("Polling .....") + log.Debug().Msgf("Polling .....") switch s := socket.Socket; s { case transient_shell_channel: msg, _ := s.Recv(0) - fmt.Printf("Received from Shell socket: %s\n", msg) + log.Debug().Msgf("Received from Shell socket: %s\n", msg) shellFuture = true case transient_control_channel: msg, _ := s.Recv(0) - fmt.Printf("Received from Control socket: %s\n", msg) + log.Debug().Msgf("Received from Control socket: %s\n", msg) controlFuture = true case iopub_channel: msg, _ := s.Recv(0) - fmt.Printf("Received from IoPub socket: %s\n", msg) - // kwsConn.send <- []byte(msg) + log.Debug().Msgf("Received from IoPub socket: %s\n", msg) infoFuture = true } } @@ -215,7 +214,7 @@ func (kwsConn *KernelWebSocketConnection) nudge() { } transient_control_channel.Close() transient_shell_channel.Close() - fmt.Print("Nudge successful") + log.Debug().Msgf("Nudge successful") } func deserializeMsgFromWsV1([]byte) (string, []interface{}) { @@ -251,7 +250,7 @@ func (kwsConn *KernelWebSocketConnection) handleIncomingMessage(messageType int, log.Info().Msgf("Error unmarshalling message: %s", err) return } - fmt.Println("msg is =>", msg) + log.Debug().Msgf("msg is => %v", msg) kwsConn.Session.SendStreamMsg(kwsConn.Channels["shell"], msg) @@ -261,12 +260,12 @@ func (kwsConn *KernelWebSocketConnection) handleIncomingMessage(messageType int, } if channel == "" { - log.Printf("No channel specified, assuming shell: %v", msg) + log.Debug().Msgf("No channel specified, assuming shell: %v", msg) channel = "shell" } if _, ok := kwsConn.Channels[channel]; !ok { - log.Printf("No such channel: %v", channel) + log.Debug().Msgf("No such channel: %v", channel) return } @@ -287,7 +286,7 @@ func (kwsConn *KernelWebSocketConnection) handleIncomingMessage(messageType int, if !ignoreMsg { stream := kwsConn.Channels[channel] - fmt.Print("stream", stream) + log.Debug().Msgf("stream %s", stream) if kwsConn.Subprotocol == "v1.kernel.websocket.jupyter.org" { // kwsConn.Session.SendRaw(stream, msgList) } else { @@ -488,37 +487,6 @@ func removeOpenSocket(kwsConn *KernelWebSocketConnection) { panic("unimplemented") } -func (kwsConn *KernelWebSocketConnection) onZMQReply(stream map[string]*zmq4.Socket, msgList []kernel.Message) { - // Check if the stream is closed - // if stream.IsClosed() { - // fmt.Println("ZMQ message arrived on closed channel") - // kwsConn.Disconnect() - // return - // } - - // channel := kwsConn.Channels // Assuming it's assigned earlier - - // var binMsg []byte - // var err error - - // if kwsConn.Subprotocol == "v1.kernel.websocket.jupyter.org" { - // binMsg, err = serializeMsgToWSV1(msgList, channel) - // if err == nil { - // kwsConn.WriteMessage(binMsg, true) - // } - // } else { - // msg, err := channel.ReserializeReply(msgList) - // if err != nil { - // log.Printf("Malformed message: %v", msgList) - // } else { - // err = channel.WriteMessage(msg, isBinary(msg)) - // if err != nil { - // fmt.Println(err) - // } - // } - // } -} - func registerWebSocketSession(sessionId string) { log.Info().Msgf("registering a new session: %s", sessionId) } diff --git a/websocket/kernel_websocket_handler.go b/websocket/kernel_websocket_handler.go index 4453706..7a91124 100644 --- a/websocket/kernel_websocket_handler.go +++ b/websocket/kernel_websocket_handler.go @@ -1,7 +1,6 @@ package websocket import ( - "log" "net/http" "sync" @@ -11,6 +10,8 @@ import ( "github.com/gorilla/mux" "github.com/gorilla/websocket" "github.com/pebbe/zmq4" + + "github.com/rs/zerolog/log" ) var upgrader = websocket.Upgrader{ @@ -26,18 +27,18 @@ var ( ) func HandleWebSocket(w http.ResponseWriter, req *http.Request) { - log.Println("receieved kernel connection request") + log.Info().Msg("receieved kernel connection request") vars := mux.Vars(req) kernelId := vars["kernelId"] - log.Println("kernelName :", kernelId) + log.Info().Msgf("kernelName : %s", kernelId) sessionId := req.URL.Query().Get("session_id") - log.Println("sessionId :", sessionId) + log.Info().Msgf("sessionId : %s", sessionId) session, ok := core.ZasperSession[sessionId] - log.Println("session", session) + log.Info().Msgf("session %v", session) if !ok { - log.Println("session not found") + log.Info().Msg("session not found") http.NotFound(w, req) return } @@ -45,7 +46,7 @@ func HandleWebSocket(w http.ResponseWriter, req *http.Request) { kernelManager, ok := kernel.ZasperActiveKernels[kernelId] if !ok { - log.Println("kernel not found") + log.Info().Msg("kernel not found") http.NotFound(w, req) return } @@ -53,7 +54,7 @@ func HandleWebSocket(w http.ResponseWriter, req *http.Request) { conn, err := upgrader.Upgrade(w, req, nil) if err != nil { - log.Println(err) + log.Info().Msgf("%s", err) return } @@ -69,10 +70,10 @@ func HandleWebSocket(w http.ResponseWriter, req *http.Request) { Send: make(chan []byte), } - log.Println("preparing kernel connection") + log.Info().Msg("preparing kernel connection") kernelConnection.prepare(sessionId) - log.Println("connecting kernel") + log.Info().Msg("connecting kernel") kernelConnection.connect() clientsMu.Lock() @@ -92,10 +93,10 @@ func (kwsConn *KernelWebSocketConnection) readMessages() { for { messageType, data, err := kwsConn.Conn.ReadMessage() if err != nil { - log.Println(err) + log.Debug().Msgf("%s", err) return } - log.Println("message type =>", messageType) + log.Debug().Msgf("message type => %s", messageType) kwsConn.handleIncomingMessage(messageType, data) // broadcast <- message // Send message to broadcast channel } @@ -110,7 +111,7 @@ func (kwsConn *KernelWebSocketConnection) writeMessages() { return } if err := kwsConn.Conn.WriteMessage(websocket.TextMessage, message); err != nil { - log.Println("Error writing message:", err) + log.Info().Msgf("Error writing message: %s", err) return } }