![]() |
fuse_kafka
|
00001 /* 00002 * librdkafka - Apache Kafka C library 00003 * * 00004 * Copyright (c) 2012, Magnus Edenhill 00005 * All rights reserved. 00006 * * 00007 * Redistribution and use in source and binary forms, with or 00008 * without 00009 * modification, are permitted provided that the following 00010 * conditions are met: 00011 * * 00012 * 1. Redistributions of source code must retain the above copyright 00013 * notice, 00014 * this list of conditions and the following disclaimer. 00015 * 2. Redistributions in binary form must reproduce the above 00016 * copyright notice, 00017 * this list of conditions and the following disclaimer in the 00018 * documentation 00019 * and/or other materials provided with the distribution. 00020 * * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND 00022 * CONTRIBUTORS "AS IS" 00023 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00024 * TO, THE 00025 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 00026 * PARTICULAR PURPOSE 00027 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR 00028 * CONTRIBUTORS BE 00029 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, 00030 * OR 00031 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT 00032 * OF 00033 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR 00034 * BUSINESS 00035 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 00036 * WHETHER IN 00037 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 00038 * OTHERWISE) 00039 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 00040 * ADVISED OF THE 00041 * POSSIBILITY OF SUCH DAMAGE. 00042 */ 00043 #include <output.h> 00044 #ifndef TEST 00045 #include <librdkafka/rdkafka.h> 00046 #include <zookeeper/zookeeper.h> 00047 #endif 00048 #ifndef FUSE_KAFKA_ZOOKEEPER_C 00049 #define FUSE_KAFKA_ZOOKEEPER_C 00050 #include <jansson.h> 00051 #define BROKER_PATH "/brokers/ids" 00052 #include "server_list.c" 00053 #include <trace.c> 00054 #include <string.h> 00055 static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers) 00056 { 00057 //trace_debug("set_brokerlist_from_zookeeper: entry"); 00058 if (zzh) 00059 { 00060 struct String_vector brokerlist; 00061 if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK) 00062 { 00063 fprintf(stderr, "No brokers found on path %s\n", BROKER_PATH); 00064 return; 00065 } 00066 int i; 00067 char *brokerptr = brokers; 00068 if(brokerptr == NULL) return; 00069 for (i = 0; i < brokerlist.count; i++) 00070 { 00071 char path[255], cfg[1024]; 00072 sprintf(path, "/brokers/ids/%s", brokerlist.data[i]); 00073 int len = sizeof(cfg); 00074 zoo_get(zzh, path, 0, cfg, &len, NULL); 00075 if (len > 0) 00076 { 00077 cfg[len] = '\0'; 00078 json_error_t jerror; 00079 json_t *jobj = json_loads(cfg, 0, &jerror); 00080 if (jobj) 00081 { 00082 json_t *jhost = json_object_get(jobj, "host"); 00083 json_t *jport = json_object_get(jobj, "port"); 00084 if (jhost && jport) 00085 { 00086 const char *host = json_string_value(jhost); 00087 const int port = json_integer_value(jport); 00088 //trace_debug("set_brokerlist_from_zookeeper: %s %d", host, port); 00089 sprintf(brokerptr, "%s:%d", host, port); 00090 brokerptr += strlen(brokerptr); 00091 if (i < brokerlist.count - 1) 00092 { 00093 *brokerptr++ = ','; 00094 } 00095 } 00096 json_decref(jobj); 00097 } 00098 } 00099 } 00100 deallocate_String_vector(&brokerlist); 00101 } 00102 } 00103 void watcher_add_brokers(kafka_t* k, char* brokers, char* topic) 00104 { 00105 rd_kafka_topic_conf_t *topic_conf; 00106 if (brokers[0] != '\0' && k->rk != NULL && 00107 server_list_add_once(&(k->broker_list), brokers)) 00108 { 00109 rd_kafka_brokers_add(k->rk, brokers); 00110 k->no_brokers = 0; 00111 rd_kafka_poll(k->rk, 10); 00112 topic_conf = rd_kafka_topic_conf_new(); 00113 k->rkt = rd_kafka_topic_new(k->rk, topic, topic_conf); 00114 if(k->rkt == NULL) 00115 printf("topic %s creation failed\n", topic); 00116 } 00117 } 00118 static void watcher(zhandle_t *zh, int type, 00119 int state, const char *path, void *param) 00120 { 00121 char brokers[1024]; 00122 kafka_t* k = (kafka_t*) param; 00123 if(k->conf == NULL) return; 00124 char* topic; 00125 if(k->conf->topic_n <= 0) 00126 { 00127 trace("topic missing"); 00128 return; 00129 } 00130 topic = k->conf->topic[0]; 00131 //trace_debug("zookeeper_init: topic: %s, path: %s", topic, path); 00132 if (k->no_brokers || type == ZOO_CHILD_EVENT && strncmp( 00133 path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0) 00134 { 00135 brokers[0] = '\0'; 00136 /*trace_debug("zookeeper_init: " 00137 "calling set_brokerlist_from_zookeeper(\"%s\")", brokers);*/ 00138 set_brokerlist_from_zookeeper(zh, brokers); 00139 watcher_add_brokers(k, brokers, topic); 00140 } 00141 } 00142 static zhandle_t* initialize_zookeeper(const char * zookeeper, void* param) 00143 { 00144 zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR); 00145 ((kafka_t*) param)->no_brokers = 1; 00146 ((kafka_t*) param)->broker_list = NULL; 00147 zhandle_t *zh; 00148 //trace_debug("initialize_zookeeper: calling zookeeper_init(%s)", zookeeper); 00149 zh = zookeeper_init(zookeeper, watcher, 10000, 0, param, 0); 00150 if (zh == NULL) 00151 { 00152 fprintf(stderr, "Zookeeper connection not established."); 00153 //exit(1); 00154 } 00155 return zh; 00156 } 00157 #endif