Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use effects for actions #34

Merged
merged 5 commits into from
Sep 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions .eslintrc.json

This file was deleted.

19 changes: 8 additions & 11 deletions lib/agent.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { expect, console } from '~/test-utils';
import { Agent } from './agent';
import { Task, NoAction } from './task';
import { Sensor, Subscriber } from './sensor';
import { Observable } from './observable';

import { setTimeout } from 'timers/promises';

Expand All @@ -24,7 +23,7 @@ describe('Agent', () => {
});
agent.seek({ never: true });
await expect(agent.wait(1000)).to.be.rejected;
await agent.stop();
agent.stop();
});

it('it continues looking for plan unless max retries is set', async () => {
Expand Down Expand Up @@ -64,18 +63,16 @@ describe('Agent', () => {
expect(count).to.deep.equal([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});

it('it allows to use observables as actions', async () => {
it('allows to use observables as actions', async () => {
const counter = Task.of({
condition: (state: number, { target }) => state < target,
effect: (_: number, { target }) => target,
action: (state: number, { target }) =>
Observable.of(async (s) => {
while (state < target) {
state = state + 1;
s.next(state);
await setTimeout(10);
}
}),
action: async function* (state: number, { target }) {
while (state < target) {
yield ++state;
await setTimeout(10);
}
},
});
const agent = Agent.of({
initial: 0,
Expand Down
4 changes: 2 additions & 2 deletions lib/agent/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import assert from '../assert';
import { NullLogger } from '../logger';
import { Observable, Subject } from '../observable';
import { Subscribable, Subject } from '../observable';
import { Planner } from '../planner';
import { Sensor } from '../sensor';
import { Target } from '../target';
Expand All @@ -10,7 +10,7 @@ import { AgentOpts, NotStarted, Result } from './types';

export * from './types';

export interface Agent<TState = any> extends Observable<TState> {
export interface Agent<TState = any> extends Subscribable<TState> {
/**
* Tells the agent to seek a new target.
*
Expand Down
51 changes: 23 additions & 28 deletions lib/agent/runtime.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { setTimeout as delay } from 'timers/promises';
import { diff, patch, Operation as PatchOperation } from 'mahler-wasm';

import { Observer, Observable } from '../observable';
import { Observer } from '../observable';
import { Planner, Node, EmptyNode } from '../planner';
import { Sensor, Subscription } from '../sensor';
import { Target } from '../target';
Expand Down Expand Up @@ -100,34 +100,29 @@ export class Runtime<TState> {
// what we need to compare the updated state to
const before = this.state;
const res = action(before);
if (Observable.is<TState>(res)) {
const runtime = this;
// If the action result is an observable, then
// we need to subscribe to it and update the internal
// state as the observable emits new values
return new Promise((resolve, reject) => {
res.subscribe({
next(s) {
const changes = diff(before, s);
if (changes.length > 0) {
runtime.state = patch(runtime.state, changes);
runtime.observer.next(runtime.state);
}
},
complete() {
// There should be no more changes to perform
// here
resolve([]);
},
error(e) {
reject(e);
},
});
const runtime = this;
// If the action result is an observable, then
// we need to subscribe to it and update the internal
// state as the observable emits new values
return new Promise((resolve, reject) => {
res.subscribe({
next(s) {
const changes = diff(before, s);
if (changes.length > 0) {
runtime.state = patch(runtime.state, changes);
runtime.observer.next(runtime.state);
}
},
complete() {
// There should be no more changes to perform
// here
resolve([]);
},
error(e) {
reject(e);
},
});
} else {
const after = await res;
return diff(before, after);
}
});
} catch (e) {
throw new ActionRunFailed(action, e);
}
Expand Down
1 change: 0 additions & 1 deletion lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
export * from './agent';
export * from './observable';
export * from './sensor';
export * from './task';
export * from './logger';
Expand Down
256 changes: 256 additions & 0 deletions lib/observable.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
import { expect } from '~/test-utils';
import { Observable } from './observable';

import { setTimeout } from 'timers/promises';
import { stub } from 'sinon';

const interval = (period: number): Observable<number> => {
return Observable.from(
(async function* () {
let i = 0;
while (true) {
await setTimeout(period);
yield i++;
}
})(),
);
};

describe('Observable', () => {
it('creates observables from promises', async () => {
const o = Observable.from(Promise.resolve(42));

// Add a subscriber
const next = stub();

const promise = new Promise<void>((resolve, reject) => {
const subscriber = o.subscribe({
next,
error: reject,
complete: () => {
resolve();
subscriber.unsubscribe();
},
});
});
await promise;
expect(next).to.have.been.calledOnce;
expect(next).to.have.been.calledWith(42);
});

it('creates observables from values', async () => {
const o = Observable.of(42);

// Add a subscriber
const next = stub();

const promise = new Promise<void>((resolve, reject) => {
const subscriber = o.subscribe({
next,
error: reject,
complete: () => {
resolve();
subscriber.unsubscribe();
},
});
});
await promise;
expect(next).to.have.been.calledOnce;
expect(next).to.have.been.calledWith(42);
});

it('only starts reading values when a subscriber is added', async () => {
const read = stub();
const o = interval(10).map(read);

// The sensor function should not be called before a subscriber is added
expect(read).to.not.have.been.called;

// Add a subscriber
const next = stub();
const subscriber = o.subscribe(next);

await setTimeout(39);

// Only now the sensor function should be called
expect(read).to.have.been.calledThrice;
expect(next).to.have.been.calledThrice;

subscriber.unsubscribe();
});

it('it returns a stream of values', async () => {
const o = interval(10);

// Add a subscriber
const next = stub();
const subscriber = o.subscribe(next);

await setTimeout(39);

// Only now the sensor function should be called
expect(next).to.have.been.calledThrice;
expect(next).to.have.been.calledWith(0);
expect(next).to.have.been.calledWith(1);
expect(next).to.have.been.calledWith(2);

subscriber.unsubscribe();

const next2 = stub();

// Testing unsubcribe
next.reset();
const subscriber2 = o.subscribe(next2);
await setTimeout(39);

expect(next2).to.have.been.calledThrice;
expect(next2).to.have.been.calledWith(4);
expect(next2).to.have.been.calledWith(5);
expect(next2).to.have.been.calledWith(6);
expect(next).to.not.have.been.called;

subscriber2.unsubscribe();
});

it('it shares values between subscribers', async () => {
// The current implementation makes it so values
// produced by async generators will be shared by
// multiple observers, which is probably not what we
// want, but it allows for a simpler implementation
const o = interval(10);

// Add a subscriber
const next = stub();
const next2 = stub();
const subscriber = o.subscribe(next);
const subscriber2 = o.subscribe(next2);

await setTimeout(45);

// Only now the sensor function should be called
expect(next).to.have.been.calledTwice;
expect(next2).to.have.been.calledTwice;
expect(next).to.have.been.calledWith(0);
expect(next2).to.have.been.calledWith(1);
expect(next).to.have.been.calledWith(2);
expect(next2).to.have.been.calledWith(3);

subscriber.unsubscribe();
subscriber2.unsubscribe();
});

it('it allows mapping over values', async () => {
const o = interval(10).map((x) => x * 2);

// Add a subscriber
const next = stub();
const subscriber = o.subscribe(next);

await setTimeout(39);

// Only now the sensor function should be called
expect(next).to.have.been.calledThrice;
expect(next).to.have.been.calledWith(0);
expect(next).to.have.been.calledWith(2);
expect(next).to.have.been.calledWith(4);

subscriber.unsubscribe();
});

it('it allows to merge observables', async () => {
const letters = Observable.of('a', 'b', 'c');
const o = letters.flatMap((x) => interval(10).map((y) => x + y));

// Add a subscriber
const next = stub();
const subscriber = o.subscribe(next);

await setTimeout(55);

// Only now the sensor function should be called
expect(next).to.have.been.calledWith('a0');
expect(next).to.have.been.calledWith('a1');
expect(next).to.have.been.calledWith('a2');
expect(next).to.have.been.calledWith('b0');
expect(next).to.have.been.calledWith('b1');

subscriber.unsubscribe();
});

it('it accepts generators returning values', async () => {
const letters = Observable.from(
(function* () {
yield 'a';
yield 'b';
return 'c';
})(),
);

// Add a subscriber
const next = stub();

const promise = new Promise<void>((resolve, reject) => {
const subscriber = letters.subscribe({
next,
error: reject,
complete: () => {
resolve();
subscriber.unsubscribe();
},
});
});
await promise;
expect(next).to.have.been.calledThrice;
expect(next).to.have.been.calledWith('a');
expect(next).to.have.been.calledWith('b');
expect(next).to.have.been.calledWith('c');
});

it('it propagates errors', async () => {
const letters = Observable.from(
(function* () {
yield 'a';
yield 'b';
throw new Error('test');
})(),
);

// Add a subscriber
const next = stub();

const promise = new Promise<void>((resolve, reject) => {
const subscriber = letters.subscribe({
next,
error: reject,
complete: () => {
resolve();
subscriber.unsubscribe();
},
});
});
await expect(promise).to.be.rejected;
expect(next).to.have.been.calledTwice;
});

it('it ignores the error if no error handler is provided', async () => {
const letters = Observable.from(
(function* () {
yield 'a';
yield 'b';
throw new Error('test');
})(),
);

const next = stub();
const rejection = stub();

// This will produce an unhandled rejection
process.once('unhandledRejection', rejection);

const subscriber = letters.subscribe(next);
await setTimeout(10);
expect(next).to.have.been.calledTwice;
expect(rejection).to.have.been.called;
subscriber.unsubscribe();
});
});
Loading