Skip to content

Commit

Permalink
big ass refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsupermanhd committed Mar 19, 2024
1 parent 052cbd0 commit 8203687
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 210 deletions.
8 changes: 3 additions & 5 deletions chunkConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,23 @@ package main
import (
"bytes"
"compress/gzip"
"context"
"errors"
"log"
"strings"
"time"

"github.com/maxsupermanhd/WebChunk/chunkStorage"
"github.com/maxsupermanhd/WebChunk/proxy"
"github.com/maxsupermanhd/go-vmc/v764/level"
"github.com/maxsupermanhd/go-vmc/v764/nbt"
"github.com/maxsupermanhd/go-vmc/v764/save"
)

func chunkConsumer(ctx context.Context, c chan *proxy.ProxiedChunk) {
func chunkConsumer(exitchan <-chan struct{}) {
for {
select {
case <-ctx.Done():
case <-exitchan:
return
case r := <-c:
case r := <-chunkChannel:
if r.Dimension == "" || r.Server == "" {
log.Printf("Got chunk [%v](%v) from [%v] by [%v] with empty params, DROPPING", r.Pos, r.Dimension, r.Server, r.Username)
continue
Expand Down
1 change: 1 addition & 0 deletions debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package main
5 changes: 3 additions & 2 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Path to load config from is taken from environment variable `WEBCHUNK_CONFIG` an
| --- | --- | --- | --- | --- |
| `logs_path` | string | No | `./logs/WebChunk.log` | Path to log file (will create files and directories if needed) |
| `colors_path` | string | Yes 🔧 |`./colors.gob` | Path to GOB-encoded block color palette |
| `ignore_failed_storages` | bool | No | `false` | Continue to start webchunk if errors occur on storages init |
| `storages` | object | No | `{}` | Contains defined storages, see [Storage object](#storage-object) |
| `render_received` | bool | Yes | `true` | Do render chunks immediately when received |
| `imaging_workers` | int | No | `4` | Essentially number of IO threads that read/write from cache |
Expand Down Expand Up @@ -39,8 +40,8 @@ Storage object contains 2 fields: `type` and `address`.

Storage types:

- `postgres` PostgreSQL database, address field is a URI or DSN connection string to the database
- `filesystem` Mojang-compatible anvil region format storage, address filed is a path to the directory (will not be created automatically)
- `postgres` PostgreSQL database, address is a URI or DSN connection string to the database
- `filesystem` Mojang-compatible anvil region format storage, address is a path to the directory (will not be created automatically)

Example of storage objects:

Expand Down
6 changes: 2 additions & 4 deletions eventRouter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"log"
)

Expand All @@ -28,12 +27,11 @@ func newMapEventRouter() *mapEventRouter {
}
}

func (router *mapEventRouter) Run(ctx context.Context) {
func (router *mapEventRouter) Run(exitchan <-chan struct{}) {
clients := map[chan mapEvent]bool{}
log.Println("Event router started")
for {
select {
case <-ctx.Done():
case <-exitchan:
for c := range clients {
close(c)
}
Expand Down
212 changes: 40 additions & 172 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,15 @@ import (
"context"
"io"
"log"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"runtime"
"runtime/debug"
rpprof "runtime/pprof"
"sync"
"syscall"
"time"

"github.com/maxsupermanhd/WebChunk/chunkStorage"
imagecache "github.com/maxsupermanhd/WebChunk/imageCache"
"github.com/maxsupermanhd/WebChunk/proxy"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
)

var (
Expand All @@ -51,7 +43,9 @@ var (
)

var (
ic *imagecache.ImageCache
ic *imagecache.ImageCache
chunkChannel = make(chan *proxy.ProxiedChunk, 12*12)
mainCtxCancel context.CancelFunc
)

func main() {
Expand All @@ -78,187 +72,61 @@ func main() {
rpprof.StartCPUProfile(f)
}

var wg sync.WaitGroup
ctx, ctxCancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)

if err := storagesInit(); err != nil && cfg.GetDSBool(false, "ignore_failed_storages") {
log.Fatal("Failed to initialize storages: ", err)
}
if err := loadColors(cfg.GetDSString("./colors.gob", "colors_path")); err != nil {
log.Fatal(err)
}

// log.Println("Starting image scaler")
// wg.Add(1)
// go func() {
// imagingProcessor(ctx)
// log.Println("Image scaler stopped")
// wg.Done()
// }()

log.Println("Starting metrix dispatcher")
wg.Add(2)
go func() {
metricsDispatcher()
log.Println("Metrix dispatcher stopped")
wg.Done()
}()
go func() {
<-ctx.Done()
closeMetrics()
wg.Done()
}()

log.Println("Starting event router")
wg.Add(1)
go func() {
globalEventRouter.Run(ctx)
log.Println("Event router stopped")
wg.Done()
}()

if err := storagesInit(); err != nil {
log.Fatal("Failed to initialize storages: ", err)
}

log.Println("Starting template manager")
wg.Add(1)
go func() {
templateManager(ctx, cfg.SubTree("web"))
wg.Done()
}()

log.Println("Adding routes")
// wg.Add(1)
// go func() {
// tasksProgressBroadcaster.Start()
// wg.Done()
// }()
// defer tasksProgressBroadcaster.Stop()
router := mux.NewRouter()
router.PathPrefix("/static").Handler(http.StripPrefix("/static/", http.FileServer(hiddenFileSystem{http.Dir("./static")}))).Methods("GET")
router.HandleFunc("/favicon.ico", faviconHandler).Methods("GET")
router.HandleFunc("/robots.txt", robotsHandler).Methods("GET")

router.HandleFunc("/", indexHandler).Methods("GET")
router.HandleFunc("/stop", func(w http.ResponseWriter, _ *http.Request) {
ctxCancel()
w.WriteHeader(200)
w.Write([]byte("Success"))
}).Methods("GET")
router.HandleFunc("/worlds/{world}/{dim}", dimensionHandler).Methods("GET")
router.HandleFunc("/worlds/{world}/{dim}/tiles/{ttype}/{cs:[0-9]+}/{cx:-?[0-9]+}/{cz:-?[0-9]+}/{format}", tileRouterHandler).Methods("GET")
router.HandleFunc("/view", basicTemplateResponseHandler("view")).Methods("GET")
router.HandleFunc("/colors", colorsHandlerGET).Methods("GET")
router.HandleFunc("/colors", colorsHandlerPOST).Methods("POST")
router.HandleFunc("/colors/save", colorsSaveHandler).Methods("GET")
router.HandleFunc("/cfg", cfgHandler).Methods("GET")

router.HandleFunc("/api/v1/config/save", apiHandle(apiSaveConfig)).Methods("GET")

router.HandleFunc("/api/v1/submit/chunk/{world}/{dim}", apiHandle(apiAddChunkHandler))
router.HandleFunc("/api/v1/submit/region/{world}/{dim}", apiAddRegionHandler)

router.HandleFunc("/api/v1/renderers", apiHandle(apiListRenderers)).Methods("GET")

router.HandleFunc("/api/v1/storages", apiHandle(apiStoragesGET)).Methods("GET")
router.HandleFunc("/api/v1/storages", apiHandle(apiStorageAdd)).Methods("PUT")
router.HandleFunc("/api/v1/storages/{storage}/reinit", apiHandle(apiStorageReinit)).Methods("GET")

router.HandleFunc("/api/v1/worlds", apiHandle(apiAddWorld)).Methods("POST")
router.HandleFunc("/api/v1/worlds", apiHandle(apiListWorlds)).Methods("GET")
var ctx context.Context
ctx, mainCtxCancel = signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)

router.HandleFunc("/api/v1/dims", apiHandle(apiAddDimension)).Methods("POST")
router.HandleFunc("/api/v1/dims", apiHandle(apiListDimensions)).Methods("GET")

router.HandleFunc("/api/v1/ws", wsClientHandlerWrapper(ctx))

router.HandleFunc("/debug/chunk/{world}/{dim}/{cx:-?[0-9]+}/{cz:-?[0-9]+}", terrainInfoHandler).Methods("GET")
router.HandleFunc("/debug/pprof/", pprof.Index)
router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
router.HandleFunc("/debug/pprof/profile", pprof.Profile)
router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
router.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine"))
router.Handle("/debug/pprof/heap", pprof.Handler("heap"))
router.Handle("/debug/pprof/allocs", pprof.Handler("allocs"))
router.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
router.Handle("/debug/pprof/block", pprof.Handler("block"))
router.Handle("/debug/pprof/mutex", pprof.Handler("mutex"))
router.HandleFunc("/debug/gc", func(w http.ResponseWriter, r *http.Request) {
runtime.GC()
w.WriteHeader(200)
w.Write([]byte("ok"))
bgsMetrics := startBackgroundRoutine("metrics dispatcher", metricsDispatcher)
bgsEventRouter := startBackgroundRoutine("event router", globalEventRouter.Run)
bgsTemplateManager := startBackgroundRoutine("template manager", func(ec <-chan struct{}) { templateManager(ec, cfg.SubTree("web")) })
bgsChunkConsumer := startBackgroundRoutine("chunk consumer", chunkConsumer)
bgsImageCache := startBackgroundRoutine("image cache", func(c <-chan struct{}) {
imageCacheCtx, imageCacheCtxCancel := context.WithCancel(context.Background())
go func() {
<-c
imageCacheCtxCancel()
}()
ic = imagecache.NewImageCache(log.Default(), cfg.SubTree("imageCache"), imageCacheCtx)
ic.WaitExit()
})

router1 := handlers.ProxyHeaders(router)
router2 := handlers.CompressHandler(router1)
router3 := handlers.CustomLoggingHandler(os.Stdout, router2, customLogger)
router4 := handlers.RecoveryHandler(handlers.PrintRecoveryStack(true))(router3)

chunkChannel := make(chan *proxy.ProxiedChunk, 12*12)
wg.Add(1)
go func() {
addr := cfg.GetDSString("0.0.0.0:3002", "web", "listen_addr")
if addr == "" {
log.Println("Not starting web server because listen address is empty")
return
}
websrv := http.Server{
Addr: addr,
Handler: router4,
}
log.Println("Starting web server (http://" + addr + "/)")
wg.Add(1)
bgsProxy := startBackgroundRoutine("proxy", func(c <-chan struct{}) {
proxyCtx, proxyCtxCancel := context.WithCancel(context.Background())
go func() {
if err := websrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Web server returned an error: %s\n", err)
}
wg.Done()
<-c
proxyCtxCancel()
}()
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := websrv.Shutdown(shutdownCtx); err != nil {
log.Fatalf("Server Shutdown Failed:%+v", err)
} else {
log.Println("Web server stopped")
}
wg.Done()
}()
wg.Add(1)
go func() {
addr := cfg.GetDSString("0.0.0.0:25566", "proxy", "listen_addr")
if addr == "" {
log.Println("Not starting proxy because listen address is empty")
return
}
log.Println("Starting proxy")
proxy.RunProxy(ctx, cfg.SubTree("proxy"), chunkChannel)
log.Println("Proxy stopped")
wg.Done()
}()
wg.Add(1)
go func() {
chunkConsumer(ctx, chunkChannel)
log.Println("Chunk consumer stopped")
wg.Done()
}()
wg.Add(1)
go func() {
ic = imagecache.NewImageCache(log.Default(), cfg.SubTree("imageCache"), ctx)
ic.WaitExit()
log.Println("Image cache stopped")
wg.Done()
}()
proxy.RunProxy(proxyCtx, cfg.SubTree("proxy"), chunkChannel)
})
bgsWeb := startBackgroundRoutine("web server", runWeb)

<-ctx.Done()
log.Println("Interrupt recieved, shutting down...")
ctxCancel()
wg.Wait()

bgsWeb()
log.Println("Waiting for websocket clients to drop...")
wsClients.Wait()

bgsProxy()
bgsImageCache()
bgsChunkConsumer()
bgsTemplateManager()
bgsEventRouter()
bgsMetrics()

log.Println("Shutting down storages...")
chunkStorage.CloseStorages(storages)
log.Println("Storages closed.")

if profileCPU {
log.Println("Stopping profiler...")
rpprof.StopCPUProfile()
}
lg.Close()
log.Println("Shutdown complete, bye!")
}
9 changes: 7 additions & 2 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ var collectPackets = []packetid.ClientboundPacketID{
}

func RunProxy(ctx context.Context, cfg *lac.ConfSubtree, dump chan *ProxiedChunk) {
listenAddr := cfg.GetDSString("localhost:25566", "listen_addr")
if listenAddr == "" {
log.Println("Proxy disabled")
return
}
var icon image.Image
if iconpath := cfg.GetDSString("", "icon_path"); iconpath != "" {
f, err := os.Open(iconpath)
Expand Down Expand Up @@ -128,12 +133,12 @@ func RunProxy(ctx context.Context, cfg *lac.ConfSubtree, dump chan *ProxiedChunk
Ctx: ctx,
},
}
listener, err := net.ListenMC(cfg.GetDSString("localhost:25566", "listen_addr"))
listener, err := net.ListenMC(listenAddr)
if err != nil {
log.Println("Proxy startup error: ", err)
return
}
log.Println("Proxy started on " + cfg.GetDSString("localhost:25566", "listen_addr"))
log.Println("Proxy started on " + listenAddr)
var wg sync.WaitGroup
lstCloseChan := make(chan struct{})
wg.Add(1)
Expand Down
5 changes: 2 additions & 3 deletions templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package main

import (
"context"
"fmt"
"html/template"
"log"
Expand Down Expand Up @@ -89,7 +88,7 @@ func plainmsg(w http.ResponseWriter, r *http.Request, color int, msg string) {
"msg": msg})
}

func templateManager(ctx context.Context, cfg *lac.ConfSubtree) {
func templateManager(exitchan <-chan struct{}, cfg *lac.ConfSubtree) {
log.Println("Loading web templates")
templatesGlob := cfg.GetDSString("templates/*.gohtml", "templates_glob")
var err error
Expand Down Expand Up @@ -133,7 +132,7 @@ func templateManager(ctx context.Context, cfg *lac.ConfSubtree) {
return
}
log.Println("Layouts watcher error:", err)
case <-ctx.Done():
case <-exitchan:
watcher.Close()
log.Println("Layouts watcher stopped")
return
Expand Down
Loading

0 comments on commit 8203687

Please sign in to comment.