Skip to content

Commit 608bdf0

Browse files
Anton Shaboutazloyuser
Anton Shabouta
authored andcommitted
Events infrastructure and connection refactoring
1 parent 14f179d commit 608bdf0

24 files changed

+304
-260
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ Loop::run(function () {
4545
printf("The keyspace %s has a table called %s\n", $row['keyspace_name'], $row['columnfamily_name']);
4646
}
4747

48-
yield $cluster->disconnect();
48+
$session->close();
4949
});
5050

5151
```

benchmark/read.php

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
$session = yield $cluster->connect();
2222
$setup = require __DIR__ . '/shared.php';
2323

24-
$watcher = Loop::onSignal(SIGTERM, function () use ($cluster) {
25-
yield $cluster->disconnect();
24+
$watcher = Loop::onSignal(SIGTERM, function () use ($session) {
25+
$session->close();
2626
});
2727

2828
try {
@@ -85,7 +85,7 @@
8585
yield $session->query("DROP KEYSPACE IF EXISTS blogs;");
8686
}
8787

88-
yield $cluster->disconnect();
88+
$session->close();
8989

9090
Loop::cancel($watcher);
9191
});

benchmark/write.php

+3-3
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
$session = yield $cluster->connect();
2121
$setup = require __DIR__ . '/shared.php';
2222

23-
$watcher = Loop::onSignal(SIGTERM, function () use ($cluster) {
24-
yield $cluster->disconnect();
23+
$watcher = Loop::onSignal(SIGTERM, function () use ($session) {
24+
$session->close();
2525
});
2626

2727
try {
@@ -63,7 +63,7 @@
6363
yield $session->query("DROP KEYSPACE IF EXISTS blogs;");
6464
}
6565

66-
yield $cluster->disconnect();
66+
$session->close();
6767

6868
Loop::cancel($watcher);
6969
});

examples/basic.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,5 @@
2222
\printf("The keyspace %s has a table called %s\n", $row['keyspace_name'], $row['table_name']);
2323
}
2424

25-
yield $cluster->disconnect();
25+
$session->close();
2626
});

src/Cluster.php

+45-59
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ final class Cluster
2222
private const
2323
STATE_NOT_CONNECTED = 0,
2424
STATE_CONNECTING = 1,
25-
STATE_CONNECTED = 2,
26-
STATE_DISCONNECTING = 3
25+
STATE_CONNECTED = 2
2726
;
2827

2928
/**
@@ -42,9 +41,9 @@ final class Cluster
4241
private $state = self::STATE_NOT_CONNECTED;
4342

4443
/**
45-
* @var Connection
44+
* @var Events
4645
*/
47-
private $connection;
46+
private $events;
4847

4948
/**
5049
* @param Config $config
@@ -71,17 +70,32 @@ public static function build(string $dsn): self
7170
public function options(): Promise
7271
{
7372
return call(function () {
74-
if ($this->connection === null) {
75-
$this->connection = yield $this->open();
76-
}
73+
/** @var Connection $connection */
74+
$connection = yield $this->open();
7775

7876
/** @var Response\Supported $response */
79-
$response = yield $this->connection->send(new Request\Options);
77+
$response = yield $connection->send(new Request\Options);
78+
79+
$connection->close();
8080

8181
return $response->options;
8282
});
8383
}
8484

85+
/**
86+
* @return Promise<Events>
87+
*/
88+
public function events(): Promise
89+
{
90+
return call(function () {
91+
if ($this->events) {
92+
return $this->events;
93+
}
94+
95+
return $this->events = new Events(yield $this->startup());
96+
});
97+
}
98+
8599
/**
86100
* @param string $keyspace
87101
*
@@ -96,17 +110,7 @@ public function connect(string $keyspace = null): Promise
96110

97111
$this->state = self::STATE_CONNECTING;
98112

99-
if (null === $this->connection) {
100-
$this->connection = yield $this->open();
101-
}
102-
103-
$frame = yield $this->connection->send(new Request\Startup($this->config->options()));
104-
105-
if ($frame instanceof Response\Authenticate) {
106-
yield $this->authenticate();
107-
}
108-
109-
$session = new Session($this->connection);
113+
$session = new Session(yield $this->startup());
110114

111115
if ($keyspace !== null) {
112116
yield $session->keyspace($keyspace);
@@ -119,38 +123,27 @@ public function connect(string $keyspace = null): Promise
119123
}
120124

121125
/**
122-
* @param int $code
123-
* @param string $reason
124-
*
125-
* @return Promise<void>
126+
* @return Promise<Connection>
126127
*/
127-
public function disconnect(int $code = 0, string $reason = ''): Promise
128+
private function startup()
128129
{
129-
return call(function() use ($code, $reason) {
130-
if ($this->state === self::STATE_DISCONNECTING) {
131-
return;
132-
}
130+
return call(function () {
131+
/** @var Connection $connection */
132+
$connection = yield $this->open();
133133

134-
$this->state = self::STATE_DISCONNECTING;
134+
$request = new Request\Startup($this->config->options());
135+
$response = yield $connection->send($request);
135136

136-
if ($this->connection !== null) {
137-
$this->connection->close();
137+
if ($response instanceof Response\Authenticate) {
138+
yield $this->authenticate($connection);
138139
}
139140

140-
$this->state = self::STATE_NOT_CONNECTED;
141+
return $connection;
141142
});
142143
}
143144

144145
/**
145-
* @return bool
146-
*/
147-
public function isConnected(): bool
148-
{
149-
return $this->state === self::STATE_CONNECTED;
150-
}
151-
152-
/**
153-
* @return Promise
146+
* @return Promise<Connection>
154147
*/
155148
private function open(): Promise
156149
{
@@ -178,32 +171,25 @@ private function open(): Promise
178171
}
179172

180173
/**
181-
* @return Promise
174+
* @param Connection $connection
175+
*
176+
* @return Promise<Response\AuthSuccess>
182177
*/
183-
private function authenticate(): Promise
178+
private function authenticate(Connection $connection): Promise
184179
{
185-
return call(function () {
186-
$request = new Request\AuthResponse(
187-
$this->config->user(),
188-
$this->config->password()
189-
);
190-
191-
/** @var Frame $frame */
192-
$frame = yield $this->connection->send($request);
180+
return call(function () use ($connection) {
181+
$request = new Request\AuthResponse($this->config->user(), $this->config->password());
182+
$response = yield $connection->send($request);
193183

194184
switch (true) {
195-
case $frame instanceof Response\AuthChallenge:
196-
// TODO
197-
198-
break;
199-
case $frame instanceof Response\AuthSuccess:
200-
break;
185+
case $response instanceof Response\AuthSuccess:
186+
return $response;
201187
default:
202-
throw Exception\ServerException::unexpectedFrame($frame->opcode);
188+
throw Exception\ServerException::unexpectedFrame($response->opcode);
203189
}
204190
});
205191
}
206-
192+
207193
/**
208194
* @return Compressor
209195
*/

src/Compressor/LzCompressor.php

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class LzCompressor implements Compressor
2121
*/
2222
public function compress(string $data): string
2323
{
24+
/** @noinspection PhpUndefinedFunctionInspection */
2425
return lz4_compress($data);
2526
}
2627

@@ -29,6 +30,7 @@ public function compress(string $data): string
2930
*/
3031
public function decompress(string $binary): string
3132
{
33+
/** @noinspection PhpUndefinedFunctionInspection */
3234
return lz4_uncompress($binary);
3335
}
3436
}

src/Compressor/SnappyCompressor.php

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class SnappyCompressor implements Compressor
2121
*/
2222
public function compress(string $data): string
2323
{
24+
/** @noinspection PhpUndefinedFunctionInspection */
2425
return snappy_compress($data);
2526
}
2627

@@ -29,6 +30,7 @@ public function compress(string $data): string
2930
*/
3031
public function decompress(string $binary): string
3132
{
33+
/** @noinspection PhpUndefinedFunctionInspection */
3234
return snappy_uncompress($binary);
3335
}
3436
}

0 commit comments

Comments
 (0)