/**
 * A {@link DiscoveryServer} specialised for Netty.
 *
 * <p>Exists so that the server's channel can be reused by the client.
 */
public interface NettyDiscoveryServer extends DiscoveryServer {

  /**
   * Shares the Netty server channel with the client, which makes it possible to send packets
   * from the same port as the server.
   *
   * @param consumer callback that receives a {@link NioDatagramChannel}; presumably the
   *     server's own channel (confirm against the implementation)
   * @return a future for the operation. NOTE(review): when it completes, and whether it can
   *     complete exceptionally, is decided by the implementation; confirm there.
   */
  CompletableFuture<Void> useDatagramChannel(Consumer<NioDatagramChannel> consumer);
}
Incoming pipeline setup:
// Registers the incoming-packet handlers on incomingPipeline, in exactly this order.
// The last handler is also given the outgoing pipeline and the task scheduler.
// NOTE(review): the chain has no terminating ';' here; this may be an excerpt of a longer
// chain. Confirm against the full source before copying.
incomingPipeline
    .addHandler(new IncomingDataPacker())
    .addHandler(new WhoAreYouAttempt(homeNode.getNodeId()))
    .addHandler(new WhoAreYouSessionResolver(authTagRepo))
    .addHandler(new UnknownPacketTagToSender(homeNode))
    .addHandler(nodeIdToSession)
    .addHandler(new UnknownPacketTypeByStatus())
    .addHandler(new NotExpectedIncomingPacketHandler())
    .addHandler(new WhoAreYouPacketHandler(outgoingPipeline, taskScheduler))
Incoming pipeline start:
// Build the pipeline first, then start feeding it.
incomingPipeline.build();

// Every packet published by the discovery server is pushed into the incoming pipeline.
Flux.from(discoveryServer.getIncomingPackets())
    .subscribe(incomingPipeline::push);
Outgoing pipeline setup:
// Registers the outgoing-side handlers on outgoingPipeline, in exactly this order.
// The final handler receives the pipeline itself together with the task scheduler.
outgoingPipeline
    .addHandler(new OutgoingParcelHandler(outgoingSink))
    .addHandler(new NodeSessionRequestHandler())
    .addHandler(nodeIdToSession)
    .addHandler(new NewTaskHandler())
    .addHandler(new NextTaskHandler(outgoingPipeline, taskScheduler));
usage:
/**
 * Writes a trace-level log entry for the packet and then passes the packet to
 * {@code outgoing}.
 *
 * @param packet the packet to send
 */
public void sendOutgoing(Packet packet) {
  // The message is supplied as a lambda rather than as a pre-built string.
  logger.trace(
      () -> String.format("Sending outgoing packet %s in session %s", packet, this));
  outgoing.accept(packet);
}