阿里云主机折上折
  • 微信号
您当前的位置:网站首页 > 异步流程控制的实现方式

异步流程控制的实现方式

作者:陈川 阅读数:64186人阅读 分类: Node.js

回调函数

回调函数是最基础的异步流程控制方式。在Koa2中,虽然不推荐直接使用回调函数处理异步逻辑,但理解其原理对掌握更高级的控制方式很有帮助。回调函数通过将后续逻辑封装为函数,作为参数传递给异步操作。

function fetchData(callback) {
  setTimeout(() => {
    callback(null, 'Data fetched');
  }, 1000);
}

fetchData((err, data) => {
  if (err) console.error(err);
  else console.log(data);
});

这种模式容易导致"回调地狱",当多个异步操作需要顺序执行时,代码会变得难以维护:

operation1((err, result1) => {
  if (err) return handleError(err);
  operation2(result1, (err, result2) => {
    if (err) return handleError(err);
    operation3(result2, (err, result3) => {
      // 更多嵌套...
    });
  });
});

Promise

Promise是ES6引入的异步解决方案,Koa2中间件虽然不直接返回Promise,但可以在业务逻辑中使用。Promise通过链式调用解决了回调嵌套问题。

function fetchData() {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve('Data fetched');
    }, 1000);
  });
}

fetchData()
  .then(data => {
    console.log(data);
    return processData(data);
  })
  .then(processedData => {
    console.log(processedData);
  })
  .catch(err => {
    console.error(err);
  });

Promise还提供了一些静态方法处理多个异步操作:

// 并行执行
Promise.all([promise1, promise2])
  .then(results => {
    // results是包含所有结果的数组
  });

// 竞速模式
Promise.race([promise1, promise2])
  .then(firstResult => {
    // 最先完成的结果
  });

async/await

Koa2的核心优势在于对async/await的原生支持。async函数返回Promise,await可以暂停函数执行直到Promise解决。

async function getData() {
  try {
    const data = await fetchData();
    const processed = await processData(data);
    return processed;
  } catch (err) {
    console.error(err);
    throw err;
  }
}

// 在Koa2中间件中使用
app.use(async (ctx, next) => {
  try {
    const result = await getData();
    ctx.body = result;
  } catch (err) {
    ctx.status = 500;
    ctx.body = { error: err.message };
  }
});

async/await让异步代码看起来像同步代码,同时保持非阻塞特性。在Koa2中,中间件本身就是async函数,可以方便地组合:

app.use(async (ctx, next) => {
  const start = Date.now();
  await next();
  const duration = Date.now() - start;
  ctx.set('X-Response-Time', `${duration}ms`);
});

app.use(async ctx => {
  ctx.body = 'Hello World';
});

Generator函数与co模块

在async/await成为标准前,Koa1使用Generator函数配合co模块实现类似效果。虽然Koa2已转向async/await,但了解Generator有助于理解异步流程控制的演进。

const co = require('co');

function* fetchDataGenerator() {
  const data = yield fetchData();
  const processed = yield processData(data);
  return processed;
}

co(fetchDataGenerator)
  .then(result => console.log(result))
  .catch(err => console.error(err));

Generator函数通过yield暂停执行,co模块自动处理值的解包和流程控制。Koa2的async/await可以看作Generator的语法糖。

事件发射器

Node.js的事件发射器模式也可用于异步流程控制,适合事件驱动的场景。Koa2的上下文对象继承自EventEmitter。

const EventEmitter = require('events');

class DataFetcher extends EventEmitter {
  fetch() {
    setTimeout(() => {
      this.emit('data', 'Data fetched');
    }, 1000);
  }
}

const fetcher = new DataFetcher();
fetcher.on('data', data => {
  console.log(data);
});
fetcher.fetch();

在Koa2中,可以利用事件机制实现发布/订阅模式:

