Commit

fix: improve image cache error handling (#214)
* fix: handle cache script errors

* fix: tests

* fix: try fetch in a function

* fix: force timeouts to test error handling

* fix: use error codes in script

* fix: unauthorized error handling

* fix: return to old fetch type

* fix: handle exit code explicitly

* chore: try again with code exit

* fix: take code from error cause

* fix: generate retryable errors

* fix: remove explicit code zero

* chore: debug log error

* chore: debug without all

* chore: debug without all 2

* chore: debug more

* fix: promise await

* chore: detailed logs

* fix: log response

* fix: logger

* fix: try with only one handler

* fix: add error handlers to streams

* fix: mark invalid output as retryable

* fix: validate if a cause exists

* fix: retry ECONNRESET

* chore: comment
rafaelcr authored May 13, 2024
1 parent a3aad1d commit 115a745
Showing 7 changed files with 142 additions and 82 deletions.
76 changes: 44 additions & 32 deletions config/image-cache.js
@@ -34,7 +34,7 @@ const IMAGE_RESIZE_WIDTH = parseInt(process.env['IMAGE_CACHE_RESIZE_WIDTH'] ?? '
const GCS_BUCKET_NAME = process.env['IMAGE_CACHE_GCS_BUCKET_NAME'];
const GCS_OBJECT_NAME_PREFIX = process.env['IMAGE_CACHE_GCS_OBJECT_NAME_PREFIX'];
const CDN_BASE_PATH = process.env['IMAGE_CACHE_CDN_BASE_PATH'];
const TIMEOUT = parseInt(process.env['METADATA_FETCH_TIMEOUT_MS'] ?? '30');
const TIMEOUT = parseInt(process.env['METADATA_FETCH_TIMEOUT_MS'] ?? '30000');
const MAX_REDIRECTIONS = parseInt(process.env['METADATA_FETCH_MAX_REDIRECTIONS'] ?? '0');
const MAX_RESPONSE_SIZE = parseInt(process.env['IMAGE_CACHE_MAX_BYTE_SIZE'] ?? '-1');

@@ -47,35 +47,29 @@ async function getGcsAuthToken() {
{
method: 'GET',
headers: { 'Metadata-Flavor': 'Google' },
throwOnError: true,
}
);
const json = await response.body.json();
if (response.statusCode === 200 && json.access_token) {
// Cache the token so we can reuse it for other images.
process.env['IMAGE_CACHE_GCS_AUTH_TOKEN'] = json.access_token;
return json.access_token;
}
throw new Error(`GCS access token not found ${response.statusCode}: ${json}`);
// Cache the token so we can reuse it for other images.
process.env['IMAGE_CACHE_GCS_AUTH_TOKEN'] = json.access_token;
return json.access_token;
} catch (error) {
throw new Error(`Error fetching GCS access token: ${error.message}`);
throw new Error(`GCS access token error: ${error}`);
}
}

async function upload(stream, name, authToken) {
try {
const response = await request(
`https://storage.googleapis.com/upload/storage/v1/b/${GCS_BUCKET_NAME}/o?uploadType=media&name=${GCS_OBJECT_NAME_PREFIX}${name}`,
{
method: 'POST',
body: stream,
headers: { 'Content-Type': 'image/png', Authorization: `Bearer ${authToken}` },
}
);
if (response.statusCode !== 200) throw new Error(`GCS error: ${response.statusCode}`);
return `${CDN_BASE_PATH}${name}`;
} catch (error) {
throw new Error(`Error uploading ${name}: ${error.message}`);
}
await request(
`https://storage.googleapis.com/upload/storage/v1/b/${GCS_BUCKET_NAME}/o?uploadType=media&name=${GCS_OBJECT_NAME_PREFIX}${name}`,
{
method: 'POST',
body: stream,
headers: { 'Content-Type': 'image/png', Authorization: `Bearer ${authToken}` },
throwOnError: true,
}
);
return `${CDN_BASE_PATH}${name}`;
}

fetch(
@@ -86,15 +80,13 @@ fetch(
bodyTimeout: TIMEOUT,
maxRedirections: MAX_REDIRECTIONS,
maxResponseSize: MAX_RESPONSE_SIZE,
throwOnError: true,
connect: {
rejectUnauthorized: false, // Ignore SSL cert errors.
},
}),
},
({ statusCode, body }) => {
if (statusCode !== 200) throw new Error(`Failed to fetch image: ${statusCode}`);
return body;
}
({ body }) => body
)
.then(async response => {
const imageReadStream = Readable.fromWeb(response.body);
@@ -115,21 +107,41 @@ fetch(
upload(fullSizeTransform, `${CONTRACT_PRINCIPAL}/${TOKEN_NUMBER}.png`, authToken),
upload(thumbnailTransform, `${CONTRACT_PRINCIPAL}/${TOKEN_NUMBER}-thumb.png`, authToken),
]);
// The API will read these strings as CDN URLs.
for (const result of results) console.log(result);
for (const r of results) console.log(r);
break;
} catch (error) {
if (
(error.message.endsWith('403') || error.message.endsWith('401')) &&
!didRetryUnauthorized
!didRetryUnauthorized &&
error.cause &&
error.cause.code == 'UND_ERR_RESPONSE_STATUS_CODE' &&
(error.cause.statusCode === 401 || error.cause.statusCode === 403)
) {
// Force a dynamic token refresh and try again.
// GCS token is probably expired. Force a token refresh before trying again.
process.env['IMAGE_CACHE_GCS_AUTH_TOKEN'] = undefined;
didRetryUnauthorized = true;
} else throw error;
}
}
})
.catch(error => {
throw new Error(`Error fetching image: ${error}`);
console.error(error);
// TODO: Handle `Input buffer contains unsupported image format` error from sharp when the image
// is actually a video or another media file.
let exitCode = 1;
if (
error.cause &&
(error.cause.code == 'UND_ERR_HEADERS_TIMEOUT' ||
error.cause.code == 'UND_ERR_BODY_TIMEOUT' ||
error.cause.code == 'UND_ERR_CONNECT_TIMEOUT' ||
error.cause.code == 'ECONNRESET')
) {
exitCode = 2;
} else if (
error.cause &&
error.cause.code == 'UND_ERR_RESPONSE_STATUS_CODE' &&
error.cause.statusCode === 429
) {
exitCode = 3;
}
process.exit(exitCode);
});
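The script's new `catch` block above is the core of this change: with `throwOnError: true`, undici rejections carry the failing request details on `error.cause`, and the script maps that cause to an exit code the TypeScript caller can act on. Extracted on its own, the classification looks roughly like the sketch below; `classify` is an illustrative helper, not something that exists in the repo.

```ts
// Illustrative restatement of the exit-code mapping in config/image-cache.js.
// `classify` is a hypothetical helper, not part of the codebase.
const RETRYABLE_CAUSE_CODES = new Set([
  'UND_ERR_HEADERS_TIMEOUT',
  'UND_ERR_BODY_TIMEOUT',
  'UND_ERR_CONNECT_TIMEOUT',
  'ECONNRESET',
]);

function classify(error: unknown): number {
  const cause = (error as { cause?: { code?: string; statusCode?: number } }).cause;
  if (cause?.code && RETRYABLE_CAUSE_CODES.has(cause.code)) return 2; // timeout / reset: retryable
  if (cause?.code === 'UND_ERR_RESPONSE_STATUS_CODE' && cause.statusCode === 429) return 3; // rate limited: retryable
  return 1; // anything else is treated as permanent
}
```

Setting `METADATA_FETCH_TIMEOUT_MS` to a very low value is one way to force the code-`2` path, which appears to be what the "force timeouts to test error handling" commit in the list above was doing.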
4 changes: 2 additions & 2 deletions src/token-processor/queue/job/process-token-job.ts
@@ -16,7 +16,7 @@ import { StacksNodeRpcClient } from '../../stacks-node/stacks-node-rpc-client';
import { StacksNodeClarityError, TooManyRequestsHttpError } from '../../util/errors';
import {
fetchAllMetadataLocalesFromBaseUri,
getFetchableUrl,
getFetchableDecentralizedStorageUrl,
getTokenSpecificUri,
} from '../../util/metadata-helpers';
import { RetryableJobError } from '../errors';
@@ -214,7 +214,7 @@ export class ProcessTokenJob extends Job {
return;
}
// Before we return the uri, check if its fetchable hostname is not already rate limited.
const fetchable = getFetchableUrl(uri);
const fetchable = getFetchableDecentralizedStorageUrl(uri);
const rateLimitedHost = await this.db.getRateLimitedHost({ hostname: fetchable.hostname });
if (rateLimitedHost) {
const retryAfter = Date.parse(rateLimitedHost.retry_after);
108 changes: 77 additions & 31 deletions src/token-processor/util/image-cache.ts
@@ -1,33 +1,81 @@
import * as child_process from 'child_process';
import { ENV } from '../../env';
import { MetadataParseError } from './errors';
import { parseDataUrl, getFetchableUrl } from './metadata-helpers';
import { MetadataParseError, MetadataTimeoutError, TooManyRequestsHttpError } from './errors';
import { parseDataUrl, getFetchableDecentralizedStorageUrl } from './metadata-helpers';
import { logger } from '@hirosystems/api-toolkit';
import { PgStore } from '../../pg/pg-store';
import { errors } from 'undici';
import { RetryableJobError } from '../queue/errors';

/**
* If an external image processor script is configured, then it will process the given image URL for
* the purpose of caching on a CDN (or whatever else it may be created to do). The script is
* expected to return a new URL for the image. If the script is not configured, then the original
* URL is returned immediately. If a data-uri is passed, it is also immediately returned without
* being passed to the script.
* If an external image processor script is configured in the `METADATA_IMAGE_CACHE_PROCESSOR` ENV
* var, this function will process the given image URL for the purpose of caching on a CDN (or
* whatever else it may be created to do). The script is expected to return a new URL for the image
* via `stdout`, with an optional 2nd line with another URL for a thumbnail version of the same
* cached image. If the script is not configured, then the original URL is returned immediately. If
* a data-uri is passed, it is also immediately returned without being passed to the script.
*
* The Image Cache script must return a status code of `0` to mark a successful cache. Other code
* returns available are:
* * `1`: A generic error occurred. Cache should not be retried.
* * `2`: Image fetch timed out before caching was possible. Should be retried.
* * `3`: Image fetch failed due to rate limits from the remote server. Should be retried.
*/
export async function processImageUrl(
export async function processImageCache(
imgUrl: string,
contractPrincipal: string,
tokenNumber: bigint
): Promise<string[]> {
const imageCacheProcessor = ENV.METADATA_IMAGE_CACHE_PROCESSOR;
if (!imageCacheProcessor) {
return [imgUrl];
}
if (imgUrl.startsWith('data:')) {
return [imgUrl];
if (!imageCacheProcessor || imgUrl.startsWith('data:')) return [imgUrl];
logger.info(`ImageCache processing token ${contractPrincipal} (${tokenNumber}) at ${imgUrl}`);
const { code, stdout, stderr } = await callImageCacheScript(
imageCacheProcessor,
imgUrl,
contractPrincipal,
tokenNumber
);
switch (code) {
case 0:
try {
const urls = stdout
.trim()
.split('\n')
.map(r => new URL(r).toString());
logger.info(urls, `ImageCache processed token ${contractPrincipal} (${tokenNumber})`);
return urls;
} catch (error) {
// The script returned a code `0` but the results are invalid. This could happen because of
// an unknown script error so we should mark it as retryable.
throw new RetryableJobError(
`ImageCache unknown error`,
new Error(`Invalid cached url for ${imgUrl}: ${stdout}, stderr: ${stderr}`)
);
}
case 2:
throw new RetryableJobError(`ImageCache fetch timed out`, new MetadataTimeoutError(imgUrl));
case 3:
throw new RetryableJobError(
`ImageCache fetch rate limited`,
new TooManyRequestsHttpError(new URL(imgUrl), new errors.ResponseStatusCodeError())
);
default:
throw new Error(`ImageCache script error (code ${code}): ${stderr}`);
}
}

logger.info(`ImageCache processing token ${contractPrincipal} (${tokenNumber}) at ${imgUrl}`);
async function callImageCacheScript(
imageCacheProcessor: string,
imgUrl: string,
contractPrincipal: string,
tokenNumber: bigint
): Promise<{
code: number;
stdout: string;
stderr: string;
}> {
const repoDir = process.cwd();
const { code, stdout, stderr } = await new Promise<{
return await new Promise<{
code: number;
stdout: string;
stderr: string;
@@ -37,26 +85,24 @@ export async function processImageUrl(
[imgUrl, contractPrincipal, tokenNumber.toString()],
{ cwd: repoDir }
);
let code = 0;
let stdout = '';
let stderr = '';
cp.stdout.on('data', data => (stdout += data));
cp.stderr.on('data', data => (stderr += data));
cp.on('close', code => resolve({ code: code ?? 0, stdout, stderr }));
cp.on('close', _ => resolve({ code, stdout, stderr }));
cp.on('exit', processCode => {
code = processCode ?? 0;
});
});
if (code !== 0) throw new Error(`ImageCache error: ${stderr}`);
const result = stdout.trim().split('\n');
logger.info(result, `ImageCache processed token ${contractPrincipal} (${tokenNumber})`);

try {
return result.map(r => new URL(r).toString());
} catch (error) {
throw new Error(
`Image processing script returned an invalid url for ${imgUrl}: ${result}, stderr: ${stderr}`
);
}
}

export function getImageUrl(uri: string): string {
/**
* Converts a raw image URI from metadata into a fetchable URL.
* @param uri - Original image URI
* @returns Normalized URL string
*/
export function normalizeImageUri(uri: string): string {
// Support images embedded in a Data URL
if (uri.startsWith('data:')) {
const dataUrl = parseDataUrl(uri);
@@ -68,7 +114,7 @@ }
}
return uri;
}
const fetchableUrl = getFetchableUrl(uri);
const fetchableUrl = getFetchableDecentralizedStorageUrl(uri);
return fetchableUrl.toString();
}

@@ -81,8 +127,8 @@ export async function reprocessTokenImageCache(
const imageUris = await db.getTokenImageUris(contractPrincipal, tokenIds);
for (const token of imageUris) {
try {
const [cached, thumbnail] = await processImageUrl(
getFetchableUrl(token.image).toString(),
const [cached, thumbnail] = await processImageCache(
getFetchableDecentralizedStorageUrl(token.image).toString(),
contractPrincipal,
BigInt(token.token_number)
);
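One subtle design choice in `callImageCacheScript` above: the exit code is captured in the child process's `exit` handler, but the promise only resolves on `close`, because `close` fires after stdout/stderr have finished flowing while `exit` can fire before the last output chunk arrives. A condensed sketch of that pattern, with names simplified and an extra `error` handler added for illustration only:

```ts
import { spawn } from 'child_process';

function run(command: string, args: string[]): Promise<{ code: number; stdout: string; stderr: string }> {
  return new Promise((resolve, reject) => {
    const cp = spawn(command, args);
    let code = 0;
    let stdout = '';
    let stderr = '';
    cp.stdout.on('data', data => (stdout += data));
    cp.stderr.on('data', data => (stderr += data));
    cp.on('error', reject); // e.g. the configured script path does not exist
    // `exit` reports the code as soon as the process terminates...
    cp.on('exit', exitCode => (code = exitCode ?? 0));
    // ...but `close` waits for the stdio streams to drain, so resolve there.
    cp.on('close', () => resolve({ code, stdout, stderr }));
  });
}
```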
14 changes: 7 additions & 7 deletions src/token-processor/util/metadata-helpers.ts
@@ -18,7 +18,7 @@ import {
TooManyRequestsHttpError,
} from './errors';
import { RetryableJobError } from '../queue/errors';
import { getImageUrl, processImageUrl } from './image-cache';
import { normalizeImageUri, processImageCache } from './image-cache';
import {
RawMetadataLocale,
RawMetadataLocalizationCType,
@@ -134,8 +134,8 @@ async function parseMetadataForInsertion(
let cachedImage: string | undefined;
let cachedThumbnailImage: string | undefined;
if (image && typeof image === 'string') {
const normalizedUrl = getImageUrl(image);
[cachedImage, cachedThumbnailImage] = await processImageUrl(
const normalizedUrl = normalizeImageUri(image);
[cachedImage, cachedThumbnailImage] = await processImageCache(
normalizedUrl,
contract.principal,
token.token_number
@@ -255,7 +255,7 @@ export async function getMetadataFromUri(token_uri: string): Promise<RawMetadata
}

// Support HTTP/S URLs otherwise
const httpUrl = getFetchableUrl(token_uri);
const httpUrl = getFetchableDecentralizedStorageUrl(token_uri);
const urlStr = httpUrl.toString();
let fetchImmediateRetryCount = 0;
let content: string | undefined;
@@ -270,7 +270,7 @@ export async function getMetadataFromUri(token_uri: string): Promise<RawMetadata
} catch (error) {
fetchImmediateRetryCount++;
fetchError = error;
if (error instanceof MetadataTimeoutError && isUriFromDecentralizedGateway(token_uri)) {
if (error instanceof MetadataTimeoutError && isUriFromDecentralizedStorage(token_uri)) {
// Gateways like IPFS and Arweave commonly time out when a resource can't be found quickly.
// Try again later if this is the case.
throw new RetryableJobError(`Gateway timeout for ${urlStr}`, error);
@@ -314,7 +314,7 @@ function parseJsonMetadata(url: string, content?: string): RawMetadata {
* @param uri - URL to convert
* @returns Fetchable URL
*/
export function getFetchableUrl(uri: string): URL {
export function getFetchableDecentralizedStorageUrl(uri: string): URL {
try {
const parsedUri = new URL(uri);
if (parsedUri.protocol === 'http:' || parsedUri.protocol === 'https:') return parsedUri;
@@ -334,7 +334,7 @@ export function getFetchableUrl(uri: string): URL {
throw new MetadataParseError(`Unsupported uri protocol: ${uri}`);
}

function isUriFromDecentralizedGateway(uri: string): boolean {
function isUriFromDecentralizedStorage(uri: string): boolean {
return (
uri.startsWith('ipfs:') ||
uri.startsWith('ipns:') ||
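The renames in this file (`getFetchableUrl` → `getFetchableDecentralizedStorageUrl`, `isUriFromDecentralizedGateway` → `isUriFromDecentralizedStorage`) only clarify intent: the helper rewrites decentralized-storage URIs such as `ipfs:` and `ipns:` into gateway URLs and passes plain http(s) URIs through untouched. A rough usage sketch; the gateway host shown is an assumption (the tests in this commit happen to use `cloudflare-ipfs.com`):

```ts
import { getFetchableDecentralizedStorageUrl } from './metadata-helpers';

// Plain http(s) URIs come back unchanged.
const direct = getFetchableDecentralizedStorageUrl('https://example.com/meta/1.json').toString();
// direct === 'https://example.com/meta/1.json'

// ipfs:// URIs are rewritten against the configured public gateway (the host shown is illustrative).
const gateway = getFetchableDecentralizedStorageUrl('ipfs://QmSomeCid/1.png').toString();
// gateway === e.g. 'https://cloudflare-ipfs.com/ipfs/QmSomeCid/1.png'
```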
12 changes: 7 additions & 5 deletions tests/image-cache.test.ts
@@ -1,5 +1,5 @@
import { ENV } from '../src/env';
import { processImageUrl } from '../src/token-processor/util/image-cache';
import { processImageCache } from '../src/token-processor/util/image-cache';

describe('Image cache', () => {
const contract = 'SP3QSAJQ4EA8WXEDSRRKMZZ29NH91VZ6C5X88FGZQ.crashpunks-v2';
@@ -11,7 +11,7 @@ describe('Image cache', () => {
});

test('transforms image URL correctly', async () => {
const transformed = await processImageUrl(url, contract, tokenNumber);
const transformed = await processImageCache(url, contract, tokenNumber);
expect(transformed).toStrictEqual([
'http://cloudflare-ipfs.com/test/image.png?processed=true',
'http://cloudflare-ipfs.com/test/image.png?processed=true&thumb=true',
@@ -20,18 +20,20 @@

test('ignores data: URL', async () => {
const url = 'data:123456';
const transformed = await processImageUrl(url, contract, tokenNumber);
const transformed = await processImageCache(url, contract, tokenNumber);
expect(transformed).toStrictEqual(['data:123456']);
});

test('ignores empty script paths', async () => {
ENV.METADATA_IMAGE_CACHE_PROCESSOR = '';
const transformed = await processImageUrl(url, contract, tokenNumber);
const transformed = await processImageCache(url, contract, tokenNumber);
expect(transformed).toStrictEqual([url]);
});

test('handles script errors', async () => {
ENV.METADATA_IMAGE_CACHE_PROCESSOR = './tests/test-image-cache-error.js';
await expect(processImageUrl(url, contract, tokenNumber)).rejects.toThrow(/ImageCache error/);
await expect(processImageCache(url, contract, tokenNumber)).rejects.toThrow(
/ImageCache script error/
);
});
});
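The updated "handles script errors" test points `METADATA_IMAGE_CACHE_PROCESSOR` at `./tests/test-image-cache-error.js` and expects the generic `ImageCache script error` message, i.e. the default branch of `processImageCache` for exit codes other than 0, 2 and 3. That fixture's contents are not part of this diff; a minimal script that would satisfy the test could look like the following, purely as an assumption:

```ts
// Hypothetical stand-in for tests/test-image-cache-error.js (not shown in this commit).
// Exiting with code 1 makes processImageCache throw the non-retryable
// `ImageCache script error (code 1): ...`.
console.error('simulated cache failure');
process.exit(1);
```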