阿里云主机折上折
  • 微信号
您当前的位置:网站首页 > 与Web Workers

与Web Workers

作者:陈川 阅读数:13826人阅读 分类: TypeScript

Web Workers 简介

Web Workers 允许在后台线程中运行脚本,避免阻塞主线程。TypeScript 提供了类型支持,使得使用 Web Workers 更加安全和高效。通过将计算密集型任务转移到 Worker,可以显著提升页面响应速度。

创建 Web Worker

在 TypeScript 中创建 Web Worker 需要两个文件:主线程脚本和 Worker 脚本。首先创建 Worker 脚本:

// worker.ts
self.onmessage = (e: MessageEvent) => {
  const data = e.data;
  // 处理数据
  const result = heavyCalculation(data);
  self.postMessage(result);
};

function heavyCalculation(input: number[]): number {
  return input.reduce((a, b) => a + b, 0);
}

然后在主线程中创建 Worker 实例:

// main.ts
const worker = new Worker(new URL('./worker.ts', import.meta.url), {
  type: 'module'
});

worker.onmessage = (e: MessageEvent) => {
  console.log('Worker 返回的结果:', e.data);
};

worker.postMessage([1, 2, 3, 4, 5]);

类型安全的 Worker 通信

为了确保类型安全,可以定义通信接口:

// worker-types.ts
interface WorkerRequest {
  type: 'calculate';
  data: number[];
}

interface WorkerResponse {
  type: 'result';
  data: number;
}

修改 Worker 和主线程代码使用这些类型:

// worker.ts
self.onmessage = (e: MessageEvent<WorkerRequest>) => {
  if (e.data.type === 'calculate') {
    const result = heavyCalculation(e.data.data);
    self.postMessage({
      type: 'result',
      data: result
    } as WorkerResponse);
  }
};

高级 Worker 模式

Worker 池

对于需要处理大量任务的场景,可以创建 Worker 池:

class WorkerPool {
  private workers: Worker[] = [];
  private taskQueue: Array<{
    task: WorkerRequest;
    resolve: (value: WorkerResponse) => void;
  }> = [];
  
  constructor(size: number) {
    for (let i = 0; i < size; i++) {
      const worker = new Worker(new URL('./worker.ts', import.meta.url), {
        type: 'module'
      });
      
      worker.onmessage = (e: MessageEvent<WorkerResponse>) => {
        const nextTask = this.taskQueue.shift();
        if (nextTask) {
          worker.postMessage(nextTask.task);
          nextTask.resolve(e.data);
        }
      };
      
      this.workers.push(worker);
    }
  }
  
  execute(task: WorkerRequest): Promise<WorkerResponse> {
    return new Promise((resolve) => {
      this.taskQueue.push({ task, resolve });
    });
  }
}

SharedArrayBuffer 和 Atomics

对于需要共享内存的场景:

// 主线程
const sharedBuffer = new SharedArrayBuffer(1024);
const sharedArray = new Int32Array(sharedBuffer);

const worker = new Worker(new URL('./shared-worker.ts', import.meta.url));
worker.postMessage({ buffer: sharedBuffer });

// shared-worker.ts
self.onmessage = (e: MessageEvent<{ buffer: SharedArrayBuffer }>) => {
  const sharedArray = new Int32Array(e.data.buffer);
  Atomics.add(sharedArray, 0, 1);
  self.postMessage({ done: true });
};

Worker 中的错误处理

正确处理 Worker 中的错误:

// worker.ts
try {
  // 可能出错的代码
  self.postMessage({ result: heavyCalculation(data) });
} catch (error) {
  self.postMessage({ 
    error: true,
    message: error instanceof Error ? error.message : 'Unknown error'
  });
}

// 主线程
worker.onmessage = (e: MessageEvent<{ result?: number; error?: boolean; message?: string }>) => {
  if (e.data.error) {
    console.error('Worker 错误:', e.data.message);
  } else {
    console.log('结果:', e.data.result);
  }
};

worker.onerror = (e: ErrorEvent) => {
  console.error('Worker 运行时错误:', e.message);
};

Worker 模块化

使用 ES 模块组织 Worker 代码:

