View Javadoc

/*
 * Copyright (c) 2010.  The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
20  
21  package org.apache.hadoop.hbase.hbql.util;
22  
23  import org.apache.hadoop.hbase.hbql.client.HBqlException;
24  
25  import java.util.concurrent.BlockingQueue;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  public class CompletionQueue<T> {
29  
30      public static class Element<R> {
31  
32          private R value = null;
33          private boolean completionToken = false;
34  
35          private Element(final R value, boolean completionToken) {
36              this.value = value;
37              this.completionToken = completionToken;
38          }
39  
40          public static <S> Element<S> getElement(final Element<S> element, final S value) {
41              element.value = value;
42              element.completionToken = false;
43              return element;
44          }
45  
46          public static <S> Element<S> newEmptyToken() {
47              return new Element<S>(null, false);
48          }
49  
50          public static <S> Element<S> newCompletionToken() {
51              return new Element<S>(null, true);
52          }
53  
54          public R getValue() {
55              return this.value;
56          }
57  
58          public boolean isCompletionToken() {
59              return this.completionToken;
60          }
61      }
62  
63      private final Element<T> completionToken = Element.newCompletionToken();
64      private final AtomicInteger completionCounter = new AtomicInteger(0);
65  
66      private final BlockingQueue<Element<T>> elementQueue;
67      private final BlockingQueue<Element<T>> reusableElementQueue;
68  
69      public CompletionQueue(final int size) throws HBqlException {
70  
71          this.elementQueue = ArrayBlockingQueues.newArrayBlockingQueue(size, true);
72  
73          // Reusable element queue avoids creating objects for every item put in queue.
74          this.reusableElementQueue = ArrayBlockingQueues.newArrayBlockingQueue(size);
75  
76          try {
77              for (int i = 0; i < size; i++) {
78                  final Element<T> emptyItem = Element.newEmptyToken();
79                  this.getReusableElementQueue().put(emptyItem);
80              }
81          }
82          catch (InterruptedException e) {
83              throw new HBqlException(e);
84          }
85      }
86  
87      private BlockingQueue<Element<T>> getElementQueue() {
88          return this.elementQueue;
89      }
90  
91      private BlockingQueue<Element<T>> getReusableElementQueue() {
92          return this.reusableElementQueue;
93      }
94  
95      private AtomicInteger getCompletionCounter() {
96          return this.completionCounter;
97      }
98  
99      private Element<T> getCompletionToken() {
100         return this.completionToken;
101     }
102 
103     public int getCompletionCount() {
104         return this.getCompletionCounter().get();
105     }
106 
107     public void putCompletionToken() {
108         try {
109             this.getElementQueue().put(this.getCompletionToken());
110         }
111         catch (InterruptedException e) {
112             e.printStackTrace();
113         }
114     }
115 
116     public void putElement(final T val) throws HBqlException {
117         try {
118             final Element<T> element = Element.getElement(this.getReusableElementQueue().take(), val);
119             this.getElementQueue().put(element);
120         }
121         catch (InterruptedException e) {
122             throw new HBqlException(e);
123         }
124     }
125 
126     public Element<T> takeElement() throws HBqlException {
127         try {
128             final Element<T> element = this.getElementQueue().take();
129 
130             // Completion tokens do not go back to reusable queue
131             if (element.isCompletionToken())
132                 this.getCompletionCounter().incrementAndGet();
133             else
134                 this.getReusableElementQueue().put(element);
135 
136             return element;
137         }
138         catch (InterruptedException e) {
139             throw new HBqlException(e);
140         }
141     }
142 
143     public void reset() {
144         this.getCompletionCounter().set(0);
145         this.getElementQueue().clear();
146     }
147 }