Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] add golang like channel #1843

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions lib/std/threads/buffered_chan.c3
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
module std::thread::buffered_chan(<Type, SIZE>);

import std::thread::chan;

struct BufferedChan (Chan(<Type>)) {
Mutex mu;
Type[SIZE] buf;
Snikimonkd marked this conversation as resolved.
Show resolved Hide resolved
usz size;
bool closed;
usz elems;

usz sendx;
usz send_waiting;
thread::ConditionVariable send_cond;
Snikimonkd marked this conversation as resolved.
Show resolved Hide resolved

usz readx;
usz read_waiting;
thread::ConditionVariable read_cond;
}

fn Chan(<Type>)! BufferedChan.init(&self) @dynamic {
Copy link
Contributor

@Book-reader Book-reader Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since it seems like this is meant to be used as the interface and Chan(<int>) new = BufferedChan(<int, 1>){}.init()!!; is quite hacky as this essentially does this:

BufferedChan(<int, 1>) __temp;
Chan(<int>) new = __temp.init();

consider making this a method instead and allocating the BufferedChan on the heap:

fn Chan(<Type>)! new(Allocator allocator = allocator::heap())
{
  BufferedChan* new = allocator::alloc(allocator, BufferedChan);
  // setup the channel
  ...
  return new;
}

which would be used as Chan(<int>) foo = buffered_chan::new(<int, 123>)()

ofc this would require freeing the channel once you are done with it so in BufferedChan.destroy you can add free(self).
passing the allocator to new would allow for fn Chan(<Type>)! temp() => new(allocator::temp()) to create a channel on the temporary allocator.

the same should also be done to UnbufferedChan

you could also remove the Chan interface and keep things how they are but instead used like this:

BufferedChan(<int, 123>) foo;
foo.new_init();

or

BufferedChan(<int, 123>) foo = BufferedChan(<int, 123>).new();

but this would come with the drawback of other libraries/functions being able to take any type of channel like how allocators are used.

this is just a dump of all my thoughts, you can choose what you want as long as @lerno is fine with it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reasons of the constructor being implemented like this are the following:

  1. channel must be passed to the thread by reference - that's why i hide the channel implementation behind the Interface
  2. (imho) - if we can create something without allocations - its better to do it like this
    but i don't mind to redo it)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BufferedChan signature (<Type, SIZE>) is fully stolen from ring buffer (in reality i just reimplemented ring buffer with some quirks specific to channel)

self.mu.init()!;
defer catch (void)self.mu.destroy();
self.send_cond.init()!;
defer catch (void)self.send_cond.destroy();
self.read_cond.init()!;
defer catch (void)self.read_cond.destroy();

self.size = SIZE;
Snikimonkd marked this conversation as resolved.
Show resolved Hide resolved
self.elems = 0;
self.sendx = 0;
self.send_waiting = 0;
self.readx = 0;
self.read_waiting = 0;
return self;
}

fn void! BufferedChan.destroy(&self) @dynamic {
anyfault err = @catch(self.mu.destroy());
err = @catch(self.send_cond.destroy()) ?: err;
err = @catch(self.read_cond.destroy()) ?: err;

if (err) return err?;
}

fn void! BufferedChan.push(&self, Type val) @dynamic {
self.mu.lock()!;
defer catch (void)self.mu.unlock();
Snikimonkd marked this conversation as resolved.
Show resolved Hide resolved

// if chan is full -> wait
while (self.elems == self.size && !self.closed) {
self.send_waiting++;
self.send_cond.wait(&self.mu)!;
self.send_waiting--;
}

// check if chan is closed
if (self.closed) {
self.mu.unlock()!;
Snikimonkd marked this conversation as resolved.
Show resolved Hide resolved
return ThreadFault.CHANNEL_CLOSED?;
}

// save value to buf
self.buf[self.sendx] = val;

// move pointer
self.sendx++;
if (self.sendx == self.size) {
self.sendx = 0;
}

// change elems counter
self.elems++;

// if someone is waiting -> awake him
if (self.read_waiting > 0) {
self.read_cond.signal()!;
}

self.mu.unlock()!;
Snikimonkd marked this conversation as resolved.
Show resolved Hide resolved

return;
}

fn Type! BufferedChan.pop(&self) @dynamic {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a fairness mechanism?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean if the thread came and locked first - then it is getting the value first? - no

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to make it work we need some sort of mechanism to awake/send to sleep specific thread

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we have this mechanism, then instead of send_waiting/read_waiting we can use linkedlist wich stores threads

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like that. Or that each thread will eventually pop a value from the channel, provided there are sufficient values available. It is guaranteed that no thread will be starved (i.e., some threads consuming all the values while others never get a chance to pop any value).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually i have an idea how to implement it - we can create a cond for every thread and keep this conds in LiknedList - this will guarantee us the order of awaking of threads is the same as the order of them going to wait

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but i think it's gonna be very inefficient

self.mu.lock()!;
defer catch (void)self.mu.unlock();

// if chan is empty -> wait for sender
while (self.elems == 0 && !self.closed) {
self.read_waiting++;
self.read_cond.wait(&self.mu)!;
self.read_waiting--;
}

// check if chan is closed and empty
if (self.closed && self.elems == 0) {
self.mu.unlock()!;
return ThreadFault.CHANNEL_CLOSED?;
}

// read from buf
Type ret = self.buf[self.readx];

// move pointer
self.readx++;
if (self.readx == self.size) {
self.readx = 0;
}

// change elems counter
self.elems--;

// if someone is waiting -> awake him
if (self.send_waiting > 0) {
self.send_cond.signal()!;
}

self.mu.unlock()!;

return ret;
}

