Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLSession;

import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.EndpointDetails;
import org.apache.hc.core5.http.Header;
Expand Down Expand Up @@ -89,6 +90,7 @@
import org.apache.hc.core5.http2.nio.AsyncPingHandler;
import org.apache.hc.core5.http2.nio.command.PingCommand;
import org.apache.hc.core5.http2.nio.command.PushResponseCommand;
import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
import org.apache.hc.core5.http2.priority.PriorityParamsParser;
import org.apache.hc.core5.http2.priority.PriorityValue;
import org.apache.hc.core5.io.CloseMode;
Expand Down Expand Up @@ -148,6 +150,9 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
private static final long STREAM_TIMEOUT_GRANULARITY_MILLIS = 1000;
private long lastStreamTimeoutCheckMillis;

private final Timeout validateAfterInactivity;
private long lastActivityTime;
private volatile long lastActivityMillis;

AbstractH2StreamMultiplexer(
final ProtocolIOSession ioSession,
Expand All @@ -157,6 +162,18 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
final CharCodingConfig charCodingConfig,
final H2Config h2Config,
final H2StreamListener streamListener) {
this(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener, null);
}

AbstractH2StreamMultiplexer(
final ProtocolIOSession ioSession,
final FrameFactory frameFactory,
final StreamIdGenerator idGenerator,
final HttpProcessor httpProcessor,
final CharCodingConfig charCodingConfig,
final H2Config h2Config,
final H2StreamListener streamListener,
final Timeout validateAfterInactivity) {
this.ioSession = Args.notNull(ioSession, "IO session");
this.frameFactory = Args.notNull(frameFactory, "Frame factory");
this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
Expand All @@ -183,6 +200,8 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }

this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
this.streamListener = streamListener;
this.lastActivityTime = System.currentTimeMillis();
this.validateAfterInactivity = validateAfterInactivity;
}

@Override
Expand Down Expand Up @@ -287,6 +306,7 @@ private void commitFrame(final RawFrame frame) throws IOException {
} finally {
ioSession.getLock().unlock();
}
updateLastActivity();
}