app.use(async (ctx, next) => {
  ctx.app.emit('request-start', { url: ctx.url });
  await next();
  ctx.app.emit('request-end', { url: ctx.url, status: ctx.status });
});

// 监听事件
app.on('request-start', data => {
  console.log(`Request started: ${data.url}`);
});

流控制库

对于复杂流程,可以使用专门的流控制库如async.js。虽然现代JavaScript已内置更好的解决方案,但在某些遗留系统中仍可能遇到。

const async = require('async');

async.waterfall([
  callback => fetchData(callback),
  (data, callback) => processData(data, callback),
  (processed, callback) => saveData(processed, callback)
], (err, result) => {
  if (err) console.error(err);
  else console.log('Final result:', result);
});

Koa2通常不需要这类库,因为async/await已提供更优雅的解决方案。但在处理并行任务队列等场景时,这类库仍有价值。

中间件组合

Koa2的核心特性是中间件组合机制,通过将多个中间件组合成单一请求处理管道来实现流程控制。

const compose = require('koa-compose');

async function middleware1(ctx, next) {
  console.log('Middleware 1 start');
  await next();
  console.log('Middleware 1 end');
}

async function middleware2(ctx, next) {
  console.log('Middleware 2 start');
  await next();
  console.log('Middleware 2 end');
}

const allMiddleware = compose([middleware1, middleware2]);

app.use(allMiddleware);

执行顺序是中间件1开始 -> 中间件2开始 -> 中间件2结束 -> 中间件1结束,形成"洋葱模型"。这种组合方式让流程控制更加灵活。

错误处理

异步流程控制中的错误处理需要特别注意。Koa2提供了统一的错误处理机制。

// 全局错误处理中间件
app.use(async (ctx, next) => {
  try {
    await next();
  } catch (err) {
    ctx.status = err.status || 500;
    ctx.body = { message: err.message };
    ctx.app.emit('error', err, ctx);
  }
});

// 业务逻辑中的错误
app.use(async ctx => {
  const user = await getUser(ctx.params.id);
  if (!user) {
    const err = new Error('User not found');
    err.status = 404;
    throw err;
  }
  ctx.body = user;
});

// 监听全局错误
app.on('error', (err, ctx) => {
  console.error('Server error', err, ctx);
});

并发控制

处理多个并行异步任务时,需要控制并发数量以避免资源耗尽。Koa2中可以结合第三方库实现。

const { default: PQueue } = require('p-queue');

const queue = new PQueue({ concurrency: 3 });

app.use(async ctx => {
  const tasks = Array(10).fill().map((_, i) => 
    queue.add(() => processTask(i))
  );
  const results = await Promise.all(tasks);
  ctx.body = results;
});

对于更复杂的场景,可以使用RxJS等响应式编程库:

const { from } = require('rxjs');
const { mergeMap, toArray } = require('rxjs/operators');

app.use(async ctx => {
  const tasks = Array(10).fill().map((_, i) => i);
  const results = await from(tasks)
    .pipe(
      mergeMap(i => processTask(i), 3), // 并发数3
      toArray()
    )
    .toPromise();
  ctx.body = results;
});

超时处理

异步操作可能需要设置超时,防止长时间挂起。Koa2中可以包装Promise实现超时控制。

function withTimeout(promise, ms) {
  return Promise.race([
    promise,
    new Promise((_, reject) => 
      setTimeout(() => reject(new Error('Timeout')), ms)
    )
  ]);
}

app.use(async ctx => {
  try {
    const data = await withTimeout(fetchData(), 5000);
    ctx.body = data;
  } catch (err) {
    if (err.message === 'Timeout') {
      ctx.status = 504;
      ctx.body = 'Request timeout';
    } else {
      throw err;
    }
  }
});

Koa2还提供了ctx.req.setTimeout方法设置请求超时:

app.use(async (ctx, next) => {
  ctx.req.setTimeout(5000);
  await next();
});

上下文传递

