View Javadoc

1   /*
2    * Copyright (c) 2011.  The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.hbql.impl;
22  
23  import org.apache.hadoop.hbase.hbql.client.HBqlException;
24  import org.apache.hadoop.hbase.hbql.util.ArrayBlockingQueues;
25  import org.apache.hadoop.hbase.hbql.util.CompletionQueue;
26  import org.apache.hadoop.hbase.hbql.util.Lists;
27  import org.apache.hadoop.hbase.hbql.util.NamedThreadFactory;
28  import org.apache.hadoop.hbase.hbql.util.PoolableElement;
29  
30  import java.util.List;
31  import java.util.concurrent.BlockingQueue;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.Executors;
34  import java.util.concurrent.RejectedExecutionHandler;
35  import java.util.concurrent.ThreadFactory;
36  import java.util.concurrent.ThreadPoolExecutor;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicInteger;
40  
41  public abstract class CompletionQueueExecutor<T> extends PoolableElement<CompletionQueueExecutor> {
42  
43      private final AtomicBoolean       atomicShutdown       = new AtomicBoolean(false);
44      private final AtomicInteger       workSubmittedCounter = new AtomicInteger(0);
45      private final ExecutorService     submitterThread      = Executors.newSingleThreadExecutor();
46      private final List<HBqlException> exceptionList        = Lists.newArrayList();
47      private final LocalThreadPoolExecutor threadPoolExecutor;
48      private final CompletionQueue<T>      completionQueue;
49  
50      private static class LocalCallerRunsPolicy extends ThreadPoolExecutor.CallerRunsPolicy {
51  
52          public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor threadPoolExecutor) {
53              super.rejectedExecution(runnable, threadPoolExecutor);
54              ((LocalThreadPoolExecutor)threadPoolExecutor).incrementRejectionCount();
55          }
56      }
57  
58      private static class LocalThreadPoolExecutor extends ThreadPoolExecutor {
59  
60          private final AtomicInteger rejectionCounter = new AtomicInteger(0);
61  
62          private LocalThreadPoolExecutor(final int minPoolSize,
63                                          final int maxPoolSize,
64                                          final long keepAliveTime,
65                                          final TimeUnit timeUnit,
66                                          final BlockingQueue<Runnable> backingQueue,
67                                          final ThreadFactory threadFactory,
68                                          final RejectedExecutionHandler handler) {
69              super(minPoolSize, maxPoolSize, keepAliveTime, timeUnit, backingQueue, threadFactory, handler);
70          }
71  
72          private AtomicInteger getRejectionCounter() {
73              return this.rejectionCounter;
74          }
75  
76          private void incrementRejectionCount() {
77              this.getRejectionCounter().incrementAndGet();
78          }
79  
80          private void reset() {
81              this.getRejectionCounter().set(0);
82          }
83  
84          public int getRejectionCount() {
85              return this.getRejectionCounter().get();
86          }
87      }
88  
89      protected CompletionQueueExecutor(final QueryExecutorPoolImpl executorPool,
90                                        final int minThreadCount,
91                                        final int maxThreadCount,
92                                        final long keepAliveSecs,
93                                        final int completionQueueSize) throws HBqlException {
94          super(executorPool);
95          final BlockingQueue<Runnable> backingQueue = ArrayBlockingQueues.newArrayBlockingQueue(maxThreadCount * 5);
96          final String name = executorPool == null ? "Non query exec pool" : "Query exec pool " + executorPool.getName();
97          this.threadPoolExecutor = new LocalThreadPoolExecutor(minThreadCount,
98                                                                maxThreadCount,
99                                                                keepAliveSecs,
100                                                               TimeUnit.SECONDS,
101                                                               backingQueue,
102                                                               new NamedThreadFactory(name),
103                                                               new LocalCallerRunsPolicy());
104         this.completionQueue = new CompletionQueue<T>(completionQueueSize);
105     }
106 
107     public abstract boolean threadsReadResults();
108 
109     private ExecutorService getSubmitterThread() {
110         return this.submitterThread;
111     }
112 
113     private LocalThreadPoolExecutor getThreadPoolExecutor() {
114         return this.threadPoolExecutor;
115     }
116 
117     private CompletionQueue<T> getCompletionQueue() {
118         return this.completionQueue;
119     }
120 
121     private AtomicInteger getWorkSubmittedCounter() {
122         return this.workSubmittedCounter;
123     }
124 
125     public void putElement(final T val) throws HBqlException {
126         this.getCompletionQueue().putElement(val);
127     }
128 
129     public void putCompletion() {
130         this.getCompletionQueue().putCompletionToken();
131     }
132 
133     public CompletionQueue.Element<T> takeElement() throws HBqlException {
134         if (this.getExceptionList().size() > 0)
135             throw this.getExceptionList().remove(0);
136 
137         return this.getCompletionQueue().takeElement();
138     }
139 
140     public int getRejectionCount() {
141         return this.getThreadPoolExecutor().getRejectionCount();
142     }
143 
144     private List<HBqlException> getExceptionList() {
145         return this.exceptionList;
146     }
147 
148     public void addException(final HBqlException exception) {
149         this.getExceptionList().add(exception);
150     }
151 
152     public boolean moreResultsPending() {
153         final int completionCount = this.getCompletionQueue().getCompletionCount();
154         final int submittedCount = this.getWorkSubmittedCounter().get();
155         return completionCount < submittedCount;
156     }
157 
158     public void submitWorkToSubmitterThread(final Runnable job) {
159         this.getWorkSubmittedCounter().incrementAndGet();
160         this.getSubmitterThread().submit(job);
161     }
162 
163     public void submitWorkToThreadPool(final Runnable job) {
164         this.getWorkSubmittedCounter().incrementAndGet();
165         this.getThreadPoolExecutor().execute(job);
166     }
167 
168     public void resetElement() {
169         this.getWorkSubmittedCounter().set(0);
170         this.getExceptionList().clear();
171         this.getCompletionQueue().reset();
172         this.getThreadPoolExecutor().reset();
173     }
174 
175     public void close() {
176         this.resetElement();
177         this.releaseElement();
178     }
179 
180     public void releaseElement() {
181         this.getElementPool().release(this);
182     }
183 
184     private AtomicBoolean getAtomicShutdown() {
185         return this.atomicShutdown;
186     }
187 
188     public boolean isShutdown() {
189         return this.getAtomicShutdown().get();
190     }
191 
192     public void shutdown() {
193         if (!this.isShutdown()) {
194             synchronized (this) {
195                 if (!this.isShutdown()) {
196                     this.getThreadPoolExecutor().shutdown();
197                     this.getSubmitterThread().shutdown();
198                     this.getAtomicShutdown().set(true);
199                 }
200             }
201         }
202     }
203 }