Skip to content

Commit

Permalink
chore: various cleanups (#965)
Browse files Browse the repository at this point in the history
  • Loading branch information
dunglas authored Oct 25, 2024
1 parent 7857d78 commit eee809a
Show file tree
Hide file tree
Showing 28 changed files with 132 additions and 89 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ To debug potential deadlocks:
## Spec

The spec is written in Markdown, compatible with [Mmark](https://mmark.miek.nl/).
It is then converted in the [the "xml2rfc" Version 3 Vocabulary](https://tools.ietf.org/html/rfc7991).
It is then converted in [the "xml2rfc" Version 3 Vocabulary](https://tools.ietf.org/html/rfc7991).

To contribute to the protocol itself:

Expand Down
31 changes: 23 additions & 8 deletions bolt_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func NewBoltTransport(
return nil, &TransportError{err: err}
}

lastEventID, err := getDBLastEventID(db, bucketName)
if err != nil {
return nil, &TransportError{err: err}
}

return &BoltTransport{
logger: logger,
db: db,
Expand All @@ -107,13 +112,13 @@ func NewBoltTransport(

subscribers: NewSubscriberList(1e5),
closed: make(chan struct{}),
lastEventID: getDBLastEventID(db, bucketName),
lastEventID: lastEventID,
}, nil
}

func getDBLastEventID(db *bolt.DB, bucketName string) string {
func getDBLastEventID(db *bolt.DB, bucketName string) (string, error) {
lastEventID := EarliestLastEventID
db.View(func(tx *bolt.Tx) error {
err := db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(bucketName))
if b == nil {
return nil // No data
Expand All @@ -125,8 +130,11 @@ func getDBLastEventID(db *bolt.DB, bucketName string) string {

return nil
})
if err != nil {
return "", fmt.Errorf("unable to get lastEventID from BoltDB: %w", err)
}

return lastEventID
return lastEventID, nil
}

// Dispatch dispatches an update to all subscribers and persists it in Bolt DB.
Expand Down Expand Up @@ -176,7 +184,7 @@ func (t *BoltTransport) persist(updateID string, updateJSON []byte) error {
// The sequence value is prepended to the update id to create an ordered list
key := bytes.Join([][]byte{prefix, []byte(updateID)}, []byte{})

// The DB is append only
// The DB is append-only
bucket.FillPercent = 1

t.lastSeq = seq
Expand Down Expand Up @@ -207,7 +215,9 @@ func (t *BoltTransport) AddSubscriber(s *Subscriber) error {
t.Unlock()

if s.RequestLastEventID != "" {
t.dispatchHistory(s, toSeq)
if err := t.dispatchHistory(s, toSeq); err != nil {
return err
}
}

s.Ready()
Expand Down Expand Up @@ -239,8 +249,8 @@ func (t *BoltTransport) GetSubscribers() (string, []*Subscriber, error) {
}

//nolint:gocognit
func (t *BoltTransport) dispatchHistory(s *Subscriber, toSeq uint64) {
t.db.View(func(tx *bolt.Tx) error {
func (t *BoltTransport) dispatchHistory(s *Subscriber, toSeq uint64) error {
err := t.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(t.bucketName))
if b == nil {
s.HistoryDispatched(EarliestLastEventID)
Expand Down Expand Up @@ -286,6 +296,11 @@ func (t *BoltTransport) dispatchHistory(s *Subscriber, toSeq uint64) {

return nil
})
if err != nil {
return fmt.Errorf("unable to retrieve history from BoltDB: %w", err)
}

return nil
}

// Close closes the Transport.
Expand Down
4 changes: 2 additions & 2 deletions bolt_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestNewBoltTransport(t *testing.T) {
u, _ = url.Parse("bolt:///test.db")
_, err = DeprecatedNewBoltTransport(u, zap.NewNop())

// The exact error message depends of the OS
// The exact error message depends on the OS
assert.Contains(t, err.Error(), "open /test.db:")

u, _ = url.Parse("bolt://test.db?cleanup_frequency=invalid")
Expand Down Expand Up @@ -351,7 +351,7 @@ func TestBoltLastEventID(t *testing.T) {
// The sequence value is prepended to the update id to create an ordered list
key := bytes.Join([][]byte{prefix, []byte("foo")}, []byte{})

// The DB is append only
// The DB is append-only
bucket.FillPercent = 1

return bucket.Put(key, []byte("invalid"))
Expand Down
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func NewHubFromViper(v *viper.Viper) (*Hub, error) { //nolint:funlen,gocognit
return h, err
}

// Start is an helper method to start the Mercure Hub.
// Start is a helper method to start the Mercure Hub.
//
// Deprecated: use the Caddy server module or the standalone library instead.
func Start() {
Expand Down
2 changes: 1 addition & 1 deletion demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var uiContent embed.FS
// The Content-Type header will automatically be set according to the URL's extension.
func (h *Hub) Demo(w http.ResponseWriter, r *http.Request) {
// JSON-LD is the preferred format
mime.AddExtensionType(".jsonld", "application/ld+json")
_ = mime.AddExtensionType(".jsonld", "application/ld+json")

url := r.URL.String()
mimeType := mime.TypeByExtension(filepath.Ext(r.URL.Path))
Expand Down
12 changes: 6 additions & 6 deletions docs/UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
The `MERCURE_TRANSPORT_URL` environment variable and the `transport_url` directive have been deprecated.
Use the new `transport` directive instead.

The `MERCURE_TRANSPORT_URL` environement variable has been removed from the default `Caddyfile`s,
The `MERCURE_TRANSPORT_URL` environment variable has been removed from the default `Caddyfile`s,
but a backward compatibility layer is provided.

If both the `transport` and the deprecated `transport_url` are not explicitly set
and the `MERCURE_TRANSPORT_URL` environement variable is set, the `transport_url` will be automatically populated.
and the `MERCURE_TRANSPORT_URL` environment variable is set, the `transport_url` will be automatically populated.
To disable this behavior, unset `MERCURE_TRANSPORT_URL` or set it to an empty string.

Before:
Expand All @@ -30,12 +30,12 @@ To configure the transport using an environment variable, append the `transport

To prevent security issues, be sure to not pass credentials such as API tokens or password in `MERCURE_EXTRA_DIRECTIVES` (ex: when using transports [provided by the paid version](hub/cluster.md) such as Redis).

To pass credentials security, create a custom `Caddyfile` an use the `{env.MY_ENV_VAR}` syntax, which is interpreted at runtime.
To pass credentials security, create a custom `Caddyfile` and use the `{env.MY_ENV_VAR}` syntax, which is interpreted at runtime.

## 0.16.2

The `Caddyfile.dev` file has been renamed `dev.Caddyfile` to match new Caddy best practices
and prevent "ambigous adapter" issues.
and prevent "ambiguous adapter" issues.

## 0.14.4

Expand All @@ -52,9 +52,9 @@ The default dev key changed from `!ChangeMe!` to `!ChangeThisMercureHubJWTSecret

## 0.14

The query parameter allowing to fetch past events has been renamed `lastEventID`: in your clients, replace all occurences of the `Last-Event-ID` query parameter by `lastEventID`.
The query parameter allowing to fetch past events has been renamed `lastEventID`: in your clients, replace all occurrences of the `Last-Event-ID` query parameter by `lastEventID`.

Publishing public updates in topics not explictly listed in the `mercure.publish` JWT claim isn't supported anymore.
Publishing public updates in topics not explicitly listed in the `mercure.publish` JWT claim isn't supported anymore.
To let your publishers publish (public and private updates) in all topics, use the special `*` topic selector:

```patch
Expand Down
5 changes: 3 additions & 2 deletions docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
## Starting the Hub

The easiest way to get started is to [install the official Mercure.rocks
Hub](hub/install.md). When it's done, go directly to the next step. There are also other unofficial [libraries implementing Mercure](ecosystem/awesome.md#hubs-and-server-libraries). In the rest of this tutorial, we'll assume that the hub is running on `https://localhost` and that the `JWT_KEY` is `!ChangeThisMercureHubJWTSecretKey!`.
Hub](hub/install.md). When it's done, go directly to the next step.
There are other unofficial [libraries implementing Mercure](ecosystem/awesome.md#hubs-and-server-libraries). In the rest of this tutorial, we'll assume that the hub is running on `https://localhost` and that the `JWT_KEY` is `!ChangeThisMercureHubJWTSecretKey!`.

Please note that the hub is entirely optional when using the Mercure protocol. Your app can also implement the Mercure protocol directly.

Expand Down Expand Up @@ -47,7 +48,7 @@ Optionally, [the authorization mechanism](../spec/mercure.md#authorization) can

## Discovering the Mercure Hub

Also optionally, the hub URL can be automatically discovered:
Also, optionally, the hub URL can be automatically discovered:

![Discovery Schema](../spec/discovery.png)

Expand Down
2 changes: 1 addition & 1 deletion docs/hub/cloud.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ The Cloud service is built on top of the free and open-source hub and helps fund

Purchase your managed Mercure.rocks hub [directly online](https://mercure.rocks/pricing)!

After purchase, your hub will be instantly provisionned and available under a `mercure.rocks` subdomain. A TLS certificate is also automatically created.
After purchase, your hub will be instantly provisioned and available under a `mercure.rocks` subdomain. A TLS certificate is also automatically created.

You'll have access to an administration interface allowing you to:

Expand Down
40 changes: 20 additions & 20 deletions docs/hub/cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ To use it, just configure your custom domain name (if any) and your secret JWT k

## High Availability On Premise Version

The High Availability On Premise Mercure.rocks Hub is a drop-in replacement for the free Hub which allows to spread the load across as many servers as you want. It is designed to run on your own servers and is fault tolerant by default.
The High Availability On Premise Mercure.rocks Hub is a drop-in replacement for the free Hub which allows to spread the load across as many servers as you want. It is designed to run on your own servers and is fault-tolerant by default.

The HA version is shipped with transports having node synchronization capabilities.
These transports can rely on:
Expand Down Expand Up @@ -59,7 +59,7 @@ If you use the Helm chart, set the `license` value and change the Docker image t

### Transports

The clustered mode of the Mercure.rocks Hub requires a transport to work.
The clustered mode of the Mercure.rocks Hub requires transport to work.
Supported transports are Apache Pulsar, Apache Kafka and PostgreSQL.

#### Redis Transport
Expand All @@ -71,10 +71,10 @@ To install Redis, [read the documentation](https://redis.io/topics/quickstart).
Most Cloud Computing platforms also provide managed versions of Redis.

| Feature | Supported |
| --------------- | --------- |
| History ||
| Presence API ||
| Custom event ID ||
|-----------------|-----------|
| History | |
| Presence API | |
| Custom event ID | |

##### Configuration

Expand All @@ -90,10 +90,10 @@ To use Redis, the `MERCURE_TRANSPORT_URL` environment variable must be set like

The following options can be passed as query parameters of the URL set in `transport_url`:

| Parameter | Description | Default |
| ---------------- | ------------------------------------------------------------------------------------------------------ | ------- |
| `tls` | set to `1` to enable TLS support | `0` |
| `max_len_approx` | the approximative maximum number of messages to store in the history, set to `0` to store all messages | `0` |
| Parameter | Description | Default |
|------------------|------------------------------------------------------------------------------------------------------|---------|
| `tls` | set to `1` to enable TLS support | `0` |
| `max_len_approx` | the approximate maximum number of messages to store in the history, set to `0` to store all messages | `0` |

#### PostgreSQL Transport

Expand All @@ -103,8 +103,8 @@ It is mostly useful when using the Mercure.rocks Hub as an event store, or as a
To install PostgreSQL, [read the documentation](https://www.postgresql.org/docs/12/tutorial-install.html).
Most Cloud Computing platforms also provide managed versions of PostgreSQL.

| Feature | Supported |
| --------------- | ------------ |
| Feature | Supported |
|-----------------|-------------|
| History ||
| Presence API | ❌ (planned) |
| Custom event ID ||
Expand Down Expand Up @@ -138,10 +138,10 @@ The Mercure.rocks hub has been tested with:
- Heroku Kafka

| Feature | Supported |
| --------------- | --------- |
| History ||
| Presence API ||
| Custom event ID ||
|-----------------|-----------|
| History | |
| Presence API | |
| Custom event ID | |

##### Kafka Configuration

Expand All @@ -158,7 +158,7 @@ To use Kafka, the `MERCURE_TRANSPORT_URL` environment variable must be set like
The following options can be passed as query parameters of the URL set in `transport_url`:

| Parameter | Description |
| ---------------- | ------------------------------------------------------------------------------------------------------------------------------------------- |
|------------------|---------------------------------------------------------------------------------------------------------------------------------------------|
| `addr` | addresses of the Kafka servers, you can pass several `addr` parameters to use several Kafka servers (ex: `addr=host1:9092&addr=host2:9092`) |
| `topic` | the name of the Kafka topic to use (ex: `topic=mercure-ha`), **all Mercure.rocks hub instances must use the same topic** |
| `consumer_group` | consumer group, **must be different for every instance of the Mercure.rocks hub** (ex: `consumer_group=<random-string>`) |
Expand All @@ -172,8 +172,8 @@ The Pulsar transport should only be used when Pulsar is already part of your sta

To install Apache Pulsar, [read the documentation](https://pulsar.apache.org/docs/en/standalone/).

| Feature | Supported |
| --------------- | ------------ |
| Feature | Supported |
|-----------------|-------------|
| History ||
| Presence API ||
| Custom event ID | ❌ (planned) |
Expand All @@ -193,7 +193,7 @@ To use Pulsar, the `MERCURE_TRANSPORT_URL` environment variable must be set like
The following options can be passed as query parameters of the URL set in `transport_url`:

| Parameters | Description | |
| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------ | --- |
|---------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|-----|
| `topic` | the name of the Pulsar topic to use (ex: `topic=mercure`), **all Mercure.rocks hub instances must use the same topic** | |
| `subscription_name` | the subscription name for this node, **must be different for every instance of the Mercure.rocks hub** (ex: `subscription_name=<random-string>`) | |

Expand Down
4 changes: 2 additions & 2 deletions docs/hub/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ services:
## JWT Verification
JWT can validated using HMAC and RSA algorithms.
JWT can be validated using HMAC and RSA algorithms.
In addition, it's possible to use JSON Web Key Sets (JWK Sets) (usually provided by OAuth and OIDC providers such as Keycloak or Amazon Cognito) to validate the keys.
When using RSA public keys for verification make sure the key is properly formatted and make sure to set the correct algorithm as second parameter of the `publisher_jwt` or `subscriber_jwt` directives (for example `RS256`).

Here is an example of how to use environments variables with a RSA public key.
Here is an example of how to use environments variables with an RSA public key.

Generate keys (if you don't already have them):

Expand Down
2 changes: 1 addition & 1 deletion docs/hub/install.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ docker run \
dunglas/mercure
```

HTTPS support is automatically enabled. If you run the Mercure hub behind a reverse proxy [such as NGINX](cookbooks.md#using-nginx-as-an-http-2-reverse-proxy-in-front-of-the-hub), you usually want to use unencrypted HTTP.
HTTPS support is automatically enabled. If you run the Mercure hub behind a reverse proxy [such as NGINX](nginx.md), you usually want to use unencrypted HTTP.
This can be done like that:

```console
Expand Down
2 changes: 1 addition & 1 deletion docs/hub/traefik.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Use the Mercure.rocks Hub with Traefik Proxy

[Traefik](https://doc.traefik.io/traefik/) is a free and open source _edge router_ poular in the Docker and Kubernetes ecosystems.
[Traefik](https://doc.traefik.io/traefik/) is a free and open source _edge router_ popular in the Docker and Kubernetes ecosystems.

The following Docker Compose file exposes a Mercure.rocks hub through Traefik:

Expand Down
5 changes: 3 additions & 2 deletions docs/spec/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
In a nutshell [the WebSocket API](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API) is low level, Mercure is a high level.
Mercure provides convenient built-in features such as authorization, re-connection, state reconciliation and a presence API ; while with WebSockets, you need to implement them yourself.

Also WebSockets [are not designed to leverage HTTP/2+](https://www.infoq.com/articles/websocket-and-http2-coexist) and are known to be [hard to secure](https://gravitational.com/blog/kubernetes-websocket-upgrade-security-vulnerability/). On the other hand Mercure relies on plain HTTP connections and benefits from the performance an security improvement built-in the latest versions of this protocol.
Also, WebSockets [are not designed to leverage HTTP/2+](https://www.infoq.com/articles/websocket-and-http2-coexist) and are known to be [hard to secure](https://gravitational.com/blog/kubernetes-websocket-upgrade-security-vulnerability/).
On the other hand Mercure relies on plain HTTP connections and benefits from the performance and security improvement built-in the latest versions of this protocol.

HTTP/2 connections are multiplexed and bidirectional by default (it was not the case of HTTP/1).
When using Mercure over a h2 connection (recommended), your app can receive data through Server-Sent Events, and send data to the server with regular `POST` (or `PUT`/`PATCH`/`DELETE`) requests, with no overhead.
Expand Down Expand Up @@ -34,7 +35,7 @@ In summary, use the Push API to send notifications to offline users (that will b
When using HTTP/2+ ([the default for almost all users](https://caniuse.com/#feat=http2)), the maximum number of simultaneous HTTP **streams** is negotiated between the server and the client (it defaults to 100).
When using HTTP 1.1, this limit is of 6.

By using template selectors and by passing several `topic` parameters, it's possible to subscribe to an unlimited of topics using a single HTTP connection.
By using template selectors and by passing several `topic` parameters, it's possible to subscribe to an unlimited number of topics using a single HTTP connection.

## How to Use Mercure with GraphQL?

Expand Down
6 changes: 3 additions & 3 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ func (e *Event) String() string {
var b strings.Builder

if e.Type != "" {
fmt.Fprintf(&b, "event: %s\n", e.Type)
_, _ = fmt.Fprintf(&b, "event: %s\n", e.Type)
}
if e.Retry != 0 {
fmt.Fprintf(&b, "retry: %d\n", e.Retry)
_, _ = fmt.Fprintf(&b, "retry: %d\n", e.Retry)
}

r := strings.NewReplacer("\r\n", "\ndata: ", "\r", "\ndata: ", "\n", "\ndata: ")
fmt.Fprintf(&b, "id: %s\ndata: %s\n\n", e.ID, r.Replace(e.Data))
_, _ = fmt.Fprintf(&b, "id: %s\ndata: %s\n\n", e.ID, r.Replace(e.Data))

return b.String()
}
4 changes: 2 additions & 2 deletions hub.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Package mercure helps implementing the Mercure protocol (https://mercure.rocks) in Go projects.
// It provides an implementation of a Mercure hub as a HTTP handler.
// Package mercure helps implement the Mercure protocol (https://mercure.rocks) in Go projects.
// It provides an implementation of a Mercure hub as an HTTP handler.
package mercure

import (
Expand Down
2 changes: 1 addition & 1 deletion jwt_keyfunc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func TestCreateJWTKeyfunc(t *testing.T) {
f, err := createJWTKeyfunc(([]byte{}), "invalid")
f, err := createJWTKeyfunc([]byte{}, "invalid")
require.Error(t, err)
require.Nil(t, f)
}
Expand Down
Loading

0 comments on commit eee809a

Please sign in to comment.