Skip to content

Commit

Permalink
send webmentions concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
nekr0z committed Oct 6, 2021
1 parent a9c1763 commit 6f4fd0f
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Pre-release]
### Added
- an option to process files concurrently
- an option to send webmentions concurrently

### Fixed
- not processing empty line as a valid endpoint address
Expand Down
1 change: 1 addition & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ oldDir = "testdata/prod/" # the directory where old (or current) site
webmentionsFile = "mentions.json" # the file to store pending webmentions in

concurrentFiles = 2 # number of files to process simultaneously, defaults to 1
concurrentRequests = 5 # number of requests that can be simultaneously sent to the same host when sending webmentions

## by default, only the `h-entry` is checked for changes; you can add other parts of a page here
## using CSS selectors
Expand Down
59 changes: 43 additions & 16 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type config struct {
websubHub []string
feedFiles []string
concurFiles int
concurReqs int
}

type mention struct {
Expand Down Expand Up @@ -113,7 +114,7 @@ func main() {
fmt.Printf("%v\n", err)
os.Exit(1)
}
sendMentions(mentions)
sendMentions(mentions, cfg.concurReqs)
fmt.Println("all sent")
default:
mentions, err := findWork(cfg)
Expand All @@ -127,21 +128,19 @@ func main() {
ping(hub, feeds)
}
}
sendMentions(mentions)
sendMentions(mentions, cfg.concurReqs)
fmt.Println("all sent")
}
}

func sendMentions(mentions []mention) {
func sendMentions(mentions []mention, smax int) {
sc := make(map[string]chan struct{})
var wg sync.WaitGroup
for _, m := range mentions {
fmt.Printf(" %v ... ", m.Dest)
err := send(m.Source, m.Dest)
if err != nil {
fmt.Printf("%v\n", err)
} else {
fmt.Printf("sent\n")
}
wg.Add(1)
go send(m.Source, m.Dest, &wg, sc, smax)
}
wg.Wait()
}

func dump(mentions []mention, file string) error {
Expand Down Expand Up @@ -250,6 +249,7 @@ func readConfig(path string) (config, error) {
ExcludeSelectors []string
WebmentionsFile string
ConcurrentFiles int
ConcurrentRequests int
}
type params struct {
WebsubHub []string
Expand Down Expand Up @@ -277,6 +277,10 @@ func readConfig(path string) (config, error) {
if conf.concurFiles < 0 {
conf.concurFiles = 0
}
conf.concurReqs = cfg.Webmentions.ConcurrentRequests - 1
if conf.concurReqs < 0 {
conf.concurReqs = 0
}
if len(cfg.Params.FeedFiles) == 0 {
conf.feedFiles = []string{"index.xml"}
} else {
Expand All @@ -285,21 +289,44 @@ func readConfig(path string) (config, error) {
return conf, err
}

func send(source, target string) error {
func send(source, target string, wg *sync.WaitGroup, sc map[string]chan struct{}, smax int) {
defer wg.Done()
u, err := url.Parse(target)
if err != nil {
fmt.Printf(" %v doesn't look like a parsable URL\n", target)
return
}
if _, ok := sc[u.Host]; !ok {
sc[u.Host] = make(chan struct{}, smax)
}
sc[u.Host] <- struct{}{}

fmt.Printf(" %v ... ", target)
client := webmention.New(nil)

endpoint, err := client.DiscoverEndpoint(target)
<-sc[u.Host]
if err != nil {
return err
} else if endpoint == "" {
return fmt.Errorf("no webmention support")
fmt.Println(err)
return
}
u, err = url.Parse(endpoint)
if err != nil {
fmt.Printf(" enpoint %v doesn't look like a parsable URL\n", endpoint)
return
}
if _, ok := sc[u.Host]; !ok {
sc[u.Host] = make(chan struct{}, smax)
}
sc[u.Host] <- struct{}{}
defer func() { <-sc[u.Host] }()

_, err = client.SendWebmention(endpoint, source, target)
if err != nil {
return err
fmt.Println(err)
return
}
return nil
fmt.Println("sent")
}

func compareDirs(conf config) ([]string, error) {
Expand Down
33 changes: 22 additions & 11 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ package main

import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"sync"
"testing"
)

Expand Down Expand Up @@ -186,22 +190,29 @@ func TestSend(t *testing.T) {

tests := map[string]struct {
url string
fail bool
want string
}{
"good": {good.URL, false},
"failed send": {bad.URL, true},
"bad page": {"destination", true},
"no endpoint": {empty.URL, true},
"good": {good.URL, "sent\n"},
"failed send": {bad.URL, "response error: 400\n"},
"bad page": {"destination", "Get \"destination\": unsupported protocol scheme \"\"\n"},
"no endpoint": {empty.URL, "no webmention rel found\n"},
}

rescueStdout := os.Stdout
defer func() { os.Stdout = rescueStdout }()
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
got := send(src, tc.url)
if !tc.fail && got != nil {
t.Fatalf("want nil, got: %v", got)
}
if tc.fail && got == nil {
t.Fatalf("want error, got nil")
r, w, _ := os.Pipe()
os.Stdout = w
var wg sync.WaitGroup
wg.Add(1)
sc := make(map[string]chan struct{})
send(src, tc.url, &wg, sc, 15)
w.Close()
out, _ := ioutil.ReadAll(r)
got := strings.SplitAfterN(string(out), " ... ", 2)[1]
if tc.want != got {
t.Fatalf("\nwant %q,\ngot: %q", tc.want, got)
}
})
}
Expand Down

0 comments on commit 6f4fd0f

Please sign in to comment.