Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] add golang like channel #1843

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

Snikimonkd
Copy link

this is a go like channel struct

@Adversing
Copy link
Contributor

Adversing commented Jan 15, 2025

I took a look at your code and it seems like both the buffered and unbuffered channels set self.closed = true; in close() but never call send_cond.signal(), read_cond.signal(), or a broadcast to wake up any threads that might be stuck in wait() calls.

Example in BufferedChan.close():

fn void BufferedChan.close(&self) @dynamic {
    self.mu.lock()!!;
    defer self.mu.unlock()!!;

    self.closed = true;
    // self.send_cond.broadcast()!!;
    // self.read_cond.broadcast()!!;
}

Example in UnbufferedChan.close():

fn void UnbufferedChan.close(&self) @dynamic {
    self.mu.lock()!!;
    defer self.mu.unlock()!!;

    self.closed = true;
    // self.send_cond.broadcast()!!;
    // self.read_cond.broadcast()!!;
}

So, since neither version signals/broadcasts on close, if a thread is blocked in push() (or pop()) waiting on self.send_cond.wait(...)/self.read_cond.wait(...), there is nothing to wake it up once closed = true; is set.

@Snikimonkd Snikimonkd marked this pull request as draft January 15, 2025 09:31
lib/std/threads/chan.c3 Outdated Show resolved Hide resolved
@Snikimonkd Snikimonkd changed the title [FEAT] add channel [FEAT] add golang like channel Jan 15, 2025
@Snikimonkd Snikimonkd force-pushed the feat/add_channel branch 6 times, most recently from 90f1d35 to 2e779c1 Compare January 16, 2025 21:22
@Snikimonkd Snikimonkd marked this pull request as ready for review January 16, 2025 21:29
lib/std/threads/buffered_chan.c3 Outdated Show resolved Hide resolved
thread::ConditionVariable read_cond;
}

fn Chan(<Type>)! BufferedChan.init(&self) @dynamic {
Copy link
Contributor

@Book-reader Book-reader Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since it seems like this is meant to be used as the interface and Chan(<int>) new = BufferedChan(<int, 1>){}.init()!!; is quite hacky as this essentially does this:

BufferedChan(<int, 1>) __temp;
Chan(<int>) new = __temp.init();

consider making this a method instead and allocating the BufferedChan on the heap:

fn Chan(<Type>)! new(Allocator allocator = allocator::heap())
{
  BufferedChan* new = allocator::alloc(allocator, BufferedChan);
  // setup the channel
  ...
  return new;
}

which would be used as Chan(<int>) foo = buffered_chan::new(<int, 123>)()

ofc this would require freeing the channel once you are done with it so in BufferedChan.destroy you can add free(self).
passing the allocator to new would allow for fn Chan(<Type>)! temp() => new(allocator::temp()) to create a channel on the temporary allocator.

the same should also be done to UnbufferedChan

you could also remove the Chan interface and keep things how they are but instead used like this:

BufferedChan(<int, 123>) foo;
foo.new_init();

or

BufferedChan(<int, 123>) foo = BufferedChan(<int, 123>).new();

but this would come with the drawback of other libraries/functions being able to take any type of channel like how allocators are used.

this is just a dump of all my thoughts, you can choose what you want as long as @lerno is fine with it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reasons of the constructor being implemented like this are the following:

  1. channel must be passed to the thread by reference - that's why i hide the channel implementation behind the Interface
  2. (imho) - if we can create something without allocations - its better to do it like this
    but i don't mind to redo it)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BufferedChan signature (<Type, SIZE>) is fully stolen from ring buffer (in reality i just reimplemented ring buffer with some quirks specific to channel)

lib/std/threads/buffered_chan.c3 Outdated Show resolved Hide resolved
lib/std/threads/buffered_chan.c3 Show resolved Hide resolved
lib/std/threads/buffered_chan.c3 Show resolved Hide resolved
lib/std/threads/buffered_chan.c3 Show resolved Hide resolved
lib/std/threads/buffered_chan.c3 Outdated Show resolved Hide resolved

fn void! UnbufferedChan.push(&self, Type val) @dynamic {
self.mu.lock()!;
defer catch (void)self.mu.unlock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the catch necessary when the excuse is ignored?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, it's only when existing with error. But doesn't the error mean that the lock wasn't locked? So it's not necessary to unlock it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are error returns in other places, like on line 54 there is return ThreadFault.CHANNEL_CLOSED? where this also applies

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry. But still why do we unlock only in case when failure happens? Why not unlock always?

Copy link
Author

@Snikimonkd Snikimonkd Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so on all successful exits from function i have self.send_mu.unlock()!;
this defer is meant when self.send_cond.wait(&self.mu)!; or self.read_cond.signal()!; or any other function throws an error and exits immediately

Copy link
Author

@Snikimonkd Snikimonkd Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and if everything goes good i will unlock mutexes at the end of function on lines 74 and 75

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And why not remove catch from defer and always unlock in defer and also remove lines 74 and 75? Would it work?

Copy link
Contributor

@radekm radekm Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And another question, isn't the code unlocking twice when self.closed is true on line 70?

No. It seems ok

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if there's a failure in self.send_cond.wait(&self.mu)! after unlocking self.mu. Wouldn't the code then unlock twice?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so to be honest i don't know how wait fails. Will it unlock the mutex if it fails - i can't find anything about it in docs (i used pthread docs, cause c3 threads for linux/macos are baased on pthread)? In any case we try to fix as mach as we can on exit - if mutex is already unlocked -ok, we will get an error and hide it with (void) cast, if it is not - then we will unlock it

return;
}

fn Type! BufferedChan.pop(&self) @dynamic {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a fairness mechanism?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean if the thread came and locked first - then it is getting the value first? - no

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to make it work we need some sort of mechanism to awake/send to sleep specific thread

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we have this mechanism, then instead of send_waiting/read_waiting we can use linkedlist wich stores threads

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like that. Or that each thread will eventually pop a value from the channel, provided there are sufficient values available. It is guaranteed that no thread will be starved (i.e., some threads consuming all the values while others never get a chance to pop any value).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually i have an idea how to implement it - we can create a cond for every thread and keep this conds in LiknedList - this will guarantee us the order of awaking of threads is the same as the order of them going to wait

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but i think it's gonna be very inefficient

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants