Tuesday, February 3, 2015

Use less memory for Term Indices


Intro


Solr/Lucene 4.4, but also relevant for later versions

Term indices live in memory. If you have a lot of documents and/or lots of indexed fields those term indices will require a lot of memory. But you can do something to limit it.

Problem


The term index is basically the .tip files in your Lucene index-folder. They live almost 1-to-1 in memory.
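A rough way to get a feeling for how much memory the term index will need is simply to sum up the sizes of the .tip files in the index-folder. A minimal sketch using the Lucene Directory API (the index path is of course just a placeholder):
 
 import java.io.File;
 
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 
 public class TermIndexSizeEstimate {
 
  public static void main(String[] args) throws Exception {
   // Sum the sizes of all .tip files - they are loaded almost 1-to-1 into memory
   Directory dir = FSDirectory.open(new File("/path/to/<replica-name>/data/index"));
   long tipBytes = 0;
   for (String file : dir.listAll()) {
    if (file.endsWith(".tip")) {
     tipBytes += dir.fileLength(file);
    }
   }
   System.out.println("Term index (.tip) size: " + (tipBytes / (1024 * 1024)) + " MB");
   dir.close();
  }
 }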

A concrete case I have worked on
  • An indexed id field of type string. Fairly long ids - all unique
  • A SolrCloud system where each Solr node contains about 30 billion documents
  • Memory usage for the in-memory term index alone is about 10 GB
10 GB is a lot, so we wanted to reduce that

Solution


BlockTreeTermsWriter used by Lucene41PostingsFormat has support for something called minTermBlockSize and maxTermBlockSize, with default values 25 and 48 respectively. Increasing those values will reduce the term index size. There is no out-of-the-box support for changing them in Lucene/Solr, but you can do it yourself

Create a new abstract postings-format that lets you increase the term-block-sizes by a factor
 
 import java.io.IOException;
 
 import org.apache.lucene.codecs.BlockTreeTermsWriter;
 import org.apache.lucene.codecs.FieldsConsumer;
 import org.apache.lucene.codecs.FieldsProducer;
 import org.apache.lucene.codecs.PostingsFormat;
 import org.apache.lucene.codecs.lucene41.Lucene41PostingsFormat;
 import org.apache.lucene.index.SegmentReadState;
 import org.apache.lucene.index.SegmentWriteState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** 
  * Same as {@link Lucene41PostingsFormat} except that
  * * minTermBlockSize is {@link BlockTreeTermsWriter#DEFAULT_MIN_BLOCK_SIZE} * <some factor> (instead of just {@link BlockTreeTermsWriter#DEFAULT_MIN_BLOCK_SIZE}) 
  * * maxTermBlockSize is 2 * (minTermBlockSize - 1)
  */
 public abstract class Lucene41FactorPostingsFormat extends PostingsFormat {
  private static final Logger log = LoggerFactory.getLogger(Lucene41FactorPostingsFormat.class);
  
  private final Lucene41PostingsFormat delegate; 
 
  public Lucene41FactorPostingsFormat(int factor) {
   super("Lucene41x" + factor);
   int minTermBlockSize = BlockTreeTermsWriter.DEFAULT_MIN_BLOCK_SIZE * factor; 
   int maxTermBlockSize = 2 * (minTermBlockSize - 1);
   log.info(getName() + "(" + minTermBlockSize + "," + maxTermBlockSize + ")");
   delegate = new Lucene41PostingsFormat(minTermBlockSize, maxTermBlockSize);
  }
 
  @Override
  public String toString() {
   return delegate.toString();
  }
 
  @Override
  public FieldsConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
   return delegate.fieldsConsumer(state);
  }
 
  @Override
  public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException {
   return delegate.fieldsProducer(state);
  }
 
 }

Create new postings-formats with concrete factors, e.g. factor-4 and factor-16 postings-formats
 
 /** 
  * {@link Lucene41FactorPostingsFormat} using factor 4
  */
 public class Lucene41x4PostingsFormat extends Lucene41FactorPostingsFormat {
 
  public Lucene41x4PostingsFormat() {
   super(4);
  }
 
 }
 
 /** 
  * {@link Lucene41FactorPostingsFormat} using factor 16
  */
 public class Lucene41x16PostingsFormat extends Lucene41FactorPostingsFormat {
 
  public Lucene41x16PostingsFormat() {
   super(16);
  }
 
 }

Use those postings-formats instead of plain Lucene41PostingsFormat

In schema.xml
  • Declare the new field-types - e.g. for strings
  •  
     <fieldType name="stringx4" class="solr.StrField" sortMissingLast="true" postingsFormat="Lucene41x4" />
     <fieldType name="stringx16" class="solr.StrField" sortMissingLast="true" postingsFormat="Lucene41x16" />
    
  • Use the new field-types for your fields - e.g. an indexed id string field
  •  
     <field name="id" type="stringx16" indexed="true" stored="true" required="true"/>
    
You can make the change for any indexed field. We only did it for our id field.
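One thing to be aware of (general Solr/Lucene behaviour as far as I know, not something specific to our setup): Lucene looks postings-formats up by name via Java's SPI mechanism, so the jar containing the new classes needs a META-INF/services/org.apache.lucene.codecs.PostingsFormat file listing them, and solrconfig.xml needs a schema-aware codec factory (<codecFactory class="solr.SchemaCodecFactory"/>) for the postingsFormat attribute on field-types to take effect. The services file could look like this (the package name is just an example):
 
 # META-INF/services/org.apache.lucene.codecs.PostingsFormat
 my.package.Lucene41x4PostingsFormat
 my.package.Lucene41x16PostingsFormat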

You can make the change for existing Lucene-indices (replicas in Solr) and continue searching and indexing in them. Lucene is still able to read the term indices of the existing segments (written with Lucene41PostingsFormat), but their size in memory will not be reduced. New segments in this index will be written with Lucene41xXXPostingsFormat and their size in memory will be reduced. As merging occurs you will have more and more segments written in the new postings-format. If you optimize, all segments will be written in the new postings-format and you will get the full gain.
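If you want to force that last step, an optimize can be triggered with a plain update-request - something like the following (host, port and collection name are just placeholders):
 
 curl 'http://<host>:<port>/solr/<collection>/update?optimize=true'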

Consequences


In our concrete setup we saw the following consequences
  • The 10 GB memory usage was reduced to about 1.5 GB
  • No significant changes in search response-time (for the searches we do in practice)
  • Indexing became about 10% slower
What consequences you will see will probably depend a lot on your concrete setup, so make sure to test thoroughly.

This change seems to matter much more for string fields than for e.g. int or long fields.

Monday, May 26, 2014

Performance of AND-queries with uneven hits


Intro


Using Solr 4.4

Selected parts of my schema.xml
 
    <field name="_version_" type="dlng" indexed="false" stored="true" required="true" docValues="true"/>
    <field name="id" type="string" indexed="true" stored="true" required="true"/>
    <dynamicField name="*_str_ind_sto" type="string" indexed="true" stored="true"/>
    <dynamicField name="*_dstr_doc_sto" type="dstring" indexed="false" stored="true" required="true" docValues="true"/>
    <dynamicField name="*_str_sto" type="string" indexed="false" stored="true"/>
    <dynamicField name="*_int_sto" type="int" indexed="false" stored="true"/>
    <dynamicField name="*_lng_sto" type="long" indexed="false" stored="true"/>
    <dynamicField name="*_lng_ind_sto" type="long" indexed="true" stored="true"/>
    <dynamicField name="*_dlng_doc_sto" type="dlng" indexed="false" stored="true" required="true" docValues="true"/>
    <dynamicField name="*_dlng_doc_ind_sto" type="dlng" indexed="true" stored="true" required="true" docValues="true"/>
    <dynamicField name="*_txt_ind_sto_tp" type="text" indexed="true" stored="true" termVectors="true" termPositions="true"/>
    ...
    <fieldType name="string" class="solr.StrField" sortMissingLast="true" />
    <fieldType name="dstring" class="solr.StrField" sortMissingLast="true" docValuesFormat="Disk"/>
    <fieldType name="int" class="solr.TrieIntField" precisionStep="0" positionIncrementGap="0"/>
    <fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/> 
    <fieldType name="dlng" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0" docValuesFormat="Disk"/>
    <fieldType name="text" class="solr.TextField" positionIncrementGap="100">
        ...
    </fieldType>
All documents have the following fields
  • start_timestamp_dlng_doc_ind_sto (long, docvalue, indexed, stored)
  • end_timestamp_lng_ind_sto (long, indexed, stored)
  • no1_dlng_doc_ind_sto (long, docvalue, indexed, stored)
  • no2_dlng_doc_ind_sto (long, docvalue, indexed, stored)
  • 2 other int_sto fields
  • 2 other dlng_doc_ind_sto fields
  • 3 other lng_ind_sto fields
  • 2 other str_sto fields
  • 1 other lng_sto field
  • 2 other int_sto fields
My collection contains about 5 billion documents with the following distribution
  • start_timestamp (close to) evenly (uniformly) distributed over a month (December 2013)
  • no1 distributed over 5 million numbers
  • no2 distributed over millions of numbers - not in any way correlated with the distribution of no1
For a certain period in time (a sub-period of December 2013), I want to find the unique no2's for a particular no1.

First obvious attempt


This can be achieved by doing a facet-search like this
 
q=no1_dlng_doc_ind_sto:(<some-number>) AND 
start_timestamp_dlng_doc_ind_sto:([<TIME_START> TO <TIME_END>])&
facet=true&facet.field=no2_dlng_doc_ind_sto&facet.limit=-1&facet.offset=0&facet.mincount=1&facet.zeros=false&start=0&rows=0&distrib=true

Such a search hits about 500-3000 documents on the no1_dlng_doc_ind_sto part of the query and billions of documents on the start_timestamp_dlng_doc_ind_sto part of the query - depending on the length of the time-interval (all of December: 5 billion hits, half of December: 2.5 billion hits, etc.). The number of unique no2_dlng_doc_ind_sto values varies from 100 to 800.

I have a test looping for an hour, using a different value for no1_dlng_doc_ind_sto every time, but using the same time-interval all the time (about 2/3 of December). For every loop it does
  • 1 concurrent search
  • 2 concurrent searches
  • 4 concurrent searches
  • 8 concurrent searches
This is in order to see how things behave when doing the searches in parallel. no1_dlng_doc_ind_sto values are never reused - neither inside a loop nor between loops. The test keeps track of average response-times.

Output from the test when doing the query as stated above
1 concurrent average response-time: 6549
2 concurrent average response-time: 6350
4 concurrent average response-time: 9492
8 concurrent average response-time: 15149

Hmmm, I am really not impressed. I fear that Solr/Lucene is actually finding the 500-3000 document-ids and the several-billion document-ids using the indices on no1_dlng_doc_ind_sto and start_timestamp_dlng_doc_ind_sto respectively, and then intersecting them. This might not be the most efficient way of finding the documents.

Trying with filter-query


I do not know exactly what filter-queries do, so let's just try moving the start_timestamp_dlng_doc_ind_sto part over to a filter-query
 
q=no1_dlng_doc_ind_sto:(<some-number>)&
fq=start_timestamp_dlng_doc_ind_sto:([<TIME_START> TO <TIME_END>])&
facet=true&facet.field=no2_dlng_doc_ind_sto&facet.limit=-1&facet.offset=0&facet.mincount=1&facet.zeros=false&start=0&rows=0&distrib=true

Output from the test when doing this search
1 concurrent average response-time: 35658
2 concurrent average response-time: 35087
4 concurrent average response-time: 45384
8 concurrent average response-time: 65576

Ahhhh, that definitely did not help. I did not dive further into that.

Doing time-interval filter and faceting on client


Well, we could try to just request the documents having the correct value for no1_dlng_doc_ind_sto, and then do the filtering on start_timestamp_dlng_doc_ind_sto and the faceting on the client-side
 
q=no1_dlng_doc_ind_sto:(<some-number>)&fl=no2_dlng_doc_ind_sto,start_timestamp_dlng_doc_ind_sto&start=0&rows=10000&distrib=true

Output from the test when doing this search
1 concurrent average response-time: 3104
2 concurrent average response-time: 4430
4 concurrent average response-time: 6623
8 concurrent average response-time: 10738

Better, but I am still not really impressed. I profiled this one a little and found that it is mainly "slow" because it fetches the values to be returned from "slow" storage (even though all the values that should be retrieved are docvalues - maybe someone should change that)

Doing time-interval filter by iterating docvalues on result-candidates


I would really like to see how this performs if you, on the server-side, just use the index on no1_dlng_doc_ind_sto to find candidate docs and then, for each of those documents, fetch the value of start_timestamp_dlng_doc_ind_sto (using its docvalue) to filter out the docs that do not fit the time-interval. I hacked SolrIndexSearcher to do exactly that
 
q=no1_dlng_doc_ind_sto:(<some-number>)&
facet=true&facet.field=no2_dlng_doc_ind_sto&facet.limit=-1&facet.offset=0&facet.mincount=1&facet.zeros=false&start=0&rows=0&distrib=true

You do not get the entire picture from that query - remember that documents are filtered with respect to the time-interval, but this is hardcoded and not expressed in the URL. Basically I am running the Solrs with this hacky patch and starting them with the following params
  • -DDO_HARDCODED_FILTERING=true
  • -DHARDCODED_FILTERING_START_TIME=<TIME_START>
  • -DHARDCODED_FILTERING_END_TIME=<TIME_END>
The query in question hits the getDocListAndSetNC.branch#2 path and I really do not know if it works on any of the other paths. Please note that this patch is only there to test how it will perform, and in no way expresses my suggestion for how it should be implemented if introduced as a real feature of Solr/Lucene.
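The core of the idea can be sketched like this (a simplified, self-contained illustration using the Lucene 4.x APIs - not the actual SolrIndexSearcher patch; it assumes every document has the timestamp docvalue, which is the case in my collection):
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.lucene.index.AtomicReaderContext;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.search.Collector;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.NumericRangeQuery;
 import org.apache.lucene.search.Scorer;
 
 public class DocValuesIntervalFilterSketch {
 
  /** Find docs matching no1, but only keep those whose start_timestamp docvalue is inside [timeStart, timeEnd] */
  public static List<Integer> findMatches(IndexSearcher searcher, long no1,
    final long timeStart, final long timeEnd) throws IOException {
   final List<Integer> matches = new ArrayList<Integer>();
   searcher.search(NumericRangeQuery.newLongRange("no1_dlng_doc_ind_sto", no1, no1, true, true),
     new Collector() {
      private int docBase;
      private NumericDocValues timestamps;
 
      @Override
      public void setNextReader(AtomicReaderContext context) throws IOException {
       docBase = context.docBase;
       timestamps = context.reader().getNumericDocValues("start_timestamp_dlng_doc_ind_sto");
      }
 
      @Override
      public void collect(int doc) throws IOException {
       long ts = timestamps.get(doc); // cheap docvalue lookup - no stored-field fetch
       if (ts >= timeStart && ts <= timeEnd) {
        matches.add(docBase + doc); // candidate passed the time-interval filter
       }
      }
 
      @Override
      public void setScorer(Scorer scorer) {}
 
      @Override
      public boolean acceptsDocsOutOfOrder() {
       return true;
      }
     });
   return matches;
  }
 }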

Output from the test when doing the search like that
1 concurrent average response-time: 449
2 concurrent average response-time: 573
4 concurrent average response-time: 776
8 concurrent average response-time: 1198

This is much better.

Using the existing Solr/Lucene solution :-)


Along the way, while testing all this, I asked on the solr-user@lucene.apache.org mailing-list whether this feature was already in Solr/Lucene and how to activate it. Yonik Seeley pointed me to this page and suggested the following way of doing the search
 
q=no1_dlng_doc_ind_sto:(<some-number>)&
fq={!frange cache=false cost=150 v=start_timestamp_dlng_doc_ind_sto l=<TIME_START> u=<TIME_END>}&
facet=true&facet.field=no2_dlng_doc_ind_sto&facet.limit=-1&facet.offset=0&facet.mincount=1&facet.zeros=false&start=0&rows=0&distrib=true

Output from the test when doing the search like that
1 concurrent average response-time: 394
2 concurrent average response-time: 448
4 concurrent average response-time: 678
8 concurrent average response-time: 969

Seems a little bit better than with my hacky patch. I am not sure you can trust the small differences in the measured response-times, but this seems at least as performant as what I did.

Looking at the code, it seems like the way of querying shown above actually does nearly the same thing as I did with my hacky patch - just as a nice generalized, established and (hopefully) well-tested feature. It uses FunctionRangeQuery.FunctionRangeCollector implementing PostFilter, which gets its values from LongFieldSource.getValues using LongDocValues through the FieldCache. That is great!

Outro


It has been a journey, but I am very happy with it. I feel like I got confirmation that my thoughts on the best strategy for carrying out such a query were right. I got some hands-on experience trying to implement it as a proof-of-concept. The fact that it is already there is just a bonus that will save me a lot of work. And I learned about more advanced ways to express queries.

Wednesday, January 22, 2014

Upgrading from SolrCloud 4.x to 4.y - as if you had used 4.y all along

Introduction

 

I have recently been upgrading one of my systems that uses Solr in "cloud"-mode - that is, using ZooKeeper to make several Solr-nodes work together in a cluster. I upgraded from Solr 4.0.0 to 4.4.0, but this blog post will also be relevant for several other combinations of from- and to-versions - you just need to filter out what is relevant for you.

Basically you can just stop your Solr-nodes, copy in the new binaries and start the Solr-nodes again, but
  • If you upgrade from 4.0.0 to a newer version, routing will not work the same way anymore
  • If you, like me, want the state/configuration of the system after the upgrade to be as it would have been if you had used 4.4.0 all along, there are several differences you need to correct 

Differences

 

Let me go through the differences I encountered between a 4.0.0-to-4.4.0 upgraded system and a clean 4.4.0 system

clusterstate.json

clusterstate.json lives in the root (of the Solr-area) in ZooKeeper. It contains information about collections, shards and replicas.

Before the upgrade my clusterstate.json looked like this (Solr 4.0.0)
 
{
  "my_collection_1":{
    "shard1":{
      "range":"80000000-8ccccccc",
      "replicas":{"my_host_1:8983_solr_my_collection_1_shard1_replica1":{
          "shard":"shard1",
          "roles":null,
          "state":"active",
          "core":"my_collection_1_shard1_replica1",
          "collection":"my_collection_1",
          "node_name":"my_host_1:8983_solr",
          "base_url":"http://my_host_1:8983/solr",
          "leader":"true"}}},
    "shard2":
    ... 19 more shards under "my_collection_1" ...
  },
  "my_collection_2":
  ... 23 more collections ...
}

If I had used Solr 4.4.0 all along my clusterstate.json would have looked like this
 
{
  "my_collection_1":{
    "shards":{
      "shard1":{
        "range":"80000000-8ccccccc",
        "state":"active",
        "replicas":{"core_node1":{
            "state":"active",
            "core":"my_collection_1_shard1_replica1",
            "node_name":"192.168.xxx.yyy:8983_solr",
            "base_url":"http://192.168.xxx.yyy:8983/solr",
            "leader":"true"}}},
      "shard2":
      ... 19 more shards under "my_collection_1" ...
    },
    "router":"compositeId"
  },
  "my_collection_2":
  ... 23 more collections ...
}

Differences
  1. All shards at collection-level have been wrapped inside a shards-map in 4.4.0
  2. This is probably to make room for other key-values at collection-level. router=compositeId has been added at this level in 4.4.0
  3. state=active has been added at shard-level in 4.4.0
  4. Replica-keys/names have changed from something of the form <hostname>:<port>_<context>_<shard-name>_replica<X> to just core_node<Y>
  5. shard, roles and collection are no longer present at replica-level in 4.4.0
  6. node_name and base_url are now based on IP instead of hostname
Now if I just stop my 4.0.0 Solr-nodes, do the upgrade (copy in the new Solr 4.4.0 binaries) and start the Solr-nodes again, clusterstate.json will automatically be changed and look like this
 
{
  "my_collection_1":{"shards":{
      "shard1":{
        "range":"80000000-8ccccccc",
        "replicas":{"my_host_1:8983_solr_my_collection_1_shard1_replica1":{
            "state":"active",
            "core":"my_collection_1_shard1_replica1",
            "node_name":"my_host_1:8983_solr",
            "base_url":"http://my_host_1:8983/solr",
            "leader":"true"}},
        "state":"active"},
      "shard2":
      ... 19 more shards under "my_collection_1" ...
  }},
  "my_collection_2":
  ... 23 more collections ...
}

Which of the differences between 4.0.0 and 4.4.0 were automatically "corrected" by 4.4.0 when started on top of a system that used to run 4.0.0?
  1. All shards at collection-level have been wrapped inside a shards-map. Check!
  2. router=compositeId has not been added at collection-level :-(
  3. state=active has been added at shard-level. Check!
  4. Replica-keys/names have not been changed :-(
  5. shard, roles and collection have been removed from replica-level. Check!
  6. node_name and base_url are not based on IP :-(
solr.xml files

In 4.0.0 my solr.xml files contain a <cores>-tag with lots of <core>-tags underneath. This still works after upgrading to 4.4.0, but you will be running in "legacy mode", which will not be supported from Solr 5.x

I would like my solr.xml files in my 4.0.0-to-4.4.0 upgraded system to be as they would have been if I had run 4.4.0 all along. Appendix 1) below shows my 4.4.0 solr.xml

core.properties files

In Solr 4.0.0 there are no core.properties files in <solr-home>/<replica-name> on disk (<solr-home> is controlled by VM-param -Dsolr.solr.home given when you start your Solr web-container (e.g. Jetty))

In Solr 4.4.0 there is a core.properties file for each replica in <solr-home>/<replica-name>. For my_collection_1 | shard1 | core_node1 it contains the following

 
name=my_collection_1_shard1_replica1
shard=shard1
collection=my_collection_1
coreNodeName=core_node1
I would like to have core.properties files in my 4.0.0-to-4.4.0 upgraded system, just as I would have if I had run 4.4.0 all along

Data in collection znode's

For each collection there exists a znode (folder) in ZooKeeper at /collections/<collection-name> (in the Solr-area). As you probably know, a znode can contain data even though it is a folder (contains "children")

In 4.0.0 the data of those collection-znodes is
 
{"configName":"my_conf"}

In 4.4.0 the data is
 
{"configName":"my_conf", "router":"implicit"}

I would like that also in my 4.0.0-to-4.4.0 upgraded system.

How I corrected the differences

 

Now that we have seen all the differences I encountered, let's look at what I did to "correct" them, in order for my 4.0.0-to-4.4.0 upgraded system to seem as if it had been running 4.4.0 all along

  1. Make sure ZooKeeper is running, but that Solr 4.0.0 nodes are not
  2. Extract/download (using your favorite tool - one option is sketched in the small Java snippet after this list) clusterstate.json from ZooKeeper (root of the Solr-area) into a folder <my-favorite-folder>/upgrade/before on the machine from which you do the upgrade
  3. Correct the ranges in clusterstate.json as explained in the "Document routing problem, upgrading from SolrCloud 4.0.0 to later 4.x" post below (only necessary if you upgrade from 4.0.0)
  4.  
    java -classpath .:${SOLR_4_0_0_INSTALL}/dist/apache-solr-solrj-4.0.0.jar:${SOLR_4_0_0_INSTALL}/dist/solrj-lib/zookeeper-3.3.6.jar:${SOLR_4_0_0_INSTALL}/dist/solrj-lib/commons-io-2.1.jar:${SOLR_4_0_0_INSTALL}/dist/solrj-lib/slf4j-api-1.6.4.jar CorrectShardRangesInClusterState <my-favorite-folder>/upgrade/before/clusterstate.json <my-favorite-folder>/upgrade/after_ranges_fix/clusterstate.json
  5. Compile ClusterState4_0ToClusterStateAndCoreProperties4_4Upgrader.java from Appendix 2) below against 4.4.0 Solr code. You need to implement method hostnameToIP yourself first
  6. Convert clusterstate.json from 4.0.0-style to 4.4.0-style and generate all core.properties files
  7.  
    java -classpath .:${SOLR_4_4_0_INSTALL}/dist/apache-solr-solrj-4.4.0.jar:${SOLR_4_4_0_INSTALL}/dist/solrj-lib/commons-io-2.1.jar ClusterState4_0ToClusterStateAndCoreProperties4_4Upgrader <my-favorite-folder>/upgrade/after_ranges_fix <my-favorite-folder>/upgrade/after
  8. By now you have your new configuration files in <my-favorite-folder>/upgrade/after
    • clusterstate.json
    • <IP>/data/<replica-name>/core.properties (an <IP> for each Solr-node in your system, and a <replica-name> for each replica run by that Solr-node)
  9. Upload (using your favorite tool) <my-favorite-folder>/upgrade/after/clusterstate.json to ZooKeeper (root of Solr-area) replacing the existing one
  10. Upload all core.properties files (bash example)
  11.  
    for IP in <IP#1> <IP#2> ... <IP#N>; do  # mention the IP's of all your Solr-nodes
     scp -r <my-favorite-folder>/upgrade/after/${IP}/data/. <solr-node-user>@${IP}:<solr-home>
    done
    
  12. Compile SolrConfigDirInZookeeperUpgrader.java from Appendix 3) below against 4.4.0 Solr code
  13. Modify the data in all collections/<collection-name> znodes in ZooKeeper (in the Solr-area)
  14.  
    java -classpath .:${SOLR_4_4_0_INSTALL}/dist/apache-solr-solrj-4.4.0.jar:${SOLR_4_4_0_INSTALL}/dist/solrj-lib/zookeeper-3.4.5.jar SolrConfigDirInZookeeperUpgrader <solr-zookeeper-connection-string> <name-of-your-solr-configuration>
  15. Now install the 4.4.0 binaries on all Solr-nodes (replacing the existing 4.0.0 binaries) and start them again - voilà!
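For the steps that read and write clusterstate.json in ZooKeeper (steps 2 and 9), any ZooKeeper client will do. A minimal sketch using the plain ZooKeeper Java API (connection string, chroot and file paths are assumptions - adjust to your setup):
 
 import java.io.File;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 
 public class ClusterStateDownloadUpload {
 
  public static void main(String[] args) throws Exception {
   // args[0] is "download" or "upload", args[1] is the local file
   // "<zk-host>:2181/solr" is an assumption - use your own connection string (and chroot, if any)
   ZooKeeper zk = new ZooKeeper("<zk-host>:2181/solr", 30000, new Watcher() {
    public void process(WatchedEvent event) { /* no-op */ }
   });
   try {
    if ("download".equals(args[0])) {
     byte[] data = zk.getData("/clusterstate.json", false, null);
     FileUtils.writeByteArrayToFile(new File(args[1]), data);
    } else {
     byte[] data = FileUtils.readFileToByteArray(new File(args[1]));
     zk.setData("/clusterstate.json", data, -1); // -1 means "ignore znode version"
    }
   } finally {
    zk.close();
   }
  }
 }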

Disclaimer

 

No warranty. Test it thoroughly before you do it in production!

I have successfully followed the sketched procedure and upgraded a 4.0.0 system, so that it seems as if it had been running Solr 4.4.0 all along

Appendix

 

1)  My 4.4.0 solr.xml
 
<solr>
  <str name="sharedLib">${sharedLib:}</str>
  
  <solrcloud>
    <str name="host">${host:}</str>
    <int name="hostPort">${jetty.port:8983}</int>
    <str name="hostContext">${hostContext:solr}</str>
    <int name="zkClientTimeout">${zkClientTimeout:30000}</int>
    <bool name="genericCoreNodeNames">${genericCoreNodeNames:true}</bool>
  </solrcloud>

  <shardHandlerFactory name="shardHandlerFactory"
    class="HttpShardHandlerFactory">
    <int name="socketTimeout">${socketTimeout:0}</int>
    <int name="connTimeout">${connTimeout:0}</int>
  </shardHandlerFactory>

</solr>

2)  ClusterState4_0ToClusterStateAndCoreProperties4_4Upgrader.java
 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.File;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.io.FileUtils;
import org.apache.solr.common.cloud.ZkStateReader;

public class ClusterState4_0ToClusterStateAndCoreProperties4_4Upgrader {

 /*
  * Assuming that the environment is set up with topology
  */
 public static void main(String[] args) throws Exception {
  String inputFolder = args[0];
  String outputFolder = args[1];

  byte[] bytes = FileUtils.readFileToByteArray(new File(inputFolder + File.separator + "clusterstate.json"));
  Map<String, Object> stateMap = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
  for (Entry<String, Object> collectionEntry : stateMap.entrySet()) {
   int nextCoreNodeNameNumber=1;
   Object collectionEntryValue = collectionEntry.getValue();
   if (collectionEntryValue instanceof Map<?, ?>) {
    for (Entry<?, ?> collectionMapEntry : ((Map<?, ?>)collectionEntryValue).entrySet()) {
     if (collectionMapEntry.getKey().toString().startsWith("shard")) {
      Object shardEntryValue = collectionMapEntry.getValue();
      if (shardEntryValue instanceof Map<?, ?>) {
       Map<String, Object> shardEntryMap = (Map<String, Object>)shardEntryValue;
       shardEntryMap.put("state", "active");
       Object replicasEntryValue = shardEntryMap.get("replicas");
       if (replicasEntryValue instanceof Map<?, ?>) {
        Map<String, Object> replicasEntryMap = (Map<String, Object>)replicasEntryValue;
        List<Object> replicasEntryMapValues = new ArrayList<Object>(replicasEntryMap.values());
        replicasEntryMap.clear();
        for (Object replicasEntryMapValue : replicasEntryMapValues) {
         String coreNodeName="core_node" + (nextCoreNodeNameNumber++);
         replicasEntryMap.put(coreNodeName, replicasEntryMapValue);
        }
        for (Entry<String, Object> replicasEntryMapEntry : replicasEntryMap.entrySet()) {
         Object replicasEntryMapEntryValue = replicasEntryMapEntry.getValue(); 
         if (replicasEntryMapEntryValue instanceof Map<?, ?>) {
          Map<String, Object> replicaMap = (Map<String, Object>)replicasEntryMapEntryValue;
          String nodeName = replicaMap.get("node_name").toString();
          String hostname = nodeName.substring(0, nodeName.indexOf(':'));
          String IP = hostnameToIP(hostname);
          String core = replicaMap.get("core").toString();
          String shard = replicaMap.get("shard").toString();
          String collection = replicaMap.get("collection").toString();
          File corePropertiesDir = new File(outputFolder + File.separator + IP + File.separator + "data" + File.separator + core);
          corePropertiesDir.mkdirs();
          File corePropertiesFile = new File(corePropertiesDir, "core.properties");
          corePropertiesFile.createNewFile();
          PrintWriter corePropertiesFileWriter = new PrintWriter(corePropertiesFile);
          try {
           corePropertiesFileWriter.println("name=" + core);
           corePropertiesFileWriter.println("shard=" + shard);
           corePropertiesFileWriter.println("collection=" + collection);
           corePropertiesFileWriter.println("coreNodeName=" + replicasEntryMapEntry.getKey());
          } finally {
           corePropertiesFileWriter.flush();
           corePropertiesFileWriter.close();
          }
          replicaMap.remove("roles");
          replicaMap.remove("shard");
          replicaMap.remove("collection");
          replicaMap.put("state", "down");
          replicaMap.put("node_name", replicaMap.get("node_name").toString().replace(hostname, IP));
          replicaMap.put("base_url", replicaMap.get("base_url").toString().replace(hostname, IP));
         }
        }
       }
      }
     }
    }
    Map<String, Object> shardEntries = new LinkedHashMap<String, Object>();
    for (Entry<?, ?> collectionMapEntry : ((Map<?, ?>)collectionEntryValue).entrySet()) {
     if (collectionMapEntry.getKey().toString().startsWith("shard")) {
      shardEntries.put(collectionMapEntry.getKey().toString(), collectionMapEntry.getValue());
     }
    }
    for (Entry<String, Object> shardEntry : shardEntries.entrySet()) {
     ((Map<?, ?>)collectionEntryValue).remove(shardEntry.getKey());
    }
    ((Map<String, Object>)collectionEntryValue).put("shards", shardEntries);
    ((Map<String, Object>)collectionEntryValue).put("router", "compositeId");
   }
  }

  bytes = ZkStateReader.toJSON(stateMap);
  FileUtils.writeByteArrayToFile(new File(outputFolder + File.separator + "clusterstate.json"), bytes);
  System.exit(0);
 }
 
 protected static String hostnameToIP(String hostname) {
  // TODO calculate and return the IP corresponding to hostname
  throw new UnsupportedOperationException("Implement the hostname-to-IP mapping for your environment");
 }
}

3)  SolrConfigDirInZookeeperUpgrader.java
 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.security.NoSuchAlgorithmException;
import java.util.List;

import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.SolrZooKeeper;
import org.apache.zookeeper.KeeperException;

public class SolrConfigDirInZookeeperUpgrader
{
    public static final String SOLR_COLLECTION_NODE = "/collections";
    public static final String SOLR_CONFIG_PREFIX = 
      "{\n" + 
      "  \"configName\":\"";
    public static final String SOLR_CONFIG_POSTFIX = 
      "\",\n" + 
      "  \"router\":\"implicit\"}";
    
    public static void main(String[] args) throws KeeperException, InterruptedException, NoSuchAlgorithmException 
    {
        SolrConfigDirInZookeeperUpgrader instance = new SolrConfigDirInZookeeperUpgrader(args[0], args[1]);
        instance.updateSolrConfNameForAllCollections();
    }

    private final String solrZkConnectionStr;
    private final String confName;
    
 public SolrConfigDirInZookeeperUpgrader(String solrZkConnectionStr, String confName) {
  super();
  this.solrZkConnectionStr = solrZkConnectionStr;
  this.confName = confName;
 }

 private void updateSolrConfNameForAllCollections() throws KeeperException, InterruptedException
 {
        final SolrZkClient client = new SolrZkClient(solrZkConnectionStr, 16000);
        try {
         SolrZooKeeper zk = client.getSolrZooKeeper();
         List<String> children = zk.getChildren(SOLR_COLLECTION_NODE, null);
         
         for(String child: children) {
          updateData(zk, SOLR_COLLECTION_NODE + "/" + child);
         }
        } finally {
         client.close();
        }
 }
 
 private void updateData(SolrZooKeeper zk, String node) throws KeeperException, InterruptedException {
  zk.setData(node, new String(SOLR_CONFIG_PREFIX + confName + SOLR_CONFIG_POSTFIX).getBytes(), -1);
 }

}

Wednesday, January 15, 2014

Document routing problem, upgrading from SolrCloud 4.0.0 to later 4.x

Problem


Situation


You are using Solr 4.0.0 in "cloud"-mode - that is, using ZooKeeper to make several Solr-nodes work together in a cluster.

You want to upgrade to a later version of Solr 4.x (e.g. Solr 4.4.0)

You already have a collection with at least 10 shards

Description


You do the upgrade by just installing the new Solr binaries, config etc. on top of the old installation, of course leaving the data folder(s) untouched.

When you start the upgraded Solr-nodes, they will not route documents to shards the same way as they did before the upgrade (for collections with at least 10 shards)

Consequences


Solr-nodes internally use the routing-algorithm to decide which shard a new or updated document has to go to.
  • If you try to insert a new document, with the same id as an existing document, that new document will (potentially) go to a shard different from the shard where the document with the same id already exists. You will be left with two documents in store (the old one and the new one), and if you are using optimistic locking you will not receive the intended exception basically saying that you cannot insert a document that already exists.
  • If you try to update an existing document (by using same id), the update-request will (potentially) go to a shard different from the shard where the document with the id already exists. Depending on consistency features (like optimistic locking) you will either be left with two documents in store (the old one and the new one), or you will receive an exception basically saying that you cannot update a document that does not exist (even though it does).
  • If you do real-time-gets from clients by looking at where Solr will route a document with the specific id (e.g. using SOLR-5360), you will (potentially) not find the document, even though it actually exists.

Reason


When SolrCloud creates a collection, it writes information about that collection in clusterstate.json, persisted in ZooKeeper. Among other things, for each shard in the collection, clusterstate.json will contain a range. The ranges assigned to the shards are made by cutting the number range from Integer.MIN_VALUE to Integer.MAX_VALUE into equal-sized portions and assigning each shard a portion. The range on each shard is supposed to say which documents belong to that shard - a hash is made of the document's id, and the document belongs to the shard whose range the hash falls into.

In SolrCloud 4.0.0, SolrCloud does not use the ranges written to clusterstate.json when it is routing documents. Instead it uses ranges calculated in the code - ranges that are supposed to be distributed among the shards the same way they are distributed in clusterstate.json.
But if you have 10 or more shards in your collection, the calculated ranges will not fit the ranges in clusterstate.json.
Ranges in clusterstate.json are distributed among shards by shard number: shard1 gets the 1st range, shard2 gets the 2nd range ... shard10 gets the 10th range ... shardN gets the last range.
Ranges actually used for routing are distributed among shards in the order where shards are sorted by their name. E.g. if you have 15 shards, shard1 gets the 1st range, shard10 gets the 2nd range, shard11 gets the 3rd range ... shard2 gets the 8th range, etc.
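To illustrate the mismatch with a small standalone sketch (my own illustration, not Solr code):
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 public class ShardOrderSketch {
 
  public static void main(String[] args) {
   int numShards = 15;
   // The order ranges are assigned in clusterstate.json: shard1, shard2, ..., shard15
   List<String> clusterstateOrder = new ArrayList<String>();
   for (int i = 1; i <= numShards; i++) {
    clusterstateOrder.add("shard" + i);
   }
   // The order effectively used for routing in 4.0.0: shards sorted by name
   List<String> routingOrder = new ArrayList<String>(clusterstateOrder);
   Collections.sort(routingOrder); // shard1, shard10, shard11, ..., shard2, shard3, ...
 
   // From the 2nd position on, the two orderings disagree, so e.g. the 2nd range
   // (assigned to shard2 in clusterstate.json) is effectively used for shard10 when routing
   System.out.println("clusterstate.json order: " + clusterstateOrder);
   System.out.println("routing order:           " + routingOrder);
  }
 }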

Solution


There are several solutions. Solr releases after 4.0.0 allow you to specify your own routing-algorithm per collection. You could write your own routing-algorithm and use that for the "old" collections.

I suggest that you modify clusterstate.json when you upgrade from 4.0.0 to a later 4.x version

Code

 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.File;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.io.FileUtils;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.HashPartitioner.Range;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.zookeeper.data.Stat;

public class CorrectShardRangesInClusterState {

 public static void main(String[] args) throws Exception {
  String inputClusterStateFile = args[0];
  String outputClusterStateFile = args[1];
  
  byte[] bytes = FileUtils.readFileToByteArray(new File(inputClusterStateFile));
  Stat stat = new Stat();
  ClusterState clusterState = ClusterState.load(stat.getVersion(), bytes, new HashSet<String>());

  for (Entry<String, RangeInfo> rangeInfo : getRangeInfos(clusterState).entrySet()) {
   RangeInfo ri = rangeInfo.getValue();
   Map<String, Slice> shards = clusterState.getCollectionStates().get(rangeInfo.getKey());
   for (int i = 0; i < ri.getShardList().size(); i++) {
    String shard = ri.getShardList().get(i);
    int min = ri.getRanges().get(i).min;
    int max = ri.getRanges().get(i).max;
    Slice slice = shards.get(shard);
    Range range = getRange(slice);
    if ((range.min != min) || (range.max != max)) {
     System.out.println("Changing collection " + rangeInfo.getKey() + ", shard " + shard + ", min " + Integer.toHexString(range.min) + " -> " + Integer.toHexString(min) + ", max " + Integer.toHexString(range.max) + " -> " + Integer.toHexString(max));
     range.min = min;
     range.max = max;
    }
    Map<String, Object> newPropMap = new HashMap<String, Object>(slice.getProperties());
    newPropMap.put(Slice.RANGE, range);
    setPropMap(slice, Collections.unmodifiableMap(newPropMap));
   }
  }

  bytes = ZkStateReader.toJSON(clusterState.getCollectionStates());
  FileUtils.writeByteArrayToFile(new File(outputClusterStateFile), bytes);
 }
 
 private static class RangeInfo {
  Object clusterStateRangeInfo;
  
  private RangeInfo(Object clusterStateRangeInfo) {
   this.clusterStateRangeInfo = clusterStateRangeInfo;
  }
  
  private List<String> getShardList() throws Exception {
   Field field = clusterStateRangeInfo.getClass().getDeclaredField("shardList");
   field.setAccessible(true);
   List<String> result = (List<String>) field.get(clusterStateRangeInfo);
   field.setAccessible(false);
   return result;
  }
  
  private List<Range> getRanges() throws Exception {
   Field field = clusterStateRangeInfo.getClass().getDeclaredField("ranges");
   field.setAccessible(true);
   List<Range> result = (List<Range>) field.get(clusterStateRangeInfo);
   field.setAccessible(false);
   return result;
  }
 }
 
 private static Range getRange(Slice slice) throws Exception {
  Field field = Slice.class.getDeclaredField("range");
  field.setAccessible(true);
  Range result = (Range) field.get(slice);
  field.setAccessible(false);
  return result;
 }
 
 private static Map<String, RangeInfo> getRangeInfos(ClusterState clusterState) throws Exception {
  Field field = ClusterState.class.getDeclaredField("rangeInfos");
  field.setAccessible(true);
  Map<String, ?> clusterStateRangeInfos = (Map<String, ?>) field.get(clusterState);
  Map<String, RangeInfo> result = new HashMap<String, RangeInfo>(clusterStateRangeInfos.size());
  for (Entry<String, ?> entry : clusterStateRangeInfos.entrySet()) {
   result.put(entry.getKey(), new RangeInfo(entry.getValue()));
  }
  field.setAccessible(false);
  return result;
 }
  
 private static void setPropMap(Slice slice, Map<String, Object> newPropMap) throws Exception {
  Field field = ZkNodeProps.class.getDeclaredField("propMap");
  field.setAccessible(true);
  field.set(slice, newPropMap);
  field.setAccessible(false);
 }

}
 

How to use it


  • Compile CorrectShardRangesInClusterState above against 4.0.0 Solr code
  • Before doing the upgrade (still running Solr 4.0.0)
    • Stop Solr
    • Extract clusterstate.json from ZooKeeper to file clusterstate_old.json
    • Run CorrectShardRangesInClusterState like this
    •  
      java -classpath .:${SOLR_4_0_0_INSTALL}/dist/apache-solr-solrj-4.0.0.jar:${SOLR_4_0_0_INSTALL}/dist/solrj-lib/zookeeper-3.3.6.jar:${SOLR_4_0_0_INSTALL}/dist/solrj-lib/commons-io-2.1.jar:${SOLR_4_0_0_INSTALL}/dist/solrj-lib/slf4j-api-1.6.4.jar CorrectShardRangesInClusterState clusterstate_old.json clusterstate_new.json
    • Upload/replace clusterstate.json in ZooKeeper with content from generated file clusterstate_new.json
  • Now do the upgrade to your target Solr version (after 4.0.0)

Disclaimer


No warranty. Test it thoroughly before you do it in production!

I have successfully used CorrectShardRangesInClusterState as described above, to fix clusterstate.json before upgrading from Solr 4.0.0 to Solr 4.4.0.

The exact versions between which the problem starts (4.0.0 on one side and any 4.x version after that on the other side) have only been determined by code-inspection. But the problem is definitely there between 4.0.0 and 4.4.0.