fuse_kafka
src/zookeeper.c
00001 /*
00002  * librdkafka - Apache Kafka C library
00003  * *
00004  * Copyright (c) 2012, Magnus Edenhill
00005  * All rights reserved.
00006  * *
00007  * Redistribution and use in source and binary forms, with or
00008  * without
00009  * modification, are permitted provided that the following
00010  * conditions are met:
00011  * *
00012  * 1. Redistributions of source code must retain the above copyright
00013  * notice,
00014  * this list of conditions and the following disclaimer.
00015  * 2. Redistributions in binary form must reproduce the above
00016  * copyright notice,
00017  * this list of conditions and the following disclaimer in the
00018  * documentation
00019  * and/or other materials provided with the distribution.
00020  * *
00021  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
00022  * CONTRIBUTORS "AS IS"
00023  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00024  * TO, THE
00025  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
00026  * PARTICULAR PURPOSE
00027  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
00028  * CONTRIBUTORS BE
00029  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
00030  * OR
00031  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
00032  * OF
00033  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
00034  * BUSINESS
00035  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
00036  * WHETHER IN
00037  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
00038  * OTHERWISE)
00039  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
00040  * ADVISED OF THE
00041  * POSSIBILITY OF SUCH DAMAGE.
00042  */
00043 #include <output.h>
00044 #ifndef TEST
00045 #include <librdkafka/rdkafka.h>
00046 #include <zookeeper/zookeeper.h>
00047 #endif
00048 #ifndef FUSE_KAFKA_ZOOKEEPER_C
00049 #define FUSE_KAFKA_ZOOKEEPER_C
00050 #include <jansson.h>
00051 #define BROKER_PATH "/brokers/ids"
00052 #include "server_list.c"
00053 #include <trace.c>
00054 #include <string.h>
00055 static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers)
00056 {
00057     //trace_debug("set_brokerlist_from_zookeeper: entry");
00058     if (zzh)
00059     {
00060         struct String_vector brokerlist;
00061         if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK)
00062         {
00063             fprintf(stderr, "No brokers found on path %s\n", BROKER_PATH);
00064             return;
00065         }
00066         int i;
00067         char *brokerptr = brokers;
00068         if(brokerptr == NULL) return;
00069         for (i = 0; i < brokerlist.count; i++)
00070         {
00071             char path[255], cfg[1024];
00072             sprintf(path, "/brokers/ids/%s", brokerlist.data[i]);
00073             int len = sizeof(cfg);
00074             zoo_get(zzh, path, 0, cfg, &len, NULL);
00075             if (len > 0)
00076             {
00077                 cfg[len] = '\0';
00078                 json_error_t jerror;
00079                 json_t *jobj = json_loads(cfg, 0, &jerror);
00080                 if (jobj)
00081                 {
00082                     json_t *jhost = json_object_get(jobj, "host");
00083                     json_t *jport = json_object_get(jobj, "port");
00084                     if (jhost && jport)
00085                     {
00086                         const char *host = json_string_value(jhost);
00087                         const int port = json_integer_value(jport);
00088                         //trace_debug("set_brokerlist_from_zookeeper: %s %d", host, port);
00089                         sprintf(brokerptr, "%s:%d", host, port);
00090                         brokerptr += strlen(brokerptr);
00091                         if (i < brokerlist.count - 1)
00092                         {
00093                             *brokerptr++ = ',';
00094                         }
00095                     }
00096                     json_decref(jobj);
00097                 }
00098             }
00099         }
00100         deallocate_String_vector(&brokerlist);
00101     }
00102 }
00103 void watcher_add_brokers(kafka_t* k, char* brokers, char* topic)
00104 {
00105     rd_kafka_topic_conf_t *topic_conf;
00106     if (brokers[0] != '\0' && k->rk != NULL &&
00107             server_list_add_once(&(k->broker_list), brokers))
00108     {
00109         rd_kafka_brokers_add(k->rk, brokers);
00110         k->no_brokers = 0;
00111         rd_kafka_poll(k->rk, 10);
00112         topic_conf = rd_kafka_topic_conf_new();
00113         k->rkt = rd_kafka_topic_new(k->rk, topic, topic_conf);
00114         if(k->rkt == NULL)
00115             printf("topic %s creation failed\n", topic);
00116     }
00117 }
00118 static void watcher(zhandle_t *zh, int type,
00119         int state, const char *path, void *param)
00120 {
00121     char brokers[1024];
00122     kafka_t* k = (kafka_t*) param;
00123     if(k->conf == NULL) return;
00124     char* topic;
00125     if(k->conf->topic_n <= 0)
00126     {
00127         trace("topic missing");
00128         return;
00129     }
00130     topic = k->conf->topic[0];
00131     //trace_debug("zookeeper_init: topic: %s, path: %s", topic, path);
00132     if (k->no_brokers || type == ZOO_CHILD_EVENT && strncmp(
00133                 path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0)
00134     {
00135         brokers[0] = '\0';
00136         /*trace_debug("zookeeper_init: "
00137                 "calling set_brokerlist_from_zookeeper(\"%s\")", brokers);*/
00138         set_brokerlist_from_zookeeper(zh, brokers);
00139         watcher_add_brokers(k, brokers, topic);
00140     }
00141 }
00142 static zhandle_t* initialize_zookeeper(const char * zookeeper, void* param)
00143 {
00144     zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
00145     ((kafka_t*) param)->no_brokers = 1;
00146     ((kafka_t*) param)->broker_list = NULL;
00147     zhandle_t *zh;
00148     //trace_debug("initialize_zookeeper: calling zookeeper_init(%s)", zookeeper);
00149     zh = zookeeper_init(zookeeper, watcher, 10000, 0, param, 0);
00150     if (zh == NULL)
00151     {
00152         fprintf(stderr, "Zookeeper connection not established.");
00153         //exit(1);
00154     }
00155     return zh;
00156 }
00157 #endif
 All Data Structures Files Functions Variables Defines