From 2e779c17efa2d2ce822914c06340c1dfbef1cd55 Mon Sep 17 00:00:00 2001 From: Snikimonkd Date: Fri, 17 Jan 2025 00:22:18 +0300 Subject: [PATCH] [FEAT] add golang like channels --- lib/std/threads/buffered_chan.c3 | 136 ++++++++++++++ lib/std/threads/chan.c3 | 14 ++ lib/std/threads/unbuffered_chan.c3 | 127 +++++++++++++ test/unit/stdlib/threads/chan.c3 | 282 +++++++++++++++++++++++++++++ 4 files changed, 559 insertions(+) create mode 100644 lib/std/threads/buffered_chan.c3 create mode 100644 lib/std/threads/chan.c3 create mode 100644 lib/std/threads/unbuffered_chan.c3 create mode 100644 test/unit/stdlib/threads/chan.c3 diff --git a/lib/std/threads/buffered_chan.c3 b/lib/std/threads/buffered_chan.c3 new file mode 100644 index 000000000..21b949922 --- /dev/null +++ b/lib/std/threads/buffered_chan.c3 @@ -0,0 +1,136 @@ +module std::thread::buffered_chan(); + +import std::thread::chan; + +struct BufferedChan (Chan()) { + Mutex mu; + Type[SIZE] buf; + usz size; + bool closed; + usz elems; + + usz sendx; + usz send_waiting; + thread::ConditionVariable send_cond; + + usz readx; + usz read_waiting; + thread::ConditionVariable read_cond; +} + +fn Chan()! BufferedChan.init(&self) @dynamic { + self.mu.init()!; + defer catch (void)self.mu.destroy(); + self.send_cond.init()!; + defer catch (void)self.send_cond.destroy(); + self.read_cond.init()!; + defer catch (void)self.read_cond.destroy(); + + self.size = SIZE; + self.elems = 0; + self.sendx = 0; + self.send_waiting = 0; + self.readx = 0; + self.read_waiting = 0; + return self; +} + +fn void! BufferedChan.destroy(&self) @dynamic { + anyfault err = @catch(self.mu.destroy()); + err = @catch(self.send_cond.destroy()) ?: err; + err = @catch(self.read_cond.destroy()) ?: err; + + if (err) return err?; +} + +fn void! BufferedChan.push(&self, Type val) @dynamic { + self.mu.lock()!; + defer catch (void)self.mu.unlock(); + + // if chan is full -> wait + while (self.elems == self.size && !self.closed) { + self.send_waiting++; + self.send_cond.wait(&self.mu)!; + self.send_waiting--; + } + + // check if chan is closed + if (self.closed) { + self.mu.unlock()!; + return chan::ChanError().CLOSED?; + } + + // save value to buf + self.buf[self.sendx] = val; + + // move pointer + self.sendx++; + if (self.sendx == self.size) { + self.sendx = 0; + } + + // change elems counter + self.elems++; + + // if someone is waiting -> awake him + if (self.read_waiting > 0) { + self.read_cond.signal()!; + } + + self.mu.unlock()!; + + return; +} + +fn Type! BufferedChan.pop(&self) @dynamic { + self.mu.lock()!; + defer catch (void)self.mu.unlock(); + + // if chan is empty -> wait for sender + while (self.elems == 0 && !self.closed) { + self.read_waiting++; + self.read_cond.wait(&self.mu)!; + self.read_waiting--; + } + + // check if chan is closed and empty + if (self.closed && self.elems == 0) { + self.mu.unlock()!; + return chan::ChanError().CLOSED?; + } + + // read from buf + Type ret = self.buf[self.readx]; + + // move pointer + self.readx++; + if (self.readx == self.size) { + self.readx = 0; + } + + // change elems counter + self.elems--; + + // if someone is waiting -> awake him + if (self.send_waiting > 0) { + self.send_cond.signal()!; + } + + self.mu.unlock()!; + + return ret; +} + +fn void! BufferedChan.close(&self) @dynamic { + anyfault err = @catch(self.mu.lock()); + + self.closed = true; + + err = @catch(self.read_cond.broadcast()) ?: err; + err = @catch(self.send_cond.broadcast()) ?: err; + + err = @catch(self.mu.unlock()) ?: err; + + if (err) return err?; +} + diff --git a/lib/std/threads/chan.c3 b/lib/std/threads/chan.c3 new file mode 100644 index 000000000..b62836124 --- /dev/null +++ b/lib/std/threads/chan.c3 @@ -0,0 +1,14 @@ +module std::thread::chan(); + +fault ChanError { + CLOSED, +} + +interface Chan { + fn Chan! init(); + fn void! destroy(); + fn void! push(Type val); + fn Type! pop(); + fn void! close(); +} + diff --git a/lib/std/threads/unbuffered_chan.c3 b/lib/std/threads/unbuffered_chan.c3 new file mode 100644 index 000000000..edc2644f0 --- /dev/null +++ b/lib/std/threads/unbuffered_chan.c3 @@ -0,0 +1,127 @@ +module std::thread::unbuffered_chan(); + +import std::thread::chan; + +struct UnbufferedChan (Chan()) { + Mutex mu; + Type buf; + bool closed; + + Mutex send_mu; + usz send_waiting; + thread::ConditionVariable send_cond; + + Mutex read_mu; + usz read_waiting; + thread::ConditionVariable read_cond; +} + +fn Chan()! UnbufferedChan.init(&self) @dynamic { + self.mu.init()!; + defer catch (void)self.mu.destroy(); + self.send_mu.init()!; + defer catch (void)self.send_mu.destroy(); + self.send_cond.init()!; + defer catch (void)self.send_cond.destroy(); + self.read_mu.init()!; + defer catch (void)self.read_mu.destroy(); + self.read_cond.init()!; + defer catch (void)self.read_cond.destroy(); + + self.send_waiting = 0; + self.read_waiting = 0; + + return self; +} + +fn void! UnbufferedChan.destroy(&self) @dynamic { + anyfault err = @catch(self.mu.destroy()); + err = @catch(self.send_mu.destroy()) ?: err; + err = @catch(self.send_cond.destroy()) ?: err; + err = @catch(self.read_mu.destroy()) ?: err; + err = @catch(self.read_cond.destroy()) ?: err; + + if (err) return err?; +} + +fn void! UnbufferedChan.push(&self, Type val) @dynamic { + self.mu.lock()!; + defer catch (void)self.mu.unlock(); + self.send_mu.lock()!; + defer catch (void)self.send_mu.unlock(); + + if (self.closed) { + self.mu.unlock()!; + self.send_mu.unlock()!; + return chan::ChanError().CLOSED?; + } + + // store value in the buffer + self.buf = val; + // show that we are waiting for reader + self.send_waiting++; + + // if reader is already waiting for us -> awake him + if (self.read_waiting > 0) { + self.read_cond.signal()!; + } + + // wait until reader takes value from buffer + self.send_cond.wait(&self.mu)!; + + if (self.closed) { + self.mu.unlock()!; + self.send_mu.unlock()!; + return chan::ChanError().CLOSED?; + } + + self.mu.unlock()!; + self.send_mu.unlock()!; + + return; +} + +fn Type! UnbufferedChan.pop(&self) @dynamic { + self.mu.lock()!; + defer catch (void)self.mu.unlock(); + self.read_mu.lock()!; + defer catch (void)self.read_mu.unlock(); + + // if no one is waiting, then there is nothing in the buffer + while (!self.closed && self.send_waiting == 0) { + self.read_waiting++; + self.read_cond.wait(&self.mu)!; + self.read_waiting--; + } + + if (self.closed) { + self.mu.unlock()!; + self.read_mu.unlock()!; + return chan::ChanError().CLOSED?; + } + + // take value from buffer + Type ret = self.buf; + + // awake sender + self.send_waiting--; + self.send_cond.signal()!; + + self.mu.unlock()!; + self.read_mu.unlock()!; + + return ret; +} + +fn void! UnbufferedChan.close(&self) @dynamic { + anyfault err = @catch(self.mu.lock()); + + self.closed = true; + + err = @catch(self.read_cond.broadcast()) ?: err; + err = @catch(self.send_cond.broadcast()) ?: err; + + err = @catch(self.mu.unlock()) ?: err; + + if (err) return err?; +} diff --git a/test/unit/stdlib/threads/chan.c3 b/test/unit/stdlib/threads/chan.c3 new file mode 100644 index 000000000..e1c375fa9 --- /dev/null +++ b/test/unit/stdlib/threads/chan.c3 @@ -0,0 +1,282 @@ +module thread_test; + +import std::thread::buffered_chan; +import std::thread::unbuffered_chan; +import std::thread::chan; +import std::thread; +import std::time; +import std::io; + +fn void init_destroy_buffered() @test { + for (usz i = 0; i < 20; i++) { + Chan() c = BufferedChan(){}.init()!!; + defer c.destroy()!!; + } +} + +fn void init_destroy_unbuffered() @test { + for (usz i = 0; i < 20; i++) { + Chan() c = UnbufferedChan(){}.init()!!; + defer c.destroy()!!; + } +} + +fn void push_buffered_no_lock() @test { + Chan() c = BufferedChan(){}.init()!!; + defer c.destroy()!!; + + c.push(1)!!; +} + +fn void push_pop_buffered_no_locks() @test { + Chan() c = BufferedChan(){}.init()!!; + defer c.destroy()!!; + + c.push(123)!!; + int got = c.pop()!!; + assert(got == 123); +} + +fn void push_pop_unbuffered_with_locks() @test { + Chan() c = UnbufferedChan(){}.init()!!; + defer c.destroy()!!; + + Thread thread; + defer thread.join()!!; + + thread.create(fn int(void* arg) { + Chan()* c_ptr = arg; + c_ptr.push(123)!!; + c_ptr.push(321)!!; + return 0; + }, &c)!!; + + int got = c.pop()!!; + assert(got == 123); + got = c.pop()!!; + assert(got == 321); +} + +fn void sending_to_closed_unbuffered_chan_is_forbidden() @test { + Chan() c = BufferedChan(){}.init()!!; + defer c.destroy()!!; + + c.close()!!; + + if (catch err = c.push(123)) { + assert(err == chan::ChanError().CLOSED); + return; + } + assert(false); +} + +fn void sending_to_closed_buffered_chan_is_forbidden() @test { + Chan() c = BufferedChan(){}.init()!!; + defer c.destroy()!!; + + c.close()!!; + + if (catch err = c.push(123)) { + assert(err == chan::ChanError().CLOSED); + return; + } + assert(false); +} + +fn void reading_from_empty_closed_unbuffered_chan_is_forbidden() @test { + Chan() c = UnbufferedChan(){}.init()!!; + defer c.destroy()!!; + + c.close()!!; + + if (catch err = c.pop()) { + assert(err == chan::ChanError().CLOSED); + return; + } + assert(false); +} + +fn void reading_from_empty_closed_buffered_chan_is_forbidden() @test { + Chan() c = BufferedChan(){}.init()!!; + defer c.destroy()!!; + + c.close()!!; + + if (catch err = c.pop()) { + assert(err == chan::ChanError().CLOSED); + return; + } + assert(false); +} + +fn void reading_from_non_empty_closed_buffered_chan_is_ok() @test { + Chan() c = BufferedChan(){}.init()!!; + defer c.destroy()!!; + + c.push(1)!!; + c.push(2)!!; + c.push(3)!!; + + c.close()!!; + + int got = c.pop()!!; + assert(got == 1); + got = c.pop()!!; + assert(got == 2); + got = c.pop()!!; + assert(got == 3); + + int! got_err = c.pop(); + if (catch err = got_err) { + assert(err == chan::ChanError().CLOSED); + return; + } + + assert(false); +} + +fn void reading_from_empty_buffered_chan_aborted_by_close() @test { + Chan() c = BufferedChan(){}.init()!!; + defer c.destroy()!!; + + Thread thread; + defer thread.join()!!; + + thread.create(fn int(void* arg) { + Chan()* c_ptr = arg; + c_ptr.close()!!; + return 0; + }, &c)!!; + + int! res = c.pop(); + if (catch err = res) { + assert(err == chan::ChanError().CLOSED); + return; + } + + assert(false); +} + +fn void reading_from_unbuffered_chan_aborted_by_close() @test { + Chan() c = UnbufferedChan(){}.init()!!; + defer c.destroy()!!; + + Thread thread; + defer thread.join()!!; + + thread.create(fn int(void* arg) { + Chan()* c_ptr = arg; + c_ptr.close()!!; + return 0; + }, &c)!!; + + int! res = c.pop(); + if (catch err = res) { + assert(err == chan::ChanError().CLOSED); + return; + } + + assert(false); +} + +fn void sending_to_full_buffered_chan_aborted_by_close() @test { + Chan() c = BufferedChan(){}.init()!!; + defer c.destroy()!!; + + c.push(1)!!; + + Thread thread; + defer thread.join()!!; + + thread.create(fn int(void* arg) { + Chan()* c_ptr = arg; + c_ptr.close()!!; + return 0; + }, &c)!!; + + anyfault err = @catch(c.push(1)); + if (err) { + assert(err == chan::ChanError().CLOSED); + return; + } + + assert(false); +} + +fn void sending_to_unbuffered_chan_aborted_by_close() @test { + Chan() c = UnbufferedChan(){}.init()!!; + defer c.destroy()!!; + + Thread thread; + defer thread.join()!!; + + thread.create(fn int(void* arg) { + Chan()* c_ptr = arg; + c_ptr.close()!!; + return 0; + }, &c)!!; + + anyfault err = @catch(c.push(1)); + if (err) { + assert(err == chan::ChanError().CLOSED); + return; + } + + assert(false); +} + +fn void multiple_actions_unbuffered() @test { + Chan() c = UnbufferedChan(){}.init()!!; + defer c.destroy()!!; + + Thread thread; + defer thread.join()!!; + + thread.create(fn int(void* arg) { + Chan()* c_ptr = arg; + for (int i = 0; i <= 100; i++) { + c_ptr.push(i)!!; + } + return 0; + }, &c)!!; + + int sum; + + for (int i = 0; i <= 100; i++) { + int! res = c.pop(); + if (catch err = res) { + assert(false); + } + sum += res; + } + + assert(sum == 5050); +} + +fn void multiple_actions_buffered() @test { + Chan() c = BufferedChan(){}.init()!!; + defer c.destroy()!!; + + Thread thread; + defer thread.join()!!; + + thread.create(fn int(void* arg) { + Chan()* c_ptr = arg; + for (int i = 0; i <= 100; i++) { + c_ptr.push(i)!!; + } + return 0; + }, &c)!!; + + int sum; + + for (int i = 0; i <= 100; i++) { + int! res = c.pop(); + if (catch err = res) { + assert(false); + } + sum += res; + } + + assert(sum == 5050); +}