Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port ee9 BlockingTest to ee10/11 #12635

Open
wants to merge 9 commits into
base: jetty-12.1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ public Throwable fillInStackTrace()
{
return this;
}

@Override
public String toString()
{
return "ACQUIRED";
}
};
private static final Throwable SUCCEEDED = new Throwable()
{
Expand All @@ -88,6 +94,12 @@ public Throwable fillInStackTrace()
{
return this;
}

@Override
public String toString()
{
return "SUCCEEDED";
}
};

public interface Runnable extends java.lang.Runnable, AutoCloseable, Invocable
Expand Down Expand Up @@ -434,5 +446,11 @@ public Runnable runnable() throws IOException
_lock.unlock();
}
}

@Override
public String toString()
{
return "%s@%x[c=%s]".formatted(getClass().getSimpleName(), hashCode(), _completed);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package org.eclipse.jetty.util;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -466,6 +468,15 @@ public static <T> T get(CompletableFuture<T> completableFuture)
}
}

public static String toString(Throwable x)
{
if (x == null)
return "null";
StringWriter sw = new StringWriter();
x.printStackTrace(new PrintWriter(sw));
return sw.toString();
}

private ExceptionUtil()
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ private void onWriteComplete(boolean last, Throwable failure)
try (AutoLock ignored = _channelState.lock())
{
if (LOG.isDebugEnabled())
state = stateString();
state = lockedStateString();

// Transition to CLOSED state if we were the last write or we have failed
if (last || failure != null)
Expand All @@ -273,7 +273,7 @@ private void onWriteComplete(boolean last, Throwable failure)
_closedCallback = null;
if (failure == null)
lockedReleaseBuffer();
wake = updateApiState(failure);
wake = lockedUpdateApiState(failure);
}
else if (_state == State.CLOSE)
{
Expand All @@ -285,13 +285,13 @@ else if (_state == State.CLOSE)
}
else
{
wake = updateApiState(null);
wake = lockedUpdateApiState(null);
}
}

if (LOG.isDebugEnabled())
LOG.debug("onWriteComplete({},{}) {}->{} c={} cb={} w={}",
last, failure, state, stateString(), BufferUtil.toDetailString(closeContent), closedCallback, wake, failure);
if (LOG.isDebugEnabled())
LOG.debug("onWriteComplete({},{}) {}->{} c={} cb={} w={}",
last, failure, state, lockedStateString(), BufferUtil.toDetailString(closeContent), closedCallback, wake, failure);
}

try
{
Expand All @@ -314,8 +314,10 @@ else if (closeContent != null)
}
}

private boolean updateApiState(Throwable failure)
private boolean lockedUpdateApiState(Throwable failure)
{
assert _channelState.isLockHeldByCurrentThread();

boolean wake = false;
switch (_apiState)
{
Expand All @@ -342,7 +344,7 @@ private boolean updateApiState(Throwable failure)
default:
if (_state == State.CLOSED)
break;
throw new IllegalStateException(stateString());
throw new IllegalStateException(lockedStateString());
}
return wake;
}
Expand Down Expand Up @@ -465,10 +467,10 @@ public void complete(Callback callback)
break;
}
}
}

if (LOG.isDebugEnabled())
LOG.debug("complete({}) {} s={} e={}, c={}", callback, stateString(), succeeded, error, BufferUtil.toDetailString(content));
if (LOG.isDebugEnabled())
LOG.debug("complete({}) {} s={} e={}, c={}", callback, lockedStateString(), succeeded, error, BufferUtil.toDetailString(content));
}

