001
002package edu.wpi.first.wpilibj.networktables2;
003
004import edu.wpi.first.wpilibj.networktables2.thread.*;
005import edu.wpi.first.wpilibj.networktables2.util.*;
006
007/**
008 * A write manager is a {@link IncomingEntryReceiver} that buffers transactions and then and then dispatches them to a flushable transaction receiver that is periodically offered all queued transaction and then flushed
009 * 
010 * @author Mitchell
011 *
012 */
013public class WriteManager implements OutgoingEntryReceiver, PeriodicRunnable{
014        private final int SLEEP_TIME = 100;
015        
016        private final int queueSize = 500;
017        
018        private Object transactionsLock = new Object();
019        private NTThread thread;
020        private NTThreadManager threadManager;
021        private final AbstractNetworkTableEntryStore entryStore;
022        
023        private volatile HalfQueue incomingAssignmentQueue;
024        private volatile HalfQueue incomingUpdateQueue;
025        private volatile HalfQueue outgoingAssignmentQueue;
026        private volatile HalfQueue outgoingUpdateQueue;
027        
028        private FlushableOutgoingEntryReceiver receiver;
029        private long lastWrite;
030
031        private final long keepAliveDelay;
032
033        /**
034         * Create a new Write manager
035         * @param receiver
036         * @param threadManager
037         * @param transactionPool
038         * @param entryStore
039         */
040        public WriteManager(final FlushableOutgoingEntryReceiver receiver, final NTThreadManager threadManager, final AbstractNetworkTableEntryStore entryStore, long keepAliveDelay) {
041                this.receiver = receiver;
042                this.threadManager = threadManager;
043                this.entryStore = entryStore;
044                
045                incomingAssignmentQueue = new HalfQueue(queueSize);
046                incomingUpdateQueue = new HalfQueue(queueSize);
047                outgoingAssignmentQueue = new HalfQueue(queueSize);
048                outgoingUpdateQueue = new HalfQueue(queueSize);
049                
050                this.keepAliveDelay = keepAliveDelay;
051        }
052
053        /**
054         * start the write thread
055         */
056        public void start(){
057                if(thread!=null)
058                        stop();
059                lastWrite = System.currentTimeMillis();
060                thread = threadManager.newBlockingPeriodicThread(this, "Write Manager Thread");
061        }
062        /**
063         * stop the write thread
064         */
065        public void stop(){
066                if(thread!=null)
067                        thread.stop();
068        }
069
070
071        public void offerOutgoingAssignment(NetworkTableEntry entry) {
072                synchronized(transactionsLock){
073                        incomingAssignmentQueue.queue(entry);
074                        if(incomingAssignmentQueue.isFull()){
075                                try {
076                                        run();
077                                } catch (InterruptedException e) {}
078                                System.err.println("assignment queue overflowed. decrease the rate at which you create new entries or increase the write buffer size");
079                        }
080                }
081        }
082
083
084        public void offerOutgoingUpdate(NetworkTableEntry entry) {
085                synchronized(transactionsLock){
086                        incomingUpdateQueue.queue(entry);
087                        if(incomingUpdateQueue.isFull()){
088                                try {
089                                        run();
090                                } catch (InterruptedException e) {}
091                                System.err.println("update queue overflowed. decrease the rate at which you update entries or increase the write buffer size");
092                        }
093                }
094        }
095
096        
097        /**
098         * the periodic method that sends all buffered transactions
099         */
100        public void run() throws InterruptedException {
101                synchronized(transactionsLock){
102                        //swap the assignment and update queue
103                        HalfQueue tmp = incomingAssignmentQueue;
104                        incomingAssignmentQueue = outgoingAssignmentQueue;
105                        outgoingAssignmentQueue = tmp;
106                        
107                        tmp = incomingUpdateQueue;
108                        incomingUpdateQueue = outgoingUpdateQueue;
109                        outgoingUpdateQueue = tmp;
110                }
111                
112                boolean wrote = false;
113                NetworkTableEntry entry;
114                int i;
115                int size = outgoingAssignmentQueue.size();
116                Object[] array = outgoingAssignmentQueue.array;
117                for(i = 0; i<size; ++i){
118                        entry = (NetworkTableEntry)array[i];
119                        synchronized(entryStore){
120                                entry.makeClean();
121                        }
122                        wrote = true;
123                        receiver.offerOutgoingAssignment(entry);
124                }
125                outgoingAssignmentQueue.clear();
126                
127                
128                size = outgoingUpdateQueue.size();
129                array = outgoingUpdateQueue.array;
130                for(i = 0; i<size; ++i){
131                        entry = (NetworkTableEntry)array[i];
132                        synchronized(entryStore){
133                                entry.makeClean();
134                        }
135                        wrote = true;
136                        receiver.offerOutgoingUpdate(entry);
137                }
138                outgoingUpdateQueue.clear();
139                
140                
141                if(wrote){
142                        receiver.flush();
143                        lastWrite = System.currentTimeMillis();
144                }
145                else if(System.currentTimeMillis()-lastWrite>keepAliveDelay)
146                        receiver.ensureAlive();
147                
148                Thread.sleep(SLEEP_TIME);
149        }
150
151}