package com.xxx.dataplatform.grpc
import akka.Done
import akka.actor.typed.ActorSystem
import akka.grpc.scaladsl.{ServerReflection, ServiceHandler}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}
import com.xxx.dataplatform.internal.datalake.DataLakeService
import com.xxx.dataplatform.server.Config
import com.typesafe.scalalogging.LazyLogging
import data_lake_reader.{DataLakeReaderService, DataLakeReaderServiceHandler}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
/**
 * HTTP server that exposes the [[DataLakeReaderService]] gRPC handler together with
 * gRPC server reflection, guarded by a static bearer-token check.
 *
 * Every request must carry `Authorization: Bearer <token>` where the token equals
 * `config.grpcSecret`; anything else is answered with HTTP 401 before it reaches
 * the gRPC handlers.
 *
 * @param config          supplies `grpcHost`, `grpcPort` and `grpcSecret`
 * @param dataLakeService backing service handed to the [[GrpcDataLakeReader]] implementation
 * @param system          actor system used to run the HTTP server
 * @param ec              execution context for the `Future` callbacks in this class
 */
final class GrpcServer(config: Config, dataLakeService: DataLakeService)
                      (implicit system: ActorSystem[_], ec: ExecutionContext) extends LazyLogging {

  private val dataLakeReaderImpl = new GrpcDataLakeReader(dataLakeService)
  private val serviceHandler = DataLakeReaderServiceHandler.partial(dataLakeReaderImpl)
  private val reflectionHandler = ServerReflection.partial(List(DataLakeReaderService))
  private val combinedHandler = ServiceHandler.concatOrNotFound(serviceHandler, reflectionHandler)
  private val handler = withAuth(combinedHandler)

  // Written from the bind Future's callback (an ExecutionContext thread) and read by
  // shutdown() on the caller's thread, hence @volatile. None until start() has succeeded.
  @volatile private var binding: Option[Http.ServerBinding] = None

  /**
   * Binds the authenticated handler to `config.grpcHost:config.grpcPort`.
   *
   * @return the server binding once the port is bound; the future fails if binding fails
   */
  def start(): Future[Http.ServerBinding] =
    Http()
      .newServerAt(config.grpcHost, config.grpcPort)
      .bind(handler)
      .map { b =>
        binding = Some(b)
        logger.info(s"gRPC server bound to ${b.localAddress}")
        b
      }

  /**
   * Terminates the server with a 5-second hard deadline.
   *
   * Completes immediately (and only logs) when no binding has been recorded, i.e.
   * `start()` was never called or has not completed successfully yet.
   */
  def shutdown(): Future[Done] =
    binding match {
      case Some(b) =>
        b.terminate(5.seconds).map(_ => Done)
      case None =>
        logger.info("gRPC server was never started")
        Future.successful(Done)
    }

  /**
   * Compares the presented token with `config.grpcSecret` in constant time.
   *
   * A plain `==` on strings returns at the first differing character, which lets a
   * caller probe the secret through response timing. `MessageDigest.isEqual` does not
   * short-circuit on content. A null secret never matches.
   */
  private def tokenMatches(presented: String): Boolean = {
    import java.nio.charset.StandardCharsets.UTF_8
    import java.security.MessageDigest
    Option(config.grpcSecret).exists { secret =>
      MessageDigest.isEqual(presented.getBytes(UTF_8), secret.getBytes(UTF_8))
    }
  }

  /**
   * Wraps `handlerFn` so that it is only invoked for requests carrying the expected
   * bearer token; all other requests get a 401 response.
   *
   * The secret is read from `config` on every request rather than captured once.
   */
  private def withAuth(handlerFn: HttpRequest => Future[HttpResponse]): HttpRequest => Future[HttpResponse] = { req =>
    req.headers.collectFirst { case Authorization(OAuth2BearerToken(token)) => token } match {
      case Some(t) if tokenMatches(t) =>
        handlerFn(req)
      case Some(_) =>
        Future.successful(HttpResponse(Unauthorized, entity = "Invalid token"))
      case None =>
        // Covers both a missing Authorization header and a non-Bearer scheme.
        Future.successful(HttpResponse(Unauthorized, entity = "Authorization header required"))
    }
  }
}
package com.xxx.dataplatform.grpc
import akka.Done
import akka.actor.typed.ActorSystem
import akka.grpc.scaladsl.{ServerReflection, ServiceHandler}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}
import com.xxx.dataplatform.internal.datalake.DataLakeService
import com.xxx.dataplatform.server.Config
import com.typesafe.scalalogging.LazyLogging
import data_lake_reader.{DataLakeReaderService, DataLakeReaderServiceHandler}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
/**
 * HTTP server that exposes the [[DataLakeReaderService]] gRPC handler together with
 * gRPC server reflection, guarded by a static bearer-token check.
 *
 * Every request must carry `Authorization: Bearer <token>` where the token equals
 * `config.grpcSecret`; anything else is answered with HTTP 401 before it reaches
 * the gRPC handlers.
 *
 * @param config          supplies `grpcHost`, `grpcPort` and `grpcSecret`
 * @param dataLakeService backing service handed to the [[GrpcDataLakeReader]] implementation
 * @param system          actor system used to run the HTTP server
 * @param ec              execution context for the `Future` callbacks in this class
 */
