Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
benson1029 committed Apr 19, 2024
2 parents 25b4b05 + bba4134 commit 66dd0e3
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 57 deletions.
83 changes: 39 additions & 44 deletions docs/docs/developer-notes/channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,65 +7,60 @@ sidebar_position: 1
A channel with buffer size $k$ is essentially
```go
type Channel struct {
buffer Queue // at most size k
mutex Mutex
buffer Queue // at most size k
waitingSend Queue
waitingRecv Queue
}
```

We have an auxiliary atomic function `ReleaseAndBlock`
When a thread $t$ wants to send to a channel,
```go
func ReleaseAndBlock(thread Thread, waitingQueue []Queue, mutex []Mutex) {
waker := Waker(thread)

for i := 0; i < len(waitingQueue); i++ {
waitingQueue[i].Push(waker)
}
for i := 0; i < len(mutex); i++ {
mutex[i].Unlock(thread)
func send(t thread, value value_t) {
if buffer.size() < k || waitingRecv.size() > 0 {
buffer.enqueue(value)
if waitingRecv.size() > 0 {
newThread := waitingRecv.dequeue()
newValue := buffer.dequeue()
newThread.Stash().Push(newValue)
scheduler.Wake(newThread)
}
} else {
waitingSend.enqueue(t)
}

thread.Block()
}
```

When a thread $t$ wants to send to a channel,
When a thread $t$ wants to receive from a channel,
```go
value := ...
c.mutex.Lock()
if c.buffer.Size() < k {
c.buffer.Push(value)
if c.waitingRecv.Size() > 0 {
newThread := c.waitingRecv.Pop()
new_value = c.buffer.Pop()
newThread.Stash().Push(new_value)
newThread.Unblock()
func recv(t thread) {
if waitingSend.size() > 0 {
newThread := waitingSend.dequeue()
newValue := newThread.sendValue
buffer.enqueue(newValue)
scheduler.Wake(newThread)
}
if buffer.size() > 0 {
newValue := buffer.dequeue()
t.Stash().Push(newValue)
} else {
waitingRecv.enqueue(t)
}
c.mutex.Unlock()
} else {
// buffer is full, wait
t.Stash().Push(value)
ReleaseAndBlock(t, []Queue{c.waitingSend}, []Mutex{c.mutex})
}
```

When a thread $t$ wants to receive from a channel,
```go
c.mutex.Lock()
if (c.buffer.Size() > 0) {
value := c.buffer.Pop()
if c.waitingSend.Size() > 0 {
newThread := c.waitingSend.Pop()
new_value := newThread.Stash().Pop()
c.buffer.Push(new_value)
newThread.Unblock()
}
c.mutex.Unlock()
} else {
// buffer is empty, wait
ReleaseAndBlock(t, []Queue{c.waitingRecv}, []Mutex{c.mutex})
// after unblocked, there is a value in stash
value := t.Stash().Pop()
var mutex sync.Mutex
c1 := make(chan int32)
c2 := make(chan int32)

// some code...

select {
case c1 <- 10:
mutex.Lock()
// critical section
mutex.Unlock()
case c2 <- 10:
// some code...
}
```
14 changes: 8 additions & 6 deletions src/go/ece/ece.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ class ECE {
};

