使用 Java AtomicLong 实现单线程保证(参考 Kafka)
查看 KafkaConsumer 类 poll() 方法附近的源代码时,我注意到它使用 AtomicLong 来保证同一时刻只有一个线程在访问。这个思路非常精妙,因此我也试着写了一个自己的版本,附在下面。
(参考:Kafka消费者的poll()方法)
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Minimal demo of a lock-free "single thread at a time" guard built from an
 * {@link AtomicLong} holding the owner's thread id and an {@link AtomicInteger}
 * counting nested acquisitions.
 *
 * <p>The guard does not block: a second thread that tries to acquire it while
 * another thread owns it fails immediately with an exception.
 */
public class Main {
    /** Sentinel stored in {@link #currentThread} while no thread owns the guard. */
    private static final long NO_CURRENT_THREAD = -1L;

    /** Id of the thread that currently owns the guard, or {@link #NO_CURRENT_THREAD}. */
    private static final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);

    /** Number of acquisitions that have not been released yet. */
    private static final AtomicInteger refcount = new AtomicInteger(0);

    /**
     * Runs the demo: fakes one outstanding acquisition, releases it, then
     * acquires the guard for the main thread and prints the result.
     *
     * @param args unused
     * @throws Exception if one of the sleeps is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Simulate a held guard without going through acquire(): one reference,
        // and the owner moved off the sentinel (-1 -> 0).
        refcount.incrementAndGet();
        currentThread.incrementAndGet();
        Thread.sleep(199);
        // refcount drops back to 0, so the owner is reset to the sentinel.
        release();
        Thread.sleep(199);
        acquire();
    }

    /**
     * Acquires the guard for the calling thread and prints the caller's id
     * and the recorded owner id.
     *
     * <p>The call succeeds when the caller already owns the guard (nested
     * acquisition) or when the guard is free and the compare-and-set from the
     * sentinel to the caller's id wins. Every successful call increments the
     * reference count.
     *
     * @throws ConcurrentModificationException if another thread currently owns
     *         the guard; the reference count is left untouched in that case
     */
    public static void acquire() {
        long threadId = Thread.currentThread().getId();
        // The CAS is only attempted when the caller is not already the owner.
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            // Fail fast instead of merely logging: continuing here would bump
            // refcount for a thread that never obtained ownership.
            throw new ConcurrentModificationException("this is not safe for multi-threaded access");
        }
        refcount.incrementAndGet();
        System.out.println(threadId);
        System.out.println(currentThread);
    }

    /**
     * Releases one acquisition; when the last one is released the owner is
     * reset to {@link #NO_CURRENT_THREAD} so another thread may acquire.
     */
    private static void release() {
        if (refcount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }
}
KafkaConsumer的源代码
/**
 * Polls for records, looping until records are fetched or {@code timer} expires.
 *
 * <p>The whole body runs between {@code acquireAndEnsureOpen()} and a
 * {@code release()} in the {@code finally} block, so the release happens on
 * every exit path (normal return or exception).
 *
 * @param timer bounds the polling loop; also passed to the metadata update
 *        (when {@code includeMetadataInTimeout} is true) and to {@code pollForFetches}
 * @param includeMetadataInTimeout if true, the assignment-metadata update is
 *        charged against {@code timer} and an empty result is returned when it
 *        does not complete; if false, the update is retried with a
 *        {@code Long.MAX_VALUE} timer until it succeeds
 * @return the fetched records after passing through {@code interceptors.onConsume},
 *         or {@code ConsumerRecords.empty()} if nothing was fetched
 * @throws IllegalStateException if there is no subscription or user assignment
 */
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
// NOTE(review): presumably takes the single-thread guard and checks the consumer
// is not closed -- defined outside this excerpt, confirm there.
acquireAndEnsureOpen();
try {
if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}
// poll for new data until the timeout expires
do {
// NOTE(review): presumably surfaces a pending wakeup request at this point -- confirm in client.
client.maybeTriggerWakeup();
if (includeMetadataInTimeout) {
// Metadata update shares the caller's timer; give up with an empty result if it fails.
if (!updateAssignmentMetadataIfNeeded(timer)) {
return ConsumerRecords.empty();
}
} else {
// Metadata update is not charged against the caller's timer: retry until it succeeds.
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
log.warn("Still waiting for metadata");
}
}
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.pollNoWakeup();
}
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
} while (timer.notExpired());
// Timer expired without any records being fetched.
return ConsumerRecords.empty();
} finally {
// Always paired with acquireAndEnsureOpen() above, including on early return or exception.
release();
}
}