fn void! BufferedChan.close(&self) @dynamic {
anyfault err = @catch(self.mu.lock());

self.closed = true;

err = @catch(self.read_cond.broadcast()) ?: err;
err = @catch(self.send_cond.broadcast()) ?: err;

err = @catch(self.mu.unlock()) ?: err;

if (err) return err?;
}

10 changes: 10 additions & 0 deletions lib/std/threads/chan.c3
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module std::thread::chan(<Type>);

interface Chan {
fn Chan! init();
fn void! destroy();
fn void! push(Type val);
fn Type! pop();
fn void! close();
}

1 change: 1 addition & 0 deletions lib/std/threads/thread.c3
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ fault ThreadFault
DETACH_FAILED,
JOIN_FAILED,
INTERRUPTED,
CHANNEL_CLOSED,
}

macro void! Mutex.init(&mutex) => NativeMutex.init((NativeMutex*)mutex, MUTEX_PLAIN);
Expand Down
127 changes: 127 additions & 0 deletions lib/std/threads/unbuffered_chan.c3
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
module std::thread::unbuffered_chan(<Type>);

import std::thread::chan;

struct UnbufferedChan (Chan(<Type>)) {
Mutex mu;
Type buf;
bool closed;

Mutex send_mu;
usz send_waiting;
thread::ConditionVariable send_cond;

Mutex read_mu;
usz read_waiting;
thread::ConditionVariable read_cond;
}

fn Chan(<Type>)! UnbufferedChan.init(&self) @dynamic {
self.mu.init()!;
defer catch (void)self.mu.destroy();
self.send_mu.init()!;
defer catch (void)self.send_mu.destroy();
self.send_cond.init()!;
defer catch (void)self.send_cond.destroy();
self.read_mu.init()!;
defer catch (void)self.read_mu.destroy();
self.read_cond.init()!;
defer catch (void)self.read_cond.destroy();

self.send_waiting = 0;
self.read_waiting = 0;

return self;
}

fn void! UnbufferedChan.destroy(&self) @dynamic {
anyfault err = @catch(self.mu.destroy());
err = @catch(self.send_mu.destroy()) ?: err;
err = @catch(self.send_cond.destroy()) ?: err;
err = @catch(self.read_mu.destroy()) ?: err;
err = @catch(self.read_cond.destroy()) ?: err;

if (err) return err?;
}

fn void! UnbufferedChan.push(&self, Type val) @dynamic {
self.mu.lock()!;
defer catch (void)self.mu.unlock();
Snikimonkd marked this conversation as resolved.
Show resolved Hide resolved
self.send_mu.lock()!;
defer catch (void)self.send_mu.unlock();

if (self.closed) {
self.mu.unlock()!;
self.send_mu.unlock()!;
return ThreadFault.CHANNEL_CLOSED?;
}

// store value in the buffer
self.buf = val;
// show that we are waiting for reader
self.send_waiting++;

// if reader is already waiting for us -> awake him
if (self.read_waiting > 0) {
self.read_cond.signal()!;
}

// wait until reader takes value from buffer
self.send_cond.wait(&self.mu)!;

if (self.closed) {
self.mu.unlock()!;
self.send_mu.unlock()!;
return ThreadFault.CHANNEL_CLOSED?;
}

self.mu.unlock()!;
self.send_mu.unlock()!;

return;
}

fn Type! UnbufferedChan.pop(&self) @dynamic {
self.mu.lock()!;
defer catch (void)self.mu.unlock();
self.read_mu.lock()!;
defer catch (void)self.read_mu.unlock();

// if no one is waiting, then there is nothing in the buffer
while (!self.closed && self.send_waiting == 0) {
self.read_waiting++;
self.read_cond.wait(&self.mu)!;
Snikimonkd marked this conversation as resolved.
Show resolved Hide resolved
self.read_waiting--;
}

if (self.closed) {
self.mu.unlock()!;
self.read_mu.unlock()!;
return ThreadFault.CHANNEL_CLOSED?;
}

// take value from buffer
Type ret = self.buf;

// awake sender
self.send_waiting--;
self.send_cond.signal()!;

self.mu.unlock()!;
self.read_mu.unlock()!;

return ret;
}

fn void! UnbufferedChan.close(&self) @dynamic {
anyfault err = @catch(self.mu.lock());

self.closed = true;

err = @catch(self.read_cond.broadcast()) ?: err;
err = @catch(self.send_cond.broadcast()) ?: err;

err = @catch(self.mu.unlock()) ?: err;

if (err) return err?;
}
Loading