final class GrpcServer(config: Config, dataLakeService: DataLakeService)
                      (implicit system: ActorSystem[_], ec: ExecutionContext) extends LazyLogging {

  private val dataLakeReaderImpl = new GrpcDataLakeReader(dataLakeService)
  private val serviceHandler = DataLakeReaderServiceHandler.partial(dataLakeReaderImpl)
  private val reflectionHandler = ServerReflection.partial(List(DataLakeReaderService))
  private val combinedHandler = ServiceHandler.concatOrNotFound(serviceHandler, reflectionHandler)
  private val handler = withAuth(combinedHandler)

  // Written from the bind Future's callback (an ExecutionContext thread) and read by
  // shutdown() on the caller's thread, hence @volatile. None until start() has succeeded.
  @volatile private var binding: Option[Http.ServerBinding] = None

  /**
   * Binds the authenticated handler to `config.grpcHost:config.grpcPort`.
   *
   * @return the server binding once the port is bound; the future fails if binding fails
   */
  def start(): Future[Http.ServerBinding] =
    Http()
      .newServerAt(config.grpcHost, config.grpcPort)
      .bind(handler)
      .map { b =>
        binding = Some(b)
        logger.info(s"gRPC server bound to ${b.localAddress}")
        b
      }

  /**
   * Terminates the server with a 5-second hard deadline.
   *
   * Completes immediately (and only logs) when no binding has been recorded, i.e.
   * `start()` was never called or has not completed successfully yet.
   */
  def shutdown(): Future[Done] =
    binding match {
      case Some(b) =>
        b.terminate(5.seconds).map(_ => Done)
      case None =>
        logger.info("gRPC server was never started")
        Future.successful(Done)
    }

  /**
   * Compares the presented token with `config.grpcSecret` in constant time.
   *
   * A plain `==` on strings returns at the first differing character, which lets a
   * caller probe the secret through response timing. `MessageDigest.isEqual` does not
   * short-circuit on content. A null secret never matches.
   */
  private def tokenMatches(presented: String): Boolean = {
    import java.nio.charset.StandardCharsets.UTF_8
    import java.security.MessageDigest
    Option(config.grpcSecret).exists { secret =>
      MessageDigest.isEqual(presented.getBytes(UTF_8), secret.getBytes(UTF_8))
    }
  }

  /**
   * Wraps `handlerFn` so that it is only invoked for requests carrying the expected
   * bearer token; all other requests get a 401 response.
   *
   * The secret is read from `config` on every request rather than captured once.
   */
  private def withAuth(handlerFn: HttpRequest => Future[HttpResponse]): HttpRequest => Future[HttpResponse] = { req =>
    req.headers.collectFirst { case Authorization(OAuth2BearerToken(token)) => token } match {
      case Some(t) if tokenMatches(t) =>
        handlerFn(req)
      case Some(_) =>
        Future.successful(HttpResponse(Unauthorized, entity = "Invalid token"))
      case None =>
        // Covers both a missing Authorization header and a non-Bearer scheme.
        Future.successful(HttpResponse(Unauthorized, entity = "Authorization header required"))
    }
  }
}