Akka Http
소개
- akka actor, akka stream을 이용한 full server- and client-side Http Stack
- Web Application 제작하는 곳의 사용은 적절하지 않다
- REST/HTTP Interface를 제공하는 것에 특화가 되어있다.
- akka actor와 akka stream을 활용한 프로그래밍이 가능하다.
- Http Server API도 제공, Http Client API로 제공을 한다.
Using Akka Http
gradle
compile group: 'com.typesafe.akka', name: 'akka-http_2.12', version: '10.1.3'
compile group: 'com.typesafe.akka', name: 'akka-stream_2.12', version: '2.5.12'
Routing DSL for HTTP Servers
- Route는 하나 이상의 `Directive`로 구성이 되어있다.
- Route
- path(..)
- get(..)
- complete(..)
- CompletionStage<Optional<Item>> 형태로 complete을 호출을 하면 JSON repsone 형태로 변환(marshalls) 이 된다.
public class JacksonExampleTest extends AllDirectives {
public static void main(String[] args) throws Exception {
// boot up server using the route as defined below
ActorSystem system = ActorSystem.create("routes");
final Http http = Http.get(system);
final ActorMaterializer materializer = ActorMaterializer.create(system);
//In order to access all directives we need an instance where the routes are define.
JacksonExampleTest app = new JacksonExampleTest();
final Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.createRoute().flow(system, materializer);
final CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow,
ConnectHttp.toHost("localhost", 8080), materializer);
System.out.println("Server online at http://localhost:8080/\nPress RETURN to stop...");
System.in.read(); // let it run until user presses return
binding
.thenCompose(ServerBinding::unbind) // trigger unbinding from the port
.thenAccept(unbound -> system.terminate()); // and shutdown when done
}
// (fake) async database query api
private CompletionStage<Optional<Item>> fetchItem(long itemId) {
return CompletableFuture.completedFuture(Optional.of(new Item("foo", itemId)));
}
// (fake) async database query api
private CompletionStage<Done> saveOrder(final Order order) {
return CompletableFuture.completedFuture(Done.getInstance());
}
private Route createRoute() {
return route(
get(() ->
pathPrefix("item", () ->
path(longSegment(), (Long id) -> {
final CompletionStage<Optional<Item>> futureMaybeItem = fetchItem(id);
return onSuccess(futureMaybeItem, maybeItem ->
maybeItem.map(item -> completeOK(item, Jackson.marshaller()))
.orElseGet(() -> complete(StatusCodes.NOT_FOUND, "Not Found"))
);
}))),
post(() ->
path("create-order", () ->
entity(Jackson.unmarshaller(Order.class), order -> {
CompletionStage<Done> futureSaved = saveOrder(order);
return onSuccess(futureSaved, done ->
complete("order created")
);
})))
);
}
private static class Item {
final String name;
final long id;
@JsonCreator
Item(@JsonProperty("name") String name,
@JsonProperty("id") long id) {
this.name = name;
this.id = id;
}
public String getName() {
return name;
}
public long getId() {
return id;
}
}
private static class Order {
final List<Item> items;
@JsonCreator
Order(@JsonProperty("items") List<Item> items) {
this.items = items;
}
public List<Item> getItems() {
return items;
}
}
}
Akka http with stream
public class HttpServerStreamRandomNumbersTest extends AllDirectives {
public static void main(String[] args) throws Exception {
// boot up server using the route as defined below
ActorSystem system = ActorSystem.create("routes");
final Http http = Http.get(system);
final ActorMaterializer materializer = ActorMaterializer.create(system);
//In order to access all directives we need an instance where the routes are define.
HttpServerStreamRandomNumbersTest app = new HttpServerStreamRandomNumbersTest();
final Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.createRoute().flow(system, materializer);
final CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow,
ConnectHttp.toHost("localhost", 8080), materializer);
System.out.println("Server online at http://localhost:8080/\nPress RETURN to stop...");
System.in.read(); // let it run until user presses return
binding
.thenCompose(ServerBinding::unbind) // trigger unbinding from the port
.thenAccept(unbound -> system.terminate()); // and shutdown when done
}
private Route createRoute() {
final Random rnd = new Random();
// streams are re-usable so we can define it here
// and use it for every request
Source<Integer, NotUsed> numbers = Source.fromIterator(() -> Stream.generate(rnd::nextInt).iterator());
return route(
path("random", () ->
get(() ->
complete(HttpEntities.create(ContentTypes.TEXT_PLAIN_UTF8,
// transform each number to a chunk of bytes
numbers.map(x -> ByteString.fromString(x + "\n")))))));
}
}
Akka http with actor
public class HttpServerActorInteractionExample extends AllDirectives {
private final ActorRef auction;
public static void main(String[] args) throws Exception {
// boot up server using the route as defined below
ActorSystem system = ActorSystem.create("routes");
final Http http = Http.get(system);
final ActorMaterializer materializer = ActorMaterializer.create(system);
//In order to access all directives we need an instance where the routes are define.
HttpServerActorInteractionExample app = new HttpServerActorInteractionExample(system);
final Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.createRoute().flow(system, materializer);
final CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow,
ConnectHttp.toHost("localhost", 8080), materializer);
System.out.println("Server online at http://localhost:8080/\nPress RETURN to stop...");
System.in.read(); // let it run until user presses return
binding
.thenCompose(ServerBinding::unbind) // trigger unbinding from the port
.thenAccept(unbound -> system.terminate()); // and shutdown when done
}
private HttpServerActorInteractionExample(final ActorSystem system) {
auction = system.actorOf(Auction.props(), "auction");
}
private Route createRoute() {
return route(
path("auction", () -> route(
put(() ->
parameter(StringUnmarshallers.INTEGER, "bid", bid ->
parameter("user", user -> {
// place a bid, fire-and-forget
auction.tell(new Bid(user, bid), ActorRef.noSender());
return complete(StatusCodes.ACCEPTED, "bid placed");
})
)),
get(() -> {
final Timeout timeout = Timeout.durationToTimeout(FiniteDuration.apply(5, TimeUnit.SECONDS));
// query the actor for the current auction state
CompletionStage<Bids> bids = ask(auction, new GetBids(), timeout).thenApply((Bids.class::cast));
return completeOKWithFuture(bids, Jackson.marshaller());
}))));
}
static class Bid {
final String userId;
final int offer;
Bid(String userId, int offer) {
this.userId = userId;
this.offer = offer;
}
}
static class GetBids {
}
static class Bids {
public final List<Bid> bids;
Bids(List<Bid> bids) {
this.bids = bids;
}
}
static class Auction extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(context().system(), this);
List<HttpServerActorInteractionExample.Bid> bids = new ArrayList<>();
static Props props() {
return Props.create(Auction.class);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(HttpServerActorInteractionExample.Bid.class, bid -> {
bids.add(bid);
log.info("Bid complete: {}, {}", bid.userId, bid.offer);
})
.match(HttpServerActorInteractionExample.GetBids.class, m -> {
sender().tell(new HttpServerActorInteractionExample.Bids(bids), self());
})
.matchAny(o -> log.info("Invalid message"))
.build();
}
}
}
Akka Http Client
import akka.actor.ActorSystem;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import java.util.concurrent.CompletionStage;
public class ClientSingleRequestExample {
public static void main(String[] args) {
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
final CompletionStage<HttpResponse> responseFuture =
Http.get(system)
.singleRequest(HttpRequest.create("http://akka.io"), materializer);
}
}
'Programing > Akka' 카테고리의 다른 글
Akka Dispatcher를 이용한 성능 튜닝 (0) | 2018.08.05 |
---|---|
Akka Stream Error Handling (0) | 2018.08.01 |
Akka Stream Operator Fusion (0) | 2018.06.29 |
Akka Stream 기본 개념 Source, Sink, Flow (0) | 2018.06.29 |
Akka Stream Graph DSL를 이용한 Broadcast/Merge 기능 구현 (0) | 2018.06.02 |