001 002package edu.wpi.first.wpilibj.networktables2; 003 004import edu.wpi.first.wpilibj.networktables2.thread.*; 005import edu.wpi.first.wpilibj.networktables2.util.*; 006 007/** 008 * A write manager is a {@link IncomingEntryReceiver} that buffers transactions and then and then dispatches them to a flushable transaction receiver that is periodically offered all queued transaction and then flushed 009 * 010 * @author Mitchell 011 * 012 */ 013public class WriteManager implements OutgoingEntryReceiver, PeriodicRunnable{ 014 private final int SLEEP_TIME = 100; 015 016 private final int queueSize = 500; 017 018 private Object transactionsLock = new Object(); 019 private NTThread thread; 020 private NTThreadManager threadManager; 021 private final AbstractNetworkTableEntryStore entryStore; 022 023 private volatile HalfQueue incomingAssignmentQueue; 024 private volatile HalfQueue incomingUpdateQueue; 025 private volatile HalfQueue outgoingAssignmentQueue; 026 private volatile HalfQueue outgoingUpdateQueue; 027 028 private FlushableOutgoingEntryReceiver receiver; 029 private long lastWrite; 030 031 private final long keepAliveDelay; 032 033 /** 034 * Create a new Write manager 035 * @param receiver 036 * @param threadManager 037 * @param transactionPool 038 * @param entryStore 039 */ 040 public WriteManager(final FlushableOutgoingEntryReceiver receiver, final NTThreadManager threadManager, final AbstractNetworkTableEntryStore entryStore, long keepAliveDelay) { 041 this.receiver = receiver; 042 this.threadManager = threadManager; 043 this.entryStore = entryStore; 044 045 incomingAssignmentQueue = new HalfQueue(queueSize); 046 incomingUpdateQueue = new HalfQueue(queueSize); 047 outgoingAssignmentQueue = new HalfQueue(queueSize); 048 outgoingUpdateQueue = new HalfQueue(queueSize); 049 050 this.keepAliveDelay = keepAliveDelay; 051 } 052 053 /** 054 * start the write thread 055 */ 056 public void start(){ 057 if(thread!=null) 058 stop(); 059 lastWrite = System.currentTimeMillis(); 060 thread = threadManager.newBlockingPeriodicThread(this, "Write Manager Thread"); 061 } 062 /** 063 * stop the write thread 064 */ 065 public void stop(){ 066 if(thread!=null) 067 thread.stop(); 068 } 069 070 071 public void offerOutgoingAssignment(NetworkTableEntry entry) { 072 synchronized(transactionsLock){ 073 incomingAssignmentQueue.queue(entry); 074 if(incomingAssignmentQueue.isFull()){ 075 try { 076 run(); 077 } catch (InterruptedException e) {} 078 System.err.println("assignment queue overflowed. decrease the rate at which you create new entries or increase the write buffer size"); 079 } 080 } 081 } 082 083 084 public void offerOutgoingUpdate(NetworkTableEntry entry) { 085 synchronized(transactionsLock){ 086 incomingUpdateQueue.queue(entry); 087 if(incomingUpdateQueue.isFull()){ 088 try { 089 run(); 090 } catch (InterruptedException e) {} 091 System.err.println("update queue overflowed. decrease the rate at which you update entries or increase the write buffer size"); 092 } 093 } 094 } 095 096 097 /** 098 * the periodic method that sends all buffered transactions 099 */ 100 public void run() throws InterruptedException { 101 synchronized(transactionsLock){ 102 //swap the assignment and update queue 103 HalfQueue tmp = incomingAssignmentQueue; 104 incomingAssignmentQueue = outgoingAssignmentQueue; 105 outgoingAssignmentQueue = tmp; 106 107 tmp = incomingUpdateQueue; 108 incomingUpdateQueue = outgoingUpdateQueue; 109 outgoingUpdateQueue = tmp; 110 } 111 112 boolean wrote = false; 113 NetworkTableEntry entry; 114 int i; 115 int size = outgoingAssignmentQueue.size(); 116 Object[] array = outgoingAssignmentQueue.array; 117 for(i = 0; i<size; ++i){ 118 entry = (NetworkTableEntry)array[i]; 119 synchronized(entryStore){ 120 entry.makeClean(); 121 } 122 wrote = true; 123 receiver.offerOutgoingAssignment(entry); 124 } 125 outgoingAssignmentQueue.clear(); 126 127 128 size = outgoingUpdateQueue.size(); 129 array = outgoingUpdateQueue.array; 130 for(i = 0; i<size; ++i){ 131 entry = (NetworkTableEntry)array[i]; 132 synchronized(entryStore){ 133 entry.makeClean(); 134 } 135 wrote = true; 136 receiver.offerOutgoingUpdate(entry); 137 } 138 outgoingUpdateQueue.clear(); 139 140 141 if(wrote){ 142 receiver.flush(); 143 lastWrite = System.currentTimeMillis(); 144 } 145 else if(System.currentTimeMillis()-lastWrite>keepAliveDelay) 146 receiver.ensureAlive(); 147 148 Thread.sleep(SLEEP_TIME); 149 } 150 151}