View Javadoc

1   /*
2    * Copyright (c) 2011.  The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.hbql.filter;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.expreval.client.NullColumnValueException;
26  import org.apache.expreval.client.ResultMissingColumnException;
27  import org.apache.expreval.expr.ExpressionTree;
28  import org.apache.hadoop.hbase.KeyValue;
29  import org.apache.hadoop.hbase.hbql.client.HBqlException;
30  import org.apache.hadoop.hbase.hbql.impl.HRecordImpl;
31  import org.apache.hadoop.hbase.hbql.impl.Utils;
32  import org.apache.hadoop.hbase.hbql.io.IO;
33  import org.apache.hadoop.hbase.hbql.mapping.ColumnAttrib;
34  import org.apache.hadoop.hbase.hbql.mapping.FieldType;
35  import org.apache.hadoop.hbase.hbql.mapping.MappingContext;
36  import org.apache.hadoop.hbase.hbql.mapping.TableMapping;
37  import org.apache.hadoop.hbase.util.Bytes;
38  
39  import java.io.ByteArrayInputStream;
40  import java.io.ByteArrayOutputStream;
41  import java.io.DataInput;
42  import java.io.DataOutput;
43  import java.io.IOException;
44  import java.io.ObjectInputStream;
45  import java.io.ObjectOutputStream;
46  import java.util.Arrays;
47  import java.util.List;
48  
49  public class RecordFilter extends InstrumentedFilter {
50  
51      private static final Log LOG = LogFactory.getLog(RecordFilter.class);
52  
53      private boolean        verbose        = false;
54      private ExpressionTree expressionTree = null;
55  
56      public HRecordImpl record = new HRecordImpl((MappingContext)null);
57  
58      public RecordFilter() {
59      }
60  
61      private RecordFilter(final ExpressionTree expressionTree) {
62          this.expressionTree = expressionTree;
63          this.getHRecord().setMappingContext(this.getExpressionTree().getMappingContext());
64      }
65  
66      public static RecordFilter newRecordFilter(final ExpressionTree expressionTree) {
67          return (expressionTree == null) ? null : new RecordFilter(expressionTree);
68      }
69  
70      public boolean hasFilterRow() {
71          return true;
72      }
73  
74      private HRecordImpl getHRecord() {
75          return this.record;
76      }
77  
78      private TableMapping getMapping() {
79          return this.getExpressionTree().getTableMapping();
80      }
81  
82      private ExpressionTree getExpressionTree() {
83          return this.expressionTree;
84      }
85  
86      private boolean hasValidExpressionTree() {
87          return this.getExpressionTree() != null;
88      }
89  
90      public void setVerbose(final boolean verbose) {
91          this.verbose = verbose;
92      }
93  
94      public boolean getVerbose() {
95          return this.verbose;
96      }
97  
98      public void reset() {
99          if (this.getVerbose())
100             LOG.debug("In reset()");
101         this.getHRecord().clearValues();
102     }
103 
104     public boolean filterRowKey(byte[] buffer, int offset, int length) {
105         // LOG.debug("In filterRowKey()");
106         //final String rowKey = new String(buffer, offset, length);
107         final byte[] rowKey = Arrays.copyOfRange(buffer, offset, length);
108         try {
109             this.getMapping().getKeyAttrib().setCurrentValue(record, 0, rowKey);
110         }
111         catch (HBqlException e) {
112             Utils.logException(LOG, e);
113         }
114         return false;
115     }
116 
117     public boolean filterAllRemaining() {
118         return false;
119     }
120 
121     public void filterRow(List<KeyValue> keyValues) {
122         // TODO This is new
123     }
124 
125     public ReturnCode filterKeyValue(KeyValue v) {
126 
127         if (this.getVerbose())
128             LOG.debug("In filterKeyValue()");
129 
130         if (this.hasValidExpressionTree()) {
131 
132             try {
133                 final String familyName = Bytes.toString(v.getFamily());
134                 final String columnName = Bytes.toString(v.getQualifier());
135                 final TableMapping tableMapping = this.getMapping();
136                 final ColumnAttrib attrib = tableMapping.getAttribFromFamilyQualifiedName(familyName, columnName);
137 
138                 // Do not bother setting value if it is not used in expression
139                 if (this.getExpressionTree().getAttribsUsedInExpr().contains(attrib)) {
140                     if (this.getVerbose())
141                         LOG.debug("In filterKeyValue() setting value for: " + familyName + ":" + columnName);
142                     final Object val = attrib.getValueFromBytes(null, v.getValue());
143                     this.getHRecord().setCurrentValue(familyName, columnName, v.getTimestamp(), val);
144                     this.getHRecord().setVersionValue(familyName, columnName, v.getTimestamp(), val, true);
145                 }
146             }
147             catch (Exception e) {
148                 Utils.logException(LOG, e);
149                 LOG.debug("Exception in filterKeyValue(): " + e.getClass().getName() + " - " + e.getMessage());
150             }
151         }
152 
153         return ReturnCode.INCLUDE;
154     }
155 
156     public boolean filterRow() {
157 
158         if (this.getVerbose())
159             LOG.debug("In filterRow()");
160 
161         boolean filterRow;
162         if (!this.hasValidExpressionTree()) {
163             if (this.getVerbose())
164                 LOG.debug("In filterRow() had invalid hasValidExpressionTree(): ");
165             filterRow = false;
166         }
167         else {
168             try {
169                 filterRow = !this.getExpressionTree().evaluate(null, this.getHRecord());
170                 if (this.getVerbose())
171                     LOG.debug("In filterRow() filtering record: " + filterRow);
172             }
173             catch (ResultMissingColumnException e) {
174                 if (this.getVerbose())
175                     LOG.debug("In filterRow() had ResultMissingColumnException exception: " + e.getMessage());
176                 filterRow = true;
177             }
178             catch (NullColumnValueException e) {
179                 if (this.getVerbose())
180                     LOG.debug("In filterRow() had NullColumnValueException exception: " + e.getMessage());
181                 filterRow = true;
182             }
183             catch (HBqlException e) {
184                 e.printStackTrace();
185                 Utils.logException(LOG, e);
186                 LOG.debug("In filterRow() had HBqlException: " + e.getMessage());
187                 filterRow = true;
188             }
189         }
190 
191         //LOG.debug("In filterRow() returning: " + filterRow);
192         return filterRow;
193     }
194 
195     public void write(DataOutput out) throws IOException {
196         try {
197             out.writeBoolean(this.getVerbose());
198             final byte[] b = IO.getSerialization().getObjectAsBytes(this.getExpressionTree());
199             Bytes.writeByteArray(out, b);
200         }
201         catch (HBqlException e) {
202             e.printStackTrace();
203             Utils.logException(LOG, e);
204             throw new IOException(e.getCause());
205         }
206     }
207 
208     public void readFields(DataInput in) throws IOException {
209 
210         try {
211             this.verbose = in.readBoolean();
212             final byte[] b = Bytes.readByteArray(in);
213             this.expressionTree = (ExpressionTree)IO.getSerialization().getScalarFromBytes(FieldType.ObjectType, b);
214 
215             this.getHRecord().setMappingContext(this.getExpressionTree().getMappingContext());
216             this.getMapping().resetDefaultValues();
217         }
218         catch (HBqlException e) {
219             e.printStackTrace();
220             Utils.logException(LOG, e);
221             throw new IOException(e.getCause());
222         }
223     }
224 
225     public static void testFilter(final RecordFilter origFilter) throws HBqlException, IOException {
226 
227         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
228         final ObjectOutputStream oos = new ObjectOutputStream(baos);
229         origFilter.write(oos);
230         oos.flush();
231         oos.close();
232         final byte[] b = baos.toByteArray();
233 
234         final ByteArrayInputStream bais = new ByteArrayInputStream(b);
235         final ObjectInputStream ois = new ObjectInputStream(bais);
236 
237         RecordFilter filter = new RecordFilter();
238         filter.readFields(ois);
239 
240         filter.reset();
241 
242         final String family = "family1";
243         final String column = "author";
244         final String[] vals = {"An author value-81252702162528282000",
245                                "An author value-812527021593753270002009",
246                                "An author value-81252702156610125000",
247                                "An author value-812527021520532270002009",
248                                "An author value-81252702147337884000"
249         };
250 
251         for (String val : vals) {
252             filter.getHRecord().setCurrentValue(family, column, 100, val);
253             filter.getHRecord().setVersionValue(family, column, 100, val, true);
254         }
255 
256         boolean v = filter.filterRow();
257     }
258 }