使用 Java AtomicLong 实现单线程访问保证(参考 Kafka)

查看 KafkaConsumer 类 poll() 方法附近的源代码时,我注意到它使用 AtomicLong 来保证同一时刻只有一个线程访问消费者。这个思路非常精妙,因此我也试着写了一个自己的版本,附在下面。

(参考:Kafka消费者的poll()方法)


import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;


/**
 * Demonstrates a lightweight single-thread-access guard built on {@link AtomicLong},
 * modelled on the acquire/release pair used around {@code KafkaConsumer.poll()}.
 *
 * <p>This is not a lock: a competing thread is never blocked. Its {@link #acquire()} call
 * fails fast with a {@link ConcurrentModificationException} instead. The owning thread may
 * call {@link #acquire()} repeatedly; ownership is given up once every acquire has been
 * matched by a {@code release()}.
 */
public class Main {

    /** Sentinel stored in {@link #currentThread} while no thread owns the guard. */
    private static final long NO_CURRENT_THREAD = -1L;

    /** Id of the thread that currently owns the guard, or {@link #NO_CURRENT_THREAD}. */
    private static final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);

    /** Number of acquires by the owning thread that have not yet been released. */
    private static final AtomicInteger refcount = new AtomicInteger(0);

    /**
     * Acquires the guard, holds it briefly, releases it, and then acquires it again to show
     * that ownership is available once the reference count has dropped back to zero.
     *
     * @param args unused
     * @throws Exception if the thread is interrupted while sleeping
     */
    public static void main(String[] args) throws Exception {
        // Take ownership through acquire() so that currentThread holds this thread's id and
        // refcount is updated together with it, rather than poking the counters directly.
        acquire();
        try {
            Thread.sleep(199);
        } finally {
            release();
        }

        Thread.sleep(199);

        // The guard was fully released above, so this acquire succeeds again.
        acquire();
        try {
            // Nothing to do; the demo only shows that re-acquisition works.
        } finally {
            release();
        }
    }

    /**
     * Claims the guard for the calling thread, or re-enters it if the caller already owns it.
     *
     * <p>On success the reference count is incremented and the caller's thread id and the
     * stored owner id are printed to standard output.
     *
     * @throws ConcurrentModificationException if a different thread currently owns the guard
     */
    public static void acquire() {
        long threadId = Thread.currentThread().getId();
        // Either we already own the guard (re-entrant call), or we must win the CAS from the
        // "no owner" sentinel. If both fail, another thread owns it: reject without touching
        // refcount so the owner's bookkeeping stays intact.
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException("this is not safe for multi-threaded access");
        }

        refcount.incrementAndGet();

        System.out.println(threadId);
        System.out.println(currentThread);
    }

    /**
     * Undoes one {@link #acquire()}. When the reference count reaches zero the owner is reset
     * to {@link #NO_CURRENT_THREAD} so that any thread may acquire the guard next.
     */
    private static void release() {
        if (refcount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }

}

KafkaConsumer 的源代码(poll() 方法节选)



    /**
     * Polls for records until some are fetched or {@code timer} expires.
     *
     * <p>The whole body runs between {@code acquireAndEnsureOpen()} and a {@code release()} in
     * the {@code finally} block, so {@code release()} is executed on every exit path.
     *
     * @param timer timer bounding the poll loop; the loop repeats while {@code timer.notExpired()}
     * @param includeMetadataInTimeout when {@code true}, the assignment-metadata update is given
     *        {@code timer} and an empty result is returned if
     *        {@code updateAssignmentMetadataIfNeeded(timer)} returns {@code false}; when
     *        {@code false}, the update is retried with a {@code Long.MAX_VALUE} timer until it
     *        returns {@code true}
     * @return the fetched records after being passed through {@code interceptors.onConsume}, or
     *         {@code ConsumerRecords.empty()} if no records were fetched before the loop ended
     * @throws IllegalStateException if {@code subscriptions.hasNoSubscriptionOrUserAssignment()}
     *         is {@code true}
     */
    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
        // NOTE(review): presumably the single-thread ownership check this article discusses,
        // plus an "is open" check per the name; the helper's body is not shown here, so confirm.
        acquireAndEnsureOpen();
        try {
            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }

            // poll for new data until the timeout expires
            do {
                client.maybeTriggerWakeup();

                if (includeMetadataInTimeout) {
                    // Metadata update shares the caller's timer; give up with an empty result
                    // if the update reports failure.
                    if (!updateAssignmentMetadataIfNeeded(timer)) {
                        return ConsumerRecords.empty();
                    }
                } else {
                    // Metadata update is not bounded by the caller's timer: retry with a
                    // Long.MAX_VALUE timer, logging a warning after each failed attempt.
                    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
                        log.warn("Still waiting for metadata");
                    }
                }

                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
                if (!records.isEmpty()) {
                    // before returning the fetched records, we can send off the next round of fetches
                    // and avoid block waiting for their responses to enable pipelining while the user
                    // is handling the fetched records.
                    //
                    // NOTE: since the consumed position has already been updated, we must not allow
                    // wakeups or any other errors to be triggered prior to returning the fetched records.
                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                        client.pollNoWakeup();
                    }

                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }
            } while (timer.notExpired());

            // Timer expired without any records being fetched.
            return ConsumerRecords.empty();
        } finally {
            // Paired with acquireAndEnsureOpen() above; runs on normal return and on exceptions.
            release();
        }
    }