diff --git a/config/image-cache.js b/config/image-cache.js index 6497e885..d9d6f100 100755 --- a/config/image-cache.js +++ b/config/image-cache.js @@ -34,7 +34,7 @@ const IMAGE_RESIZE_WIDTH = parseInt(process.env['IMAGE_CACHE_RESIZE_WIDTH'] ?? ' const GCS_BUCKET_NAME = process.env['IMAGE_CACHE_GCS_BUCKET_NAME']; const GCS_OBJECT_NAME_PREFIX = process.env['IMAGE_CACHE_GCS_OBJECT_NAME_PREFIX']; const CDN_BASE_PATH = process.env['IMAGE_CACHE_CDN_BASE_PATH']; -const TIMEOUT = parseInt(process.env['METADATA_FETCH_TIMEOUT_MS'] ?? '30'); +const TIMEOUT = parseInt(process.env['METADATA_FETCH_TIMEOUT_MS'] ?? '30000'); const MAX_REDIRECTIONS = parseInt(process.env['METADATA_FETCH_MAX_REDIRECTIONS'] ?? '0'); const MAX_RESPONSE_SIZE = parseInt(process.env['IMAGE_CACHE_MAX_BYTE_SIZE'] ?? '-1'); @@ -47,35 +47,29 @@ async function getGcsAuthToken() { { method: 'GET', headers: { 'Metadata-Flavor': 'Google' }, + throwOnError: true, } ); const json = await response.body.json(); - if (response.statusCode === 200 && json.access_token) { - // Cache the token so we can reuse it for other images. - process.env['IMAGE_CACHE_GCS_AUTH_TOKEN'] = json.access_token; - return json.access_token; - } - throw new Error(`GCS access token not found ${response.statusCode}: ${json}`); + // Cache the token so we can reuse it for other images. + process.env['IMAGE_CACHE_GCS_AUTH_TOKEN'] = json.access_token; + return json.access_token; } catch (error) { - throw new Error(`Error fetching GCS access token: ${error.message}`); + throw new Error(`GCS access token error: ${error}`); } } async function upload(stream, name, authToken) { - try { - const response = await request( - `https://storage.googleapis.com/upload/storage/v1/b/${GCS_BUCKET_NAME}/o?uploadType=media&name=${GCS_OBJECT_NAME_PREFIX}${name}`, - { - method: 'POST', - body: stream, - headers: { 'Content-Type': 'image/png', Authorization: `Bearer ${authToken}` }, - } - ); - if (response.statusCode !== 200) throw new Error(`GCS error: ${response.statusCode}`); - return `${CDN_BASE_PATH}${name}`; - } catch (error) { - throw new Error(`Error uploading ${name}: ${error.message}`); - } + await request( + `https://storage.googleapis.com/upload/storage/v1/b/${GCS_BUCKET_NAME}/o?uploadType=media&name=${GCS_OBJECT_NAME_PREFIX}${name}`, + { + method: 'POST', + body: stream, + headers: { 'Content-Type': 'image/png', Authorization: `Bearer ${authToken}` }, + throwOnError: true, + } + ); + return `${CDN_BASE_PATH}${name}`; } fetch( @@ -86,15 +80,13 @@ fetch( bodyTimeout: TIMEOUT, maxRedirections: MAX_REDIRECTIONS, maxResponseSize: MAX_RESPONSE_SIZE, + throwOnError: true, connect: { rejectUnauthorized: false, // Ignore SSL cert errors. }, }), }, - ({ statusCode, body }) => { - if (statusCode !== 200) throw new Error(`Failed to fetch image: ${statusCode}`); - return body; - } + ({ body }) => body ) .then(async response => { const imageReadStream = Readable.fromWeb(response.body); @@ -115,15 +107,16 @@ fetch( upload(fullSizeTransform, `${CONTRACT_PRINCIPAL}/${TOKEN_NUMBER}.png`, authToken), upload(thumbnailTransform, `${CONTRACT_PRINCIPAL}/${TOKEN_NUMBER}-thumb.png`, authToken), ]); - // The API will read these strings as CDN URLs. 
- for (const result of results) console.log(result); + for (const r of results) console.log(r); break; } catch (error) { if ( - (error.message.endsWith('403') || error.message.endsWith('401')) && - !didRetryUnauthorized + !didRetryUnauthorized && + error.cause && + error.cause.code == 'UND_ERR_RESPONSE_STATUS_CODE' && + (error.cause.statusCode === 401 || error.cause.statusCode === 403) ) { - // Force a dynamic token refresh and try again. + // GCS token is probably expired. Force a token refresh before trying again. process.env['IMAGE_CACHE_GCS_AUTH_TOKEN'] = undefined; didRetryUnauthorized = true; } else throw error; @@ -131,5 +124,24 @@ fetch( } }) .catch(error => { - throw new Error(`Error fetching image: ${error}`); + console.error(error); + // TODO: Handle `Input buffer contains unsupported image format` error from sharp when the image + // is actually a video or another media file. + let exitCode = 1; + if ( + error.cause && + (error.cause.code == 'UND_ERR_HEADERS_TIMEOUT' || + error.cause.code == 'UND_ERR_BODY_TIMEOUT' || + error.cause.code == 'UND_ERR_CONNECT_TIMEOUT' || + error.cause.code == 'ECONNRESET') + ) { + exitCode = 2; + } else if ( + error.cause && + error.cause.code == 'UND_ERR_RESPONSE_STATUS_CODE' && + error.cause.statusCode === 429 + ) { + exitCode = 3; + } + process.exit(exitCode); }); diff --git a/src/token-processor/queue/job/process-token-job.ts b/src/token-processor/queue/job/process-token-job.ts index 4e190e13..cdaba8dd 100644 --- a/src/token-processor/queue/job/process-token-job.ts +++ b/src/token-processor/queue/job/process-token-job.ts @@ -16,7 +16,7 @@ import { StacksNodeRpcClient } from '../../stacks-node/stacks-node-rpc-client'; import { StacksNodeClarityError, TooManyRequestsHttpError } from '../../util/errors'; import { fetchAllMetadataLocalesFromBaseUri, - getFetchableUrl, + getFetchableDecentralizedStorageUrl, getTokenSpecificUri, } from '../../util/metadata-helpers'; import { RetryableJobError } from '../errors'; @@ -214,7 +214,7 @@ export class ProcessTokenJob extends Job { return; } // Before we return the uri, check if its fetchable hostname is not already rate limited. - const fetchable = getFetchableUrl(uri); + const fetchable = getFetchableDecentralizedStorageUrl(uri); const rateLimitedHost = await this.db.getRateLimitedHost({ hostname: fetchable.hostname }); if (rateLimitedHost) { const retryAfter = Date.parse(rateLimitedHost.retry_after); diff --git a/src/token-processor/util/image-cache.ts b/src/token-processor/util/image-cache.ts index 9ad70e42..1c11aac9 100644 --- a/src/token-processor/util/image-cache.ts +++ b/src/token-processor/util/image-cache.ts @@ -1,33 +1,81 @@ import * as child_process from 'child_process'; import { ENV } from '../../env'; -import { MetadataParseError } from './errors'; -import { parseDataUrl, getFetchableUrl } from './metadata-helpers'; +import { MetadataParseError, MetadataTimeoutError, TooManyRequestsHttpError } from './errors'; +import { parseDataUrl, getFetchableDecentralizedStorageUrl } from './metadata-helpers'; import { logger } from '@hirosystems/api-toolkit'; import { PgStore } from '../../pg/pg-store'; +import { errors } from 'undici'; +import { RetryableJobError } from '../queue/errors'; /** - * If an external image processor script is configured, then it will process the given image URL for - * the purpose of caching on a CDN (or whatever else it may be created to do). The script is - * expected to return a new URL for the image. 
If the script is not configured, then the original - * URL is returned immediately. If a data-uri is passed, it is also immediately returned without - * being passed to the script. + * If an external image processor script is configured in the `METADATA_IMAGE_CACHE_PROCESSOR` ENV + * var, this function will process the given image URL for the purpose of caching on a CDN (or + * whatever else it may be created to do). The script is expected to return a new URL for the image + * via `stdout`, with an optional 2nd line with another URL for a thumbnail version of the same + * cached image. If the script is not configured, then the original URL is returned immediately. If + * a data-uri is passed, it is also immediately returned without being passed to the script. + * + * The Image Cache script must return a status code of `0` to mark a successful cache. Other code + * returns available are: + * * `1`: A generic error occurred. Cache should not be retried. + * * `2`: Image fetch timed out before caching was possible. Should be retried. + * * `3`: Image fetch failed due to rate limits from the remote server. Should be retried. */ -export async function processImageUrl( +export async function processImageCache( imgUrl: string, contractPrincipal: string, tokenNumber: bigint ): Promise { const imageCacheProcessor = ENV.METADATA_IMAGE_CACHE_PROCESSOR; - if (!imageCacheProcessor) { - return [imgUrl]; - } - if (imgUrl.startsWith('data:')) { - return [imgUrl]; + if (!imageCacheProcessor || imgUrl.startsWith('data:')) return [imgUrl]; + logger.info(`ImageCache processing token ${contractPrincipal} (${tokenNumber}) at ${imgUrl}`); + const { code, stdout, stderr } = await callImageCacheScript( + imageCacheProcessor, + imgUrl, + contractPrincipal, + tokenNumber + ); + switch (code) { + case 0: + try { + const urls = stdout + .trim() + .split('\n') + .map(r => new URL(r).toString()); + logger.info(urls, `ImageCache processed token ${contractPrincipal} (${tokenNumber})`); + return urls; + } catch (error) { + // The script returned a code `0` but the results are invalid. This could happen because of + // an unknown script error so we should mark it as retryable. + throw new RetryableJobError( + `ImageCache unknown error`, + new Error(`Invalid cached url for ${imgUrl}: ${stdout}, stderr: ${stderr}`) + ); + } + case 2: + throw new RetryableJobError(`ImageCache fetch timed out`, new MetadataTimeoutError(imgUrl)); + case 3: + throw new RetryableJobError( + `ImageCache fetch rate limited`, + new TooManyRequestsHttpError(new URL(imgUrl), new errors.ResponseStatusCodeError()) + ); + default: + throw new Error(`ImageCache script error (code ${code}): ${stderr}`); } +} - logger.info(`ImageCache processing token ${contractPrincipal} (${tokenNumber}) at ${imgUrl}`); +async function callImageCacheScript( + imageCacheProcessor: string, + imgUrl: string, + contractPrincipal: string, + tokenNumber: bigint +): Promise<{ + code: number; + stdout: string; + stderr: string; +}> { const repoDir = process.cwd(); - const { code, stdout, stderr } = await new Promise<{ + return await new Promise<{ code: number; stdout: string; stderr: string; @@ -37,26 +85,24 @@ export async function processImageUrl( [imgUrl, contractPrincipal, tokenNumber.toString()], { cwd: repoDir } ); + let code = 0; let stdout = ''; let stderr = ''; cp.stdout.on('data', data => (stdout += data)); cp.stderr.on('data', data => (stderr += data)); - cp.on('close', code => resolve({ code: code ?? 
0, stdout, stderr })); + cp.on('close', _ => resolve({ code, stdout, stderr })); + cp.on('exit', processCode => { + code = processCode ?? 0; + }); }); - if (code !== 0) throw new Error(`ImageCache error: ${stderr}`); - const result = stdout.trim().split('\n'); - logger.info(result, `ImageCache processed token ${contractPrincipal} (${tokenNumber})`); - - try { - return result.map(r => new URL(r).toString()); - } catch (error) { - throw new Error( - `Image processing script returned an invalid url for ${imgUrl}: ${result}, stderr: ${stderr}` - ); - } } -export function getImageUrl(uri: string): string { +/** + * Converts a raw image URI from metadata into a fetchable URL. + * @param uri - Original image URI + * @returns Normalized URL string + */ +export function normalizeImageUri(uri: string): string { // Support images embedded in a Data URL if (uri.startsWith('data:')) { const dataUrl = parseDataUrl(uri); @@ -68,7 +114,7 @@ export function getImageUrl(uri: string): string { } return uri; } - const fetchableUrl = getFetchableUrl(uri); + const fetchableUrl = getFetchableDecentralizedStorageUrl(uri); return fetchableUrl.toString(); } @@ -81,8 +127,8 @@ export async function reprocessTokenImageCache( const imageUris = await db.getTokenImageUris(contractPrincipal, tokenIds); for (const token of imageUris) { try { - const [cached, thumbnail] = await processImageUrl( - getFetchableUrl(token.image).toString(), + const [cached, thumbnail] = await processImageCache( + getFetchableDecentralizedStorageUrl(token.image).toString(), contractPrincipal, BigInt(token.token_number) ); diff --git a/src/token-processor/util/metadata-helpers.ts b/src/token-processor/util/metadata-helpers.ts index c53548e8..8bfbd37f 100644 --- a/src/token-processor/util/metadata-helpers.ts +++ b/src/token-processor/util/metadata-helpers.ts @@ -18,7 +18,7 @@ import { TooManyRequestsHttpError, } from './errors'; import { RetryableJobError } from '../queue/errors'; -import { getImageUrl, processImageUrl } from './image-cache'; +import { normalizeImageUri, processImageCache } from './image-cache'; import { RawMetadataLocale, RawMetadataLocalizationCType, @@ -134,8 +134,8 @@ async function parseMetadataForInsertion( let cachedImage: string | undefined; let cachedThumbnailImage: string | undefined; if (image && typeof image === 'string') { - const normalizedUrl = getImageUrl(image); - [cachedImage, cachedThumbnailImage] = await processImageUrl( + const normalizedUrl = normalizeImageUri(image); + [cachedImage, cachedThumbnailImage] = await processImageCache( normalizedUrl, contract.principal, token.token_number @@ -255,7 +255,7 @@ export async function getMetadataFromUri(token_uri: string): Promise { const contract = 'SP3QSAJQ4EA8WXEDSRRKMZZ29NH91VZ6C5X88FGZQ.crashpunks-v2'; @@ -11,7 +11,7 @@ describe('Image cache', () => { }); test('transforms image URL correctly', async () => { - const transformed = await processImageUrl(url, contract, tokenNumber); + const transformed = await processImageCache(url, contract, tokenNumber); expect(transformed).toStrictEqual([ 'http://cloudflare-ipfs.com/test/image.png?processed=true', 'http://cloudflare-ipfs.com/test/image.png?processed=true&thumb=true', @@ -20,18 +20,20 @@ describe('Image cache', () => { test('ignores data: URL', async () => { const url = 'data:123456'; - const transformed = await processImageUrl(url, contract, tokenNumber); + const transformed = await processImageCache(url, contract, tokenNumber); expect(transformed).toStrictEqual(['data:123456']); }); test('ignores empty script 
paths', async () => { ENV.METADATA_IMAGE_CACHE_PROCESSOR = ''; - const transformed = await processImageUrl(url, contract, tokenNumber); + const transformed = await processImageCache(url, contract, tokenNumber); expect(transformed).toStrictEqual([url]); }); test('handles script errors', async () => { ENV.METADATA_IMAGE_CACHE_PROCESSOR = './tests/test-image-cache-error.js'; - await expect(processImageUrl(url, contract, tokenNumber)).rejects.toThrow(/ImageCache error/); + await expect(processImageCache(url, contract, tokenNumber)).rejects.toThrow( + /ImageCache script error/ + ); }); }); diff --git a/tests/metadata-helpers.test.ts b/tests/metadata-helpers.test.ts index 5edb9912..dc6ec47d 100644 --- a/tests/metadata-helpers.test.ts +++ b/tests/metadata-helpers.test.ts @@ -7,7 +7,7 @@ import { MetadataTimeoutError, } from '../src/token-processor/util/errors'; import { - getFetchableUrl, + getFetchableDecentralizedStorageUrl, getMetadataFromUri, getTokenSpecificUri, fetchMetadata, @@ -187,16 +187,16 @@ describe('Metadata Helpers', () => { ENV.PUBLIC_GATEWAY_IPFS = 'https://cloudflare-ipfs.com'; ENV.PUBLIC_GATEWAY_ARWEAVE = 'https://arweave.net'; const arweave = 'ar://II4z2ziYyqG7-kWDa98lWGfjxRdYOx9Zdld9P_I_kzE/9731.json'; - expect(getFetchableUrl(arweave).toString()).toBe( + expect(getFetchableDecentralizedStorageUrl(arweave).toString()).toBe( 'https://arweave.net/II4z2ziYyqG7-kWDa98lWGfjxRdYOx9Zdld9P_I_kzE/9731.json' ); const ipfs = 'ipfs://ipfs/bafybeifwoqwdhs5djtx6vopvuwfcdrqeuecayp5wzpzjylxycejnhtrhgu/vague_art_paintings/vague_art_paintings_6_metadata.json'; - expect(getFetchableUrl(ipfs).toString()).toBe( + expect(getFetchableDecentralizedStorageUrl(ipfs).toString()).toBe( 'https://cloudflare-ipfs.com/ipfs/bafybeifwoqwdhs5djtx6vopvuwfcdrqeuecayp5wzpzjylxycejnhtrhgu/vague_art_paintings/vague_art_paintings_6_metadata.json' ); const ipfs2 = 'ipfs://QmYCnfeseno5cLpC75rmy6LQhsNYQCJabiuwqNUXMaA3Fo/1145.png'; - expect(getFetchableUrl(ipfs2).toString()).toBe( + expect(getFetchableDecentralizedStorageUrl(ipfs2).toString()).toBe( 'https://cloudflare-ipfs.com/ipfs/QmYCnfeseno5cLpC75rmy6LQhsNYQCJabiuwqNUXMaA3Fo/1145.png' ); }); diff --git a/tests/test-image-cache-error.js b/tests/test-image-cache-error.js index 4ab0feb6..878b3073 100755 --- a/tests/test-image-cache-error.js +++ b/tests/test-image-cache-error.js @@ -1,3 +1,3 @@ #!/usr/bin/env node console.error('Test error'); -throw new Error('Test'); +process.exit(1);
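
The retryable exit codes documented in the new `processImageCache` docstring (2 for fetch timeouts, 3 for remote rate limiting) are not exercised by the updated tests, which only cover the success path and the generic exit-code-1 failure. A minimal sketch of a companion test follows; it reuses the `url`, `contract`, and `tokenNumber` fixtures from tests/image-cache.test.ts and assumes a hypothetical stub script tests/test-image-cache-timeout.js (not part of this diff) that mirrors tests/test-image-cache-error.js but calls `process.exit(2)`:

  test('treats image fetch timeouts as retryable', async () => {
    ENV.METADATA_IMAGE_CACHE_PROCESSOR = './tests/test-image-cache-timeout.js';
    // Exit code 2 is mapped by processImageCache to a RetryableJobError whose
    // message is 'ImageCache fetch timed out', so the rejection can be asserted on.
    await expect(processImageCache(url, contract, tokenNumber)).rejects.toThrow(
      /ImageCache fetch timed out/
    );
  });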