Skip to content

Commit

Permalink
[FEAT] add golang like channels
Browse files Browse the repository at this point in the history
  • Loading branch information
Snikimonkd committed Jan 16, 2025
1 parent 3e4d1de commit 2e779c1
Show file tree
Hide file tree
Showing 4 changed files with 559 additions and 0 deletions.
136 changes: 136 additions & 0 deletions lib/std/threads/buffered_chan.c3
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
module std::thread::buffered_chan(<Type, SIZE>);

import std::thread::chan;

struct BufferedChan (Chan(<Type>)) {
Mutex mu;
Type[SIZE] buf;
usz size;
bool closed;
usz elems;

usz sendx;
usz send_waiting;
thread::ConditionVariable send_cond;

usz readx;
usz read_waiting;
thread::ConditionVariable read_cond;
}

fn Chan(<Type>)! BufferedChan.init(&self) @dynamic {
self.mu.init()!;
defer catch (void)self.mu.destroy();
self.send_cond.init()!;
defer catch (void)self.send_cond.destroy();
self.read_cond.init()!;
defer catch (void)self.read_cond.destroy();

self.size = SIZE;
self.elems = 0;
self.sendx = 0;
self.send_waiting = 0;
self.readx = 0;
self.read_waiting = 0;
return self;
}

fn void! BufferedChan.destroy(&self) @dynamic {
anyfault err = @catch(self.mu.destroy());
err = @catch(self.send_cond.destroy()) ?: err;
err = @catch(self.read_cond.destroy()) ?: err;

if (err) return err?;
}

fn void! BufferedChan.push(&self, Type val) @dynamic {
self.mu.lock()!;
defer catch (void)self.mu.unlock();

// if chan is full -> wait
while (self.elems == self.size && !self.closed) {
self.send_waiting++;
self.send_cond.wait(&self.mu)!;
self.send_waiting--;
}

// check if chan is closed
if (self.closed) {
self.mu.unlock()!;
return chan::ChanError(<Type>).CLOSED?;
}

// save value to buf
self.buf[self.sendx] = val;

// move pointer
self.sendx++;
if (self.sendx == self.size) {
self.sendx = 0;
}

// change elems counter
self.elems++;

// if someone is waiting -> awake him
if (self.read_waiting > 0) {
self.read_cond.signal()!;
}

self.mu.unlock()!;

return;
}

fn Type! BufferedChan.pop(&self) @dynamic {
self.mu.lock()!;
defer catch (void)self.mu.unlock();

// if chan is empty -> wait for sender
while (self.elems == 0 && !self.closed) {
self.read_waiting++;
self.read_cond.wait(&self.mu)!;
self.read_waiting--;
}

// check if chan is closed and empty
if (self.closed && self.elems == 0) {
self.mu.unlock()!;
return chan::ChanError(<Type>).CLOSED?;
}

// read from buf
Type ret = self.buf[self.readx];

// move pointer
self.readx++;
if (self.readx == self.size) {
self.readx = 0;
}

// change elems counter
self.elems--;

// if someone is waiting -> awake him
if (self.send_waiting > 0) {
self.send_cond.signal()!;
}

self.mu.unlock()!;

return ret;
}

fn void! BufferedChan.close(&self) @dynamic {
anyfault err = @catch(self.mu.lock());

self.closed = true;

err = @catch(self.read_cond.broadcast()) ?: err;
err = @catch(self.send_cond.broadcast()) ?: err;

err = @catch(self.mu.unlock()) ?: err;

if (err) return err?;
}

14 changes: 14 additions & 0 deletions lib/std/threads/chan.c3
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module std::thread::chan(<Type>);

fault ChanError {
CLOSED,
}

interface Chan {
fn Chan! init();
fn void! destroy();
fn void! push(Type val);
fn Type! pop();
fn void! close();
}

127 changes: 127 additions & 0 deletions lib/std/threads/unbuffered_chan.c3
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
module std::thread::unbuffered_chan(<Type>);

import std::thread::chan;

struct UnbufferedChan (Chan(<Type>)) {
Mutex mu;
Type buf;
bool closed;

Mutex send_mu;
usz send_waiting;
thread::ConditionVariable send_cond;

Mutex read_mu;
usz read_waiting;
thread::ConditionVariable read_cond;
}

fn Chan(<Type>)! UnbufferedChan.init(&self) @dynamic {
self.mu.init()!;
defer catch (void)self.mu.destroy();
self.send_mu.init()!;
defer catch (void)self.send_mu.destroy();
self.send_cond.init()!;
defer catch (void)self.send_cond.destroy();
self.read_mu.init()!;
defer catch (void)self.read_mu.destroy();
self.read_cond.init()!;
defer catch (void)self.read_cond.destroy();

self.send_waiting = 0;
self.read_waiting = 0;

return self;
}

fn void! UnbufferedChan.destroy(&self) @dynamic {
anyfault err = @catch(self.mu.destroy());
err = @catch(self.send_mu.destroy()) ?: err;
err = @catch(self.send_cond.destroy()) ?: err;
err = @catch(self.read_mu.destroy()) ?: err;
err = @catch(self.read_cond.destroy()) ?: err;

if (err) return err?;
}

fn void! UnbufferedChan.push(&self, Type val) @dynamic {
self.mu.lock()!;
defer catch (void)self.mu.unlock();
self.send_mu.lock()!;
defer catch (void)self.send_mu.unlock();

if (self.closed) {
self.mu.unlock()!;
self.send_mu.unlock()!;
return chan::ChanError(<Type>).CLOSED?;
}

// store value in the buffer
self.buf = val;
// show that we are waiting for reader
self.send_waiting++;

// if reader is already waiting for us -> awake him
if (self.read_waiting > 0) {
self.read_cond.signal()!;
}

// wait until reader takes value from buffer
self.send_cond.wait(&self.mu)!;

if (self.closed) {
self.mu.unlock()!;
self.send_mu.unlock()!;
return chan::ChanError(<Type>).CLOSED?;
}

self.mu.unlock()!;
self.send_mu.unlock()!;

return;
}

fn Type! UnbufferedChan.pop(&self) @dynamic {
self.mu.lock()!;
defer catch (void)self.mu.unlock();
self.read_mu.lock()!;
defer catch (void)self.read_mu.unlock();

// if no one is waiting, then there is nothing in the buffer
while (!self.closed && self.send_waiting == 0) {
self.read_waiting++;
self.read_cond.wait(&self.mu)!;
self.read_waiting--;
}

if (self.closed) {
self.mu.unlock()!;
self.read_mu.unlock()!;
return chan::ChanError(<Type>).CLOSED?;
}

// take value from buffer
Type ret = self.buf;

// awake sender
self.send_waiting--;
self.send_cond.signal()!;

self.mu.unlock()!;
self.read_mu.unlock()!;

return ret;
}

fn void! UnbufferedChan.close(&self) @dynamic {
anyfault err = @catch(self.mu.lock());

self.closed = true;

err = @catch(self.read_cond.broadcast()) ?: err;
err = @catch(self.send_cond.broadcast()) ?: err;

err = @catch(self.mu.unlock()) ?: err;

if (err) return err?;
}
Loading

0 comments on commit 2e779c1

Please sign in to comment.