1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.hbql.impl;
22
23 import org.apache.hadoop.hbase.hbql.client.AsyncExecutor;
24 import org.apache.hadoop.hbase.hbql.client.QueryFuture;
25 import org.apache.hadoop.hbase.hbql.util.NamedThreadFactory;
26
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34
35 public class AsyncExecutorImpl implements AsyncExecutor {
36
37 private final AtomicBoolean atomicShutdown = new AtomicBoolean(false);
38 private final AtomicInteger workSubmittedCounter = new AtomicInteger(0);
39 private final LocalThreadPoolExecutor threadPoolExecutor;
40
41 private final String poolName;
42 private final int minThreadCount;
43 private final int maxThreadCount;
44 private final long keepAliveSecs;
45
46 private static class LocalThreadPoolExecutor extends ThreadPoolExecutor {
47
48 private final AtomicInteger queryCounter = new AtomicInteger(0);
49
50 private LocalThreadPoolExecutor(final int minPoolSize,
51 final int maxPoolSize,
52 final long keepAliveTime,
53 final TimeUnit timeUnit,
54 final BlockingQueue<Runnable> backingQueue,
55 final ThreadFactory threadFactory) {
56 super(minPoolSize, maxPoolSize, keepAliveTime, timeUnit, backingQueue, threadFactory);
57 }
58
59 private AtomicInteger getQueryCounter() {
60 return this.queryCounter;
61 }
62
63 private void incrementQueryCount() {
64 this.getQueryCounter().incrementAndGet();
65 }
66
67 private void reset() {
68 this.getQueryCounter().set(0);
69 }
70
71 public int getQueryCount() {
72 return this.getQueryCounter().get();
73 }
74
75 protected void beforeExecute(final Thread thread, final Runnable runnable) {
76 super.beforeExecute(thread, runnable);
77
78 final AsyncRunnable asyncRunnable = (AsyncRunnable)runnable;
79 asyncRunnable.getQueryFuture().markQueryStart();
80 }
81
82 protected void afterExecute(final Runnable runnable, final Throwable throwable) {
83 super.afterExecute(runnable, throwable);
84
85 final AsyncRunnable asyncRunnable = (AsyncRunnable)runnable;
86 asyncRunnable.getQueryFuture().markQueryComplete();
87 }
88 }
89
90 public AsyncExecutorImpl(final String poolName,
91 final int minThreadCount,
92 final int maxThreadCount,
93 final long keepAliveSecs) {
94 this.poolName = poolName;
95 this.minThreadCount = minThreadCount;
96 this.maxThreadCount = maxThreadCount;
97 this.keepAliveSecs = keepAliveSecs;
98
99 final BlockingQueue<Runnable> backingQueue = new LinkedBlockingQueue<Runnable>();
100 final String name = "Async exec pool " + this.getName();
101 this.threadPoolExecutor = new LocalThreadPoolExecutor(minThreadCount,
102 maxThreadCount,
103 keepAliveSecs,
104 TimeUnit.SECONDS,
105 backingQueue,
106 new NamedThreadFactory(name));
107 }
108
109 private LocalThreadPoolExecutor getThreadPoolExecutor() {
110 return this.threadPoolExecutor;
111 }
112
113 private AtomicInteger getWorkSubmittedCounter() {
114 return this.workSubmittedCounter;
115 }
116
117 public void resetElement() {
118 this.getWorkSubmittedCounter().set(0);
119 this.getThreadPoolExecutor().reset();
120 }
121
122 public QueryFuture submit(final AsyncRunnable job) {
123 this.getWorkSubmittedCounter().incrementAndGet();
124 this.getThreadPoolExecutor().execute(job);
125 return job.getQueryFuture();
126 }
127
128 private AtomicBoolean getAtomicShutdown() {
129 return this.atomicShutdown;
130 }
131
132 public boolean isShutdown() {
133 return this.getAtomicShutdown().get();
134 }
135
136 public void shutdown() {
137 if (!this.isShutdown()) {
138 synchronized (this) {
139 if (!this.isShutdown()) {
140 this.getThreadPoolExecutor().shutdown();
141 this.getAtomicShutdown().set(true);
142 }
143 }
144 }
145 }
146
147 public String getName() {
148 return this.poolName;
149 }
150
151 public int getMinThreadCount() {
152 return this.minThreadCount;
153 }
154
155 public int getMaxThreadCount() {
156 return this.maxThreadCount;
157 }
158
159 public long getKeepAliveSecs() {
160 return this.keepAliveSecs;
161 }
162 }