Skip to content

Commit

Permalink
[FEAT] add channel
Browse files Browse the repository at this point in the history
  • Loading branch information
Snikimonkd committed Jan 14, 2025
1 parent 3033295 commit 704e010
Show file tree
Hide file tree
Showing 2 changed files with 288 additions and 0 deletions.
233 changes: 233 additions & 0 deletions lib/std/threads/chan.c3
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
module chan(<Type>);

fault ChanError {
CLOSED,
}

interface Chan {
fn void! push(Type val);
fn Type! pop();
fn void close();
fn void destroy();
}

fn Chan new(usz size, Allocator allocator = allocator::heap()) {
if (size != 0) {
BufferedChan* c = allocator::alloc(allocator, BufferedChan);
c.allocator = allocator;
c.buf = allocator::alloc_array(allocator, Type, size);
c.size = size;
c.mu.init()!!;
c.send_cond.init()!!;
c.read_cond.init()!!;
c.elems = 0;
c.sendx = 0;
c.send_waiting = 0;
c.readx = 0;
c.read_waiting = 0;
return c;
}

UnbufferedChan* c = allocator::alloc(allocator, UnbufferedChan);
c.allocator = allocator;
c.mu.init()!!;
c.send_mu.init()!!;
c.send_waiting = 0;
c.send_cond.init()!!;
c.read_mu.init()!!;
c.read_waiting = 0;
c.read_cond.init()!!;

return c;
}

struct BufferedChan (Chan) @private {
Allocator allocator;

Mutex mu;
Type[] buf;
usz size;
bool closed;
usz elems;

usz sendx;
usz send_waiting;
thread::ConditionVariable send_cond;

usz readx;
usz read_waiting;
thread::ConditionVariable read_cond;
}

fn void BufferedChan.destroy(&self) @dynamic {
self.mu.destroy()!!;
self.send_cond.destroy()!!;
self.read_cond.destroy()!!;
allocator::free(self.allocator, self.buf);
allocator::free(self.allocator, self);
}

fn void! BufferedChan.push(&self, Type val) @dynamic {
self.mu.lock()!!;
defer self.mu.unlock()!!;

// check if chan is closed
if (self.closed) {
return ChanError.CLOSED?;
}

// if chan is full -> wait
while (self.elems == self.size) {
self.send_waiting++;
self.send_cond.wait(&self.mu)!!;
self.send_waiting--;
}

// save value to buf
self.buf[self.sendx] = val;

// move pointer
self.sendx++;
if (self.sendx == self.size) {
self.sendx = 0;
}

// change elems counter
self.elems++;

// if someone is waiting -> awake him
if (self.read_waiting > 0) {
self.read_cond.signal()!!;
}

return;
}

fn Type! BufferedChan.pop(&self) @dynamic {
self.mu.lock()!!;
defer self.mu.unlock()!!;

// check if chan is closed and empty
if (self.closed && self.elems == 0) {
return ChanError.CLOSED?;
}

// if chan is empty -> wait for sender
while (self.elems == 0) {
self.read_waiting++;
self.read_cond.wait(&self.mu)!!;
self.read_waiting--;
}

// read from buf
Type ret = self.buf[self.readx];

// move pointer
self.readx++;
if (self.readx == self.size) {
self.readx = 0;
}

// change elems counter
self.elems--;

// if someone is waiting -> awake him
if (self.send_waiting > 0) {
self.send_cond.signal()!!;
}

return ret;
}

fn void BufferedChan.close(&self) @dynamic {
self.mu.lock()!!;
defer self.mu.unlock()!!;

self.closed = true;
}

struct UnbufferedChan (Chan) @private {
Allocator allocator;

Mutex mu;
Type buf;
bool closed;

Mutex send_mu;
usz send_waiting;
thread::ConditionVariable send_cond;

Mutex read_mu;
usz read_waiting;
thread::ConditionVariable read_cond;
}

fn void UnbufferedChan.destroy(&self) @dynamic {
self.mu.destroy()!!;
self.send_mu.destroy()!!;
self.send_cond.destroy()!!;
self.read_mu.destroy()!!;
self.read_cond.destroy()!!;
allocator::free(self.allocator, self);
}

fn void! UnbufferedChan.push(&self, Type val) @dynamic {
self.mu.lock()!!;
defer self.mu.unlock()!!;
self.send_mu.lock()!!;
defer self.send_mu.unlock()!!;

if (self.closed) {
return ChanError.CLOSED?;
}

// store value in the buffer
self.buf = val;

// show that we are waiting for reader
self.send_waiting++;
defer self.send_waiting--;

// if reader is already waiting for us -> awake him
if (self.read_waiting > 0) {
self.read_cond.signal()!!;
}

// wait until reader takes value from buffer
self.send_cond.wait(&self.mu)!!;

return;
}

fn Type! UnbufferedChan.pop(&self) @dynamic {
self.mu.lock()!!;
defer self.mu.unlock()!!;
self.read_mu.lock()!!;
defer self.read_mu.unlock()!!;

if (self.closed) {
return ChanError.CLOSED?;
}

// if no one is waiting, then there is nothing in the buffer
while (self.send_waiting == 0) {
self.read_waiting++;
self.read_cond.wait(&self.mu)!!;
self.read_waiting--;
}

// take value from buffer
Type ret = self.buf;

// awake sender
self.send_cond.signal()!!;

return ret;
}

fn void UnbufferedChan.close(&self) @dynamic {
self.mu.lock()!!;
defer self.mu.unlock()!!;

self.closed = true;
}
55 changes: 55 additions & 0 deletions test/unit/stdlib/threads/chan.c3
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
module chan_test;

import std::chan;
import std::thread;

fn void init_destroy_buffered() @test {
for (usz i = 0; i < 20; i++) {
chan::Chan(<int>) c = chan::new(<int>)(1);
defer c.destroy();
}
}

fn void init_destroy_unbuffered() @test {
for (usz i = 0; i < 20; i++) {
chan::Chan(<int>) c = chan::new(<int>)(0);
defer c.destroy();
}
}

fn void push_buffered_no_lock() @test {
chan::Chan(<int>) c = chan::new(<int>)(1);
defer c.destroy();

c.push(1)!!;
}

fn void push_pop_buffered_no_locks() @test {
chan::Chan(<int>) c = chan::new(<int>)(1);
defer c.destroy();

c.push(123)!!;
int got = c.pop()!!;
assert(got == 123);
}

fn void push_pop_unbuffered_with_locks() @test {
chan::Chan(<int>) c = chan::new(<int>)(1);
defer c.destroy();

Thread thread;
defer thread.join()!!;

thread.create(fn int(void* arg){
chan::Chan(<int>)* c_ptr = arg;
c_ptr.push(123)!!;
c_ptr.push(321)!!;
return 0;
}, &c)!!;

int got = c.pop()!!;
assert(got == 123);
got = c.pop()!!;
assert(got == 321);
}

0 comments on commit 704e010

Please sign in to comment.