与Web Workers
Web Workers 简介
Web Workers 允许在后台线程中运行脚本，避免阻塞主线程。TypeScript 提供了类型支持，使得使用 Web Workers 更加安全和高效。通过将计算密集型任务转移到 Worker，可以显著提升页面响应速度。
创建 Web Worker
在 TypeScript 中创建 Web Worker 需要两个文件:主线程脚本和 Worker 脚本。首先创建 Worker 脚本:
// worker.ts
/**
 * Worker entry point: takes the payload of each incoming message, runs the
 * heavy calculation on it, and posts the outcome back to the sender.
 */
self.onmessage = ({ data }: MessageEvent) => {
  // Process the data and reply with the computed value.
  self.postMessage(heavyCalculation(data));
};
/**
 * Adds up every number in `input`.
 *
 * @param input - Values to add together.
 * @returns The total; 0 when `input` is empty.
 */
function heavyCalculation(input: number[]): number {
  let total = 0;
  input.forEach((value) => {
    total += value;
  });
  return total;
}
然后在主线程中创建 Worker 实例:
// main.ts
// Keep `new URL(...)` inline inside `new Worker(...)`: bundlers rely on this
// exact shape to discover and emit the worker script.
const worker = new Worker(new URL('./worker.ts', import.meta.url), { type: 'module' });

// Log each value the worker sends back.
worker.onmessage = ({ data }: MessageEvent) => {
  console.log('Worker 返回的结果:', data);
};

// Hand the numbers to the worker for processing.
worker.postMessage([1, 2, 3, 4, 5]);
类型安全的 Worker 通信
为了确保类型安全,可以定义通信接口:
// worker-types.ts
/** Request message: the shape the worker's message handler receives. */
interface WorkerRequest {
/** Discriminant tag; 'calculate' is the only request kind declared here. */
type: 'calculate';
/** Numbers carried by the request. */
data: number[];
}
/** Response message: the shape the worker posts back. */
interface WorkerResponse {
/** Discriminant tag; 'result' is the only response kind declared here. */
type: 'result';
/** Numeric payload of the response. */
data: number;
}
修改 Worker 和主线程代码使用这些类型:
// worker.ts
/**
 * Typed worker entry point: answers a 'calculate' request with a 'result'
 * response and ignores every other message.
 */
self.onmessage = ({ data: request }: MessageEvent<WorkerRequest>) => {
  if (request.type !== 'calculate') {
    return;
  }
  // A typed declaration (rather than an `as` assertion) lets the compiler
  // verify the reply's shape.
  const response: WorkerResponse = {
    type: 'result',
    data: heavyCalculation(request.data)
  };
  self.postMessage(response);
};
高级 Worker 模式
Worker 池
对于需要处理大量任务的场景,可以创建 Worker 池:
/**
 * Fixed-size pool of workers that runs queued WorkerRequest tasks, at most
 * one task per worker at a time.
 *
 * A task is handed to an idle worker as soon as one is available; otherwise
 * it waits in a FIFO queue. The promise returned by `execute` settles with
 * the response to that same task (or rejects if the worker reports an error
 * while running it).
 *
 * Note: a pool created with `size` 0 has no workers, so its tasks stay queued.
 */
class WorkerPool {
  private workers: Worker[] = [];
  /** Workers that currently have no task in flight. */
  private idleWorkers: Worker[] = [];
  /** Promise settlers for the task each busy worker is currently running. */
  private inFlight = new Map<
    Worker,
    { resolve: (value: WorkerResponse) => void; reject: (reason: Error) => void }
  >();
  /** Tasks waiting for a free worker, oldest first. */
  private taskQueue: Array<{
    task: WorkerRequest;
    resolve: (value: WorkerResponse) => void;
    reject: (reason: Error) => void;
  }> = [];

  /**
   * @param size - Number of workers to create.
   */
  constructor(size: number) {
    for (let i = 0; i < size; i++) {
      const worker = new Worker(new URL('./worker.ts', import.meta.url), {
        type: 'module'
      });
      worker.onmessage = (e: MessageEvent<WorkerResponse>) => {
        const settlers = this.release(worker);
        if (settlers) {
          settlers.resolve(e.data);
        }
      };
      worker.onerror = (e: ErrorEvent) => {
        const settlers = this.release(worker);
        if (settlers) {
          settlers.reject(new Error(e.message));
        }
      };
      this.workers.push(worker);
      this.idleWorkers.push(worker);
    }
  }

  /**
   * Queues `task` and starts it immediately if a worker is idle.
   *
   * @param task - Request to send to a worker.
   * @returns A promise for the worker's response to this task.
   */
  execute(task: WorkerRequest): Promise<WorkerResponse> {
    return new Promise((resolve, reject) => {
      this.taskQueue.push({ task, resolve, reject });
      this.dispatch();
    });
  }

  /** Hands queued tasks to idle workers until one of the two runs out. */
  private dispatch(): void {
    while (this.taskQueue.length > 0) {
      const worker = this.idleWorkers.pop();
      if (!worker) {
        return;
      }
      const next = this.taskQueue.shift();
      if (!next) {
        this.idleWorkers.push(worker);
        return;
      }
      this.inFlight.set(worker, { resolve: next.resolve, reject: next.reject });
      worker.postMessage(next.task);
    }
  }

  /**
   * Marks `worker` as idle again, starts the next queued task, and returns
   * the settlers of the task it just finished. Returns undefined when the
   * worker had no task in flight (an unsolicited message or error).
   */
  private release(
    worker: Worker
  ): { resolve: (value: WorkerResponse) => void; reject: (reason: Error) => void } | undefined {
    const settlers = this.inFlight.get(worker);
    if (!settlers) {
      return undefined;
    }
    this.inFlight.delete(worker);
    this.idleWorkers.push(worker);
    this.dispatch();
    return settlers;
  }
}
SharedArrayBuffer 和 Atomics
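对于需要共享内存的场景（注意：浏览器中只有在页面处于跨源隔离状态时才能使用 SharedArrayBuffer，即需要设置 Cross-Origin-Opener-Policy 和 Cross-Origin-Embedder-Policy 响应头）: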
// Main thread
// Room for 256 Int32 slots (1024 bytes) that both threads can read and write.
const sharedBuffer = new SharedArrayBuffer(256 * Int32Array.BYTES_PER_ELEMENT);
const sharedArray = new Int32Array(sharedBuffer);

const worker = new Worker(new URL('./shared-worker.ts', import.meta.url));
// Post the buffer so the worker can build its own view over it.
worker.postMessage({ buffer: sharedBuffer });
// shared-worker.ts
/**
 * Atomically adds 1 to slot 0 of the buffer received from the main thread,
 * then reports completion.
 */
self.onmessage = ({ data }: MessageEvent<{ buffer: SharedArrayBuffer }>) => {
  const counters = new Int32Array(data.buffer);
  Atomics.add(counters, 0, 1);
  self.postMessage({ done: true });
};
Worker 中的错误处理
正确处理 Worker 中的错误:
// worker.ts
// Report failures to the main thread as a message instead of letting them escape.
try {
  // Code that may throw
  const result = heavyCalculation(data);
  self.postMessage({ result });
} catch (error) {
  const message = error instanceof Error ? error.message : 'Unknown error';
  self.postMessage({ error: true, message });
}
// Main thread
// Messages flagged with `error` are logged as errors; everything else is a result.
worker.onmessage = ({
  data
}: MessageEvent<{ result?: number; error?: boolean; message?: string }>) => {
  if (!data.error) {
    console.log('结果:', data.result);
    return;
  }
  console.error('Worker 错误:', data.message);
};

// Error events raised on the worker object itself are logged here.
worker.onerror = ({ message }: ErrorEvent) => {
  console.error('Worker 运行时错误:', message);
};
Worker 模块化
使用 ES 模块组织 Worker 代码:
// math-utils.ts
/**
 * Adds up every number in `numbers`.
 *
 * @param numbers - Values to add together.
 * @returns The total; 0 when `numbers` is empty.
 */
export function calculateSum(numbers: number[]): number {
  let total = 0;
  numbers.forEach((value) => {
    total += value;
  });
  return total;
}
// worker.ts
import { calculateSum } from './math-utils';
/** Replies to each incoming number array with its sum. */
self.onmessage = ({ data: numbers }: MessageEvent<number[]>) => {
  self.postMessage(calculateSum(numbers));
};
Worker 生命周期管理
合理管理 Worker 生命周期:
/**
 * Owns at most one worker and gives the caller explicit control over when it
 * is created and when it is terminated.
 */
class TaskProcessor {
  private worker: Worker | null = null;

  /** Creates the worker and attaches the message handler; does nothing if one already exists. */
  start() {
    if (this.worker) {
      return;
    }
    const created = new Worker(new URL('./worker.ts', import.meta.url));
    created.onmessage = this.handleMessage;
    this.worker = created;
  }

  /** Terminates the worker, if any, and forgets it so `start` can create a new one. */
  stop() {
    this.worker?.terminate();
    this.worker = null;
  }

  // Arrow-function property so `this` stays bound when used as an event handler.
  private handleMessage = (e: MessageEvent) => {
    // Handle the message
  };

  /**
   * Sends `data` to the worker. Silently does nothing when no worker is running.
   *
   * @param data - Payload to post.
   */
  postTask(data: unknown) {
    this.worker?.postMessage(data);
  }
}
Web Worker 的限制
了解 Worker 的限制很重要:
- 不能直接访问 DOM
- 不能使用某些 API(如 localStorage)
- 通信数据会被结构化克隆算法复制（Transferable 对象除外，其所有权会被转移而不是复制）
性能优化技巧
优化 Worker 性能的几个方法:
- 批量处理消息
- 使用 Transferable 对象
- 合理设置 Worker 数量
// Transferable objects: listing the buffer in the transfer list hands it to
// the worker instead of copying it.
const largeBuffer = new ArrayBuffer(10_000_000);
worker.postMessage(largeBuffer, [largeBuffer]);

// Batching: one message carrying several tasks instead of one message per task.
worker.postMessage({ tasks: [task1, task2, task3], batchId: 'batch-123' });
Worker 与主线程的协同
实现复杂的协同逻辑:
// Main thread
const worker = new Worker(new URL('./coordinated-worker.ts', import.meta.url));

/**
 * Routes each worker message to the matching UI update based on its `type`.
 * Messages with any other `type` are ignored.
 */
worker.onmessage = (e: MessageEvent<{ type: string; data: unknown }>) => {
  switch (e.data.type) {
    case 'progress':
      updateProgressBar(e.data.data as number);
      break;
    // The chunked worker streams its per-chunk output as 'partial-result';
    // without this case those messages would be dropped silently.
    case 'partial-result':
    case 'result':
      displayResult(e.data.data);
      break;
    case 'error':
      showError(e.data.data as string);
      break;
  }
};
// coordinated-worker.ts
/**
 * Processes `data` in consecutive slices of at most `chunkSize` items. After
 * each slice it posts a 'progress' message (fraction of items handled so far,
 * never above 1) followed by a 'partial-result' message with that slice's
 * output.
 *
 * @param data - Data set to process; read through `length` and `slice`.
 * @param chunkSize - Maximum number of items per slice; must be greater than 0.
 * @throws RangeError when `chunkSize` is not greater than 0, because the loop
 *   could never advance.
 */
function processInChunks(data: LargeDataSet, chunkSize: number) {
  if (!(chunkSize > 0)) {
    throw new RangeError(`chunkSize must be greater than 0, received ${chunkSize}`);
  }
  for (let start = 0; start < data.length; start += chunkSize) {
    // Clamp the end so the last, shorter slice reports exactly 100% progress.
    const end = Math.min(start + chunkSize, data.length);
    const result = processChunk(data.slice(start, end));
    self.postMessage({
      type: 'progress',
      data: end / data.length
    });
    self.postMessage({
      type: 'partial-result',
      data: result
    });
  }
}
TypeScript 高级类型在 Worker 中的应用
利用 TypeScript 的高级类型特性:
// Message type maps
/** Maps each request `type` to the payload that is sent along with it. */
type WorkerMessages = {
calculate: { numbers: number[] };
fetch: { url: string };
/** 'cancel' carries no payload. */
cancel: null;
};
/** Maps each response `type` to the payload that arrives with it. */
type WorkerResponses = {
result: { value: number };
data: { content: string };
error: { message: string };
};
// Type-safe Worker wrapper
/**
 * Reply envelope: one variant per key of WorkerResponses, so checking `type`
 * narrows `data` to the matching payload.
 */
type TypedWorkerReply = {
  [K in keyof WorkerResponses]: {
    messageId: string;
    type: K;
    data: WorkerResponses[K];
  };
}[keyof WorkerResponses];

/**
 * Wraps a worker in a request/response API: every request is tagged with a
 * message id, and the returned promise settles when a reply carrying the
 * same id arrives.
 */
class TypedWorker {
  private worker: Worker;
  /** Source of per-instance request ids; a counter cannot collide the way random ids can. */
  private nextMessageId = 0;

  constructor() {
    this.worker = new Worker(new URL('./typed-worker.ts', import.meta.url));
  }

  /**
   * Sends a typed request to the worker.
   *
   * @param type - Request kind; selects the payload type.
   * @param data - Payload for this request kind.
   * @returns A promise resolved with the payload of the matching reply. It
   *   rejects with an `Error` (carrying the worker's message) when the reply
   *   has type 'error'.
   */
  postMessage<T extends keyof WorkerMessages>(
    type: T,
    data: WorkerMessages[T]
  ): Promise<WorkerResponses[keyof WorkerResponses]> {
    return new Promise((resolve, reject) => {
      const messageId = String(this.nextMessageId++);
      const handler = (e: MessageEvent<TypedWorkerReply>) => {
        const reply = e.data;
        // Replies to other in-flight requests share this worker; skip them.
        if (reply.messageId !== messageId) {
          return;
        }
        this.worker.removeEventListener('message', handler);
        if (reply.type === 'error') {
          reject(new Error(reply.data.message));
        } else {
          resolve(reply.data);
        }
      };
      this.worker.addEventListener('message', handler);
      this.worker.postMessage({ type, data, messageId });
    });
  }
}
Worker 中的状态管理
在 Worker 中管理复杂状态:
// state-worker.ts
/** Shape of the state kept inside the worker. */
interface State {
/** Incremented by each 'increment' message. */
counter: number;
/** Replaced by the record produced by a completed 'load-data' request. */
data: Record<string, unknown>;
/** Set to true when a 'load-data' request starts and back to false when it completes. */
processing: boolean;
}
/** The worker's current state, starting from zero/empty values. */
let state: State = {
counter: 0,
data: {},
processing: false
};
/**
 * Command dispatcher for the stateful worker.
 *
 * - 'increment': bumps the counter and posts the updated state.
 * - 'load-data': loads data for the URL in `payload`, stores it, and posts
 *   'data-loaded'; if loading fails, posts an 'error' message instead.
 * - 'get-state': posts the current state.
 *
 * Messages with any other `type` are ignored.
 */
self.onmessage = (e: MessageEvent<{ type: string; payload?: unknown }>) => {
  switch (e.data.type) {
    case 'increment':
      state.counter++;
      self.postMessage({ type: 'state-update', state });
      break;
    case 'load-data':
      state.processing = true;
      loadDataAsync(e.data.payload as string)
        .then(data => {
          state.data = data;
          state.processing = false;
          self.postMessage({ type: 'data-loaded', data });
        })
        .catch((error: unknown) => {
          // Without this branch a failed load would leave `processing` stuck
          // at true and surface only as an unhandled rejection.
          state.processing = false;
          self.postMessage({
            type: 'error',
            message: error instanceof Error ? error.message : 'Unknown error'
          });
        });
      break;
    case 'get-state':
      self.postMessage({ type: 'current-state', state });
      break;
  }
};
/**
 * Simulates an asynchronous data load.
 *
 * @param url - Echoed back in the resolved record.
 * @returns A promise that resolves after 1000 ms with `{ url, timestamp }`,
 *   where `timestamp` is `Date.now()` at the moment the timer fires.
 */
async function loadDataAsync(url: string): Promise<Record<string, unknown>> {
  const delayMs = 1000;
  return new Promise(resolve => {
    const finish = () => resolve({ url, timestamp: Date.now() });
    setTimeout(finish, delayMs);
  });
}
Web Worker 在现代框架中的集成
在 React 中使用 Worker 的示例:
// useWorker.ts
import { useEffect, useRef, useState } from 'react';
/**
 * React hook that creates a worker for `workerUrl`, exposes the latest
 * message it sent and the latest error it raised, and terminates it when the
 * URL changes or the component unmounts.
 *
 * @typeParam T - Type of the data posted to the worker.
 * @typeParam R - Type of the data received from the worker.
 * @param workerUrl - Worker script URL, resolved against this module's URL.
 * @returns `result` (last message or null), `error` (last error or null) and
 *   `postMessage` (no-op while no worker exists).
 */
export function useWorker<T, R>(workerUrl: string) {
  const [result, setResult] = useState<R | null>(null);
  const [error, setError] = useState<Error | null>(null);
  const workerRef = useRef<Worker | null>(null);

  useEffect(() => {
    const instance = new Worker(new URL(workerUrl, import.meta.url));
    workerRef.current = instance;
    instance.onmessage = ({ data }: MessageEvent<R>) => {
      setResult(data);
    };
    instance.onerror = ({ message }) => {
      setError(new Error(message));
    };
    // Cleanup: stop the worker created by this effect run.
    return () => {
      instance.terminate();
    };
  }, [workerUrl]);

  const postMessage = (data: T) => {
    workerRef.current?.postMessage(data);
  };

  return { result, error, postMessage };
}
// Usage inside a component
/**
 * Demo component: posts a fixed array to the worker on click and renders the
 * latest result or error.
 */
function CalculationComponent() {
  const calc = useWorker<number[], number>('./worker.ts');

  const handleCalculate = () => {
    calc.postMessage([1, 2, 3, 4, 5]);
  };

  return (
    <div>
      <button onClick={handleCalculate}>计算</button>
      {calc.error && <div>错误: {calc.error.message}</div>}
      {calc.result !== null && <div>结果: {calc.result}</div>}
    </div>
  );
}
Worker 中的复杂计算示例
实现一个图像处理 Worker:
// image-worker.ts
/** A single image-processing request. */
interface ImageTask {
/** Echoed back unchanged in the reply. */
id: string;
/** Source pixels; processing works on a copy of them. */
imageData: ImageData;
/** Operations to apply, in array order. */
operations: ('grayscale' | 'invert' | 'blur')[];
}
/**
 * Runs the requested operations on the incoming image and replies with the
 * processed image under the same task id.
 */
self.onmessage = ({ data: task }: MessageEvent<ImageTask>) => {
  const processed = processImage(task.imageData, task.operations);
  // The pixel buffer is listed as a transferable so it is handed over, not copied.
  self.postMessage({ id: task.id, result: processed }, [processed.data.buffer]);
};
/**
 * Applies `operations`, in order, to a copy of `imageData`.
 *
 * @param imageData - Source image; its pixels are not modified.
 * @param operations - Operation names; 'grayscale', 'invert' and 'blur' are
 *   recognized, anything else is skipped.
 * @returns A new ImageData of the same dimensions holding the processed pixels.
 */
function processImage(imageData: ImageData, operations: string[]): ImageData {
  const { width, height } = imageData;
  const result = new ImageData(width, height);
  const pixels = result.data;

  // Copy the original pixels so the input stays untouched.
  imageData.data.forEach((value, index) => {
    pixels[index] = value;
  });

  // Apply each operation in the order given.
  for (const op of operations) {
    if (op === 'grayscale') {
      applyGrayscale(pixels);
    } else if (op === 'invert') {
      applyInvert(pixels);
    } else if (op === 'blur') {
      applyBlur(pixels, width, height);
    }
  }

  return result;
}
/**
 * Converts pixels to grayscale in place by replacing the first three channels
 * of every 4-channel pixel with their average; the fourth channel is left as is.
 *
 * @param data - Pixel channels, four per pixel; modified in place.
 */
function applyGrayscale(data: Uint8ClampedArray) {
  const channelsPerPixel = 4;
  for (let offset = 0; offset < data.length; offset += channelsPerPixel) {
    const gray = (data[offset] + data[offset + 1] + data[offset + 2]) / 3;
    // Write the same average into the three color channels.
    data.fill(gray, offset, offset + 3);
  }
}
// 其他图像处理函数...
Worker 通信的性能考量
测量 Worker 通信开销:
// Benchmark worker
/**
 * On a 'benchmark' request, times 1000 small structured-clone messages and
 * 100 transferred ArrayBuffers of `size` bytes, then posts a
 * 'benchmark-result' message with the average time per message for each kind.
 * Messages with any other `type` are ignored.
 */
self.onmessage = async (e: MessageEvent<{ type: string; size: number }>) => {
  if (e.data.type !== 'benchmark') {
    return;
  }
  const { size } = e.data;
  const smallMessageCount = 1000;
  const largeMessageCount = 100;

  // Small messages
  const smallStart = performance.now();
  for (let i = 0; i < smallMessageCount; i++) {
    self.postMessage({ type: 'small', data: i });
  }
  const smallElapsed = performance.now() - smallStart;

  // Large messages
  let largeElapsed = 0;
  for (let i = 0; i < largeMessageCount; i++) {
    // Transferring detaches the buffer in this thread, and a detached buffer
    // cannot be transferred again, so each send needs a fresh buffer. The
    // allocation is kept outside the timed region.
    const largeData = new ArrayBuffer(size);
    const sendStart = performance.now();
    self.postMessage({ type: 'large', data: largeData }, [largeData]);
    largeElapsed += performance.now() - sendStart;
  }

  self.postMessage({
    type: 'benchmark-result',
    smallMessageTime: smallElapsed / smallMessageCount,
    largeMessageTime: largeElapsed / largeMessageCount,
    transferrableRatio: size / largeElapsed
  });
};
Worker 中的并行计算模式
实现并行计算任务分解:
// parallel-worker.ts
/** One unit of work: sum `data[start]` through `data[end - 1]`. */
interface Task {
id: string;
/** Index of the first element to include. */
start: number;
/** Index one past the last element to include. */
end: number;
data: Float64Array;
}
/** Reply to a Task: the computed sum under the same id. */
interface Result {
id: string;
value: number;
}
/**
 * Sums the elements of the task's data in the index range [start, end) and
 * posts the total back under the task's id.
 */
self.onmessage = ({ data: task }: MessageEvent<Task>) => {
  const values = task.data;
  let sum = 0;
  // Add up only the assigned portion.
  let index = task.start;
  while (index < task.end) {
    sum += values[index];
    index++;
  }
  const result: Result = { id: task.id, value: sum };
  self.postMessage(result);
};
// Main-thread usage
/**
 * Sums `data` by splitting it into up to `chunkCount` contiguous chunks and
 * summing each chunk in its own worker.
 *
 * @param data - Values to sum; not modified (each worker receives a copy of
 *   its chunk).
 * @param chunkCount - Maximum number of chunks/workers; must be greater than 0.
 *   Fewer workers are started when `data` has fewer elements than chunks.
 * @returns The sum of all elements; 0 for empty `data`.
 * @throws RangeError (as a rejection) when `chunkCount` is not greater than 0.
 * @throws Error (as a rejection) when any worker reports an error.
 */
async function parallelSum(data: Float64Array, chunkCount: number): Promise<number> {
  if (!(chunkCount > 0)) {
    throw new RangeError(`chunkCount must be greater than 0, received ${chunkCount}`);
  }
  const chunkSize = Math.ceil(data.length / chunkCount);
  const workers: Worker[] = [];
  const promises: Promise<number>[] = [];

  for (let i = 0; i < chunkCount; i++) {
    const start = i * chunkSize;
    // Nothing left to assign: do not start workers for empty chunks.
    if (start >= data.length) {
      break;
    }
    const end = Math.min(start + chunkSize, data.length);
    const worker = new Worker(new URL('./parallel-worker.ts', import.meta.url));
    workers.push(worker);
    promises.push(
      new Promise<number>((resolve, reject) => {
        worker.onmessage = (e: MessageEvent<Result>) => {
          resolve(e.data.value);
        };
        // Without this, a failing worker would leave the promise pending forever.
        worker.onerror = (e: ErrorEvent) => {
          reject(new Error(e.message));
        };
      })
    );

    // Copy the chunk into its own buffer. `TypedArray.slice` honors the
    // view's byteOffset, unlike slicing `data.buffer` directly, so this is
    // also correct when `data` is a view into a larger buffer.
    const chunk = data.slice(start, end);
    worker.postMessage({
      id: `chunk-${i}`,
      start: 0,
      end: chunk.length,
      data: chunk
    }, [chunk.buffer]);
  }

  try {
    const results = await Promise.all(promises);
    return results.reduce((a, b) => a + b, 0);
  } finally {
    // Stop every worker, including those still running after a failure.
    for (const worker of workers) {
      worker.terminate();
    }
  }
}
Worker 中的缓存策略
在 Worker 中实现数据缓存:
// caching-worker.ts
/** One cached value together with its expiry bookkeeping. */
interface CacheItem {
  /** `Date.now()` at the time the entry was stored. */
  timestamp: number;
  /** The cached value. */
  data: unknown;
  /** Lifetime in milliseconds, measured from `timestamp`. */
  ttl: number;
}

/**
 * Cache entries ordered from least to most recently used. A Map iterates in
 * insertion order, so re-inserting an entry moves it to the "most recent" end.
 */
const cache = new Map<string, CacheItem>();
/** Maximum number of entries kept before the least recently used is evicted. */
const MAX_CACHE_SIZE = 100;

/**
 * Looks up `key`, treating expired entries as missing.
 *
 * @param key - Cache key.
 * @returns The cached value, or null when the key is absent or expired. A
 *   successful lookup marks the entry as most recently used.
 */
function getFromCache(key: string): unknown | null {
  const item = cache.get(key);
  if (!item) return null;
  // Drop the entry once it has outlived its TTL.
  if (Date.now() - item.timestamp > item.ttl) {
    cache.delete(key);
    return null;
  }
  // Re-insert so eviction targets entries that have not been read recently.
  cache.delete(key);
  cache.set(key, item);
  return item.data;
}

/**
 * Stores `data` under `key`, evicting the least recently used entry when the
 * cache is full.
 *
 * @param key - Cache key; an existing entry under the same key is replaced.
 * @param data - Value to cache.
 * @param ttl - Lifetime in milliseconds (default 60000).
 */
function setToCache(key: string, data: unknown, ttl: number = 60000) {
  // Remove any existing entry first: an overwrite must not evict an unrelated
  // key, and the refreshed entry should count as most recently used.
  cache.delete(key);
  if (cache.size >= MAX_CACHE_SIZE) {
    // The first key in iteration order is the least recently used one.
    const oldestKey = cache.keys().next().value;
    if (oldestKey !== undefined) {
      cache.delete(oldestKey);
    }
  }
  cache.set(key, {
    timestamp: Date.now(),
    data,
    ttl
  });
}
/**
 * Cache command handler.
 *
 * - 'get': replies with 'cache-response' carrying the cached value (or null)
 *   and a `hit` flag.
 * - 'set': stores the value when `data` is provided and replies with
 *   'cache-set'; a 'set' without `data` is ignored.
 */
self.onmessage = async ({
  data: request
}: MessageEvent<{
  type: 'get' | 'set';
  key: string;
  data?: unknown;
  ttl?: number
}>) => {
  switch (request.type) {
    case 'get': {
      const cached = getFromCache(request.key);
      self.postMessage({
        type: 'cache-response',
        key: request.key,
        data: cached,
        hit: cached !== null
      });
      break;
    }
    case 'set': {
      if (request.data === undefined) {
        break;
      }
      setToCache(request.key, request.data, request.ttl);
      self.postMessage({
        type: 'cache-set',
        key: request.key,
        success: true
      });
      break;
    }
  }
};
本站部分内容来自互联网,一切版权均归源网站或源作者所有。
如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn
上一篇:与GraphQL配合
下一篇:与WebAssembly