-
-
Notifications
You must be signed in to change notification settings - Fork 413
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge remote-tracking branch 'origin/master' into feature/jobs_plugin
# Conflicts: # plugins/websockets/plugin.go
- Loading branch information
Showing
64 changed files
with
2,307 additions
and
1,034 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package pubsub | ||
|
||
import json "github.com/json-iterator/go" | ||
|
||
// Message represents a single message with payload bound to a particular topic | ||
type Message struct { | ||
// Topic (channel in terms of redis) | ||
Topic string `json:"topic"` | ||
// Payload (on some decode stages might be represented as base64 string) | ||
Payload []byte `json:"payload"` | ||
} | ||
|
||
func (m *Message) MarshalBinary() (data []byte, err error) { | ||
return json.Marshal(m) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
package broadcast | ||
|
||
/* | ||
# Global redis config (priority - 2) | ||
websockets: # <----- one of possible subscribers | ||
path: /ws | ||
broker: default # <------ broadcast broker to use --------------- | | ||
| match | ||
broadcast: # <-------- broadcast entry point plugin | | ||
default: # <----------------------------------------------------- | | ||
driver: redis | ||
# local redis config (priority - 1) | ||
test: | ||
driver: memory | ||
priority local -> global | ||
*/ | ||
|
||
// Config ... | ||
type Config struct { | ||
Data map[string]interface{} `mapstructure:"broadcast"` | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
<mxfile host="Electron" modified="2021-06-18T09:34:25.915Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="THNfOcV33EQGG0gzo1UK" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1bc6M4Fv41rk1vVVIg7o+Jk8l01fR2Np7e7n7a4iLbbDB4AMdJ//qVQGCQZBsHEMSTviRGIAznfj4dSRNlunq5j+318kvkwWACJO9lotxOALBkE/3EDa95AzAVJW9ZxL6Xt8m7hpn/C5JGibRufA8mtQvTKApSf11vdKMwhG5aa7PjONrWL5tHQf1b1/YCMg0z1w7Y1u++ly7Ja0iStDvxO/QXy5Q+s7KLq0lDsrS9aFtpUu4myjSOojT/tHqZwgBTryBM3u+3PWfLJ4thmDbpMFV1/fPNfXhp/FAuZ+Hz8kd0f6mr+W2e7WBDXpk8bfpa0CCONqEH8V2kiXKzXfopnK1tF5/dIq6jtmW6CtCRjD7O/SCYRkEUZ30Vz4bm3EXtSRpHT7ByRndN6MzRGfY9yKs9wziFL5Um8l73MFrBNH5Fl5CzikloTMRM1cjxtsazvG1ZZZdFGm0iJ4vy3jtKog+EmCcQVtP7Jexcw3+5hM3+4B5RmFba8z/dEFyW5DrFDYOluGpyKK7oWl8U186b4qBO8ctSoAckec/WY2iSU0KuSNrQFLcY+kIPOSxyGMXpMlpEoR3c7Vpv6hzYXfNHFK0J3f8H0/SVeF97k0Z1rsAXP/2Bu19p5OgnuRn+fPtSPXgtDkL0upVO+PBncT98sOuWHRX99vItiTaxCw+QpggP7HgB00PXEYZhwh0UgxgGduo/1yMBHkdJ14fIR89cio8BpCtZBaZmkJ81WdIkyvPkz03uQUlJ+VBvF5xCIoeSHFAVHbmh6FQFpyJHe0SHNh6mC11u6OGYGo4QRAhbEV91LGzXcWy/Vi5YY7FJ9suibmo16dNVKlakr5fbXW/Kh69XJKnV9ZqlUUqSU6RTlSkcxs653Tx+vb6dXs/+7NbLtRDU5t5M1ygSciI2WeU4M62vEFkd0iLJFXO082unGSRwqkWazwGf0Z7u6JreyiKBpu6va4PUSgYAo2OPd7efZ6jp4dvN7NtNt4o2h/oe+huWI3WkaCoVqJcRYlXRgEhFG9T1d6FoY9Iz5V3qmcLo2Ze7L18ff56TonE9mlBFM967ohlj0jT9XWoai/tdf/n3w0T5Df87I23T9aG1zWQojV49wKMA6Muy7vaz7Qe2g4iMXn7jJBsHffBiJANxwrAA0SGt09kO/EWIPruIahCR9AZTy3ft4JqcWPmelysxTPxf2RflDCT5GLqvdjPRbvG9kN4muQrLDKvCKIT9IOGaynKJj4T3xSVZGdIojhyykpqauHHZuOK5q6lxHNmeaydY+XysLnNsvbo0c54GTU/lmTkTOIp+0NeckCbrb4ze1d4USP9QoGN6cVyBtK4V6E04nEE50WL8cR/uZRlmq+v1I9fLsiUd6tAPsKaBYSXaaC7SAkRT7dy2v000DaUmCbJ5ouhQHXoSnUGjiXqKZTTNsUZnDYE1CpEzKQlSSay4T+To62VZO1VGqR49DRywIwcToAc4u1jXZFf/a4Orf25WiGE+Siiu0Vlp/YJ+ZnIg5e2XKRZTfE6tnMPpyiXJUPA5kqSU90SfFuS3vcKRVcAeVQI10lzvUhxlFztx5ZZ5S9FQBn/ZA5Av+s2hr0dta7ptydy1OxKRcwGcp8fpl13FUO8eprdZvnjxBF+zB4v9cPEJfZptnEdoe7U3rrwdZaSSpb3GH+OsSK1qWyJ083mQFYbhgHdPWNwh7nJCLEyN6imclL+s1KvGwkZvsfBH/cPREPe49RdUAMFYX42GgnqueZDZirDv0Eki9wmmLP7TJjd1ZM+bSzx9lCVDsToCd2RZoRFv44otSeKmp0pfFUmywRD5c+inFxhqK/3CJ4baZ4G2MUV5OuCgBWLhNhYVfYQuxFoMJKcRSHOmrOEgobIukjWlJX9PuUvVdzUYH3q7+yrIftx9AUHuC9BYuiJdYYDd0sjPRs7s1KRILqMnCsLfm+SwHUxKVnvIccCg1TvjjsSai3I/qCQ3EruSJUVVTWBopg7qUI9Kz4LoOS4r6FPxUV+3Ye6fnnBCJSUoTfKjMHNTk2x2z9xfnEXEJnO8kNDhBDDseFzNC4GGXkiue6GjdXctVBc0Vd3Ok6h2XAWMShW4hOc/F7DEdpf9YMyjxC0ql3B6lXBQ6CTrHDwhipoBJx6c25sgbXM7OwiiLfT+G8V+CfLssJh/1mCZN3/JOpuQV781us82OX77NnaniJ1zFGogM6RQM1lUi5M28op/e0NywMeo5tGw4LgRMkTFD6Z1tZvIohbfW+CCouMHFtcpkYarHWJbGilioTBqe0lj21Uc9/2FFxJV1K8qbHgBuDPUetNrFg6aIXHAwgRfoIs0EhF6momk7aFfK5gk9gIiIywFWIvfIRPoGE/hIQ1iYzzzw7jupY3V0Lgq/UzdajBIWc47FWVPLUZnv6bLLBF7CDaLLAVjsavx13IxmolS4OEBc4XNfv++gLksWRxjKRQxV9jR+g/EPFcfHlghFDJXwJCObOyQeeGhjrsyXZAroyFzyxoEMpeLVXgaY+aohwDQXPmohe9AmE1xcdkB0NwSXMxQ0GdI0NyTXAgBL2zTdE02+kqorKYT+3pLqJRhV8+o+aHxgeZK05l63adU7bjKW6OLgZLzVKg7vDyFSQpa3GxfNaY40FqQGaBBa0sbGrRWBp3gO3L/bTY1Av0Un58KWpui/TdbmDU20FqQXtOgtcmBAMSC1goLgM1g/Nzx/MVhgieTs0yg2OCpuPGH1dw/+ev4LDFRKTyNRtNi0bOZVFk0ro5Fs4hQq7UTbM/yuCtM2lCVFdCTUurDw9AqYAj9N4Khafhn8LptlV2h5wOFrkX4g4HQ6ogWqhsfCK02Tf5VUbgdDUKDOgYtm2ojj9YahVZOxKCJ69t7PVPEVbu+H8RaHXjZ2FHHbk0lv8h3BkasL2kL2Xcox8JrwhFr0fHdcEnWsNDUyBFqtSk41X2a1Y6rLGi0D6HOs6NprkLdotXyKTcbHJ0WpPK0Ny5nGx9Fp3vL6D4mxx91wcd9dT+r8ZyKTl/Kgp11QaARw9OCFJuGpy85K8CJhac1FhPrHp4eCAm7lAcf3NfPvipH5M4YmtHQ0o5rSUuNnbfw+DCt4237YptkbYdFWwnVXT1snMBPlhdkhsOnSiRU7dCpEkPZ06DB46ylG4rd1bpASn1dIIM3xGQIVeFBl1Y4SYXRwQOMffTe2I8OFQ41zYe0zmeYtdNSNh/aJJwwo1hua7MKrl085WgHff9hOzB4iBI/Qx6UWydK02g1YbHxbEmx2kpdmzTwQ6RUxVaTB3nQXJ0Mido8h7N7ocLRJr03bWKBmxkMvUk5WwtLToS/Gf0nZo7DhLMYj1AokNbgrQfMs3W9DUforKd6iKNn38Nsqa3ElzMpqg2fnum65zq1QRRoulRdF2y6MR7+9f3PG/mv6eNqMdV/RL/Pvg27FwS131rjqBIICSspZ3LUhXHpyymc4F5ntvRX/PRdo5YJlzXaGneXr/Nff9ApnvVByaZ4b0v5aryXsFD5kkUFRNxvx/1HY2aabuvYKnkdqRSALszMqUPOjBUy9GNjzvLhHq1HkQ9Rkb9dRJcYVlMPxAQWHOnYG2to9KxijbeaSl8QFt8MMATO8dpMAhc+IodNBloTdwlX9miDwFaMAQxjOJlUb7VDXMaMqHRoXFvvdWKdOXvx7bc/Q7loTmEftcUljSt+Xq0DuIKZ6cf59Szbt2oC8NDKw2wahYi4mwzW6NR4Nt1GrGMd5S0A0Nc+tFwGsasJkX3EkvzbjhH9LGwnjXPw+SLUdrIoVIUP6ISLQXRpXWIfXuHyWI1y7ARi/CpzgekS7vrh1rzuaFP6yGyp/8ZMJlUI42cxoDASWee4Ry7S2MXIJpfFA1csGG/KXoxa+rJziSN0kByYf3wOskhJPoKkYWWgn0IXnV7dSpOkq7w8VdJNC7+uSNyMHUCa3T3+544ThIX+biPRfPNQHAqgmMpd4vYYcirnkjV0/bmfeRV/52v6KGwVEq0xqa7atFqji8Epfqo78GzCt/gMqtD11F2WBUNejTGvLuzFyXuGGSplTo7sAWape0CBxh10st5ovxAZZxUyKk2ULrBN8lHc+SszMHY2JJvg1TWh94m1YLOyuv4Cvtg4p/xErdjcxh4dWSKiqci2Mk/0yLms6iwSB4rlYrteIoLPx/e4rrHRBpBvXHPUjXUCY7ZOLL5ebJu+19rQ4dHJHQz5cIdyT6I9HXqyZywk3dPeW43lr5WpKXeO3PkRkXN++DRmK0Hu8wnDq9qO93MMWpFQdIXfqZh3ddAbnAWspZu0M+dVc/Pgxt5grWIPwyreuITuU4Uv58kKpmTCZJ21JZITgI25fuKVyHHZ72f8EHamTOnST6rMmWbPh/M/KTvxinzWZAc/niXvaKejS5xAS6gaAd6CCekmxvEuqUvMx03KAm4cLa+L9awvYuhl3CNpes5iEiWfJQ8NqmpbNjgrKyhCa+SKTcRqQy+VgRYphDCH6HN9w5rnQNfeJFj5thnoH4X/wMHOUxhtJyyUb6el5yOADsqXFvCKzZZKedn62YM7uI/9bPtBxjxSTOnkcE42PJQ9mPNafsFd6G0yqxCFQRY0zvMqzOoXZxxLEuhdNVnY7xzFkAlfAWtJVN440xsGIdBhHOGdfXaRLYoml18iD+Ir/g8=</diagram></mxfile> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
package broadcast | ||
|
||
import "github.com/spiral/roadrunner/v2/pkg/pubsub" | ||
|
||
type Broadcaster interface { | ||
GetDriver(key string) (pubsub.SubReader, error) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,208 @@ | ||
package broadcast | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
|
||
"github.com/google/uuid" | ||
endure "github.com/spiral/endure/pkg/container" | ||
"github.com/spiral/errors" | ||
"github.com/spiral/roadrunner/v2/pkg/pubsub" | ||
"github.com/spiral/roadrunner/v2/plugins/config" | ||
"github.com/spiral/roadrunner/v2/plugins/logger" | ||
) | ||
|
||
const ( | ||
PluginName string = "broadcast" | ||
// driver is the mandatory field which should present in every storage | ||
driver string = "driver" | ||
|
||
redis string = "redis" | ||
memory string = "memory" | ||
) | ||
|
||
type Plugin struct { | ||
sync.RWMutex | ||
|
||
cfg *Config | ||
cfgPlugin config.Configurer | ||
log logger.Logger | ||
// publishers implement Publisher interface | ||
// and able to receive a payload | ||
publishers map[string]pubsub.PubSub | ||
constructors map[string]pubsub.Constructor | ||
} | ||
|
||
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { | ||
const op = errors.Op("broadcast_plugin_init") | ||
if !cfg.Has(PluginName) { | ||
return errors.E(op, errors.Disabled) | ||
} | ||
p.cfg = &Config{} | ||
// unmarshal config section | ||
err := cfg.UnmarshalKey(PluginName, &p.cfg.Data) | ||
if err != nil { | ||
return errors.E(op, err) | ||
} | ||
|
||
p.publishers = make(map[string]pubsub.PubSub) | ||
p.constructors = make(map[string]pubsub.Constructor) | ||
|
||
p.log = log | ||
p.cfgPlugin = cfg | ||
return nil | ||
} | ||
|
||
func (p *Plugin) Serve() chan error { | ||
return make(chan error) | ||
} | ||
|
||
func (p *Plugin) Stop() error { | ||
return nil | ||
} | ||
|
||
func (p *Plugin) Collects() []interface{} { | ||
return []interface{}{ | ||
p.CollectPublishers, | ||
} | ||
} | ||
|
||
// CollectPublishers collect all plugins who implement pubsub.Publisher interface | ||
func (p *Plugin) CollectPublishers(name endure.Named, constructor pubsub.Constructor) { | ||
// key redis, value - interface | ||
p.constructors[name.Name()] = constructor | ||
} | ||
|
||
// Publish is an entry point to the websocket PUBSUB | ||
func (p *Plugin) Publish(m *pubsub.Message) error { | ||
p.Lock() | ||
defer p.Unlock() | ||
|
||
const op = errors.Op("broadcast_plugin_publish") | ||
|
||
// check if any publisher registered | ||
if len(p.publishers) > 0 { | ||
for j := range p.publishers { | ||
err := p.publishers[j].Publish(m) | ||
if err != nil { | ||
return errors.E(op, err) | ||
} | ||
} | ||
return nil | ||
} else { | ||
p.log.Warn("no publishers registered") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (p *Plugin) PublishAsync(m *pubsub.Message) { | ||
go func() { | ||
p.Lock() | ||
defer p.Unlock() | ||
// check if any publisher registered | ||
if len(p.publishers) > 0 { | ||
for j := range p.publishers { | ||
err := p.publishers[j].Publish(m) | ||
if err != nil { | ||
p.log.Error("publishAsync", "error", err) | ||
// continue publish to other registered publishers | ||
continue | ||
} | ||
} | ||
} else { | ||
p.log.Warn("no publishers registered") | ||
} | ||
}() | ||
} | ||
|
||
func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:gocognit | ||
const op = errors.Op("broadcast_plugin_get_driver") | ||
|
||
// choose a driver | ||
if val, ok := p.cfg.Data[key]; ok { | ||
// check type of the v | ||
// should be a map[string]interface{} | ||
switch t := val.(type) { | ||
// correct type | ||
case map[string]interface{}: | ||
if _, ok := t[driver]; !ok { | ||
panic(errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", val))) | ||
} | ||
default: | ||
return nil, errors.E(op, errors.Str("wrong type detected in the configuration, please, check yaml indentation")) | ||
} | ||
|
||
// config key for the particular sub-driver kv.memcached | ||
configKey := fmt.Sprintf("%s.%s", PluginName, key) | ||
|
||
switch val.(map[string]interface{})[driver] { | ||
case memory: | ||
if _, ok := p.constructors[memory]; !ok { | ||
return nil, errors.E(op, errors.Errorf("no memory drivers registered, registered: %s", p.publishers)) | ||
} | ||
ps, err := p.constructors[memory].PSConstruct(configKey) | ||
if err != nil { | ||
return nil, errors.E(op, err) | ||
} | ||
|
||
// save the initialized publisher channel | ||
// for the in-memory, register new publishers | ||
p.publishers[uuid.NewString()] = ps | ||
|
||
return ps, nil | ||
case redis: | ||
if _, ok := p.constructors[redis]; !ok { | ||
return nil, errors.E(op, errors.Errorf("no redis drivers registered, registered: %s", p.publishers)) | ||
} | ||
|
||
// first - try local configuration | ||
switch { | ||
case p.cfgPlugin.Has(configKey): | ||
ps, err := p.constructors[redis].PSConstruct(configKey) | ||
if err != nil { | ||
return nil, errors.E(op, err) | ||
} | ||
|
||
// if section already exists, return new connection | ||
if _, ok := p.publishers[configKey]; ok { | ||
return ps, nil | ||
} | ||
|
||
// if not - initialize a connection | ||
p.publishers[configKey] = ps | ||
return ps, nil | ||
|
||
// then try global if local does not exist | ||
case p.cfgPlugin.Has(redis): | ||
ps, err := p.constructors[redis].PSConstruct(configKey) | ||
if err != nil { | ||
return nil, errors.E(op, err) | ||
} | ||
|
||
// if section already exists, return new connection | ||
if _, ok := p.publishers[configKey]; ok { | ||
return ps, nil | ||
} | ||
|
||
// if not - initialize a connection | ||
p.publishers[configKey] = ps | ||
return ps, nil | ||
} | ||
} | ||
} | ||
return nil, errors.E(op, errors.Str("could not find driver by provided key")) | ||
} | ||
|
||
func (p *Plugin) RPC() interface{} { | ||
return &rpc{ | ||
plugin: p, | ||
log: p.log, | ||
} | ||
} | ||
|
||
func (p *Plugin) Name() string { | ||
return PluginName | ||
} | ||
|
||
func (p *Plugin) Available() {} |
Oops, something went wrong.