在异步流程中保持上下文是一个挑战。Koa2的ctx对象贯穿整个请求生命周期,解决了这个问题。

// 设置请求ID并记录日志
app.use(async (ctx, next) => {
  ctx.requestId = generateId();
  logger.info(`Start request ${ctx.requestId}`);
  await next();
  logger.info(`End request ${ctx.requestId}`);
});

app.use(async ctx => {
  const user = await getUser(ctx.query.id);
  logger.info(`User fetched in request ${ctx.requestId}`);
  ctx.body = user;
});

对于需要跨请求跟踪的场景,可以使用Async Hooks API:

const asyncHooks = require('async_hooks');
const store = new Map();

const hook = asyncHooks.createHook({
  init: (asyncId, _, triggerAsyncId) => {
    if (store.has(triggerAsyncId)) {
      store.set(asyncId, store.get(triggerAsyncId));
    }
  },
  destroy: asyncId => {
    store.delete(asyncId);
  }
});

hook.enable();

app.use(async (ctx, next) => {
  const asyncId = asyncHooks.executionAsyncId();
  store.set(asyncId, { requestId: generateId() });
  await next();
});

function getContext() {
  const asyncId = asyncHooks.executionAsyncId();
  return store.get(asyncId);
}

取消异步操作

取消正在进行的异步操作是复杂场景下的常见需求。Koa2可以结合AbortController实现。

app.use(async ctx => {
  const controller = new AbortController();
  const timeout = setTimeout(() => controller.abort(), 5000);
  
  try {
    const response = await fetch(url, {
      signal: controller.signal
    });
    clearTimeout(timeout);
    ctx.body = await response.json();
  } catch (err) {
    if (err.name === 'AbortError') {
      ctx.status = 504;
      ctx.body = 'Request aborted';
    } else {
      throw err;
    }
  }
});

对于自定义异步操作,可以类似实现取消逻辑:

function cancellableFetch(url, { signal } = {}) {
  return new Promise((resolve, reject) => {
    if (signal && signal.aborted) {
      return reject(new DOMException('Aborted', 'AbortError'));
    }
    
    const timer = setTimeout(() => {
      fetch(url)
        .then(resolve)
        .catch(reject);
    }, 0);
    
    if (signal) {
      signal.addEventListener('abort', () => {
        clearTimeout(timer);
        reject(new DOMException('Aborted', 'AbortError'));
      });
    }
  });
}

性能优化

异步流程控制的实现方式直接影响性能。Koa2中需要注意以下几点:

  1. 避免不必要的await:
// 不推荐 - 顺序执行
const user = await getUser();
const posts = await getPosts();

// 推荐 - 并行执行
const [user, posts] = await Promise.all([
  getUser(),
  getPosts()
]);
  1. 合理设置并发数:
// 使用p-map控制并发
const pMap = require('p-map');

app.use(async ctx => {
  const items = Array(100).fill().map((_, i) => i);
  const results = await pMap(items, processItem, { concurrency: 5 });
  ctx.body = results;
});
  1. 缓存异步结果:
const cache = new Map();

app.use(async ctx => {
  const key = ctx.url;
  if (cache.has(key)) {
    ctx.body = cache.get(key);
    return;
  }
  
  const data = await fetchData();
  cache.set(key, data);
  ctx.body = data;
});
  1. 使用流处理大文件:
const fs = require('fs');
const { pipeline } = require('stream/promises');

app.use(async ctx => {
  ctx.set('Content-Type', 'application/octet-stream');
  await pipeline(
    fs.createReadStream('large-file.bin'),
    ctx.res
  );
});

本站部分内容来自互联网,一切版权均归源网站或源作者所有。

如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn

前端川

前端川,陈川的代码茶馆🍵,专治各种不服的Bug退散符💻,日常贩卖秃头警告级的开发心得🛠️,附赠一行代码笑十年的摸鱼宝典🐟,偶尔掉落咖啡杯里泡开的像素级浪漫☕。‌