Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi-socket support #124

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
53 changes: 4 additions & 49 deletions curl/src/main/scala/org/http4s/curl/CurlApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,59 +16,14 @@

package org.http4s.curl

import cats.effect.IO
import cats.effect.IOApp
import cats.effect.unsafe.IORuntime
import org.http4s.client.Client
import org.http4s.client.websocket.WSClient
import org.http4s.curl.unsafe.CurlExecutorScheduler
import org.http4s.curl.unsafe.CurlRuntime
import org.http4s.curl.websocket.CurlWSClient
import cats.effect.unsafe.PollingSystem
import org.http4s.curl.unsafe.CurlMultiPerformPoller

trait CurlApp extends IOApp {
private val multiPerform = CurlMultiPerformPoller()

final override lazy val runtime: IORuntime = {
val installed = CurlRuntime.installGlobal {
CurlRuntime(runtimeConfig)
}

if (!installed) {
System.err
.println(
"WARNING: CurlRuntime global runtime already initialized; custom configurations will be ignored"
)
}

CurlRuntime.global
}

private def scheduler = runtime.compute.asInstanceOf[CurlExecutorScheduler]
final lazy val curlClient: Client[IO] = http.CurlClient(scheduler)

/** gets websocket client if current libcurl environment supports it */
final def websocket(
recvBufferSize: Int = 100,
verbose: Boolean = false,
): Option[WSClient[IO]] =
CurlWSClient(
scheduler,
recvBufferSize,
pauseOn = recvBufferSize / 10,
resumeOn = (recvBufferSize * 0.3).floor.toInt,
verbose = verbose,
)

/** gets websocket client if current libcurl environment supports it throws an error otherwise */
final def websocketOrError(recvBufferSize: Int = 100, verbose: Boolean = false): WSClient[IO] =
websocket(recvBufferSize, verbose).getOrElse(
throw new RuntimeException(
"""Websocket is not supported in this environment!
You need to have curl with version 7.87.0 or higher with websockets enabled.
Note that websocket support in curl is experimental and is not available by default,
so you need to either build it with websocket support or use an already built libcurl with websocket support."""
)
)

override protected def pollingSystem: PollingSystem = multiPerform
}

