Exchanger 是什么?
在多线程编程中,有时候需要两个线程之间交换数据。比如,一个线程负责收集待备份的文件列表,另一个线程负责实际写入磁盘或上传到云端。这时候,Java 提供了一个轻量又高效的工具——java.util.concurrent.Exchanger。
它就像两个人约定在一个地点“换包”:线程 A 把自己的数据包交给线程 B,同时从线程 B 那里拿到对方的数据包。整个过程是线程安全的,不需要额外加锁。
实际场景:双缓冲备份机制
设想一个持续运行的数据备份程序,它要定期将日志文件打包发送到远程服务器。为了不中断数据采集,我们采用双缓冲策略:一个线程往缓冲区 A 写数据,另一个线程把缓冲区 B 的内容发送出去。每隔一段时间,两个线程交换各自的缓冲区。
这时候,Exchanger 就派上了用场。它让两个线程安全地交换缓冲区引用,避免了读写冲突。
代码示例
下面是一个简化的实现:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;
public class BackupExample {
private static final Exchanger<List<String>> EXCHANGER = new Exchanger<>();
private static final int BUFFER_SIZE = 1000;
public static void main(String[] args) {
// 缓冲区 A 和 B
List<String> buffer1 = new ArrayList<>(BUFFER_SIZE);
List<String> buffer2 = new ArrayList<>(BUFFER_SIZE);
Thread producer = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
buffer1.add("log-data-" + i);
if (buffer1.size() == BUFFER_SIZE) {
try {
buffer1 = EXCHANGER.exchange(buffer1); // 交换满的缓冲区
System.out.println("采集线程:已交换缓冲区");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
// 最后一次交换剩余数据
if (!buffer1.isEmpty()) {
try {
buffer1 = EXCHANGER.exchange(buffer1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("采集线程结束");
});
Thread sender = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
buffer2 = EXCHANGER.exchange(buffer2); // 拿到要发送的数据
System.out.println("发送线程:正在发送 " + buffer2.size() + " 条数据");
// 模拟网络传输
Thread.sleep(300);
buffer2.clear(); // 清空后继续接收下一批
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
System.out.println("发送线程结束");
});
producer.start();
sender.start();
try {
producer.join();
sender.interrupt();
sender.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}在这个例子中,两个线程通过 EXCHANGER.exchange() 方法完成数据交接。一旦某一方调用 exchange,它就会等待另一方也调用 exchange,然后两者原子性地交换数据。这个过程天然线程安全,无需 synchronized 或显式锁。
注意事项
Exchanger 只适用于两个线程之间的配对交换。如果涉及多个线程,就得考虑其他并发工具,比如 BlockingQueue。另外,exchange 调用会阻塞,直到配对线程也到达交换点,所以要合理设计超时处理或中断机制,避免死等。
在备份系统中,这种模式特别适合“生产-消费”节奏稳定、数据批量处理的场景,能有效降低锁竞争,提升吞吐。