Skip to content
Snippets Groups Projects
Commit 26d1859e authored by Gazder Bence's avatar Gazder Bence
Browse files

initial commit. With the initial files.

parents
No related branches found
No related tags found
No related merge requests found
File added
package eventservice.impl;
import java.rmi.RemoteException;
import java.rmi.Remote;
import java.rmi.server.UnicastRemoteObject;
import java.util.ArrayList;
import java.util.List;
import eventservice.Event;
import eventservice.Pullable;
import eventservice.Pushable;
import eventservice.RemotelyPullable;
public class PullConsumerProxyImpl implements Remote, RemotelyPullable, Pushable {
private static final long serialVersionUID = 1L;
//private final BlockingQueue<Event> events;
private List events;
private int buffersize;
PullConsumerProxyImpl(int buffersize)
throws RemoteException {
super();
events = new ArrayList();
//events = new ArrayBlockingQueue<Event>(bufferSize);
this.buffersize = buffersize;
}
// Ezt a fogyaszto hivja
public Event pull() throws RemoteException {
Event e = null;
synchronized (events) {
while (events.isEmpty()) {
try {
events.wait();
} catch (InterruptedException ie) {}
}
e = (Event)events.remove(0);
}
return e;
}
// Ezt az EventChannelImpl hivja
public void push(Event event) throws RemoteException {
synchronized (events) {
if (events.size() >= buffersize) {
events.remove(0);
}
events.add(event);
events.notify();
}
}
}
package eventservice.impl;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.rmi.server.Unreferenced;
import eventservice.Event;
import eventservice.RemotelyPushable;
import java.util.ArrayList;
public class PushConsumerProxyImpl extends Thread
implements RemotelyPushable, Unreferenced {
private static final long serialVersionUID = 1L;
private final RemotelyPushable consumer;
private final ArrayList events;
private boolean registered;
private int buffersize;
PushConsumerProxyImpl(RemotelyPushable consumer, int buffersize)
throws RemoteException {
super();
this.consumer = consumer;
registered = true;
events = new ArrayList();
this.buffersize = buffersize;
}
// Ezt az EventChannelImpl hivja.
// Ha uj esemeny erkezik, felebreszti a szalat
public void push(Event event) throws RemoteException {
/* TODO: implementalni */
}
// Ez az onallo szal kodja.
// Vgtelen ciklusban fut, amg a registered rtkefalse-ra nem vltozik.
// A ciklusban: ha nincs a pufferben esemny, addig var,
// amig fel nem ebresztik. Ha felbredt, vagy volt esemny,
// akkor az ujonnan jott esemeny(eke)t atkuldi a consumer-nek push-sal,
// majd a ciklus kezddik ellrl
// Figyeljnk a klcsns kizrsra!
public void run() {
/* TODO: implementalni */
}
// itt jelezzuk a szalnak, hogy vege a futasnak
public void unreferenced() {
/* TODO: implementalni */
}
}
// TODO: PullConsumer osztly forrsa
// TODO: PushConsumer osztly forrsa
package suppliers;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import eventservice.EventChannel;
import eventservice.RemotelyPushable;
public class PushSupplier extends TimerTask {
private final Timer timer;
private final long interval;
private RemotelyPushable channelHandle;
private int counter;
PushSupplier(RemotelyPushable handle, long interval) {
timer = new Timer();
this.interval = interval;
this.channelHandle = handle;
}
public void run() {
++counter;
try {
channelHandle.push(new SimpleEvent(Integer.toString(counter)));
} catch (RemoteException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
public void start() {
timer.schedule(this, new Date(), interval);
}
public void stop() {
timer.cancel();
}
public static void main(String[] args) {
if (args.length < 3) {
System.err
.println("Usage: PushSupplier host channel_name interval_in_ms");
return;
}
String host = args[0];
String channelName = args[1];
long interval = Long.parseLong(args[2]);
Registry registry = null;
try {
registry = LocateRegistry.getRegistry(host);
} catch (RemoteException e) {
System.err.println("Cannot locate registry at host '"+host+"': "+e.getMessage());
return;
}
EventChannel channel = null;
try {
channel = (EventChannel) registry.lookup(channelName);
} catch (Exception e) {
System.err.println("Cannot lookup channel '"+channelName+"': "+e.getMessage());
return;
}
RemotelyPushable handle = null;
try {
handle = channel.registerPushSupplier();
} catch (RemoteException e) {
System.err.println("Cannot register supplier to channel '"+channelName+"': "+e.getMessage());
return;
}
if (handle == null) {
System.err.println("Supplier is already registered in '"+channelName+"'.");
return;
}
PushSupplier supplier = new PushSupplier(handle, interval);
supplier.start();
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(
System.in));
reader.readLine();
} catch (IOException e) {
}
supplier.stop();
try {
channel.unregister(handle);
} catch (RemoteException e) {
System.err.println(
"Cannot unregister supplier from channel '"+channelName+"': "+e.getMessage());
}
}
}
package suppliers;
import eventservice.Event;
public class SimpleEvent implements Event {
private static final long serialVersionUID = 1L;
private final String data;
public SimpleEvent(String data) {
super();
this.data = data;
}
public String toString() {
return data;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment