Programming/Akka

Akka Http

BUST 2018. 7. 28. 17:43

Akka Http

소개

  • akka actor, akka stream을 이용한 full server- and client-side Http Stack
  • Web Application을 제작하는 용도로 사용하기에는 적절하지 않다.
  • REST/HTTP Interface를 제공하는 것에 특화가 되어있다.
  • akka actor와 akka stream을 활용한 프로그래밍이 가능하다.
  • Http Server API와 Http Client API를 모두 제공한다.

Using Akka Http

gradle

// Core Akka HTTP server/client module.
compile group: 'com.typesafe.akka', name: 'akka-http_2.12',         version: '10.1.3'
// Jackson (un)marshalling support: provides akka.http.javadsl.marshallers.jackson.Jackson,
// which the routing examples below use via Jackson.marshaller() / Jackson.unmarshaller().
compile group: 'com.typesafe.akka', name: 'akka-http-jackson_2.12', version: '10.1.3'
// Akka Streams, on which Akka HTTP is built.
compile group: 'com.typesafe.akka', name: 'akka-stream_2.12',       version: '2.5.12'


Routing DSL for HTTP Servers

  • Route는 하나 이상의 `Directive`로 구성이 되어있다.
  • Route
    • path(..)
      • get(..)
        • complete(..)
  • CompletionStage<Optional<Item>> 형태로 complete를 호출하면 JSON response 형태로 변환(marshalling)된다.

public class JacksonExampleTest extends AllDirectives {

  public static void main(String[] args) throws Exception {
    // boot up server using the route as defined below
    ActorSystem system = ActorSystem.create("routes");

    final Http http = Http.get(system);
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    //In order to access all directives we need an instance where the routes are define.
    JacksonExampleTest app = new JacksonExampleTest();

    final Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.createRoute().flow(system, materializer);
    final CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow,
      ConnectHttp.toHost("localhost", 8080), materializer);

    System.out.println("Server online at http://localhost:8080/\nPress RETURN to stop...");
    System.in.read(); // let it run until user presses return

    binding
      .thenCompose(ServerBinding::unbind) // trigger unbinding from the port
      .thenAccept(unbound -> system.terminate()); // and shutdown when done
  }

  // (fake) async database query api
  private CompletionStage<Optional<Item>> fetchItem(long itemId) {
    return CompletableFuture.completedFuture(Optional.of(new Item("foo", itemId)));
  }

  // (fake) async database query api
  private CompletionStage<Done> saveOrder(final Order order) {
    return CompletableFuture.completedFuture(Done.getInstance());
  }

  private Route createRoute() {

    return route(
      get(() ->
        pathPrefix("item", () ->
          path(longSegment(), (Long id) -> {
            final CompletionStage<Optional<Item>> futureMaybeItem = fetchItem(id);
            return onSuccess(futureMaybeItem, maybeItem ->
              maybeItem.map(item -> completeOK(item, Jackson.marshaller()))
                .orElseGet(() -> complete(StatusCodes.NOT_FOUND, "Not Found"))
            );
          }))),
      post(() ->
        path("create-order", () ->
          entity(Jackson.unmarshaller(Order.class), order -> {
            CompletionStage<Done> futureSaved = saveOrder(order);
            return onSuccess(futureSaved, done ->
              complete("order created")
            );
          })))
    );
  }

  private static class Item {

    final String name;
    final long id;

    @JsonCreator
    Item(@JsonProperty("name") String name,
         @JsonProperty("id") long id) {
      this.name = name;
      this.id = id;
    }

    public String getName() {
      return name;
    }

    public long getId() {
      return id;
    }
  }

  private static class Order {

    final List<Item> items;

    @JsonCreator
    Order(@JsonProperty("items") List<Item> items) {
      this.items = items;
    }

    public List<Item> getItems() {
      return items;
    }
  }
}


Akka http with stream

public class HttpServerStreamRandomNumbersTest extends AllDirectives {

  public static void main(String[] args) throws Exception {
    // boot up server using the route as defined below
    ActorSystem system = ActorSystem.create("routes");

    final Http http = Http.get(system);
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    //In order to access all directives we need an instance where the routes are define.
    HttpServerStreamRandomNumbersTest app = new HttpServerStreamRandomNumbersTest();

    final Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.createRoute().flow(system, materializer);
    final CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow,
        ConnectHttp.toHost("localhost", 8080), materializer);

