阿里云主机折上折
  • 微信号
您当前的位置:网站首页 > 异步流程控制

异步流程控制

作者:陈川 阅读数:62890人阅读 分类: JavaScript

异步流程控制的概念

JavaScript作为单线程语言,异步编程是其核心特性之一。异步流程控制指的是管理多个异步操作的执行顺序和依赖关系,确保代码按照预期顺序执行。由于JavaScript的事件循环机制,异步操作不会阻塞主线程,但这也带来了回调地狱、难以追踪等问题。

// 简单的异步操作示例
setTimeout(() => {
  console.log('第一个操作');
  setTimeout(() => {
    console.log('第二个操作');
  }, 1000);
}, 1000);

回调函数模式

最早的异步控制方式是回调函数,将函数作为参数传递给异步操作,当操作完成时调用该函数。这种方式简单直接,但容易形成"回调地狱",代码难以阅读和维护。

fs.readFile('file1.txt', 'utf8', (err, data1) => {
  if (err) throw err;
  fs.readFile('file2.txt', 'utf8', (err, data2) => {
    if (err) throw err;
    fs.writeFile('output.txt', data1 + data2, (err) => {
      if (err) throw err;
      console.log('文件合并完成');
    });
  });
});

Promise解决方案

Promise是ES6引入的异步编程解决方案,它代表一个可能现在或将来可用,或永远不可用的值。Promise有三种状态:pending、fulfilled和rejected。

function readFilePromise(path) {
  return new Promise((resolve, reject) => {
    fs.readFile(path, 'utf8', (err, data) => {
      if (err) reject(err);
      else resolve(data);
    });
  });
}

readFilePromise('file1.txt')
  .then(data1 => readFilePromise('file2.txt'))
  .then(data2 => console.log(data2))
  .catch(err => console.error(err));

async/await语法糖

ES2017引入的async/await让异步代码看起来像同步代码,基于Promise但语法更简洁。async函数返回一个Promise,await表达式会暂停async函数的执行,等待Promise解决。

async function processFiles() {
  try {
    const data1 = await readFilePromise('file1.txt');
    const data2 = await readFilePromise('file2.txt');
    await fs.promises.writeFile('output.txt', data1 + data2);
    console.log('文件合并完成');
  } catch (err) {
    console.error('处理文件时出错:', err);
  }
}

processFiles();

并行执行控制

有时需要并行执行多个异步操作,然后等待所有操作完成。Promise.all和Promise.race提供了这种能力。

// 并行读取多个文件
async function readAllFiles() {
  try {
    const [data1, data2] = await Promise.all([
      readFilePromise('file1.txt'),
      readFilePromise('file2.txt')
    ]);
    console.log(data1, data2);
  } catch (err) {
    console.error('读取文件失败:', err);
  }
}

// 竞速模式,第一个完成或失败的Promise决定结果
Promise.race([
  fetch('https://api.example.com/data'),
  new Promise((_, reject) => 
    setTimeout(() => reject(new Error('请求超时')), 5000)
  )
]).then(response => console.log(response))
  .catch(err => console.error(err));

高级流程控制模式

对于更复杂的异步流程控制,可以使用第三方库如async.js或自己实现控制模式。

// 使用async.js实现限流
const async = require('async');

const urls = ['url1', 'url2', 'url3', 'url4', 'url5'];

// 同时最多2个请求
async.mapLimit(urls, 2, async (url) => {
  const response = await fetch(url);
  return response.json();
}, (err, results) => {
  if (err) throw err;
  console.log(results);
});

// 自定义重试逻辑
function withRetry(fn, retries = 3, delay = 1000) {
  return async function(...args) {
    let lastError;
    for (let i = 0; i < retries; i++) {
      try {
        return await fn(...args);
      } catch (err) {
        lastError = err;
        if (i < retries - 1) await new Promise(r => setTimeout(r, delay));
      }
    }
    throw lastError;
  };
}

const reliableFetch = withRetry(fetch, 3);

事件驱动模式