// math-utils.ts
export function calculateSum(numbers: number[]): number {
  return numbers.reduce((a, b) => a + b, 0);
}

// worker.ts
import { calculateSum } from './math-utils';

self.onmessage = (e: MessageEvent<number[]>) => {
  const result = calculateSum(e.data);
  self.postMessage(result);
};

Worker 生命周期管理

合理管理 Worker 生命周期:

class TaskProcessor {
  private worker: Worker | null = null;
  
  start() {
    if (!this.worker) {
      this.worker = new Worker(new URL('./worker.ts', import.meta.url));
      this.worker.onmessage = this.handleMessage;
    }
  }
  
  stop() {
    if (this.worker) {
      this.worker.terminate();
      this.worker = null;
    }
  }
  
  private handleMessage = (e: MessageEvent) => {
    // 处理消息
  };
  
  postTask(data: unknown) {
    if (this.worker) {
      this.worker.postMessage(data);
    }
  }
}

Web Worker 的限制

了解 Worker 的限制很重要:

  1. 不能直接访问 DOM
  2. 不能使用某些 API(如 localStorage)
  3. 通信数据会被结构化克隆算法复制

性能优化技巧

优化 Worker 性能的几个方法:

  1. 批量处理消息
  2. 使用 Transferable 对象
  3. 合理设置 Worker 数量
// 使用 Transferable 对象
const largeBuffer = new ArrayBuffer(10000000);
worker.postMessage(largeBuffer, [largeBuffer]);

// 批量处理
worker.postMessage({
  tasks: [task1, task2, task3],
  batchId: 'batch-123'
});

Worker 与主线程的协同

实现复杂的协同逻辑:

// 主线程
const worker = new Worker(new URL('./coordinated-worker.ts', import.meta.url));

worker.onmessage = (e: MessageEvent<{ type: string; data: unknown }>) => {
  switch (e.data.type) {
    case 'progress':
      updateProgressBar(e.data.data as number);
      break;
    case 'result':
      displayResult(e.data.data);
      break;
    case 'error':
      showError(e.data.data as string);
      break;
  }
};

// coordinated-worker.ts
function processInChunks(data: LargeDataSet, chunkSize: number) {
  for (let i = 0; i < data.length; i += chunkSize) {
    const chunk = data.slice(i, i + chunkSize);
    const result = processChunk(chunk);
    
    self.postMessage({
      type: 'progress',
      data: (i + chunkSize) / data.length
    });
    
    self.postMessage({
      type: 'partial-result',
      data: result
    });
  }
}

TypeScript 高级类型在 Worker 中的应用

利用 TypeScript 的高级类型特性:

// 定义消息类型映射
type WorkerMessages = {
  calculate: { numbers: number[] };
  fetch: { url: string };
  cancel: null;
};

type WorkerResponses = {
  result: { value: number };
  data: { content: string };
  error: { message: string };
};

// 类型安全的 Worker 包装器
class TypedWorker {
  private worker: Worker;
  
  constructor() {
    this.worker = new Worker(new URL('./typed-worker.ts', import.meta.url));
  }
  
  postMessage<T extends keyof WorkerMessages>(
    type: T,
    data: WorkerMessages[T]
  ): Promise<WorkerResponses[keyof WorkerResponses]> {
    return new Promise((resolve, reject) => {
      const messageId = Math.random().toString(36).slice(2);
      
      const handler = (e: MessageEvent<{
        messageId: string;
        type: keyof WorkerResponses;
        data: WorkerResponses[keyof WorkerResponses];
      }>) => {
        if (e.data.messageId === messageId) {
          this.worker.removeEventListener('message', handler);
          if (e.data.type === 'error') {
            reject(e.data.data);
          } else {
            resolve(e.data.data);
          }
        }
      };
      
      this.worker.addEventListener('message', handler);
      this.worker.postMessage({ type, data, messageId });
    });
  }
}

Worker 中的状态管理

在 Worker 中管理复杂状态:

// state-worker.ts
interface State {
  counter: number;
  data: Record<string, unknown>;
  processing: boolean;
}

let state: State = {
  counter: 0,
  data: {},
  processing: false
};