// Evaluate the program.
while (!scheduler.empty()) {
while (!main_thread.control().empty()) {
if (scheduler.empty()) {
throw new Error(
"All goroutines are asleep - deadlock!"
);
}
if (this.visualize) {
this.take_snapshot(scheduler);
}
Expand All @@ -98,11 +103,8 @@ class ECE {
thread.free();
this.heap.clear_intermediate();
}

if (!main_thread.control().empty()) {
throw new Error(
"All goroutines are asleep - deadlock! (main thread control stack not empty)"
);
if (this.visualize) {
this.take_snapshot(scheduler);
}
scheduler.free();

Expand Down
74 changes: 68 additions & 6 deletions src/go/ece/microcode/concurrent/go.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,89 @@
import { Heap } from "../../../heap";
import { auto_cast } from "../../../heap/types/auto_cast";
import { ComplexFunction } from "../../../heap/types/complex/function";
import { ContextScheduler } from "../../../heap/types/context/scheduler";
import { ContextThread } from "../../../heap/types/context/thread";
import { ControlGoCallI } from "../../../heap/types/control/go_call_i";
import { ControlGoCallStmt } from "../../../heap/types/control/go_call_stmt";
import { TAG_COMPLEX_method } from "../../../heap/types/tags";
import { UserStruct } from "../../../heap/types/user/struct";

function evaluate_go_call_stmt(
cmd: number,
heap: Heap,
thread: ContextThread,
scheduler: ContextScheduler
) {
const cmd_object = auto_cast(heap, cmd) as ControlGoCallStmt;
const forked_cmd = cmd_object.get_body_address();
const C = thread.control();
const cmd_object_go = auto_cast(heap, cmd) as ControlGoCallStmt;
const cmd_object = cmd_object_go.get_body_address();

// The original thread does nothing.
const function_object = cmd_object.get_func_address();

const call_i_cmd = heap.allocate_any({
tag: "go_call_i",
num_args: cmd_object.get_number_of_args(),
});

C.push(call_i_cmd);
heap.free_object(call_i_cmd);

for (let i = 0; i < cmd_object.get_number_of_args(); i++) {
const arg = cmd_object.get_arg_address(i);
C.push(arg.address);
}

C.push(function_object.address);
scheduler.enqueue(thread);
}

function evaluate_go_call_i(
cmd: number,
heap: Heap,
thread: ContextThread,
scheduler: ContextScheduler
) {
const cmd_object = auto_cast(heap, cmd) as ControlGoCallI;

// The new thread runs the goroutine.
// The spawned thread runs the goroutine.
const call_i_cmd = heap.allocate_any({
tag: "call_i",
num_args: cmd_object.get_number_of_args(),
});
const pop_i_cmd = heap.allocate_any({ tag: "pop_i" });
const forked_thread = thread.fork();
forked_thread.control().clear();
forked_thread.control().push(forked_cmd.address);
forked_thread.control().push(pop_i_cmd);
forked_thread.control().push(call_i_cmd);
scheduler.enqueue(forked_thread);
forked_thread.free();
heap.free_object(call_i_cmd);
heap.free_object(pop_i_cmd);

// The original thread pops the arguments and the function object.
{
// Pop arguments from the stash
let args = [];
for (let i = 0; i < cmd_object.get_number_of_args(); i++) {
args.push(thread.stash().pop());
}
const function_object = auto_cast(
heap,
thread.stash().pop()
) as ComplexFunction;
const deferred_free = () => {
function_object.free();
for (let arg of args) {
heap.free_object(arg);
}
};
deferred_free();
if (function_object.get_tag() === TAG_COMPLEX_method) {
const self = auto_cast(heap, thread.stash().pop()) as UserStruct;
self.free();
}
scheduler.enqueue(thread);
}
}

export { evaluate_go_call_stmt };
export { evaluate_go_call_stmt, evaluate_go_call_i };
2 changes: 2 additions & 0 deletions src/go/ece/microcode/lookup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ function lookup_microcode(tag: number): Function {
return control_function.evaluate_call_i;
case tags.TAG_CONTROL_go_call_stmt:
return concurrent_go.evaluate_go_call_stmt;
case tags.TAG_CONTROL_go_call_i:
return concurrent_go.evaluate_go_call_i;
case tags.TAG_CONTROL_chan_send:
return concurrent_channel.evaluate_chan_send;
case tags.TAG_CONTROL_chan_send_i:
Expand Down
59 changes: 59 additions & 0 deletions src/go/ece/tests/goroutine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ describe("Goroutines", () => {
const functions = `
func main() {
go f(1)
for i := 0; i < 10; i++ {}
}
func f(a int32) {
Expand All @@ -50,6 +51,7 @@ describe("Goroutines", () => {
func main() {
x := 1
go f(x)
for i := 0; i < 10; i++ {}
}
func f(a int32) {
Expand All @@ -66,6 +68,7 @@ describe("Goroutines", () => {
func main() {
go f(1)
go f(2)
for i := 0; i < 30; i++ {}
}
func f(a int32) {
Expand All @@ -77,4 +80,60 @@ describe("Goroutines", () => {
const result = evaluateFunctions(functions);
expect(result).toBe("1\n2\n1\n2\n1\n2\n1\n2\n1\n2\n");
})

it('main thread can terminate early', () => {
const functions = `
func f() func() int32 {
fmt.Println("hi");
return func () int32 {
for i := 0; i < 10; i++ {}
fmt.Println("inner");
return 5;
}
}
func main() {
go f()()
fmt.Println("bye")
}
`
const result = evaluateFunctions(functions);
expect(result).toBe("hi\nbye\n");
})

it('should evaluate call argument before spawning goroutine', () => {
const functions = `
func f(x int32) int32 {
return x
}
func g() int32 {
for i := 0; i < 100; i++ {}
fmt.Println("evaluating g")
return 5
}
func main() {
go f(g())
fmt.Println("bye")
}
`
const result = evaluateFunctions(functions);
expect(result).toBe("evaluating g\nbye\n");
})

it('should evaluate goroutine for struct method', () => {
const functions = `
type S struct {}
func (s S) foo() {}
func main() {
go S{}.foo();
for i := 0; i < 30; i++ {}
}
`
const result = evaluateFunctions(functions);
expect(result).toBe("");
})
})
2 changes: 2 additions & 0 deletions src/go/ece/tests/mutex.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ describe("Mutex", () => {
}
m.Unlock()
}()
for i := 0; i < 100; i++ {}
}
`
const result = evaluateFunctions(functions);
Expand All @@ -141,6 +142,7 @@ describe("Mutex", () => {
fmt.Println(i)
}
}()
for i := 0; i < 30; i++ {}
}
`
const result2 = evaluateFunctions(functions2);
Expand Down
2 changes: 1 addition & 1 deletion src/go/ece/tests/select.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ describe("Select", () => {
evaluateFunctions(functions);
expect(true).toBe(false);
} catch (e) {
expect(e.message).toBe("All goroutines are asleep - deadlock! (main thread control stack not empty)");
expect(e.message).toBe("All goroutines are asleep - deadlock!");
}
})

Expand Down
14 changes: 14 additions & 0 deletions src/go/heap/heap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import { ControlExitScopeI } from "./types/control/exit_scope";
import { ControlFor } from "./types/control/for";
import { ControlForI } from "./types/control/for_i";
import { ControlFunction } from "./types/control/function";
import { ControlGoCallI } from "./types/control/go_call_i";
import { ControlGoCallStmt } from "./types/control/go_call_stmt";
import { ControlIf } from "./types/control/if";
import { ControlIfI } from "./types/control/if_i";
Expand Down Expand Up @@ -191,6 +192,7 @@ import {
TAGSTRING_CONTROL_case_send,
TAGSTRING_CONTROL_case_receive,
TAGSTRING_CONTROL_case_default,
TAGSTRING_CONTROL_go_call_i,
} from "./types/tags";
import { UserType } from "./types/user/type";
import { UserTypeArray } from "./types/user/type/array";
Expand Down Expand Up @@ -850,6 +852,16 @@ class Heap {
return ControlCallI.allocate(this, obj.num_args);
}

/**
* CONTROL_go_call_i
* Fields : number of children
* Children :
* - 4 bytes address of the number of arguments (PRIMITIVE_number)
*/
public allocate_CONTROL_go_call_i(obj: { tag: string, num_args: number }): number {
return ControlGoCallI.allocate(this, obj.num_args);
}

/**
* CONTROL_restore_env_i
* Fields : number of children
Expand Down Expand Up @@ -1468,6 +1480,8 @@ class Heap {
return this.allocate_CONTROL_return(obj);
case TAGSTRING_CONTROL_call_i:
return this.allocate_CONTROL_call_i(obj);
case TAGSTRING_CONTROL_go_call_i:
return this.allocate_CONTROL_go_call_i(obj);
case TAGSTRING_CONTROL_restore_env_i:
return this.allocate_CONTROL_restore_env_i(obj);
case TAGSTRING_CONTROL_return_i:
Expand Down
Loading

0 comments on commit 66dd0e3

Please sign in to comment.