private void commitHeaders(
Expand Down Expand Up @@ -519,6 +539,29 @@ public final void onOutput() throws HttpException, IOException {
}

if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 && remoteSettingState == SettingsHandshake.ACKED) {
final long validateAfterInactivityMillis = validateAfterInactivity != null ? validateAfterInactivity.toMilliseconds() : 0;
final boolean hasBeenIdleTooLong = validateAfterInactivityMillis > 0
&& System.currentTimeMillis() - lastActivityTime > validateAfterInactivityMillis;

if (hasBeenIdleTooLong && ioSession.hasCommands() && pingHandlers.isEmpty()) {
final Timeout socketTimeout = ioSession.getSocketTimeout();
ioSession.setSocketTimeout(Timeout.ofSeconds(5));
executePing(new PingCommand(new BasicPingHandler(new Callback<Boolean>() {

@Override
public void execute(final Boolean result) {
// restore timeout
ioSession.setSocketTimeout(socketTimeout);
if (result) {
requestSessionOutput();
} else {
ioSession.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.NORMAL);
}
}

})));
return;
}
while (streams.getLocalCount() < remoteConfig.getMaxConcurrentStreams()) {
final Command command = ioSession.poll();
if (command == null) {
Expand Down Expand Up @@ -732,6 +775,7 @@ public final void onException(final Exception cause) {
}

private void consumeFrame(final RawFrame frame) throws HttpException, IOException {
updateLastActivity();
final FrameType frameType = FrameType.valueOf(frame.getType());
final int streamId = frame.getStreamId();
if (continuation != null && frameType != FrameType.CONTINUATION) {
Expand Down Expand Up @@ -1627,4 +1671,16 @@ private void validateStreamTimeouts() throws IOException {
}
}

private void updateLastActivity() {
this.lastActivityMillis = System.currentTimeMillis();
}

private boolean hasBeenIdleTooLong() {
if (validateAfterInactivity == null || validateAfterInactivity.isDisabled()) {
return false;
}
final long idleMillis = System.currentTimeMillis() - lastActivityMillis;
return idleMillis >= validateAfterInactivity.toMilliseconds();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hc.core5.http2.frame.FrameFactory;
import org.apache.hc.core5.http2.frame.StreamIdGenerator;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.util.Timeout;

/**
* I/O event handler for events fired by {@link ProtocolIOSession} that implements
Expand All @@ -66,18 +67,31 @@ public ClientH2StreamMultiplexer(
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final H2Config h2Config,
final CharCodingConfig charCodingConfig,
final H2StreamListener streamListener) {
super(ioSession, frameFactory, StreamIdGenerator.ODD, httpProcessor, charCodingConfig, h2Config, streamListener);
final H2StreamListener streamListener,
final Timeout validateAfterInactivity) {
super(ioSession, frameFactory, StreamIdGenerator.ODD, httpProcessor, charCodingConfig, h2Config, streamListener,
validateAfterInactivity);
this.pushHandlerFactory = pushHandlerFactory;
}

public ClientH2StreamMultiplexer(
final ProtocolIOSession ioSession,
final FrameFactory frameFactory,
final HttpProcessor httpProcessor,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final H2Config h2Config,
final CharCodingConfig charCodingConfig,
final H2StreamListener streamListener) {
this(ioSession, frameFactory, httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, null);
}

public ClientH2StreamMultiplexer(
final ProtocolIOSession ioSession,
final HttpProcessor httpProcessor,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final H2Config h2Config,
final CharCodingConfig charCodingConfig) {
this(ioSession, DefaultFrameFactory.INSTANCE, httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, null);
this(ioSession, DefaultFrameFactory.INSTANCE, httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, null, null);
}

public ClientH2StreamMultiplexer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
Expand All @@ -39,6 +40,8 @@
import org.apache.hc.core5.http2.frame.FrameFactory;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;

/**
* {@link ClientH2StreamMultiplexer} factory.
Expand All @@ -55,20 +58,33 @@ public final class ClientH2StreamMultiplexerFactory {
private final CharCodingConfig charCodingConfig;
private final H2StreamListener streamListener;
private final FrameFactory frameFactory;
private final Supplier<TimeValue> validateAfterInactivitySupplier;

public ClientH2StreamMultiplexerFactory(
final HttpProcessor httpProcessor,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final H2Config h2Config,
final CharCodingConfig charCodingConfig,
final H2StreamListener streamListener,
final FrameFactory frameFactory) {
final FrameFactory frameFactory,
final Supplier<TimeValue> validateAfterInactivitySupplier) {
this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
this.pushHandlerFactory = pushHandlerFactory;
this.h2Config = h2Config != null ? h2Config : H2Config.DEFAULT;
this.charCodingConfig = charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT;
this.streamListener = streamListener;
this.frameFactory = frameFactory != null ? frameFactory : DefaultFrameFactory.INSTANCE;
this.validateAfterInactivitySupplier = validateAfterInactivitySupplier;
}

public ClientH2StreamMultiplexerFactory(
final HttpProcessor httpProcessor,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final H2Config h2Config,
final CharCodingConfig charCodingConfig,
final H2StreamListener streamListener,
final FrameFactory frameFactory) {
this(httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, frameFactory, null);
}

public ClientH2StreamMultiplexerFactory(
Expand All @@ -78,14 +94,14 @@ public ClientH2StreamMultiplexerFactory(
final CharCodingConfig charCodingConfig,
final H2StreamListener streamListener
) {
this(httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, null);
this(httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, null, null);
}

public ClientH2StreamMultiplexerFactory(
final HttpProcessor httpProcessor,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final H2StreamListener streamListener) {
this(httpProcessor, pushHandlerFactory, null, null, streamListener, null);
this(httpProcessor, pushHandlerFactory, null, null, streamListener, null, null);
}

public ClientH2StreamMultiplexerFactory(
Expand All @@ -96,7 +112,18 @@ public ClientH2StreamMultiplexerFactory(

public ClientH2StreamMultiplexer create(final ProtocolIOSession ioSession) {
return new ClientH2StreamMultiplexer(ioSession, frameFactory, httpProcessor,
pushHandlerFactory, h2Config, charCodingConfig, streamListener);
pushHandlerFactory, h2Config, charCodingConfig, streamListener, resolveValidateAfterInactivity());
}

private Timeout resolveValidateAfterInactivity() {
if (validateAfterInactivitySupplier == null) {
return null;
}
final TimeValue timeValue = validateAfterInactivitySupplier.get();
if (!TimeValue.isNonNegative(timeValue)) {
return null;
}
return timeValue.toTimeout();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

Expand Down Expand Up @@ -87,6 +88,7 @@
public class H2MultiplexingRequester extends AsyncRequester {

private final H2ConnPool connPool;
private final AtomicReference<TimeValue> validateAfterInactivityRef;

/**
* Hard cap on per-connection queued / in-flight commands.
Expand All @@ -108,11 +110,16 @@ public H2MultiplexingRequester(
final TlsStrategy tlsStrategy,
final IOReactorMetricsListener threadPoolListener,
final IOWorkerSelector workerSelector,
final AtomicReference<TimeValue> validateAfterInactivityRef,
final int maxCommandsPerConnection) {
super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE,
threadPoolListener, workerSelector);
this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy);
this.validateAfterInactivityRef = validateAfterInactivityRef;
if (this.validateAfterInactivityRef != null) {
this.validateAfterInactivityRef.set(this.connPool.getValidateAfterInactivity());
}
this.maxCommandsPerConnection = maxCommandsPerConnection;
}

Expand All @@ -130,6 +137,9 @@ public TimeValue getValidateAfterInactivity() {

public void setValidateAfterInactivity(final TimeValue timeValue) {
connPool.setValidateAfterInactivity(timeValue);
if (validateAfterInactivityRef != null) {
validateAfterInactivityRef.set(timeValue);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hc.core5.annotation.Experimental;
import org.apache.hc.core5.function.Callback;
Expand All @@ -53,6 +54,7 @@
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.IOSessionListener;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;

/**
* {@link H2MultiplexingRequester} bootstrap.
Expand Down Expand Up @@ -277,13 +279,15 @@ public final H2MultiplexingRequesterBootstrap registerVirtual(final String hostn
public H2MultiplexingRequester create() {
final RequestRouter<Supplier<AsyncPushConsumer>> requestRouter = RequestRouter.create(
null, uriPatternType, routeEntries, RequestRouter.LOCAL_AUTHORITY_RESOLVER, null);
final AtomicReference<TimeValue> validateAfterInactivityRef = new AtomicReference<>(TimeValue.NEG_ONE_MILLISECOND);
final ClientH2StreamMultiplexerFactory http2StreamHandlerFactory = new ClientH2StreamMultiplexerFactory(
httpProcessor != null ? httpProcessor : H2Processors.client(),
new DefaultAsyncPushConsumerFactory(requestRouter),
h2Config != null ? h2Config : H2Config.DEFAULT,
charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT,
streamListener,
frameFactory);
frameFactory,
validateAfterInactivityRef::get);
return new H2MultiplexingRequester(
ioReactorConfig,
(ioSession, attachment) ->
Expand All @@ -295,6 +299,7 @@ public H2MultiplexingRequester create() {
tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(),
threadPoolListener,
null,
validateAfterInactivityRef,
maxCommandsPerConnection);
}

Expand Down
Loading
Loading