self.onmessage = (e: MessageEvent<{ type: string; payload?: unknown }>) => {
  switch (e.data.type) {
    case 'increment':
      state.counter++;
      self.postMessage({ type: 'state-update', state });
      break;
      
    case 'load-data':
      state.processing = true;
      loadDataAsync(e.data.payload as string).then(data => {
        state.data = data;
        state.processing = false;
        self.postMessage({ type: 'data-loaded', data });
      });
      break;
      
    case 'get-state':
      self.postMessage({ type: 'current-state', state });
      break;
  }
};

async function loadDataAsync(url: string): Promise<Record<string, unknown>> {
  // 模拟异步数据加载
  return new Promise(resolve => {
    setTimeout(() => resolve({ url, timestamp: Date.now() }), 1000);
  });
}

Web Worker 在现代框架中的集成

在 React 中使用 Worker 的示例:

// useWorker.ts
import { useEffect, useRef, useState } from 'react';

export function useWorker<T, R>(workerUrl: string) {
  const [result, setResult] = useState<R | null>(null);
  const [error, setError] = useState<Error | null>(null);
  const workerRef = useRef<Worker | null>(null);
  
  useEffect(() => {
    const worker = new Worker(new URL(workerUrl, import.meta.url));
    workerRef.current = worker;
    
    worker.onmessage = (e: MessageEvent<R>) => {
      setResult(e.data);
    };
    
    worker.onerror = (e) => {
      setError(new Error(e.message));
    };
    
    return () => {
      worker.terminate();
    };
  }, [workerUrl]);
  
  const postMessage = (data: T) => {
    if (workerRef.current) {
      workerRef.current.postMessage(data);
    }
  };
  
  return { result, error, postMessage };
}

// 在组件中使用
function CalculationComponent() {
  const { result, error, postMessage } = useWorker<number[], number>('./worker.ts');
  
  const handleCalculate = () => {
    postMessage([1, 2, 3, 4, 5]);
  };
  
  return (
    <div>
      <button onClick={handleCalculate}>计算</button>
      {error && <div>错误: {error.message}</div>}
      {result !== null && <div>结果: {result}</div>}
    </div>
  );
}

Worker 中的复杂计算示例

实现一个图像处理 Worker:

// image-worker.ts
interface ImageTask {
  id: string;
  imageData: ImageData;
  operations: ('grayscale' | 'invert' | 'blur')[];
}

self.onmessage = (e: MessageEvent<ImageTask>) => {
  const { id, imageData, operations } = e.data;
  const result = processImage(imageData, operations);
  
  self.postMessage({
    id,
    result
  }, [result.data.buffer]); // 使用 Transferable 传输图像数据
};

function processImage(imageData: ImageData, operations: string[]): ImageData {
  const { width, height, data } = imageData;
  const result = new ImageData(width, height);
  const resultData = result.data;
  
  // 复制原始数据
  for (let i = 0; i < data.length; i++) {
    resultData[i] = data[i];
  }
  
  // 应用每个操作
  operations.forEach(op => {
    switch (op) {
      case 'grayscale':
        applyGrayscale(resultData);
        break;
      case 'invert':
        applyInvert(resultData);
        break;
      case 'blur':
        applyBlur(resultData, width, height);
        break;
    }
  });
  
  return result;
}

function applyGrayscale(data: Uint8ClampedArray) {
  for (let i = 0; i < data.length; i += 4) {
    const avg = (data[i] + data[i + 1] + data[i + 2]) / 3;
    data[i] = data[i + 1] = data[i + 2] = avg;
  }
}

// 其他图像处理函数...

Worker 通信的性能考量

测量 Worker 通信开销:

// 性能测试 Worker
self.onmessage = async (e: MessageEvent<{ type: string; size: number }>) => {
  if (e.data.type === 'benchmark') {
    const { size } = e.data;
    
    // 测试小消息
    const smallStart = performance.now();
    for (let i = 0; i < 1000; i++) {
      self.postMessage({ type: 'small', data: i });
    }
    const smallEnd = performance.now();
    
    // 测试大消息
    const largeData = new ArrayBuffer(size);
    const largeStart = performance.now();
    for (let i = 0; i < 100; i++) {
      self.postMessage({ type: 'large', data: largeData }, [largeData]);
    }
    const largeEnd = performance.now();
    
    self.postMessage({
      type: 'benchmark-result',
      smallMessageTime: (smallEnd - smallStart) / 1000,
      largeMessageTime: (largeEnd - largeStart) / 100,
      transferrableRatio: size / (largeEnd - largeStart)
    });
  }
};

