-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cleaned up code and added better error handling and responses for invalid tables #32
Conversation
Would this be reasonable time to reevaluate processMessage (L505-688)? Theres a bit of copy paste logic that could be cleaned up. Perhaps move to a new file src/process.tsclass defs not shown! see old example for that import { event_StatusToJSON } from "./client_api";
export async function processMSG(wsMessage:any, isBin:boolean): Promise<[string|undefined, Message|undefined]> {
let data = wsMessage.data;
// GET MESSAGES
let messages:any;
if (isBin) {
if (typeof data.arrayBuffer === "undefined") {
data = new Blob([data]);
}
await data.arrayBuffer().then((data: any) => {
messages = ProtobufMessage.decode( new Uint8Array(data))
})
}
else {
messages = JSON.parse(wsMessage.data);
}
// PROCESS
let [eventType, eventData] = Object.entries(messages).find(([eventName, val])=>val!==undefined) || []
if (!(eventType in processorFuncs)) return [undefined, undefined]
let processed: Message = processorFuncs[eventType](eventData, isBin)
return [eventType, processed]
}
/*** PROCESSORS ***/
let processorFuncs = {
subscriptionUpdate(subUpdate:any, isBin=true) {
const tableUpdates = getTableUpdates(subUpdate, isBin);
return new SubscriptionUpdateMessage(tableUpdates);
},
identityToken(identityToken:any, isBin=undefined) {
const identity = new Identity(identityToken["identity"]);
const token = identityToken["token"];
return new IdentityTokenMessage(identity, token);
},
transactionUpdate(update:any, isBin=true) {
let txUpdate = update["subscriptionUpdate"]
const tableUpdates = getTableUpdates(txUpdate, isBin);
const event = update["event"] as any;
const functionCall = event["functionCall"] as any;
const originalReducerName: string = functionCall["reducer"];
const reducerName: string = toPascalCase(originalReducerName);
const messageStr = event["message"];
let identity, args, status;
if(isBin) {
identity = new Identity(event["callerIdentity"]);
args = functionCall["argBytes"];
status = event_StatusToJSON(event["status"]);
} else {
identity = Identity.fromString( event["caller_identity"]);
args = JSON.parse(functionCall["args"]);
status = event["status"];
}
const transactionUpdateEvent = new TransactionUpdateEvent(
identity, originalReducerName, reducerName, args, status, messageStr
);
return new TransactionUpdateMessage(tableUpdates, transactionUpdateEvent);
}
}
/*** HELPERS ***/
// would be nice if binary msg's just returned pascal case or something
let keyMap = [["table_updates", "table_name", "table_row_operations"], ["tableUpdates", "tableName", "tableRowOperations"]]
let getTableUpdates = (update:any, isBin=true) => {
const tableUpdates:TableUpdate[] = [];
let [tUpdates, tName, tOp] = keyMap[+isBin]
for (const rawUpdate of update[ tUpdates ]) {
const tableName = rawUpdate[ tName ]
const rowOps = rawUpdate[ tOp ]
const operations:TableOperation[] = [];
for (const rawOperation of rowOps) {
let type, rowPk;
if(isBin){
type = (rawOperation["op"] === 1 /* INSERT # */) ? "insert" : "delete";
rowPk = new TextDecoder().decode( rawOperation["rowPk"])
} else {
type = rawOperation["op"];
rowPk = rawOperation["rowPk"];
}
operations.push( new TableOperation(type, rowPk, rawOperation.row) )
}
const tableUpdate = new TableUpdate(tableName, operations);
tableUpdates.push(tableUpdate);
}
return tableUpdates
}
let toPascalCase = function (s: string): string {
const str = s.replace(/([-_][a-z])/gi, ($1) => {
return $1.toUpperCase().replace("-", "").replace("_", "");
});
return str.charAt(0).toUpperCase() + str.slice(1);
}; Then in src/spacetimedb.ts: import { processMSG } from './process.js'
export class SpacetimeDBClient {
// ...
private processMessage(wsMessage:Event) {
let isBinary = this.protocol === "binary"
return processMSG(wsMessage, isBinary)
}
// ...
} Please note this is not a drop in replacement. It is not tested and there have been a handful of identity and address changes (2 & 4 moths ago) since i did this (~5 months ago). If you want to got the extra mile, you could move Heres a loose draft of what that might look likesrc/spacetimedb.ts import { useMsgHandlers } from './process.js'
export class SpacetimeDBClient {
// ...
private async handleOnMessage(wsMessage: any) {
this.emitter.emit("receiveWSMessage", wsMessage); // no default listeners
let msgHandlers = useMsgHandlers(this) // pass instance, curry, or bind
return await msgHandlers(wsMessage) || []
}
// ...
} src/process.ts or src/msgHandler.ts or wutever // useMsgHandlers only emits, no returns.
export const useMsgHandlers = (client) => async (wsMessage:any, isBin:boolean) {
let data = wsMessage.data;
// GET MESSAGES
let messages:any;
if (isBin) {
if (typeof data.arrayBuffer === "undefined") {
data = new Blob([data]);
}
await data.arrayBuffer().then((data: any) => {
messages = ProtobufMessage.decode( new Uint8Array(data))
})
}
else {
messages = JSON.parse(wsMessage.data);
}
// PROCESS
let [eventType, eventData] = Object.entries(messages).find(([eventName, val])=>val!==undefined) || []
if (!(eventType in processorFuncs)) return // no emit
let processed = processorFuncs[eventType](eventData, isBin)
// processorFuncs are defined in prior suggestion, only need to return data tho
// EMIT
emitterFuncs[eventType](client, processed)
}
/*
Might need a switch case if TS gives you issues. Prob smthn like:
switch (eventType) {
case "subscriptionUpdate":
let processed = processorFuncs[eventType](eventData, isBin)
emitterFuncs[eventType](processed as SubscriptionUpdateMessage)
case "subscriptionUpdate": // or
let processed = processorFuncs.subscriptionUpdate(eventData, isBin)
emitterFuncs.subscriptionUpdate(processed)
}
*/
/*** EMITTERS ***/
const emitterFuncs = {
identityToken(client, message: IdentityTokenMessage) {
client.identity = message.identity;
if (client.runtime.auth_token) {
client.token = client.runtime.auth_token;
} else {
client.token = message.token;
}
client.emitter.emit("connected", client.token, client.identity);
},
transactionUpdate(client, message: TransactionUpdateMessage) {
const reducerName = message.event.reducerName;
const reducer: any | undefined = reducerName
? client.reducers.get(reducerName)
: undefined;
let reducerEvent: ReducerEvent | undefined;
let reducerArgs: any;
if (reducer && message.event.status === "committed") {
let adapter: ReducerArgsAdapter;
if (client.protocol === "binary") {
adapter = new BinaryReducerArgsAdapter(
new BinaryAdapter(
new BinaryReader(message.event.args as Uint8Array)
)
);
} else {
adapter = new JSONReducerArgsAdapter(message.event.args as any[]);
}
reducerArgs = reducer.deserializeArgs(adapter);
}
reducerEvent = new ReducerEvent(
message.event.identity,
message.event.originalReducerName,
message.event.status,
message.event.message,
reducerArgs
);
for (let tableUpdate of message.tableUpdates) {
const tableName = tableUpdate.tableName;
const entityClass = client.runtime.global.components.get(tableName);
const table = client.db.getOrCreateTable(
tableUpdate.tableName,
undefined,
entityClass
);
table.applyOperations(
entityClass,
client.protocol,
tableUpdate.operations,
reducerEvent
);
}
if (reducer) {
client.emitter.emit(
"reducer:" + reducerName,
reducerEvent,
reducerArgs
);
}
},
subscriptionUpdate(client, message: SubscriptionUpdateMessage) {
for (let tableUpdate of message.tableUpdates) {
const tableName = tableUpdate.tableName;
const entityClass = client.runtime.global.components.get(tableName);
const table = client.db.getOrCreateTable( tableName, undefined, entityClass );
table.applyOperations(
entityClass,
client.protocol,
tableUpdate.operations,
undefined
);
}
if (client.emitter) client.emitter.emit("initialStateSync")
}
} Sorry for the long message, hopefully some of that made sense 😅 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry it took me so long to review, I like these changes!
@tcardlab thanks for the comment and sorry for a delayed reply. What you propose makes sense. I remember the @gefjon has already done a bit of a refactor in this place, but I'll take another look. It's a part of code I know could be better, but also which I don't really like so it's always hard for me to get to it 😅 |
Description of Changes
API
[] This is NOT an API breaking change to the SDK
Initially it was thought it may break the API, however the changes required were not breaking.
Requires SpacetimeDB PRs