Skip to content

Commit

Permalink
wait about a sec before logging message, check presence (#144)
Browse files Browse the repository at this point in the history
* wait about a sec before logging message, check presence

* fix: force flush on **each** 5s of inactivity, instead of first 5sec

* bot is useless with arbitrary token and group

* don't use common http client, use a custom one with disabled redirects

* fixed test, set zero delay for flush tests
use pointer receiver for reporter methods to replace http client with mock

* fix: in github actions, `os.TempDir` returns path without slash
concat path properly

* perf: manually stop ticker

* doc: updated readme

* added repeater, comments, some cosmetic changes

* doc: moved comments to the corresponding places

* revert imports sort

by @Semior001
  • Loading branch information
Semior001 authored Feb 15, 2025
1 parent cfa58e4 commit c66d235
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 48 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
Дополнительные переменные окружения со значениями по-умолчанию:

* `DEBUG` (false) – включает режим отладки (логируется больше событий)
* `TELEGRAM_LOGS` (logs) - путь к папке куда пишется лог чата
* `TELEGRAM_LOGS` (logs) - путь к папке куда пишется лог чата, для того чтобы работал, необходимо чтобы в `TELEGRAM_GROUP` было публичное _имя_ группы, в противном случае лог не будет писаться
* `SYS_DATA` (data) - путь к папке с *.data файлами и шаблоном для построения HTML отчета
* `TELEGRAM_TIMEOUT` (30s) – HTTP таймаут для скачивания файлов из Telegram при построении HTML отчета
* `RTJC_PORT` (18001) – порт на который приходят уведомления о новостях
Expand Down
7 changes: 4 additions & 3 deletions app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ import (

var opts struct {
Telegram struct {
Token string `long:"token" env:"TOKEN" description:"telegram bot token" default:"test"`
Group string `long:"group" env:"GROUP" description:"group name/id" default:"test"`
Token string `long:"token" env:"TOKEN" description:"telegram bot token" required:"true"`
Group string `long:"group" env:"GROUP" description:"group name/id" required:"true"`
Timeout time.Duration `long:"timeout" env:"TIMEOUT" description:"http client timeout for getting files from Telegram" default:"30s"`
} `group:"telegram" namespace:"telegram" env-namespace:"TELEGRAM"`

RtjcPort int `short:"p" long:"port" env:"RTJC_PORT" default:"18001" description:"rtjc port room"`
LogsPath string `short:"l" long:"logs" env:"TELEGRAM_LOGS" default:"logs" description:"path to logs"`
MessageLogDelay time.Duration `long:"msg-log-delay" env:"MSG_LOG_DELAY" default:"1s" description:"delay for message log"`
SuperUsers events.SuperUser `long:"super" description:"super-users"`
MashapeToken string `long:"mashape" env:"MASHAPE_TOKEN" description:"mashape token"`
SysData string `long:"sys-data" env:"SYS_DATA" default:"data" description:"location of sys data"`
Expand Down Expand Up @@ -211,7 +212,7 @@ func main() {
AllActivityTerm: allActivityTerm,
BotsActivityTerm: botsActivityTerm,
OverallBotActivityTerm: botsAllUsersActivityTerm,
MsgLogger: reporter.NewLogger(opts.LogsPath),
MsgLogger: reporter.NewLogger(opts.LogsPath, opts.MessageLogDelay, opts.Telegram.Group),
Bots: multiBot,
Group: opts.Telegram.Group,
Debug: opts.Dbg,
Expand Down
75 changes: 75 additions & 0 deletions app/reporter/mock_reporter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

102 changes: 91 additions & 11 deletions app/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,60 @@ import (
"time"

"github.com/radio-t/super-bot/app/bot"
"net/http"
"github.com/go-pkgz/repeater"
"context"
)

type msgEntry struct {
MessageID int
Data string
}

// Reporter collects all messages and saves to plain file
type Reporter struct {
logsPath string
messages chan string
logsPath string
messages chan msgEntry
saveDelay time.Duration
chatID string
httpCl httpClient
repeater *repeater.Repeater
}

//go:generate moq -out mock_reporter.go . httpClient

type httpClient interface {
Get(url string) (*http.Response, error)
}

// NewLogger makes new reporter bot
func NewLogger(logs string) (result Reporter) {
func NewLogger(logs string, delay time.Duration, chatID string) (result *Reporter) {
log.Printf("[INFO] new reporter, path=%s", logs)
_ = os.MkdirAll(logs, 0o750)
result = Reporter{logsPath: logs, messages: make(chan string, 1000)}
if err := os.MkdirAll(logs, 0o750); err != nil {
log.Printf("[WARN] can't make logs dir %s, %v", logs, err)
}
result = &Reporter{
logsPath: logs,
messages: make(chan msgEntry, 1000),
saveDelay: delay,
httpCl: &http.Client{
Timeout: time.Second * 5,
CheckRedirect: func(*http.Request, []*http.Request) error {
// don't follow redirects, we need to check status code
return http.ErrUseLastResponse
},
},
repeater: repeater.NewDefault(3, 2*time.Second),
chatID: chatID,
}
go result.activate()
return result
}

// Save to log channel, non-blocking and skip if needed
func (l Reporter) Save(msg *bot.Message) {
func (l *Reporter) Save(msg *bot.Message) {
if msg.Text == "" && msg.Image == nil {
log.Print("[DEBUG] message not saved to log: no text or image = irrelevant")
log.Printf("[DEBUG] message not saved to log: no text or image = irrelevant, msg id: %d", msg.ID)
return
}

Expand All @@ -39,15 +72,17 @@ func (l Reporter) Save(msg *bot.Message) {
}

select {
case l.messages <- string(bdata) + "\n":
case l.messages <- msgEntry{MessageID: msg.ID, Data: string(bdata) + "\n"}:
default:
log.Printf("[WARN] can't buffer log entry %v", msg)
}
}

func (l Reporter) activate() {
func (l *Reporter) activate() {
log.Print("[INFO] activate reporter")
buffer := make([]string, 0, 100)
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()

writeBuff := func() error {
if len(buffer) == 0 {
Expand Down Expand Up @@ -76,16 +111,61 @@ func (l Reporter) activate() {
for {
select {
case entry := <-l.messages:
buffer = append(buffer, entry)
// don't save right away, wait for antispam checks
time.Sleep(l.saveDelay)

if !l.messageExists(entry.MessageID) {
log.Printf("[DEBUG] message %d has been deleted, skipping", entry.MessageID)
continue
}

buffer = append(buffer, entry.Data)
if len(buffer) >= 100 { // forced flush every 100 records
if err := writeBuff(); err != nil {
log.Printf("[WARN] failed to write reporter buffer, %v", err)
}
}
case <-time.After(time.Second * 5): // flush on 5 seconds inactivity
case <-ticker.C: // flush on 5 seconds inactivity
if err := writeBuff(); err != nil {
log.Printf("[WARN] failed to write reporter buffer, %v", err)
}
}
}
}

// messageExists checks if message wasn't deleted by a user, to prevent
// spam being saved to logs.
// this doesn't use bot api, since bot api can't access messages, message ID
// is there only for reply purposes.
func (l *Reporter) messageExists(msgID int) bool {
var resp *http.Response
var err error

fn := func() error {
// a hacky way to check if message exists, by
// requesting a link to the message with "single" query parameter.
// ref: https://core.telegram.org/api/links#message-links

//nolint:bodyclose // for some reason it gives false positives
resp, err = l.httpCl.Get(fmt.Sprintf("https://t.me/%s/%d?single", l.chatID, msgID))
if err != nil {
return fmt.Errorf("get: %w", err)
}
defer resp.Body.Close() //nolint

// telegram returns 302 redirect to the same page without query, if it exists
// and 200 with the "download telegram" webpage, if it isn't
if resp.StatusCode == http.StatusFound || resp.StatusCode == http.StatusOK {
return nil
}

return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

if err = l.repeater.Do(context.TODO(), fn); err != nil {
log.Printf("[WARN] failed to check message existence, %v", err)
return false
}

return resp.StatusCode == http.StatusFound
}
114 changes: 81 additions & 33 deletions app/reporter/reporter_test.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,92 @@
package reporter

import (
"fmt"
"os"
"strconv"
"testing"
"github.com/radio-t/super-bot/app/bot"
"os"
"net/http"
"time"

"github.com/stretchr/testify/assert"

"github.com/radio-t/super-bot/app/bot"
"strconv"
"fmt"
"github.com/stretchr/testify/require"
"io"
"bytes"
"path"
)

var logs = "logs"
var msg = bot.Message{Text: "1st"}
var msg = bot.Message{ID: 101, Text: "1st"}

func TestNewLogger(t *testing.T) {
defer os.RemoveAll(logs)
reporter := NewLogger(logs)
assert.NotNil(t, reporter)
assert.DirExists(t, logs)

tbl := []struct {
count int
timeout time.Duration
}{
{101, 100 * time.Millisecond},
{1, 6 * time.Second},
}

for i, tt := range tbl {
t.Run(strconv.Itoa(i), func(t *testing.T) {
for i = 0; i < tt.count; i++ {
reporter.Save(&msg)
}
time.Sleep(tt.timeout)
logfile := fmt.Sprintf("%s/%s.log", logs, time.Now().Format("20060102"))
assert.FileExists(t, logfile)
err := os.Remove(logfile)
assert.NoError(t, err)
})
}
t.Run("logger saves messages", func(t *testing.T) {
tbl := []struct {
count int
timeout time.Duration
}{
{101, 100 * time.Millisecond},
{1, 6 * time.Second},
}

for i, tt := range tbl {
t.Run(strconv.Itoa(i), func(t *testing.T) {
p := path.Join(os.TempDir(), "superbot_logs", strconv.Itoa(i))
defer os.RemoveAll(p)

clientMock := &httpClientMock{
GetFunc: func(url string) (*http.Response, error) {
assert.Equal(t, "https://t.me/radio_t_chat/101?single", url)
return &http.Response{
StatusCode: 302,
Body: io.NopCloser(bytes.NewBuffer([]byte(""))),
}, nil
},
}

reporter := NewLogger(p, 0, "radio_t_chat")
reporter.httpCl = clientMock
assert.NotNil(t, reporter)
assert.DirExists(t, p)

for i = 0; i < tt.count; i++ {
reporter.Save(&msg)
}
time.Sleep(tt.timeout)
logfile := fmt.Sprintf("%s/%s.log", p, time.Now().Format("20060102"))
assert.FileExists(t, logfile)
require.NoError(t, os.Remove(logfile))
})
}
})

t.Run("logger skips deleted messages", func(t *testing.T) {
path, err := os.MkdirTemp("", "superbot_logs")
require.NoError(t, err)
defer os.RemoveAll(path)

var startedAt time.Time
clientMock := &httpClientMock{
GetFunc: func(url string) (*http.Response, error) {
actualDelay := time.Since(startedAt)
assert.Equal(t, "https://t.me/radio_t_chat/101?single", url)
assert.True(t, actualDelay > 500*time.Millisecond && actualDelay < 600*time.Millisecond,
"delay expected 500ms, got %v", actualDelay)
return &http.Response{
StatusCode: 302,
Body: io.NopCloser(bytes.NewBuffer([]byte(""))),
}, nil
},
}
reporter := NewLogger(path, 500*time.Millisecond, "radio_t_chat")
reporter.httpCl = clientMock
assert.NotNil(t, reporter)
assert.DirExists(t, path)

startedAt = time.Now()
reporter.Save(&bot.Message{ID: 101, Text: "something"})

time.Sleep(6 * time.Second) // wait for forced flush
logfile := fmt.Sprintf("%s/%s.log", path, time.Now().Format("20060102"))
assert.FileExists(t, logfile)
require.NoError(t, os.Remove(logfile))
})
}

0 comments on commit c66d235

Please sign in to comment.