object CurlApp {
Expand Down
36 changes: 36 additions & 0 deletions curl/src/main/scala/org/http4s/curl/CurlClientBuilder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2022 http4s.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.http4s.curl

import cats.effect.IO
import org.http4s.client.Client
import org.http4s.curl.http.CurlClient
import org.http4s.curl.internal.CurlMultiDriver

final class CurlClientBuilder private[curl] (
driver: CurlMultiDriver,
val isVerbose: Boolean = false,
) {
private def copy(
isVerbose: Boolean
) = new CurlClientBuilder(driver, isVerbose = isVerbose)

def setVerbose: CurlClientBuilder = copy(isVerbose = true)
def notVerbose: CurlClientBuilder = copy(isVerbose = false)

def build: Client[IO] = CurlClient(driver, isVerbose = isVerbose)
}
35 changes: 35 additions & 0 deletions curl/src/main/scala/org/http4s/curl/CurlDriver.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2022 http4s.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.http4s.curl

import cats.effect.IO
import cats.effect.kernel.Resource
import org.http4s.curl.internal.CurlMultiDriver
import org.http4s.curl.unsafe.CurlMultiPerformPoller
import org.http4s.curl.unsafe.CurlMultiSocket

final class CurlDriver private (driver: CurlMultiDriver) {
def http: CurlClientBuilder = new CurlClientBuilder(driver)
def websocket: CurlWSClientBuilder = new CurlWSClientBuilder(driver)
}

object CurlDriver {
val default: Resource[IO, CurlDriver] = IO.pollers.toResource.flatMap {
_.collectFirst { case mp: CurlMultiPerformPoller => Resource.eval(IO(new CurlDriver(mp))) }
.getOrElse(CurlMultiSocket().map(new CurlDriver(_)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,50 +15,13 @@
*/

package org.http4s.curl
package unsafe

import cats.effect.unsafe.IORuntime
import cats.effect.unsafe.IORuntimeConfig
import cats.effect.unsafe.Scheduler

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.scalanative.unsafe._

object CurlRuntime {

def apply(): IORuntime = apply(IORuntimeConfig())

def apply(config: IORuntimeConfig): IORuntime = {
val (ecScheduler, shutdown) = defaultExecutionContextScheduler()
IORuntime(ecScheduler, ecScheduler, ecScheduler, shutdown, config)
}

def defaultExecutionContextScheduler(): (ExecutionContext with Scheduler, () => Unit) = {
val (ecScheduler, shutdown) = CurlExecutorScheduler(64)
(ecScheduler, shutdown)
}

private[this] var _global: IORuntime = null

private[curl] def installGlobal(global: => IORuntime): Boolean =
if (_global == null) {
_global = global
true
} else {
false
}

lazy val global: IORuntime = {
if (_global == null) {
installGlobal {
CurlRuntime()
}
}

_global
}
import unsafe.libcurl

object CurlRuntime {
def curlVersion: String = fromCString(libcurl.curl_version())

private lazy val versionData = libcurl.curl_version_info(libcurl.CURLVERSION_NOW())
Expand Down
72 changes: 72 additions & 0 deletions curl/src/main/scala/org/http4s/curl/CurlWSClientBuilder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2022 http4s.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.http4s.curl

import cats.effect.IO
import org.http4s.client.websocket.WSClient
import org.http4s.curl.internal.CurlMultiDriver
import org.http4s.curl.websocket.CurlWSClient

final class CurlWSClientBuilder private[curl] (
driver: CurlMultiDriver,
val isVerbose: Boolean = false,
val recvBufferSize: Int = 100,
val pause: Int = 10,
val resume: Int = 30,
) {
private def copy(
isVerbose: Boolean = isVerbose,
recvBufferSize: Int = recvBufferSize,
pause: Int = pause,
resume: Int = resume,
) = new CurlWSClientBuilder(
driver,
isVerbose = isVerbose,
recvBufferSize = recvBufferSize,
pause = pause,
resume = resume,
)

def setVerbose: CurlWSClientBuilder = copy(isVerbose = true)
def notVerbose: CurlWSClientBuilder = copy(isVerbose = false)

def withRecvBufferSize(value: Int): CurlWSClientBuilder = {
assert(value > 0, "buffer size must be greater than zero!")
copy(recvBufferSize = value)
}
def withBackpressure(pause: Int, resume: Int): CurlWSClientBuilder = {
assert(pause >= 0 && pause < 100, "pause must be in [0, 100)")
assert(resume > 0 && resume <= 100, "resume must be in (0, 100]")
copy(pause = pause, resume = resume)
}

def build: Either[RuntimeException, WSClient[IO]] = CurlWSClient(
driver,
recvBufferSize = recvBufferSize,
pauseOn = pause * recvBufferSize / 100,
resumeOn = resume * recvBufferSize / 100,
verbose = isVerbose,
).toRight(
new RuntimeException(
"""Websocket is not supported in this environment!
You need to have curl with version 7.87.0 or higher with websockets enabled.
Note that websocket support in curl is experimental and is not available by default,
so you need to either build it with websocket support or use an already built libcurl with websocket support."""
)
)
def buildIO: IO[WSClient[IO]] = IO.fromEither(build)
}
12 changes: 6 additions & 6 deletions curl/src/main/scala/org/http4s/curl/http/CurlClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ package org.http4s.curl.http

import cats.effect._
import org.http4s.client.Client
import org.http4s.curl.unsafe.CurlExecutorScheduler
import org.http4s.curl.CurlDriver
import org.http4s.curl.internal.CurlMultiDriver

private[curl] object CurlClient {
def apply(ec: CurlExecutorScheduler): Client[IO] = Client(CurlRequest(ec, _))
def apply(ms: CurlMultiDriver, isVerbose: Boolean = false): Client[IO] = Client(
CurlRequest(ms, _, isVerbose)
)

def get: IO[Client[IO]] = IO.executionContext.flatMap {
case ec: CurlExecutorScheduler => IO.pure(apply(ec))
case _ => IO.raiseError(new RuntimeException("Not running on CurlExecutorScheduler"))
}
val default: Resource[IO, Client[IO]] = CurlDriver.default.map(_.http.build)
}
17 changes: 9 additions & 8 deletions curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,19 @@ import org.http4s.Request
import org.http4s.Response
import org.http4s.curl.internal.Utils
import org.http4s.curl.internal._
import org.http4s.curl.unsafe.CurlExecutorScheduler

private[curl] object CurlRequest {
private def setup(
handle: CurlEasy,
ec: CurlExecutorScheduler,
send: RequestSend,
recv: RequestRecv,
req: Request[IO],
verbose: Boolean,
): Resource[IO, Unit] =
Utils.newZone.flatMap(implicit zone =>
CurlSList().evalMap(headers =>
IO {
// TODO add in options
// handle.setVerbose(true)
if (verbose) handle.setVerbose(true)

import org.http4s.curl.unsafe.libcurl_const
import scala.scalanative.unsafe._
Expand Down Expand Up @@ -72,20 +70,23 @@ private[curl] object CurlRequest {

handle.setWriteData(Utils.toPtr(recv))
handle.setWriteFunction(RequestRecv.writeCallback(_, _, _, _))

ec.addHandle(handle.curl, recv.onTerminated)
}
)
)

def apply(ec: CurlExecutorScheduler, req: Request[IO]): Resource[IO, Response[IO]] = for {
def apply(
ms: CurlMultiDriver,
req: Request[IO],
isVerbose: Boolean = false,
): Resource[IO, Response[IO]] = for {
gc <- GCRoot()
handle <- CurlEasy()
flow <- FlowControl(handle)
send <- RequestSend(flow)
recv <- RequestRecv(flow)
_ <- gc.add(send, recv)
_ <- setup(handle, ec, send, recv, req)
_ <- setup(handle, send, recv, req, isVerbose)
_ <- ms.addHandlerTerminating(handle, recv.onTerminated).toResource
_ <- req.body.through(send.pipe).compile.drain.background
resp <- recv.response()
} yield resp
Expand Down
Loading