数码工坊
白蓝主题五 · 清爽阅读
首页  > 数据备份

线程安全的Exchanger使用示例:在数据备份中的实际应用

Exchanger 是什么?

在多线程编程中,有时候需要两个线程之间交换数据。比如,一个线程负责收集待备份的文件列表,另一个线程负责实际写入磁盘或上传到云端。这时候,Java 提供了一个轻量又高效的工具——java.util.concurrent.Exchanger

它就像两个人约定在一个地点“换包”:线程 A 把自己的数据包交给线程 B,同时从线程 B 那里拿到对方的数据包。整个过程是线程安全的,不需要额外加锁。

实际场景:双缓冲备份机制

设想一个持续运行的数据备份程序,它要定期将日志文件打包发送到远程服务器。为了不中断数据采集,我们采用双缓冲策略:一个线程往缓冲区 A 写数据,另一个线程把缓冲区 B 的内容发送出去。每隔一段时间,两个线程交换各自的缓冲区。

这时候,Exchanger 就派上了用场。它让两个线程安全地交换缓冲区引用,避免了读写冲突。

代码示例

下面是一个简化的实现:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

public class BackupExample {
    private static final Exchanger<List<String>> EXCHANGER = new Exchanger<>();
    private static final int BUFFER_SIZE = 1000;

    public static void main(String[] args) {
        // 缓冲区 A 和 B
        List<String> buffer1 = new ArrayList<>(BUFFER_SIZE);
        List<String> buffer2 = new ArrayList<>(BUFFER_SIZE);

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                buffer1.add("log-data-" + i);
                if (buffer1.size() == BUFFER_SIZE) {
                    try {
                        buffer1 = EXCHANGER.exchange(buffer1); // 交换满的缓冲区
                        System.out.println("采集线程:已交换缓冲区");
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
            // 最后一次交换剩余数据
            if (!buffer1.isEmpty()) {
                try {
                    buffer1 = EXCHANGER.exchange(buffer1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            System.out.println("采集线程结束");
        });

        Thread sender = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    buffer2 = EXCHANGER.exchange(buffer2); // 拿到要发送的数据
                    System.out.println("发送线程:正在发送 " + buffer2.size() + " 条数据");
                    // 模拟网络传输
                    Thread.sleep(300);
                    buffer2.clear(); // 清空后继续接收下一批
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            System.out.println("发送线程结束");
        });

        producer.start();
        sender.start();

        try {
            producer.join();
            sender.interrupt();
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

在这个例子中,两个线程通过 EXCHANGER.exchange() 方法完成数据交接。一旦某一方调用 exchange,它就会等待另一方也调用 exchange,然后两者原子性地交换数据。这个过程天然线程安全,无需 synchronized 或显式锁。

注意事项

Exchanger 只适用于两个线程之间的配对交换。如果涉及多个线程,就得考虑其他并发工具,比如 BlockingQueue。另外,exchange 调用会阻塞,直到配对线程也到达交换点,所以要合理设计超时处理或中断机制,避免死等。

在备份系统中,这种模式特别适合“生产-消费”节奏稳定、数据批量处理的场景,能有效降低锁竞争,提升吞吐。