if (succeeded)
{
Expand Down Expand Up @@ -501,6 +503,7 @@ public void completed(Throwable ignored)
@Override
public void close() throws IOException
{
RetainableByteBuffer aggregate = null;
ByteBuffer content = null;
Blocker.Callback blocker = null;
try (AutoLock ignored = _channelState.lock())
Expand Down Expand Up @@ -549,7 +552,16 @@ public void close() throws IOException
_apiState = ApiState.BLOCKED;
_state = State.CLOSING;
blocker = _writeBlocker.callback();
content = _aggregate != null && _aggregate.hasRemaining() ? _aggregate.getByteBuffer() : BufferUtil.EMPTY_BUFFER;
aggregate = _aggregate;
if (aggregate != null && _aggregate.hasRemaining())
{
aggregate.retain();
content = aggregate.getByteBuffer();
}
else
{
content = BufferUtil.EMPTY_BUFFER;
}
break;

case BLOCKED:
Expand All @@ -567,7 +579,16 @@ public void close() throws IOException
// Output is idle in async state, so we can do an async close
_apiState = ApiState.PENDING;
_state = State.CLOSING;
content = _aggregate != null && _aggregate.hasRemaining() ? _aggregate.getByteBuffer() : BufferUtil.EMPTY_BUFFER;
aggregate = _aggregate;
if (aggregate != null && _aggregate.hasRemaining())
{
aggregate.retain();
content = aggregate.getByteBuffer();
}
else
{
content = BufferUtil.EMPTY_BUFFER;
}
break;

case UNREADY:
Expand All @@ -580,10 +601,10 @@ public void close() throws IOException
}
break;
}
}

if (LOG.isDebugEnabled())
LOG.debug("close() {} c={} b={}", stateString(), BufferUtil.toDetailString(content), blocker);
if (LOG.isDebugEnabled())
LOG.debug("close() {} c={} b={}", lockedStateString(), BufferUtil.toDetailString(content), blocker);
}

if (content == null)
{
Expand All @@ -602,7 +623,10 @@ public void close() throws IOException
if (blocker == null)
{
// Do an async close
channelWrite(content, true, new WriteCompleteCB());
Callback callback = new WriteCompleteCB();
if (aggregate != null)
callback = Callback.from(callback, aggregate::release);
channelWrite(content, true, callback);
}
else
{
Expand All @@ -611,6 +635,8 @@ public void close() throws IOException
{
channelWrite(content, true, blocker);
b.block();
if (aggregate != null)
aggregate.release();
onWriteComplete(true, null);
}
catch (Throwable t)
Expand Down Expand Up @@ -701,7 +727,7 @@ public void flush() throws IOException

case ASYNC:
case PENDING:
throw new IllegalStateException("isReady() not called: " + stateString());
throw new IllegalStateException("isReady() not called: " + lockedStateString());

case READY:
_apiState = ApiState.PENDING;
Expand All @@ -711,7 +737,7 @@ public void flush() throws IOException
throw new WritePendingException();

default:
throw new IllegalStateException(stateString());
throw new IllegalStateException(lockedStateString());
}
}
}
Expand Down Expand Up @@ -795,7 +821,7 @@ public void write(byte[] b, int off, int len) throws IOException
break;

case ASYNC:
throw new IllegalStateException("isReady() not called: " + stateString());
throw new IllegalStateException("isReady() not called: " + lockedStateString());

case READY:
async = true;
Expand All @@ -807,7 +833,7 @@ public void write(byte[] b, int off, int len) throws IOException
throw new WritePendingException();

default:
throw new IllegalStateException(stateString());
throw new IllegalStateException(lockedStateString());
}

_written = written;
Expand All @@ -823,19 +849,19 @@ public void write(byte[] b, int off, int len) throws IOException
{
if (LOG.isDebugEnabled())
LOG.debug("write(array) {} aggregated !flush {}",
stateString(), _aggregate);
lockedStateString(), _aggregate);
return;
}

// adjust offset/length
off += filled;
len -= filled;
}
}

if (LOG.isDebugEnabled())
LOG.debug("write(array) {} last={} agg={} flush=true async={}, len={} {}",
stateString(), last, aggregate, async, len, _aggregate);
if (LOG.isDebugEnabled())
LOG.debug("write(array) {} last={} agg={} flush=true async={}, len={} {}",
lockedStateString(), last, aggregate, async, len, _aggregate);
}

