-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathPublisher.java
179 lines (171 loc) · 7.15 KB
/
Publisher.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Scanner;
public class Publisher {
private static int portid;
static ServerSocket InfoTaker = null;
static ArrayList<Message> allchoices = new ArrayList<>();
static ArrayList<String> PubsDuty = new ArrayList<>();
static ArrayList<Message> busPositions = new ArrayList<>();
static HashMap<String, String> busLines = new HashMap<>();
private static final String PUBIP = "192.168.1.8";
private static String IPofBroker;
private static int PORTTOSEND;
public static HashMap<String, String> IPPORT;
public static void main(String[] args) {
IPPORT = FileReaders.readHash(new File("Brokers.txt"));
for (String key : IPPORT.keySet()) {
//System.out.println("- "+ IPPORT);
PORTTOSEND = Integer.parseInt(key);
//System.out.println("port: "+PORTTOSEND);
IPofBroker = IPPORT.get(key);
//System.out.println("IP of Broker: "+IPofBroker);
}
busPositions = FileReaders.readBusPositions(new File("busPositionsNew.txt"));
System.out.println(busPositions.get(1));
init();
getBrokerList();
new Publisher().startClient();
}
/**
* This method creates a communication between the Publisher and one of the Brokers. Then, the broker informs the
* Publisher about all the other Brokers and which bus lines each of them is responsible for.
*/
public static void getBrokerList() {
Socket requestSocket;
ObjectOutputStream out;
try {
requestSocket = new Socket(IPofBroker, PORTTOSEND);
out = new ObjectOutputStream(requestSocket.getOutputStream());
out.writeObject(new Message("NotifyPub", PUBIP + "", portid + ""));
out.flush();
} catch (Exception e) {
}
Socket s;
int i = 0;
while (true) {
try {
s = InfoTaker.accept();
ObjectInputStream in = new ObjectInputStream(s.getInputStream());
Message temp = (Message) in.readObject();
Message info = new Message(temp.topics, temp.port);
allchoices.add(info);
System.out.println("Broker[" + temp.port + "]: " + temp.topics);
i++;
if (i == 3) {
in.close();
break;
}
} catch (Exception e) {
continue;
}
}
}
/**
* This method is used so that the publisher can inform a broker about a bus's location.
*
* @param out Output stream used to pass messages.
* @param msg Message containing information about a bus line and its location.
*/
public static synchronized void push(ObjectOutputStream out, Message msg) {
try {
msg.setPubSubBrok("BusInfoByPub");
out.writeObject(msg);
out.flush();
} catch (IOException e) {
System.err.println("Couldn't send from Pub to Broker");
}
}
/**
* In this method we create a thread that allows concurrent communication between the Publisher
* and the Broker that is responsible for a specific bus line.
*/
public void startClient() {
Socket requestSocket;
System.out.println(allchoices.size());
System.out.println(PubsDuty.size());
for (int i = 0; i < allchoices.size(); i++) {
for (int j = 0; j < PubsDuty.size(); j++) {
if (allchoices.get(i).topics.contains(PubsDuty.get(j))) {
try {
requestSocket = new Socket(IPPORT.get(allchoices.get(i).port + ""), allchoices.get(i).port);
ObjectOutputStream out = new ObjectOutputStream(requestSocket.getOutputStream());
PubHandler ph = new PubHandler(requestSocket, out, allchoices.get(i).port);
ph.start();
out.flush();
break;
} catch (Exception e) {
System.out.println(IPPORT.get(allchoices.get(i)));
System.err.println(allchoices.get(i).port);
e.printStackTrace();
}
}
}
}
}
/**
* This method creates a Publisher node. At first we're asked to input who this Publisher
* is going to be (1 or 2), information which will be used to allocate the available bus
* lines to the Publishers. The first one will get the first nine and the other one will get
* the remaining nine. Then, we're asked to input the Publisher's Port which will be used
* to communicate with that particular Publisher.
* After the bus lines are allocated evenly to the two Publishers, they're printed out on the
* console for the Subscriber to see.
* Finally, a socket is created using the Publisher's port, used for communication with the Broker.
*/
public static void init() {
busLines = FileReaders.readBusLines(new File("busLinesNew.txt"));
System.out.println("Which Publisher is this? Type 1 for first & 2 for second: ");
Scanner input = new Scanner(System.in);
int choice = input.nextInt();
System.out.println("Give the port of the Publisher");
portid = input.nextInt();
int flag = 0;
if (choice == 1) {
for (String key : busLines.keySet()) {
if (!PubsDuty.contains(busLines.get(key))) {
if (!busLines.get(key).equals("036"))
PubsDuty.add(busLines.get(key));
}
if (PubsDuty.size() == 9) {
break;
}
}
System.out.println("Publisher_1[" + portid + "]: " + PubsDuty);
} else {
for (String key : busLines.keySet()) {
flag++;
if (flag > busLines.size() / 2) {
if (!PubsDuty.contains(busLines.get(key)))
PubsDuty.add(busLines.get(key));
}
}
System.out.println("Publisher_2[" + portid + "]: " + PubsDuty);
}
try {
InfoTaker = new ServerSocket(portid);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* This method is used to notify the Broker in case the Publisher wasn't able to retrieve information
* regarding a bus's location.
*
* @param out Output stream used to pass message.
*/
public static void notifyFailure(ObjectOutputStream out) {
try {
out.writeObject(new Message("Failure", "A publisher couldn't pass all the info", ""));
out.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}