#!/usr/bin/env python
# **********************************************************************
#
# Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved.
#
# This copy of Ice is licensed to you under the terms described in the
# ICE_LICENSE file included in this distribution.
#
# **********************************************************************
import Ice, Test
def test(b):
if not b:
raise RuntimeError('test assertion failed')
def allTests(communicator, ref):
manager = Test.ServerManagerPrx.checkedCast(communicator.stringToProxy(ref))
locator = communicator.getDefaultLocator()
test(manager)
print "testing stringToProxy...",
base = communicator.stringToProxy("test @ TestAdapter")
base2 = communicator.stringToProxy("test @ TestAdapter")
base3 = communicator.stringToProxy("test")
base4 = communicator.stringToProxy("ServerManager")
base5 = communicator.stringToProxy("test2")
print "ok"
print "testing ice_locator and ice_getLocator... ",
test(Ice.proxyIdentityEqual(base.ice_getLocator(), communicator.getDefaultLocator()));
anotherLocator = Ice.LocatorPrx.uncheckedCast(communicator.stringToProxy("anotherLocator"));
base = base.ice_locator(anotherLocator);
test(Ice.proxyIdentityEqual(base.ice_getLocator(), anotherLocator));
communicator.setDefaultLocator(None);
base = communicator.stringToProxy("test @ TestAdapter");
test(not base.ice_getLocator());
base = base.ice_locator(anotherLocator);
test(Ice.proxyIdentityEqual(base.ice_getLocator(), anotherLocator));
communicator.setDefaultLocator(locator);
base = communicator.stringToProxy("test @ TestAdapter");
test(Ice.proxyIdentityEqual(base.ice_getLocator(), communicator.getDefaultLocator()));
#
# We also test ice_router/ice_getRouter (perhaps we should add a
# test/Ice/router test?)
#
test(not base.ice_getRouter());
anotherRouter = Ice.RouterPrx.uncheckedCast(communicator.stringToProxy("anotherRouter"));
base = base.ice_router(anotherRouter);
test(Ice.proxyIdentityEqual(base.ice_getRouter(), anotherRouter));
router = Ice.RouterPrx.uncheckedCast(communicator.stringToProxy("dummyrouter"));
communicator.setDefaultRouter(router);
base = communicator.stringToProxy("test @ TestAdapter");
test(Ice.proxyIdentityEqual(base.ice_getRouter(), communicator.getDefaultRouter()));
communicator.setDefaultRouter(None);
base = communicator.stringToProxy("test @ TestAdapter");
test(not base.ice_getRouter());
print "ok"
print "starting server...",
manager.startServer()
print "ok"
print "testing checked cast...",
obj = Test.TestIntfPrx.checkedCast(base)
obj = Test.TestIntfPrx.checkedCast(communicator.stringToProxy("test@TestAdapter"))
obj = Test.TestIntfPrx.checkedCast(communicator.stringToProxy("test @TestAdapter"))
obj = Test.TestIntfPrx.checkedCast(communicator.stringToProxy("test@ TestAdapter"))
test(obj)
obj2 = Test.TestIntfPrx.checkedCast(base2)
test(obj2)
obj3 = Test.TestIntfPrx.checkedCast(base3)
test(obj3)
obj4 = Test.ServerManagerPrx.checkedCast(base4)
test(obj4)
obj5 = Test.TestIntfPrx.checkedCast(base5)
test(obj5)
print "ok"
print "testing id@AdapterId indirect proxy...",
obj.shutdown()
manager.startServer()
try:
obj2 = Test.TestIntfPrx.checkedCast(base2)
obj2.ice_ping()
except Ice.LocalException:
test(False)
print "ok"
print "testing identity indirect proxy...",
obj.shutdown()
manager.startServer()
try:
obj3 = Test.TestIntfPrx.checkedCast(base3)
obj3.ice_ping()
except Ice.LocalException:
test(False)
try:
obj2 = Test.TestIntfPrx.checkedCast(base2)
obj2.ice_ping()
except Ice.LocalException:
test(False)
obj.shutdown()
manager.startServer()
try:
obj2 = Test.TestIntfPrx.checkedCast(base2)
obj2.ice_ping()
except Ice.LocalException:
test(False)
try:
obj3 = Test.TestIntfPrx.checkedCast(base3)
obj3.ice_ping()
except Ice.LocalException:
test(False)
obj.shutdown()
manager.startServer()
try:
obj2 = Test.TestIntfPrx.checkedCast(base2)
obj2.ice_ping()
except Ice.LocalException:
test(False)
obj.shutdown()
manager.startServer()
try:
obj3 = Test.TestIntfPrx.checkedCast(base3)
obj3.ice_ping()
except Ice.LocalException:
test(False)
obj.shutdown()
manager.startServer()
try:
obj2 = Test.TestIntfPrx.checkedCast(base2)
obj2.ice_ping()
except Ice.LocalException:
test(False)
obj.shutdown()
manager.startServer()
try:
obj5 = Test.TestIntfPrx.checkedCast(base5)
obj5.ice_ping()
except Ice.LocalException:
test(False)
print "ok"
print "testing reference with unknown identity...",
try:
base = communicator.stringToProxy("unknown/unknown")
base.ice_ping()
test(False)
except Ice.NotRegisteredException, ex:
test(ex.kindOfObject == "object")
test(ex.id == "unknown/unknown")
print "ok"
print "testing reference with unknown adapter...",
try:
base = communicator.stringToProxy("test @ TestAdapterUnknown")
base.ice_ping()
test(False)
except Ice.NotRegisteredException, ex:
test(ex.kindOfObject == "object adapter")
test(ex.id == "TestAdapterUnknown")
print "ok"
print "testing object reference from server...",
hello = obj.getHello()
hello.sayHello()
print "ok"
print "testing object reference from server after shutdown...",
obj.shutdown()
manager.startServer()
hello.sayHello()
print "ok"
print "testing object migration...",
hello = Test.HelloPrx.checkedCast(communicator.stringToProxy("hello"))
obj.migrateHello()
hello.sayHello()
obj.migrateHello()
hello.sayHello()
obj.migrateHello()
hello.sayHello()
print "ok"
print "shutdown server...",
obj.shutdown()
print "ok"
print "testing whether server is gone...",
try:
obj2.ice_ping()
test(False)
except Ice.LocalException:
pass
try:
obj3.ice_ping()
test(False)
except Ice.LocalException:
pass
try:
obj5.ice_ping()
test(False)
except Ice.LocalException:
pass
print "ok"
#
# Collocated invocations are not supported in Python.
#
#print "testing indirect references to collocated objects...",
print "shutdown server manager...",
manager.shutdown()
print "ok"
syntax highlighted by Code2HTML, v. 0.9.1