# Copyright (c) 2001-2004 Twisted Matrix Laboratories. # See LICENSE for details. from twisted.trial import unittest from twisted.web import server, resource, microdom, domhelpers from twisted.web import http from twisted.web.test import test_web from twisted.internet import reactor, defer, address from twisted.web.woven import template, model, view, controller, widgets, input, page, guard outputNum = 0 # Reusable test harness class WovenTC(unittest.TestCase): modelFactory = lambda self: None resourceFactory = None def setUp(self): self.m = self.modelFactory() self.t = self.resourceFactory(self.m) self.r = test_web.DummyRequest(['']) self.r.prepath = [''] self.prerender() self.t.render(self.r) self.channel = "a fake channel" self.output = ''.join(self.r.written) assert self.output, "No output was generated by the test." global outputNum open("wovenTestOutput%s.html" % (outputNum + 1), 'w').write(self.output) outputNum += 1 self.d = microdom.parseString(self.output) def prerender(self): pass # Test 1 # Test that replacing nodes with a string works properly class SimpleTemplate(template.DOMTemplate): template = """ <span view="getTitle">Hello</span>

Hi

""" def factory_getTitle(self, request, node): return "Title" def factory_getHello(self, request, node): return "Hello" class DOMTemplateTest(WovenTC): resourceFactory = SimpleTemplate def testSimpleRender(self): titleNode = self.d.getElementById("title") helloNode = self.d.getElementById("hello") self.assert_(domhelpers.gatherTextNodes(titleNode) == 'Title') self.assert_(domhelpers.gatherTextNodes(helloNode) == 'Hello') # Test 2 # Test just like the first, but with Text widgets class TemplateWithWidgets(SimpleTemplate): def wcfactory_getTitle(self, request, node): return widgets.Text("Title") def wcfactory_getHello(self, request, node): return widgets.Text("Hello") class TWWTest(DOMTemplateTest): resourceFactory = TemplateWithWidgets # Test 3 # Test a fancier widget, and controllers handling submitted input class MDemo(model.AttributeModel): foo = "Hello world" color = 'blue' class FancyBox(widgets.Widget): def setUp(self, request, node, data): self['style'] = 'margin: 1em; padding: 1em; background-color: %s' % data class VDemo(view.View): template = """
Type a color and hit submit:
""" def wvfactory_FancyBox(self, request, node, model): return FancyBox(model) def renderFailure(self, failure, request): return failure class ChangeColor(input.Anything): def commit(self, request, node, data): session = request.getSession() session.color = data self.model.setData(request, data) self.model.notify({'request': request}) class CDemo(controller.Controller): def setUp(self, request): session = request.getSession() self.model.color = getattr(session, 'color', self.model.color) def wcfactory_change(self, request, node, model): return ChangeColor(model) view.registerViewForModel(VDemo, MDemo) controller.registerControllerForModel(CDemo, MDemo) class ControllerTest(WovenTC): modelFactory = MDemo resourceFactory = CDemo def prerender(self): self.r.addArg('color', 'red') def testControllerOutput(self): boxNode = self.d.getElementById("box") assert boxNode, "Test %s failed" % outputNum style = boxNode.getAttribute("style") styles = style.split(";") sDict = {} for item in styles: key, value = item.split(":") key = key.strip() value = value.strip() sDict[key] = value # print sDict assert sDict['background-color'] == 'red' # Test 4 # Test a list, a list widget, and Deferred data handling identityList = ['asdf', 'foo', 'fredf', 'bob'] class MIdentityList(model.AttributeModel): def __init__(self): model.Model.__init__(self) self.identityList = defer.Deferred() self.identityList.callback(identityList) class VIdentityList(view.View): template = """ """ def wvfactory_identityList(self, request, node, model): return widgets.List(model) def wvfactory_text(self, request, node, model): return widgets.Text(model) def renderFailure(self, failure, request): return failure class CIdentityList(controller.Controller): pass view.registerViewForModel(VIdentityList, MIdentityList) controller.registerControllerForModel(CIdentityList, MIdentityList) class ListDeferredTest(WovenTC): modelFactory = MIdentityList resourceFactory = CIdentityList def testOutput(self): listNode = self.d.getElementById("list") assert listNode, "Test %s failed; there was no element with the id 'list' in the output" % outputNum liNodes = domhelpers.getElementsByTagName(listNode, 'li') assert len(liNodes) == len(identityList), "Test %s failed; the number of 'li' nodes did not match the list size" % outputNum # Test 5 # Test nested lists class LLModel(model.AttributeModel): data = [['foo', 'bar', 'baz'], ['gum', 'shoe'], ['ggg', 'hhh', 'iii'] ] class LLView(view.View): template = """ """ def wvfactory_List(self, request, node, model): return widgets.List(model) class NestedListTest(WovenTC): modelFactory = LLModel resourceFactory = LLView def testOutput(self): listNode = self.d.getElementById("first") assert listNode, "Test %s failed" % outputNum liNodes = filter(lambda x: hasattr(x, 'tagName') and x.tagName == 'li', listNode.childNodes) # print len(liNodes), len(self.m.data), liNodes, self.m.data assert len(liNodes) == len(self.m.data), "Test %s failed" % outputNum for i in range(len(liNodes)): sublistNode = domhelpers.getElementsByTagName(liNodes[i], 'ol')[0] subLiNodes = domhelpers.getElementsByTagName(sublistNode, 'li') assert len(self.m.data[i]) == len(subLiNodes) # Test 6 # Test notification when a model is a dict or a list class MNotifyTest(model.AttributeModel): def initialize(self, *args, **kwargs): self.root = {"inventory": [], 'log': ""} class VNotifyTest(view.View): template = """
""" def wvfactory_someText(self, request, node, m): return widgets.Text(m) class InventoryUpdater(input.Anything): def commit(self, request, node, data): invmodel = self.model.getSubmodel(request, "inventory") log = self.model.getSubmodel(request, "log") inv = invmodel.getData(request) inv.append(data) # just add a string to the list log.setData(request, log.getData(request) + ("%s added to servers\n" % data)) invmodel.setData(request, inv) invmodel.notify({'request': request}) class CNotifyTest(controller.Controller): def wcfactory_updateInventory(self, request, node, model): return InventoryUpdater(model) view.registerViewForModel(VNotifyTest, MNotifyTest) controller.registerControllerForModel(CNotifyTest, MNotifyTest) class NotifyTest(WovenTC): modelFactory = MNotifyTest resourceFactory = CNotifyTest def prerender(self): self.r.addArg('root', 'test') def testComplexNotification(self): listNode = self.d.getElementById("theList") self.assert_(listNode, "Test %s failed" % outputNum) liNodes = domhelpers.getElementsByTagName(listNode, 'li') self.assert_(liNodes, "DOM was not updated by notifying Widgets. Test %s" % outputNum) text = domhelpers.gatherTextNodes(liNodes[0]) self.assert_(text.strip() == "test", "Wrong output: %s. Test %s" % (text, outputNum)) view.registerViewForModel(LLView, LLModel) #### Test 7 # Test model path syntax # model="/" should get you the root object # model="." should get you the current object # model=".." should get you the parent model object # xxx sanity check for now; just make sure it doesn't raise anything class ModelPathTest(WovenTC): modelFactory = lambda self: ['hello', ['hi', 'there'], 'hi', ['asdf', ['qwer', 'asdf']]] resourceFactory = page.Page def prerender(self): self.t.template = """
""" #### Test 8 # Test a large number of widgets class HugeTest(WovenTC): modelFactory = lambda self: ['hello' for x in range(100)] resourceFactory = page.Page def prerender(self): self.t.template = """
""" def testHugeTest(self): pass class ListOfDeferredsTest(WovenTC): """Test rendering a model which is a list of Deferreds.""" modelFactory = lambda self: [defer.succeed("hello"), defer.succeed("world")] resourceFactory = page.Page def prerender(self): t = '''
''' self.t.template = t def testResult(self): n = self.d.firstChild() self.assertEquals(len(n.childNodes), 2) for c in n.childNodes: self.assertEquals(c.nodeName, "b") self.assertEquals(n.firstChild().firstChild().toxml().strip(), "hello") self.assertEquals(n.lastChild().firstChild().toxml().strip(), "world") class FakeHTTPChannel: # TODO: this should be an interface in twisted.protocols.http... lots of # things want to fake out HTTP def __init__(self): self.transport = self self.factory = self self.received_cookies = {} # 'factory' attribute needs this def log(self, req): pass # 'channel' of request needs this def requestDone(self, req): self.req = req # 'transport' attribute needs this def getPeer(self): return address.IPv4Address("TCP", "fake", "fake") def getHost(self): return address.IPv4Address("TCP", "fake", 80) def write(self, data): # print data pass def writeSequence(self, datas): for data in datas: self.write(data) # Utility for testing. def makeFakeRequest(self, path, **vars): req = FakeHTTPRequest(self, queued=0) req.received_cookies.update(self.received_cookies) req.requestReceived("GET", path, "1.0") return req class FakeHTTPRequest(server.Request): def __init__(self, *args, **kw): server.Request.__init__(self, *args, **kw) self._cookieCache = {} from cStringIO import StringIO self.content = StringIO() self.received_headers['host'] = 'fake.com' self.written = StringIO() def write(self, data): self.written.write(data) server.Request.write(self, data) def addCookie(self, k, v, *args,**kw): server.Request.addCookie(self,k,v,*args,**kw) assert not self._cookieCache.has_key(k), "Should not be setting duplicate cookies!" self._cookieCache[k] = v self.channel.received_cookies[k] = v def processingFailed(self, fail): raise fail class FakeSite(server.Site): def getResourceFor(self, req): res = server.Site.getResourceFor(self,req) self.caughtRes = res return res from twisted.web import static class GuardTest(unittest.TestCase): def testSessionInit(self): sessWrapped = static.Data("you should never see this", "text/plain") swChild = static.Data("NO", "text/plain") sessWrapped.putChild("yyy",swChild) swrap = guard.SessionWrapper(sessWrapped) da = static.Data("b","text/plain") da.putChild("xxx", swrap) st = FakeSite(da) chan = FakeHTTPChannel() chan.site = st # first we're going to make sure that the session doesn't get set by # accident when browsing without first explicitly initializing the # session req = FakeHTTPRequest(chan, queued=0) req.requestReceived("GET", "/xxx/yyy", "1.0") assert len(req._cookieCache.values()) == 0, req._cookieCache.values() self.assertEquals(req.getSession(),None) # now we're going to make sure that the redirect and cookie are properly set req = FakeHTTPRequest(chan, queued=0) req.requestReceived("GET", "/xxx/"+guard.INIT_SESSION, "1.0") ccv = req._cookieCache.values() self.assertEquals(len(ccv),1) cookie = ccv[0] # redirect set? self.failUnless(req.headers.has_key('location')) # redirect matches cookie? self.assertEquals(req.headers['location'].split('/')[-1], guard.SESSION_KEY+cookie) # URL is correct? self.assertEquals(req.headers['location'], 'http://fake.com/xxx/'+guard.SESSION_KEY+cookie) oldreq = req # now let's try with a request for the session-cookie URL that has a cookie set url = "/"+(oldreq.headers['location'].split('http://fake.com/',1))[1] req = chan.makeFakeRequest(url) self.assertEquals(req.headers['location'].split('?')[0], 'http://fake.com/xxx/') for sz in swrap.sessions.values(): sz.expire() def testPerspectiveInit(self): # TODO: this is an awful lot of crap to have to import / do in order to # create a functional authentication system. Cut down on it. from twisted.internet.app import MultiService ms = MultiService("hello") from twisted.cred.authorizer import DefaultAuthorizer auth = DefaultAuthorizer(ms) from twisted.cred.service import Service svc = Service("test_service", ms, auth) myp = svc.createPerspective("test") myp.makeIdentity("test") sessWrapped = static.Data("you should never see this", "text/plain") swChild = static.Data("NO", "text/plain") sessWrapped.putChild("yyy",swChild) da = static.Data("b","text/plain") q = static.Data("you should never see this either", "text/plain") q.putChild("yyy", static.Data("YES", "text/plain")) authFactory = lambda p, q=q: q pwrap = guard.PerspectiveWrapper(svc, sessWrapped, authFactory) swrap = guard.SessionWrapper(pwrap) da.putChild("xxx", swrap) st = FakeSite(da) chan = FakeHTTPChannel() chan.site = st req = chan.makeFakeRequest("/xxx/"+guard.INIT_SESSION+"/yyy") req = chan.makeFakeRequest("/xxx/yyy") self.assertEquals(req.written.getvalue(),"NO") req = chan.makeFakeRequest("/xxx/"+guard.INIT_PERSPECTIVE+ "?identity=test&password=tenxt") assert not req.session.services.values() req = chan.makeFakeRequest("/xxx/"+guard.INIT_PERSPECTIVE+ "?identity=test&password=test") self.assertEquals(req.session.services.values()[0][0], myp) # print req.written.getvalue() req = chan.makeFakeRequest("/xxx/yyy") self.assertEquals(req.written.getvalue(), "YES") # print req.session.services for sz in swrap.sessions.values(): sz.expire() class _TestPage(page.Page): template = """
First:
Second:
Third:
""" def wmfactory_title(self, request): d = defer.Deferred() reactor.callLater(0, d.callback, 'The Result') return d class DeferredModelTestCase(unittest.TestCase): def testDeferredModel(self): # Test that multiple uses of a deferred model work correctly. channel = FakeHTTPChannel() channel.site = FakeSite(_TestPage()) request = channel.makeFakeRequest('/') while not request.finished: reactor.iterate() dom = microdom.parseXMLString(request.written.getvalue()) spanElems = domhelpers.findNodesNamed(dom, 'span') for spanElem in spanElems: self.failUnlessEqual('The Result', spanElem.childNodes[0].data) class MyMacroPage(page.Page): template = """\ """ def wvfactory_foo(self, request, node, model): return widgets.ExpandMacro(model, macroFile = 'cdataxtester.html', macroFileDirectory = '.', macroName = 'foo' ) class ExpandMacroTestCase(WovenTC): resourceFactory = MyMacroPage def setUp(self, *args, **kwargs): thepage = """\ """ file('cdatatester.html', 'wb').write(thepage) WovenTC.setUp(self, *args, **kwargs) def testCDATANotQuoted(self): self.failUnless(self.output.find('<>\'"&')>=0)