if (async)
{
Expand Down Expand Up @@ -928,7 +954,7 @@ public void write(ByteBuffer buffer) throws IOException
break;

case ASYNC:
throw new IllegalStateException("isReady() not called: " + stateString());
throw new IllegalStateException("isReady() not called: " + lockedStateString());

case READY:
async = true;
Expand All @@ -940,7 +966,7 @@ public void write(ByteBuffer buffer) throws IOException
throw new WritePendingException();

default:
throw new IllegalStateException(stateString());
throw new IllegalStateException(lockedStateString());
}
_written = written;
}
Expand Down Expand Up @@ -1010,7 +1036,7 @@ public void write(int b) throws IOException
break;

case ASYNC:
throw new IllegalStateException("isReady() not called: " + stateString());
throw new IllegalStateException("isReady() not called: " + lockedStateString());

case READY:
async = true;
Expand All @@ -1022,7 +1048,7 @@ public void write(int b) throws IOException
throw new WritePendingException();

default:
throw new IllegalStateException(stateString());
throw new IllegalStateException(lockedStateString());
}
_written = written;

Expand Down Expand Up @@ -1282,7 +1308,7 @@ private boolean prepareSendContent(int len, Callback callback)
}

if (_apiState != ApiState.BLOCKING)
throw new IllegalStateException(stateString());
throw new IllegalStateException(lockedStateString());
_apiState = ApiState.PENDING;
if (len > 0)
_written += len;
Expand Down Expand Up @@ -1337,13 +1363,13 @@ public void resetBuffer()
@Override
public void setWriteListener(WriteListener writeListener)
{
if (!_servletChannel.getServletRequestState().isAsync())
throw new IllegalStateException("!ASYNC: " + stateString());
boolean wake;
try (AutoLock ignored = _channelState.lock())
{
if (!_servletChannel.getServletRequestState().isAsync())
throw new IllegalStateException("!ASYNC: " + lockedStateString());
if (_apiState != ApiState.BLOCKING)
throw new IllegalStateException("!OPEN" + stateString());
throw new IllegalStateException("!OPEN" + lockedStateString());
_apiState = ApiState.READY;
_writeListener = writeListener;
wake = _servletChannel.getServletRequestState().onWritePossible();
Expand Down Expand Up @@ -1422,17 +1448,24 @@ public void writeCallback()
}
}

private String stateString()
private String lockedStateString()
{
assert _channelState.isLockHeldByCurrentThread();
return unsafeStateString();
}

private String unsafeStateString()
{
return String.format("s=%s,api=%s,sc=%b,e=%s", _state, _apiState, _softClose, _onError);
return String.format("s=%s,api=%s,sc=%b,e=%s,wb=%s", _state, _apiState, _softClose, _onError, _writeBlocker);
}

@Override
public String toString()
{
try (AutoLock ignored = _channelState.lock())
try (AutoLock lock = _channelState.tryLock())
{
return String.format("%s@%x{%s}", this.getClass().getSimpleName(), hashCode(), stateString());
boolean held = lock.isHeldByCurrentThread();
return String.format("%s@%x{%s%s}", this.getClass().getSimpleName(), hashCode(), held ? "" : "?:", unsafeStateString());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1200,8 +1200,15 @@ public ServletInputStream getInputStream() throws IOException
if (_inputState != ServletContextRequest.INPUT_NONE && _inputState != ServletContextRequest.INPUT_STREAM)
throw new IllegalStateException("READER");
_inputState = ServletContextRequest.INPUT_STREAM;
// Try to write a 100 continue, ignoring failure result if it was not necessary.
_servletChannel.getResponse().writeInterim(HttpStatus.CONTINUE_100, HttpFields.EMPTY);
try
{
// Try to write a 100 continue, ignoring failure result if it was not necessary.
_servletChannel.getResponse().writeInterim(HttpStatus.CONTINUE_100, HttpFields.EMPTY);
}
catch (IllegalStateException ise)
{
throw new IOException(ise);
}
return getServletRequestInfo().getHttpInput();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,11 @@ AutoLock lock()
return _lock.lock();
}

AutoLock tryLock()
{
return _lock.tryLock();
}

boolean isLockHeldByCurrentThread()
{
return _lock.isHeldByCurrentThread();
Expand Down
Loading
Loading