Skip to content

Commit

Permalink
rawx: Check repository with probe
Browse files Browse the repository at this point in the history
  • Loading branch information
AymericDu committed Nov 18, 2020
1 parent e03805d commit 0a60950
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 8 deletions.
73 changes: 65 additions & 8 deletions rawx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ http handler.
import (
"context"
"flag"
"fmt"
"log"
"net"
"net/http"
Expand Down Expand Up @@ -81,6 +82,54 @@ func installSigHandlers(srv *http.Server) {
}()
}

func taskProbeRepository(rawx *rawxService, finished chan bool) {
for {
for i := 0; i < 5; i++ {
time.Sleep(time.Second)
select {
case <-finished:
LogInfo("Stop the probe to check the repository")
return
default:
}
}

/* Try a directory creation */
path := fmt.Sprintf("%s/probe-%s", rawx.path,
randomString(16, hexaCharacters))
LogDebug("Probing directory %s", path)
err := os.Mkdir(path, 0755)
os.Remove(path)
if err != nil {
msg := fmt.Sprintf("IO error on %s %s: %v", rawx.getURL(), path,
err)
LogWarning(msg)
rawx.lastIOError = time.Now()
rawx.lastIOMsg = msg
continue
}

/* Try a file creation */
path = fmt.Sprintf("%s/probe-%s", rawx.path,
randomString(16, hexaCharacters))
LogDebug("Probing file %s", path)
file, err := os.Create(path)
file.Close()
os.Remove(path)
if err != nil {
msg := fmt.Sprintf("IO error on %s %s: %v", rawx.getURL(), path,
err)
LogWarning(msg)
rawx.lastIOError = time.Now()
rawx.lastIOMsg = msg
continue
}

rawx.lastIOSuccess = time.Now()
rawx.lastIOMsg = "n/a"
}
}

func main() {
var err error

Expand Down Expand Up @@ -144,14 +193,18 @@ func main() {
chunkrepo.sub.openNonBlock = opts.getBool("nonblock", configDefaultOpenNonblock)

rawx := rawxService{
ns: namespace,
url: rawxURL,
path: chunkrepo.sub.root,
id: rawxID,
repo: chunkrepo,
bufferSize: 1024 * opts.getInt("buffer_size", uploadBufferSizeDefault/1024),
checksumMode: checksumAlways,
compression: opts["compression"],
ns: namespace,
url: rawxURL,
path: chunkrepo.sub.root,
id: rawxID,
repo: chunkrepo,
bufferSize: 1024 * opts.getInt("buffer_size", uploadBufferSizeDefault/1024),
checksumMode: checksumAlways,
compression: opts["compression"],
lastIOError: time.Time{},
lastIOSuccess: time.Time{},
lastIOMsg: "n/a",
lastIOReport: time.Time{},
}

// Clamp the buffer size to admitted values
Expand Down Expand Up @@ -279,10 +332,14 @@ func main() {
}
}

finished := make(chan bool)
go taskProbeRepository(&rawx, finished)

if err := srv.ListenAndServe(); err != nil {
LogWarning("HTTP Server exiting: %v", err)
}

finished <- true
rawx.notifier.stop()
logger.close()
}
43 changes: 43 additions & 0 deletions rawx/rawx.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ type rawxService struct {
compression string

uploadBufferPool bufferPool

// for IO errors
lastIOError time.Time
lastIOSuccess time.Time
lastIOMsg string
lastIOReport time.Time
}

type rawxRequest struct {
Expand Down Expand Up @@ -103,6 +109,41 @@ func (rr *rawxRequest) replyError(err error) {
}
}

func (rawx *rawxService) getURL() string {
if rawx.id == "" {
return rawx.url
}
return rawx.id
}

func (rawx *rawxService) isIOok() bool {
// Never touched -> OK
if rawx.lastIOError.Equal(time.Time{}) &&
rawx.lastIOSuccess.Equal(time.Time{}) {
return true
}

// The most recent activity is an error -> KO
if rawx.lastIOError.After(rawx.lastIOSuccess) {
return false
}

// Check the probe thread was not stalled
now := time.Now()
oneMinuteBefore := now.Add(-time.Minute)
ok := rawx.lastIOSuccess.After(oneMinuteBefore)
if !ok {
// If this function is called often, only report once per minute
if now.After(rawx.lastIOReport.Add(time.Minute)) {
rawx.lastIOReport = now
LogWarning("IO error checker stalled for %d minutes",
now.Sub(rawx.lastIOSuccess)/time.Minute)
}
}

return ok
}

func (rawx *rawxService) ServeHTTP(rep http.ResponseWriter, req *http.Request) {
rawxreq := rawxRequest{
rawx: rawx,
Expand All @@ -129,6 +170,8 @@ func (rawx *rawxService) ServeHTTP(rep http.ResponseWriter, req *http.Request) {

if len(req.Host) > 0 && (req.Host != rawx.id && req.Host != rawx.url) {
rawxreq.replyCode(http.StatusTeapot)
} else if !rawx.isIOok() {
rawxreq.replyCode(http.StatusServiceUnavailable)
} else {
for _dslash(req.URL.Path) {
req.URL.Path = req.URL.Path[1:]
Expand Down
14 changes: 14 additions & 0 deletions rawx/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@
package main

import (
"math/rand"
"strconv"
"strings"
"sync/atomic"
"time"
)

// Hexadecimal characters
const hexaCharacters = "0123456789ABCDEF"

// An array of character considered as invalid hexadecimal.
// YOU SHOULD NOT alter this this unless you know what you are doing
var notHexa [256]bool

func init() {
rand.Seed(time.Now().UnixNano())

hexa := []byte{
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'a', 'b', 'c', 'd', 'e', 'f',
Expand Down Expand Up @@ -76,3 +82,11 @@ func (pt *PeriodicThrottle) Ok() bool {
}
return false
}

func randomString(length int, charset string) string {
b := make([]byte, length)
for i := range b {
b[i] = charset[rand.Intn(len(charset))]
}
return string(b)
}

0 comments on commit 0a60950

Please sign in to comment.