T3 interblag real-estate

Asynchronous queues in Typescript

March 6, 2022

These days we mostly use async/await for asynchronous workflows and we don't touch the lower-level Promise unless we need to, so it's easy to forget that promises are just glorified callbacks. I mean, promises essentially transforms code that (way back when) used to looked like

const step1 = doThings();
const asyncStep2 = doAsyncThingsWithCallback(() => {
  const step3 = doMoreThings();
  const step4 = doEvenMoreThings()
});

into

const step1 = doThings();
const asyncStep2 = await doAsyncThings();
const step3 = doMoreThings();
const step4 = doEvenMoreThings();

So in a backwards kinda way, it's almost like the await keyboard captures the rest of our code (step 3 + 4), turns it into a callback, and passes the callback to the asynchronous function that was awaited.

This actually sounds an awful lot like call-with-current-continuation that's found in various LISP dialects, buuuuut I'm not smart enough to figure out the connection.

Anyway

Anyway, have y'all ever tried to not call the resolve function, but instead "leak" it to the outside world?

type TResolver<T> = {
  promise: Promise<T>;
  resolve: (value: T) => void;
};

// Create a promise and expose the resolve function,
// so the promise can be resolved from afar.
const resolver = <T>(): TResolver<T> => {
  let resolve, promise = new Promise<T>((r) => {
    resolve = r;
  });
  return { promise, resolve: resolve as any };
};

This allows us to trigger the resolve function from a totally different context, at a totally different time. We get to decide about the program flow of some unrelated part of the program, as if we were god or something.

const resolvers: TResolver<void> = [];

const russianRoulette = () => {
  const r = resolver<void>();
  resolvers.push(r);
  return r.promise;
};

const roulettePlayer = () => {
  await russianRoulette();
  console.log("Yay, I survived!");
};

const player1 = roulettePlayer();
const player2 = roulettePlayer();
const player3 = roulettePlayer();

// Pull the trigger
for (const { resolve } of resolvers) {
  if (Math.random() < 0.66)
    resolve();
}

I think that's pretty neat. I'm sure you can do many cool things with it, I haven't really explored all the possibilities. There's a very particular and practical usecase for resolver I have found however: asynchronous queues.

Asynchronous queues

By that I mean an array-like datastructure that has an asynchronous .pop() method -- if the queue is empty, .pop() will wait until new items are added. Using these queues we can write data pipelines where two pieces of code can produce and consume values, concurrently.

resolver makes this somewhat straightforward: If there is an item available, return it immediately. If not, return a promise instead, which will be resolved whenever an item is added to the list. (I called the queue operations submit and poll.)

type TQueue<T> = {
  submit: (value: T) => void;
  poll: () => Promise<T>;
  size: number;
};

const asynq = <T>(): TQueue<T> => {
  const buffer: T[] = [];
  const polling: TResolver<T>[] = [];

  const submit = (x: T) => {
    // functions are queueing for items
    if (polling.length > 0) {
      polling.pop()!.resolve(x);
	}
    // add the item to the main queue
    else {
      buffer.unshift(x);
    }
  };

  const poll = () => {
    // items available in the main queue
    if (buffer.length > 0) {
      
      const b = buffer.pop()!;
      return Promise.resolve(b);
      
	}
    // no items available, add to waiting queue
    else {
      
      var r = resolver<T>();
      polling.unshift(r);
      return r.promise;

    }
  };

  return {
    get size() { return buffer.length - polling.length; },
    poll,
    submit,
  };
};

What I like about this code is just how easy it was to implement, and how short it is. Goes to show how much you can do with how little if you pick the right abstractions! Also goes to show how cool Promises and async/await are.

Using the queue is pretty straightforward too:

const consumer = async <T>(queue: TQueue<T>) => {
  while (true) {
    const item = await queue.poll();
    console.log("I got an", item);
  };
};

const queue = asynq<number>();
consumer(queue);

queue.submit(1);
queue.submit(2);
queue.submit(3);

Works like a charm :)

Asynchronous submitting, bounded queues, and the producer/consumer pattern

The above code snippet consists of two arrays internally - the "main" queue where items are buffered, and a "waiting" or "polling" queue where functions that are trying to dequeue from an empty queue are parked while no items are in the "main" queue.

To implement proper producer/consumer workflows, we need queues with bounded capacity: A producer will insert items into the queue until it's filled up, and the queue will block once it is at max capacity.

For this, the .submit function needs to be asynchronous aswell, and we can implement this behaviour with a third "submitting" queue which also stores resolve functions, where producers will be queueing up while the queue is full.

Though, instead of posting more code I'll leave you with this codesandbox that contains a working implementation, which you can look at to explore the concepts I've talked about. I also added small abstractions called consumer and producer which simply poll or submit in a loop, like we did manually above.

-Flo