![]() |
fuse_kafka
|
00001 #include <stdio.h> 00002 #include <stdlib.h> 00003 #include <sys/types.h> 00004 #include <sys/stat.h> 00005 #include <sys/timeb.h> 00006 #include <unistd.h> 00007 #include <pthread.h> 00008 #include <sys/file.h> 00009 #ifdef TEST 00010 #define FUSE_KAFKA_DYNAMIC_CONFIGURATION_PATH "/tmp/fuse_kafka.args" 00011 #else 00012 #define FUSE_KAFKA_DYNAMIC_CONFIGURATION_PATH "/var/run/fuse_kafka.args" 00013 #endif 00014 typedef struct 00015 { 00016 int argc; 00017 char** argv; 00018 char* line; 00019 int loaded; 00020 void* context; 00021 char* path; 00022 pthread_t thread; 00023 } dynamic_configuration; 00027 static dynamic_configuration* dynamic_configuration_get() 00028 { 00029 static dynamic_configuration conf; 00030 return &conf; 00031 } 00032 static char* dynamic_configuration_get_path() 00033 { 00034 dynamic_configuration* conf = dynamic_configuration_get(); 00035 if(conf->path == NULL) 00036 return FUSE_KAFKA_DYNAMIC_CONFIGURATION_PATH; 00037 return conf->path; 00038 } 00039 int parse_line_from_file_nolock(FILE *f, char** linep, int* sizep) 00040 { 00041 fseek(f, 0, SEEK_END); 00042 (*sizep) = ftell(f); 00043 (*linep) = (char*) malloc((*sizep) + 1); 00044 if((*linep) == NULL) return 2; 00045 fseek(f, 0, SEEK_SET); 00046 int read = fread((*linep), 1, (*sizep), f); 00047 if(read < (*sizep)) { free((*linep)); return 3; } 00048 (*linep)[*sizep] = 0; 00049 return 0; 00050 } 00058 int parse_line_from_file(char* path, char** linep, int* sizep) 00059 { 00060 int result; 00061 FILE* f = fopen(path, "r"); 00062 if(f == NULL) return 1; 00063 #ifndef LOCK_SH 00064 #define LOCK_SH 1 00065 #endif 00066 flock(fileno(f), LOCK_SH); 00067 result = parse_line_from_file_nolock(f, linep, sizep); 00068 fclose(f); 00069 return result; 00070 } 00079 int parse_args_from_file(char* path, int* argcp, char*** argvp, char** linep) 00080 { 00081 int i = *argcp = 0, k = 0, size = 0, l; 00082 if((i = parse_line_from_file(path, linep, &size)) != 0) return i; 00083 for(i = 0; i < size; i++) 00084 if((i == 0 || (*linep)[i-1] == ' ') && (*linep)[i] != ' ') (*argcp)++; 00085 (*argvp) = calloc((*argcp), sizeof(char*)); 00086 if((*argvp) == NULL) { free((*linep)); return 4; } 00087 for(i = 0; i <= size; i++) 00088 { 00089 if((i == 0 || !(*linep)[i-1] || (*linep)[i-1] == ' ') && ((*linep)[i] != ' ' || k == *argcp)) 00090 { 00091 if(k < *argcp) (*argvp)[k++] = (*linep) + i; 00092 if(i > 0) 00093 { 00094 (*linep)[i - 1] = 0; 00095 for(l = i - 2; l > 0; l--) 00096 { 00097 if((*linep)[l] != ' ') break; 00098 else (*linep)[l] = 0; 00099 } 00100 } 00101 } 00102 } 00103 return 0; 00104 } 00105 unsigned long long millisecond(struct stat* info) 00106 { 00107 return info->st_mtime * 1000 00108 #ifndef _WIN32 00109 + info->st_mtim.tv_nsec / 1000000 00110 #endif 00111 ; 00112 } 00113 unsigned long long millisecond_clock() 00114 { 00115 struct timeb t; 00116 ftime(&t); 00117 return t.time * 1000 + t.millitm; 00118 } 00119 long long* dynamic_configuration_get_last_change() 00120 { 00121 static long long last_change = 0; 00122 return &last_change; 00123 } 00127 int dynamic_configuration_changed() 00128 { 00129 long long * last_change = dynamic_configuration_get_last_change(); 00130 long long new_change; 00131 struct stat stats; 00132 if(*last_change == 0) *last_change = millisecond_clock(); 00133 if(stat(dynamic_configuration_get_path(), &stats) != 0) 00134 return 0; 00135 new_change = millisecond(&stats); 00136 if(new_change <= *last_change) return 0; 00137 *last_change = new_change; 00138 return 1; 00139 } 00143 void dynamic_configuration_free() 00144 { 00145 dynamic_configuration* conf = dynamic_configuration_get(); 00146 if(conf == NULL) return; 00147 free(conf->line); 00148 free(conf->argv); 00149 } 00153 int dynamic_configuration_load() 00154 { 00155 char* line; 00156 char** argv; 00157 int argc; 00158 dynamic_configuration* conf = dynamic_configuration_get(); 00159 if(parse_args_from_file(dynamic_configuration_get_path(), 00160 &argc, &argv, &line) == 0) 00161 { 00162 if(conf->loaded) dynamic_configuration_free(); 00163 conf->argc = argc; 00164 conf->line = line; 00165 conf->argv = argv; 00166 conf->loaded = 1; 00167 return 0; 00168 } 00169 return 1; 00170 } 00171 int* dynamic_configuration_watch_routine_running() 00172 { 00173 static int running = 1; 00174 return &running; 00175 } 00180 void* dynamic_configuration_watch_routine(void(*f)(int argc, char** argv, void* context)) 00181 { 00182 while(*dynamic_configuration_watch_routine_running()) 00183 { 00184 if(dynamic_configuration_get()->context && 00185 dynamic_configuration_changed() && 00186 dynamic_configuration_load() == 0) 00187 { 00188 dynamic_configuration* conf = dynamic_configuration_get(); 00189 f(conf->argc, conf->argv, conf->context); 00190 } 00191 fk_sleep(5); 00192 } 00193 return NULL; 00194 } 00198 void dynamic_configuration_watch(void(*f)(int argc, char** argv, void* context)) 00199 { 00200 pthread_create(&(dynamic_configuration_get()->thread), NULL, 00201 (void * (*)(void *)) 00202 dynamic_configuration_watch_routine, (void*) f); 00203 } 00204 void dynamic_configuration_watch_stop() 00205 { 00206 *dynamic_configuration_watch_routine_running() = 0; 00207 dynamic_configuration* conf = dynamic_configuration_get(); 00208 if(conf != NULL && conf->thread != 0) 00209 { 00210 pthread_cancel(conf->thread); 00211 pthread_join(conf->thread, NULL); 00212 } 00213 *dynamic_configuration_watch_routine_running() = 1; 00214 }