Node.js的事件发射器(EventEmitter)提供了另一种异步流程控制方式,适合处理多个异步事件。

const EventEmitter = require('events');

class FileProcessor extends EventEmitter {
  constructor() {
    super();
    this.on('process', this.handleProcess);
  }
  
  async handleProcess(file) {
    try {
      const content = await fs.promises.readFile(file, 'utf8');
      this.emit('processed', file, content);
    } catch (err) {
      this.emit('error', err);
    }
  }
}

const processor = new FileProcessor();
processor.on('processed', (file, content) => {
  console.log(`文件${file}处理完成,内容长度: ${content.length}`);
});
processor.on('error', err => console.error('处理出错:', err));

processor.emit('process', 'example.txt');

生成器与协程

在async/await之前,生成器函数配合Promise可以实现类似的协程效果,这种方式现在较少使用但仍值得了解。

function* fetchUserAndPosts(userId) {
  try {
    const user = yield fetch(`/users/${userId}`);
    const posts = yield fetch(`/users/${userId}/posts`);
    return { user, posts };
  } catch (err) {
    console.error('获取数据失败:', err);
    throw err;
  }
}

function runGenerator(generator) {
  const iterator = generator();
  
  function iterate(iteration) {
    if (iteration.done) return Promise.resolve(iteration.value);
    return Promise.resolve(iteration.value)
      .then(x => iterate(iterator.next(x)))
      .catch(err => iterate(iterator.throw(err)));
  }
  
  return iterate(iterator.next());
}

runGenerator(fetchUserAndPosts)
  .then(data => console.log(data))
  .catch(err => console.error(err));

错误处理策略

异步流程中的错误处理至关重要,需要统一考虑同步和异步错误。

// 全局未捕获Promise错误
process.on('unhandledRejection', (reason, promise) => {
  console.error('未处理的Promise拒绝:', reason);
});

// 中间件错误处理
app.use(async (ctx, next) => {
  try {
    await next();
  } catch (err) {
    ctx.status = err.status || 500;
    ctx.body = { error: err.message };
    ctx.app.emit('error', err, ctx);
  }
});

// 实用错误处理函数
async function handleErrors(fn, ...args) {
  try {
    return { result: await fn(...args) };
  } catch (err) {
    return { error: err };
  }
}

const { result, error } = await handleErrors(fetch, 'invalid-url');
if (error) console.error('请求失败:', error);

性能考量

异步流程控制对性能有显著影响,特别是在高并发场景下。

// 测量异步操作时间
async function measure(fn) {
  const start = process.hrtime.bigint();
  await fn();
  const end = process.hrtime.bigint();
  console.log(`耗时: ${Number(end - start) / 1e6}ms`);
}

// 连接池管理数据库连接
const { Pool } = require('pg');
const pool = new Pool({ max: 20 }); // 限制并发连接数

async function query(text, params) {
  const client = await pool.connect();
  try {
    return await client.query(text, params);
  } finally {
    client.release();
  }
}

// 批量处理优化
async function processInBatches(items, batchSize, processItem) {
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    await Promise.all(batch.map(processItem));
  }
}

实际应用场景

异步流程控制在Web开发中有广泛应用,从API调用到文件处理。

// Express路由中的异步处理
app.get('/user/:id', async (req, res, next) => {
  try {
    const user = await User.findById(req.params.id);
    const posts = await Post.find({ author: user.id }).limit(10);
    res.json({ user, posts });
  } catch (err) {
    next(err);
  }
});

// 前端数据获取
async function loadPageData() {
  const [user, products, notifications] = await Promise.all([
    fetch('/api/user'),
    fetch('/api/products'),
    fetch('/api/notifications')
  ]);
  
  return {
    user: await user.json(),
    products: await products.json(),
    notifications: await notifications.json()
  };
}

// 文件上传处理
const multer = require('multer');
const upload = multer({ dest: 'uploads/' });

