HDDS-10374. Make container scanner generate merkle trees during the scan #7490

Open · wants to merge 17 commits into base: HDDS-10239-container-reconciliation
@@ -82,11 +82,16 @@ public void stop() {
* The data merkle tree within the file is replaced with the {@code tree} parameter, but all other content of the
* file remains unchanged.
* Concurrent writes to the same file are coordinated internally.
* This method also updates the container's data checksum in the {@code data} parameter, which SCM will see in
* container reports.
*/
public void writeContainerDataTree(ContainerData data, ContainerMerkleTree tree) throws IOException {
long containerID = data.getContainerID();
Lock writeLock = getLock(containerID);
writeLock.lock();
// If there is an error generating the tree and we cannot obtain a final checksum, use 0 to indicate a metadata
// failure.
long dataChecksum = 0;
try {
ContainerProtos.ContainerChecksumInfo.Builder checksumInfoBuilder = null;
try {
@@ -99,12 +104,18 @@ public void writeContainerDataTree(ContainerData data, ContainerMerkleTree tree)
checksumInfoBuilder = ContainerProtos.ContainerChecksumInfo.newBuilder();
}

ContainerProtos.ContainerMerkleTree treeProto = captureLatencyNs(metrics.getCreateMerkleTreeLatencyNS(),
tree::toProto);
checksumInfoBuilder
.setContainerID(containerID)
.setContainerMerkleTree(captureLatencyNs(metrics.getCreateMerkleTreeLatencyNS(), tree::toProto));
.setContainerMerkleTree(treeProto);
write(data, checksumInfoBuilder.build());
LOG.debug("Data merkle tree for container {} updated", containerID);
// If write succeeds, update the checksum in memory. Otherwise 0 will be used to indicate the metadata failure.
dataChecksum = treeProto.getDataChecksum();
LOG.debug("Data merkle tree for container {} updated with container checksum {}", containerID, dataChecksum);
} finally {
// Even if persisting the tree fails, we should still update the data checksum in memory to report back to SCM.
data.setDataChecksum(dataChecksum);
writeLock.unlock();
}
}
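
For orientation, a minimal caller-side sketch of the new contract. The names `checksumManager`, `data`, `localBlockId`, and `observedChunk` are stand-ins for whatever the caller already holds; only methods shown in this diff are used.

```java
// Hypothetical caller: build a tree from scan results, persist it, and rely on the
// checksum that writeContainerDataTree leaves on the ContainerData object.
ContainerMerkleTree tree = new ContainerMerkleTree();
tree.addChunks(localBlockId, true, observedChunk);   // a chunk the scanner verified
try {
  checksumManager.writeContainerDataTree(data, tree);
} catch (IOException ex) {
  // Persisting the tree failed; data now carries a checksum of 0, which the next
  // container report surfaces to SCM as a metadata failure.
}
```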
@@ -384,7 +395,7 @@ public ContainerMerkleTreeMetrics getMetrics() {
return this.metrics;
}

public static boolean checksumFileExist(Container container) {
public static boolean checksumFileExist(Container<?> container) {
File checksumFile = getContainerChecksumFile(container.getContainerData());
return checksumFile.exists();
}
@@ -22,7 +22,6 @@
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.SortedMap;
import java.util.TreeMap;

@@ -56,10 +55,22 @@ public ContainerMerkleTree() {
* If the block entry already exists, the chunks will be added to the existing chunks for that block.
*
* @param blockID The ID of the block that these chunks belong to.
* @param healthy True if there were no errors detected with these chunks. False indicates that all the chunks
* being added had errors.
* @param chunks A list of chunks to add to this block. The chunks will be sorted internally by their offset.
*/
public void addChunks(long blockID, Collection<ContainerProtos.ChunkInfo> chunks) {
id2Block.computeIfAbsent(blockID, BlockMerkleTree::new).addChunks(chunks);
public void addChunks(long blockID, boolean healthy, ContainerProtos.ChunkInfo... chunks) {
id2Block.computeIfAbsent(blockID, BlockMerkleTree::new).addChunks(healthy, chunks);
}

/**
* Adds an empty block to the tree. This method is not a pre-requisite to {@code addChunks}.
* If the block entry already exists, it will not be modified.
*
* @param blockID The ID of the empty block to add to the tree
*/
public void addBlock(long blockID) {
addChunks(blockID, true);
}
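
A short usage sketch of the reworked varargs API (variable names are illustrative):

```java
ContainerMerkleTree tree = new ContainerMerkleTree();
tree.addChunks(blockId, true, chunkOne, chunkTwo);  // chunks that verified cleanly
tree.addChunks(blockId, false, corruptChunk);       // chunks that failed verification
tree.addBlock(emptyBlockId);                        // block with no chunks; same as addChunks(emptyBlockId, true)
```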

/**
@@ -106,11 +117,13 @@ private static class BlockMerkleTree {
* Adds the specified chunks to this block. The offset value of the chunk must be unique within the block,
* otherwise it will overwrite the previous value at that offset.
*
* @param healthy True if there were no errors detected with these chunks. False indicates that all the chunks
* being added had errors.
* @param chunks A list of chunks to add to this block.
*/
public void addChunks(Collection<ContainerProtos.ChunkInfo> chunks) {
public void addChunks(boolean healthy, ContainerProtos.ChunkInfo... chunks) {
for (ContainerProtos.ChunkInfo chunk: chunks) {
offset2Chunk.put(chunk.getOffset(), new ChunkMerkleTree(chunk));
offset2Chunk.put(chunk.getOffset(), new ChunkMerkleTree(chunk, healthy));
}
}

@@ -151,11 +164,12 @@ public ContainerProtos.BlockMerkleTree toProto() {
* This class computes one checksum for the whole chunk by aggregating these.
*/
private static class ChunkMerkleTree {
private ContainerProtos.ChunkInfo chunk;
private boolean isHealthy = true;
private final ContainerProtos.ChunkInfo chunk;
private final boolean isHealthy;

ChunkMerkleTree(ContainerProtos.ChunkInfo chunk) {
ChunkMerkleTree(ContainerProtos.ChunkInfo chunk, boolean healthy) {
this.chunk = chunk;
this.isHealthy = healthy;
}

/**
@@ -192,7 +192,6 @@ public DataScanResult fullCheck(DataTransferThrottler throttler, Canceler cancel

LOG.debug("Running data checks for container {}", containerID);
try {
// TODO HDDS-10374 this tree will get updated with the container's contents as it is scanned.
ContainerMerkleTree dataTree = new ContainerMerkleTree();
List<ContainerScanError> dataErrors = scanData(dataTree, throttler, canceler);
if (containerIsDeleted()) {
@@ -379,12 +378,13 @@ private List<ContainerScanError> scanBlock(DBHandle db, File dbFile, BlockData b
// So, we need to make sure, chunk length > 0, before declaring
// the missing chunk file.
if (!block.getChunks().isEmpty() && block.getChunks().get(0).getLen() > 0) {
ContainerScanError error = new ContainerScanError(FailureType.MISSING_CHUNK_FILE,
ContainerScanError error = new ContainerScanError(FailureType.MISSING_DATA_FILE,
new File(containerDataFromDisk.getChunksPath()), new IOException("Missing chunk file " +
chunkFile.getAbsolutePath()));
blockErrors.add(error);
}
} else if (chunk.getChecksumData().getType() != ContainerProtos.ChecksumType.NONE) {
currentTree.addBlock(block.getBlockID().getLocalID());
Review comment (Member): Can we not add the block when we add the first chunk?

int bytesPerChecksum = chunk.getChecksumData().getBytesPerChecksum();
ByteBuffer buffer = BUFFER_POOL.getBuffer(bytesPerChecksum);
// Keep scanning the block even if there are errors with individual chunks.
@@ -422,6 +422,14 @@ private static List<ContainerScanError> verifyChecksum(BlockData block,

List<ContainerScanError> scanErrors = new ArrayList<>();

// Information used to populate the merkle tree. Chunk metadata will be the same, but we must fill in the
// checksums with what we actually observe.
ContainerProtos.ChunkInfo.Builder observedChunkBuilder = chunk.toBuilder();
ContainerProtos.ChecksumData.Builder observedChecksumData = chunk.getChecksumData().toBuilder();
observedChecksumData.clearChecksums();
boolean chunkHealthy = true;
boolean chunkMissing = false;

ChecksumData checksumData =
ChecksumData.getFromProtoBuf(chunk.getChecksumData());
int checksumCount = checksumData.getChecksums().size();
@@ -434,10 +442,7 @@
if (layout == ContainerLayoutVersion.FILE_PER_BLOCK) {
channel.position(chunk.getOffset());
}
// Only report one error per chunk. Reporting corruption at every "bytes per checksum" interval will lead to a
// large amount of errors when a full chunk is corrupted.
boolean chunkHealthy = true;
for (int i = 0; i < checksumCount && chunkHealthy; i++) {
for (int i = 0; i < checksumCount; i++) {
// limit last read for FILE_PER_BLOCK, to avoid reading next chunk
if (layout == ContainerLayoutVersion.FILE_PER_BLOCK &&
i == checksumCount - 1 &&
@@ -457,7 +462,11 @@
ByteString expected = checksumData.getChecksums().get(i);
ByteString actual = cal.computeChecksum(buffer)
.getChecksums().get(0);
if (!expected.equals(actual)) {
observedChecksumData.addChecksums(actual);
// Only report one error per chunk. Reporting corruption at every "bytes per checksum" interval will lead to a
// large amount of errors when a full chunk is corrupted.
// Continue scanning the chunk even after the first error so the full merkle tree can be built.
if (chunkHealthy && !expected.equals(actual)) {
String message = String
.format("Inconsistent read for chunk=%s" +
" checksum item %d" +
@@ -469,26 +478,46 @@
StringUtils.bytes2Hex(expected.asReadOnlyByteBuffer()),
StringUtils.bytes2Hex(actual.asReadOnlyByteBuffer()),
block.getBlockID());
chunkHealthy = false;
scanErrors.add(new ContainerScanError(FailureType.CORRUPT_CHUNK, chunkFile,
new OzoneChecksumException(message)));
chunkHealthy = false;
}
}
// If all the checksums match, also check that the length stored in the metadata matches the number of bytes
// seen on the disk.

observedChunkBuilder.setLen(bytesRead);
// If we haven't seen any errors after scanning the whole chunk, verify that the length stored in the metadata
// matches the number of bytes seen on the disk.
if (chunkHealthy && bytesRead != chunk.getLen()) {
String message = String
.format("Inconsistent read for chunk=%s expected length=%d"
+ " actual length=%d for block %s",
chunk.getChunkName(),
chunk.getLen(), bytesRead, block.getBlockID());
scanErrors.add(new ContainerScanError(FailureType.INCONSISTENT_CHUNK_LENGTH, chunkFile,
new IOException(message)));
if (bytesRead == 0) {
// If we could not find any data for the chunk, report it as missing.
chunkMissing = true;
chunkHealthy = false;
String message = String.format("Missing chunk=%s with expected length=%d for block %s",
chunk.getChunkName(), chunk.getLen(), block.getBlockID());
scanErrors.add(new ContainerScanError(FailureType.MISSING_CHUNK, chunkFile, new IOException(message)));
} else {
// We found data for the chunk, but it was shorter than expected.
String message = String
.format("Inconsistent read for chunk=%s expected length=%d"
+ " actual length=%d for block %s",
chunk.getChunkName(),
chunk.getLen(), bytesRead, block.getBlockID());
chunkHealthy = false;
scanErrors.add(new ContainerScanError(FailureType.INCONSISTENT_CHUNK_LENGTH, chunkFile,
new IOException(message)));
}
}
} catch (IOException ex) {
scanErrors.add(new ContainerScanError(FailureType.MISSING_CHUNK_FILE, chunkFile, ex));
// An unknown error occurred trying to access the chunk. Report it as corrupted.
chunkHealthy = false;
scanErrors.add(new ContainerScanError(FailureType.CORRUPT_CHUNK, chunkFile, ex));
}

// Missing chunks should not be added to the merkle tree.
if (!chunkMissing) {
observedChunkBuilder.setChecksumData(observedChecksumData);
currentTree.addChunks(block.getBlockID().getLocalID(), chunkHealthy, observedChunkBuilder.build());
}
return scanErrors;
}
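
The per-chunk outcome handling above condenses to a few cases. A simplified sketch of the classification, reusing the local variable names from the diff (illustrative only; the real method interleaves this with the layout-specific reads):

```java
// Simplified view of how one chunk is classified after the read loop:
if (!chunkHealthy) {
  // A checksum mismatch was already recorded as CORRUPT_CHUNK; reading continued,
  // so observedChecksumData still holds every checksum actually seen on disk.
} else if (bytesRead == 0) {
  chunkMissing = true;                // no data at all -> MISSING_CHUNK, excluded from the tree
  chunkHealthy = false;
} else if (bytesRead != chunk.getLen()) {
  chunkHealthy = false;               // partial data -> INCONSISTENT_CHUNK_LENGTH
}
// Any chunk that produced data (healthy, corrupt, or short) is added to the tree with
// its observed length, observed checksums, and health flag; missing chunks are omitted.
```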

@@ -547,6 +547,15 @@ ContainerCommandResponseProto handleCloseContainer(
return getSuccessResponse(request);
}

/**
* Write the merkle tree for this container using the existing checksum metadata only. The data is not read or
* validated by this method, so it is expected to run quickly.
*
* If a checksum file already exists on the disk, this method will do nothing. The existing file would have either
* been made from the metadata or data itself so there is no need to recreate it from the metadata.
*
* @param container The container which will have a tree generated.
*/
private void createContainerMerkleTree(Container container) {
if (ContainerChecksumTreeManager.checksumFileExist(container)) {
return;
@@ -560,8 +569,9 @@ private void createContainerMerkleTree(Container container) {
getBlockIterator(containerData.getContainerID())) {
while (blockIterator.hasNext()) {
BlockData blockData = blockIterator.nextBlock();
List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
// All chunks are assumed to be healthy until the scanner inspects them to determine otherwise.
merkleTree.addChunks(blockData.getLocalID(), true,
blockData.getChunks().toArray(new ContainerProtos.ChunkInfo[0]));
}
}
checksumManager.writeContainerDataTree(containerData, merkleTree);
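
For contrast, a sketch of the two tree-writing paths this PR ends up with. Here `scannerTree` is a stand-in for the tree produced by the data scan; the replacement behaviour follows the writeContainerDataTree javadoc above.

```java
// 1. Handler path (this method): chunks come straight from RocksDB metadata and are
//    assumed healthy, so the data checksum describes what the container *should* hold.
ContainerMerkleTree metadataTree = new ContainerMerkleTree();
metadataTree.addChunks(blockData.getLocalID(), true,
    blockData.getChunks().toArray(new ContainerProtos.ChunkInfo[0]));

// 2. Scanner path: the tree is rebuilt from the data itself with observed checksums and
//    health flags, and writeContainerDataTree replaces the previously stored tree.
checksumManager.writeContainerDataTree(containerData, scannerTree);
```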
@@ -1348,7 +1358,6 @@ public void markContainerUnhealthy(Container container, ScanResult reason)
} finally {
container.writeUnlock();
}
createContainerMerkleTree(container);
Review comment (Member): If a container moves from OPEN to UNHEALTHY state we should try to build a merkle tree with whatever data we have at the moment before the scanner builds the actual merkle tree. If for some reason (either metadata/data error) we are unable to build it, then we can log the exception and move on.

// Even if the container file is corrupted/missing and the unhealthy
// update fails, the unhealthy state is kept in memory and sent to
// SCM. Write a corresponding entry to the container log as well.
@@ -92,6 +92,13 @@ public void scanContainer(Container<?> c)
if (result.isDeleted()) {
LOG.debug("Container [{}] has been deleted during the data scan.", containerId);
} else {
// Merkle tree write failure should not abort the scanning process. Continue marking the scan as completed.
try {
checksumManager.writeContainerDataTree(containerData, result.getDataTree());
} catch (IOException ex) {
LOG.error("Failed to write container merkle tree for container {}", containerId, ex);
}

if (!result.isHealthy()) {
logUnhealthyScanResult(containerId, result, LOG);

@@ -103,17 +110,15 @@
metrics.incNumUnHealthyContainers();
}
}
checksumManager.writeContainerDataTree(containerData, result.getDataTree());
metrics.incNumContainersScanned();
}

// Even if the container was deleted, mark the scan as completed since we already logged it as starting.
Instant now = Instant.now();
logScanCompleted(containerData, now);

if (!result.isDeleted()) {
controller.updateDataScanTimestamp(containerId, now);
}
// Even if the container was deleted, mark the scan as completed since we already logged it as starting.
logScanCompleted(containerData, now);
}

@Override
@@ -32,9 +32,10 @@ public enum FailureType {
MISSING_METADATA_DIR,
MISSING_CONTAINER_FILE,
MISSING_CHUNKS_DIR,
MISSING_CHUNK_FILE,
MISSING_DATA_FILE,
CORRUPT_CONTAINER_FILE,
CORRUPT_CHUNK,
MISSING_CHUNK,
INCONSISTENT_CHUNK_LENGTH,
INACCESSIBLE_DB,
WRITE_FAILURE,