Skip to content

Commit

Permalink
fix: make pbStream function polymorphic (#40)
Browse files Browse the repository at this point in the history
What we want to say is that the type returned from `unwrap` will be
the original duplex passed in, but it's source will have changed to
return `Uint8ArrayList`s.  We can still pass duplexes that source
`Uint8Array`s though, so make the exported function polymorphic.
  • Loading branch information
achingbrain authored Mar 1, 2023
1 parent 76bf41c commit ccb2065
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 9 deletions.
20 changes: 12 additions & 8 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export interface Encoder<T> {
/**
* Convenience methods for working with protobuf streams
*/
export interface ProtobufStream <TSink> {
export interface ProtobufStream <Stream extends Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array> = Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array>> {
/**
* Read a set number of bytes from the stream
*/
Expand Down Expand Up @@ -82,7 +82,7 @@ export interface ProtobufStream <TSink> {
/**
* Returns the underlying stream
*/
unwrap: () => Duplex<Uint8ArrayList, TSink>
unwrap: () => Stream
}

export interface Opts {
Expand All @@ -97,14 +97,16 @@ export interface Opts {
maxDataLength: number
}

export function pbStream <TSink extends Uint8Array | Uint8ArrayList> (duplex: Duplex<Uint8ArrayList | Uint8Array, TSink>, opts = {}): ProtobufStream<TSink> {
const shake = handshake(duplex)
export function pbStream <Stream extends Duplex<Uint8ArrayList, Uint8Array | Uint8ArrayList>> (duplex: Stream, opts?: Partial<Opts>): ProtobufStream<Stream>
export function pbStream <Stream extends Duplex<Uint8ArrayList, Uint8Array | Uint8ArrayList>> (duplex: Duplex<Uint8Array>, opts?: Partial<Opts>): ProtobufStream<Stream>
export function pbStream (duplex: any, opts = {}): ProtobufStream<any> {
const shake = handshake<Uint8Array>(duplex)
const lpReader = lp.decode.fromReader(
shake.reader,
opts
)

const W: ProtobufStream<TSink> = {
const W: ProtobufStream<any> = {
read: async (bytes) => {
// just read
const { value } = await shake.reader.next(bytes)
Expand Down Expand Up @@ -142,10 +144,8 @@ export function pbStream <TSink extends Uint8Array | Uint8ArrayList> (duplex: Du
write: (data) => {
// just write
if (data instanceof Uint8Array) {
// @ts-expect-error writer should always accept pushing Uint8Arrays
shake.writer.push(data)
} else {
// @ts-expect-error writer should always accept pushing Uint8Arrays
shake.writer.push(data.subarray())
}
},
Expand All @@ -166,7 +166,11 @@ export function pbStream <TSink extends Uint8Array | Uint8ArrayList> (duplex: Du
unwrap: () => {
// returns vanilla duplex again, terminates all reads/writes from this object
shake.rest()
return shake.stream

duplex.source = shake.stream.source
duplex.sink = shake.stream.sink

return duplex
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Object.keys(tests).forEach(key => {
const test = tests[key]

describe(`it-pb-rpc ${key}`, () => {
let w: ProtobufStream<Uint8Array>
let w: ProtobufStream

before(async () => {
w = pbStream(pair<Uint8Array>())
Expand Down

0 comments on commit ccb2065

Please sign in to comment.