Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Dec 27, 2022
1 parent 0143a64 commit 980898b
Showing 1 changed file with 51 additions and 13 deletions.
64 changes: 51 additions & 13 deletions content/en/docs/16.0/reference/vreplication/vstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ have the binary logs going back to the creation of the table — and then begin
new changes to the table from that point on. It also supports resuming this initial copy
phase if it's interrupted for any reason.

Events in the stream are [MySQL row based binary log events](https://dev.mysql.com/doc/refman/en/mysqlbinlog-row-events.html)
and can be processed by event bridges which support Vitess such as
[Debezium](https://debezium.io/documentation/reference/stable/connectors/vitess.html)
and to some extent bridges that support MySQL such as
[GoldenGate](https://docs.oracle.com/en/middleware/goldengate/core/21.3/gghdb/using-oracle-goldengate-mysql.html).
Events in the stream are [MySQL row based binary log events](https://dev.mysql.com/doc/refman/en/mysqlbinlog-row-events.html) — with [extended metadata](https://pkg.go.dev/vitess.io/vitess/go/vt/proto/binlogdata#VEvent)
— and can be processed by event bridges which support Vitess such as
[Debezium](https://debezium.io/documentation/reference/stable/connectors/vitess.html).
Other products such as [AirByte](https://airbyte.com) can also be used with [custom
Vitess connectors](https://docs.airbyte.com/connector-development/).

Expand All @@ -35,21 +33,61 @@ environments by many Vitess users.

## API Details

`VStream` is a gRPC service that is part of the `vtgate` service.
`VStream` is a gRPC that is part of the `vtgate` service. You would send a
[VStreamRequest](https://pkg.go.dev/vitess.io/vitess/go/vt/proto/vtgate#VStreamRequest)
to a `vtgate` process's `--grpc_port` and receive a
[VStreamResponse](https://pkg.go.dev/vitess.io/vitess/go/vt/proto/vtgate#VStreamResponse).
As part of the gRPC request, you can specify the following
[flags](https://pkg.go.dev/vitess.io/vitess/go/vt/proto/vtgate#VStreamFlags):

#### RPC Calls
* VTGate `Vstream` gRPC
#### MinimizeSkew

#### Types
**Type** bool\
**Default** false

When enabled the `vtgate` will keep the events in the stream roughly time aligned — it is aggregating streams coming
from each of the shards involved — using the event timestamps to ensure the maximum time skew between the streams is
under 10 minutes. When it detects skew between the base shard streams it will pause sending the client more events and allow the lagging shard(s)
to catch up.

{{< info >}}
Note that the order is not guaranteed across shards and the client will need to examine the event timestamps.
{{</ info >}}

#### HeartbeatInterval

**Type** unsigned integer\
**Default** 0 (none)

How frequently, in seconds, to send heartbeat events to the client when there are no other events in the stream to
send.

#### StopOnReshard

**Type** bool\
**Default** false

When enabled the `vtgate` will send reshard events to the client and stop sending any further events for the current
[VStreamRequest](https://pkg.go.dev/vitess.io/vitess/go/vt/proto/vtgate#VStreamRequest).

#### Cells

**Type** string\
**Default** ""

If specified these cells (comma-separated) are used [to pick source tablets from](../tablet_selection/). When no
value is specified the `vtgate` will default to looking for source tablets within its own local cell.

### Types
* [VStreamRequest](https://pkg.go.dev/vitess.io/vitess/go/vt/proto/vtgate#VStreamRequest)
* [VStreamResponse](https://pkg.go.dev/vitess.io/vitess/go/vt/proto/vtgate#VStreamResponse)
* [VStreamFlags](https://pkg.go.dev/vitess.io/vitess/go/vt/proto/vtgate#VStreamFlags)
* [VEvent](https://pkg.go.dev/vitess.io/vitess/go/vt/proto/binlogdata#VEvent)
* [VGtid](https://pkg.go.dev/vitess.io/vitess/go/vt/proto/binlogdata#VGtid)
* [LastPKEvent](https://pkg.go.dev/vitess.io/vitess/go/vt/proto/binlogdata#LastPKEvent)
* [TableLastPK](https://pkg.go.dev/vitess.io/vitess/go/vt/proto/binlogdata#TableLastPK)
* [VEvent](https://pkg.go.dev/vitess.io/vitess/go/vt/proto/binlogdata#VEvent)

#### Example Usage
### Example Usage
```
gconn, err := vtgateconn.Dial(ctx, grpcAddress)
if err != nil {
Expand All @@ -72,13 +110,13 @@ var vgtid = &binlogdatapb.VGtid{}
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
Keyspace: "ks",
Shard: "-80",
Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos),
Gtid: "MySQL56/89f66ef2-863a-11ed-9bdf-3d270fd3f552:1-30219"
TablePKs: tableLastPK,
})
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
Keyspace: "ks",
Shard: "80-",
Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos),
Gtid: "MySQL56/2174b383-5441-11e8-b90a-c80aa9429562:1-29516,24da167-0c0c-11e8-8442-00059a3c7b00:1-19"
TablePKs: tableLastPK,
})
vgtid.ShardGtids = shardGtids
Expand Down

0 comments on commit 980898b

Please sign in to comment.