app.post('/upload', upload.array('files', 5), async (req, res) => {
  try {
    const results = await Promise.all(
      req.files.map(file => 
        processUpload(file).catch(err => ({ error: err.message }))
    );
    res.json({ results });
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});

测试异步代码

测试异步代码需要特殊考虑,现代测试框架提供了相应支持。

// 使用Jest测试异步代码
describe('异步函数测试', () => {
  test('resolve示例', async () => {
    await expect(Promise.resolve('value')).resolves.toBe('value');
  });
  
  test('reject示例', async () => {
    await expect(Promise.reject(new Error('error'))).rejects.toThrow('error');
  });
  
  test('超时处理', async () => {
    const fastPromise = new Promise(resolve => 
      setTimeout(() => resolve('fast'), 100));
    const slowPromise = new Promise(resolve => 
      setTimeout(() => resolve('slow'), 1000));
    
    await expect(Promise.race([fastPromise, slowPromise]))
      .resolves.toBe('fast');
  }, 2000); // 设置测试超时时间
});

// 模拟异步API
jest.mock('axios');
test('模拟API调用', async () => {
  axios.get.mockResolvedValue({ data: { id: 1, name: 'John' } });
  
  const user = await fetchUser(1);
  expect(user).toEqual({ id: 1, name: 'John' });
  expect(axios.get).toHaveBeenCalledWith('/users/1');
});

浏览器与Node.js差异

不同环境下异步API和行为有所差异,需要注意兼容性。

// 浏览器中的requestAnimationFrame
function animate() {
  await new Promise(resolve => requestAnimationFrame(resolve));
  // 动画逻辑
  animate();
}

// Node.js中的setImmediate vs nextTick
process.nextTick(() => {
  console.log('nextTick - 在当前事件循环结束时执行');
});

setImmediate(() => {
  console.log('setImmediate - 在下一个事件循环开始时执行');
});

// 跨环境兼容的异步睡眠函数
function sleep(ms) {
  if (typeof window !== 'undefined') {
    return new Promise(resolve => setTimeout(resolve, ms));
  } else {
    const { promisify } = require('util');
    return promisify(setTimeout)(ms);
  }
}

现代JavaScript运行时特性

新版本JavaScript和运行时环境引入了更多异步控制机制。

// 顶层await (ES2022)
const data = await fetchData();
console.log(data);

// Worker线程中的异步
const { Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename);
  worker.on('message', msg => console.log('Worker说:', msg));
  worker.postMessage('开始工作');
} else {
  parentPort.on('message', async msg => {
    const result = await heavyComputation(msg);
    parentPort.postMessage(result);
  });
}

// 动态import()
async function loadModule(condition) {
  const module = condition 
    ? await import('./moduleA.js')
    : await import('./moduleB.js');
  module.doSomething();
}

可视化与调试工具

调试异步代码需要特殊工具和技术。

// 使用async_hooks跟踪异步资源
const async_hooks = require('async_hooks');
const fs = require('fs');

const hook = async_hooks.createHook({
  init(asyncId, type, triggerAsyncId) {
    fs.writeSync(1, `初始化: ${type}(${asyncId})\n`);
  },
  destroy(asyncId) {
    fs.writeSync(1, `销毁: ${asyncId}\n`);
  }
});

hook.enable();

// Chrome DevTools中的异步堆栈跟踪
async function parent() {
  await child();
}

async function child() {
  await new Promise(resolve => setTimeout(resolve, 100));
  throw new Error('调试这个错误');
}

parent().catch(err => console.error(err));

// 性能分析标记
async function measuredOperation() {
  performance.mark('start');
  await doAsyncWork();
  performance.mark('end');
  performance.measure('操作耗时', 'start', 'end');
}

本站部分内容来自互联网,一切版权均归源网站或源作者所有。

如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn

上一篇:异步错误处理

下一篇:事件监听模式

前端川

前端川,陈川的代码茶馆🍵,专治各种不服的Bug退散符💻,日常贩卖秃头警告级的开发心得🛠️,附赠一行代码笑十年的摸鱼宝典🐟,偶尔掉落咖啡杯里泡开的像素级浪漫☕。‌