Skip to content
This repository has been archived by the owner on Nov 6, 2024. It is now read-only.

Commit

Permalink
Peer data sending
Browse files Browse the repository at this point in the history
  • Loading branch information
philcockfield committed Oct 23, 2023
1 parent e81cd69 commit da90dbe
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ export const events: t.PatchStateEventFactory<t.Peer, t.PeerModelEvents> = ($, d
*/
return {
$,
cmd: { $: cmd$ },
cmd: {
$: cmd$,
conn$: rx.payload<t.PeerModelConnCmd>(cmd$, 'Peer:Conn'),
data$: rx.payload<t.PeerModelDataCmd>(cmd$, 'Peer:Data'),
},

/**
* Lifecycle
Expand Down
17 changes: 17 additions & 0 deletions code/ext/ext.lib.peerjs/src/Webrtc.PeerModel/PeerModel.get.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { type t } from './common';

export function getFactory(peer: t.PeerJs) {
const api = {
conn: {
exists: (s: t.Peer, id: string) => Boolean(api.conn.item(s, id)),
item: (s: t.Peer, id: string) => s.connections.find((item) => item.id === id),
object(s: t.Peer, id: string, kind?: t.PeerConnection['kind']) {
const item = api.conn.item(s, id);
if (kind && item?.kind !== kind) return undefined;
return item ? peer.getConnection(item.peer.remote, id) || undefined : undefined;
},
},
} as const;

return api;
}
103 changes: 78 additions & 25 deletions code/ext/ext.lib.peerjs/src/Webrtc.PeerModel/PeerModel.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { PatchState, type t } from './common';
import { slug, PatchState, rx, type t } from './common';
import { events } from './PeerModel.events';
import { getFactory } from './PeerModel.get';

/**
* Peer model.
Expand All @@ -8,36 +9,47 @@ export const PeerModel = {
/**
* Wrap a PeerJS object with a stateful management model.
*/
wrap(peer: t.PeerJs) {
wrap(peer: t.PeerJs, dispose$?: t.UntilObservable) {
const lifecycle = rx.lifecycle(dispose$);
const local = peer.id;
const initial: t.Peer = { open: false, connections: [] };
const state = PatchState.init<t.Peer, t.PeerModelEvents>({ initial, events });

const Get = {
conn: {
exists: (s: t.Peer, id: string) => Boolean(Get.conn.item(s, id)),
item: (s: t.Peer, id: string) => s.connections.find((item) => item.id === id),
object(s: t.Peer, id: string) {
const item = Get.conn.item(s, id);
return item ? peer.getConnection(item.peer.remote, id) : undefined;
},
const state = PatchState.init<t.Peer, t.PeerModelEvents>({
initial,
events($, dispose$) {
return events($, [rx.disposable(dispose$).dispose$, lifecycle.dispose$]);
},
} as const;
});
const dispatch = PatchState.Command.dispatcher<t.PeerModelCmd>(state);
const toConnection = (conn: t.DataConnection | t.MediaConnection) => {
const id = conn.connectionId;
const remote = conn.peer;
return { id, peer: { local, remote } };
};

const Get = getFactory(peer);

const DataConnection = {
monitor(conn: t.DataConnection) {
const id = conn.connectionId;
conn.on('data', () => {});
const connection = toConnection(conn);
conn.on('data', (data) => {
dispatch({ type: 'Peer:Data', payload: { tx: slug(), connection, data } });
});
conn.on('close', () => {
state.change((d) => (d.connections = d.connections.filter((item) => item.id !== id)));
api.disconnect(id);
});
conn.on('error', (err) => {
console.log('error data', conn, err);
conn.on('error', (error) => {
// TODO 🐷
// console.log('error data', conn, error);
dispatch({
type: 'Peer:Conn',
payload: { tx: slug(), connection, action: 'error', error },
});
});
},

outgoing(remote: string) {
startOutgoing(remote: string) {
const conn = peer.connect(remote, { reliable: true });
const id = conn.connectionId;
state.change((d) => {
Expand All @@ -50,9 +62,13 @@ export const PeerModel = {
});
});
DataConnection.monitor(conn);
dispatch({
type: 'Peer:Conn',
payload: { tx: slug(), connection: toConnection(conn), action: 'start:out' },
});
},

incoming(conn: t.DataConnection) {
startIncoming(conn: t.DataConnection) {
const exists = Boolean(Get.conn.item(state.current, conn.connectionId));
if (!exists) {
const id = conn.connectionId;
Expand All @@ -61,6 +77,10 @@ export const PeerModel = {
state.change((d) => d.connections.push({ kind: 'data', id, open: true, peer }));
DataConnection.monitor(conn);
}
dispatch({
type: 'Peer:Conn',
payload: { tx: slug(), connection: toConnection(conn), action: 'start:in' },
});
},
} as const;

Expand All @@ -69,11 +89,7 @@ export const PeerModel = {
*/
peer.on('open', () => state.change((d) => (d.open = true)));
peer.on('close', () => state.change((d) => (d.open = false)));

/**
* Incoming data connection.
*/
peer.on('connection', (conn) => DataConnection.incoming(conn));
peer.on('connection', (conn) => DataConnection.startIncoming(conn));

/**
* Incoming media connection.
Expand All @@ -91,21 +107,58 @@ export const PeerModel = {
get current() {
return state.current;
},

connect: {
data: (remotePeer: string) => DataConnection.outgoing(remotePeer),
data: (remotePeer: string) => DataConnection.startOutgoing(remotePeer),
},

disconnect(id) {
Get.conn.object(state.current, id)?.close();
const conn = api.get.connection(id);
if (conn) {
conn.close();
dispatch({
type: 'Peer:Conn',
payload: { tx: slug(), connection: toConnection(conn), action: 'close' },
});
}
api.purge();
},

purge() {
let changed = false;
state.change((d) => {
const total = d.connections.length;
d.connections = d.connections.filter((item) => {
if (item.open === false) return false;
if (Get.conn.object(d, item.id)?.open === false) return false;
return true;
});
if (total !== d.connections.length) changed = true;
});
if (changed) {
dispatch({ type: 'Peer:Conn', payload: { tx: slug(), action: 'total' } });
}
},

get: {
connection(id) {
return Get.conn.object(state.current, id);
},
dataConnection(id) {
return Get.conn.object(state.current, id, 'data') as t.DataConnection;
},
mediaConnection(id) {
return Get.conn.object(state.current, id, 'media') as t.MediaConnection;
},
},

/**
* Lifecycle
*/
dispose: lifecycle.dispose,
dispose$: lifecycle.dispose$,
get disposed() {
return lifecycle.disposed;
},
};
return api;
Expand Down
27 changes: 23 additions & 4 deletions code/ext/ext.lib.peerjs/src/Webrtc.PeerModel/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ export type PeerConnection = {
/**
* Logical API over the peer state.
*/
export type PeerModel = {
export type PeerModel = t.Lifecycle & {
id: string;
current: t.Peer;
connect: { data(remotePeer: string): void };
disconnect(id: string): void;
events(dispose$?: t.UntilObservable): t.PeerModelEvents;
purge(): void;
get: {
connection(id: string): t.DataConnection | t.MediaConnection | undefined;
dataConnection(id: string): t.DataConnection | undefined;
mediaConnection(id: string): t.MediaConnection | undefined;
};
};

/**
Expand All @@ -32,13 +37,27 @@ export type PeerModelEvents = t.Lifecycle & {
readonly $: t.Observable<t.PatchChange<t.Peer>>;
readonly cmd: {
readonly $: t.Observable<t.PeerModelCmd>;
readonly conn$: t.Observable<t.PeerModelConn>;
readonly data$: t.Observable<t.PeerModelData>;
};
};

/**
* Event Commands
*/
export type PeerModelCmd = PeerModelConnCmd;
export type PeerModelCmd = PeerModelConnCmd | PeerModelDataCmd;

export type PeerModelConnCmd = { type: 'Peer:Conn'; payload: PeerModelConn };
export type PeerModelConn = {
tx: string;
action: 'start:out' | 'start:in' | 'close' | 'error' | 'total';
connection?: { id: string; peer: { local: string; remote: string } };
error?: Error;
};

export type PeerModelConnCmd = { type: 'Peer:Connection'; payload: PeerModelConn };
export type PeerModelConn = { tx: string };
export type PeerModelDataCmd = { type: 'Peer:Data'; payload: PeerModelData };
export type PeerModelData = {
tx: string;
connection: { id: string; peer: { local: string; remote: string } };
data: unknown;
};
26 changes: 21 additions & 5 deletions code/ext/ext.lib.peerjs/src/ui/ui.Sample.Model/ui.Peer.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,18 @@ export const Peer: React.FC<PeerProps> = (props) => {
useEffect(() => {
const model = PeerModel.wrap(props.peer.self);
setModel(model);

const events = model.events();
events.$.subscribe(redraw);

events.cmd.data$.subscribe((e) => {
const peer = e.connection.peer;
console.log('data', e.data, `from "${peer.remote}" to "${peer.local}"`);
});

events.cmd.conn$.pipe(rx.filter((e) => Boolean(e.error))).subscribe((e) => {
console.log('error', e.error);
});

return events.dispose;
}, []);

Expand All @@ -33,8 +41,13 @@ export const Peer: React.FC<PeerProps> = (props) => {
props.peer.self.destroy();
};

const handleCloseConnection = (id: string) => {
model?.disconnect(id);
const handleCloseConnection = (connid: string) => {
model?.disconnect(connid);
};

const handleSendData = (connid: string) => {
const conn = model?.get.dataConnection(connid);
conn?.send('hello');
};

const handlePurge = () => {
Expand Down Expand Up @@ -81,7 +94,10 @@ export const Peer: React.FC<PeerProps> = (props) => {
<li key={conn.id}>
<div {...styles.connection}>
<div>{conn.id}</div>
<Button onClick={() => handleCloseConnection(conn.id)}>close</Button>
<div>
<Button onClick={() => handleSendData(conn.id)}>send</Button>
<Button onClick={() => handleCloseConnection(conn.id)}>close</Button>
</div>
</div>
</li>
);
Expand All @@ -103,7 +119,7 @@ export const Peer: React.FC<PeerProps> = (props) => {
</div>
{elConnections}
<div {...styles.section}>
<ObjectView data={model?.current} fontSize={11} expand={3} />
<ObjectView data={model?.current} fontSize={11} expand={1} />
</div>
</div>
);
Expand Down

0 comments on commit da90dbe

Please sign in to comment.