fuse_kafka
src/queue.c
/**
 * One pending event, stored as a node of a doubly linked list.
 *
 * Field order is part of the layout and must not be changed.
 */
typedef struct _queue_event
{
    char *prefix;                   /* heap copy of the prefix, released by event_delete() */
    char *path;                     /* heap copy of the path, released by event_delete() */
    char *buf;                      /* heap copy of the payload, released by event_delete() */
    size_t size;                    /* size value handed to the writer on dequeue */
    off_t offset;                   /* offset value handed to the writer on dequeue */
    struct _queue_event *previous;  /* older neighbour, NULL for the oldest event */
    struct _queue_event *next;      /* newer neighbour, NULL for the newest event */
} queue_event;
00011 queue_event** event_last_get()
00012 {
00013     static queue_event* last = NULL;
00014     return &last;
00015 }
/**
 * Accessor for the queue length counter.
 *
 * @return address of a function-local static int that starts at 0; the
 *         queue functions increment and decrement it through this pointer.
 */
int* event_queue_size()
{
    static int count = 0;
    int* counter = &count;
    return counter;
}
/**
 * Accessor for the queue capacity.
 *
 * @return address of a function-local static int initialised to 10000;
 *         writing through the pointer changes the limit that
 *         event_enqueue() compares the queue size against.
 */
int* event_queue_max_size()
{
    static int limit = 10000;
    int* limit_slot = &limit;
    return limit_slot;
}
00026 void events_drop_first()
00027 {
00028     queue_event* current = *event_last_get();
00029     while(current != NULL)
00030     {
00031         if(current->previous != NULL && current->previous->previous == NULL)
00032         {
00033             free(current->previous);
00034             current->previous = NULL;
00035             (*event_queue_size())--;
00036             return;
00037         }
00038         current = current->previous;
00039     }
00040 }
00041 void event_enqueue(char *prefix, char *path, char *buf,
00042         size_t size, off_t offset)
00043 {
00044     queue_event** last = event_last_get();
00045     queue_event* new = malloc(sizeof(queue_event));
00046     new->prefix = strdup(prefix);
00047     new->path = strdup(path);
00048     new->buf = strdup(buf);
00049     new->size = size;
00050     new->offset = offset;
00051     new->previous = *last;
00052     if(new->previous != NULL) new->previous->next = new;
00053     new->next = NULL;
00054     *last = new;
00055     if((*event_queue_size())++ >= *(event_queue_max_size()))
00056         events_drop_first();
00057 }
00058 void event_delete(queue_event* event)
00059 {
00060     free(event->prefix);
00061     free(event->path);
00062     free(event->buf);
00063     free(event);
00064     (*event_queue_size())--;
00065 }
00066 void events_queue_empty()
00067 {
00068     queue_event** last = event_last_get();
00069     while(*last != NULL)
00070     {
00071         queue_event* to_free = *last;
00072         *last = (*last)->previous;
00073         event_delete(to_free);
00074     }
00075 }
00076 void events_dequeue(void (*writer)(const char *prefix, const char *path, char *buf,
00077         size_t size, off_t offset))
00078 {
00079     queue_event* current = *event_last_get();
00080     if(current == NULL) return;
00081     while(current->previous != NULL) current = current->previous;
00082     for(; current != NULL; current = current->next)
00083     {
00084         writer(current->prefix, current->path,
00085                 current->buf, current->size, current->offset);
00086     }
00087     events_queue_empty();
00088 }
 All Data Structures Files Functions Variables Defines