Skip to content

Commit 091bcf3

Browse files
refactoring pu tcp
1 parent 36c9dd3 commit 091bcf3

File tree

2 files changed

+333
-109
lines changed

2 files changed

+333
-109
lines changed

drivers/pu-linux-tcp/pu-linux-tcp-client.c

+42-53
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@ struct ProcessingUnitStructureTCP{
2929
sem_t sendrdy, recvrdy;
3030
uint8_t *message;
3131
size_t messageLength;
32+
bool done;
3233
};
3334

3435
typedef struct ProcessingUnitStructureTCP ProcessingUnitStructureTCP;
3536

36-
void *TCPLoop(void *pu);
37+
void *ClientThread(void *pu);
3738

3839
bool CreateLinuxTCPClient(ProcessingUnitStructure **pu, char *ip, int port){
3940
ProcessingUnitStructureTCP *puS;
@@ -64,7 +65,7 @@ bool CreateLinuxTCPClient(ProcessingUnitStructure **pu, char *ip, int port){
6465
puS->sendSuccess = false;
6566

6667
// Start client thread
67-
ret = pthread_create(&puS->clientThread, NULL, TCPLoop, puS);
68+
ret = pthread_create(&puS->clientThread, NULL, ClientThread, puS);
6869
if(ret == 0){
6970
success = true;
7071
}else{
@@ -79,17 +80,16 @@ bool CreateLinuxTCPClient(ProcessingUnitStructure **pu, char *ip, int port){
7980
return success;
8081
}
8182

82-
void *TCPLoop(void *pu){
83+
void *ClientThread(void *pu){
8384
ProcessingUnitStructureTCP *puS;
84-
bool done;
8585
int ret;
8686
char lengthString[20];
8787
bool success;
8888

8989
puS = (ProcessingUnitStructureTCP*)pu;
9090

91-
done = false;
92-
while(!done){
91+
puS->done = false;
92+
while(!puS->done){
9393
// Not sending
9494
puS->sending = false;
9595
puS->sendSuccess = false;
@@ -101,55 +101,65 @@ void *TCPLoop(void *pu){
101101
if(ret == 0){
102102
puS->connected = true;
103103
}else{
104+
printf("Connect failed\n");
104105
puS->connected = false;
106+
sleep(1);
105107
}
106108
}
107109

108-
if(puS->connected){
110+
if(puS->connected && !puS->done){
109111
// Send
110112
sem_wait(&puS->sendrdy);
111113

112-
sprintf(lengthString, "%15Ld", (long long)puS->messageLength);
113-
114-
sendAll(puS->serverSockets, (uint8_t*)lengthString, 15);
115-
sendAll(puS->serverSockets, puS->message, puS->messageLength);
114+
if(!puS->done){
115+
sprintf(lengthString, "%15Ld", (long long)puS->messageLength);
116116

117-
puS->sendSuccess = true;
118-
puS->sending = false;
117+
success = sendAll(puS->serverSockets, (uint8_t*)lengthString, 15);
118+
if(success){
119+
success = sendAll(puS->serverSockets, puS->message, puS->messageLength);
120+
}
119121

120-
// Receive
121-
success = recvAll(puS->serverSockets, (uint8_t*)lengthString, 15);
122+
puS->sendSuccess = true;
123+
puS->sending = false;
122124

123-
if(success){
124-
puS->messageLength = atof(lengthString);
125-
}
125+
// Receive
126+
success = recvAll(puS->serverSockets, (uint8_t*)lengthString, 15);
126127

127-
puS->message = malloc(puS->messageLength);
128+
if(success){
129+
puS->messageLength = atof(lengthString);
128130

129-
recvAll(puS->serverSockets, puS->message, puS->messageLength);
131+
puS->message = malloc(puS->messageLength);
130132

131-
sem_post(&puS->recvrdy);
133+
success = recvAll(puS->serverSockets, puS->message, puS->messageLength);
134+
if(success){
135+
sem_post(&puS->recvrdy);
136+
}
137+
}
138+
}
132139
}
133140
}
141+
}
142+
143+
void CloseLinuxTCPClient(ProcessingUnitStructure *pu){
144+
ProcessingUnitStructureTCP *puS;
145+
146+
puS = (ProcessingUnitStructureTCP*)pu->p;
147+
148+
puS->done = true;
149+
150+
sem_post(&puS->recvrdy);
151+
sem_post(&puS->sendrdy);
134152

135153
// If connected, disconnect.
136154
if(puS->connected){
137155
close(puS->serverSockets);
138156
puS->connected = false;
139-
140-
sem_destroy(&puS->sendrdy);
141-
sem_destroy(&puS->recvrdy);
142157
}
143158

144-
pthread_exit(NULL);
145-
}
146-
147-
void CloseLinuxTCPClient(ProcessingUnitStructure *pu){
148-
ProcessingUnitStructureTCP *puS;
149-
150-
puS = (ProcessingUnitStructureTCP*)pu->p;
159+
pthread_join(puS->clientThread, NULL);
151160

152-
close(puS->serverSockets);
161+
sem_destroy(&puS->sendrdy);
162+
sem_destroy(&puS->recvrdy);
153163

154164
free(pu); // Free #1
155165
free(puS); // Free #2
@@ -203,24 +213,3 @@ void Call(ProcessingUnitStructure *pu, uint8_t *s, size_t sLength, ByteArrayRefe
203213
Receive(pu, d);
204214
}
205215

206-
207-
208-
209-
210-
211-
212-
213-
214-
215-
216-
217-
218-
219-
220-
221-
222-
223-
224-
225-
226-

0 commit comments

Comments
 (0)