Worker 中的并行计算模式

实现并行计算任务分解:

// parallel-worker.ts
interface Task {
  id: string;
  start: number;
  end: number;
  data: Float64Array;
}

interface Result {
  id: string;
  value: number;
}

self.onmessage = (e: MessageEvent<Task>) => {
  const { id, start, end, data } = e.data;
  let sum = 0;
  
  // 计算分配的部分
  for (let i = start; i < end; i++) {
    sum += data[i];
  }
  
  const result: Result = {
    id,
    value: sum
  };
  
  self.postMessage(result);
};

// 主线程中的使用
async function parallelSum(data: Float64Array, chunkCount: number): Promise<number> {
  const chunkSize = Math.ceil(data.length / chunkCount);
  const workers: Worker[] = [];
  const promises: Promise<number>[] = [];
  
  for (let i = 0; i < chunkCount; i++) {
    const start = i * chunkSize;
    const end = Math.min(start + chunkSize, data.length);
    const worker = new Worker(new URL('./parallel-worker.ts', import.meta.url));
    
    const promise = new Promise<number>((resolve) => {
      worker.onmessage = (e: MessageEvent<Result>) => {
        resolve(e.data.value);
        worker.terminate();
      };
    });
    
    workers.push(worker);
    promises.push(promise);
    
    // 发送数据副本给 Worker
    const chunk = new Float64Array(data.buffer.slice(
      start * Float64Array.BYTES_PER_ELEMENT,
      end * Float64Array.BYTES_PER_ELEMENT
    ));
    
    worker.postMessage({
      id: `chunk-${i}`,
      start: 0,
      end: chunk.length,
      data: chunk
    }, [chunk.buffer]);
  }
  
  const results = await Promise.all(promises);
  return results.reduce((a, b) => a + b, 0);
}

Worker 中的缓存策略

在 Worker 中实现数据缓存:

// caching-worker.ts
interface CacheItem {
  timestamp: number;
  data: unknown;
  ttl: number;
}

const cache = new Map<string, CacheItem>();
const MAX_CACHE_SIZE = 100;

function getFromCache(key: string): unknown | null {
  const item = cache.get(key);
  if (!item) return null;
  
  // 检查是否过期
  if (Date.now() - item.timestamp > item.ttl) {
    cache.delete(key);
    return null;
  }
  
  return item.data;
}

function setToCache(key: string, data: unknown, ttl: number = 60000) {
  if (cache.size >= MAX_CACHE_SIZE) {
    // 简单的 LRU 策略
    const oldestKey = cache.keys().next().value;
    cache.delete(oldestKey);
  }
  
  cache.set(key, {
    timestamp: Date.now(),
    data,
    ttl
  });
}

self.onmessage = async (e: MessageEvent<{ 
  type: 'get' | 'set'; 
  key: string; 
  data?: unknown; 
  ttl?: number 
}>) => {
  const { type, key, data, ttl } = e.data;
  
  if (type === 'get') {
    const cached = getFromCache(key);
    self.postMessage({
      type: 'cache-response',
      key,
      data: cached,
      hit: cached !== null
    });
  } else if (type === 'set' && data !== undefined) {
    setToCache(key, data, ttl);
    self.postMessage({
      type: 'cache-set',
      key,
      success: true
    });
  }
};

本站部分内容来自互联网,一切版权均归源网站或源作者所有。

如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn

上一篇:与GraphQL配合

下一篇:与WebAssembly

前端川

前端川,陈川的代码茶馆🍵,专治各种不服的Bug退散符💻,日常贩卖秃头警告级的开发心得🛠️,附赠一行代码笑十年的摸鱼宝典🐟,偶尔掉落咖啡杯里泡开的像素级浪漫☕。‌