    System.out.println("Server online at http://localhost:8080/\nPress RETURN to stop...");
    System.in.read(); // let it run until user presses return

    binding
        .thenCompose(ServerBinding::unbind) // trigger unbinding from the port
        .thenAccept(unbound -> system.terminate()); // and shutdown when done
  }


  private Route createRoute() {
    final Random rnd = new Random();
    // streams are re-usable so we can define it here
    // and use it for every request
    Source<Integer, NotUsed> numbers = Source.fromIterator(() -> Stream.generate(rnd::nextInt).iterator());

    return route(
        path("random", () ->
            get(() ->
                complete(HttpEntities.create(ContentTypes.TEXT_PLAIN_UTF8,
                    // transform each number to a chunk of bytes
                    numbers.map(x -> ByteString.fromString(x + "\n")))))));
  }
}


Akka http with actor

public class HttpServerActorInteractionExample extends AllDirectives {

  private final ActorRef auction;

  public static void main(String[] args) throws Exception {
    // boot up server using the route as defined below
    ActorSystem system = ActorSystem.create("routes");

    final Http http = Http.get(system);
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    //In order to access all directives we need an instance where the routes are define.
    HttpServerActorInteractionExample app = new HttpServerActorInteractionExample(system);

    final Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.createRoute().flow(system, materializer);
    final CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow,
      ConnectHttp.toHost("localhost", 8080), materializer);

    System.out.println("Server online at http://localhost:8080/\nPress RETURN to stop...");
    System.in.read(); // let it run until user presses return

    binding
      .thenCompose(ServerBinding::unbind) // trigger unbinding from the port
      .thenAccept(unbound -> system.terminate()); // and shutdown when done
  }

  private HttpServerActorInteractionExample(final ActorSystem system) {
    auction = system.actorOf(Auction.props(), "auction");
  }

  private Route createRoute() {
    return route(
      path("auction", () -> route(
        put(() ->
          parameter(StringUnmarshallers.INTEGER, "bid", bid ->
            parameter("user", user -> {
              // place a bid, fire-and-forget
              auction.tell(new Bid(user, bid), ActorRef.noSender());
              return complete(StatusCodes.ACCEPTED, "bid placed");
            })
          )),
        get(() -> {
          final Timeout timeout = Timeout.durationToTimeout(FiniteDuration.apply(5, TimeUnit.SECONDS));
          // query the actor for the current auction state
          CompletionStage<Bids> bids = ask(auction, new GetBids(), timeout).thenApply((Bids.class::cast));
          return completeOKWithFuture(bids, Jackson.marshaller());
        }))));
  }

  static class Bid {
    final String userId;
    final int offer;

    Bid(String userId, int offer) {
      this.userId = userId;
      this.offer = offer;
    }
  }

  static class GetBids {

  }

  static class Bids {
    public final List<Bid> bids;

    Bids(List<Bid> bids) {
      this.bids = bids;
    }
  }

  static class Auction extends AbstractActor {

    private final LoggingAdapter log = Logging.getLogger(context().system(), this);

    List<HttpServerActorInteractionExample.Bid> bids = new ArrayList<>();

    static Props props() {
      return Props.create(Auction.class);
    }

    @Override
    public Receive createReceive() {
      return receiveBuilder()
        .match(HttpServerActorInteractionExample.Bid.class, bid -> {
          bids.add(bid);
          log.info("Bid complete: {}, {}", bid.userId, bid.offer);
        })
        .match(HttpServerActorInteractionExample.GetBids.class, m -> {
          sender().tell(new HttpServerActorInteractionExample.Bids(bids), self());
        })
        .matchAny(o -> log.info("Invalid message"))
        .build();
    }
  }
}


Akka Http Client

import akka.actor.ActorSystem;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;

import java.util.concurrent.CompletionStage;

public class ClientSingleRequestExample {

  public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create();
    final Materializer materializer = ActorMaterializer.create(system);

    final CompletionStage<HttpResponse> responseFuture =
      Http.get(system)
        .singleRequest(HttpRequest.create("http://akka.io"), materializer);
  }
}