async programming in scala, are futures the standard now?

package com.xxx.dataplatform.grpc

import akka.Done
import akka.actor.typed.ActorSystem
import akka.grpc.scaladsl.{ServerReflection, ServiceHandler}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}
import com.xxx.dataplatform.internal.datalake.DataLakeService
import com.xxx.dataplatform.server.Config
import com.typesafe.scalalogging.LazyLogging
import data_lake_reader.{DataLakeReaderService, DataLakeReaderServiceHandler}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

final class GrpcServer(config: Config, dataLakeService: DataLakeService)
(implicit system: ActorSystem[_], ec: ExecutionContext) extends LazyLogging {

private val dataLakeReaderImpl = new GrpcDataLakeReader(dataLakeService)
private val serviceHandler = DataLakeReaderServiceHandler.partial(dataLakeReaderImpl)
private val reflectionHandler = ServerReflection.partial(List(DataLakeReaderService))
private val combinedHandler = ServiceHandler.concatOrNotFound(serviceHandler, reflectionHandler)
private val handler = withAuth(combinedHandler)

private var binding: Http.ServerBinding = _

def start(): Future[Http.ServerBinding] = {
Http().newServerAt(config.grpcHost, config.grpcPort)
.bind(handler)
.map { b =>
binding = b
logger.info(s"gRPC server bound to ${b.localAddress}")
b
}
}

def shutdown(): Future[Done] = {
if (binding != null) {
binding.terminate(5.seconds).map(_ => Done)
} else {
logger.info("gRPC server was never started")
Future.successful(Done)
}
}

private def withAuth(handlerFn: HttpRequest => Future[HttpResponse]): HttpRequest => Future[HttpResponse] = { req =>
req.headers.collectFirst { case Authorization(OAuth2BearerToken(token)) => token } match {
case Some(t) if t == config.grpcSecret =>
handlerFn(req)
case Some(_) =>
Future.successful(HttpResponse(Unauthorized, entity = "Invalid token"))
case None =>
Future.successful(HttpResponse(Unauthorized, entity = "Authorization header required"))
}
}
}
package com.xxx.dataplatform.grpc

import akka.Done
import akka.actor.typed.ActorSystem
import akka.grpc.scaladsl.{ServerReflection, ServiceHandler}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}
import com.xxx.dataplatform.internal.datalake.DataLakeService
import com.xxx.dataplatform.server.Config
import com.typesafe.scalalogging.LazyLogging
import data_lake_reader.{DataLakeReaderService, DataLakeReaderServiceHandler}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

final class GrpcServer(config: Config, dataLakeService: DataLakeService)
(implicit system: ActorSystem[_], ec: ExecutionContext) extends LazyLogging {

private val dataLakeReaderImpl = new GrpcDataLakeReader(dataLakeService)
private val serviceHandler = DataLakeReaderServiceHandler.partial(dataLakeReaderImpl)
private val reflectionHandler = ServerReflection.partial(List(DataLakeReaderService))
private val combinedHandler = ServiceHandler.concatOrNotFound(serviceHandler, reflectionHandler)
private val handler = withAuth(combinedHandler)

private var binding: Http.ServerBinding = _

def start(): Future[Http.ServerBinding] = {
Http().newServerAt(config.grpcHost, config.grpcPort)
.bind(handler)
.map { b =>
binding = b
logger.info(s"gRPC server bound to ${b.localAddress}")
b
}
}

def shutdown(): Future[Done] = {
if (binding != null) {
binding.terminate(5.seconds).map(_ => Done)
} else {
logger.info("gRPC server was never started")
Future.successful(Done)
}
}

private def withAuth(handlerFn: HttpRequest => Future[HttpResponse]): HttpRequest => Future[HttpResponse] = { req =>
req.headers.collectFirst { case Authorization(OAuth2BearerToken(token)) => token } match {
case Some(t) if t == config.grpcSecret =>
handlerFn(req)
case Some(_) =>
Future.successful(HttpResponse(Unauthorized, entity = "Invalid token"))
case None =>
Future.successful(HttpResponse(Unauthorized, entity = "Authorization header required"))
}
}
}
does this broadly look correct, im implementing a grpc server and i dont see why i shouldn't use futures as my primary concurrency model for async programming, they are fairly simple, i come from a go background and i dont see the value in trying to do as complex as create a whole framework of actors. im assuming the async execution contexts are managed by Http().newServerAt and i dont really need to do more than write my logic
1 Reply
JavaBot
JavaBot6mo ago
This post has been reserved for your question.
Hey @mmacheerpuppy! Please use /close or the Close Post button above when your problem is solved. Please remember to follow the help guidelines. This post will be automatically marked as dormant after 300 minutes of inactivity.
TIP: Narrow down your issue to simple and precise questions to maximize the chance that others will reply in here. 💤 Post marked as dormant
This post has been inactive for over 300 minutes, thus, it has been archived. If your question was not answered yet, feel free to re-open this post or create a new one. In case your post is not getting any attention, you can try to use /help ping. Warning: abusing this will result in moderative actions taken against you.

Did you find this page helpful?